diff --git a/Control/Monad/Context.hs b/Control/Monad/Context.hs new file mode 100644 index 0000000..d9bee7f --- /dev/null +++ b/Control/Monad/Context.hs @@ -0,0 +1,27 @@ +{- | This is just like Control.Monad.Reader.Class except you can access the context of any Reader in the monad stack instead of just the top one as long as the context types are different. If two or more readers in the stack have the same context type you get the context of the top one. -} + +{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, OverlappingInstances #-} + +module Control.Monad.Context where + +import Control.Monad.Reader +import Control.Monad.Error + +-- | Same as 'MonadReader' but without functional dependency so the same monad can have multiple contexts with different types +class Context x m where + context :: m x + -- ^ Get the context in the Reader in the monad stack that has @x@ context type. Analogous to 'ask'. + push :: (x -> x) -> m a -> m a + -- ^ Push new context in the Reader in the monad stack that has @x@ context type. Analogous to 'local' + +instance (Monad m) => Context x (ReaderT x m) where + context = ask + push = local + +instance (Monad m, Context x m) => Context x (ReaderT r m) where + context = lift context + push f m = ReaderT (push f . runReaderT m) + +instance (Monad m, Context x m, Error e) => Context x (ErrorT e m) where + context = lift context + push f = ErrorT . push f . runErrorT diff --git a/Control/Pipeline.hs b/Control/Pipeline.hs new file mode 100644 index 0000000..12a8413 --- /dev/null +++ b/Control/Pipeline.hs @@ -0,0 +1,151 @@ +{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipe (and get promises back); the pipe will pipeline each thread's request right away without waiting. -} + +{-# LANGUAGE DoRec, RecordWildCards, MultiParamTypeClasses, FlexibleContexts #-} + +module Control.Pipeline ( + -- * Pipe + Pipe, newPipe, send, call, + -- * Util + Size, + Length(..), + Resource(..), + Flush(..), + Stream(..), getN +) where + +import Prelude hiding (length) +import Control.Applicative ((<$>)) +import Control.Monad (forever) +import Control.Exception (assert) +import System.IO.Error (try) +import System.IO (Handle, hFlush, hClose, hIsClosed) +import qualified Data.ByteString as S +import qualified Data.ByteString.Lazy as L +import Data.Monoid (Monoid(..)) +import Control.Concurrent (ThreadId, forkIO, killThread) +import Control.Concurrent.MVar +import Control.Concurrent.Chan + +-- * Length + +type Size = Int + +class Length list where + length :: list -> Size + +instance Length S.ByteString where + length = S.length + +instance Length L.ByteString where + length = fromEnum . L.length + +-- * Resource + +class Resource m r where + close :: r -> m () + isClosed :: r -> m Bool + +instance Resource IO Handle where + close = hClose + isClosed = hIsClosed + +-- * Flush + +class Flush handle where + flush :: handle -> IO () + -- ^ Flush written bytes to destination + +instance Flush Handle where + flush = hFlush + +-- * Stream + +class (Length bytes, Monoid bytes, Flush handle) => Stream handle bytes where + put :: handle -> bytes -> IO () + -- ^ Write bytes to handle + get :: handle -> Int -> IO bytes + -- ^ Read up to N bytes from handle, block until at least 1 byte is available + +getN :: (Stream h b) => h -> Int -> IO b +-- ^ Read N bytes from hande, blocking until all N bytes are read. Unlike 'get' which only blocks if no bytes are available. +getN h n = assert (n >= 0) $ do + bytes <- get h n + let x = length bytes + if x >= n then return bytes else do + remainingBytes <- getN h (n - x) + return (mappend bytes remainingBytes) + +instance Stream Handle S.ByteString where + put = S.hPut + get = S.hGet + +instance Stream Handle L.ByteString where + put = L.hPut + get = L.hGet + +-- * Pipe + +-- | Thread-safe and pipelined socket +data Pipe handle bytes = Pipe { + encodeSize :: Size -> bytes, + decodeSize :: bytes -> Size, + vHandle :: MVar handle, -- ^ Mutex on handle, so only one thread at a time can write to it + responseQueue :: Chan (MVar (Either IOError bytes)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. + listenThread :: ThreadId + } + +-- | Create new Pipe with given encodeInt, decodeInt, and handle. You should 'close' pipe when finished, which will also close handle. If pipe is not closed but eventually garbage collected, it will be closed along with handle. +newPipe :: (Stream h b, Resource IO h) => + (Size -> b) -- ^ Convert Size to bytes of fixed length. Every Int must translate to same number of bytes. + -> (b -> Size) -- ^ Convert bytes of fixed length to Size. Must be exact inverse of encodeSize. + -> h -- ^ Underlying socket (handle) this pipe will read/write from + -> IO (Pipe h b) +newPipe encodeSize decodeSize handle = do + vHandle <- newMVar handle + responseQueue <- newChan + rec + let pipe = Pipe{..} + listenThread <- forkIO (listen pipe) + addMVarFinalizer vHandle $ do + killThread listenThread + close handle + return pipe + +instance (Resource IO h) => Resource IO (Pipe h b) where + -- | Close pipe and underlying socket (handle) + close Pipe{..} = do + killThread listenThread + close =<< readMVar vHandle + isClosed Pipe{..} = isClosed =<< readMVar vHandle + +listen :: (Stream h b) => Pipe h b -> IO () +-- ^ Listen for responses and supply them to waiting threads in order +listen Pipe{..} = do + let n = length (encodeSize 0) + h <- readMVar vHandle + forever $ do + e <- try $ do + len <- decodeSize <$> getN h n + getN h len + var <- readChan responseQueue + putMVar var e + +send :: (Stream h b) => Pipe h b -> [b] -> IO () +-- ^ Send messages all together to destination (no messages will be interleaved between them). None of the messages can induce a response, i.e. the destination must not reply to any of these messages (otherwise future 'call's will get these responses instead of their own). +-- Each message is preceeded by its length when written to socket. +send Pipe{..} messages = withMVar vHandle $ \h -> do + mapM_ (write encodeSize h) messages + flush h + +call :: (Stream h b) => Pipe h b -> [b] -> IO (IO b) +-- ^ Send messages all together to destination (no messages will be interleaved between them), and return /promise/ of response from one message only. One and only one message in the list must induce a response, i.e. the destination must reply to exactly one message only (otherwise promises will have the wrong responses in them). +-- Each message is preceeded by its length when written to socket. Likewise, the response must be preceeded by its length. +call Pipe{..} messages = withMVar vHandle $ \h -> do + mapM_ (write encodeSize h) messages + flush h + var <- newEmptyMVar + writeChan responseQueue var + return (either ioError return =<< readMVar var) -- return promise + +write :: (Stream h b, Monoid b, Length b) => (Size -> b) -> h -> b -> IO () +write encodeSize h bytes = put h (mappend lenBytes bytes) where lenBytes = encodeSize (length bytes) diff --git a/Database/MongoDB/Admin.hs b/Database/MongoDB/Admin.hs index 40beeac..9b283dd 100644 --- a/Database/MongoDB/Admin.hs +++ b/Database/MongoDB/Admin.hs @@ -28,7 +28,7 @@ module Database.MongoDB.Admin ( import Prelude hiding (lookup) import Control.Applicative ((<$>)) import Database.MongoDB.Internal.Protocol (pwHash, pwKey) -import Database.MongoDB.Connection (Server, showHostPort, Conn) +import Database.MongoDB.Connection (Server, showHostPort) import Database.MongoDB.Query import Data.Bson import Data.UString (pack, unpack, append, intercalate) @@ -38,7 +38,7 @@ import Data.IORef import qualified Data.Set as S import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent (forkIO, threadDelay) -import Database.MongoDB.Util ((<.>), true1) +import Database.MongoDB.Internal.Util ((<.>), true1) -- * Admin @@ -51,16 +51,17 @@ coptElem Capped = "capped" =: True coptElem (MaxByteSize n) = "size" =: n coptElem (MaxItems n) = "max" =: n -createCollection :: (Conn m) => [CollectionOption] -> Collection -> Db m Document +createCollection :: (DbConn m) => [CollectionOption] -> Collection -> m Document -- ^ Create collection with given options. You only need to call this to set options, otherwise a collection is created automatically on first use with no options. createCollection opts col = runCommand $ ["create" =: col] ++ map coptElem opts -renameCollection :: (Conn m) => Collection -> Collection -> Db m Document +renameCollection :: (DbConn m) => Collection -> Collection -> m Document -- ^ Rename first collection to second collection -renameCollection from to = ReaderT $ \db -> useDb "admin" $ - runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True] +renameCollection from to = do + db <- thisDatabase + useDb "admin" $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True] -dropCollection :: (Conn m) => Collection -> Db m Bool +dropCollection :: (DbConn m) => Collection -> m Bool -- ^ Delete the given collection! Return True if collection existed (and was deleted); return False if collection did not exist (and no action). dropCollection coll = do resetIndexCache @@ -69,7 +70,7 @@ dropCollection coll = do if at "errmsg" r == ("ns not found" :: UString) then return False else fail $ "dropCollection failed: " ++ show r -validateCollection :: (Conn m) => Collection -> Db m Document +validateCollection :: (DbConn m) => Collection -> m Document -- ^ This operation takes a while validateCollection coll = runCommand ["validate" =: coll] @@ -101,35 +102,32 @@ genName :: Order -> IndexName genName keys = intercalate "_" (map f keys) where f (k := v) = k `append` "_" `append` pack (show v) -ensureIndex :: (Conn m) => Index -> Db m () +ensureIndex :: (DbConn m) => Index -> m () -- ^ Create index if we did not already create one. May be called repeatedly with practically no performance hit, because we remember if we already called this for the same index (although this memory gets wiped out every 15 minutes, in case another client drops the index and we want to create it again). ensureIndex idx = let k = (iColl idx, iName idx) in do icache <- fetchIndexCache set <- liftIO (readIORef icache) - unless (S.member k set) . runDbOp $ do - createIndex idx - me <- getLastError - case me of - Nothing -> liftIO $ writeIORef icache (S.insert k set) - Just (c, e) -> fail $ "createIndex failed: (" ++ show c ++ ") " ++ e + unless (S.member k set) $ do + writeMode Safe (createIndex idx) + liftIO $ writeIORef icache (S.insert k set) -createIndex :: (Conn m) => Index -> Db m () +createIndex :: (DbConn m) => Index -> m () -- ^ Create index on the server. This call goes to the server every time. createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase -dropIndex :: (Conn m) => Collection -> IndexName -> Db m Document +dropIndex :: (DbConn m) => Collection -> IndexName -> m Document -- ^ Remove the index dropIndex coll idxName = do resetIndexCache runCommand ["deleteIndexes" =: coll, "index" =: idxName] -getIndexes :: (Conn m) => Collection -> Db m [Document] +getIndexes :: (DbConn m) => Collection -> m [Document] -- ^ Get all indexes on this collection getIndexes coll = do db <- thisDatabase - rest =<< find (query ["ns" =: db <.> coll] "system.indexes") + rest =<< find (select ["ns" =: db <.> coll] "system.indexes") -dropIndexes :: (Conn m) => Collection -> Db m Document +dropIndexes :: (DbConn m) => Collection -> m Document -- ^ Drop all indexes on this collection dropIndexes coll = do resetIndexCache @@ -155,18 +153,20 @@ clearDbIndexCache = do keys <- map fst <$> T.toList dbIndexCache mapM_ (T.delete dbIndexCache) keys -fetchIndexCache :: (Conn m) => Db m IndexCache +fetchIndexCache :: (DbConn m) => m IndexCache -- ^ Get index cache for current database -fetchIndexCache = ReaderT $ \db -> liftIO $ do - mc <- T.lookup dbIndexCache db - maybe (newIdxCache db) return mc +fetchIndexCache = do + db <- thisDatabase + liftIO $ do + mc <- T.lookup dbIndexCache db + maybe (newIdxCache db) return mc where newIdxCache db = do idx <- newIORef S.empty T.insert dbIndexCache db idx return idx -resetIndexCache :: (Conn m) => Db m () +resetIndexCache :: (DbConn m) => m () -- ^ reset index cache for current database resetIndexCache = do icache <- fetchIndexCache @@ -174,20 +174,20 @@ resetIndexCache = do -- ** User -allUsers :: (Conn m) => Db m [Document] +allUsers :: (DbConn m) => m [Document] -- ^ Fetch all users of this database allUsers = map (exclude ["_id"]) <$> (rest =<< find - (query [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]}) + (select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]}) -addUser :: (Conn m) => Bool -> Username -> Password -> Db m () +addUser :: (DbConn m) => Bool -> Username -> Password -> m () -- ^ Add user with password with read-only access if bool is True or read-write access if bool is False addUser readOnly user pass = do - mu <- findOne (query ["user" =: user] "system.users") + mu <- findOne (select ["user" =: user] "system.users") let u = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu) save "system.users" u -removeUser :: (Conn m) => Username -> Db m () -removeUser user = delete (Select ["user" =: user] "system.users") +removeUser :: (DbConn m) => Username -> m () +removeUser user = delete (select ["user" =: user] "system.users") -- ** Database @@ -225,19 +225,19 @@ serverVersion = at "version" <$> serverBuildInfo -- ** Collection -collectionStats :: (Conn m) => Collection -> Db m Document +collectionStats :: (DbConn m) => Collection -> m Document collectionStats coll = runCommand ["collstats" =: coll] -dataSize :: (Conn m) => Collection -> Db m Int +dataSize :: (DbConn m) => Collection -> m Int dataSize c = at "size" <$> collectionStats c -storageSize :: (Conn m) => Collection -> Db m Int +storageSize :: (DbConn m) => Collection -> m Int storageSize c = at "storageSize" <$> collectionStats c -totalIndexSize :: (Conn m) => Collection -> Db m Int +totalIndexSize :: (DbConn m) => Collection -> m Int totalIndexSize c = at "totalIndexSize" <$> collectionStats c -totalSize :: (Conn m) => Collection -> Db m Int +totalSize :: (DbConn m) => Collection -> m Int totalSize coll = do x <- storageSize coll xs <- mapM isize =<< getIndexes coll @@ -249,28 +249,28 @@ totalSize coll = do data ProfilingLevel = Off | Slow | All deriving (Show, Enum, Eq) -getProfilingLevel :: (Conn m) => Db m ProfilingLevel +getProfilingLevel :: (DbConn m) => m ProfilingLevel getProfilingLevel = toEnum . at "was" <$> runCommand ["profile" =: (-1 :: Int)] type MilliSec = Int -setProfilingLevel :: (Conn m) => ProfilingLevel -> Maybe MilliSec -> Db m () +setProfilingLevel :: (DbConn m) => ProfilingLevel -> Maybe MilliSec -> m () setProfilingLevel p mSlowMs = runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return () -- ** Database -dbStats :: (Conn m) => Db m Document +dbStats :: (DbConn m) => m Document dbStats = runCommand ["dbstats" =: (1 :: Int)] -currentOp :: (Conn m) => Db m (Maybe Document) +currentOp :: (DbConn m) => m (Maybe Document) -- ^ See currently running operation on the database, if any -currentOp = findOne (query [] "$cmd.sys.inprog") +currentOp = findOne (select [] "$cmd.sys.inprog") type OpNum = Int -killOp :: (Conn m) => OpNum -> Db m (Maybe Document) -killOp op = findOne (query ["op" =: op] "$cmd.sys.killop") +killOp :: (DbConn m) => OpNum -> m (Maybe Document) +killOp op = findOne (select ["op" =: op] "$cmd.sys.killop") -- ** Server diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs index e779876..715e1d3 100644 --- a/Database/MongoDB/Connection.hs +++ b/Database/MongoDB/Connection.hs @@ -4,23 +4,19 @@ module Database.MongoDB.Connection ( -- * Server - I.Server(..), PortID(..), server, showHostPort, readHostPort, readHostPortF, + Server(..), PortID(..), server, showHostPort, readHostPort, readHostPortF, -- * ReplicaSet ReplicaSet, replicaSet, replicaServers, MasterOrSlave(..), FailedToConnect, newConnection, -- * Connection - I.Connection, I.connServer, I.showHandle, - connect, I.closeConnection, I.isClosed, - -- * Connected monad - I.Conn(..), I.Failure(..), - -- ** Task - I.Task, I.runTask, - -- ** Op - I.Op + Connection, connect, + -- * Resource + Resource(..) ) where -import Database.MongoDB.Internal.Connection as I -import Database.MongoDB.Query (useDb, runCommand1) +import Database.MongoDB.Internal.Protocol (Connection, mkConnection) +import Database.MongoDB.Query (Failure(..), Conn, runConn, useDb, runCommand1) +import Control.Pipeline (Resource(..)) import Control.Applicative ((<$>)) import Control.Arrow ((+++), left) import Control.Exception (assert) @@ -31,10 +27,12 @@ import Network (HostName, PortID(..), connectTo) import Data.Bson (Document, look, typed) import Text.ParserCombinators.Parsec as P (parse, many1, letter, digit, char, eof, spaces, try, (<|>)) import Control.Monad.Identity -import Database.MongoDB.Util (true1) -- PortID instances +import Database.MongoDB.Internal.Util (true1) -- PortID instances -- * Server +data Server = Server HostName PortID deriving (Show, Eq, Ord) + defaultPort :: PortID defaultPort = PortNumber 27017 @@ -101,17 +99,15 @@ sortedReplicas :: ReplicaInfo -> IO [Server] -- ^ All replicas in set sorted by distance from this client. TODO sortedReplicas = return . allReplicas -getReplicaInfo' :: Connection -> IO (Either IOError ReplicaInfo) +getReplicaInfo :: (Server, Connection) -> IO (Either IOError ReplicaInfo) -- ^ Get replica info of the connected server. Return Left IOError if connection fails -getReplicaInfo' conn = left err <$> runTask getReplicaInfo conn where +getReplicaInfo (serv, conn) = left err <$> runConn (ReplicaInfo serv <$> getReplicaInfoDoc) conn where err (ConnectionFailure e) = e - err (ServerFailure s) = userError s + err (ServerFailure e) = userError e -getReplicaInfo :: (Conn m) => m ReplicaInfo --- ^ Get replica info of connect server -getReplicaInfo = do - c <- getConnection - ReplicaInfo (connServer c) <$> useDb "admin" (runCommand1 "ismaster") +getReplicaInfoDoc :: (Conn m) => m Document +-- ^ Get replica info of connected server +getReplicaInfoDoc = useDb "admin" (runCommand1 "ismaster") -- * MasterOrSlave @@ -154,19 +150,19 @@ connectFirst mos = connectFirst' ([], []) where connectFirst' (fs, is) (s : ss) = do e <- runErrorT $ do c <- ErrorT (connect s) - i <- ErrorT (getReplicaInfo' c) + i <- ErrorT (getReplicaInfo (s, c)) return (c, i) case e of Left f -> connectFirst' ((s, f) : fs, is) ss Right (c, i) -> if isMS mos i then return $ Right (c, i) else do - closeConnection c + close c connectFirst' ((s, userError $ "not a " ++ show mos) : fs, i : is) ss connect :: Server -> IO (Either IOError Connection) -- ^ Create a connection to the given server (as opposed to connecting to some server in a replica set via 'newConnection'). Return Left IOError if failed to connect. -connect s@(Server host port) = E.try (mkConnection s =<< connectTo host port) +connect (Server host port) = E.try (mkConnection =<< connectTo host port) {- Authors: Tony Hannan diff --git a/Database/MongoDB/Internal/Connection.hs b/Database/MongoDB/Internal/Connection.hs deleted file mode 100644 index 0d73b40..0000000 --- a/Database/MongoDB/Internal/Connection.hs +++ /dev/null @@ -1,148 +0,0 @@ -{-| Low-level connection to a server. - -This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Connection" instead. -} - -{-# LANGUAGE GeneralizedNewtypeDeriving, TupleSections, TypeSynonymInstances, OverlappingInstances #-} - -module Database.MongoDB.Internal.Connection ( - -- * Server - Server(..), - -- * Connection - Connection, connServer, showHandle, mkConnection, withConn, closeConnection, isClosed, - -- * Connected monad - Conn(..), Failure(..), - -- ** Task - Task, runTask, - -- ** Op - Op, sendBytes, flushBytes, receiveBytes, - exposeIO, hideIO -) where - -import Control.Applicative (Applicative(..), (<$>)) -import Control.Arrow (left) -import System.IO.Error (try) -import Control.Concurrent.MVar -import Control.Monad.Reader -import Control.Monad.Error -import Network (HostName, PortID(..)) -import System.IO (Handle, hFlush, hClose, hIsClosed) -import Data.ByteString.Lazy as B (ByteString, hPut) -import System.Timeout -import Database.MongoDB.Util (Secs, ignore, hGetN) -- Reader/Error Applicative instances - --- * Server - -data Server = Server HostName PortID deriving (Show, Eq, Ord) - --- * Connection - -data Connection = Connection Server (MVar Handle) deriving (Eq) --- ^ A connection to a server. This connection holds session state on the server like open cursors and temporary map-reduce tables that disappear when the connection is closed or fails. - -connServer :: Connection -> Server --- ^ Server this connection is connected to -connServer (Connection serv _) = serv - -showHandle :: Secs -> Connection -> IO String --- ^ Show handle if not locked for more than given seconds -showHandle secs (Connection _ vHand) = - maybe "handle currently locked" show <$> timeout (round (secs * 1000000)) (readMVar vHand) - -instance Show Connection where - showsPrec d c = showParen (d > 10) $ showString "a connection to " . showsPrec 11 (connServer c) - -mkConnection :: Server -> Handle -> IO Connection --- ^ Wrap handle in a MVar to control access -mkConnection s h = Connection s <$> newMVar h - -withConn :: Connection -> (Handle -> IO a) -> IO a --- Execute IO action with exclusive access to TCP connection -withConn (Connection _ vHand) = withMVar vHand - -closeConnection :: Connection -> IO () --- ^ Close connection. Attempting to read or write to a closed connection will raise 'Failure' exception. -closeConnection (Connection _ vHand) = withMVar vHand $ \h -> catch (hClose h) ignore - -isClosed :: Connection -> IO Bool --- ^ Is connection closed? -isClosed (Connection _ vHand) = withMVar vHand hIsClosed - --- * Task - --- | Connection or Server failure like network problem or disk full -data Failure = - ConnectionFailure IOError - -- ^ Error during sending or receiving bytes over a 'Connection'. The connection is not automatically closed when this error happens; the user must close it. Any other IOErrors raised during a Task or Op are not caught. The user is responsible for these other types of errors not related to sending/receiving bytes over the connection. - | ServerFailure String - -- ^ Failure on server, like disk full, which is usually observed using getLastError. Calling 'fail' inside a Task or Op raises this failure. Do not call 'fail' unless it is a temporary server failure, like disk full. For example, receiving unexpected data from the server is not a server failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change. - deriving (Show, Eq) - -instance Error Failure where strMsg = ServerFailure - -type Task m = ErrorT Failure (ReaderT Connection m) --- ^ Action with shared access to connection (the connection can be supplied to multiple concurrent tasks). m must be a MonadIO. - -runTask :: Task m a -> Connection -> m (Either Failure a) --- ^ Run task with shared access to connection. Return Left if connection fails anytime during its execution, in which case the task was partially executed. -runTask = runReaderT . runErrorT - --- * Op - -newtype Op a = Op (ErrorT Failure (ReaderT (Connection, Handle) IO) a) - deriving (Functor, Applicative, Monad, MonadIO, MonadError Failure) --- ^ Action with exclusive access to connection (other ops must wait) - -runOp' :: (MonadIO m) => Op a -> Task m a --- ^ Run operation with exclusive access to connection. Fail if connection fails anytime during its execution, in which case the operation was partially executed. -runOp' (Op act) = ErrorT . ReaderT $ \conn -> - liftIO . withConn conn $ runReaderT (runErrorT act) . (conn,) - -sendBytes :: ByteString -> Op () --- ^ Put bytes on socket -sendBytes bytes = Op . ErrorT . ReaderT $ \(_, h) -> left ConnectionFailure <$> try (hPut h bytes) - -flushBytes :: Op () --- ^ Flush socket -flushBytes = Op . ErrorT . ReaderT $ \(_, h) -> left ConnectionFailure <$> try (hFlush h) - -receiveBytes :: Int -> Op ByteString --- ^ Get N bytes from socket, blocking until all N bytes are received -receiveBytes n = Op . ErrorT . ReaderT $ \(_, h) -> left ConnectionFailure <$> try (hGetN h n) - -exposeIO :: ((Connection, Handle) -> IO (Either Failure a)) -> Op a --- ^ Expose connection to underlying IO -exposeIO = Op . ErrorT . ReaderT - -hideIO :: Op a -> (Connection, Handle) -> IO (Either Failure a) --- ^ Run op from IO -hideIO (Op act) = runReaderT (runErrorT act) - --- * Connected monad - --- | A monad with shared or exclusive access to a connection, ie. 'Task' or 'Op' -class (Functor m, Applicative m, MonadIO m) => Conn m where - runOp :: Op a -> m a - -- ^ Run op with exclusive access to connection. If @m@ is already exclusive then simply run op. - getConnection :: m Connection - -- ^ Return connection that this monad has access to - -instance (MonadIO m) => Conn (Task m) where - runOp = runOp' - getConnection = ask - -instance Conn Op where - runOp = id - getConnection = Op (asks fst) - -instance (Conn m) => Conn (ReaderT r m) where - runOp = lift . runOp - getConnection = lift getConnection - -instance (Conn m, Error e) => Conn (ErrorT e m) where - runOp = lift . runOp - getConnection = lift getConnection - - -{- Authors: Tony Hannan - Copyright 2010 10gen Inc. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -} diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index c6b2878..ed2ec44 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -5,39 +5,260 @@ This module is not intended for direct use. Use the high-level interface at "Dat {-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-} module Database.MongoDB.Internal.Protocol ( - -- * FullCollection + -- * Connection + Connection, mkConnection, + send, call, + -- * Message FullCollection, - -- * Write - Insert(..), insert, - Update(..), UpdateOption(..), update, - Delete(..), DeleteOption(..), delete, - -- * Read - Query(..), QueryOption(..), query, - GetMore(..), getMore, + -- ** Notice + Notice(..), UpdateOption(..), DeleteOption(..), CursorId, + -- ** Request + Request(..), QueryOption(..), -- ** Reply Reply(..), - -- ** Cursor - CursorId, killCursors, -- * Authentication Username, Password, Nonce, pwHash, pwKey ) where -import Prelude as P -import Database.MongoDB.Internal.Connection (Op, sendBytes, flushBytes, receiveBytes) -import Data.Bson +import Prelude as X +import Control.Applicative ((<$>)) +import Control.Monad (unless, replicateM) +import System.IO (Handle) +import Data.ByteString.Lazy (ByteString) +import qualified Control.Pipeline as P +import Data.Bson (Document, UString) import Data.Bson.Binary -import Data.UString as U (pack, append, toByteString) -import Data.ByteString.Lazy as B (length, append) import Data.Binary.Put import Data.Binary.Get import Data.Int import Data.Bits -import Control.Monad.Reader -import Control.Applicative ((<$>)) +import Database.MongoDB.Internal.Util (bitOr) import Data.IORef import System.IO.Unsafe (unsafePerformIO) import Data.Digest.OpenSSL.MD5 (md5sum) -import Database.MongoDB.Util (bitOr, (<.>)) +import Data.UString as U (pack, append, toByteString) + +-- * Connection + +type Connection = P.Pipe Handle ByteString +-- ^ Thread-safe TCP connection to server with pipelined requests + +mkConnection :: Handle -> IO Connection +-- ^ New thread-safe pipelined connection over handle +mkConnection = P.newPipe encodeSize decodeSize where + encodeSize = runPut . putInt32 . toEnum . (+ 4) + decodeSize = subtract 4 . fromEnum . runGet getInt32 + +send :: Connection -> [Notice] -> IO () +-- ^ Send notices as a contiguous batch to server with no reply. Raise IOError if connection fails. +send conn notices = P.send conn =<< mapM noticeBytes notices + +call :: Connection -> [Notice] -> Request -> IO (IO Reply) +-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will raise IOError if connection fails. +call conn notices request = do + nMessages <- mapM noticeBytes notices + requestId <- genRequestId + let rMessage = runPut (putRequest request requestId) + promise <- P.call conn (nMessages ++ [rMessage]) + return (bytesReply requestId <$> promise) + +noticeBytes :: Notice -> IO ByteString +noticeBytes notice = runPut . putNotice notice <$> genRequestId + +bytesReply :: RequestId -> ByteString -> Reply +bytesReply requestId bytes = if requestId == responseTo then reply else err where + (responseTo, reply) = runGet getReply bytes + err = error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" + +-- * Messages + +type FullCollection = UString +-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\" + +-- ** Header + +type Opcode = Int32 + +type RequestId = Int32 +-- ^ A fresh request id is generated for every message + +type ResponseTo = RequestId + +genRequestId :: IO RequestId +-- ^ Generate fresh request id +genRequestId = atomicModifyIORef counter $ \n -> (n + 1, n) where + counter :: IORef RequestId + counter = unsafePerformIO (newIORef 0) + {-# NOINLINE counter #-} + +-- *** Binary format + +putHeader :: Opcode -> RequestId -> Put +-- ^ Note, does not write message length (first int32), assumes caller will write it +putHeader opcode requestId = do + putInt32 requestId + putInt32 0 + putInt32 opcode + +getHeader :: Get (Opcode, ResponseTo) +-- ^ Note, does not read message length (first int32), assumes it was already read +getHeader = do + _requestId <- getInt32 + responseTo <- getInt32 + opcode <- getInt32 + return (opcode, responseTo) + +-- ** Notice + +-- | A notice is a message that is sent with no reply +data Notice = + Insert { + iFullCollection :: FullCollection, + iDocuments :: [Document]} + | Update { + uFullCollection :: FullCollection, + uOptions :: [UpdateOption], + uSelector :: Document, + uUpdater :: Document} + | Delete { + dFullCollection :: FullCollection, + dOptions :: [DeleteOption], + dSelector :: Document} + | KillCursors { + kCursorIds :: [CursorId]} + deriving (Show, Eq) + +data UpdateOption = + Upsert -- ^ If set, the database will insert the supplied object into the collection if no matching document is found + | MultiUpdate -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc + deriving (Show, Eq) + +data DeleteOption = SingleRemove -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed + deriving (Show, Eq) + +type CursorId = Int64 + +-- *** Binary format + +nOpcode :: Notice -> Opcode +nOpcode Update{} = 2001 +nOpcode Insert{} = 2002 +nOpcode Delete{} = 2006 +nOpcode KillCursors{} = 2007 + +putNotice :: Notice -> RequestId -> Put +putNotice notice requestId = do + putHeader (nOpcode notice) requestId + putInt32 0 + case notice of + Insert{..} -> do + putCString iFullCollection + mapM_ putDocument iDocuments + Update{..} -> do + putCString uFullCollection + putInt32 (uBits uOptions) + putDocument uSelector + putDocument uUpdater + Delete{..} -> do + putCString dFullCollection + putInt32 (dBits dOptions) + putDocument dSelector + KillCursors{..} -> do + putInt32 $ toEnum (X.length kCursorIds) + mapM_ putInt64 kCursorIds + +uBit :: UpdateOption -> Int32 +uBit Upsert = bit 0 +uBit MultiUpdate = bit 1 + +uBits :: [UpdateOption] -> Int32 +uBits = bitOr . map uBit + +dBit :: DeleteOption -> Int32 +dBit SingleRemove = bit 0 + +dBits :: [DeleteOption] -> Int32 +dBits = bitOr . map dBit + +-- ** Request + +-- | A request is a message that is sent with a 'Reply' returned +data Request = + Query { + qOptions :: [QueryOption], + qFullCollection :: FullCollection, + qSkip :: Int32, -- ^ Number of initial matching documents to skip + qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size. + qSelector :: Document, -- ^ \[\] = return all documents in collection + qProjector :: Document -- ^ \[\] = return whole document + } | GetMore { + gFullCollection :: FullCollection, + gBatchSize :: Int32, + gCursorId :: CursorId} + deriving (Show, Eq) + +data QueryOption = + TailableCursor | + SlaveOK | + NoCursorTimeout -- Never timeout the cursor. When not set, the cursor will die if idle for more than 10 minutes. + deriving (Show, Eq) + +-- *** Binary format + +qOpcode :: Request -> Opcode +qOpcode Query{} = 2004 +qOpcode GetMore{} = 2005 + +putRequest :: Request -> RequestId -> Put +putRequest request requestId = do + putHeader (qOpcode request) requestId + case request of + Query{..} -> do + putInt32 (qBits qOptions) + putCString qFullCollection + putInt32 qSkip + putInt32 qBatchSize + putDocument qSelector + unless (null qProjector) (putDocument qProjector) + GetMore{..} -> do + putInt32 0 + putCString gFullCollection + putInt32 gBatchSize + putInt64 gCursorId + +qBit :: QueryOption -> Int32 +qBit TailableCursor = bit 1 +qBit SlaveOK = bit 2 +qBit NoCursorTimeout = bit 4 + +qBits :: [QueryOption] -> Int32 +qBits = bitOr . map qBit + +-- ** Reply + +-- | A reply is a message received in response to a 'Request' +data Reply = Reply { + rResponseFlag :: Int32, -- ^ 0 = success, non-zero = failure + rCursorId :: CursorId, -- ^ 0 = cursor finished + rStartingFrom :: Int32, + rDocuments :: [Document] + } deriving (Show, Eq) + +-- * Binary format + +replyOpcode :: Opcode +replyOpcode = 1 + +getReply :: Get (ResponseTo, Reply) +getReply = do + (opcode, responseTo) <- getHeader + unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode + rResponseFlag <- getInt32 + rCursorId <- getInt64 + rStartingFrom <- getInt32 + numDocs <- fromIntegral <$> getInt32 + rDocuments <- replicateM numDocs getDocument + return (responseTo, Reply{..}) -- * Authentication @@ -50,247 +271,3 @@ pwHash u p = pack . md5sum . toByteString $ u `U.append` ":mongo:" `U.append` p pwKey :: Nonce -> Username -> Password -> UString pwKey n u p = pack . md5sum . toByteString . U.append n . U.append u $ pwHash u p - --- * FullCollection - -type FullCollection = UString --- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\" - --- * Request / response - -insert :: Insert -> Op () --- ^ Insert documents into collection -insert = send_ . putInsert - -update :: Update -> Op () --- ^ Update documents in collection matching selector using updater -update = send_ . putUpdate - -delete :: Delete -> Op () --- ^ Delete documents in collection matching selector -delete = send_ . putDelete - -killCursors :: [CursorId] -> Op () --- ^ Close cursors on server because we will not be getting anymore documents from them -killCursors = send_ . putKillCursors . KillCursors - -query :: Query -> Op Reply --- ^ Return first batch of documents in collection matching selector and a cursor-id for getting remaining documents (see 'getMore') -query q = do - requestId <- send (putQuery q) - (reply, responseTo) <- receive getReply - unless (responseTo == requestId) $ fail "expected response id to match query request id" - return reply - -getMore :: GetMore -> Op Reply --- ^ Get next batch of documents from cursor -getMore g = do - requestId <- send (putGetMore g) - (reply, responseTo) <- receive getReply - unless (responseTo == requestId) $ fail "expected response id to match get-more request id" - return reply - --- ** Send / receive - -type RequestId = Int32 --- ^ A fresh request id is generated for every message - -genRequestId :: IO RequestId --- ^ Generate fresh request id -genRequestId = atomicModifyIORef counter $ \n -> (n + 1, n) where - counter :: IORef RequestId - counter = unsafePerformIO (newIORef 0) - {-# NOINLINE counter #-} - -type ResponseTo = RequestId - -send_ :: (RequestId -> Put) -> Op () -send_ x = send x >> return () - -send :: (RequestId -> Put) -> Op RequestId -send rput = do - requestId <- liftIO genRequestId - let bytes = runPut (rput requestId) - let lengthBytes = runPut . putInt32 $ (toEnum . fromEnum) (B.length bytes + 4) - sendBytes (B.append lengthBytes bytes) - flushBytes - return requestId - -receive :: Get a -> Op a -receive getMess = do - messageLength <- fromIntegral . runGet getInt32 <$> receiveBytes 4 - runGet getMess <$> receiveBytes (messageLength - 4) - --- * Messages - -data Insert = Insert { - iFullCollection :: FullCollection, - iDocuments :: [Document] - } deriving (Show, Eq) - -data Update = Update { - uFullCollection :: FullCollection, - uOptions :: [UpdateOption], - uSelector :: Document, - uUpdater :: Document - } deriving (Show, Eq) - -data UpdateOption = - Upsert -- ^ If set, the database will insert the supplied object into the collection if no matching document is found - | MultiUpdate -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc - deriving (Show, Eq) - -data Delete = Delete { - dFullCollection :: FullCollection, - dOptions :: [DeleteOption], - dSelector :: Document - } deriving (Show, Eq) - -data DeleteOption = SingleRemove -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed - deriving (Show, Eq) - -data Query = Query { - qOptions :: [QueryOption], - qFullCollection :: FullCollection, - qSkip :: Int32, -- ^ Number of initial matching documents to skip - qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size. - qSelector :: Document, -- ^ \[\] = return all documents in collection - qProjector :: Document -- ^ \[\] = return whole document - } deriving (Show, Eq) - -data QueryOption = - TailableCursor | - SlaveOK | - NoCursorTimeout -- Never timeout the cursor. When not set, the cursor will die if idle for more than 10 minutes. - deriving (Show, Eq) - -data GetMore = GetMore { - gFullCollection :: FullCollection, - gBatchSize :: Int32, - gCursorId :: CursorId - } deriving (Show, Eq) - -newtype KillCursors = KillCursors { - kCursorIds :: [CursorId] - } deriving (Show, Eq) - -data Reply = Reply { - rResponseFlag :: Int32, -- ^ 0 = success, non-zero = failure - rCursorId :: CursorId, -- ^ 0 = cursor finished - rStartingFrom :: Int32, - rDocuments :: [Document] - } deriving (Show, Eq) - -type CursorId = Int64 - --- ** Messages binary format - -type Opcode = Int32 --- ^ Code for each message type -replyOpcode, updateOpcode, insertOpcode, queryOpcode, getMoreOpcode, deleteOpcode, killCursorsOpcode :: Opcode -replyOpcode = 1 -updateOpcode = 2001 -insertOpcode = 2002 -queryOpcode = 2004 -getMoreOpcode = 2005 -deleteOpcode = 2006 -killCursorsOpcode = 2007 - -putUpdate :: Update -> RequestId -> Put -putUpdate Update{..} = putMessage updateOpcode $ do - putInt32 0 - putCString uFullCollection - putInt32 (uBits uOptions) - putDocument uSelector - putDocument uUpdater - -uBit :: UpdateOption -> Int32 -uBit Upsert = bit 0 -uBit MultiUpdate = bit 1 - -uBits :: [UpdateOption] -> Int32 -uBits = bitOr . map uBit - -putInsert :: Insert -> RequestId -> Put -putInsert Insert{..} = putMessage insertOpcode $ do - putInt32 0 - putCString iFullCollection - mapM_ putDocument iDocuments - -putDelete :: Delete -> RequestId -> Put -putDelete Delete{..} = putMessage deleteOpcode $ do - putInt32 0 - putCString dFullCollection - putInt32 (dBits dOptions) - putDocument dSelector - -dBit :: DeleteOption -> Int32 -dBit SingleRemove = bit 0 - -dBits :: [DeleteOption] -> Int32 -dBits = bitOr . map dBit - -putQuery :: Query -> RequestId -> Put -putQuery Query{..} = putMessage queryOpcode $ do - putInt32 (qBits qOptions) - putCString qFullCollection - putInt32 qSkip - putInt32 qBatchSize - putDocument qSelector - unless (null qProjector) (putDocument qProjector) - -qBit :: QueryOption -> Int32 -qBit TailableCursor = bit 1 -qBit SlaveOK = bit 2 -qBit NoCursorTimeout = bit 4 - -qBits :: [QueryOption] -> Int32 -qBits = bitOr . map qBit - -putGetMore :: GetMore -> RequestId -> Put -putGetMore GetMore{..} = putMessage getMoreOpcode $ do - putInt32 0 - putCString gFullCollection - putInt32 gBatchSize - putInt64 gCursorId - -putKillCursors :: KillCursors -> RequestId -> Put -putKillCursors KillCursors{..} = putMessage killCursorsOpcode $ do - putInt32 0 - putInt32 $ toEnum (P.length kCursorIds) - mapM_ putInt64 kCursorIds - -getReply :: Get (Reply, ResponseTo) -getReply = getMessage replyOpcode $ do - rResponseFlag <- getInt32 - rCursorId <- getInt64 - rStartingFrom <- getInt32 - numDocs <- getInt32 - rDocuments <- replicateM (fromIntegral numDocs) getDocument - return $ Reply {..} - --- *** Message header - -putMessage :: Opcode -> Put -> RequestId -> Put --- ^ Note, does not write message length (first int32), assumes caller will write it -putMessage opcode messageBodyPut requestId = do - putInt32 requestId - putInt32 0 - putInt32 opcode - messageBodyPut - -getMessage :: Opcode -> Get a -> Get (a, ResponseTo) --- ^ Note, does not read message length (first int32), assumes it was already read -getMessage expectedOpcode getMessageBody = do - _requestId <- getInt32 - responseTo <- getInt32 - opcode <- getInt32 - unless (opcode == expectedOpcode) $ - fail $ "expected opcode " ++ show expectedOpcode ++ " but got " ++ show opcode - body <- getMessageBody - return (body, responseTo) - - -{- Authors: Tony Hannan - Copyright 2010 10gen Inc. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -} diff --git a/Database/MongoDB/Util.hs b/Database/MongoDB/Internal/Util.hs similarity index 69% rename from Database/MongoDB/Util.hs rename to Database/MongoDB/Internal/Util.hs index b868dfc..9e7b4d4 100644 --- a/Database/MongoDB/Util.hs +++ b/Database/MongoDB/Internal/Util.hs @@ -2,19 +2,16 @@ {-# LANGUAGE StandaloneDeriving #-} -module Database.MongoDB.Util where +module Database.MongoDB.Internal.Util where import Prelude hiding (length) import Network (PortID(..)) import Control.Applicative (Applicative(..), (<$>)) -import Control.Exception (assert) import Control.Monad.Reader import Control.Monad.Error -import Data.UString as U (UString, cons, append) +import Data.UString as U (cons, append) import Data.Bits (Bits, (.|.)) import Data.Bson -import System.IO (Handle) -import Data.ByteString.Lazy as B (ByteString, length, append, hGet) deriving instance Show PortID deriving instance Eq PortID @@ -31,6 +28,10 @@ instance (Monad m, Error e) => Applicative (ErrorT e m) where ignore :: (Monad m) => a -> m () ignore _ = return () +snoc :: [a] -> a -> [a] +-- ^ add element to end of list (/snoc/ is reverse of /cons/, which adds to front of list) +snoc list a = list ++ [a] + type Secs = Float bitOr :: (Bits a) => [a] -> a @@ -53,13 +54,3 @@ true1 k doc = case valueAt k doc of Int32 n -> n == 1 Int64 n -> n == 1 _ -> error $ "expected " ++ show k ++ " to be Num or Bool in " ++ show doc - -hGetN :: Handle -> Int -> IO ByteString --- ^ Read N bytes from hande, blocking until all N bytes are read. --- Unlike hGet which only blocks if no bytes are available, otherwise it returns the X bytes immediately available where X <= N. -hGetN h n = assert (n >= 0) $ do - bytes <- hGet h n - let x = fromIntegral (length bytes) - if x >= n then return bytes else do - remainingBytes <- hGetN h (n - x) - return (B.append bytes remainingBytes) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index c608a8a..c210f10 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -1,17 +1,22 @@ -- | Query and update documents residing on a MongoDB server(s) -{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections #-} +{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses #-} module Database.MongoDB.Query ( + -- * Connection + Failure(..), Conn, Connected, runConn, -- * Database - Database, allDatabases, Db, useDb, thisDatabase, runDbOp, + Database, allDatabases, DbConn, useDb, thisDatabase, -- ** Authentication P.Username, P.Password, auth, -- * Collection Collection, allCollections, -- ** Selection - Selection(..), select, Selector, whereJS, + Selection(..), Selector, whereJS, + Select(select), -- * Write + -- ** WriteMode + WriteMode(..), writeMode, -- ** Insert insert, insert_, insertMany, insertMany_, -- ** Update @@ -20,10 +25,10 @@ module Database.MongoDB.Query ( delete, deleteOne, -- * Read -- ** Query - Query(..), P.QueryOption(..), Projector, Limit, Order, BatchSize, query, + Query(..), P.QueryOption(..), Projector, Limit, Order, BatchSize, explain, find, findOne, count, distinct, -- *** Cursor - Cursor, next, nextN, rest, closeCursor, + Cursor, next, nextN, rest, -- ** Group Group(..), GroupKey(..), group, -- ** MapReduce @@ -31,49 +36,88 @@ module Database.MongoDB.Query ( -- * Command Command, runCommand, runCommand1, eval, - ErrorCode, getLastError, resetLastError ) where import Prelude as X hiding (lookup) -import Control.Applicative ((<$>)) -import Database.MongoDB.Internal.Connection +import Control.Applicative ((<$>), Applicative(..)) +import Control.Arrow (left, first, second) +import Control.Monad.Context +import Control.Monad.Reader +import Control.Monad.Error +import System.IO.Error (try) +import Control.Concurrent.MVar +import Control.Pipeline (Resource(..)) import qualified Database.MongoDB.Internal.Protocol as P -import Database.MongoDB.Internal.Protocol hiding (insert, update, delete, query, Query(Query)) +import Database.MongoDB.Internal.Protocol hiding (Query, send, call) import Data.Bson import Data.Word import Data.Int -import Control.Monad.Reader -import Control.Concurrent.MVar import Data.Maybe (listToMaybe, catMaybes) import Data.UString as U (dropWhile, any, tail) -import Database.MongoDB.Util (loop, (<.>), true1) +import Database.MongoDB.Internal.Util (loop, (<.>), true1) -- plus Applicative instances of ErrorT & ReaderT + +-- * Connection + +-- | A monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure +class (Context Connection m, Context WriteMode m, MonadError Failure m, MonadIO m, Applicative m, Functor m) => Conn m +instance (Context Connection m, Context WriteMode m, MonadError Failure m, MonadIO m, Applicative m, Functor m) => Conn m + +-- | Connection or Server failure like network problem or disk full +data Failure = + ConnectionFailure IOError + -- ^ Error during sending or receiving bytes over a 'Connection'. The connection is not automatically closed when this error happens; the user must close it. Any other IOErrors raised during a Task or Op are not caught. The user is responsible for these other types of errors not related to sending/receiving bytes over the connection. + | ServerFailure String + -- ^ Failure on server, like disk full, which is usually observed using getLastError. Calling 'fail' inside a connected monad raises this failure. Do not call 'fail' unless it is a temporary server failure, like disk full. For example, receiving unexpected data from the server is not a server failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change. + deriving (Show, Eq) + +instance Error Failure where strMsg = ServerFailure + +type Connected m = ErrorT Failure (ReaderT WriteMode (ReaderT Connection m)) + +runConn :: Connected m a -> Connection -> m (Either Failure a) +-- ^ Run action with access to connection. Return Left Failure if connection or server fails during execution. +runConn action = runReaderT (runReaderT (runErrorT action) Unsafe) + +send :: (Conn m) => [Notice] -> m () +-- ^ Send notices as a contiguous batch to server with no reply. Raise Failure if connection fails. +send ns = do + conn <- context + e <- liftIO $ try (P.send conn ns) + either (throwError . ConnectionFailure) return e + +call :: (Conn m) => [Notice] -> Request -> m (IO (Either Failure Reply)) +-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will raise Failure if connection fails send, and promise will return Failure if connection fails receive. +call ns r = do + conn <- context + e <- liftIO $ try (P.call conn ns r) + case e of + Left err -> throwError (ConnectionFailure err) + Right promise -> return (left ConnectionFailure <$> try promise) -- * Database type Database = UString -- ^ Database name +-- | A 'Conn' monad with access to a 'Database' +class (Context Database m, Conn m) => DbConn m +instance (Context Database m, Conn m) => DbConn m + allDatabases :: (Conn m) => m [Database] -- ^ List all databases residing on server allDatabases = map (at "name") . at "databases" <$> useDb "admin" (runCommand1 "listDatabases") -type Db m = ReaderT Database m - -useDb :: Database -> Db m a -> m a +useDb :: Database -> ReaderT Database m a -> m a -- ^ Run Db action against given database useDb = flip runReaderT -thisDatabase :: (Monad m) => Db m Database +thisDatabase :: (DbConn m) => m Database -- ^ Current database in use -thisDatabase = ask - -runDbOp :: (Conn m) => Db Op a -> Db m a --- ^ Run db operation with exclusive access to the connection -runDbOp dbOp = ReaderT (runOp . flip useDb dbOp) +thisDatabase = context -- * Authentication -auth :: (Conn m) => Username -> Password -> Db m Bool +auth :: (DbConn m) => Username -> Password -> m Bool -- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new connection. auth u p = do n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)] @@ -84,7 +128,7 @@ auth u p = do type Collection = UString -- ^ Collection name (not prefixed with database) -allCollections :: (Conn m) => Db m [Collection] +allCollections :: (DbConn m) => m [Collection] -- ^ List all collections in this database allCollections = do db <- thisDatabase @@ -99,9 +143,9 @@ allCollections = do data Selection = Select {selector :: Selector, coll :: Collection} deriving (Show, Eq) -- ^ Selects documents in collection that match selector -select :: Selector -> Collection -> Selection +{-select :: Selector -> Collection -> Selection -- ^ Synonym for 'Select' -select = Select +select = Select-} type Selector = Document -- ^ Filter for a query, analogous to the where clause in SQL. @[]@ matches all documents in collection. @[x =: a, y =: b]@ is analogous to @where x = a and y = b@ in SQL. See for full selector syntax. @@ -110,26 +154,72 @@ whereJS :: Selector -> Javascript -> Selector -- ^ Add Javascript predicate to selector, in which case a document must match both selector and predicate whereJS sel js = ("$where" =: js) : sel +class Select aQueryOrSelection where + select :: Selector -> Collection -> aQueryOrSelection + -- ^ 'Query' or 'Selection' that selects documents in collection that match selector. The choice of type depends on use, for example, in @find (select sel col)@ it is a Query, and in @delete (select sel col)@ it is a Selection. + +instance Select Selection where + select = Select + +instance Select Query where + select = query + -- * Write +-- ** WriteMode + +-- | Default write-mode is 'Unsafe' +data WriteMode = + Unsafe -- ^ Submit writes without receiving acknowledgments. Fast. Assumes writes succeed even though they may not. + | Safe -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. + deriving (Show, Eq) + +writeMode :: (Conn m) => WriteMode -> m a -> m a +-- ^ Run action with given 'WriteMode' +writeMode = push . const + +write :: (DbConn m) => Notice -> m () +-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'ServerFailure' if it reports an error. +write notice = do + mode <- context + case mode of + Unsafe -> send [notice] + Safe -> do + me <- getLastError [notice] + maybe (return ()) (throwError . ServerFailure . show) me + +type ErrorCode = Int +-- ^ Error code from getLastError + +getLastError :: (DbConn m) => [Notice] -> m (Maybe (ErrorCode, String)) +-- ^ Send notices (writes) then fetch what the last error was, Nothing means no error +getLastError writes = do + r <- runCommand' writes ["getlasterror" =: (1 :: Int)] + return $ (at "code" r,) <$> lookup "err" r + +{-resetLastError :: (DbConn m) => m () +-- ^ Clear last error +resetLastError = runCommand1 "reseterror" >> return ()-} + -- ** Insert -insert :: (Conn m) => Collection -> Document -> Db m Value +insert :: (DbConn m) => Collection -> Document -> m Value -- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied insert col doc = head <$> insertMany col [doc] -insert_ :: (Conn m) => Collection -> Document -> Db m () +insert_ :: (DbConn m) => Collection -> Document -> m () -- ^ Same as 'insert' except don't return _id insert_ col doc = insert col doc >> return () -insertMany :: (Conn m) => Collection -> [Document] -> Db m [Value] +insertMany :: (DbConn m) => Collection -> [Document] -> m [Value] -- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied -insertMany col docs = ReaderT $ \db -> do +insertMany col docs = do + db <- thisDatabase docs' <- liftIO $ mapM assignId docs - runOp $ P.insert (Insert (db <.> col) docs') + write (Insert (db <.> col) docs') mapM (look "_id") docs' -insertMany_ :: (Conn m) => Collection -> [Document] -> Db m () +insertMany_ :: (DbConn m) => Collection -> [Document] -> m () -- ^ Same as 'insertMany' except don't return _ids insertMany_ col docs = insertMany col docs >> return () @@ -141,55 +231,64 @@ assignId doc = if X.any (("_id" ==) . label) doc -- ** Update -save :: (Conn m) => Collection -> Document -> Db m () +save :: (DbConn m) => Collection -> Document -> m () -- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or update it if its not new (has \"_id\" field) save col doc = case look "_id" doc of Nothing -> insert_ col doc Just i -> repsert (Select ["_id" := i] col) doc -replace :: (Conn m) => Selection -> Document -> Db m () +replace :: (DbConn m) => Selection -> Document -> m () -- ^ Replace first document in selection with given document replace = update [] -repsert :: (Conn m) => Selection -> Document -> Db m () +repsert :: (DbConn m) => Selection -> Document -> m () -- ^ Replace first document in selection with given document, or insert document if selection is empty repsert = update [Upsert] type Modifier = Document -- ^ Update operations on fields in a document. See -modify :: (Conn m) => Selection -> Modifier -> Db m () +modify :: (DbConn m) => Selection -> Modifier -> m () -- ^ Update all documents in selection using given modifier modify = update [MultiUpdate] -update :: (Conn m) => [UpdateOption] -> Selection -> Document -> Db m () +update :: (DbConn m) => [UpdateOption] -> Selection -> Document -> m () -- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty. -update opts (Select sel col) up = ReaderT $ \db -> runOp $ P.update (Update (db <.> col) opts sel up) +update opts (Select sel col) up = do + db <- thisDatabase + write (Update (db <.> col) opts sel up) -- ** Delete -delete :: (Conn m) => Selection -> Db m () +delete :: (DbConn m) => Selection -> m () -- ^ Delete all documents in selection -delete (Select sel col) = ReaderT $ \db -> runOp $ P.delete (Delete (db <.> col) [] sel) +delete = delete' [] -deleteOne :: (Conn m) => Selection -> Db m () +deleteOne :: (DbConn m) => Selection -> m () -- ^ Delete first document in selection -deleteOne (Select sel col) = ReaderT $ \db -> runOp $ P.delete (Delete (db <.> col) [SingleRemove] sel) +deleteOne = delete' [SingleRemove] + +delete' :: (DbConn m) => [DeleteOption] -> Selection -> m () +-- ^ Delete all documents in selection unless 'SingleRemove' option is given then only delete first document in selection +delete' opts (Select sel col) = do + db <- thisDatabase + write (Delete (db <.> col) opts sel) -- * Read -- ** Query +-- | Use 'select' to create a basic query with defaults, then modify if desired. For example, @(select sel col) {limit = 10}@ data Query = Query { - options :: [QueryOption], + options :: [QueryOption], -- ^ Default = [] selection :: Selection, - project :: Projector, -- ^ \[\] = all fields - skip :: Word32, -- ^ Number of initial matching documents to skip - limit :: Limit, -- ^ Maximum number of documents to return, 0 = no limit - sort :: Order, -- ^ Sort results by this order, [] = no sort - snapshot :: Bool, -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. - batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. - hint :: Order -- ^ Force MongoDB to use this index, [] = no hint + project :: Projector, -- ^ \[\] = all fields. Default = [] + skip :: Word32, -- ^ Number of initial matching documents to skip. Default = 0 + limit :: Limit, -- ^ Maximum number of documents to return, 0 = no limit. Default = 0 + sort :: Order, -- ^ Sort results by this order, [] = no sort. Default = [] + snapshot :: Bool, -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = False + batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Default = 0 + hint :: Order -- ^ Force MongoDB to use this index, [] = no hint. Default = [] } deriving (Show, Eq) type Projector = Document @@ -216,12 +315,9 @@ batchSizeRemainingLimit batchSize limit = if limit == 0 then (fromIntegral batchSize, limit - batchSize) else (- fromIntegral limit, 1) -protoQuery :: Database -> Query -> (P.Query, Limit) -protoQuery = protoQuery' False - -protoQuery' :: Bool -> Database -> Query -> (P.Query, Limit) +queryRequest :: Bool -> Query -> Database -> (Request, Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. -protoQuery' isExplain db Query{..} = (P.Query{..}, remainingLimit) where +queryRequest isExplain Query{..} db = (P.Query{..}, remainingLimit) where qOptions = options qFullCollection = db <.> coll selection qSkip = fromIntegral skip @@ -234,80 +330,111 @@ protoQuery' isExplain db Query{..} = (P.Query{..}, remainingLimit) where special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection -find :: (Conn m) => Query -> Db m Cursor --- ^ Fetch documents satisfying query -find q@Query{selection, batchSize} = ReaderT $ \db -> do - let (q', remainingLimit) = protoQuery db q - cs <- fromReply remainingLimit =<< runOp (P.query q') - newCursor db (coll selection) batchSize cs +runQuery :: (DbConn m) => Bool -> [Notice] -> Query -> m CursorState' +-- ^ Send query request and return cursor state +runQuery isExplain ns q = call' ns . queryRequest isExplain q =<< thisDatabase -findOne :: (Conn m) => Query -> Db m (Maybe Document) --- ^ Fetch first document satisfying query or Nothing if none satisfy it -findOne q = ReaderT $ \db -> do - let (q', x) = protoQuery db q {limit = 1} - CS _ _ docs <- fromReply x =<< runOp (P.query q') +find :: (DbConn m) => Query -> m Cursor +-- ^ Fetch documents satisfying query +find q@Query{selection, batchSize} = do + db <- thisDatabase + cs' <- runQuery False [] q + newCursor db (coll selection) batchSize cs' + +findOne' :: (DbConn m) => [Notice] -> Query -> m (Maybe Document) +-- ^ Send notices and fetch first document satisfying query or Nothing if none satisfy it +findOne' ns q = do + CS _ _ docs <- cursorState =<< runQuery False ns q {limit = 1} return (listToMaybe docs) -explain :: (Conn m) => Query -> Db m Document --- ^ Return performance stats of query execution -explain q = ReaderT $ \db -> do -- same as findOne but with explain set to true - let (q', x) = protoQuery' True db q {limit = 1} - CS _ _ docs <- fromReply x =<< runOp (P.query q') - when (null docs) . fail $ "no explain: " ++ show q' - return (head docs) +findOne :: (DbConn m) => Query -> m (Maybe Document) +-- ^ Fetch first document satisfying query or Nothing if none satisfy it +findOne = findOne' [] -count :: (Conn m) => Query -> Db m Int +explain :: (DbConn m) => Query -> m Document +-- ^ Return performance stats of query execution +explain q = do -- same as findOne but with explain set to true + CS _ _ docs <- cursorState =<< runQuery True [] q {limit = 1} + return $ if null docs then error ("no explain: " ++ show q) else head docs + +count :: (DbConn m) => Query -> m Int -- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present) count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand (["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)] ++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32))) -distinct :: (Conn m) => Label -> Selection -> Db m [Value] +distinct :: (DbConn m) => Label -> Selection -> m [Value] -- ^ Fetch distinct values of field in selected documents distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel] -- *** Cursor -data Cursor = Cursor FullCollection BatchSize (MVar CursorState) +data Cursor = Cursor FullCollection BatchSize (MVar CursorState') -- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a ServerFailure exception. Note, a cursor is not closed when the connection is closed, so you can open another connection to the same server and continue using the cursor. +modifyCursorState' :: (Conn m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Connected IO (CursorState', a)) -> m a +-- ^ Analogous to 'modifyMVar' but with Conn monad +modifyCursorState' (Cursor fcol batch var) act = do + conn <- context + e <- liftIO . modifyMVar var $ \cs' -> + either ((cs',) . Left) (second Right) <$> runConn (act fcol batch cs') conn + either throwError return e + +getCursorState :: (Conn m) => Cursor -> m CursorState +-- ^ Extract current cursor status +getCursorState (Cursor _ _ var) = cursorState =<< liftIO (readMVar var) + +data CursorState' = Delayed (IO (Either Failure CursorState)) | CursorState CursorState +-- ^ A cursor state or a promised cursor state which may fail + +cursorState :: (Conn m) => CursorState' -> m CursorState +-- ^ Convert promised cursor state to cursor state or raise Failure +cursorState (Delayed promise) = either throwError return =<< liftIO promise +cursorState (CursorState cs) = return cs + data CursorState = CS Limit CursorId [Document] -- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch. -fromReply :: (Monad m) => Limit -> Reply -> m CursorState +fromReply :: Limit -> Reply -> Either Failure CursorState +-- ^ Convert Reply to CursorState or Failure fromReply limit Reply{..} = if rResponseFlag == 0 - then return (CS limit rCursorId rDocuments) - else fail $ "Query failure " ++ show rResponseFlag ++ " " ++ show rDocuments + then Right (CS limit rCursorId rDocuments) + else Left . ServerFailure $ "Query failure " ++ show rResponseFlag ++ " " ++ show rDocuments -newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState -> m Cursor --- ^ Cursor is closed when garbage collected, explicitly closed, or CIO action ends (connection closed) +call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState' +-- ^ Send notices and request and return promised cursor state +call' ns (req, remainingLimit) = do + promise <- call ns req + return $ Delayed (fmap (fromReply remainingLimit =<<) promise) + +newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor +-- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected. newCursor db col batch cs = do - conn <- getConnection + conn <- context var <- liftIO (newMVar cs) - liftIO . addMVarFinalizer var $ do - -- kill cursor on server when garbage collected on client, if connection not already closed - CS _ cid _ <- readMVar var - unless (cid == 0) $ do - done <- isClosed conn - unless done $ runTask (runOp $ P.killCursors [cid]) conn >> return () - return (Cursor (db <.> col) batch var) + let cursor = Cursor (db <.> col) batch var + liftIO . addMVarFinalizer var $ runConn (close cursor) conn >> return () + return cursor next :: (Conn m) => Cursor -> m (Maybe Document) -- ^ Return next document in query result, or Nothing if finished. --- This can run inside or outside a 'Db' monad (a 'useDb' block), since @Conn m => ReaderT r m@ is an instance of the 'Conn' type class, along with @Task@ and @Op@ -next (Cursor fcol batch var) = runOp . exposeIO $ \h -> modifyMVar var $ \cs -> - -- Get lock on connection (runOp) first then get lock on cursor, otherwise you could get in deadlock if already inside an Op (connection locked), but another Task gets lock on cursor first and then tries runOp (deadlock). - either ((cs,) . Left) (fmap Right) <$> hideIO (nextState cs) h - where - nextState :: CursorState -> Op (CursorState, Maybe Document) - nextState (CS limit cid docs) = case docs of - doc : docs' -> return (CS limit cid docs', Just doc) - [] -> if cid == 0 - then return (CS 0 0 [], Nothing) -- finished - else let -- fetch next batch from server - (batchSize, remLimit) = batchSizeRemainingLimit batch limit - getNextBatch = fromReply remLimit =<< P.getMore (GetMore fcol batchSize cid) - in nextState =<< getNextBatch +next cursor = modifyCursorState' cursor nextState where + -- Pre-fetch next batch promise from server when last one in current batch is returned. + nextState :: FullCollection -> BatchSize -> CursorState' -> Connected IO (CursorState', Maybe Document) + nextState fcol batch cs' = do + CS limit cid docs <- cursorState cs' + case docs of + doc : docs' -> do + cs'' <- if null docs' && cid /= 0 + then nextBatch fcol batch limit cid + else return $ CursorState (CS limit cid docs') + return (cs'', Just doc) + [] -> if cid == 0 + then return (CursorState $ CS 0 0 [], Nothing) -- finished + else error $ "server returned empty batch but says more results on server" + nextBatch fcol batch limit cid = let + (batchSize, remLimit) = batchSizeRemainingLimit batch limit + in call' [] (GetMore fcol batchSize cid, remLimit) nextN :: (Conn m) => Int -> Cursor -> m [Document] -- ^ Return next N documents or less if end is reached @@ -317,12 +444,13 @@ rest :: (Conn m) => Cursor -> m [Document] -- ^ Return remaining documents in query result rest c = loop (next c) -closeCursor :: (Conn m) => Cursor -> m () --- ^ Close cursor without reading rest of results. Cursor closes automatically when you read all results. -closeCursor (Cursor _ _ var) = runOp . exposeIO $ \h -> - modifyMVar var $ \cs@(CS _ cid _) -> if cid == 0 - then return (CS 0 0 [], Right ()) - else either ((cs,) . Left) ((CS 0 0 [],) . Right) <$> hideIO (P.killCursors [cid]) h +instance (Conn m) => Resource m Cursor where + close cursor = modifyCursorState' cursor kill' where + kill' _ _ cs' = first CursorState <$> (kill =<< cursorState cs') + kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]] + isClosed cursor = do + CS _ cid docs <- getCursorState cursor + return (cid == 0 && null docs) -- ** Group @@ -348,7 +476,7 @@ groupDocument Group{..} = "initial" =: gInitial, "cond" =: gCond ] -group :: (Conn m) => Group -> Db m [Document] +group :: (DbConn m) => Group -> m [Document] -- ^ Execute group query and return resulting aggregate value for each distinct key group g = at "retval" <$> runCommand ["group" =: groupDocument g] @@ -397,12 +525,12 @@ mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce -- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments. mapReduce col map' red = MapReduce col map' red [] [] 0 Nothing False Nothing [] False -runMR :: (Conn m) => MapReduce -> Db m Cursor +runMR :: (DbConn m) => MapReduce -> m Cursor -- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript) -- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when connection closes. runMR mr = find . query [] =<< (at "result" <$> runMR' mr) -runMR' :: (Conn m) => MapReduce -> Db m Document +runMR' :: (DbConn m) => MapReduce -> m Document -- ^ Run MapReduce and return a result document containing a "result" field holding the output Collection and additional statistic fields. Error if the map/reduce failed (because of bad Javascript). runMR' mr = do doc <- runCommand (mrDocument mr) @@ -413,32 +541,23 @@ runMR' mr = do type Command = Document -- ^ A command is a special query or action against the database. See for details. -runCommand :: (Conn m) => Command -> Db m Document --- ^ Run command against the database and return its result -runCommand c = maybe err return =<< findOne (query c "$cmd") where - err = fail $ "Nothing returned for command: " ++ show c +runCommand' :: (DbConn m) => [Notice] -> Command -> m Document +-- ^ Send notices then run command and return its result +runCommand' ns c = maybe err id <$> findOne' ns (query c "$cmd") where + err = error $ "Nothing returned for command: " ++ show c -runCommand1 :: (Conn m) => UString -> Db m Document --- ^ @runCommand1 "foo" = runCommand ["foo" =: 1]@ +runCommand :: (DbConn m) => Command -> m Document +-- ^ Run command against the database and return its result +runCommand = runCommand' [] + +runCommand1 :: (DbConn m) => UString -> m Document +-- ^ @runCommand1 foo = runCommand [foo =: 1]@ runCommand1 c = runCommand [c =: (1 :: Int)] -eval :: (Conn m) => Javascript -> Db m Document +eval :: (DbConn m) => Javascript -> m Document -- ^ Run code on server eval code = at "retval" <$> runCommand ["$eval" =: code] -type ErrorCode = Int --- ^ Error code from getLastError - -getLastError :: Db Op (Maybe (ErrorCode, String)) --- ^ Fetch what the last error was, Nothing means no error. Especially useful after a write since it is asynchronous (ie. nothing is returned after a write, so we don't know if it succeeded or not). To ensure no interleaving db operation executes between the write we want to check and getLastError, this can only be executed inside a 'runDbOp' which gets exclusive access to the connection. -getLastError = do - r <- runCommand1 "getlasterror" - return $ (at "code" r,) <$> lookup "err" r - -resetLastError :: Db Op () --- ^ Clear last error -resetLastError = runCommand1 "reseterror" >> return () - {- Authors: Tony Hannan Copyright 2010 10gen Inc. diff --git a/TODO b/TODO index bd4a68c..66ff971 100644 --- a/TODO +++ b/TODO @@ -3,6 +3,7 @@ TODO BSON ---- ++ implement deprecated types (were left out) + on insert/update: reject keys that start with "$" or "." + data support for common mongo "$symbols" + convert from/to json @@ -15,14 +16,12 @@ BSON MongoDB ------- + support full level 0 - - hint - operations on database objects * add_son_manipulators? * dereference (dbref) - database admin * getProfilingInfo - misc operations - * explain * getCollectionOptions - cursor object * hasMore @@ -38,7 +37,7 @@ MongoDB - getLastError options - Update If Current (http://www.mongodb.org/display/DOCS/Atomic+Operations) - block write until written on N replicas -- lazyRest on cursor, although lazy I/) is problematic and we may not want to support it. +- lazyRest on cursor, although lazy I/O) is problematic and we may not want to support it. - Upon client exit, send killCursors for all open cursors, otherwise server will keep them open for 10 minutes and keep NoCursorTimeout cursors open for hours. -- Upon cursor finalize (garbage collect) send killCursor even if you have to create a new connection, because server keeps cursors open for 10 minutes (or more). @@ -46,7 +45,6 @@ MongoDB - automatic reconnection - buffer pooling - connection pooling. Unsafe to shrink pool and close connections because map/reduce temp tables that were created on the connection will get deleted. Note, other connections can access a map/reduce temp table as long as the original connection is still alive. Also, other connections can access cursors created on other connections, even if those die. Cursors will be deleted on server only if idle for more than 10 minutes. Accessing a deleted cursor returns an error. -+ support safe operations, although operation with exclusive connection access is available which can be used to getLastError and check for that previous write was safe (successful). + auto-destoy connection (how?/when?). Although, GHC will automatically close connection (Handle) when garbage collected. + don't read into cursor until needed, but have cursor send getMore before it is actually out of docs (so network is finished by the time we're ready diff --git a/V0.5.0-Redesign.md b/V0.5.0-Redesign.md deleted file mode 100644 index 77d540c..0000000 --- a/V0.5.0-Redesign.md +++ /dev/null @@ -1,30 +0,0 @@ -Hi Scott, - -Thanks for writing the Haskell driver for MongoDB! It functions well but I basically rewrote it in an attempt to factor it nicely and support additional functionality like multiple threads using the same connection. I hope you like it! You can find it on my fork of your repository at http://github.com/TonyGen/mongoDB. - -First, I separated out BSON into its own package, since it can be used as an interchange format independent of MongoDB. You can find this new package on Github at http://github.com/TonyGen/bson-haskell and on Hackage at http://hackage.haskell.org/package/bson. I also made the BSON easier to write and view. For example, you can write: ["a" =: 1, "b" =: "hello", "c" =: [1,2,3]], and it shows as: [a: 1, b: "hello", c: [1,2,3]]. - -Second, for modularity, I separated MongoDB into 5 modules: MongoDB-Internal-Connection, MongoDB-Internal-Protocol, MongoDB-Connection, MongoDB-Query, and MongoDB-Admin. - -MongoDB-Internal-Connection defines a connection with multi-threaded support via two monads, one with shared access to a connection (Task), and one with exclusive access to a connection (Op). This module also exposes low-level writing and reading bytes inside the Op monad for MongoDB-Internal-Protocol to use. This module is not intended for the application-programmer use, and maybe should be a hidden module inside cabal, but for now it is not. - -MongoDB-Internal-Protocol defines the MongoDB Wire Protocol (http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol). It defines the messages the the client and server send back and forth to each other. Again, this module is not intended for the application-programmer use, and maybe should be a hidden module inside cabal, but for now it is not. - -MongoDB-Connection re-exports Connection, and Task and Op monads from MongoDB-Internal-Connection but without the low-level read and write bytes functions. It also adds support for replica-sets, which will replace replica-pairs in the next release of MongoDB coming out soon. I had to make two connection modules (MongoDB-Internal-Connection and MongoDB-Connection) because connecting to a replica set requires quering its config info, which requires us to import MongoDB-Query, which recursively imports MongoDB-Internal-Protocol then MongoDB-Internal-Connection. I could have used mutual dependent modules (via .hs-boot) but felt that violated the layered approach I was going for. - -MongoDB-Query defines all the normal query and update operations like find, findOne, count, insert, modify, delete, group, mapReduce, allDatabases, allCollections, runCommand, etc. - -MongoDB-Admin defines all the administration operations like validateCollection, ensureIndex, dropIndex, addUser, copyDatabase, dbStats, setProfilingLevel, etc. - -Finally, the top-level MongoDB module simply re-exports MongoDB-Connection, MongoDB-Query, and MongoDB-Admin, along with Data.Bson from the bson package. - -I updated your TODO list, removing items I completed, added items that were missing, and added back items I removed from the code like lazy list from a cursor (I am skeptical of lazy I/O, but we could add it back). - -I also update your two tutorials to reflect this new API. - -I hope you like these changes! Let me know your feedback, and I hope we can both maintain it in the future. - -Cheers, -Tony Hannan -10gen Inc. -Creators of MongoDB diff --git a/V0.6-Redesign.md b/V0.6-Redesign.md new file mode 100644 index 0000000..61d4e8c --- /dev/null +++ b/V0.6-Redesign.md @@ -0,0 +1,36 @@ +Hi Scott, + +I slightly refactored my previous version I sent you. Here is the description again of my version as compared to your version. You can disregard my previous version and just read this. + +When evaluating your package, I wanted to make the BSON easier to write and read, and I wanted to make use of monads so you don't have to supply the connection and database every time. Furthermore, I wanted to modularize the code a little more, for example, separating admin function from normal query/update functions. Finally, I wanted to add more functionality, like support for replica sets and multiple threads per connection. The end result was a significant rewrite of your package. I hope you like it! You can find it on my fork of your repository at http://github.com/TonyGen/mongoDB. + +First, I separated out BSON into its own package, since it can be used as an interchange format independent of MongoDB. You can find this new package on Github at http://github.com/TonyGen/bson-haskell and on Hackage at http://hackage.haskell.org/package/bson. I also made the BSON easier to write and view. For example, you can write: ["a" =: 1, "b" =: "hello", "c" =: [1,2,3]], and it shows as: [a: 1, b: "hello", c: [1,2,3]]. + +Second, I created two independent helper modules: Control.Monad.Context and Control.Pipeline. + +Control.Monad.Context is just like Control.Monad.Reader.Class except you can access the context of any Reader in the monad stack instead of just the top one as long as the context types are different. + +Control.Pipeline gives thread-safe and pipelined access to a socket. When you make a call it sends the request and immediately returns a "promise" of the reply without waiting for the reply. When you read the promise it waits for the reply if it has not already arrived then returns it. + +Third, I separated MongoDB into 4 modules: MongoDB.Internal.Protocol, MongoDB.Connection, MongoDB.Query, and MongoDB.Admin. + +MongoDB.Internal.Protocol defines the MongoDB Wire Protocol (http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol). It defines the messages the the client and server send back and forth to each other. Again, this module is not intended for the application-programmer use, and maybe should be a hidden module inside cabal, but for now it is not. + +MongoDB.Connection allows you to create a pipelined connection to a specific server or to a master/slave in a replica set. + +MongoDB-Query defines the "connected" monad that has the current connection and database in context, and all the normal query and update operations you execute within this monad like find, findOne, count, insert, modify, delete, group, mapReduce, allDatabases, allCollections, runCommand, etc. + +MongoDB-Admin defines all the administration operations like validateCollection, ensureIndex, dropIndex, addUser, copyDatabase, dbStats, setProfilingLevel, etc. + +Finally, the top-level MongoDB module simply re-exports MongoDB-Connection, MongoDB-Query, and MongoDB-Admin, along with Data.Bson from the bson package. + +I updated your TODO list, removing items I completed, added items that were missing, and added back items I removed from the code like lazy list from a cursor (I am skeptical of lazy I/O, but we could add it back). + +I also update your two tutorials to reflect this new API. + +I hope you like these changes! Let me know your feedback, and I hope we can both maintain it in the future. + +Cheers, +Tony Hannan +10gen Inc. +Creators of MongoDB diff --git a/map-reduce-example.md b/map-reduce-example.md index 0e74c55..ca82312 100644 --- a/map-reduce-example.md +++ b/map-reduce-example.md @@ -20,10 +20,9 @@ map/reduce queries on: > :set -XOverloadedStrings > import Database.MongoDB > Right conn <- connect (server "localhost") - > let run task = runTask task conn - > let runDb db dbTask = run $ useDb db dbTask + > let run act = runConn (useDb "test" act) con > :{ - runDb "test" $ insertMany "mr1" [ + run $ insertMany "mr1" [ ["x" =: 1, "tags" =: ["dog", "cat"]], ["x" =: 2, "tags" =: ["cat"]], ["x" =: 3, "tags" =: ["mouse", "cat", "dog"]], @@ -68,7 +67,7 @@ key: Note: We can't just return values.length as the reduce function might be called iteratively on the results of other reduce steps. -Finally, we call map_reduce() and iterate over the result collection: +Finally, we run mapReduce and iterate over the result collection: > runDb "test" $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]] @@ -82,4 +81,4 @@ obtain them, use *runMR'* instead: > runDb "test" $ runMR' (mapReduce "mr1" mapFn reduceFn) Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0] -You can then obtain the results from here by quering the result collection yourself. "runMR* (above) does this for you but discards the statistics. +You can then obtain the results from here by quering the result collection yourself. *runMR* (above) does this for you but discards the statistics. diff --git a/mongoDB.cabal b/mongoDB.cabal index 453f305..dc59f70 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -1,5 +1,5 @@ Name: mongoDB -Version: 0.5.0 +Version: 0.6 License: MIT Maintainer: Scott Parish , Tony Hannan Author: Scott Parish , Tony Hannan @@ -23,8 +23,9 @@ Build-Depends: base < 5, bson Build-Type: Simple Exposed-modules: - Database.MongoDB.Util, - Database.MongoDB.Internal.Connection, + Control.Monad.Context, + Control.Pipeline, + Database.MongoDB.Internal.Util, Database.MongoDB.Internal.Protocol, Database.MongoDB.Connection, Database.MongoDB.Query, diff --git a/tutorial.md b/tutorial.md index d7ced80..6feeaf1 100644 --- a/tutorial.md +++ b/tutorial.md @@ -63,31 +63,31 @@ it won't fail. If it does you will get a pattern match error. Task and Db monad ------------------- -The current connection is held in a Reader monad called "Task*, and the -current database is held in a Reader monad on top of that. To run a task, -supply it and a connection to *runTask*. Within a task, to access a database, -wrap you operations in a *useDb*. +The current connection is held in a Connected monad, and the current database +is held in a Reader monad on top of that. To run a connected monad, supply +it and a connection to *runConn*. To access a database within a connected +monad, call *useDb*. -But since we are working in ghci, which requires us to start from the -IO monad every time, we'll define a convenient 'run' function that takes a +Since we are working in ghci, which requires us to start from the +IO monad every time, we'll define a convenient *run* function that takes a db-action and executes it against our "test" database on the server we just connected to: - > let run act = runTask (useDb "test" act) con + > let run act = runConn (useDb "test" act) con -*run* (*runTask*) will return either Left Failure or Right result. Failure +*run* (*runConn*) will return either Left Failure or Right result. Failure means the connection failed (eg. network problem) or the server failed (eg. disk full). Databases and Collections ----------------------------- -A MongoDB can store multiple databases--separate namespaces +A MongoDB can store multiple databases -- separate namespaces under which collections reside. You can obtain the list of databases available on a connection: - > runTask allDatabases con + > runConn allDatabases con You can also use the *run* function we just created: @@ -159,7 +159,7 @@ only one matching document, or are only interested in the first match. Here we use *findOne* to get the first document from the posts collection: - > run $ findOne (query [] "posts") + > run $ findOne (select [] "posts") Right (Just [ _id: Oid 4c16d355 c80c560858000000, author: "Mike", text: "My first blog post!", tags: ["mongoDB","Haskell"], date: 2010-06-15 01:09:28.364 UTC]) The result is a document matching the one that we inserted previously. @@ -171,12 +171,12 @@ added on insert. resulting document must match. To limit our results to a document with author "Mike" we do: - > run $ findOne (query ["author" =: "Mike"] "posts") + > run $ findOne (select ["author" =: "Mike"] "posts") Right (Just [ _id: Oid 4c16d355 c80c560858000000, author: "Mike", text: "My first blog post!", tags: ["mongoDB","Haskell"], date: 2010-06-15 01:09:28.364 UTC]) If we try with a different author, like "Eliot", we'll get no result: - > run $ findOne (query ["author" =: "Eliot"] "posts") + > run $ findOne (select ["author" =: "Eliot"] "posts") Right Nothing Bulk Inserts @@ -217,12 +217,12 @@ iterate over all matching documents. There are several ways in which we can iterate: we can call *next* to get documents one at a time or we can get all the results by applying the cursor to *rest*: - > Right cursor <- run $ find (query ["author" =: "Mike"] "posts") + > Right cursor <- run $ find (select ["author" =: "Mike"] "posts") > run $ rest cursor Of course you can use bind (*>>=*) to combine these into one line: - > run $ find (query ["author" =: "Mike"] "posts") >>= rest + > run $ find (select ["author" =: "Mike"] "posts") >>= rest * Note: *next* automatically closes the cursor when the last document has been read out of it. Similarly, *rest* automatically @@ -233,11 +233,11 @@ Counting We can count how many documents are in an entire collection: - > run $ count (query [] "posts") + > run $ count (select [] "posts") Or count how many documents match a query: - > run $ count (query ["author" =: "Mike"] "posts") + > run $ count (select ["author" =: "Mike"] "posts") Range Queries -------------