Slight redesign: pipelining with writeMode instead of exclusive access with getLastError

This commit is contained in:
Tony Hannan 2010-06-21 11:06:20 -04:00
parent 586783b081
commit c9dc87ad33
14 changed files with 798 additions and 681 deletions

27
Control/Monad/Context.hs Normal file
View file

@ -0,0 +1,27 @@
{- | This is just like Control.Monad.Reader.Class except you can access the context of any Reader in the monad stack instead of just the top one as long as the context types are different. If two or more readers in the stack have the same context type you get the context of the top one. -}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, OverlappingInstances #-}
module Control.Monad.Context where
import Control.Monad.Reader
import Control.Monad.Error
-- | Same as 'MonadReader' but without functional dependency so the same monad can have multiple contexts with different types
class Context x m where
context :: m x
-- ^ Get the context in the Reader in the monad stack that has @x@ context type. Analogous to 'ask'.
push :: (x -> x) -> m a -> m a
-- ^ Push new context in the Reader in the monad stack that has @x@ context type. Analogous to 'local'
instance (Monad m) => Context x (ReaderT x m) where
context = ask
push = local
instance (Monad m, Context x m) => Context x (ReaderT r m) where
context = lift context
push f m = ReaderT (push f . runReaderT m)
instance (Monad m, Context x m, Error e) => Context x (ErrorT e m) where
context = lift context
push f = ErrorT . push f . runErrorT

151
Control/Pipeline.hs Normal file
View file

