diff --git a/Control/Monad/MVar.hs b/Control/Monad/MVar.hs new file mode 100644 index 0000000..3f41a37 --- /dev/null +++ b/Control/Monad/MVar.hs @@ -0,0 +1,79 @@ +{- | Lift MVar operations so you can do them within monads stacked on top of IO. Analogous to MonadIO -} + +{-# LANGUAGE TupleSections #-} + +module Control.Monad.MVar ( + MVar, + module Control.Monad.MVar, + liftIO +) where + +import Control.Concurrent.MVar (MVar) +import qualified Control.Concurrent.MVar as IO +import Control.Monad.Error +import Control.Monad.Reader +import Control.Monad.State + +newEmptyMVar :: (MonadIO m) => m (MVar a) +newEmptyMVar = liftIO IO.newEmptyMVar + +newMVar :: (MonadIO m) => a -> m (MVar a) +newMVar = liftIO . IO.newMVar + +takeMVar :: (MonadIO m) => MVar a -> m a +takeMVar = liftIO . IO.takeMVar + +putMVar :: (MonadIO m) => MVar a -> a -> m () +putMVar var = liftIO . IO.putMVar var + +readMVar :: (MonadIO m) => MVar a -> m a +readMVar = liftIO . IO.readMVar + +swapMVar :: (MonadIO m) => MVar a -> a -> m a +swapMVar var = liftIO . IO.swapMVar var + +tryTakeMVar :: (MonadIO m) => MVar a -> m (Maybe a) +tryTakeMVar = liftIO . IO.tryTakeMVar + +tryPutMVar :: (MonadIO m) => MVar a -> a -> m Bool +tryPutMVar var = liftIO . IO.tryPutMVar var + +isEmptyMVar :: (MonadIO m) => MVar a -> m Bool +isEmptyMVar = liftIO . IO.isEmptyMVar + +class (MonadIO m) => MonadMVar m where + modifyMVar :: MVar a -> (a -> m (a, b)) -> m b + addMVarFinalizer :: MVar a -> m () -> m () + +modifyMVar_ :: (MonadMVar m) => MVar a -> (a -> m a) -> m () +modifyMVar_ var act = modifyMVar var $ \a -> do + a <- act a + return (a, ()) + +withMVar :: (MonadMVar m) => MVar a -> (a -> m b) -> m b +withMVar var act = modifyMVar var $ \a -> do + b <- act a + return (a, b) + +instance MonadMVar IO where + modifyMVar = IO.modifyMVar + addMVarFinalizer = IO.addMVarFinalizer + +instance (MonadMVar m, Error e) => MonadMVar (ErrorT e m) where + modifyMVar var f = ErrorT $ modifyMVar var $ \a -> do + e <- runErrorT (f a) + return $ either ((a, ) . Left) (fmap Right) e + addMVarFinalizer var (ErrorT act) = ErrorT $ + addMVarFinalizer var (act >> return ()) >> return (Right ()) + -- NOTE, error is silently dropped + +instance (MonadMVar m) => MonadMVar (ReaderT r m) where + modifyMVar var f = ReaderT $ \r -> modifyMVar var $ \a -> runReaderT (f a) r + addMVarFinalizer var (ReaderT act) = ReaderT (addMVarFinalizer var . act) + +instance (MonadMVar m) => MonadMVar (StateT s m) where + modifyMVar var f = StateT $ \s -> modifyMVar var $ \a -> do + ((a, b), s) <- runStateT (f a) s + return (a, (b, s)) + addMVarFinalizer var (StateT act) = StateT $ \s -> + addMVarFinalizer var (act s >> return ()) >> return ((), s) diff --git a/Control/Monad/Throw.hs b/Control/Monad/Throw.hs index b0f6d92..ef989e3 100644 --- a/Control/Monad/Throw.hs +++ b/Control/Monad/Throw.hs @@ -1,6 +1,6 @@ {- | This is just like "Control.Monad.Error.Class" except you can throw/catch the error of any ErrorT in the monad stack instead of just the top one as long as the error types are different. If two or more ErrorTs in the stack have the same error type you get the error of the top one. 
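For example (an editor's sketch, not part of this patch, relying on the ErrorT instances this module provides; the type and action names are made up), with two ErrorT layers carrying different error types you can throw to either layer from the same action:

> type App = ErrorT IOError (ErrorT String IO)
>
> failLow, failHigh :: App ()
> failLow  = throw (userError "socket died")     -- delivered to the ErrorT IOError layer
> failHigh = throw ("bad user input" :: String)  -- delivered to the ErrorT String layer beneath it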
-} -{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, OverlappingInstances #-} +{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, OverlappingInstances, UndecidableInstances #-} module Control.Monad.Throw where @@ -17,7 +17,15 @@ class (Monad m) => Throw e m where throwLeft :: (Throw e m) => m (Either e a) -> m a -- ^ Execute action and throw exception if result is Left, otherwise return the Right result -throwLeft = (either throw return =<<) +throwLeft = throwLeft' id + +throwLeft' :: (Throw e m) => (x -> e) -> m (Either x a) -> m a +-- ^ Execute action and throw transformed exception if result is Left, otherwise return Right result +throwLeft' f = (either (throw . f) return =<<) + +onException :: (Throw e m) => m a -> (e -> m b) -> m a +-- ^ If first action throws an exception then run second action then re-throw +onException action releaser = catch action $ \e -> releaser e >> throw e instance (Error e) => Throw e (Either e) where throw = throwError diff --git a/Control/Monad/Util.hs b/Control/Monad/Util.hs new file mode 100644 index 0000000..5dfccdf --- /dev/null +++ b/Control/Monad/Util.hs @@ -0,0 +1,46 @@ +-- | Extra monad functions and instances + +{-# LANGUAGE FlexibleInstances, UndecidableInstances #-} + +module Control.Monad.Util where + +import Control.Applicative (Applicative(..), (<$>)) +import Control.Arrow ((+++)) +import Control.Monad.Reader +import Control.Monad.Error + +instance (Monad m) => Applicative (ReaderT r m) where + pure = return + (<*>) = ap + +instance (Monad m, Error e) => Applicative (ErrorT e m) where + pure = return + (<*>) = ap + +class (MonadIO m, Applicative m, Functor m) => MonadIO' m +instance (MonadIO m, Applicative m, Functor m) => MonadIO' m + +ignore :: (Monad m) => a -> m () +ignore _ = return () + +loop :: (Functor m, Monad m) => m (Maybe a) -> m [a] +-- ^ Repeatedy execute action, collecting results, until it returns Nothing +loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act) + +untilJust :: (Monad m) => (a -> m (Maybe b)) -> [a] -> m (Maybe b) +-- ^ Apply action to elements one at a time until one returns Just. Return Nothing if all return Nothing. +untilJust f [] = return Nothing +untilJust f (a:as) = f a >>= maybe (untilJust f as) (return . Just) + +untilSuccess :: (MonadError e m, Error e) => (a -> m b) -> [a] -> m b +-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw 'strMsg' error if list is empty. +untilSuccess = untilSuccess' (strMsg "empty untilSuccess") + +untilSuccess' :: (MonadError e m) => e -> (a -> m b) -> [a] -> m b +-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw given error if list is empty +untilSuccess' e f [] = throwError e +untilSuccess' _ f (x : xs) = catchError (f x) (\e -> untilSuccess' e f xs) + +mapError :: (Functor m) => (e' -> e) -> ErrorT e' m a -> ErrorT e m a +-- ^ Convert error type thrown +mapError f (ErrorT m) = ErrorT $ (f +++ id) <$> m diff --git a/Control/Pipeline.hs b/Control/Pipeline.hs index 7c3cc0d..bd4a63c 100644 --- a/Control/Pipeline.hs +++ b/Control/Pipeline.hs @@ -1,10 +1,12 @@ -{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. 
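-- Editor's sketch (not part of this patch) of 'untilSuccess' from Control.Monad.Util
-- above: try each element until one succeeds, rethrowing the last error. The parser
-- and its inputs are hypothetical.

import Control.Monad.Error
import Control.Monad.Util (untilSuccess)

firstParsablePort :: ErrorT String IO Int
firstParsablePort = untilSuccess parse ["not-a-port", "27017"] where
    parse s = case reads s of
        [(n, "")] -> return n
        _         -> throwError ("bad port: " ++ s)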
Multiple threads can send on the same pipe (and get promises back); the pipe will pipeline each thread's request right away without waiting. -} +{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will pipeline each thread's request right away without waiting. + +A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. -} {-# LANGUAGE DoRec, RecordWildCards, MultiParamTypeClasses, FlexibleContexts #-} module Control.Pipeline ( - -- * Pipe - Pipe, newPipe, send, call, + -- * Pipeline + Pipeline, newPipeline, send, call, -- * Util Size, Length(..), @@ -16,7 +18,7 @@ module Control.Pipeline ( import Prelude hiding (length) import Control.Applicative ((<$>)) import Control.Monad (forever) -import Control.Exception (assert) +import Control.Exception (assert, onException) import System.IO.Error (try, mkIOError, eofErrorType) import System.IO (Handle, hFlush, hClose, hIsClosed) import qualified Data.ByteString as S @@ -85,10 +87,10 @@ instance Stream Handle L.ByteString where put = L.hPut get = L.hGet --- * Pipe +-- * Pipeline -- | Thread-safe and pipelined socket -data Pipe handle bytes = Pipe { +data Pipeline handle bytes = Pipeline { encodeSize :: Size -> bytes, decodeSize :: bytes -> Size, vHandle :: MVar handle, -- ^ Mutex on handle, so only one thread at a time can write to it @@ -96,33 +98,33 @@ data Pipe handle bytes = Pipe { listenThread :: ThreadId } --- | Create new Pipe with given encodeInt, decodeInt, and handle. You should 'close' pipe when finished, which will also close handle. If pipe is not closed but eventually garbage collected, it will be closed along with handle. -newPipe :: (Stream h b, Resource IO h) => +-- | Create new Pipeline with given encodeInt, decodeInt, and handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle. +newPipeline :: (Stream h b, Resource IO h) => (Size -> b) -- ^ Convert Size to bytes of fixed length. Every Int must translate to same number of bytes. -> (b -> Size) -- ^ Convert bytes of fixed length to Size. Must be exact inverse of encodeSize. 
- -> h -- ^ Underlying socket (handle) this pipe will read/write from - -> IO (Pipe h b) -newPipe encodeSize decodeSize handle = do + -> h -- ^ Underlying socket (handle) this pipeline will read/write from + -> IO (Pipeline h b) +newPipeline encodeSize decodeSize handle = do vHandle <- newMVar handle responseQueue <- newChan rec - let pipe = Pipe{..} + let pipe = Pipeline{..} listenThread <- forkIO (listen pipe) addMVarFinalizer vHandle $ do killThread listenThread close handle return pipe -instance (Resource IO h) => Resource IO (Pipe h b) where +instance (Resource IO h) => Resource IO (Pipeline h b) where -- | Close pipe and underlying socket (handle) - close Pipe{..} = do + close Pipeline{..} = do killThread listenThread close =<< readMVar vHandle - isClosed Pipe{..} = isClosed =<< readMVar vHandle + isClosed Pipeline{..} = isClosed =<< readMVar vHandle -listen :: (Stream h b) => Pipe h b -> IO () +listen :: (Stream h b, Resource IO h) => Pipeline h b -> IO () -- ^ Listen for responses and supply them to waiting threads in order -listen Pipe{..} = do +listen Pipeline{..} = do let n = length (encodeSize 0) h <- readMVar vHandle forever $ do @@ -131,23 +133,30 @@ listen Pipe{..} = do getN h len var <- readChan responseQueue putMVar var e + case e of + Left err -> close h >> fail (show err) -- close and stop looping + Right _ -> return () -send :: (Stream h b) => Pipe h b -> [b] -> IO () +send :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO () -- ^ Send messages all together to destination (no messages will be interleaved between them). None of the messages can induce a response, i.e. the destination must not reply to any of these messages (otherwise future 'call's will get these responses instead of their own). -- Each message is preceeded by its length when written to socket. -send Pipe{..} messages = withMVar vHandle $ \h -> do - mapM_ (write encodeSize h) messages - flush h +-- Raises IOError and closes pipeline if send fails +send Pipeline{..} messages = withMVar vHandle (writeAll listenThread encodeSize messages) -call :: (Stream h b) => Pipe h b -> [b] -> IO (IO b) +call :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO (IO b) -- ^ Send messages all together to destination (no messages will be interleaved between them), and return /promise/ of response from one message only. One and only one message in the list must induce a response, i.e. the destination must reply to exactly one message only (otherwise promises will have the wrong responses in them). -- Each message is preceeded by its length when written to socket. Likewise, the response must be preceeded by its length. -call Pipe{..} messages = withMVar vHandle $ \h -> do - mapM_ (write encodeSize h) messages - flush h +-- Raises IOError and closes pipeline if send fails, likewise for reply. +call Pipeline{..} messages = withMVar vHandle $ \h -> do + writeAll listenThread encodeSize messages h var <- newEmptyMVar writeChan responseQueue var return (either ioError return =<< readMVar var) -- return promise -write :: (Stream h b, Monoid b, Length b) => (Size -> b) -> h -> b -> IO () -write encodeSize h bytes = put h (mappend lenBytes bytes) where lenBytes = encodeSize (length bytes) +writeAll :: (Stream h b, Monoid b, Length b, Resource IO h) => ThreadId -> (Size -> b) -> [b] -> h -> IO () +-- ^ Write messages to stream. On error, close pipeline and raise IOError. 
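-- Editor's sketch (not part of this patch) of building and using a Pipeline over a
-- Handle. A 4-byte big-endian prefix satisfies the fixed-width / exact-inverse
-- requirement on encodeSize/decodeSize; Data.Binary is used here only for illustration,
-- and a 'Resource IO Handle' instance is assumed to be in scope.

import Control.Pipeline
import Data.Binary.Put (runPut, putWord32be)
import Data.Binary.Get (runGet, getWord32be)
import qualified Data.ByteString.Lazy as L
import System.IO (Handle)

examplePipeline :: Handle -> IO (Pipeline Handle L.ByteString)
examplePipeline = newPipeline enc dec where
    enc = runPut . putWord32be . fromIntegral    -- Size -> exactly 4 bytes
    dec = fromIntegral . runGet getWord32be      -- exact inverse of enc

exampleCall :: Pipeline Handle L.ByteString -> L.ByteString -> IO L.ByteString
exampleCall pipe request = do
    promise <- call pipe [request]   -- request is written (length-prefixed) immediately
    promise                          -- forcing the promise waits for the matching reply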
+writeAll listenThread encodeSize messages h = onException + (mapM_ write messages >> flush h) + (killThread listenThread >> close h) + where + write bytes = put h (mappend lenBytes bytes) where lenBytes = encodeSize (length bytes) diff --git a/Database/MongoDB.hs b/Database/MongoDB.hs index 2e16264..cd53a48 100644 --- a/Database/MongoDB.hs +++ b/Database/MongoDB.hs @@ -1,5 +1,5 @@ {- | -Client interface to MongoDB server(s). +Client interface to MongoDB database management system. Simple example below. Use with language extension /OvererloadedStrings/. @@ -10,12 +10,11 @@ Simple example below. Use with language extension /OvererloadedStrings/. > import Control.Monad.Trans (liftIO) > > main = do -> ee <- runNet $ do -> conn <- connect (host "127.0.0.1") -> runConn run conn -> print ee +> conn <- connect 1 (host "127.0.0.1") +> e <- access safe Master conn run +> print e > -> run = useDb "baseball" $ do +> run = use (Database "baseball") $ do > clearTeams > insertTeams > print' "All Teams" =<< allTeams @@ -30,13 +29,13 @@ Simple example below. Use with language extension /OvererloadedStrings/. > ["name" =: u"Phillies", "home" =: ["city" =: u"Philadelphia", "state" =: u"PA"], "league" =: u"National"], > ["name" =: u"Red Sox", "home" =: ["city" =: u"Boston", "state" =: u"MA"], "league" =: u"American"] ] > -> allTeams = rest =<< find (select [] "team") {sort = ["city" =: (1 :: Int)]} +> allTeams = rest =<< find (select [] "team") {sort = ["home.city" =: (1 :: Int)]} > > nationalLeagueTeams = rest =<< find (select ["league" =: u"National"] "team") > > newYorkTeams = rest =<< find (select ["home.state" =: u"NY"] "team") {project = ["name" =: (1 :: Int), "league" =: (1 :: Int)]} > -> print' title docs = liftIO $ putStrLn title >> mapM_ print docs +> print' title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs -} module Database.MongoDB ( diff --git a/Database/MongoDB/Admin.hs b/Database/MongoDB/Admin.hs index 69ec02b..9d3fe80 100644 --- a/Database/MongoDB/Admin.hs +++ b/Database/MongoDB/Admin.hs @@ -11,7 +11,7 @@ module Database.MongoDB.Admin ( -- ** User allUsers, addUser, removeUser, -- ** Database - cloneDatabase, copyDatabase, dropDatabase, repairDatabase, + admin, cloneDatabase, copyDatabase, dropDatabase, repairDatabase, -- ** Server serverBuildInfo, serverVersion, -- * Diagnotics @@ -51,17 +51,17 @@ coptElem Capped = "capped" =: True coptElem (MaxByteSize n) = "size" =: n coptElem (MaxItems n) = "max" =: n -createCollection :: (DbConn m) => [CollectionOption] -> Collection -> m Document +createCollection :: (DbAccess m) => [CollectionOption] -> Collection -> m Document -- ^ Create collection with given options. You only need to call this to set options, otherwise a collection is created automatically on first use with no options. createCollection opts col = runCommand $ ["create" =: col] ++ map coptElem opts -renameCollection :: (DbConn m) => Collection -> Collection -> m Document +renameCollection :: (DbAccess m) => Collection -> Collection -> m Document -- ^ Rename first collection to second collection renameCollection from to = do - db <- thisDatabase - useDb "admin" $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True] + Database db <- thisDatabase + use admin $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True] -dropCollection :: (DbConn m) => Collection -> m Bool +dropCollection :: (DbAccess m) => Collection -> m Bool -- ^ Delete the given collection! 
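-- Editor's sketch (not part of this patch): the collection options above translate
-- directly into command fields, e.g. a capped collection. The collection name is
-- made up and needs OverloadedStrings.

makeCappedLog :: (DbAccess m) => m Document
makeCappedLog = createCollection [Capped, MaxByteSize 100000, MaxItems 1000] "log"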
Return True if collection existed (and was deleted); return False if collection did not exist (and no action). dropCollection coll = do resetIndexCache @@ -70,7 +70,7 @@ dropCollection coll = do if at "errmsg" r == ("ns not found" :: UString) then return False else fail $ "dropCollection failed: " ++ show r -validateCollection :: (DbConn m) => Collection -> m Document +validateCollection :: (DbAccess m) => Collection -> m Document -- ^ This operation takes a while validateCollection coll = runCommand ["validate" =: coll] @@ -87,7 +87,7 @@ data Index = Index { } deriving (Show, Eq) idxDocument :: Index -> Database -> Document -idxDocument Index{..} db = [ +idxDocument Index{..} (Database db) = [ "ns" =: db <.> iColl, "key" =: iKey, "name" =: iName, @@ -102,32 +102,32 @@ genName :: Order -> IndexName genName keys = intercalate "_" (map f keys) where f (k := v) = k `append` "_" `append` pack (show v) -ensureIndex :: (DbConn m) => Index -> m () +ensureIndex :: (DbAccess m) => Index -> m () -- ^ Create index if we did not already create one. May be called repeatedly with practically no performance hit, because we remember if we already called this for the same index (although this memory gets wiped out every 15 minutes, in case another client drops the index and we want to create it again). ensureIndex idx = let k = (iColl idx, iName idx) in do icache <- fetchIndexCache set <- liftIO (readIORef icache) unless (S.member k set) $ do - writeMode Safe (createIndex idx) + writeMode (Safe []) (createIndex idx) liftIO $ writeIORef icache (S.insert k set) -createIndex :: (DbConn m) => Index -> m () +createIndex :: (DbAccess m) => Index -> m () -- ^ Create index on the server. This call goes to the server every time. createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase -dropIndex :: (DbConn m) => Collection -> IndexName -> m Document +dropIndex :: (DbAccess m) => Collection -> IndexName -> m Document -- ^ Remove the index dropIndex coll idxName = do resetIndexCache runCommand ["deleteIndexes" =: coll, "index" =: idxName] -getIndexes :: (DbConn m) => Collection -> m [Document] +getIndexes :: (DbAccess m) => Collection -> m [Document] -- ^ Get all indexes on this collection getIndexes coll = do - db <- thisDatabase + Database db <- thisDatabase rest =<< find (select ["ns" =: db <.> coll] "system.indexes") -dropIndexes :: (DbConn m) => Collection -> m Document +dropIndexes :: (DbAccess m) => Collection -> m Document -- ^ Drop all indexes on this collection dropIndexes coll = do resetIndexCache @@ -143,7 +143,7 @@ type IndexCache = IORef (S.Set (Collection, IndexName)) dbIndexCache :: DbIndexCache -- ^ initialize cache and fork thread that clears it every 15 minutes dbIndexCache = unsafePerformIO $ do - table <- T.new (==) (T.hashString . unpack) + table <- T.new (==) (T.hashString . unpack . databaseName) _ <- forkIO . 
forever $ threadDelay 900000000 >> clearDbIndexCache return table {-# NOINLINE dbIndexCache #-} @@ -153,7 +153,7 @@ clearDbIndexCache = do keys <- map fst <$> T.toList dbIndexCache mapM_ (T.delete dbIndexCache) keys -fetchIndexCache :: (DbConn m) => m IndexCache +fetchIndexCache :: (DbAccess m) => m IndexCache -- ^ Get index cache for current database fetchIndexCache = do db <- thisDatabase @@ -166,7 +166,7 @@ fetchIndexCache = do T.insert dbIndexCache db idx return idx -resetIndexCache :: (DbConn m) => m () +resetIndexCache :: (DbAccess m) => m () -- ^ reset index cache for current database resetIndexCache = do icache <- fetchIndexCache @@ -174,70 +174,73 @@ resetIndexCache = do -- ** User -allUsers :: (DbConn m) => m [Document] +allUsers :: (DbAccess m) => m [Document] -- ^ Fetch all users of this database allUsers = map (exclude ["_id"]) <$> (rest =<< find (select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]}) -addUser :: (DbConn m) => Bool -> Username -> Password -> m () +addUser :: (DbAccess m) => Bool -> Username -> Password -> m () -- ^ Add user with password with read-only access if bool is True or read-write access if bool is False addUser readOnly user pass = do mu <- findOne (select ["user" =: user] "system.users") let u = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu) save "system.users" u -removeUser :: (DbConn m) => Username -> m () +removeUser :: (DbAccess m) => Username -> m () removeUser user = delete (select ["user" =: user] "system.users") -- ** Database -cloneDatabase :: (Conn m) => Database -> Host -> m Document --- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case). -cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromHost] +admin = Database "admin" +-- ^ \"admin\" database -copyDatabase :: (Conn m) => Database -> Host -> Maybe (Username, Password) -> Database -> m Document +cloneDatabase :: (Access m) => Database -> Host -> m Document +-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case). +cloneDatabase db fromHost = use db $ runCommand ["clone" =: showHostPort fromHost] + +copyDatabase :: (Access m) => Database -> Host -> Maybe (Username, Password) -> Database -> m Document -- ^ Copy database from given host to the server I am connected to. If username & password is supplied use them to read from given host. -copyDatabase fromDb fromHost mup toDb = do +copyDatabase (Database fromDb) fromHost mup (Database toDb) = do let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb] - useDb "admin" $ case mup of + use admin $ case mup of Nothing -> runCommand c Just (u, p) -> do n <- at "nonce" <$> runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost] runCommand $ c ++ ["username" =: u, "nonce" =: n, "key" =: pwKey n u p] -dropDatabase :: (Conn m) => Database -> m Document +dropDatabase :: (Access m) => Database -> m Document -- ^ Delete the given database! 
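-- Editor's sketch (not part of this patch): 'ensureIndex' above is cheap to call on
-- every start-up thanks to the 15-minute cache, while 'getIndexes' and 'dropIndex'
-- go to the server each time. Collection and index names are made up
-- (OverloadedStrings assumed).

listTeamIndexes :: (DbAccess m) => m [Document]
listTeamIndexes = getIndexes "team"

dropCityIndex :: (DbAccess m) => m Document
dropCityIndex = dropIndex "team" "home.city_1"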
-dropDatabase db = useDb db $ runCommand ["dropDatabase" =: (1 :: Int)] +dropDatabase db = use db $ runCommand ["dropDatabase" =: (1 :: Int)] -repairDatabase :: (Conn m) => Database -> m Document +repairDatabase :: (Access m) => Database -> m Document -- ^ Attempt to fix any corrupt records. This operation takes a while. -repairDatabase db = useDb db $ runCommand ["repairDatabase" =: (1 :: Int)] +repairDatabase db = use db $ runCommand ["repairDatabase" =: (1 :: Int)] -- ** Server -serverBuildInfo :: (Conn m) => m Document -serverBuildInfo = useDb "admin" $ runCommand ["buildinfo" =: (1 :: Int)] +serverBuildInfo :: (Access m) => m Document +serverBuildInfo = use admin $ runCommand ["buildinfo" =: (1 :: Int)] -serverVersion :: (Conn m) => m UString +serverVersion :: (Access m) => m UString serverVersion = at "version" <$> serverBuildInfo -- * Diagnostics -- ** Collection -collectionStats :: (DbConn m) => Collection -> m Document +collectionStats :: (DbAccess m) => Collection -> m Document collectionStats coll = runCommand ["collstats" =: coll] -dataSize :: (DbConn m) => Collection -> m Int +dataSize :: (DbAccess m) => Collection -> m Int dataSize c = at "size" <$> collectionStats c -storageSize :: (DbConn m) => Collection -> m Int +storageSize :: (DbAccess m) => Collection -> m Int storageSize c = at "storageSize" <$> collectionStats c -totalIndexSize :: (DbConn m) => Collection -> m Int +totalIndexSize :: (DbAccess m) => Collection -> m Int totalIndexSize c = at "totalIndexSize" <$> collectionStats c -totalSize :: (DbConn m) => Collection -> m Int +totalSize :: (DbAccess m) => Collection -> m Int totalSize coll = do x <- storageSize coll xs <- mapM isize =<< getIndexes coll @@ -249,33 +252,33 @@ totalSize coll = do data ProfilingLevel = Off | Slow | All deriving (Show, Enum, Eq) -getProfilingLevel :: (DbConn m) => m ProfilingLevel +getProfilingLevel :: (DbAccess m) => m ProfilingLevel getProfilingLevel = toEnum . at "was" <$> runCommand ["profile" =: (-1 :: Int)] type MilliSec = Int -setProfilingLevel :: (DbConn m) => ProfilingLevel -> Maybe MilliSec -> m () +setProfilingLevel :: (DbAccess m) => ProfilingLevel -> Maybe MilliSec -> m () setProfilingLevel p mSlowMs = runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return () -- ** Database -dbStats :: (DbConn m) => m Document +dbStats :: (DbAccess m) => m Document dbStats = runCommand ["dbstats" =: (1 :: Int)] -currentOp :: (DbConn m) => m (Maybe Document) +currentOp :: (DbAccess m) => m (Maybe Document) -- ^ See currently running operation on the database, if any currentOp = findOne (select [] "$cmd.sys.inprog") type OpNum = Int -killOp :: (DbConn m) => OpNum -> m (Maybe Document) +killOp :: (DbAccess m) => OpNum -> m (Maybe Document) killOp op = findOne (select ["op" =: op] "$cmd.sys.killop") -- ** Server -serverStatus :: (Conn m) => m Document -serverStatus = useDb "admin" $ runCommand ["serverStatus" =: (1 :: Int)] +serverStatus :: (Access m) => m Document +serverStatus = use admin $ runCommand ["serverStatus" =: (1 :: Int)] {- Authors: Tony Hannan diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs index 144064b..19437ab 100644 --- a/Database/MongoDB/Connection.hs +++ b/Database/MongoDB/Connection.hs @@ -1,41 +1,39 @@ -{- | A replica set is a set of servers that mirror each other (a non-replicated server can act like a replica set of one). One server in a replica set is the master and the rest are slaves. When the master goes down, one of the slaves becomes master. 
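-- Editor's sketch (not part of this patch): the diagnostics above compose like any other
-- database actions, e.g. profiling queries slower than 50 ms and then reading dbstats.

profileSlowQueries :: (DbAccess m) => m Document
profileSlowQueries = setProfilingLevel Slow (Just 50) >> dbStats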
The ReplicaSet object in this client maintains a list of servers that it currently knows are in the set. It refreshes this list every time it establishes a new connection with one of the servers in the set. Each server in the set knows who the other member in the set are, and who is master. The user asks the ReplicaSet object for a new master or slave connection. When a connection fails, the user must ask the ReplicaSet for a new connection (which most likely will connect to another server since the previous one failed). When connecting to a new server you loose all session state that was stored with the old server, which includes open cursors and temporary map-reduce output collections. Attempting to read from a lost cursor on a new server will raise a ServerFailure exception. Attempting to read a lost map-reduce temp output on a new server will return an empty set (not an error, like it maybe should). -} +{- | A Mongo connection is a pool of TCP connections to a single server or a replica set of servers. -} -{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, MultiParamTypeClasses, FlexibleContexts #-} +{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes #-} module Database.MongoDB.Connection ( - runNet, -- * Host Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM, -- * ReplicaSet - ReplicaSet, replicaSet, replicas, - newConnection, + ReplicaSet(..), -- * MasterOrSlaveOk MasterOrSlaveOk(..), -- * Connection - Connection, connect, - -- * Resource - Resource(..) + Server(..), ) where import Database.MongoDB.Internal.Protocol -import Data.Bson ((=:), at) +import Data.Bson ((=:), at, UString) import Control.Pipeline (Resource(..)) import Control.Applicative ((<$>)) import Control.Arrow ((+++), left) import Control.Exception (assert) -import System.IO.Error as E (try) +import System.IO.Error as E (try, mkIOError, userErrorType) import Control.Monad.Error -import Control.Monad.Throw -import Data.IORef +import Control.Monad.Throw (throw, onException) +import Control.Monad.MVar import Network (HostName, PortID(..), connectTo) import Data.Bson (Document, look, typed) import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>)) import Control.Monad.Identity -import Database.MongoDB.Internal.Util (true1, MonadIO') -- PortID instances +import Control.Monad.Util (MonadIO', untilSuccess) +import Database.MongoDB.Internal.Util (true1) -- PortID instances +import Var.Pool +import System.Random (newStdGen, randomRs) +import Data.List (delete, find, nub) -runNet :: ErrorT IOError m a -> m (Either IOError a) --- ^ Execute action that raises IOError only on network problem. Other IOErrors like file access errors are not caught by this. -runNet = runErrorT +type Name = UString adminCommand :: Document -> Request -- ^ Convert command to request @@ -88,46 +86,48 @@ readHostPort :: String -> Host -- ^ Read string \"hostname:port\" as @Host hostname port@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax. readHostPort = runIdentity . readHostPortM --- ** Replica Set +-- * Replica Set -newtype ReplicaSet = ReplicaSet (IORef [Host]) --- ^ Reference to a replica set of hosts. Ok if really not a replica set and just a stand-alone server, in which case it acts like a replica set of one. 
+data ReplicaSet = ReplicaSet {setName :: Name, seedHosts :: [Host]} deriving (Show) +-- ^ Replica set of hosts identified by set name. At least one of the seed hosts must be an active member of the set. However, this list is not used to identify the set, just the set name. -replicaSet :: [Host] -> IO ReplicaSet --- ^ Create a reference to a replica set with given hosts as the initial seed list (a subset of the hosts in the replica set) -replicaSet s = assert (not $ null s) (ReplicaSet <$> newIORef s) +instance Eq ReplicaSet where ReplicaSet x _ == ReplicaSet y _ = x == y -replicas :: ReplicaSet -> IO [Host] --- ^ Return current list of known hosts in replica set. This list is updated on every 'newConnection'. -replicas (ReplicaSet ref) = readIORef ref +-- ** Replica Info --- * Replica Info +getReplicaInfo :: Pipe -> ErrorT IOError IO ReplicaInfo +-- ^ Get replica info of the connected host. Throw IOError if connection fails or host is not part of a replica set (no /hosts/ and /primary/ field). +getReplicaInfo pipe = do + promise <- call pipe [] (adminCommand ["ismaster" =: (1 :: Int)]) + info <- commandReply "ismaster" <$> promise + look "hosts" info + look "primary" info + return info -data ReplicaInfo = ReplicaInfo Host Document deriving (Show, Eq) +type ReplicaInfo = Document -- ^ Configuration info of a host in a replica set. Contains all the hosts in the replica set plus its role in that set (master, slave, or arbiter) -isMaster :: ReplicaInfo -> Bool +isPrimary :: ReplicaInfo -> Bool -- ^ Is the replica described by this info a master/primary (not slave or arbiter)? -isMaster (ReplicaInfo _ i) = true1 "ismaster" i +isPrimary = true1 "ismaster" -isSlave :: ReplicaInfo -> Bool +isSecondary :: ReplicaInfo -> Bool -- ^ Is the replica described by this info a slave/secondary (not master or arbiter) -isSlave = not . isMaster -- TODO: distinguish between slave and arbiter +isSecondary = true1 "secondary" -allReplicas :: ReplicaInfo -> [Host] +replicas :: ReplicaInfo -> [Host] -- ^ All replicas in set according to this replica configuration info. --- If host is stand-alone then it won't have \"hosts\" in its configuration, in which case we return the host by itself. -allReplicas (ReplicaInfo h i) = maybe [h] (map readHostPort . typed) (look "hosts" i) +replicas = map readHostPort . at "hosts" -sortedReplicas :: ReplicaInfo -> IO [Host] --- ^ All replicas in set sorted by distance from this client. TODO -sortedReplicas = return . allReplicas +primary :: ReplicaInfo -> Host +-- ^ Read primary from configuration info +primary = readHostPort . at "primary" -getReplicaInfo :: (Throw IOError m, MonadIO' m) => Host -> Connection -> m ReplicaInfo --- ^ Get replica info of the connected host. Throw IOError if connection fails. -getReplicaInfo host' conn = do - promise <- throwLeft . liftIO . E.try $ call conn [] (adminCommand ["ismaster" =: (1 :: Int)]) - fmap (ReplicaInfo host' . commandReply "ismaster") . throwLeft . 
liftIO $ E.try promise +hosts :: ReplicaInfo -> [Host] +-- ^ replicas with primary at head +hosts info = master : delete master members where + members = replicas info + master = primary info -- * MasterOrSlaveOk @@ -138,49 +138,94 @@ data MasterOrSlaveOk = isMS :: MasterOrSlaveOk -> ReplicaInfo -> Bool -- ^ Does the host (as described by its replica-info) match the master/slave type -isMS Master i = isMaster i -isMS SlaveOk i = isSlave i || isMaster i +isMS Master i = isPrimary i +isMS SlaveOk i = isSecondary i || isPrimary i -- * Connection -newConnection :: (Throw IOError m, MonadIO' m) => MasterOrSlaveOk -> ReplicaSet -> m Connection --- ^ Create a connection to a master or slave in the replica set. Throw IOError if failed to connect to any host in replica set that is the right master/slave type. 'close' connection when you are done using it even if a failure is raised. Garbage collected connections will be closed automatically (but don't rely on this when creating many connections). --- TODO: prefer slave over master when SlaveOk and both are available. -newConnection mos (ReplicaSet vHosts) = throwLeft . liftIO $ left (userError . show) <$> do - hosts <- readIORef vHosts - e <- connectFirst mos hosts - case e of - Right (conn, info) -> do - writeIORef vHosts =<< sortedReplicas info - return (Right conn) - Left (fs, is) -> if null is - then return (Left fs) - else do - reps <- sortedReplicas (head is) - writeIORef vHosts reps - -- try again in case new replicas in info - (fst +++ fst) <$> connectFirst mos reps +type Pool' = Pool IOError -connectFirst :: MasterOrSlaveOk -> [Host] -> IO (Either ([(Host, IOError)], [ReplicaInfo]) (Connection, ReplicaInfo)) --- ^ Connect to first host that succeeds and is master/slave, otherwise return list of failed connections plus info of connections that succeeded but were not master/slave. -connectFirst mos = connectFirst' ([], []) where - connectFirst' (fs, is) [] = return $ Left (fs, is) - connectFirst' (fs, is) (h : hs) = do - e <- runErrorT $ do - c <- connect h - i <- getReplicaInfo h c - return (c, i) - case e of - Left f -> connectFirst' ((h, f) : fs, is) hs - Right (c, i) -> if isMS mos i - then return $ Right (c, i) - else do - close c - connectFirst' ((h, userError $ "not a " ++ show mos) : fs, i : is) hs +-- | A Server is a single server ('Host') or a replica set of servers ('ReplicaSet') +class Server t where + data Connection t + -- ^ A Mongo connection is a pool of TCP connections to a host or a replica set of hosts + connect :: (MonadIO' m) => Int -> t -> m (Connection t) + -- ^ Create a Mongo Connection to a host or a replica set of hosts. Actual TCP connection is not attempted until 'getPipe' request, so no IOError can be raised here. Up to N TCP connections will be established to each host. + getPipe :: MasterOrSlaveOk -> Connection t -> ErrorT IOError IO Pipe + -- ^ Return a TCP connection (Pipe) to the master or a slave in the server. Master must connect to the master, SlaveOk may connect to a slave or master. To spread the load, SlaveOk requests are distributed amongst all hosts in the server. Throw IOError if failed to connect to right type of host (Master/SlaveOk). + killPipes :: Connection t -> IO () + -- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when this Connection is garbage collected. 
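-- Editor's sketch (not part of this patch): connecting to a single (made-up) host with a
-- pool of 2 pipes and borrowing one. 'connect' does not touch the network; the TCP
-- connection is only attempted by 'getPipe'.

import Control.Monad.Error (runErrorT)
import Database.MongoDB.Connection
import Database.MongoDB.Internal.Protocol (Pipe)

demoHostPipe :: IO (Either IOError Pipe)
demoHostPipe = do
    conn <- connect 2 (host "127.0.0.1")   -- Connection Host: pool of up to 2 pipes
    runErrorT (getPipe Master conn)        -- round-robin pipe, or Left if it can't connect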
-connect :: (Throw IOError m, MonadIO' m) => Host -> m Connection --- ^ Create a connection to the given host (as opposed to connecting to some host in a replica set via 'newConnection'). Throw IOError if can't connect. -connect (Host hostname port) = throwLeft . liftIO $ E.try (mkConnection =<< connectTo hostname port) +-- ** Connection Host + +instance Server Host where + data Connection Host = HostConnection {connHost :: Host, connPool :: Pool' Pipe} + -- ^ A pool of TCP connections ('Pipe's) to a server, handed out in round-robin style. + connect poolSize host = liftIO (connectHost poolSize host) + -- ^ Create a Connection (pool of TCP connections) to server (host or replica set) + getPipe _ = getHostPipe + -- ^ Return a TCP connection (Pipe). If SlaveOk, connect to a slave if available. Round-robin if multiple slaves are available. Throw IOError if failed to connect. + killPipes (HostConnection _ pool) = killAll pool + +connectHost :: Int -> Host -> IO (Connection Host) +-- ^ Create a pool of N 'Pipe's (TCP connections) to server. 'getHostPipe' will return one of those pipes, round-robin style. +connectHost poolSize host = HostConnection host <$> newPool Factory{..} poolSize where + newResource = tcpConnect host + killResource = close + isExpired = isClosed + +getHostPipe :: Connection Host -> ErrorT IOError IO Pipe +-- ^ Return next pipe (TCP connection) in connection pool, round-robin style. Throw IOError if can't connect to host. +getHostPipe (HostConnection _ pool) = aResource pool + +tcpConnect :: Host -> ErrorT IOError IO Pipe +-- ^ Create a TCP connection (Pipe) to the given host. Throw IOError if can't connect. +tcpConnect (Host hostname port) = ErrorT . E.try $ mkPipe =<< connectTo hostname port + +-- ** Connection ReplicaSet + +instance Server ReplicaSet where + data Connection ReplicaSet = ReplicaSetConnection { + repsetName :: Name, + currentMembers :: MVar [Connection Host] } -- master at head after a refresh + connect poolSize repset = liftIO (connectSet poolSize repset) + getPipe = getSetPipe + killPipes ReplicaSetConnection{..} = withMVar currentMembers (mapM_ killPipes) + +replicaSet :: (MonadIO' m) => Connection ReplicaSet -> m ReplicaSet +-- ^ Set name with current members as seed list +replicaSet ReplicaSetConnection{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers + +connectSet :: Int -> ReplicaSet -> IO (Connection ReplicaSet) +-- ^ Create a connection to each member of the replica set. +connectSet poolSize repset = assert (not . null $ seedHosts repset) $ do + currentMembers <- newMVar =<< mapM (connect poolSize) (seedHosts repset) + return $ ReplicaSetConnection (setName repset) currentMembers + +getMembers :: Name -> [Connection Host] -> ErrorT IOError IO [Host] +-- ^ Get members of replica set, master first. Query supplied connections until config found. +-- TODO: Verify config for request replica set name and not some other replica set. ismaster config should include replica set name in result but currently does not. +getMembers repsetName connections = hosts <$> untilSuccess (getReplicaInfo <=< getHostPipe) connections + +refreshMembers :: Name -> [Connection Host] -> ErrorT IOError IO [Connection Host] +-- ^ Update current members with master at head. Reuse unchanged members. Throw IOError if can't connect to any and fetch config. Dropped connections are not closed in case they still have users; they will be closed when garbage collected. +refreshMembers repsetName connections = do + n <- liftIO . poolSize . 
connPool $ head connections + mapM (connection n) =<< getMembers repsetName connections + where + connection n host = maybe (connect n host) return $ find ((host ==) . connHost) connections + +getSetPipe :: MasterOrSlaveOk -> Connection ReplicaSet -> ErrorT IOError IO Pipe +-- ^ Return a pipe to primary or a random secondary in replica set. Use primary for SlaveOk if and only if no secondaries. Note, refreshes members each time (makes ismaster call to primary). +getSetPipe mos ReplicaSetConnection{..} = modifyMVar currentMembers $ \connections -> do + connections <- refreshMembers repsetName connections -- master at head after refresh + pipe <- case mos of + Master -> getHostPipe (head connections) + SlaveOk -> do + let n = length connections - 1 + is <- take (max 1 n) . nub . randomRs (min 1 n, n) <$> liftIO newStdGen + untilSuccess (getHostPipe . (connections !!)) is + return (connections, pipe) {- Authors: Tony Hannan diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 2d77dd3..4fdf769 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -1,12 +1,12 @@ -{-| Low-level messaging between this client and the MongoDB server. See Mongo Wire Protocol (). +{-| Low-level messaging between this client and the MongoDB server, see Mongo Wire Protocol (). This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -} -{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-} +{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts #-} module Database.MongoDB.Internal.Protocol ( - -- * Connection - Connection, mkConnection, + -- * Pipe + Pipe, mkPipe, send, call, -- * Message FullCollection, @@ -37,30 +37,33 @@ import Data.IORef import System.IO.Unsafe (unsafePerformIO) import Data.Digest.OpenSSL.MD5 (md5sum) import Data.UString as U (pack, append, toByteString) +import System.IO.Error as E (try) +import Control.Monad.Error +import Control.Monad.Trans (MonadIO(..)) --- * Connection +-- * Pipe -type Connection = P.Pipe Handle ByteString +type Pipe = P.Pipeline Handle ByteString -- ^ Thread-safe TCP connection to server with pipelined requests -mkConnection :: Handle -> IO Connection +mkPipe :: Handle -> IO Pipe -- ^ New thread-safe pipelined connection over handle -mkConnection = P.newPipe encodeSize decodeSize where +mkPipe = P.newPipeline encodeSize decodeSize where encodeSize = runPut . putInt32 . toEnum . (+ 4) decodeSize = subtract 4 . fromEnum . runGet getInt32 -send :: Connection -> [Notice] -> IO () --- ^ Send notices as a contiguous batch to server with no reply. Raise IOError if connection fails. -send conn notices = P.send conn =<< mapM noticeBytes notices +send :: Pipe -> [Notice] -> ErrorT IOError IO () +-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. +send conn notices = ErrorT . E.try $ P.send conn =<< mapM noticeBytes notices -call :: Connection -> [Notice] -> Request -> IO (IO Reply) --- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will raise IOError if connection fails. 
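-- Editor's sketch (not part of this patch), for the replica-set Connection above
-- (imports as in the single-host sketch): connect to a made-up set and take a slave-ok
-- pipe. The member list is refreshed on every 'getPipe'. The set name is a UString,
-- so OverloadedStrings is assumed.

demoSetPipe :: IO (Either IOError Pipe)
demoSetPipe = do
    conn <- connect 2 (ReplicaSet "rs1" [host "db1.example.com", host "db2.example.com"])
    runErrorT (getPipe SlaveOk conn)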
-call conn notices request = do +call :: Pipe -> [Notice] -> Request -> ErrorT IOError IO (ErrorT IOError IO Reply) +-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails. +call conn notices request = ErrorT . E.try $ do nMessages <- mapM noticeBytes notices requestId <- genRequestId let rMessage = runPut (putRequest request requestId) promise <- P.call conn (nMessages ++ [rMessage]) - return (bytesReply requestId <$> promise) + return (ErrorT . E.try $ bytesReply requestId <$> promise) noticeBytes :: Notice -> IO ByteString noticeBytes notice = runPut . putNotice notice <$> genRequestId diff --git a/Database/MongoDB/Internal/Util.hs b/Database/MongoDB/Internal/Util.hs index aa3bd90..56fe15a 100644 --- a/Database/MongoDB/Internal/Util.hs +++ b/Database/MongoDB/Internal/Util.hs @@ -1,14 +1,11 @@ -- | Miscellaneous general functions -{-# LANGUAGE StandaloneDeriving, FlexibleInstances, UndecidableInstances #-} +{-# LANGUAGE StandaloneDeriving #-} module Database.MongoDB.Internal.Util where import Prelude hiding (length) import Network (PortID(..)) -import Control.Applicative (Applicative(..), (<$>)) -import Control.Monad.Reader -import Control.Monad.Error import Data.UString as U (cons, append) import Data.Bits (Bits, (.|.)) import Data.Bson @@ -17,20 +14,6 @@ deriving instance Show PortID deriving instance Eq PortID deriving instance Ord PortID -instance (Monad m) => Applicative (ReaderT r m) where - pure = return - (<*>) = ap - -instance (Monad m, Error e) => Applicative (ErrorT e m) where - pure = return - (<*>) = ap - -class (MonadIO m, Applicative m, Functor m) => MonadIO' m -instance (MonadIO m, Applicative m, Functor m) => MonadIO' m - -ignore :: (Monad m) => a -> m () -ignore _ = return () - snoc :: [a] -> a -> [a] -- ^ add element to end of list (/snoc/ is reverse of /cons/, which adds to front of list) snoc list a = list ++ [a] @@ -45,10 +28,6 @@ bitOr = foldl (.|.) 0 -- ^ Concat first and second together with period in between. Eg. @\"hello\" \<.\> \"world\" = \"hello.world\"@ a <.> b = U.append a (cons '.' b) -loop :: (Functor m, Monad m) => m (Maybe a) -> m [a] --- ^ Repeatedy execute action, collecting results, until it returns Nothing -loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act) - true1 :: Label -> Document -> Bool -- ^ Is field's value a 1 or True (MongoDB use both Int and Bools for truth values). Error if field not in document or field not a Num or Bool. 
true1 k doc = case valueAt k doc of diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 4c1f656..76e843e 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -3,10 +3,10 @@ {-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, RankNTypes, ImpredicativeTypes #-} module Database.MongoDB.Query ( - -- * Connected - Connected, runConn, Conn, Failure(..), + -- * Access + access, Access, Action, runAction, Failure(..), -- * Database - Database, allDatabases, DbConn, useDb, thisDatabase, + Database(..), allDatabases, DbAccess, use, thisDatabase, -- ** Authentication P.Username, P.Password, auth, -- * Collection @@ -16,7 +16,7 @@ module Database.MongoDB.Query ( Select(select), -- * Write -- ** WriteMode - WriteMode(..), writeMode, + WriteMode(..), safe, GetLastError, writeMode, -- ** Insert insert, insert_, insertMany, insertMany_, -- ** Update @@ -51,48 +51,57 @@ import Control.Concurrent.MVar import Control.Pipeline (Resource(..)) import qualified Database.MongoDB.Internal.Protocol as P import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call) -import Database.MongoDB.Connection (MasterOrSlaveOk(..)) +import Database.MongoDB.Connection (MasterOrSlaveOk(..), Server(..)) import Data.Bson import Data.Word import Data.Int import Data.Maybe (listToMaybe, catMaybes) -import Data.UString as U (dropWhile, any, tail) -import Database.MongoDB.Internal.Util (loop, (<.>), true1, MonadIO') -- plus Applicative instances of ErrorT & ReaderT +import Data.UString as U (dropWhile, any, tail, unpack) +import Control.Monad.Util (MonadIO', loop) -- plus Applicative instances of ErrorT & ReaderT +import Database.MongoDB.Internal.Util ((<.>), true1) -send :: (Context Connection m, Throw IOError m, MonadIO m) => [Notice] -> m () --- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. -send ns = throwLeft . liftIO . try . flip P.send ns =<< context +mapErrorIO :: (Throw e m, MonadIO m) => (e' -> e) -> ErrorT e' IO a -> m a +mapErrorIO f = throwLeft' f . liftIO . runErrorT -call :: (Context Connection m, Throw IOError m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw IOError n, MonadIO n) => n Reply) --- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw IOError if connection fails on send, and promise will throw IOError if connection fails on receive. +send :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> m () +-- ^ Send notices as a contiguous batch to server with no reply. Throw 'ConnectionFailure' if pipe fails. +send ns = mapErrorIO ConnectionFailure . flip P.send ns =<< context + +call :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw Failure n, MonadIO n) => n Reply) +-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive. call ns r = do - conn <- context - promise <- throwLeft . liftIO $ try (P.call conn ns r) - return (throwLeft . 
liftIO $ try promise) + pipe <- context + promise <- mapErrorIO ConnectionFailure (P.call pipe ns r) + return (mapErrorIO ConnectionFailure promise) --- * Connected Monad +-- * Mongo Monad -newtype Connected m a = Connected (ErrorT Failure (ReaderT WriteMode (ReaderT MasterOrSlaveOk (ReaderT Connection m))) a) - deriving (Context Connection, Context MasterOrSlaveOk, Context WriteMode, Throw Failure, MonadIO, Monad, Applicative, Functor) --- ^ Monad with access to a 'Connection', 'MasterOrSlaveOk', and 'WriteMode', and throws a 'Failure' on read/write failure and IOError on connection failure +access :: (Server s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> Connection s -> Action m a -> m (Either Failure a) +-- ^ Run action with access to server or replica set via one of the 'Pipe's (TCP connections) in given 'Connection' pool +access w mos conn act = do + ePipe <- liftIO . runErrorT $ getPipe mos conn + either (return . Left . ConnectionFailure) (runAction act w mos) ePipe -deriving instance (Throw IOError m) => Throw IOError (Connected m) +-- | A monad with access to a 'Pipe', 'MasterOrSlaveOk', and 'WriteMode', and throws 'Failure' on read, write, or pipe failure +class (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m) => Access m +instance (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m) => Access m -instance MonadTrans Connected where - lift = Connected . lift . lift . lift . lift +newtype Action m a = Action (ErrorT Failure (ReaderT WriteMode (ReaderT MasterOrSlaveOk (ReaderT Pipe m))) a) + deriving (Context Pipe, Context MasterOrSlaveOk, Context WriteMode, Throw Failure, MonadIO, Monad, Applicative, Functor) +-- ^ Monad with access to a 'Pipe', 'MasterOrSlaveOk', and 'WriteMode', and throws a 'Failure' on read, write or pipe failure -runConn :: Connected m a -> Connection -> m (Either Failure a) --- ^ Run action with access to connection. It starts out assuming it is master (invoke 'slaveOk' inside it to change that) and that writes don't need to be check (invoke 'writeMode' to change that). Return Left Failure if error in execution. Throws IOError if connection fails during execution. -runConn (Connected action) = runReaderT (runReaderT (runReaderT (runErrorT action) Unsafe) Master) +instance MonadTrans Action where + lift = Action . lift . lift . lift . lift --- | A monad with access to a 'Connection', 'MasterOrSlaveOk', and 'WriteMode', and throws 'Failure' on read/write failure and 'IOError' on connection failure -class (Context Connection m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, Throw IOError m, MonadIO' m) => Conn m -instance (Context Connection m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, Throw IOError m, MonadIO' m) => Conn m +runAction :: Action m a -> WriteMode -> MasterOrSlaveOk -> Pipe -> m (Either Failure a) +-- ^ Run action with access to pipe. It starts out assuming it is master (invoke 'slaveOk' inside it to change that) and that writes don't need to be check (invoke 'writeMode' to change that). Return Left Failure if error in execution. Throws IOError if pipe fails during execution. +runAction (Action action) w mos = runReaderT (runReaderT (runReaderT (runErrorT action) w) mos) -- | Read or write exception like cursor expired or inserting a duplicate key. 
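-- Editor's sketch (not part of this patch), mirroring the example in the Database.MongoDB
-- module documentation: 'access' picks a Pipe from the Connection, runs the Action, and
-- reports pipe trouble as ConnectionFailure. Names are made up; OverloadedStrings assumed.

import Database.MongoDB

countTeams :: IO (Either Failure Int)
countTeams = do
    conn <- connect 1 (host "127.0.0.1")
    access safe Master conn $ use (Database "baseball") $ count (select [] "team")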
-- Note, unexpected data from the server is not a Failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change. data Failure = - CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server that the one you are currently connected to (perhaps a fail over happen between servers in a replica set) + ConnectionFailure IOError -- ^ TCP connection ('Pipe') failed. Make work if you try again on the same Mongo 'Connection' which will create a new Pipe. + | CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server that the one you are currently connected to (perhaps a fail over happen between servers in a replica set) | QueryFailure String -- ^ Query failed for some reason as described in the string | WriteFailure ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string deriving (Show, Eq) @@ -102,29 +111,31 @@ instance Error Failure where strMsg = error -- * Database -type Database = UString +newtype Database = Database {databaseName :: UString} deriving (Eq, Ord) -- ^ Database name --- | A 'Conn' monad with access to a 'Database' -class (Context Database m, Conn m) => DbConn m -instance (Context Database m, Conn m) => DbConn m +instance Show Database where show (Database x) = unpack x -allDatabases :: (Conn m) => m [Database] +-- | As 'Access' monad with access to a particular 'Database' +class (Context Database m, Access m) => DbAccess m +instance (Context Database m, Access m) => DbAccess m + +allDatabases :: (Access m) => m [Database] -- ^ List all databases residing on server -allDatabases = map (at "name") . at "databases" <$> useDb "admin" (runCommand1 "listDatabases") +allDatabases = map (Database . at "name") . at "databases" <$> use (Database "admin") (runCommand1 "listDatabases") -useDb :: Database -> ReaderT Database m a -> m a +use :: Database -> ReaderT Database m a -> m a -- ^ Run Db action against given database -useDb = flip runReaderT +use = flip runReaderT -thisDatabase :: (DbConn m) => m Database +thisDatabase :: (DbAccess m) => m Database -- ^ Current database in use thisDatabase = context -- * Authentication -auth :: (DbConn m) => Username -> Password -> m Bool --- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new connection. +auth :: (DbAccess m) => Username -> Password -> m Bool +-- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe. auth u p = do n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)] true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: u, "nonce" =: n, "key" =: pwKey n u p] @@ -134,7 +145,7 @@ auth u p = do type Collection = UString -- ^ Collection name (not prefixed with database) -allCollections :: (DbConn m) => m [Collection] +allCollections :: (DbAccess m) => m [Collection] -- ^ List all collections in this database allCollections = do db <- thisDatabase @@ -142,17 +153,13 @@ allCollections = do return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs where dropDbPrefix = U.tail . 
U.dropWhile (/= '.') - isSpecial db col = U.any (== '$') col && db <.> col /= "local.oplog.$main" + isSpecial (Database db) col = U.any (== '$') col && db <.> col /= "local.oplog.$main" -- * Selection data Selection = Select {selector :: Selector, coll :: Collection} deriving (Show, Eq) -- ^ Selects documents in collection that match selector -{-select :: Selector -> Collection -> Selection --- ^ Synonym for 'Select' -select = Select-} - type Selector = Document -- ^ Filter for a query, analogous to the where clause in SQL. @[]@ matches all documents in collection. @[x =: a, y =: b]@ is analogous to @where x = a and y = b@ in SQL. See for full selector syntax. @@ -177,30 +184,37 @@ instance Select Query where -- | Default write-mode is 'Unsafe' data WriteMode = Unsafe -- ^ Submit writes without receiving acknowledgments. Fast. Assumes writes succeed even though they may not. - | Safe -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. + | Safe GetLastError -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. This is acomplished by sending the getLastError command, with given 'GetLastError' parameters, after every write. deriving (Show, Eq) -writeMode :: (Conn m) => WriteMode -> m a -> m a +type GetLastError = Document +-- ^ Parameters for getLastError command. For example ["w" =: 2] tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See "http://www.mongodb.org/display/DOCS/Last+Error+Commands" for more options. + +safe :: WriteMode +-- ^ Safe [] +safe = Safe [] + +writeMode :: (Access m) => WriteMode -> m a -> m a -- ^ Run action with given 'WriteMode' writeMode = push . const -write :: (DbConn m) => Notice -> m () +write :: (DbAccess m) => Notice -> m () -- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error. write notice = do mode <- context case mode of Unsafe -> send [notice] - Safe -> do - me <- getLastError [notice] + Safe params -> do + me <- getLastError [notice] params maybe (return ()) (throw . 
uncurry WriteFailure) me type ErrorCode = Int -- ^ Error code from getLastError -getLastError :: (DbConn m) => [Notice] -> m (Maybe (ErrorCode, String)) +getLastError :: (DbAccess m) => [Notice] -> GetLastError -> m (Maybe (ErrorCode, String)) -- ^ Send notices (writes) then fetch what the last error was, Nothing means no error -getLastError writes = do - r <- runCommand' writes ["getlasterror" =: (1 :: Int)] +getLastError writes params = do + r <- runCommand' writes $ ("getlasterror" =: (1 :: Int)) : params return $ (at "code" r,) <$> lookup "err" r {-resetLastError :: (DbConn m) => m () @@ -209,23 +223,23 @@ resetLastError = runCommand1 "reseterror" >> return ()-} -- ** Insert -insert :: (DbConn m) => Collection -> Document -> m Value +insert :: (DbAccess m) => Collection -> Document -> m Value -- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied insert col doc = head <$> insertMany col [doc] -insert_ :: (DbConn m) => Collection -> Document -> m () +insert_ :: (DbAccess m) => Collection -> Document -> m () -- ^ Same as 'insert' except don't return _id insert_ col doc = insert col doc >> return () -insertMany :: (DbConn m) => Collection -> [Document] -> m [Value] +insertMany :: (DbAccess m) => Collection -> [Document] -> m [Value] -- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied insertMany col docs = do - db <- thisDatabase + Database db <- thisDatabase docs' <- liftIO $ mapM assignId docs write (Insert (db <.> col) docs') mapM (look "_id") docs' -insertMany_ :: (DbConn m) => Collection -> [Document] -> m () +insertMany_ :: (DbAccess m) => Collection -> [Document] -> m () -- ^ Same as 'insertMany' except don't return _ids insertMany_ col docs = insertMany col docs >> return () @@ -237,54 +251,54 @@ assignId doc = if X.any (("_id" ==) . label) doc -- ** Update -save :: (DbConn m) => Collection -> Document -> m () +save :: (DbAccess m) => Collection -> Document -> m () -- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or update it if its not new (has \"_id\" field) save col doc = case look "_id" doc of Nothing -> insert_ col doc Just i -> repsert (Select ["_id" := i] col) doc -replace :: (DbConn m) => Selection -> Document -> m () +replace :: (DbAccess m) => Selection -> Document -> m () -- ^ Replace first document in selection with given document replace = update [] -repsert :: (DbConn m) => Selection -> Document -> m () +repsert :: (DbAccess m) => Selection -> Document -> m () -- ^ Replace first document in selection with given document, or insert document if selection is empty repsert = update [Upsert] type Modifier = Document -- ^ Update operations on fields in a document. See -modify :: (DbConn m) => Selection -> Modifier -> m () +modify :: (DbAccess m) => Selection -> Modifier -> m () -- ^ Update all documents in selection using given modifier modify = update [MultiUpdate] -update :: (DbConn m) => [UpdateOption] -> Selection -> Document -> m () +update :: (DbAccess m) => [UpdateOption] -> Selection -> Document -> m () -- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty. 
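-- A rough usage sketch of the write operations above (the "team" collection, its fields,
-- and @tid@, a previously obtained "_id" value, are made up for illustration); these
-- would run inside some 'DbAccess' action:
--
-- > insert_ "team" ["name" =: ("Giants" :: UString), "league" =: ("National" :: UString)]
-- > save "team" ["_id" =: tid, "name" =: ("Giants" :: UString), "league" =: ("American" :: UString)]
-- > modify (Select ["league" =: ("National" :: UString)] "team") ["$set" =: ["league" =: ("American" :: UString)]]
-- > deleteOne (Select ["name" =: ("Giants" :: UString)] "team")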
update opts (Select sel col) up = do - db <- thisDatabase + Database db <- thisDatabase write (Update (db <.> col) opts sel up) -- ** Delete -delete :: (DbConn m) => Selection -> m () +delete :: (DbAccess m) => Selection -> m () -- ^ Delete all documents in selection delete = delete' [] -deleteOne :: (DbConn m) => Selection -> m () +deleteOne :: (DbAccess m) => Selection -> m () -- ^ Delete first document in selection deleteOne = delete' [SingleRemove] -delete' :: (DbConn m) => [DeleteOption] -> Selection -> m () +delete' :: (DbAccess m) => [DeleteOption] -> Selection -> m () -- ^ Delete all documents in selection unless 'SingleRemove' option is given then only delete first document in selection delete' opts (Select sel col) = do - db <- thisDatabase + Database db <- thisDatabase write (Delete (db <.> col) opts sel) -- * Read -- ** MasterOrSlaveOk -slaveOk :: (Conn m) => m a -> m a +slaveOk :: (Access m) => m a -> m a -- ^ Ok to execute given action against slave, ie. eventually consistent reads slaveOk = push (const SlaveOk) @@ -347,7 +361,7 @@ batchSizeRemainingLimit batchSize limit = if limit == 0 queryRequest :: Bool -> MasterOrSlaveOk -> Query -> Database -> (Request, Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. -queryRequest isExplain mos Query{..} db = (P.Query{..}, remainingLimit) where +queryRequest isExplain mos Query{..} (Database db) = (P.Query{..}, remainingLimit) where qOptions = msOption mos ++ map pOption options qFullCollection = db <.> coll selection qSkip = fromIntegral skip @@ -360,79 +374,80 @@ queryRequest isExplain mos Query{..} db = (P.Query{..}, remainingLimit) where special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection -runQuery :: (DbConn m) => Bool -> [Notice] -> Query -> m CursorState' +runQuery :: (DbAccess m) => Bool -> [Notice] -> Query -> m CursorState' -- ^ Send query request and return cursor state runQuery isExplain ns q = do db <- thisDatabase slaveOK <- context call' ns (queryRequest isExplain slaveOK q db) -find :: (DbConn m) => Query -> m Cursor +find :: (DbAccess m) => Query -> m Cursor -- ^ Fetch documents satisfying query find q@Query{selection, batchSize} = do db <- thisDatabase cs' <- runQuery False [] q newCursor db (coll selection) batchSize cs' -findOne' :: (DbConn m) => [Notice] -> Query -> m (Maybe Document) +findOne' :: (DbAccess m) => [Notice] -> Query -> m (Maybe Document) -- ^ Send notices and fetch first document satisfying query or Nothing if none satisfy it findOne' ns q = do CS _ _ docs <- cursorState =<< runQuery False ns q {limit = 1} return (listToMaybe docs) -findOne :: (DbConn m) => Query -> m (Maybe Document) +findOne :: (DbAccess m) => Query -> m (Maybe Document) -- ^ Fetch first document satisfying query or Nothing if none satisfy it findOne = findOne' [] -explain :: (DbConn m) => Query -> m Document +explain :: (DbAccess m) => Query -> m Document -- ^ Return performance stats of query execution explain q = do -- same as findOne but with explain set to true CS _ _ docs <- cursorState =<< runQuery True [] q {limit = 1} return $ if null docs then error ("no explain: " ++ show q) else head docs -count :: (DbConn m) => Query -> m Int +count :: (DbAccess m) => Query -> m Int -- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present) count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand (["count" =: col, "query" 
=: sel, "skip" =: (fromIntegral skip :: Int32)] ++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32))) -distinct :: (DbConn m) => Label -> Selection -> m [Value] +distinct :: (DbAccess m) => Label -> Selection -> m [Value] -- ^ Fetch distinct values of field in selected documents distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel] -- *** Cursor data Cursor = Cursor FullCollection BatchSize (MVar CursorState') --- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a 'CursorNotFoundFailure'. Note, a cursor is not closed when the connection is closed, so you can open another connection to the same server and continue using the cursor. +-- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a 'CursorNotFoundFailure'. Note, a cursor is not closed when the pipe is closed, so you can open another pipe to the same server and continue using the cursor. -modifyCursorState' :: (Conn m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Connected (ErrorT IOError IO) (CursorState', a)) -> m a +modifyCursorState' :: (Access m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Action IO (CursorState', a)) -> m a -- ^ Analogous to 'modifyMVar' but with Conn monad modifyCursorState' (Cursor fcol batch var) act = do - conn <- context + wr <- context + mos <- context + pipe <- context e <- liftIO . modifyMVar var $ \cs' -> do - ee <- runErrorT $ runConn (act fcol batch cs') conn - return $ case ee of - Right (Right (cs'', a)) -> (cs'', Right a) - Right (Left failure) -> (cs', Left $ throw failure) - Left ioerror -> (cs', Left $ throw ioerror) + e' <- runAction (act fcol batch cs') wr mos pipe + return $ case e' of + Right (cs'', a) -> (cs'', Right a) + Left failure -> (cs', Left $ throw failure) either id return e -getCursorState :: (Conn m) => Cursor -> m CursorState +getCursorState :: (Access m) => Cursor -> m CursorState -- ^ Extract current cursor status getCursorState (Cursor _ _ var) = cursorState =<< liftIO (readMVar var) data CursorState' = - Delayed (forall n. (Throw Failure n, Throw IOError n, MonadIO n) => n CursorState) + Delayed (forall n. 
(Throw Failure n, MonadIO n) => n CursorState) | CursorState CursorState -- ^ A cursor state or a promised cursor state which may fail -call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState' +call' :: (Access m) => [Notice] -> (Request, Limit) -> m CursorState' -- ^ Send notices and request and return promised cursor state call' ns (req, remainingLimit) = do promise <- call ns req return $ Delayed (fromReply remainingLimit =<< promise) -cursorState :: (Conn m) => CursorState' -> m CursorState +cursorState :: (Access m) => CursorState' -> m CursorState -- ^ Convert promised cursor state to cursor state or failure cursorState (Delayed promise) = promise cursorState (CursorState cs) = return cs @@ -452,20 +467,22 @@ fromReply limit Reply{..} = do CursorNotFound -> throw (CursorNotFoundFailure rCursorId) QueryError -> throw (QueryFailure $ at "$err" $ head rDocuments) -newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor +newCursor :: (Access m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor -- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected. -newCursor db col batch cs = do - conn <- context +newCursor (Database db) col batch cs = do + wr <- context + mos <- context + pipe <- context var <- liftIO (newMVar cs) let cursor = Cursor (db <.> col) batch var - liftIO . addMVarFinalizer var $ runErrorT (runConn (close cursor) conn :: ErrorT IOError IO (Either Failure ())) >> return () + liftIO . addMVarFinalizer var $ runAction (close cursor) wr mos pipe >> return () return cursor -next :: (Conn m) => Cursor -> m (Maybe Document) +next :: (Access m) => Cursor -> m (Maybe Document) -- ^ Return next document in query result, or Nothing if finished. next cursor = modifyCursorState' cursor nextState where -- Pre-fetch next batch promise from server when last one in current batch is returned. 
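-- For callers, cursor iteration looks roughly like the following sketch (the "team"
-- collection and selector are made up for illustration). 'rest' reads all remaining
-- documents, which closes the cursor, while 'next' streams one document at a time and
-- 'close' ends iteration early:
--
-- > docs <- rest =<< find (query ["league" =: ("National" :: UString)] "team")
-- > cur <- find (query [] "team")
-- > Just doc <- next cur
-- > close cur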
- nextState :: FullCollection -> BatchSize -> CursorState' -> Connected (ErrorT IOError IO) (CursorState', Maybe Document) + nextState :: FullCollection -> BatchSize -> CursorState' -> Action IO (CursorState', Maybe Document) nextState fcol batch cs' = do CS limit cid docs <- cursorState cs' case docs of @@ -480,15 +497,15 @@ next cursor = modifyCursorState' cursor nextState where nextBatch fcol batch limit cid = call' [] (GetMore fcol batchSize cid, remLimit) where (batchSize, remLimit) = batchSizeRemainingLimit batch limit -nextN :: (Conn m) => Int -> Cursor -> m [Document] +nextN :: (Access m) => Int -> Cursor -> m [Document] -- ^ Return next N documents or less if end is reached nextN n c = catMaybes <$> replicateM n (next c) -rest :: (Conn m) => Cursor -> m [Document] +rest :: (Access m) => Cursor -> m [Document] -- ^ Return remaining documents in query result rest c = loop (next c) -instance (Conn m) => Resource m Cursor where +instance (Access m) => Resource m Cursor where close cursor = modifyCursorState' cursor kill' where kill' _ _ cs' = first CursorState <$> (kill =<< cursorState cs') kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]] @@ -521,13 +538,13 @@ groupDocument Group{..} = "initial" =: gInitial, "cond" =: gCond ] -group :: (DbConn m) => Group -> m [Document] +group :: (DbAccess m) => Group -> m [Document] -- ^ Execute group query and return resulting aggregate value for each distinct key group g = at "retval" <$> runCommand ["group" =: groupDocument g] -- ** MapReduce --- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values from all lists to a single result. There are additional parameters that may be set to tweak this basic operation. +-- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values to a single result. There are additional parameters that may be set to tweak this basic operation. data MapReduce = MapReduce { rColl :: Collection, rMap :: MapFun, @@ -536,7 +553,7 @@ data MapReduce = MapReduce { rSort :: Order, -- ^ Default is [] meaning no sort rLimit :: Limit, -- ^ Default is 0 meaning no limit rOut :: Maybe Collection, -- ^ Output to given permanent collection, otherwise output to a new temporary collection whose name is returned. - rKeepTemp :: Bool, -- ^ If True, the temporary output collection is made permanent. If False, the temporary output collection persists for the life of the current connection only, however, other connections may read from it while the original one is still alive. Note, reading from a temporary collection after its original connection dies returns an empty result (not an error). The default for this attribute is False, unless 'rOut' is specified, then the collection permanent. + rKeepTemp :: Bool, -- ^ If True, the temporary output collection is made permanent. If False, the temporary output collection persists for the life of the current pipe only, however, other pipes may read from it while the original one is still alive. Note, reading from a temporary collection after its original pipe dies returns an empty result (not an error). The default for this attribute is False, unless 'rOut' is specified, then the collection permanent. rFinalize :: Maybe FinalizeFun, -- ^ Function to apply to all the results when finished. Default is Nothing. rScope :: Document, -- ^ Variables (environment) that can be accessed from map/reduce/finalize. 
Default is []. rVerbose :: Bool -- ^ Provide statistics on job execution time. Default is False. @@ -546,7 +563,7 @@ type MapFun = Javascript -- ^ @() -> void@. The map function references the variable @this@ to inspect the current object under consideration. The function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate. type ReduceFun = Javascript --- ^ @(key, value_array) -> value@. The reduce function receives a key and an array of values and returns an aggregate result value. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @for all k, vals : reduce(k, [reduce(k,vals)]) == reduce(k,vals)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible. +-- ^ @(key, [value]) -> value@. The reduce function receives a key and an array of values and returns an aggregate result value. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @reduce(k, [reduce(k,vs)]) == reduce(k,vs)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible. type FinalizeFun = Javascript -- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value. @@ -570,36 +587,36 @@ mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce -- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments. mapReduce col map' red = MapReduce col map' red [] [] 0 Nothing False Nothing [] False -runMR :: (DbConn m) => MapReduce -> m Cursor +runMR :: (DbAccess m) => MapReduce -> m Cursor -- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript) --- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when connection closes. +-- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when pipe closes. runMR mr = find . query [] =<< (at "result" <$> runMR' mr) -runMR' :: (DbConn m) => MapReduce -> m Document +runMR' :: (DbAccess m) => MapReduce -> m Document -- ^ Run MapReduce and return a result document containing a "result" field holding the output Collection and additional statistic fields. Error if the map/reduce failed (because of bad Javascript). runMR' mr = do doc <- runCommand (mrDocument mr) - return $ if true1 "ok" doc then doc else error $ at "errmsg" doc ++ " in:\n" ++ show mr + return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr -- * Command type Command = Document -- ^ A command is a special query or action against the database. See for details. 
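-- For example, a command is just a document handed to 'runCommand' (defined below).
-- A rough sketch, assuming the standard "collstats" and "serverStatus" server commands
-- and a made-up "team" collection:
--
-- > stats <- runCommand ["collstats" =: ("team" :: UString)]
-- > status <- use (Database "admin") (runCommand1 "serverStatus")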
-runCommand' :: (DbConn m) => [Notice] -> Command -> m Document +runCommand' :: (DbAccess m) => [Notice] -> Command -> m Document -- ^ Send notices then run command and return its result runCommand' ns c = maybe err id <$> findOne' ns (query c "$cmd") where err = error $ "Nothing returned for command: " ++ show c -runCommand :: (DbConn m) => Command -> m Document +runCommand :: (DbAccess m) => Command -> m Document -- ^ Run command against the database and return its result runCommand = runCommand' [] -runCommand1 :: (DbConn m) => UString -> m Document +runCommand1 :: (DbAccess m) => UString -> m Document -- ^ @runCommand1 foo = runCommand [foo =: 1]@ runCommand1 c = runCommand [c =: (1 :: Int)] -eval :: (DbConn m) => Javascript -> m Document +eval :: (DbAccess m) => Javascript -> m Document -- ^ Run code on server eval code = at "retval" <$> runCommand ["$eval" =: code] diff --git a/README.md b/README.md index 6929363..9b88acc 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,12 @@ mongoDB About ----- -A mongoDB driver for Haskell, which lets you connect to MongoDB and do inserts, queries, updates, etc. +MongoDB driver for Haskell, which lets you connect to a MongoDB database management system and do inserts, queries, updates, etc. Links ----- +* [MongoDB](http://www.mongodb.org) * [mongoDB API reference](http://hackage.haskell.org/package/mongoDB) * [tutorial](http://github.com/TonyGen/mongoDB-haskell/blob/master/tutorial.md) * [map/reduce example](http://github.com/TonyGen/mongoDB-haskell/blob/master/map-reduce-example.md) diff --git a/TODO b/TODO index 77845f7..78b1e82 100644 --- a/TODO +++ b/TODO @@ -1,89 +1,53 @@ TODO ==== -BSON +Bson ---- + implement deprecated types (were left out) -+ on insert/update: reject keys that start with "$" or "." -+ data support for common mongo "$symbols" + convert from/to json -+ tie in with native regex like python does? - - on outgoing uncompile? - - on incoming automatically compile -+ more time convertibles -+ map operations for BsonDoc (or should it be applicative?) ++ tie regex type to type in regex library ++ Read instance for Documents that can read its Show representation MongoDB ------- -+ support full level 0 - - operations on database objects - * add_son_manipulators? - * dereference (dbref) - - database admin - * getProfilingInfo - - misc operations - * getCollectionOptions - - cursor object - * hasMore - -- all commands listed on http://127.0.0.1:28017/_commands. (mongod --rest) -- reIndex (http://www.mongodb.org/display/DOCS/Indexes#Indexes-ReIndex) -- safe write to two or more replicas -- Query attribute: timeout -- CreateIndex attributes: background, min, max -- CreateIndex Order [Asc, Dec, Geo2d] -- FindAndModify -- getIndexInfo -- logout -- collectionsInfo -- databasesInfo -- getLastError options -- Update If Current (http://www.mongodb.org/display/DOCS/Atomic+Operations) -- block write until written on N replicas -- lazyRest on cursor, although lazy I/O) is problematic and we may not want to support it. -- Upon client exit, send killCursors for all open cursors, otherwise server will keep them open for 10 minutes and keep NoCursorTimeout cursors open for hours. --- Upon cursor finalize (garbage collect) send killCursor even if you have to create a new connection, because server keeps cursors open for 10 minutes (or more). --- Query option Exhaust - - optional: - - automatic reconnection - - buffer pooling - - connection pooling. 
Unsafe to shrink pool and close connections because map/reduce temp tables that were created on the connection will get deleted. Note, other connections can access a map/reduce temp table as long as the original connection is still alive. Also, other connections can access cursors created on other connections, even if those die. Cursors will be deleted on server only if idle for more than 10 minutes. Accessing a deleted cursor returns an error. -+ auto-destoy connection (how?/when?). Although, GHC will automatically close connection (Handle) when garbage collected. -+ don't read into cursor until needed, but have cursor send getMore before - it is actually out of docs (so network is finished by the time we're ready - to consume more) - -Misc ----- -+ learn more about haskelldb, anything we can learn from there -+ go through pymongo api and figure out what parts to adopt (also look - at other languages?) -+ kill prefix on data types "eg QO_*"? -+ javascript ++ When one connection in a pool fails, close all other since they will likely fail too ++ on insert/update: reject keys that start with "$" or "." ++ dereference dbref ++ functions for every command, eg. + - findAndModify + - reIndex (http://www.mongodb.org/display/DOCS/Indexes#Indexes-ReIndex) + - createIndex attributes: background, min, max + - createIndex Order [Asc, Dec, Geo2d] + - getIndexInfo + - logout + - collectionsInfo + - databasesInfo + - getProfileInfo ++ Query attribute: timeout ++ Update If Current (http://www.mongodb.org/display/DOCS/Atomic+Operations) ++ Upon client exit, send killCursors for all open cursors, otherwise server will keep them open for 10 minutes and keep NoCursorTimeout cursors open for hours. ++ Upon cursor finalize (garbage collect) send killCursor even if you have to create a new connection, because server keeps cursors open for 10 minutes (or more). ++ Query option Exhaust ++ Reconnect on replica set primary stepdown, so no exception raised to user ++ Reconnect on query ioerror re-query, so no exception raised to user. Can't be done for writes because it is not safe to re-execute a write. + tailable cursor support - only close cursor when cursorID is 0 - have to create loop that sleeps and retries - lazy list support ++ GridFS -Tests? -Documentation - - ref - -GridFS - -deep "lookup" function (other deep Map functions?) -Read instance for Documents that can read its Show representation -make sure NULLs aren't in created table names - -update tutorial to match new python one +Tests - none currently +Misc +---- ++ javascript DSL ++ update tutorial to match new python one + custom types (see python examples) -+ make BSON an instance of Binary (eg get/put) Questions: - In Mongo shell, db.foo.totalSize fetches storageSize of each index but does not use it Notes: - Remember that in the new version of MongoDB (>= 1.6), "ok" field can be a number (0 or 1) or boolean (False or True). Use 'true1' function defined in Database.MongoDB.Util -- A cursor will die on the server if not accessed (by any connection) within past 10 minutes (unless NoCursorTimeout option set). Accessing a dead (or non-existent) cursor raises a ServerFailure exception. +- A cursor will die on the server if not accessed (by any connection) within past 10 minutes (unless NoCursorTimeout option set). Accessing a dead (or non-existent) cursor raises a CursorNotFoundFailure. +- Unsafe to shrink pool and close connections because map/reduce temp tables that were created on the connection will get deleted. 
Note, other connections can access a map/reduce temp table as long as the original connection is still alive. Also, other connections can access cursors created on other connections, even if those die. Cursors will be deleted on server only if idle for more than 10 minutes. Accessing a deleted cursor returns an error. diff --git a/V0.6-Redesign.md b/V0.6-Redesign.md index 61d4e8c..57f201b 100644 --- a/V0.6-Redesign.md +++ b/V0.6-Redesign.md @@ -18,7 +18,7 @@ MongoDB.Internal.Protocol defines the MongoDB Wire Protocol (http://www.mongodb. MongoDB.Connection allows you to create a pipelined connection to a specific server or to a master/slave in a replica set. -MongoDB-Query defines the "connected" monad that has the current connection and database in context, and all the normal query and update operations you execute within this monad like find, findOne, count, insert, modify, delete, group, mapReduce, allDatabases, allCollections, runCommand, etc. +MongoDB-Query defines the "access" monad that has the current connection and database in context, and all the normal query and update operations you execute within this monad like find, findOne, count, insert, modify, delete, group, mapReduce, allDatabases, allCollections, runCommand, etc. MongoDB-Admin defines all the administration operations like validateCollection, ensureIndex, dropIndex, addUser, copyDatabase, dbStats, setProfilingLevel, etc. diff --git a/Var/Pool.hs b/Var/Pool.hs new file mode 100644 index 0000000..ec5fe8e --- /dev/null +++ b/Var/Pool.hs @@ -0,0 +1,62 @@ +{- | Cycle through a set of resources (randomly), recreating them when they expire -} + +{-# LANGUAGE RecordWildCards, NamedFieldPuns, FlexibleContexts #-} + +module Var.Pool where + +import Control.Applicative ((<$>), (<*>)) +import Control.Monad.MVar +import Data.Array.IO +import Data.Maybe (catMaybes) +import Control.Monad.Error +import System.Random (randomRIO) + +-- | Creator, destroyer, and checker of resources of type r. Creator may throw error or type e. +data Factory e r = Factory { + newResource :: ErrorT e IO r, + killResource :: r -> IO (), + isExpired :: r -> IO Bool } + +newPool :: Factory e r -> Int -> IO (Pool e r) +-- ^ Create new pool of initial max size +newPool f n = do + arr <- newArray (0, n-1) Nothing + var <- newMVar arr + return (Pool f var) + +data Pool e r = Pool {factory :: Factory e r, resources :: MVar (IOArray Int (Maybe r))} +-- ^ Pool of maximum N resources. Resources may expire on their own or be killed. Resources will initially be created on demand up N resources then recycled in random fashion. N may be changed by resizing the pool. Random is preferred to round-robin to distribute effect of pathological use cases that use every Xth resource the most and N is a multiple of X. +-- Resources *must* close/kill themselves when garbage collected ('resize' relies on this). + +aResource :: (Error e) => Pool e r -> ErrorT e IO r +-- ^ Return a random live resource in pool or create new one if expired or not yet created +aResource Pool{..} = withMVar resources $ \array -> do + i <- liftIO $ randomRIO =<< getBounds array + mr <- liftIO $ readArray array i + r <- maybe (new array i) (check array i) mr + return r + where + new array i = do + r <- newResource factory + liftIO $ writeArray array i (Just r) + return r + check array i r = do + bad <- liftIO $ isExpired factory r + if bad then new array i else return r + +poolSize :: Pool e r -> IO Int +-- ^ current max size of pool +poolSize Pool{resources} = withMVar resources (fmap rangeSize . 
getBounds) + +resize :: Pool e r -> Int -> IO () +-- ^ resize max size of pool. When shrinking some resource will be dropped without closing since they may still be in use. They are expected to close themselves when garbage collected. +resize Pool{resources} n = modifyMVar_ resources $ \array -> do + rs <- take n <$> getElems array + array <- newListArray (0, n-1) (rs ++ repeat Nothing) + return array + +killAll :: Pool e r -> IO () +-- ^ Kill all resources in pool so subsequent access creates new ones +killAll (Pool Factory{killResource} resources) = withMVar resources $ \array -> do + mapM_ killResource . catMaybes =<< getElems array + mapM_ (\i -> writeArray array i Nothing) . range =<< getBounds array diff --git a/map-reduce-example.md b/map-reduce-example.md index 39a8b90..30e29cd 100644 --- a/map-reduce-example.md +++ b/map-reduce-example.md @@ -19,8 +19,8 @@ map/reduce queries on: Prelude> :set prompt "> " > :set -XOverloadedStrings > import Database.MongoDB - > Right conn <- runNet $ connect $ host "localhost" - > let run act = runNet $ runConn (useDb "test" act) con + > conn <- connect 1 $ host "localhost" + > let run act = runConn safe Master conn $ use (Database "test") act > :{ run $ insertMany "mr1" [ ["x" =: 1, "tags" =: ["dog", "cat"]], diff --git a/mongoDB.cabal b/mongoDB.cabal index f18fb11..4bf25a2 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -1,35 +1,51 @@ -Name: mongoDB -Version: 0.7.1 -License: OtherLicense -License-file: LICENSE -Maintainer: Tony Hannan -Author: Scott Parish & Tony Hannan -Copyright: Copyright (c) 2010-2010 Scott Parish & 10gen Inc. -homepage: http://github.com/TonyGen/mongoDB-haskell -Category: Database -Synopsis: A driver for MongoDB -Description: This module lets you connect to MongoDB, do inserts, queries, updates, etc. -Stability: alpha -Build-Depends: - base < 5, - containers, - mtl, - binary, - bytestring, - network, - nano-md5, - parsec, - bson -Build-Type: Simple -Exposed-modules: - Control.Monad.Context, - Control.Monad.Throw, - Control.Pipeline, - Database.MongoDB.Internal.Util, - Database.MongoDB.Internal.Protocol, - Database.MongoDB.Connection, - Database.MongoDB.Query, - Database.MongoDB.Admin, - Database.MongoDB -ghc-options: -Wall -O2 -cabal-version: >= 1.4 +name: mongoDB +version: 0.8 +cabal-version: >=1.4 +build-type: Simple +license: OtherLicense +license-file: LICENSE +copyright: Copyright (c) 2010-2010 Scott Parish & 10gen Inc. +maintainer: Tony Hannan +build-depends: array -any, base <5, binary -any, bson -any, + bytestring -any, containers -any, mtl -any, nano-md5 -any, + network -any, parsec -any +stability: alpha +homepage: http://github.com/TonyGen/mongoDB-haskell +package-url: +bug-reports: +synopsis: A driver for MongoDB +description: This module lets you connect to MongoDB, do inserts, queries, updates, etc. +category: Database +author: Scott Parish & Tony Hannan +tested-with: +data-files: +data-dir: "" +extra-source-files: +extra-tmp-files: +exposed-modules: Control.Pipeline Control.Monad.Context + Control.Monad.Throw Database.MongoDB Database.MongoDB.Admin + Database.MongoDB.Connection Database.MongoDB.Query + Database.MongoDB.Internal.Protocol Database.MongoDB.Internal.Util +exposed: True +buildable: True +build-tools: +cpp-options: +cc-options: +ld-options: +pkgconfig-depends: +frameworks: +c-sources: +extensions: +extra-libraries: +extra-lib-dirs: +includes: +install-includes: +include-dirs: +hs-source-dirs: . 
+other-modules: +ghc-prof-options: +ghc-shared-options: +ghc-options: -Wall -O2 +hugs-options: +nhc98-options: +jhc-options: \ No newline at end of file diff --git a/tutorial.md b/tutorial.md index 19f1e23..bf925a7 100644 --- a/tutorial.md +++ b/tutorial.md @@ -9,7 +9,7 @@ This is a mini tutorial to get you up and going with the basics of the Haskell mongoDB drivers. It is modeled after the [pymongo tutorial](http://api.mongodb.org/python/1.4%2B/tutorial.html). -You will need the mongoDB bindings installed as well as mongo itself installed. +You will need the mongoDB driver installed as well as mongo itself installed. $ = command line prompt > = ghci repl prompt @@ -35,13 +35,13 @@ Getting Ready Start a MongoDB instance for us to play with: - $ mongod + $ mongod --dbpath Start up a haskell repl: $ ghci -Now we'll need to bring in the MongoDB/BSON bindings and set +Now we'll need to bring in the MongoDB/Bson bindings and set OverloadedStrings so literal strings are converted to UTF-8 automatically. > import Database.MongoDB @@ -49,48 +49,58 @@ OverloadedStrings so literal strings are converted to UTF-8 automatically. Making A Connection ------------------- -Open up a connection to your DB instance, using the standard port: +Open up a connection to your mongo server, using the standard port (27017): - > Right conn <- runNet $ connect $ host "127.0.0.1" + > conn <- connect 1 $ host "127.0.0.1" or for a non-standard port - > Right conn <- runNet $ connect $ Host "127.0.0.1" (PortNumber 30000) + > conn <- connect 1 $ Host "127.0.0.1" (PortNumber 30000) -*connect* throws IOError if connection fails and *runNet* catches IOError and -returns it as Left. We are assuming above it won't fail. If it does you will get a -pattern match error. +*connect* takes the connection pool size and the host to connect to. It returns +a *Connection*, which is really a pool of TCP connections, initially created on demand. +So it is not possible to get a connection error until you try to use it. -Connected monad +Plain IO code in this driver never raises an exception unless it invokes third party IO +code that does. Driver code that may throw an exception says so in its Monad type, +for example, *ErrorT IOError IO a*. + +Access monad ------------------- -The current connection is held in a Connected monad, and the current database -is held in a Reader monad on top of that. To run a connected monad, supply -it and a connection to *runConn*. To access a database within a connected -monad, call *useDb*. +A mongo query/update executes in an *Access* monad, which has access to a +*Pipe*, *WriteMode*, and *MasterSlaveOk* mode, and may throw a *Failure*. A Pipe +is a single TCP connection, while a Connection is a pool of Pipes. + +To run an Access action (monad), supply WriteMode, MasterOrSlaveOk, Connection, +and action to *access*. For example, to get a list of all the database on the server: + + > access safe Master conn allDatabases Since we are working in ghci, which requires us to start from the -IO monad every time, we'll define a convenient *run* function that takes a -db-action and executes it against our "test" database on the server we +IO monad every time, we'll define a convenient *run* function that takes an +action and executes it against our "test" database on the server we just connected to: - > let run action = runNet $ runConn (useDb "test" action) conn + > let run action = access safe Master conn $ use (Database "test") action -*runConn* return either Left Failure or Right result. 
Failure -means there was a read or write exception like cursor expired or duplicate key insert. -This combined with *runNet* means our *run* returns *(Either IOError (Either Failure a))*. +*access* returns either Left Failure or Right result. Failure means there was a connection failure, +or a read or write exception like cursor expired or duplicate key insert. + +*use* adds a *Database* to the action context, so query/update operations know which +database to operate on. Databases and Collections ----------------------------- -A MongoDB can store multiple databases -- separate namespaces +MongoDB can store multiple databases -- separate namespaces under which collections reside. You can obtain the list of databases available on a connection: > run allDatabases -The "test" database is ignored in this case because *allDatabases* +The "test" database in context is ignored in this case because *allDatabases* is not a query on a specific database but on the server as a whole. Databases and collections do not need to be created, just start using