@ -0,0 +1,151 @@
{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipe (and get promises back); the pipe will pipeline each thread's request right away without waiting. -}
{-# LANGUAGE DoRec, RecordWildCards, MultiParamTypeClasses, FlexibleContexts #-}
module Control.Pipeline (
-- * Pipe
Pipe, newPipe, send, call,
-- * Util
Size,
Length(..),
Resource(..),
Flush(..),
Stream(..), getN
) where
import Prelude hiding (length)
import Control.Applicative ((<$>))
import Control.Monad (forever)
import Control.Exception (assert)
import System.IO.Error (try)
import System.IO (Handle, hFlush, hClose, hIsClosed)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import Data.Monoid (Monoid(..))
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Concurrent.MVar
import Control.Concurrent.Chan
-- * Length
type Size = Int
class Length list where
length :: list -> Size
instance Length S.ByteString where
length = S.length
instance Length L.ByteString where
length = fromEnum . L.length
-- * Resource
class Resource m r where
close :: r -> m ()
isClosed :: r -> m Bool
instance Resource IO Handle where
close = hClose
isClosed = hIsClosed
-- * Flush
class Flush handle where
flush :: handle -> IO ()
-- ^ Flush written bytes to destination
instance Flush Handle where
flush = hFlush
-- * Stream
class (Length bytes, Monoid bytes, Flush handle) => Stream handle bytes where
put :: handle -> bytes -> IO ()
-- ^ Write bytes to handle
get :: handle -> Int -> IO bytes
-- ^ Read up to N bytes from handle, block until at least 1 byte is available
getN :: (Stream h b) => h -> Int -> IO b
-- ^ Read N bytes from hande, blocking until all N bytes are read. Unlike 'get' which only blocks if no bytes are available.
getN h n = assert (n >= 0) $ do
bytes <- get h n
let x = length bytes
if x >= n then return bytes else do
remainingBytes <- getN h (n - x)
return (mappend bytes remainingBytes)
instance Stream Handle S.ByteString where
put = S.hPut
get = S.hGet
instance Stream Handle L.ByteString where
put = L.hPut
get = L.hGet
-- * Pipe
-- | Thread-safe and pipelined socket
data Pipe handle bytes = Pipe {
encodeSize :: Size -> bytes,
decodeSize :: bytes -> Size,
vHandle :: MVar handle, -- ^ Mutex on handle, so only one thread at a time can write to it
responseQueue :: Chan (MVar (Either IOError bytes)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
listenThread :: ThreadId
}
-- | Create new Pipe with given encodeInt, decodeInt, and handle. You should 'close' pipe when finished, which will also close handle. If pipe is not closed but eventually garbage collected, it will be closed along with handle.
newPipe :: (Stream h b, Resource IO h) =>
(Size -> b) -- ^ Convert Size to bytes of fixed length. Every Int must translate to same number of bytes.
-> (b -> Size) -- ^ Convert bytes of fixed length to Size. Must be exact inverse of encodeSize.
-> h -- ^ Underlying socket (handle) this pipe will read/write from
-> IO (Pipe h b)
newPipe encodeSize decodeSize handle = do
vHandle <- newMVar handle
responseQueue <- newChan
rec
let pipe = Pipe{..}
listenThread <- forkIO (listen pipe)
addMVarFinalizer vHandle $ do
killThread listenThread
close handle
return pipe
instance (Resource IO h) => Resource IO (Pipe h b) where
-- | Close pipe and underlying socket (handle)
close Pipe{..} = do
killThread listenThread
close =<< readMVar vHandle
isClosed Pipe{..} = isClosed =<< readMVar vHandle
listen :: (Stream h b) => Pipe h b -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order
listen Pipe{..} = do
let n = length (encodeSize 0)
h <- readMVar vHandle
forever $ do
e <- try $ do
len <- decodeSize <$> getN h n
getN h len
var <- readChan responseQueue
putMVar var e
send :: (Stream h b) => Pipe h b -> [b] -> IO ()
-- ^ Send messages all together to destination (no messages will be interleaved between them). None of the messages can induce a response, i.e. the destination must not reply to any of these messages (otherwise future 'call's will get these responses instead of their own).
-- Each message is preceeded by its length when written to socket.
send Pipe{..} messages = withMVar vHandle $ \h -> do
mapM_ (write encodeSize h) messages
flush h
call :: (Stream h b) => Pipe h b -> [b] -> IO (IO b)
-- ^ Send messages all together to destination (no messages will be interleaved between them), and return /promise/ of response from one message only. One and only one message in the list must induce a response, i.e. the destination must reply to exactly one message only (otherwise promises will have the wrong responses in them).
-- Each message is preceeded by its length when written to socket. Likewise, the response must be preceeded by its length.
call Pipe{..} messages = withMVar vHandle $ \h -> do
mapM_ (write encodeSize h) messages
flush h
var <- newEmptyMVar
writeChan responseQueue var
return (either ioError return =<< readMVar var) -- return promise
write :: (Stream h b, Monoid b, Length b) => (Size -> b) -> h -> b -> IO ()
write encodeSize h bytes = put h (mappend lenBytes bytes) where lenBytes = encodeSize (length bytes)

View file

@ -28,7 +28,7 @@ module Database.MongoDB.Admin (
import Prelude hiding (lookup)
import Control.Applicative ((<$>))
import Database.MongoDB.Internal.Protocol (pwHash, pwKey)
import Database.MongoDB.Connection (Server, showHostPort, Conn)
import Database.MongoDB.Connection (Server, showHostPort)
import Database.MongoDB.Query
import Data.Bson
import Data.UString (pack, unpack, append, intercalate)
@ -38,7 +38,7 @@ import Data.IORef
import qualified Data.Set as S
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent (forkIO, threadDelay)
import Database.MongoDB.Util ((<.>), true1)
import Database.MongoDB.Internal.Util ((<.>), true1)
-- * Admin
@ -51,16 +51,17 @@ coptElem Capped = "capped" =: True
coptElem (MaxByteSize n) = "size" =: n
coptElem (MaxItems n) = "max" =: n
createCollection :: (Conn m) => [CollectionOption] -> Collection -> Db m Document
createCollection :: (DbConn m) => [CollectionOption] -> Collection -> m Document
-- ^ Create collection with given options. You only need to call this to set options, otherwise a collection is created automatically on first use with no options.
createCollection opts col = runCommand $ ["create" =: col] ++ map coptElem opts
renameCollection :: (Conn m) => Collection -> Collection -> Db m Document
renameCollection :: (DbConn m) => Collection -> Collection -> m Document
-- ^ Rename first collection to second collection
renameCollection from to = ReaderT $ \db -> useDb "admin" $
runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
renameCollection from to = do
db <- thisDatabase
useDb "admin" $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
dropCollection :: (Conn m) => Collection -> Db m Bool
dropCollection :: (DbConn m) => Collection -> m Bool
-- ^ Delete the given collection! Return True if collection existed (and was deleted); return False if collection did not exist (and no action).
dropCollection coll = do
resetIndexCache
@ -69,7 +70,7 @@ dropCollection coll = do
if at "errmsg" r == ("ns not found" :: UString) then return False else
fail $ "dropCollection failed: " ++ show r
validateCollection :: (Conn m) => Collection -> Db m Document
validateCollection :: (DbConn m) => Collection -> m Document
-- ^ This operation takes a while
validateCollection coll = runCommand ["validate" =: coll]
@ -101,35 +102,32 @@ genName :: Order -> IndexName
genName keys = intercalate "_" (map f keys) where
f (k := v) = k `append` "_" `append` pack (show v)
ensureIndex :: (Conn m) => Index -> Db m ()
ensureIndex :: (DbConn m) => Index -> m ()
-- ^ Create index if we did not already create one. May be called repeatedly with practically no performance hit, because we remember if we already called this for the same index (although this memory gets wiped out every 15 minutes, in case another client drops the index and we want to create it again).
ensureIndex idx = let k = (iColl idx, iName idx) in do
icache <- fetchIndexCache
set <- liftIO (readIORef icache)
unless (S.member k set) . runDbOp $ do
createIndex idx
me <- getLastError
case me of
Nothing -> liftIO $ writeIORef icache (S.insert k set)
Just (c, e) -> fail $ "createIndex failed: (" ++ show c ++ ") " ++ e
unless (S.member k set) $ do
writeMode Safe (createIndex idx)
liftIO $ writeIORef icache (S.insert k set)
createIndex :: (Conn m) => Index -> Db m ()
createIndex :: (DbConn m) => Index -> m ()
-- ^ Create index on the server. This call goes to the server every time.
createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase
dropIndex :: (Conn m) => Collection -> IndexName -> Db m Document
dropIndex :: (DbConn m) => Collection -> IndexName -> m Document
-- ^ Remove the index
dropIndex coll idxName = do
resetIndexCache
runCommand ["deleteIndexes" =: coll, "index" =: idxName]
getIndexes :: (Conn m) => Collection -> Db m [Document]
getIndexes :: (DbConn m) => Collection -> m [Document]
-- ^ Get all indexes on this collection
getIndexes coll = do
db <- thisDatabase
rest =<< find (query ["ns" =: db <.> coll] "system.indexes")
rest =<< find (select ["ns" =: db <.> coll] "system.indexes")
dropIndexes :: (Conn m) => Collection -> Db m Document
dropIndexes :: (DbConn m) => Collection -> m Document
-- ^ Drop all indexes on this collection
dropIndexes coll = do
resetIndexCache
@ -155,9 +153,11 @@ clearDbIndexCache = do
keys <- map fst <$> T.toList dbIndexCache
mapM_ (T.delete dbIndexCache) keys
fetchIndexCache :: (Conn m) => Db m IndexCache
fetchIndexCache :: (DbConn m) => m IndexCache
-- ^ Get index cache for current database
fetchIndexCache = ReaderT $ \db -> liftIO $ do
fetchIndexCache = do
db <- thisDatabase
liftIO $ do
mc <- T.lookup dbIndexCache db
maybe (newIdxCache db) return mc
where
@ -166,7 +166,7 @@ fetchIndexCache = ReaderT $ \db -> liftIO $ do
T.insert dbIndexCache db idx
return idx
resetIndexCache :: (Conn m) => Db m ()
resetIndexCache :: (DbConn m) => m ()
-- ^ reset index cache for current database
resetIndexCache = do
icache <- fetchIndexCache
@ -174,20 +174,20 @@ resetIndexCache = do
-- ** User
allUsers :: (Conn m) => Db m [Document]
allUsers :: (DbConn m) => m [Document]
-- ^ Fetch all users of this database
allUsers = map (exclude ["_id"]) <$> (rest =<< find
(query [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
addUser :: (Conn m) => Bool -> Username -> Password -> Db m ()
addUser :: (DbConn m) => Bool -> Username -> Password -> m ()
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
addUser readOnly user pass = do
mu <- findOne (query ["user" =: user] "system.users")
mu <- findOne (select ["user" =: user] "system.users")
let u = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
save "system.users" u
removeUser :: (Conn m) => Username -> Db m ()
removeUser user = delete (Select ["user" =: user] "system.users")
removeUser :: (DbConn m) => Username -> m ()
removeUser user = delete (select ["user" =: user] "system.users")
-- ** Database
@ -225,19 +225,19 @@ serverVersion = at "version" <$> serverBuildInfo
-- ** Collection
collectionStats :: (Conn m) => Collection -> Db m Document
collectionStats :: (DbConn m) => Collection -> m Document
collectionStats coll = runCommand ["collstats" =: coll]
dataSize :: (Conn m) => Collection -> Db m Int
dataSize :: (DbConn m) => Collection -> m Int
dataSize c = at "size" <$> collectionStats c
storageSize :: (Conn m) => Collection -> Db m Int
storageSize :: (DbConn m) => Collection -> m Int
storageSize c = at "storageSize" <$> collectionStats c
totalIndexSize :: (Conn m) => Collection -> Db m Int
totalIndexSize :: (DbConn m) => Collection -> m Int
totalIndexSize c = at "totalIndexSize" <$> collectionStats c
totalSize :: (Conn m) => Collection -> Db m Int
totalSize :: (DbConn m) => Collection -> m Int
totalSize coll = do
x <- storageSize coll
xs <- mapM isize =<< getIndexes coll
@ -249,28 +249,28 @@ totalSize coll = do
data ProfilingLevel = Off | Slow | All deriving (Show, Enum, Eq)
getProfilingLevel :: (Conn m) => Db m ProfilingLevel
getProfilingLevel :: (DbConn m) => m ProfilingLevel
getProfilingLevel = toEnum . at "was" <$> runCommand ["profile" =: (-1 :: Int)]
type MilliSec = Int
setProfilingLevel :: (Conn m) => ProfilingLevel -> Maybe MilliSec -> Db m ()
setProfilingLevel :: (DbConn m) => ProfilingLevel -> Maybe MilliSec -> m ()
setProfilingLevel p mSlowMs =
runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return ()
-- ** Database
dbStats :: (Conn m) => Db m Document
dbStats :: (DbConn m) => m Document
dbStats = runCommand ["dbstats" =: (1 :: Int)]
currentOp :: (Conn m) => Db m (Maybe Document)
currentOp :: (DbConn m) => m (Maybe Document)
-- ^ See currently running operation on the database, if any
currentOp = findOne (query [] "$cmd.sys.inprog")
currentOp = findOne (select [] "$cmd.sys.inprog")
type OpNum = Int
killOp :: (Conn m) => OpNum -> Db m (Maybe Document)
killOp op = findOne (query ["op" =: op] "$cmd.sys.killop")
killOp :: (DbConn m) => OpNum -> m (Maybe Document)
killOp op = findOne (select ["op" =: op] "$cmd.sys.killop")
-- ** Server

View file

@ -4,23 +4,19 @@
module Database.MongoDB.Connection (
-- * Server
I.Server(..), PortID(..), server, showHostPort, readHostPort, readHostPortF,
Server(..), PortID(..), server, showHostPort, readHostPort, readHostPortF,
-- * ReplicaSet
ReplicaSet, replicaSet, replicaServers,
MasterOrSlave(..), FailedToConnect, newConnection,
-- * Connection
I.Connection, I.connServer, I.showHandle,
connect, I.closeConnection, I.isClosed,
-- * Connected monad
I.Conn(..), I.Failure(..),
-- ** Task
I.Task, I.runTask,
-- ** Op
I.Op
Connection, connect,
-- * Resource
Resource(..)
) where
import Database.MongoDB.Internal.Connection as I
import Database.MongoDB.Query (useDb, runCommand1)
import Database.MongoDB.Internal.Protocol (Connection, mkConnection)
import Database.MongoDB.Query (Failure(..), Conn, runConn, useDb, runCommand1)
import Control.Pipeline (Resource(..))
import Control.Applicative ((<$>))
import Control.Arrow ((+++), left)
import Control.Exception (assert)
@ -31,10 +27,12 @@ import Network (HostName, PortID(..), connectTo)
import Data.Bson (Document, look, typed)
import Text.ParserCombinators.Parsec as P (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import Control.Monad.Identity
import Database.MongoDB.Util (true1) -- PortID instances
import Database.MongoDB.Internal.Util (true1) -- PortID instances
-- * Server
data Server = Server HostName PortID deriving (Show, Eq, Ord)
defaultPort :: PortID
defaultPort = PortNumber 27017
@ -101,17 +99,15 @@ sortedReplicas :: ReplicaInfo -> IO [Server]
-- ^ All replicas in set sorted by distance from this client. TODO
sortedReplicas = return . allReplicas
getReplicaInfo' :: Connection -> IO (Either IOError ReplicaInfo)
getReplicaInfo :: (Server, Connection) -> IO (Either IOError ReplicaInfo)
-- ^ Get replica info of the connected server. Return Left IOError if connection fails
getReplicaInfo' conn = left err <$> runTask getReplicaInfo conn where
getReplicaInfo (serv, conn) = left err <$> runConn (ReplicaInfo serv <$> getReplicaInfoDoc) conn where
err (ConnectionFailure e) = e
err (ServerFailure s) = userError s
err (ServerFailure e) = userError e
getReplicaInfo :: (Conn m) => m ReplicaInfo
-- ^ Get replica info of connect server
getReplicaInfo = do
c <- getConnection
ReplicaInfo (connServer c) <$> useDb "admin" (runCommand1 "ismaster")
getReplicaInfoDoc :: (Conn m) => m Document
-- ^ Get replica info of connected server
getReplicaInfoDoc = useDb "admin" (runCommand1 "ismaster")
-- * MasterOrSlave
@ -154,19 +150,19 @@ connectFirst mos = connectFirst' ([], []) where
connectFirst' (fs, is) (s : ss) = do
e <- runErrorT $ do
c <- ErrorT (connect s)
i <- ErrorT (getReplicaInfo' c)
i <- ErrorT (getReplicaInfo (s, c))
return (c, i)
case e of
Left f -> connectFirst' ((s, f) : fs, is) ss
Right (c, i) -> if isMS mos i
then return $ Right (c, i)
else do
closeConnection c
close c
connectFirst' ((s, userError $ "not a " ++ show mos) : fs, i : is) ss
connect :: Server -> IO (Either IOError Connection)
-- ^ Create a connection to the given server (as opposed to connecting to some server in a replica set via 'newConnection'). Return Left IOError if failed to connect.
connect s@(Server host port) = E.try (mkConnection s =<< connectTo host port)
connect (Server host port) = E.try (mkConnection =<< connectTo host port)
{- Authors: Tony Hannan <tony@10gen.com>

View file

@ -1,148 +0,0 @@
{-| Low-level connection to a server.
This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Connection" instead. -}
{-# LANGUAGE GeneralizedNewtypeDeriving, TupleSections, TypeSynonymInstances, OverlappingInstances #-}
module Database.MongoDB.Internal.Connection (
-- * Server
Server(..),
-- * Connection
Connection, connServer, showHandle, mkConnection, withConn, closeConnection, isClosed,
-- * Connected monad
Conn(..), Failure(..),
-- ** Task
Task, runTask,
-- ** Op
Op, sendBytes, flushBytes, receiveBytes,
exposeIO, hideIO
) where
import Control.Applicative (Applicative(..), (<$>))
import Control.Arrow (left)
import System.IO.Error (try)
import Control.Concurrent.MVar
import Control.Monad.Reader
import Control.Monad.Error
import Network (HostName, PortID(..))
import System.IO (Handle, hFlush, hClose, hIsClosed)
import Data.ByteString.Lazy as B (ByteString, hPut)
import System.Timeout
import Database.MongoDB.Util (Secs, ignore, hGetN) -- Reader/Error Applicative instances
-- * Server
data Server = Server HostName PortID deriving (Show, Eq, Ord)
-- * Connection
data Connection = Connection Server (MVar Handle) deriving (Eq)
-- ^ A connection to a server. This connection holds session state on the server like open cursors and temporary map-reduce tables that disappear when the connection is closed or fails.
connServer :: Connection -> Server
-- ^ Server this connection is connected to
connServer (Connection serv _) = serv
showHandle :: Secs -> Connection -> IO String
-- ^ Show handle if not locked for more than given seconds
showHandle secs (Connection _ vHand) =
maybe "handle currently locked" show <$> timeout (round (secs * 1000000)) (readMVar vHand)
instance Show Connection where
showsPrec d c = showParen (d > 10) $ showString "a connection to " . showsPrec 11 (connServer c)
mkConnection :: Server -> Handle -> IO Connection
-- ^ Wrap handle in a MVar to control access
mkConnection s h = Connection s <$> newMVar h
withConn :: Connection -> (Handle -> IO a) -> IO a
-- Execute IO action with exclusive access to TCP connection
withConn (Connection _ vHand) = withMVar vHand
closeConnection :: Connection -> IO ()
-- ^ Close connection. Attempting to read or write to a closed connection will raise 'Failure' exception.
closeConnection (Connection _ vHand) = withMVar vHand $ \h -> catch (hClose h) ignore
isClosed :: Connection -> IO Bool
-- ^ Is connection closed?
isClosed (Connection _ vHand) = withMVar vHand hIsClosed
-- * Task
-- | Connection or Server failure like network problem or disk full
data Failure =
ConnectionFailure IOError
-- ^ Error during sending or receiving bytes over a 'Connection'. The connection is not automatically closed when this error happens; the user must close it. Any other IOErrors raised during a Task or Op are not caught. The user is responsible for these other types of errors not related to sending/receiving bytes over the connection.
| ServerFailure String
-- ^ Failure on server, like disk full, which is usually observed using getLastError. Calling 'fail' inside a Task or Op raises this failure. Do not call 'fail' unless it is a temporary server failure, like disk full. For example, receiving unexpected data from the server is not a server failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change.
deriving (Show, Eq)
instance Error Failure where strMsg = ServerFailure
type Task m = ErrorT Failure (ReaderT Connection m)
-- ^ Action with shared access to connection (the connection can be supplied to multiple concurrent tasks). m must be a MonadIO.
runTask :: Task m a -> Connection -> m (Either Failure a)
-- ^ Run task with shared access to connection. Return Left if connection fails anytime during its execution, in which case the task was partially executed.
runTask = runReaderT . runErrorT
-- * Op
newtype Op a = Op (ErrorT Failure (ReaderT (Connection, Handle) IO) a)
deriving (Functor, Applicative, Monad, MonadIO, MonadError Failure)
-- ^ Action with exclusive access to connection (other ops must wait)
runOp' :: (MonadIO m) => Op a -> Task m a
-- ^ Run operation with exclusive access to connection. Fail if connection fails anytime during its execution, in which case the operation was partially executed.
runOp' (Op act) = ErrorT . ReaderT $ \conn ->
liftIO . withConn conn $ runReaderT (runErrorT act) . (conn,)
sendBytes :: ByteString -> Op ()
-- ^ Put bytes on socket
sendBytes bytes = Op . ErrorT . ReaderT $ \(_, h) -> left ConnectionFailure <$> try (hPut h bytes)
flushBytes :: Op ()
-- ^ Flush socket
flushBytes = Op . ErrorT . ReaderT $ \(_, h) -> left ConnectionFailure <$> try (hFlush h)
receiveBytes :: Int -> Op ByteString
-- ^ Get N bytes from socket, blocking until all N bytes are received
receiveBytes n = Op . ErrorT . ReaderT $ \(_, h) -> left ConnectionFailure <$> try (hGetN h n)
exposeIO :: ((Connection, Handle) -> IO (Either Failure a)) -> Op a
-- ^ Expose connection to underlying IO
exposeIO = Op . ErrorT . ReaderT
hideIO :: Op a -> (Connection, Handle) -> IO (Either Failure a)
-- ^ Run op from IO
hideIO (Op act) = runReaderT (runErrorT act)
-- * Connected monad
-- | A monad with shared or exclusive access to a connection, ie. 'Task' or 'Op'
class (Functor m, Applicative m, MonadIO m) => Conn m where
runOp :: Op a -> m a
-- ^ Run op with exclusive access to connection. If @m@ is already exclusive then simply run op.
getConnection :: m Connection
-- ^ Return connection that this monad has access to
instance (MonadIO m) => Conn (Task m) where
runOp = runOp'
getConnection = ask
instance Conn Op where
runOp = id
getConnection = Op (asks fst)
instance (Conn m) => Conn (ReaderT r m) where
runOp = lift . runOp
getConnection = lift getConnection
instance (Conn m, Error e) => Conn (ErrorT e m) where
runOp = lift . runOp
getConnection = lift getConnection
{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10gen Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}

View file

@ -5,39 +5,260 @@ This module is not intended for direct use. Use the high-level interface at "Dat
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-}
module Database.MongoDB.Internal.Protocol (
-- * FullCollection
-- * Connection
Connection, mkConnection,
send, call,
-- * Message
FullCollection,
-- * Write
Insert(..), insert,
Update(..), UpdateOption(..), update,
Delete(..), DeleteOption(..), delete,
-- * Read
Query(..), QueryOption(..), query,
GetMore(..), getMore,
-- ** Notice
Notice(..), UpdateOption(..), DeleteOption(..), CursorId,
-- ** Request
Request(..), QueryOption(..),
-- ** Reply
Reply(..),
-- ** Cursor
CursorId, killCursors,
-- * Authentication
Username, Password, Nonce, pwHash, pwKey
) where
import Prelude as P
import Database.MongoDB.Internal.Connection (Op, sendBytes, flushBytes, receiveBytes)
import Data.Bson
import Prelude as X
import Control.Applicative ((<$>))
import Control.Monad (unless, replicateM)
import System.IO (Handle)
import Data.ByteString.Lazy (ByteString)
import qualified Control.Pipeline as P
import Data.Bson (Document, UString)
import Data.Bson.Binary
import Data.UString as U (pack, append, toByteString)
import Data.ByteString.Lazy as B (length, append)
import Data.Binary.Put
import Data.Binary.Get
import Data.Int
import Data.Bits
import Control.Monad.Reader
import Control.Applicative ((<$>))
import Database.MongoDB.Internal.Util (bitOr)
import Data.IORef
import System.IO.Unsafe (unsafePerformIO)
import Data.Digest.OpenSSL.MD5 (md5sum)
import Database.MongoDB.Util (bitOr, (<.>))
import Data.UString as U (pack, append, toByteString)
-- * Connection
type Connection = P.Pipe Handle ByteString
-- ^ Thread-safe TCP connection to server with pipelined requests
mkConnection :: Handle -> IO Connection
-- ^ New thread-safe pipelined connection over handle
mkConnection = P.newPipe encodeSize decodeSize where
encodeSize = runPut . putInt32 . toEnum . (+ 4)
decodeSize = subtract 4 . fromEnum . runGet getInt32
send :: Connection -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Raise IOError if connection fails.
send conn notices = P.send conn =<< mapM noticeBytes notices
call :: Connection -> [Notice] -> Request -> IO (IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will raise IOError if connection fails.
call conn notices request = do
nMessages <- mapM noticeBytes notices
requestId <- genRequestId
let rMessage = runPut (putRequest request requestId)
promise <- P.call conn (nMessages ++ [rMessage])
return (bytesReply requestId <$> promise)
noticeBytes :: Notice -> IO ByteString
noticeBytes notice = runPut . putNotice notice <$> genRequestId
bytesReply :: RequestId -> ByteString -> Reply
bytesReply requestId bytes = if requestId == responseTo then reply else err where
(responseTo, reply) = runGet getReply bytes
err = error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
-- * Messages
type FullCollection = UString
-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"
-- ** Header
type Opcode = Int32
type RequestId = Int32
-- ^ A fresh request id is generated for every message
type ResponseTo = RequestId
genRequestId :: IO RequestId
-- ^ Generate fresh request id
genRequestId = atomicModifyIORef counter $ \n -> (n + 1, n) where
counter :: IORef RequestId
counter = unsafePerformIO (newIORef 0)
{-# NOINLINE counter #-}
-- *** Binary format
putHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putHeader opcode requestId = do
putInt32 requestId
putInt32 0
putInt32 opcode
getHeader :: Get (Opcode, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already read
getHeader = do
_requestId <- getInt32
responseTo <- getInt32
opcode <- getInt32
return (opcode, responseTo)
-- ** Notice
-- | A notice is a message that is sent with no reply
data Notice =
Insert {
iFullCollection :: FullCollection,
iDocuments :: [Document]}
| Update {
uFullCollection :: FullCollection,
uOptions :: [UpdateOption],
uSelector :: Document,
uUpdater :: Document}
| Delete {
dFullCollection :: FullCollection,
dOptions :: [DeleteOption],
dSelector :: Document}
| KillCursors {
kCursorIds :: [CursorId]}
deriving (Show, Eq)
data UpdateOption =
Upsert -- ^ If set, the database will insert the supplied object into the collection if no matching document is found
| MultiUpdate -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc
deriving (Show, Eq)
data DeleteOption = SingleRemove -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed
deriving (Show, Eq)
type CursorId = Int64
-- *** Binary format
nOpcode :: Notice -> Opcode
nOpcode Update{} = 2001
nOpcode Insert{} = 2002
nOpcode Delete{} = 2006
nOpcode KillCursors{} = 2007
putNotice :: Notice -> RequestId -> Put
putNotice notice requestId = do
putHeader (nOpcode notice) requestId
putInt32 0
case notice of
Insert{..} -> do
putCString iFullCollection
mapM_ putDocument iDocuments
Update{..} -> do
putCString uFullCollection
putInt32 (uBits uOptions)
putDocument uSelector
putDocument uUpdater
Delete{..} -> do
putCString dFullCollection
putInt32 (dBits dOptions)
putDocument dSelector
KillCursors{..} -> do
putInt32 $ toEnum (X.length kCursorIds)
mapM_ putInt64 kCursorIds
uBit :: UpdateOption -> Int32
uBit Upsert = bit 0
uBit MultiUpdate = bit 1
uBits :: [UpdateOption] -> Int32
uBits = bitOr . map uBit
dBit :: DeleteOption -> Int32
dBit SingleRemove = bit 0
dBits :: [DeleteOption] -> Int32
dBits = bitOr . map dBit
-- ** Request
-- | A request is a message that is sent with a 'Reply' returned
data Request =
Query {
qOptions :: [QueryOption],
qFullCollection :: FullCollection,
qSkip :: Int32, -- ^ Number of initial matching documents to skip
qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
qSelector :: Document, -- ^ \[\] = return all documents in collection
qProjector :: Document -- ^ \[\] = return whole document
} | GetMore {
gFullCollection :: FullCollection,
gBatchSize :: Int32,
gCursorId :: CursorId}
deriving (Show, Eq)
data QueryOption =
TailableCursor |
SlaveOK |
NoCursorTimeout -- Never timeout the cursor. When not set, the cursor will die if idle for more than 10 minutes.
deriving (Show, Eq)
-- *** Binary format
qOpcode :: Request -> Opcode
qOpcode Query{} = 2004
qOpcode GetMore{} = 2005
putRequest :: Request -> RequestId -> Put
putRequest request requestId = do
putHeader (qOpcode request) requestId
case request of
Query{..} -> do
putInt32 (qBits qOptions)
putCString qFullCollection
putInt32 qSkip
putInt32 qBatchSize
putDocument qSelector
unless (null qProjector) (putDocument qProjector)
GetMore{..} -> do
putInt32 0
putCString gFullCollection
putInt32 gBatchSize
putInt64 gCursorId
qBit :: QueryOption -> Int32
qBit TailableCursor = bit 1
qBit SlaveOK = bit 2
qBit NoCursorTimeout = bit 4
qBits :: [QueryOption] -> Int32
qBits = bitOr . map qBit
-- ** Reply
-- | A reply is a message received in response to a 'Request'
data Reply = Reply {
rResponseFlag :: Int32, -- ^ 0 = success, non-zero = failure
rCursorId :: CursorId, -- ^ 0 = cursor finished
rStartingFrom :: Int32,
rDocuments :: [Document]
} deriving (Show, Eq)
-- * Binary format
replyOpcode :: Opcode
replyOpcode = 1
getReply :: Get (ResponseTo, Reply)
getReply = do
(opcode, responseTo) <- getHeader
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
rResponseFlag <- getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- fromIntegral <$> getInt32
rDocuments <- replicateM numDocs getDocument
return (responseTo, Reply{..})
-- * Authentication
@ -50,247 +271,3 @@ pwHash u p = pack . md5sum . toByteString $ u `U.append` ":mongo:" `U.append` p
pwKey :: Nonce -> Username -> Password -> UString
pwKey n u p = pack . md5sum . toByteString . U.append n . U.append u $ pwHash u p
-- * FullCollection
type FullCollection = UString
-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"
-- * Request / response
insert :: Insert -> Op ()
-- ^ Insert documents into collection
insert = send_ . putInsert
update :: Update -> Op ()
-- ^ Update documents in collection matching selector using updater
update = send_ . putUpdate
delete :: Delete -> Op ()
-- ^ Delete documents in collection matching selector
delete = send_ . putDelete
killCursors :: [CursorId] -> Op ()
-- ^ Close cursors on server because we will not be getting anymore documents from them
killCursors = send_ . putKillCursors . KillCursors
query :: Query -> Op Reply
-- ^ Return first batch of documents in collection matching selector and a cursor-id for getting remaining documents (see 'getMore')
query q = do
requestId <- send (putQuery q)
(reply, responseTo) <- receive getReply
unless (responseTo == requestId) $ fail "expected response id to match query request id"
return reply
getMore :: GetMore -> Op Reply
-- ^ Get next batch of documents from cursor
getMore g = do
requestId <- send (putGetMore g)
(reply, responseTo) <- receive getReply
unless (responseTo == requestId) $ fail "expected response id to match get-more request id"
return reply
-- ** Send / receive
type RequestId = Int32
-- ^ A fresh request id is generated for every message
genRequestId :: IO RequestId
-- ^ Generate fresh request id
genRequestId = atomicModifyIORef counter $ \n -> (n + 1, n) where
counter :: IORef RequestId
counter = unsafePerformIO (newIORef 0)
{-# NOINLINE counter #-}
type ResponseTo = RequestId
send_ :: (RequestId -> Put) -> Op ()
send_ x = send x >> return ()
send :: (RequestId -> Put) -> Op RequestId
send rput = do
requestId <- liftIO genRequestId
let bytes = runPut (rput requestId)
let lengthBytes = runPut . putInt32 $ (toEnum . fromEnum) (B.length bytes + 4)
sendBytes (B.append lengthBytes bytes)
flushBytes
return requestId
receive :: Get a -> Op a
receive getMess = do
messageLength <- fromIntegral . runGet getInt32 <$> receiveBytes 4
runGet getMess <$> receiveBytes (messageLength - 4)
-- * Messages
data Insert = Insert {
iFullCollection :: FullCollection,
iDocuments :: [Document]
} deriving (Show, Eq)
data Update = Update {
uFullCollection :: FullCollection,
uOptions :: [UpdateOption],
uSelector :: Document,
uUpdater :: Document
} deriving (Show, Eq)
data UpdateOption =
Upsert -- ^ If set, the database will insert the supplied object into the collection if no matching document is found
| MultiUpdate -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc
deriving (Show, Eq)
data Delete = Delete {
dFullCollection :: FullCollection,
dOptions :: [DeleteOption],
dSelector :: Document
} deriving (Show, Eq)
data DeleteOption = SingleRemove -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed
deriving (Show, Eq)
data Query = Query {
qOptions :: [QueryOption],
qFullCollection :: FullCollection,
qSkip :: Int32, -- ^ Number of initial matching documents to skip
qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
qSelector :: Document, -- ^ \[\] = return all documents in collection
qProjector :: Document -- ^ \[\] = return whole document
} deriving (Show, Eq)
data QueryOption =
TailableCursor |
SlaveOK |
NoCursorTimeout -- Never timeout the cursor. When not set, the cursor will die if idle for more than 10 minutes.
deriving (Show, Eq)
data GetMore = GetMore {
gFullCollection :: FullCollection,
gBatchSize :: Int32,
gCursorId :: CursorId
} deriving (Show, Eq)
newtype KillCursors = KillCursors {
kCursorIds :: [CursorId]
} deriving (Show, Eq)
data Reply = Reply {
rResponseFlag :: Int32, -- ^ 0 = success, non-zero = failure
rCursorId :: CursorId, -- ^ 0 = cursor finished
rStartingFrom :: Int32,
rDocuments :: [Document]
} deriving (Show, Eq)
type CursorId = Int64
-- ** Messages binary format
type Opcode = Int32
-- ^ Code for each message type
replyOpcode, updateOpcode, insertOpcode, queryOpcode, getMoreOpcode, deleteOpcode, killCursorsOpcode :: Opcode
replyOpcode = 1
updateOpcode = 2001
insertOpcode = 2002
queryOpcode = 2004
getMoreOpcode = 2005
deleteOpcode = 2006
killCursorsOpcode = 2007
putUpdate :: Update -> RequestId -> Put
putUpdate Update{..} = putMessage updateOpcode $ do
putInt32 0
putCString uFullCollection
putInt32 (uBits uOptions)
putDocument uSelector
putDocument uUpdater
uBit :: UpdateOption -> Int32
uBit Upsert = bit 0
uBit MultiUpdate = bit 1
uBits :: [UpdateOption] -> Int32
uBits = bitOr . map uBit
putInsert :: Insert -> RequestId -> Put
putInsert Insert{..} = putMessage insertOpcode $ do
putInt32 0
putCString iFullCollection
mapM_ putDocument iDocuments
putDelete :: Delete -> RequestId -> Put
putDelete Delete{..} = putMessage deleteOpcode $ do
putInt32 0
putCString dFullCollection
putInt32 (dBits dOptions)
putDocument dSelector
dBit :: DeleteOption -> Int32
dBit SingleRemove = bit 0
dBits :: [DeleteOption] -> Int32
dBits = bitOr . map dBit
putQuery :: Query -> RequestId -> Put
putQuery Query{..} = putMessage queryOpcode $ do
putInt32 (qBits qOptions)
putCString qFullCollection
putInt32 qSkip
putInt32 qBatchSize
putDocument qSelector
unless (null qProjector) (putDocument qProjector)
qBit :: QueryOption -> Int32
qBit TailableCursor = bit 1
qBit SlaveOK = bit 2
qBit NoCursorTimeout = bit 4
qBits :: [QueryOption] -> Int32
qBits = bitOr . map qBit
putGetMore :: GetMore -> RequestId -> Put
putGetMore GetMore{..} = putMessage getMoreOpcode $ do
putInt32 0
putCString gFullCollection
putInt32 gBatchSize
putInt64 gCursorId
putKillCursors :: KillCursors -> RequestId -> Put
putKillCursors KillCursors{..} = putMessage killCursorsOpcode $ do
putInt32 0
putInt32 $ toEnum (P.length kCursorIds)
mapM_ putInt64 kCursorIds
getReply :: Get (Reply, ResponseTo)
getReply = getMessage replyOpcode $ do
rResponseFlag <- getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- getInt32
rDocuments <- replicateM (fromIntegral numDocs) getDocument
return $ Reply {..}
-- *** Message header
putMessage :: Opcode -> Put -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putMessage opcode messageBodyPut requestId = do
putInt32 requestId
putInt32 0
putInt32 opcode
messageBodyPut
getMessage :: Opcode -> Get a -> Get (a, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already read
getMessage expectedOpcode getMessageBody = do
_requestId <- getInt32
responseTo <- getInt32
opcode <- getInt32
unless (opcode == expectedOpcode) $
fail $ "expected opcode " ++ show expectedOpcode ++ " but got " ++ show opcode
body <- getMessageBody
return (body, responseTo)
{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10gen Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}

View file

@ -2,19 +2,16 @@
{-# LANGUAGE StandaloneDeriving #-}
module Database.MongoDB.Util where
module Database.MongoDB.Internal.Util where
import Prelude hiding (length)
import Network (PortID(..))
import Control.Applicative (Applicative(..), (<$>))
import Control.Exception (assert)
import Control.Monad.Reader
import Control.Monad.Error
import Data.UString as U (UString, cons, append)
import Data.UString as U (cons, append)
import Data.Bits (Bits, (.|.))
import Data.Bson
import System.IO (Handle)
import Data.ByteString.Lazy as B (ByteString, length, append, hGet)
deriving instance Show PortID
deriving instance Eq PortID
@ -31,6 +28,10 @@ instance (Monad m, Error e) => Applicative (ErrorT e m) where
ignore :: (Monad m) => a -> m ()
ignore _ = return ()
snoc :: [a] -> a -> [a]
-- ^ add element to end of list (/snoc/ is reverse of /cons/, which adds to front of list)
snoc list a = list ++ [a]
type Secs = Float
bitOr :: (Bits a) => [a] -> a
@ -53,13 +54,3 @@ true1 k doc = case valueAt k doc of
Int32 n -> n == 1
Int64 n -> n == 1
_ -> error $ "expected " ++ show k ++ " to be Num or Bool in " ++ show doc
hGetN :: Handle -> Int -> IO ByteString
-- ^ Read N bytes from hande, blocking until all N bytes are read.
-- Unlike hGet which only blocks if no bytes are available, otherwise it returns the X bytes immediately available where X <= N.
hGetN h n = assert (n >= 0) $ do
bytes <- hGet h n
let x = fromIntegral (length bytes)
if x >= n then return bytes else do
remainingBytes <- hGetN h (n - x)
return (B.append bytes remainingBytes)

View file

@ -1,17 +1,22 @@
-- | Query and update documents residing on a MongoDB server(s)
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections #-}
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses #-}
module Database.MongoDB.Query (
-- * Connection
Failure(..), Conn, Connected, runConn,
-- * Database
Database, allDatabases, Db, useDb, thisDatabase, runDbOp,
Database, allDatabases, DbConn, useDb, thisDatabase,
-- ** Authentication
P.Username, P.Password, auth,
-- * Collection
Collection, allCollections,
-- ** Selection
Selection(..), select, Selector, whereJS,
Selection(..), Selector, whereJS,
Select(select),
-- * Write
-- ** WriteMode
WriteMode(..), writeMode,
-- ** Insert
insert, insert_, insertMany, insertMany_,
-- ** Update
@ -20,10 +25,10 @@ module Database.MongoDB.Query (
delete, deleteOne,
-- * Read
-- ** Query
Query(..), P.QueryOption(..), Projector, Limit, Order, BatchSize, query,
Query(..), P.QueryOption(..), Projector, Limit, Order, BatchSize,
explain, find, findOne, count, distinct,
-- *** Cursor
Cursor, next, nextN, rest, closeCursor,
Cursor, next, nextN, rest,
-- ** Group
Group(..), GroupKey(..), group,
-- ** MapReduce
@ -31,49 +36,88 @@ module Database.MongoDB.Query (
-- * Command
Command, runCommand, runCommand1,
eval,
ErrorCode, getLastError, resetLastError
) where
import Prelude as X hiding (lookup)
import Control.Applicative ((<$>))
import Database.MongoDB.Internal.Connection
import Control.Applicative ((<$>), Applicative(..))
import Control.Arrow (left, first, second)
import Control.Monad.Context
import Control.Monad.Reader
import Control.Monad.Error
import System.IO.Error (try)
import Control.Concurrent.MVar
import Control.Pipeline (Resource(..))
import qualified Database.MongoDB.Internal.Protocol as P
import Database.MongoDB.Internal.Protocol hiding (insert, update, delete, query, Query(Query))
import Database.MongoDB.Internal.Protocol hiding (Query, send, call)
import Data.Bson
import Data.Word
import Data.Int
import Control.Monad.Reader
import Control.Concurrent.MVar
import Data.Maybe (listToMaybe, catMaybes)
import Data.UString as U (dropWhile, any, tail)
import Database.MongoDB.Util (loop, (<.>), true1)
import Database.MongoDB.Internal.Util (loop, (<.>), true1) -- plus Applicative instances of ErrorT & ReaderT
-- * Connection
-- | A monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure
class (Context Connection m, Context WriteMode m, MonadError Failure m, MonadIO m, Applicative m, Functor m) => Conn m
instance (Context Connection m, Context WriteMode m, MonadError Failure m, MonadIO m, Applicative m, Functor m) => Conn m
-- | Connection or Server failure like network problem or disk full
data Failure =
ConnectionFailure IOError
-- ^ Error during sending or receiving bytes over a 'Connection'. The connection is not automatically closed when this error happens; the user must close it. Any other IOErrors raised during a Task or Op are not caught. The user is responsible for these other types of errors not related to sending/receiving bytes over the connection.
| ServerFailure String
-- ^ Failure on server, like disk full, which is usually observed using getLastError. Calling 'fail' inside a connected monad raises this failure. Do not call 'fail' unless it is a temporary server failure, like disk full. For example, receiving unexpected data from the server is not a server failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change.
deriving (Show, Eq)
instance Error Failure where strMsg = ServerFailure
type Connected m = ErrorT Failure (ReaderT WriteMode (ReaderT Connection m))
runConn :: Connected m a -> Connection -> m (Either Failure a)
-- ^ Run action with access to connection. Return Left Failure if connection or server fails during execution.
runConn action = runReaderT (runReaderT (runErrorT action) Unsafe)
send :: (Conn m) => [Notice] -> m ()
-- ^ Send notices as a contiguous batch to server with no reply. Raise Failure if connection fails.
send ns = do
conn <- context
e <- liftIO $ try (P.send conn ns)
either (throwError . ConnectionFailure) return e
call :: (Conn m) => [Notice] -> Request -> m (IO (Either Failure Reply))
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will raise Failure if connection fails send, and promise will return Failure if connection fails receive.
call ns r = do
conn <- context
e <- liftIO $ try (P.call conn ns r)
case e of
Left err -> throwError (ConnectionFailure err)
Right promise -> return (left ConnectionFailure <$> try promise)
-- * Database
type Database = UString
-- ^ Database name
-- | A 'Conn' monad with access to a 'Database'
class (Context Database m, Conn m) => DbConn m
instance (Context Database m, Conn m) => DbConn m
allDatabases :: (Conn m) => m [Database]
-- ^ List all databases residing on server
allDatabases = map (at "name") . at "databases" <$> useDb "admin" (runCommand1 "listDatabases")
type Db m = ReaderT Database m
useDb :: Database -> Db m a -> m a
useDb :: Database -> ReaderT Database m a -> m a
-- ^ Run Db action against given database
useDb = flip runReaderT
thisDatabase :: (Monad m) => Db m Database
thisDatabase :: (DbConn m) => m Database
-- ^ Current database in use
thisDatabase = ask
runDbOp :: (Conn m) => Db Op a -> Db m a
-- ^ Run db operation with exclusive access to the connection
runDbOp dbOp = ReaderT (runOp . flip useDb dbOp)
thisDatabase = context
-- * Authentication
auth :: (Conn m) => Username -> Password -> Db m Bool
auth :: (DbConn m) => Username -> Password -> m Bool
-- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new connection.
auth u p = do
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
@ -84,7 +128,7 @@ auth u p = do
type Collection = UString
-- ^ Collection name (not prefixed with database)
allCollections :: (Conn m) => Db m [Collection]
allCollections :: (DbConn m) => m [Collection]
-- ^ List all collections in this database
allCollections = do
db <- thisDatabase
@ -99,9 +143,9 @@ allCollections = do
data Selection = Select {selector :: Selector, coll :: Collection} deriving (Show, Eq)
-- ^ Selects documents in collection that match selector
select :: Selector -> Collection -> Selection
{-select :: Selector -> Collection -> Selection
-- ^ Synonym for 'Select'
select = Select
select = Select-}
type Selector = Document
-- ^ Filter for a query, analogous to the where clause in SQL. @[]@ matches all documents in collection. @[x =: a, y =: b]@ is analogous to @where x = a and y = b@ in SQL. See <http://www.mongodb.org/display/DOCS/Querying> for full selector syntax.
@ -110,26 +154,72 @@ whereJS :: Selector -> Javascript -> Selector
-- ^ Add Javascript predicate to selector, in which case a document must match both selector and predicate
whereJS sel js = ("$where" =: js) : sel
class Select aQueryOrSelection where
select :: Selector -> Collection -> aQueryOrSelection
-- ^ 'Query' or 'Selection' that selects documents in collection that match selector. The choice of type depends on use, for example, in @find (select sel col)@ it is a Query, and in @delete (select sel col)@ it is a Selection.
instance Select Selection where
select = Select
instance Select Query where
select = query
-- * Write
-- ** WriteMode
-- | Default write-mode is 'Unsafe'
data WriteMode =
Unsafe -- ^ Submit writes without receiving acknowledgments. Fast. Assumes writes succeed even though they may not.
| Safe -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed.
deriving (Show, Eq)
writeMode :: (Conn m) => WriteMode -> m a -> m a
-- ^ Run action with given 'WriteMode'
writeMode = push . const
write :: (DbConn m) => Notice -> m ()
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'ServerFailure' if it reports an error.
write notice = do
mode <- context
case mode of
Unsafe -> send [notice]
Safe -> do
me <- getLastError [notice]
maybe (return ()) (throwError . ServerFailure . show) me
type ErrorCode = Int
-- ^ Error code from getLastError
getLastError :: (DbConn m) => [Notice] -> m (Maybe (ErrorCode, String))
-- ^ Send notices (writes) then fetch what the last error was, Nothing means no error
getLastError writes = do
r <- runCommand' writes ["getlasterror" =: (1 :: Int)]
return $ (at "code" r,) <$> lookup "err" r
{-resetLastError :: (DbConn m) => m ()
-- ^ Clear last error
resetLastError = runCommand1 "reseterror" >> return ()-}
-- ** Insert
insert :: (Conn m) => Collection -> Document -> Db m Value
insert :: (DbConn m) => Collection -> Document -> m Value
-- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied
insert col doc = head <$> insertMany col [doc]
insert_ :: (Conn m) => Collection -> Document -> Db m ()
insert_ :: (DbConn m) => Collection -> Document -> m ()
-- ^ Same as 'insert' except don't return _id
insert_ col doc = insert col doc >> return ()
insertMany :: (Conn m) => Collection -> [Document] -> Db m [Value]
insertMany :: (DbConn m) => Collection -> [Document] -> m [Value]
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied
insertMany col docs = ReaderT $ \db -> do
insertMany col docs = do
db <- thisDatabase
docs' <- liftIO $ mapM assignId docs
runOp $ P.insert (Insert (db <.> col) docs')
write (Insert (db <.> col) docs')
mapM (look "_id") docs'
insertMany_ :: (Conn m) => Collection -> [Document] -> Db m ()
insertMany_ :: (DbConn m) => Collection -> [Document] -> m ()
-- ^ Same as 'insertMany' except don't return _ids
insertMany_ col docs = insertMany col docs >> return ()
@ -141,55 +231,64 @@ assignId doc = if X.any (("_id" ==) . label) doc
-- ** Update
save :: (Conn m) => Collection -> Document -> Db m ()
save :: (DbConn m) => Collection -> Document -> m ()
-- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or update it if its not new (has \"_id\" field)
save col doc = case look "_id" doc of
Nothing -> insert_ col doc
Just i -> repsert (Select ["_id" := i] col) doc
replace :: (Conn m) => Selection -> Document -> Db m ()
replace :: (DbConn m) => Selection -> Document -> m ()
-- ^ Replace first document in selection with given document
replace = update []
repsert :: (Conn m) => Selection -> Document -> Db m ()
repsert :: (DbConn m) => Selection -> Document -> m ()
-- ^ Replace first document in selection with given document, or insert document if selection is empty
repsert = update [Upsert]
type Modifier = Document
-- ^ Update operations on fields in a document. See <http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations>
modify :: (Conn m) => Selection -> Modifier -> Db m ()
modify :: (DbConn m) => Selection -> Modifier -> m ()
-- ^ Update all documents in selection using given modifier
modify = update [MultiUpdate]
update :: (Conn m) => [UpdateOption] -> Selection -> Document -> Db m ()
update :: (DbConn m) => [UpdateOption] -> Selection -> Document -> m ()
-- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty.
update opts (Select sel col) up = ReaderT $ \db -> runOp $ P.update (Update (db <.> col) opts sel up)
update opts (Select sel col) up = do
db <- thisDatabase
write (Update (db <.> col) opts sel up)
-- ** Delete
delete :: (Conn m) => Selection -> Db m ()
delete :: (DbConn m) => Selection -> m ()
-- ^ Delete all documents in selection
delete (Select sel col) = ReaderT $ \db -> runOp $ P.delete (Delete (db <.> col) [] sel)
delete = delete' []
deleteOne :: (Conn m) => Selection -> Db m ()
deleteOne :: (DbConn m) => Selection -> m ()
-- ^ Delete first document in selection
deleteOne (Select sel col) = ReaderT $ \db -> runOp $ P.delete (Delete (db <.> col) [SingleRemove] sel)
deleteOne = delete' [SingleRemove]
delete' :: (DbConn m) => [DeleteOption] -> Selection -> m ()
-- ^ Delete all documents in selection unless 'SingleRemove' option is given then only delete first document in selection
delete' opts (Select sel col) = do
db <- thisDatabase
write (Delete (db <.> col) opts sel)
-- * Read
-- ** Query
-- | Use 'select' to create a basic query with defaults, then modify if desired. For example, @(select sel col) {limit = 10}@
data Query = Query {
options :: [QueryOption],
options :: [QueryOption], -- ^ Default = []
selection :: Selection,
project :: Projector, -- ^ \[\] = all fields
skip :: Word32, -- ^ Number of initial matching documents to skip
limit :: Limit, -- ^ Maximum number of documents to return, 0 = no limit
sort :: Order, -- ^ Sort results by this order, [] = no sort
snapshot :: Bool, -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted.
batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default.
hint :: Order -- ^ Force MongoDB to use this index, [] = no hint
project :: Projector, -- ^ \[\] = all fields. Default = []
skip :: Word32, -- ^ Number of initial matching documents to skip. Default = 0
limit :: Limit, -- ^ Maximum number of documents to return, 0 = no limit. Default = 0
sort :: Order, -- ^ Sort results by this order, [] = no sort. Default = []
snapshot :: Bool, -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = False
batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Default = 0
hint :: Order -- ^ Force MongoDB to use this index, [] = no hint. Default = []
} deriving (Show, Eq)
type Projector = Document
@ -216,12 +315,9 @@ batchSizeRemainingLimit batchSize limit = if limit == 0
then (fromIntegral batchSize, limit - batchSize)
else (- fromIntegral limit, 1)
protoQuery :: Database -> Query -> (P.Query, Limit)
protoQuery = protoQuery' False
protoQuery' :: Bool -> Database -> Query -> (P.Query, Limit)
queryRequest :: Bool -> Query -> Database -> (Request, Limit)
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
protoQuery' isExplain db Query{..} = (P.Query{..}, remainingLimit) where
queryRequest isExplain Query{..} db = (P.Query{..}, remainingLimit) where
qOptions = options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
@ -234,80 +330,111 @@ protoQuery' isExplain db Query{..} = (P.Query{..}, remainingLimit) where
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
find :: (Conn m) => Query -> Db m Cursor
-- ^ Fetch documents satisfying query
find q@Query{selection, batchSize} = ReaderT $ \db -> do
let (q', remainingLimit) = protoQuery db q
cs <- fromReply remainingLimit =<< runOp (P.query q')
newCursor db (coll selection) batchSize cs
runQuery :: (DbConn m) => Bool -> [Notice] -> Query -> m CursorState'
-- ^ Send query request and return cursor state
runQuery isExplain ns q = call' ns . queryRequest isExplain q =<< thisDatabase
findOne :: (Conn m) => Query -> Db m (Maybe Document)
-- ^ Fetch first document satisfying query or Nothing if none satisfy it
findOne q = ReaderT $ \db -> do
let (q', x) = protoQuery db q {limit = 1}
CS _ _ docs <- fromReply x =<< runOp (P.query q')
find :: (DbConn m) => Query -> m Cursor
-- ^ Fetch documents satisfying query
find q@Query{selection, batchSize} = do
db <- thisDatabase
cs' <- runQuery False [] q
newCursor db (coll selection) batchSize cs'
findOne' :: (DbConn m) => [Notice] -> Query -> m (Maybe Document)
-- ^ Send notices and fetch first document satisfying query or Nothing if none satisfy it
findOne' ns q = do
CS _ _ docs <- cursorState =<< runQuery False ns q {limit = 1}
return (listToMaybe docs)
explain :: (Conn m) => Query -> Db m Document
-- ^ Return performance stats of query execution
explain q = ReaderT $ \db -> do -- same as findOne but with explain set to true
let (q', x) = protoQuery' True db q {limit = 1}
CS _ _ docs <- fromReply x =<< runOp (P.query q')
when (null docs) . fail $ "no explain: " ++ show q'
return (head docs)
findOne :: (DbConn m) => Query -> m (Maybe Document)
-- ^ Fetch first document satisfying query or Nothing if none satisfy it
findOne = findOne' []
count :: (Conn m) => Query -> Db m Int
explain :: (DbConn m) => Query -> m Document
-- ^ Return performance stats of query execution
explain q = do -- same as findOne but with explain set to true
CS _ _ docs <- cursorState =<< runQuery True [] q {limit = 1}
return $ if null docs then error ("no explain: " ++ show q) else head docs
count :: (DbConn m) => Query -> m Int
-- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present)
count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
distinct :: (Conn m) => Label -> Selection -> Db m [Value]
distinct :: (DbConn m) => Label -> Selection -> m [Value]
-- ^ Fetch distinct values of field in selected documents
distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
-- *** Cursor
data Cursor = Cursor FullCollection BatchSize (MVar CursorState)
data Cursor = Cursor FullCollection BatchSize (MVar CursorState')
-- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a ServerFailure exception. Note, a cursor is not closed when the connection is closed, so you can open another connection to the same server and continue using the cursor.
modifyCursorState' :: (Conn m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Connected IO (CursorState', a)) -> m a
-- ^ Analogous to 'modifyMVar' but with Conn monad
modifyCursorState' (Cursor fcol batch var) act = do
conn <- context
e <- liftIO . modifyMVar var $ \cs' ->
either ((cs',) . Left) (second Right) <$> runConn (act fcol batch cs') conn
either throwError return e
getCursorState :: (Conn m) => Cursor -> m CursorState
-- ^ Extract current cursor status
getCursorState (Cursor _ _ var) = cursorState =<< liftIO (readMVar var)
data CursorState' = Delayed (IO (Either Failure CursorState)) | CursorState CursorState
-- ^ A cursor state or a promised cursor state which may fail
cursorState :: (Conn m) => CursorState' -> m CursorState
-- ^ Convert promised cursor state to cursor state or raise Failure
cursorState (Delayed promise) = either throwError return =<< liftIO promise
cursorState (CursorState cs) = return cs
data CursorState = CS Limit CursorId [Document]
-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch.
fromReply :: (Monad m) => Limit -> Reply -> m CursorState
fromReply :: Limit -> Reply -> Either Failure CursorState
-- ^ Convert Reply to CursorState or Failure
fromReply limit Reply{..} = if rResponseFlag == 0
then return (CS limit rCursorId rDocuments)
else fail $ "Query failure " ++ show rResponseFlag ++ " " ++ show rDocuments
then Right (CS limit rCursorId rDocuments)
else Left . ServerFailure $ "Query failure " ++ show rResponseFlag ++ " " ++ show rDocuments
newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState -> m Cursor
-- ^ Cursor is closed when garbage collected, explicitly closed, or CIO action ends (connection closed)
call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState'
-- ^ Send notices and request and return promised cursor state
call' ns (req, remainingLimit) = do
promise <- call ns req
return $ Delayed (fmap (fromReply remainingLimit =<<) promise)
newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor
-- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected.
newCursor db col batch cs = do
conn <- getConnection
conn <- context
var <- liftIO (newMVar cs)
liftIO . addMVarFinalizer var $ do
-- kill cursor on server when garbage collected on client, if connection not already closed
CS _ cid _ <- readMVar var
unless (cid == 0) $ do
done <- isClosed conn
unless done $ runTask (runOp $ P.killCursors [cid]) conn >> return ()
return (Cursor (db <.> col) batch var)
let cursor = Cursor (db <.> col) batch var
liftIO . addMVarFinalizer var $ runConn (close cursor) conn >> return ()
return cursor
next :: (Conn m) => Cursor -> m (Maybe Document)
-- ^ Return next document in query result, or Nothing if finished.
-- This can run inside or outside a 'Db' monad (a 'useDb' block), since @Conn m => ReaderT r m@ is an instance of the 'Conn' type class, along with @Task@ and @Op@
next (Cursor fcol batch var) = runOp . exposeIO $ \h -> modifyMVar var $ \cs ->
-- Get lock on connection (runOp) first then get lock on cursor, otherwise you could get in deadlock if already inside an Op (connection locked), but another Task gets lock on cursor first and then tries runOp (deadlock).
either ((cs,) . Left) (fmap Right) <$> hideIO (nextState cs) h
where
nextState :: CursorState -> Op (CursorState, Maybe Document)
nextState (CS limit cid docs) = case docs of
doc : docs' -> return (CS limit cid docs', Just doc)
next cursor = modifyCursorState' cursor nextState where
-- Pre-fetch next batch promise from server when last one in current batch is returned.
nextState :: FullCollection -> BatchSize -> CursorState' -> Connected IO (CursorState', Maybe Document)
nextState fcol batch cs' = do
CS limit cid docs <- cursorState cs'
case docs of
doc : docs' -> do
cs'' <- if null docs' && cid /= 0
then nextBatch fcol batch limit cid
else return $ CursorState (CS limit cid docs')
return (cs'', Just doc)
[] -> if cid == 0
then return (CS 0 0 [], Nothing) -- finished
else let -- fetch next batch from server
then return (CursorState $ CS 0 0 [], Nothing) -- finished
else error $ "server returned empty batch but says more results on server"
nextBatch fcol batch limit cid = let
(batchSize, remLimit) = batchSizeRemainingLimit batch limit
getNextBatch = fromReply remLimit =<< P.getMore (GetMore fcol batchSize cid)
in nextState =<< getNextBatch
in call' [] (GetMore fcol batchSize cid, remLimit)
nextN :: (Conn m) => Int -> Cursor -> m [Document]
-- ^ Return next N documents or less if end is reached
@ -317,12 +444,13 @@ rest :: (Conn m) => Cursor -> m [Document]
-- ^ Return remaining documents in query result
rest c = loop (next c)
closeCursor :: (Conn m) => Cursor -> m ()
-- ^ Close cursor without reading rest of results. Cursor closes automatically when you read all results.
closeCursor (Cursor _ _ var) = runOp . exposeIO $ \h ->
modifyMVar var $ \cs@(CS _ cid _) -> if cid == 0
then return (CS 0 0 [], Right ())
else either ((cs,) . Left) ((CS 0 0 [],) . Right) <$> hideIO (P.killCursors [cid]) h
instance (Conn m) => Resource m Cursor where
close cursor = modifyCursorState' cursor kill' where
kill' _ _ cs' = first CursorState <$> (kill =<< cursorState cs')
kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]]
isClosed cursor = do
CS _ cid docs <- getCursorState cursor
return (cid == 0 && null docs)
-- ** Group
@ -348,7 +476,7 @@ groupDocument Group{..} =
"initial" =: gInitial,
"cond" =: gCond ]
group :: (Conn m) => Group -> Db m [Document]
group :: (DbConn m) => Group -> m [Document]
-- ^ Execute group query and return resulting aggregate value for each distinct key
group g = at "retval" <$> runCommand ["group" =: groupDocument g]
@ -397,12 +525,12 @@ mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
-- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments.
mapReduce col map' red = MapReduce col map' red [] [] 0 Nothing False Nothing [] False
runMR :: (Conn m) => MapReduce -> Db m Cursor
runMR :: (DbConn m) => MapReduce -> m Cursor
-- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript)
-- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when connection closes.
runMR mr = find . query [] =<< (at "result" <$> runMR' mr)
runMR' :: (Conn m) => MapReduce -> Db m Document
runMR' :: (DbConn m) => MapReduce -> m Document
-- ^ Run MapReduce and return a result document containing a "result" field holding the output Collection and additional statistic fields. Error if the map/reduce failed (because of bad Javascript).
runMR' mr = do
doc <- runCommand (mrDocument mr)
@ -413,32 +541,23 @@ runMR' mr = do
type Command = Document
-- ^ A command is a special query or action against the database. See <http://www.mongodb.org/display/DOCS/Commands> for details.
runCommand :: (Conn m) => Command -> Db m Document
-- ^ Run command against the database and return its result
runCommand c = maybe err return =<< findOne (query c "$cmd") where
err = fail $ "Nothing returned for command: " ++ show c
runCommand' :: (DbConn m) => [Notice] -> Command -> m Document
-- ^ Send notices then run command and return its result
runCommand' ns c = maybe err id <$> findOne' ns (query c "$cmd") where
err = error $ "Nothing returned for command: " ++ show c
runCommand1 :: (Conn m) => UString -> Db m Document
-- ^ @runCommand1 "foo" = runCommand ["foo" =: 1]@
runCommand :: (DbConn m) => Command -> m Document
-- ^ Run command against the database and return its result
runCommand = runCommand' []
runCommand1 :: (DbConn m) => UString -> m Document
-- ^ @runCommand1 foo = runCommand [foo =: 1]@
runCommand1 c = runCommand [c =: (1 :: Int)]
eval :: (Conn m) => Javascript -> Db m Document
eval :: (DbConn m) => Javascript -> m Document
-- ^ Run code on server
eval code = at "retval" <$> runCommand ["$eval" =: code]
type ErrorCode = Int
-- ^ Error code from getLastError
getLastError :: Db Op (Maybe (ErrorCode, String))
-- ^ Fetch what the last error was, Nothing means no error. Especially useful after a write since it is asynchronous (ie. nothing is returned after a write, so we don't know if it succeeded or not). To ensure no interleaving db operation executes between the write we want to check and getLastError, this can only be executed inside a 'runDbOp' which gets exclusive access to the connection.
getLastError = do
r <- runCommand1 "getlasterror"
return $ (at "code" r,) <$> lookup "err" r
resetLastError :: Db Op ()
-- ^ Clear last error
resetLastError = runCommand1 "reseterror" >> return ()
{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10gen Inc.

6
TODO
View file

@ -3,6 +3,7 @@ TODO
BSON
----
+ implement deprecated types (were left out)
+ on insert/update: reject keys that start with "$" or "."
+ data support for common mongo "$symbols"
+ convert from/to json
@ -15,14 +16,12 @@ BSON
MongoDB
-------
+ support full level 0
- hint
- operations on database objects
* add_son_manipulators?
* dereference (dbref)
- database admin
* getProfilingInfo
- misc operations
* explain
* getCollectionOptions
- cursor object
* hasMore
@ -38,7 +37,7 @@ MongoDB
- getLastError options
- Update If Current (http://www.mongodb.org/display/DOCS/Atomic+Operations)
- block write until written on N replicas
- lazyRest on cursor, although lazy I/) is problematic and we may not want to support it.
- lazyRest on cursor, although lazy I/O) is problematic and we may not want to support it.
- Upon client exit, send killCursors for all open cursors, otherwise server will keep them open for 10 minutes and keep NoCursorTimeout cursors open for hours.
-- Upon cursor finalize (garbage collect) send killCursor even if you have to create a new connection, because server keeps cursors open for 10 minutes (or more).
@ -46,7 +45,6 @@ MongoDB
- automatic reconnection
- buffer pooling
- connection pooling. Unsafe to shrink pool and close connections because map/reduce temp tables that were created on the connection will get deleted. Note, other connections can access a map/reduce temp table as long as the original connection is still alive. Also, other connections can access cursors created on other connections, even if those die. Cursors will be deleted on server only if idle for more than 10 minutes. Accessing a deleted cursor returns an error.
+ support safe operations, although operation with exclusive connection access is available which can be used to getLastError and check for that previous write was safe (successful).
+ auto-destoy connection (how?/when?). Although, GHC will automatically close connection (Handle) when garbage collected.
+ don't read into cursor until needed, but have cursor send getMore before
it is actually out of docs (so network is finished by the time we're ready

View file

@ -1,30 +0,0 @@
Hi Scott,
Thanks for writing the Haskell driver for MongoDB! It functions well but I basically rewrote it in an attempt to factor it nicely and support additional functionality like multiple threads using the same connection. I hope you like it! You can find it on my fork of your repository at http://github.com/TonyGen/mongoDB.
First, I separated out BSON into its own package, since it can be used as an interchange format independent of MongoDB. You can find this new package on Github at http://github.com/TonyGen/bson-haskell and on Hackage at http://hackage.haskell.org/package/bson. I also made the BSON easier to write and view. For example, you can write: ["a" =: 1, "b" =: "hello", "c" =: [1,2,3]], and it shows as: [a: 1, b: "hello", c: [1,2,3]].
Second, for modularity, I separated MongoDB into 5 modules: MongoDB-Internal-Connection, MongoDB-Internal-Protocol, MongoDB-Connection, MongoDB-Query, and MongoDB-Admin.
MongoDB-Internal-Connection defines a connection with multi-threaded support via two monads, one with shared access to a connection (Task), and one with exclusive access to a connection (Op). This module also exposes low-level writing and reading bytes inside the Op monad for MongoDB-Internal-Protocol to use. This module is not intended for the application-programmer use, and maybe should be a hidden module inside cabal, but for now it is not.
MongoDB-Internal-Protocol defines the MongoDB Wire Protocol (http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol). It defines the messages the the client and server send back and forth to each other. Again, this module is not intended for the application-programmer use, and maybe should be a hidden module inside cabal, but for now it is not.
MongoDB-Connection re-exports Connection, and Task and Op monads from MongoDB-Internal-Connection but without the low-level read and write bytes functions. It also adds support for replica-sets, which will replace replica-pairs in the next release of MongoDB coming out soon. I had to make two connection modules (MongoDB-Internal-Connection and MongoDB-Connection) because connecting to a replica set requires quering its config info, which requires us to import MongoDB-Query, which recursively imports MongoDB-Internal-Protocol then MongoDB-Internal-Connection. I could have used mutual dependent modules (via .hs-boot) but felt that violated the layered approach I was going for.
MongoDB-Query defines all the normal query and update operations like find, findOne, count, insert, modify, delete, group, mapReduce, allDatabases, allCollections, runCommand, etc.
MongoDB-Admin defines all the administration operations like validateCollection, ensureIndex, dropIndex, addUser, copyDatabase, dbStats, setProfilingLevel, etc.
Finally, the top-level MongoDB module simply re-exports MongoDB-Connection, MongoDB-Query, and MongoDB-Admin, along with Data.Bson from the bson package.
I updated your TODO list, removing items I completed, added items that were missing, and added back items I removed from the code like lazy list from a cursor (I am skeptical of lazy I/O, but we could add it back).
I also update your two tutorials to reflect this new API.
I hope you like these changes! Let me know your feedback, and I hope we can both maintain it in the future.
Cheers,
Tony Hannan
10gen Inc.
Creators of MongoDB

36
V0.6-Redesign.md Normal file
View file

@ -0,0 +1,36 @@
Hi Scott,
I slightly refactored my previous version I sent you. Here is the description again of my version as compared to your version. You can disregard my previous version and just read this.
When evaluating your package, I wanted to make the BSON easier to write and read, and I wanted to make use of monads so you don't have to supply the connection and database every time. Furthermore, I wanted to modularize the code a little more, for example, separating admin function from normal query/update functions. Finally, I wanted to add more functionality, like support for replica sets and multiple threads per connection. The end result was a significant rewrite of your package. I hope you like it! You can find it on my fork of your repository at http://github.com/TonyGen/mongoDB.
First, I separated out BSON into its own package, since it can be used as an interchange format independent of MongoDB. You can find this new package on Github at http://github.com/TonyGen/bson-haskell and on Hackage at http://hackage.haskell.org/package/bson. I also made the BSON easier to write and view. For example, you can write: ["a" =: 1, "b" =: "hello", "c" =: [1,2,3]], and it shows as: [a: 1, b: "hello", c: [1,2,3]].
Second, I created two independent helper modules: Control.Monad.Context and Control.Pipeline.
Control.Monad.Context is just like Control.Monad.Reader.Class except you can access the context of any Reader in the monad stack instead of just the top one as long as the context types are different.
Control.Pipeline gives thread-safe and pipelined access to a socket. When you make a call it sends the request and immediately returns a "promise" of the reply without waiting for the reply. When you read the promise it waits for the reply if it has not already arrived then returns it.
Third, I separated MongoDB into 4 modules: MongoDB.Internal.Protocol, MongoDB.Connection, MongoDB.Query, and MongoDB.Admin.
MongoDB.Internal.Protocol defines the MongoDB Wire Protocol (http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol). It defines the messages the the client and server send back and forth to each other. Again, this module is not intended for the application-programmer use, and maybe should be a hidden module inside cabal, but for now it is not.
MongoDB.Connection allows you to create a pipelined connection to a specific server or to a master/slave in a replica set.
MongoDB-Query defines the "connected" monad that has the current connection and database in context, and all the normal query and update operations you execute within this monad like find, findOne, count, insert, modify, delete, group, mapReduce, allDatabases, allCollections, runCommand, etc.
MongoDB-Admin defines all the administration operations like validateCollection, ensureIndex, dropIndex, addUser, copyDatabase, dbStats, setProfilingLevel, etc.
Finally, the top-level MongoDB module simply re-exports MongoDB-Connection, MongoDB-Query, and MongoDB-Admin, along with Data.Bson from the bson package.
I updated your TODO list, removing items I completed, added items that were missing, and added back items I removed from the code like lazy list from a cursor (I am skeptical of lazy I/O, but we could add it back).
I also update your two tutorials to reflect this new API.
I hope you like these changes! Let me know your feedback, and I hope we can both maintain it in the future.
Cheers,
Tony Hannan
10gen Inc.
Creators of MongoDB

View file

@ -20,10 +20,9 @@ map/reduce queries on:
> :set -XOverloadedStrings
> import Database.MongoDB
> Right conn <- connect (server "localhost")
> let run task = runTask task conn
> let runDb db dbTask = run $ useDb db dbTask
> let run act = runConn (useDb "test" act) con
> :{
runDb "test" $ insertMany "mr1" [
run $ insertMany "mr1" [
["x" =: 1, "tags" =: ["dog", "cat"]],
["x" =: 2, "tags" =: ["cat"]],
["x" =: 3, "tags" =: ["mouse", "cat", "dog"]],
@ -68,7 +67,7 @@ key:
Note: We can't just return values.length as the reduce function might
be called iteratively on the results of other reduce steps.
Finally, we call map_reduce() and iterate over the result collection:
Finally, we run mapReduce and iterate over the result collection:
> runDb "test" $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest
Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]]
@ -82,4 +81,4 @@ obtain them, use *runMR'* instead:
> runDb "test" $ runMR' (mapReduce "mr1" mapFn reduceFn)
Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0]
You can then obtain the results from here by quering the result collection yourself. "runMR* (above) does this for you but discards the statistics.
You can then obtain the results from here by quering the result collection yourself. *runMR* (above) does this for you but discards the statistics.

View file

@ -1,5 +1,5 @@
Name: mongoDB
Version: 0.5.0
Version: 0.6
License: MIT
Maintainer: Scott Parish <srp@srparish.net>, Tony Hannan <tony@10gen.com>
Author: Scott Parish <srp@srparish.net>, Tony Hannan <tony@10gen.com>
@ -23,8 +23,9 @@ Build-Depends: base < 5,
bson
Build-Type: Simple
Exposed-modules:
Database.MongoDB.Util,
Database.MongoDB.Internal.Connection,
Control.Monad.Context,
Control.Pipeline,
Database.MongoDB.Internal.Util,
Database.MongoDB.Internal.Protocol,
Database.MongoDB.Connection,
Database.MongoDB.Query,

View file

@ -63,31 +63,31 @@ it won't fail. If it does you will get a pattern match error.
Task and Db monad
-------------------
The current connection is held in a Reader monad called "Task*, and the
current database is held in a Reader monad on top of that. To run a task,
supply it and a connection to *runTask*. Within a task, to access a database,
wrap you operations in a *useDb*.
The current connection is held in a Connected monad, and the current database
is held in a Reader monad on top of that. To run a connected monad, supply
it and a connection to *runConn*. To access a database within a connected
monad, call *useDb*.
But since we are working in ghci, which requires us to start from the
IO monad every time, we'll define a convenient 'run' function that takes a
Since we are working in ghci, which requires us to start from the
IO monad every time, we'll define a convenient *run* function that takes a
db-action and executes it against our "test" database on the server we
just connected to:
> let run act = runTask (useDb "test" act) con
> let run act = runConn (useDb "test" act) con
*run* (*runTask*) will return either Left Failure or Right result. Failure
*run* (*runConn*) will return either Left Failure or Right result. Failure
means the connection failed (eg. network problem) or the server failed
(eg. disk full).
Databases and Collections
-----------------------------
A MongoDB can store multiple databases--separate namespaces
A MongoDB can store multiple databases -- separate namespaces
under which collections reside.
You can obtain the list of databases available on a connection:
> runTask allDatabases con
> runConn allDatabases con
You can also use the *run* function we just created:
@ -159,7 +159,7 @@ only one matching document, or are only interested in the first
match. Here we use *findOne* to get the first document from the posts
collection:
> run $ findOne (query [] "posts")
> run $ findOne (select [] "posts")
Right (Just [ _id: Oid 4c16d355 c80c560858000000, author: "Mike", text: "My first blog post!", tags: ["mongoDB","Haskell"], date: 2010-06-15 01:09:28.364 UTC])
The result is a document matching the one that we inserted previously.
@ -171,12 +171,12 @@ added on insert.
resulting document must match. To limit our results to a document with
author "Mike" we do:
> run $ findOne (query ["author" =: "Mike"] "posts")
> run $ findOne (select ["author" =: "Mike"] "posts")
Right (Just [ _id: Oid 4c16d355 c80c560858000000, author: "Mike", text: "My first blog post!", tags: ["mongoDB","Haskell"], date: 2010-06-15 01:09:28.364 UTC])
If we try with a different author, like "Eliot", we'll get no result:
> run $ findOne (query ["author" =: "Eliot"] "posts")
> run $ findOne (select ["author" =: "Eliot"] "posts")
Right Nothing
Bulk Inserts
@ -217,12 +217,12 @@ iterate over all matching documents. There are several ways in which
we can iterate: we can call *next* to get documents one at a time
or we can get all the results by applying the cursor to *rest*:
> Right cursor <- run $ find (query ["author" =: "Mike"] "posts")
> Right cursor <- run $ find (select ["author" =: "Mike"] "posts")
> run $ rest cursor
Of course you can use bind (*>>=*) to combine these into one line:
> run $ find (query ["author" =: "Mike"] "posts") >>= rest
> run $ find (select ["author" =: "Mike"] "posts") >>= rest
* Note: *next* automatically closes the cursor when the last
document has been read out of it. Similarly, *rest* automatically
@ -233,11 +233,11 @@ Counting
We can count how many documents are in an entire collection:
> run $ count (query [] "posts")
> run $ count (select [] "posts")
Or count how many documents match a query:
> run $ count (query ["author" =: "Mike"] "posts")
> run $ count (select ["author" =: "Mike"] "posts")
Range Queries
-------------