Connection pooling and some refactoring

This commit is contained in:
Tony Hannan 2010-10-27 16:13:23 -04:00
parent 2d964f9448
commit de32b22b4f
17 changed files with 680 additions and 439 deletions

79
Control/Monad/MVar.hs Normal file
View file

@ -0,0 +1,79 @@
{- | Lift MVar operations so you can do them within monads stacked on top of IO. Analogous to MonadIO -}
{-# LANGUAGE TupleSections #-}
module Control.Monad.MVar (
MVar,
module Control.Monad.MVar,
liftIO
) where
import Control.Concurrent.MVar (MVar)
import qualified Control.Concurrent.MVar as IO
import Control.Monad.Error
import Control.Monad.Reader
import Control.Monad.State
newEmptyMVar :: (MonadIO m) => m (MVar a)
newEmptyMVar = liftIO IO.newEmptyMVar
newMVar :: (MonadIO m) => a -> m (MVar a)
newMVar = liftIO . IO.newMVar
takeMVar :: (MonadIO m) => MVar a -> m a
takeMVar = liftIO . IO.takeMVar
putMVar :: (MonadIO m) => MVar a -> a -> m ()
putMVar var = liftIO . IO.putMVar var
readMVar :: (MonadIO m) => MVar a -> m a
readMVar = liftIO . IO.readMVar
swapMVar :: (MonadIO m) => MVar a -> a -> m a
swapMVar var = liftIO . IO.swapMVar var
tryTakeMVar :: (MonadIO m) => MVar a -> m (Maybe a)
tryTakeMVar = liftIO . IO.tryTakeMVar
tryPutMVar :: (MonadIO m) => MVar a -> a -> m Bool
tryPutMVar var = liftIO . IO.tryPutMVar var
isEmptyMVar :: (MonadIO m) => MVar a -> m Bool
isEmptyMVar = liftIO . IO.isEmptyMVar
class (MonadIO m) => MonadMVar m where
modifyMVar :: MVar a -> (a -> m (a, b)) -> m b
addMVarFinalizer :: MVar a -> m () -> m ()
modifyMVar_ :: (MonadMVar m) => MVar a -> (a -> m a) -> m ()
modifyMVar_ var act = modifyMVar var $ \a -> do
a <- act a
return (a, ())
withMVar :: (MonadMVar m) => MVar a -> (a -> m b) -> m b
withMVar var act = modifyMVar var $ \a -> do
b <- act a
return (a, b)
instance MonadMVar IO where
modifyMVar = IO.modifyMVar
addMVarFinalizer = IO.addMVarFinalizer
instance (MonadMVar m, Error e) => MonadMVar (ErrorT e m) where
modifyMVar var f = ErrorT $ modifyMVar var $ \a -> do
e <- runErrorT (f a)
return $ either ((a, ) . Left) (fmap Right) e
addMVarFinalizer var (ErrorT act) = ErrorT $
addMVarFinalizer var (act >> return ()) >> return (Right ())
-- NOTE, error is silently dropped
instance (MonadMVar m) => MonadMVar (ReaderT r m) where
modifyMVar var f = ReaderT $ \r -> modifyMVar var $ \a -> runReaderT (f a) r
addMVarFinalizer var (ReaderT act) = ReaderT (addMVarFinalizer var . act)
instance (MonadMVar m) => MonadMVar (StateT s m) where
modifyMVar var f = StateT $ \s -> modifyMVar var $ \a -> do
((a, b), s) <- runStateT (f a) s
return (a, (b, s))
addMVarFinalizer var (StateT act) = StateT $ \s ->
addMVarFinalizer var (act s >> return ()) >> return ((), s)

View file

@ -1,6 +1,6 @@
{- | This is just like "Control.Monad.Error.Class" except you can throw/catch the error of any ErrorT in the monad stack instead of just the top one as long as the error types are different. If two or more ErrorTs in the stack have the same error type you get the error of the top one. -}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, OverlappingInstances #-}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, OverlappingInstances, UndecidableInstances #-}
module Control.Monad.Throw where
@ -17,7 +17,15 @@ class (Monad m) => Throw e m where
throwLeft :: (Throw e m) => m (Either e a) -> m a
-- ^ Execute action and throw exception if result is Left, otherwise return the Right result
throwLeft = (either throw return =<<)
throwLeft = throwLeft' id
throwLeft' :: (Throw e m) => (x -> e) -> m (Either x a) -> m a
-- ^ Execute action and throw transformed exception if result is Left, otherwise return Right result
throwLeft' f = (either (throw . f) return =<<)
onException :: (Throw e m) => m a -> (e -> m b) -> m a
-- ^ If first action throws an exception then run second action then re-throw
onException action releaser = catch action $ \e -> releaser e >> throw e
instance (Error e) => Throw e (Either e) where
throw = throwError

46
Control/Monad/Util.hs Normal file
View file

@ -0,0 +1,46 @@
-- | Extra monad functions and instances
{-# LANGUAGE FlexibleInstances, UndecidableInstances #-}
module Control.Monad.Util where
import Control.Applicative (Applicative(..), (<$>))
import Control.Arrow ((+++))
import Control.Monad.Reader
import Control.Monad.Error
instance (Monad m) => Applicative (ReaderT r m) where
pure = return
(<*>) = ap
instance (Monad m, Error e) => Applicative (ErrorT e m) where
pure = return
(<*>) = ap
class (MonadIO m, Applicative m, Functor m) => MonadIO' m
instance (MonadIO m, Applicative m, Functor m) => MonadIO' m
ignore :: (Monad m) => a -> m ()
ignore _ = return ()
loop :: (Functor m, Monad m) => m (Maybe a) -> m [a]
-- ^ Repeatedy execute action, collecting results, until it returns Nothing
loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act)
untilJust :: (Monad m) => (a -> m (Maybe b)) -> [a] -> m (Maybe b)
-- ^ Apply action to elements one at a time until one returns Just. Return Nothing if all return Nothing.
untilJust f [] = return Nothing
untilJust f (a:as) = f a >>= maybe (untilJust f as) (return . Just)
untilSuccess :: (MonadError e m, Error e) => (a -> m b) -> [a] -> m b
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw 'strMsg' error if list is empty.
untilSuccess = untilSuccess' (strMsg "empty untilSuccess")
untilSuccess' :: (MonadError e m) => e -> (a -> m b) -> [a] -> m b
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw given error if list is empty
untilSuccess' e f [] = throwError e
untilSuccess' _ f (x : xs) = catchError (f x) (\e -> untilSuccess' e f xs)
mapError :: (Functor m) => (e' -> e) -> ErrorT e' m a -> ErrorT e m a
-- ^ Convert error type thrown
mapError f (ErrorT m) = ErrorT $ (f +++ id) <$> m

View file

@ -1,10 +1,12 @@
{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipe (and get promises back); the pipe will pipeline each thread's request right away without waiting. -}
{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will pipeline each thread's request right away without waiting.
A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. -}
{-# LANGUAGE DoRec, RecordWildCards, MultiParamTypeClasses, FlexibleContexts #-}
module Control.Pipeline (
-- * Pipe
Pipe, newPipe, send, call,
-- * Pipeline
Pipeline, newPipeline, send, call,
-- * Util
Size,
Length(..),
@ -16,7 +18,7 @@ module Control.Pipeline (
import Prelude hiding (length)
import Control.Applicative ((<$>))
import Control.Monad (forever)
import Control.Exception (assert)
import Control.Exception (assert, onException)
import System.IO.Error (try, mkIOError, eofErrorType)
import System.IO (Handle, hFlush, hClose, hIsClosed)
import qualified Data.ByteString as S
@ -85,10 +87,10 @@ instance Stream Handle L.ByteString where
put = L.hPut
get = L.hGet
-- * Pipe
-- * Pipeline
-- | Thread-safe and pipelined socket
data Pipe handle bytes = Pipe {
data Pipeline handle bytes = Pipeline {
encodeSize :: Size -> bytes,
decodeSize :: bytes -> Size,
vHandle :: MVar handle, -- ^ Mutex on handle, so only one thread at a time can write to it
@ -96,33 +98,33 @@ data Pipe handle bytes = Pipe {
listenThread :: ThreadId
}
-- | Create new Pipe with given encodeInt, decodeInt, and handle. You should 'close' pipe when finished, which will also close handle. If pipe is not closed but eventually garbage collected, it will be closed along with handle.
newPipe :: (Stream h b, Resource IO h) =>
-- | Create new Pipeline with given encodeInt, decodeInt, and handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: (Stream h b, Resource IO h) =>
(Size -> b) -- ^ Convert Size to bytes of fixed length. Every Int must translate to same number of bytes.
-> (b -> Size) -- ^ Convert bytes of fixed length to Size. Must be exact inverse of encodeSize.
-> h -- ^ Underlying socket (handle) this pipe will read/write from
-> IO (Pipe h b)
newPipe encodeSize decodeSize handle = do
-> h -- ^ Underlying socket (handle) this pipeline will read/write from
-> IO (Pipeline h b)
newPipeline encodeSize decodeSize handle = do
vHandle <- newMVar handle
responseQueue <- newChan
rec
let pipe = Pipe{..}
let pipe = Pipeline{..}
listenThread <- forkIO (listen pipe)
addMVarFinalizer vHandle $ do
killThread listenThread
close handle
return pipe
instance (Resource IO h) => Resource IO (Pipe h b) where
instance (Resource IO h) => Resource IO (Pipeline h b) where
-- | Close pipe and underlying socket (handle)
close Pipe{..} = do
close Pipeline{..} = do
killThread listenThread
close =<< readMVar vHandle
isClosed Pipe{..} = isClosed =<< readMVar vHandle
isClosed Pipeline{..} = isClosed =<< readMVar vHandle
listen :: (Stream h b) => Pipe h b -> IO ()
listen :: (Stream h b, Resource IO h) => Pipeline h b -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order
listen Pipe{..} = do
listen Pipeline{..} = do
let n = length (encodeSize 0)
h <- readMVar vHandle
forever $ do
@ -131,23 +133,30 @@ listen Pipe{..} = do
getN h len
var <- readChan responseQueue
putMVar var e
case e of
Left err -> close h >> fail (show err) -- close and stop looping
Right _ -> return ()
send :: (Stream h b) => Pipe h b -> [b] -> IO ()
send :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO ()
-- ^ Send messages all together to destination (no messages will be interleaved between them). None of the messages can induce a response, i.e. the destination must not reply to any of these messages (otherwise future 'call's will get these responses instead of their own).
-- Each message is preceeded by its length when written to socket.
send Pipe{..} messages = withMVar vHandle $ \h -> do
mapM_ (write encodeSize h) messages
flush h
-- Raises IOError and closes pipeline if send fails
send Pipeline{..} messages = withMVar vHandle (writeAll listenThread encodeSize messages)
call :: (Stream h b) => Pipe h b -> [b] -> IO (IO b)
call :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO (IO b)
-- ^ Send messages all together to destination (no messages will be interleaved between them), and return /promise/ of response from one message only. One and only one message in the list must induce a response, i.e. the destination must reply to exactly one message only (otherwise promises will have the wrong responses in them).
-- Each message is preceeded by its length when written to socket. Likewise, the response must be preceeded by its length.
call Pipe{..} messages = withMVar vHandle $ \h -> do
mapM_ (write encodeSize h) messages
flush h
-- Raises IOError and closes pipeline if send fails, likewise for reply.
call Pipeline{..} messages = withMVar vHandle $ \h -> do
writeAll listenThread encodeSize messages h
var <- newEmptyMVar
writeChan responseQueue var
return (either ioError return =<< readMVar var) -- return promise
write :: (Stream h b, Monoid b, Length b) => (Size -> b) -> h -> b -> IO ()
write encodeSize h bytes = put h (mappend lenBytes bytes) where lenBytes = encodeSize (length bytes)
writeAll :: (Stream h b, Monoid b, Length b, Resource IO h) => ThreadId -> (Size -> b) -> [b] -> h -> IO ()
-- ^ Write messages to stream. On error, close pipeline and raise IOError.
writeAll listenThread encodeSize messages h = onException
(mapM_ write messages >> flush h)
(killThread listenThread >> close h)
where
write bytes = put h (mappend lenBytes bytes) where lenBytes = encodeSize (length bytes)

View file

@ -1,5 +1,5 @@
{- |
Client interface to MongoDB server(s).
Client interface to MongoDB database management system.
Simple example below. Use with language extension /OvererloadedStrings/.
@ -10,12 +10,11 @@ Simple example below. Use with language extension /OvererloadedStrings/.
> import Control.Monad.Trans (liftIO)
>
> main = do
> ee <- runNet $ do
> conn <- connect (host "127.0.0.1")
> runConn run conn
> print ee
> conn <- connect 1 (host "127.0.0.1")
> e <- access safe Master conn run
> print e
>
> run = useDb "baseball" $ do
> run = use (Database "baseball") $ do
> clearTeams
> insertTeams
> print' "All Teams" =<< allTeams
@ -30,13 +29,13 @@ Simple example below. Use with language extension /OvererloadedStrings/.
> ["name" =: u"Phillies", "home" =: ["city" =: u"Philadelphia", "state" =: u"PA"], "league" =: u"National"],
> ["name" =: u"Red Sox", "home" =: ["city" =: u"Boston", "state" =: u"MA"], "league" =: u"American"] ]
>
> allTeams = rest =<< find (select [] "team") {sort = ["city" =: (1 :: Int)]}
> allTeams = rest =<< find (select [] "team") {sort = ["home.city" =: (1 :: Int)]}
>
> nationalLeagueTeams = rest =<< find (select ["league" =: u"National"] "team")
>
> newYorkTeams = rest =<< find (select ["home.state" =: u"NY"] "team") {project = ["name" =: (1 :: Int), "league" =: (1 :: Int)]}
>
> print' title docs = liftIO $ putStrLn title >> mapM_ print docs
> print' title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs
-}
module Database.MongoDB (

View file

@ -11,7 +11,7 @@ module Database.MongoDB.Admin (
-- ** User
allUsers, addUser, removeUser,
-- ** Database
cloneDatabase, copyDatabase, dropDatabase, repairDatabase,
admin, cloneDatabase, copyDatabase, dropDatabase, repairDatabase,
-- ** Server
serverBuildInfo, serverVersion,
-- * Diagnotics
@ -51,17 +51,17 @@ coptElem Capped = "capped" =: True
coptElem (MaxByteSize n) = "size" =: n
coptElem (MaxItems n) = "max" =: n
createCollection :: (DbConn m) => [CollectionOption] -> Collection -> m Document
createCollection :: (DbAccess m) => [CollectionOption] -> Collection -> m Document
-- ^ Create collection with given options. You only need to call this to set options, otherwise a collection is created automatically on first use with no options.
createCollection opts col = runCommand $ ["create" =: col] ++ map coptElem opts
renameCollection :: (DbConn m) => Collection -> Collection -> m Document
renameCollection :: (DbAccess m) => Collection -> Collection -> m Document
-- ^ Rename first collection to second collection
renameCollection from to = do
db <- thisDatabase
useDb "admin" $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
Database db <- thisDatabase
use admin $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
dropCollection :: (DbConn m) => Collection -> m Bool
dropCollection :: (DbAccess m) => Collection -> m Bool
-- ^ Delete the given collection! Return True if collection existed (and was deleted); return False if collection did not exist (and no action).
dropCollection coll = do
resetIndexCache
@ -70,7 +70,7 @@ dropCollection coll = do
if at "errmsg" r == ("ns not found" :: UString) then return False else
fail $ "dropCollection failed: " ++ show r
validateCollection :: (DbConn m) => Collection -> m Document
validateCollection :: (DbAccess m) => Collection -> m Document
-- ^ This operation takes a while
validateCollection coll = runCommand ["validate" =: coll]
@ -87,7 +87,7 @@ data Index = Index {
} deriving (Show, Eq)
idxDocument :: Index -> Database -> Document
idxDocument Index{..} db = [
idxDocument Index{..} (Database db) = [
"ns" =: db <.> iColl,
"key" =: iKey,
"name" =: iName,
@ -102,32 +102,32 @@ genName :: Order -> IndexName
genName keys = intercalate "_" (map f keys) where
f (k := v) = k `append` "_" `append` pack (show v)
ensureIndex :: (DbConn m) => Index -> m ()
ensureIndex :: (DbAccess m) => Index -> m ()
-- ^ Create index if we did not already create one. May be called repeatedly with practically no performance hit, because we remember if we already called this for the same index (although this memory gets wiped out every 15 minutes, in case another client drops the index and we want to create it again).
ensureIndex idx = let k = (iColl idx, iName idx) in do
icache <- fetchIndexCache
set <- liftIO (readIORef icache)
unless (S.member k set) $ do
writeMode Safe (createIndex idx)
writeMode (Safe []) (createIndex idx)
liftIO $ writeIORef icache (S.insert k set)
createIndex :: (DbConn m) => Index -> m ()
createIndex :: (DbAccess m) => Index -> m ()
-- ^ Create index on the server. This call goes to the server every time.
createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase
dropIndex :: (DbConn m) => Collection -> IndexName -> m Document
dropIndex :: (DbAccess m) => Collection -> IndexName -> m Document
-- ^ Remove the index
dropIndex coll idxName = do
resetIndexCache
runCommand ["deleteIndexes" =: coll, "index" =: idxName]
getIndexes :: (DbConn m) => Collection -> m [Document]
getIndexes :: (DbAccess m) => Collection -> m [Document]
-- ^ Get all indexes on this collection
getIndexes coll = do
db <- thisDatabase
Database db <- thisDatabase
rest =<< find (select ["ns" =: db <.> coll] "system.indexes")
dropIndexes :: (DbConn m) => Collection -> m Document
dropIndexes :: (DbAccess m) => Collection -> m Document
-- ^ Drop all indexes on this collection
dropIndexes coll = do
resetIndexCache
@ -143,7 +143,7 @@ type IndexCache = IORef (S.Set (Collection, IndexName))
dbIndexCache :: DbIndexCache
-- ^ initialize cache and fork thread that clears it every 15 minutes
dbIndexCache = unsafePerformIO $ do
table <- T.new (==) (T.hashString . unpack)
table <- T.new (==) (T.hashString . unpack . databaseName)
_ <- forkIO . forever $ threadDelay 900000000 >> clearDbIndexCache
return table
{-# NOINLINE dbIndexCache #-}
@ -153,7 +153,7 @@ clearDbIndexCache = do
keys <- map fst <$> T.toList dbIndexCache
mapM_ (T.delete dbIndexCache) keys
fetchIndexCache :: (DbConn m) => m IndexCache
fetchIndexCache :: (DbAccess m) => m IndexCache
-- ^ Get index cache for current database
fetchIndexCache = do
db <- thisDatabase
@ -166,7 +166,7 @@ fetchIndexCache = do
T.insert dbIndexCache db idx
return idx
resetIndexCache :: (DbConn m) => m ()
resetIndexCache :: (DbAccess m) => m ()
-- ^ reset index cache for current database
resetIndexCache = do
icache <- fetchIndexCache
@ -174,70 +174,73 @@ resetIndexCache = do
-- ** User
allUsers :: (DbConn m) => m [Document]
allUsers :: (DbAccess m) => m [Document]
-- ^ Fetch all users of this database
allUsers = map (exclude ["_id"]) <$> (rest =<< find
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
addUser :: (DbConn m) => Bool -> Username -> Password -> m ()
addUser :: (DbAccess m) => Bool -> Username -> Password -> m ()
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
addUser readOnly user pass = do
mu <- findOne (select ["user" =: user] "system.users")
let u = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
save "system.users" u
removeUser :: (DbConn m) => Username -> m ()
removeUser :: (DbAccess m) => Username -> m ()
removeUser user = delete (select ["user" =: user] "system.users")
-- ** Database
cloneDatabase :: (Conn m) => Database -> Host -> m Document
-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case).
cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromHost]
admin = Database "admin"
-- ^ \"admin\" database
copyDatabase :: (Conn m) => Database -> Host -> Maybe (Username, Password) -> Database -> m Document
cloneDatabase :: (Access m) => Database -> Host -> m Document
-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case).
cloneDatabase db fromHost = use db $ runCommand ["clone" =: showHostPort fromHost]
copyDatabase :: (Access m) => Database -> Host -> Maybe (Username, Password) -> Database -> m Document
-- ^ Copy database from given host to the server I am connected to. If username & password is supplied use them to read from given host.
copyDatabase fromDb fromHost mup toDb = do
copyDatabase (Database fromDb) fromHost mup (Database toDb) = do
let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb]
useDb "admin" $ case mup of
use admin $ case mup of
Nothing -> runCommand c
Just (u, p) -> do
n <- at "nonce" <$> runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost]
runCommand $ c ++ ["username" =: u, "nonce" =: n, "key" =: pwKey n u p]
dropDatabase :: (Conn m) => Database -> m Document
dropDatabase :: (Access m) => Database -> m Document
-- ^ Delete the given database!
dropDatabase db = useDb db $ runCommand ["dropDatabase" =: (1 :: Int)]
dropDatabase db = use db $ runCommand ["dropDatabase" =: (1 :: Int)]
repairDatabase :: (Conn m) => Database -> m Document
repairDatabase :: (Access m) => Database -> m Document
-- ^ Attempt to fix any corrupt records. This operation takes a while.
repairDatabase db = useDb db $ runCommand ["repairDatabase" =: (1 :: Int)]
repairDatabase db = use db $ runCommand ["repairDatabase" =: (1 :: Int)]
-- ** Server
serverBuildInfo :: (Conn m) => m Document
serverBuildInfo = useDb "admin" $ runCommand ["buildinfo" =: (1 :: Int)]
serverBuildInfo :: (Access m) => m Document
serverBuildInfo = use admin $ runCommand ["buildinfo" =: (1 :: Int)]
serverVersion :: (Conn m) => m UString
serverVersion :: (Access m) => m UString
serverVersion = at "version" <$> serverBuildInfo
-- * Diagnostics
-- ** Collection
collectionStats :: (DbConn m) => Collection -> m Document
collectionStats :: (DbAccess m) => Collection -> m Document
collectionStats coll = runCommand ["collstats" =: coll]
dataSize :: (DbConn m) => Collection -> m Int
dataSize :: (DbAccess m) => Collection -> m Int
dataSize c = at "size" <$> collectionStats c
storageSize :: (DbConn m) => Collection -> m Int
storageSize :: (DbAccess m) => Collection -> m Int
storageSize c = at "storageSize" <$> collectionStats c
totalIndexSize :: (DbConn m) => Collection -> m Int
totalIndexSize :: (DbAccess m) => Collection -> m Int
totalIndexSize c = at "totalIndexSize" <$> collectionStats c
totalSize :: (DbConn m) => Collection -> m Int
totalSize :: (DbAccess m) => Collection -> m Int
totalSize coll = do
x <- storageSize coll
xs <- mapM isize =<< getIndexes coll
@ -249,33 +252,33 @@ totalSize coll = do
data ProfilingLevel = Off | Slow | All deriving (Show, Enum, Eq)
getProfilingLevel :: (DbConn m) => m ProfilingLevel
getProfilingLevel :: (DbAccess m) => m ProfilingLevel
getProfilingLevel = toEnum . at "was" <$> runCommand ["profile" =: (-1 :: Int)]
type MilliSec = Int
setProfilingLevel :: (DbConn m) => ProfilingLevel -> Maybe MilliSec -> m ()
setProfilingLevel :: (DbAccess m) => ProfilingLevel -> Maybe MilliSec -> m ()
setProfilingLevel p mSlowMs =
runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return ()
-- ** Database
dbStats :: (DbConn m) => m Document
dbStats :: (DbAccess m) => m Document
dbStats = runCommand ["dbstats" =: (1 :: Int)]
currentOp :: (DbConn m) => m (Maybe Document)
currentOp :: (DbAccess m) => m (Maybe Document)
-- ^ See currently running operation on the database, if any
currentOp = findOne (select [] "$cmd.sys.inprog")
type OpNum = Int
killOp :: (DbConn m) => OpNum -> m (Maybe Document)
killOp :: (DbAccess m) => OpNum -> m (Maybe Document)
killOp op = findOne (select ["op" =: op] "$cmd.sys.killop")
-- ** Server
serverStatus :: (Conn m) => m Document
serverStatus = useDb "admin" $ runCommand ["serverStatus" =: (1 :: Int)]
serverStatus :: (Access m) => m Document
serverStatus = use admin $ runCommand ["serverStatus" =: (1 :: Int)]
{- Authors: Tony Hannan <tony@10gen.com>

View file

@ -1,41 +1,39 @@
{- | A replica set is a set of servers that mirror each other (a non-replicated server can act like a replica set of one). One server in a replica set is the master and the rest are slaves. When the master goes down, one of the slaves becomes master. The ReplicaSet object in this client maintains a list of servers that it currently knows are in the set. It refreshes this list every time it establishes a new connection with one of the servers in the set. Each server in the set knows who the other member in the set are, and who is master. The user asks the ReplicaSet object for a new master or slave connection. When a connection fails, the user must ask the ReplicaSet for a new connection (which most likely will connect to another server since the previous one failed). When connecting to a new server you loose all session state that was stored with the old server, which includes open cursors and temporary map-reduce output collections. Attempting to read from a lost cursor on a new server will raise a ServerFailure exception. Attempting to read a lost map-reduce temp output on a new server will return an empty set (not an error, like it maybe should). -}
{- | A Mongo connection is a pool of TCP connections to a single server or a replica set of servers. -}
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, MultiParamTypeClasses, FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes #-}
module Database.MongoDB.Connection (
runNet,
-- * Host
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM,
-- * ReplicaSet
ReplicaSet, replicaSet, replicas,
newConnection,
ReplicaSet(..),
-- * MasterOrSlaveOk
MasterOrSlaveOk(..),
-- * Connection
Connection, connect,
-- * Resource
Resource(..)
Server(..),
) where
import Database.MongoDB.Internal.Protocol
import Data.Bson ((=:), at)
import Data.Bson ((=:), at, UString)
import Control.Pipeline (Resource(..))
import Control.Applicative ((<$>))
import Control.Arrow ((+++), left)
import Control.Exception (assert)
import System.IO.Error as E (try)
import System.IO.Error as E (try, mkIOError, userErrorType)
import Control.Monad.Error
import Control.Monad.Throw
import Data.IORef
import Control.Monad.Throw (throw, onException)
import Control.Monad.MVar
import Network (HostName, PortID(..), connectTo)
import Data.Bson (Document, look, typed)
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import Control.Monad.Identity
import Database.MongoDB.Internal.Util (true1, MonadIO') -- PortID instances
import Control.Monad.Util (MonadIO', untilSuccess)
import Database.MongoDB.Internal.Util (true1) -- PortID instances
import Var.Pool
import System.Random (newStdGen, randomRs)
import Data.List (delete, find, nub)
runNet :: ErrorT IOError m a -> m (Either IOError a)
-- ^ Execute action that raises IOError only on network problem. Other IOErrors like file access errors are not caught by this.
runNet = runErrorT
type Name = UString
adminCommand :: Document -> Request
-- ^ Convert command to request
@ -88,46 +86,48 @@ readHostPort :: String -> Host
-- ^ Read string \"hostname:port\" as @Host hostname port@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
readHostPort = runIdentity . readHostPortM
-- ** Replica Set
-- * Replica Set
newtype ReplicaSet = ReplicaSet (IORef [Host])
-- ^ Reference to a replica set of hosts. Ok if really not a replica set and just a stand-alone server, in which case it acts like a replica set of one.
data ReplicaSet = ReplicaSet {setName :: Name, seedHosts :: [Host]} deriving (Show)
-- ^ Replica set of hosts identified by set name. At least one of the seed hosts must be an active member of the set. However, this list is not used to identify the set, just the set name.
replicaSet :: [Host] -> IO ReplicaSet
-- ^ Create a reference to a replica set with given hosts as the initial seed list (a subset of the hosts in the replica set)
replicaSet s = assert (not $ null s) (ReplicaSet <$> newIORef s)
instance Eq ReplicaSet where ReplicaSet x _ == ReplicaSet y _ = x == y
replicas :: ReplicaSet -> IO [Host]
-- ^ Return current list of known hosts in replica set. This list is updated on every 'newConnection'.
replicas (ReplicaSet ref) = readIORef ref
-- ** Replica Info
-- * Replica Info
getReplicaInfo :: Pipe -> ErrorT IOError IO ReplicaInfo
-- ^ Get replica info of the connected host. Throw IOError if connection fails or host is not part of a replica set (no /hosts/ and /primary/ field).
getReplicaInfo pipe = do
promise <- call pipe [] (adminCommand ["ismaster" =: (1 :: Int)])
info <- commandReply "ismaster" <$> promise
look "hosts" info
look "primary" info
return info
data ReplicaInfo = ReplicaInfo Host Document deriving (Show, Eq)
type ReplicaInfo = Document
-- ^ Configuration info of a host in a replica set. Contains all the hosts in the replica set plus its role in that set (master, slave, or arbiter)
isMaster :: ReplicaInfo -> Bool
isPrimary :: ReplicaInfo -> Bool
-- ^ Is the replica described by this info a master/primary (not slave or arbiter)?
isMaster (ReplicaInfo _ i) = true1 "ismaster" i
isPrimary = true1 "ismaster"
isSlave :: ReplicaInfo -> Bool
isSecondary :: ReplicaInfo -> Bool
-- ^ Is the replica described by this info a slave/secondary (not master or arbiter)
isSlave = not . isMaster -- TODO: distinguish between slave and arbiter
isSecondary = true1 "secondary"
allReplicas :: ReplicaInfo -> [Host]
replicas :: ReplicaInfo -> [Host]
-- ^ All replicas in set according to this replica configuration info.
-- If host is stand-alone then it won't have \"hosts\" in its configuration, in which case we return the host by itself.
allReplicas (ReplicaInfo h i) = maybe [h] (map readHostPort . typed) (look "hosts" i)
replicas = map readHostPort . at "hosts"
sortedReplicas :: ReplicaInfo -> IO [Host]
-- ^ All replicas in set sorted by distance from this client. TODO
sortedReplicas = return . allReplicas
primary :: ReplicaInfo -> Host
-- ^ Read primary from configuration info
primary = readHostPort . at "primary"
getReplicaInfo :: (Throw IOError m, MonadIO' m) => Host -> Connection -> m ReplicaInfo
-- ^ Get replica info of the connected host. Throw IOError if connection fails.
getReplicaInfo host' conn = do
promise <- throwLeft . liftIO . E.try $ call conn [] (adminCommand ["ismaster" =: (1 :: Int)])
fmap (ReplicaInfo host' . commandReply "ismaster") . throwLeft . liftIO $ E.try promise
hosts :: ReplicaInfo -> [Host]
-- ^ replicas with primary at head
hosts info = master : delete master members where
members = replicas info
master = primary info
-- * MasterOrSlaveOk
@ -138,49 +138,94 @@ data MasterOrSlaveOk =
isMS :: MasterOrSlaveOk -> ReplicaInfo -> Bool
-- ^ Does the host (as described by its replica-info) match the master/slave type
isMS Master i = isMaster i
isMS SlaveOk i = isSlave i || isMaster i
isMS Master i = isPrimary i
isMS SlaveOk i = isSecondary i || isPrimary i
-- * Connection
newConnection :: (Throw IOError m, MonadIO' m) => MasterOrSlaveOk -> ReplicaSet -> m Connection
-- ^ Create a connection to a master or slave in the replica set. Throw IOError if failed to connect to any host in replica set that is the right master/slave type. 'close' connection when you are done using it even if a failure is raised. Garbage collected connections will be closed automatically (but don't rely on this when creating many connections).
-- TODO: prefer slave over master when SlaveOk and both are available.
newConnection mos (ReplicaSet vHosts) = throwLeft . liftIO $ left (userError . show) <$> do
hosts <- readIORef vHosts
e <- connectFirst mos hosts
case e of
Right (conn, info) -> do
writeIORef vHosts =<< sortedReplicas info
return (Right conn)
Left (fs, is) -> if null is
then return (Left fs)
else do
reps <- sortedReplicas (head is)
writeIORef vHosts reps
-- try again in case new replicas in info
(fst +++ fst) <$> connectFirst mos reps
type Pool' = Pool IOError
connectFirst :: MasterOrSlaveOk -> [Host] -> IO (Either ([(Host, IOError)], [ReplicaInfo]) (Connection, ReplicaInfo))
-- ^ Connect to first host that succeeds and is master/slave, otherwise return list of failed connections plus info of connections that succeeded but were not master/slave.
connectFirst mos = connectFirst' ([], []) where
connectFirst' (fs, is) [] = return $ Left (fs, is)
connectFirst' (fs, is) (h : hs) = do
e <- runErrorT $ do
c <- connect h
i <- getReplicaInfo h c
return (c, i)
case e of
Left f -> connectFirst' ((h, f) : fs, is) hs
Right (c, i) -> if isMS mos i
then return $ Right (c, i)
else do
close c
connectFirst' ((h, userError $ "not a " ++ show mos) : fs, i : is) hs
-- | A Server is a single server ('Host') or a replica set of servers ('ReplicaSet')
class Server t where
data Connection t
-- ^ A Mongo connection is a pool of TCP connections to a host or a replica set of hosts
connect :: (MonadIO' m) => Int -> t -> m (Connection t)
-- ^ Create a Mongo Connection to a host or a replica set of hosts. Actual TCP connection is not attempted until 'getPipe' request, so no IOError can be raised here. Up to N TCP connections will be established to each host.
getPipe :: MasterOrSlaveOk -> Connection t -> ErrorT IOError IO Pipe
-- ^ Return a TCP connection (Pipe) to the master or a slave in the server. Master must connect to the master, SlaveOk may connect to a slave or master. To spread the load, SlaveOk requests are distributed amongst all hosts in the server. Throw IOError if failed to connect to right type of host (Master/SlaveOk).
killPipes :: Connection t -> IO ()
-- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when this Connection is garbage collected.
connect :: (Throw IOError m, MonadIO' m) => Host -> m Connection
-- ^ Create a connection to the given host (as opposed to connecting to some host in a replica set via 'newConnection'). Throw IOError if can't connect.
connect (Host hostname port) = throwLeft . liftIO $ E.try (mkConnection =<< connectTo hostname port)
-- ** Connection Host
instance Server Host where
data Connection Host = HostConnection {connHost :: Host, connPool :: Pool' Pipe}
-- ^ A pool of TCP connections ('Pipe's) to a server, handed out in round-robin style.
connect poolSize host = liftIO (connectHost poolSize host)
-- ^ Create a Connection (pool of TCP connections) to server (host or replica set)
getPipe _ = getHostPipe
-- ^ Return a TCP connection (Pipe). If SlaveOk, connect to a slave if available. Round-robin if multiple slaves are available. Throw IOError if failed to connect.
killPipes (HostConnection _ pool) = killAll pool
connectHost :: Int -> Host -> IO (Connection Host)
-- ^ Create a pool of N 'Pipe's (TCP connections) to server. 'getHostPipe' will return one of those pipes, round-robin style.
connectHost poolSize host = HostConnection host <$> newPool Factory{..} poolSize where
newResource = tcpConnect host
killResource = close
isExpired = isClosed
getHostPipe :: Connection Host -> ErrorT IOError IO Pipe
-- ^ Return next pipe (TCP connection) in connection pool, round-robin style. Throw IOError if can't connect to host.
getHostPipe (HostConnection _ pool) = aResource pool
tcpConnect :: Host -> ErrorT IOError IO Pipe
-- ^ Create a TCP connection (Pipe) to the given host. Throw IOError if can't connect.
tcpConnect (Host hostname port) = ErrorT . E.try $ mkPipe =<< connectTo hostname port
-- ** Connection ReplicaSet
instance Server ReplicaSet where
data Connection ReplicaSet = ReplicaSetConnection {
repsetName :: Name,
currentMembers :: MVar [Connection Host] } -- master at head after a refresh
connect poolSize repset = liftIO (connectSet poolSize repset)
getPipe = getSetPipe
killPipes ReplicaSetConnection{..} = withMVar currentMembers (mapM_ killPipes)
replicaSet :: (MonadIO' m) => Connection ReplicaSet -> m ReplicaSet
-- ^ Set name with current members as seed list
replicaSet ReplicaSetConnection{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers
connectSet :: Int -> ReplicaSet -> IO (Connection ReplicaSet)
-- ^ Create a connection to each member of the replica set.
connectSet poolSize repset = assert (not . null $ seedHosts repset) $ do
currentMembers <- newMVar =<< mapM (connect poolSize) (seedHosts repset)
return $ ReplicaSetConnection (setName repset) currentMembers
getMembers :: Name -> [Connection Host] -> ErrorT IOError IO [Host]
-- ^ Get members of replica set, master first. Query supplied connections until config found.
-- TODO: Verify config for request replica set name and not some other replica set. ismaster config should include replica set name in result but currently does not.
getMembers repsetName connections = hosts <$> untilSuccess (getReplicaInfo <=< getHostPipe) connections
refreshMembers :: Name -> [Connection Host] -> ErrorT IOError IO [Connection Host]
-- ^ Update current members with master at head. Reuse unchanged members. Throw IOError if can't connect to any and fetch config. Dropped connections are not closed in case they still have users; they will be closed when garbage collected.
refreshMembers repsetName connections = do
n <- liftIO . poolSize . connPool $ head connections
mapM (connection n) =<< getMembers repsetName connections
where
connection n host = maybe (connect n host) return $ find ((host ==) . connHost) connections
getSetPipe :: MasterOrSlaveOk -> Connection ReplicaSet -> ErrorT IOError IO Pipe
-- ^ Return a pipe to primary or a random secondary in replica set. Use primary for SlaveOk if and only if no secondaries. Note, refreshes members each time (makes ismaster call to primary).
getSetPipe mos ReplicaSetConnection{..} = modifyMVar currentMembers $ \connections -> do
connections <- refreshMembers repsetName connections -- master at head after refresh
pipe <- case mos of
Master -> getHostPipe (head connections)
SlaveOk -> do
let n = length connections - 1
is <- take (max 1 n) . nub . randomRs (min 1 n, n) <$> liftIO newStdGen
untilSuccess (getHostPipe . (connections !!)) is
return (connections, pipe)
{- Authors: Tony Hannan <tony@10gen.com>

View file

@ -1,12 +1,12 @@
{-| Low-level messaging between this client and the MongoDB server. See Mongo Wire Protocol (<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>).
{-| Low-level messaging between this client and the MongoDB server, see Mongo Wire Protocol (<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>).
This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -}
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-}
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts #-}
module Database.MongoDB.Internal.Protocol (
-- * Connection
Connection, mkConnection,
-- * Pipe
Pipe, mkPipe,
send, call,
-- * Message
FullCollection,
@ -37,30 +37,33 @@ import Data.IORef
import System.IO.Unsafe (unsafePerformIO)
import Data.Digest.OpenSSL.MD5 (md5sum)
import Data.UString as U (pack, append, toByteString)
import System.IO.Error as E (try)
import Control.Monad.Error
import Control.Monad.Trans (MonadIO(..))
-- * Connection
-- * Pipe
type Connection = P.Pipe Handle ByteString
type Pipe = P.Pipeline Handle ByteString
-- ^ Thread-safe TCP connection to server with pipelined requests
mkConnection :: Handle -> IO Connection
mkPipe :: Handle -> IO Pipe
-- ^ New thread-safe pipelined connection over handle
mkConnection = P.newPipe encodeSize decodeSize where
mkPipe = P.newPipeline encodeSize decodeSize where
encodeSize = runPut . putInt32 . toEnum . (+ 4)
decodeSize = subtract 4 . fromEnum . runGet getInt32
send :: Connection -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Raise IOError if connection fails.
send conn notices = P.send conn =<< mapM noticeBytes notices
send :: Pipe -> [Notice] -> ErrorT IOError IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send conn notices = ErrorT . E.try $ P.send conn =<< mapM noticeBytes notices
call :: Connection -> [Notice] -> Request -> IO (IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will raise IOError if connection fails.
call conn notices request = do
call :: Pipe -> [Notice] -> Request -> ErrorT IOError IO (ErrorT IOError IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
call conn notices request = ErrorT . E.try $ do
nMessages <- mapM noticeBytes notices
requestId <- genRequestId
let rMessage = runPut (putRequest request requestId)
promise <- P.call conn (nMessages ++ [rMessage])
return (bytesReply requestId <$> promise)
return (ErrorT . E.try $ bytesReply requestId <$> promise)
noticeBytes :: Notice -> IO ByteString
noticeBytes notice = runPut . putNotice notice <$> genRequestId

View file

@ -1,14 +1,11 @@
-- | Miscellaneous general functions
{-# LANGUAGE StandaloneDeriving, FlexibleInstances, UndecidableInstances #-}
{-# LANGUAGE StandaloneDeriving #-}
module Database.MongoDB.Internal.Util where
import Prelude hiding (length)
import Network (PortID(..))
import Control.Applicative (Applicative(..), (<$>))
import Control.Monad.Reader
import Control.Monad.Error
import Data.UString as U (cons, append)
import Data.Bits (Bits, (.|.))
import Data.Bson
@ -17,20 +14,6 @@ deriving instance Show PortID
deriving instance Eq PortID
deriving instance Ord PortID
instance (Monad m) => Applicative (ReaderT r m) where
pure = return
(<*>) = ap
instance (Monad m, Error e) => Applicative (ErrorT e m) where
pure = return
(<*>) = ap
class (MonadIO m, Applicative m, Functor m) => MonadIO' m
instance (MonadIO m, Applicative m, Functor m) => MonadIO' m
ignore :: (Monad m) => a -> m ()
ignore _ = return ()
snoc :: [a] -> a -> [a]
-- ^ add element to end of list (/snoc/ is reverse of /cons/, which adds to front of list)
snoc list a = list ++ [a]
@ -45,10 +28,6 @@ bitOr = foldl (.|.) 0
-- ^ Concat first and second together with period in between. Eg. @\"hello\" \<.\> \"world\" = \"hello.world\"@
a <.> b = U.append a (cons '.' b)
loop :: (Functor m, Monad m) => m (Maybe a) -> m [a]
-- ^ Repeatedy execute action, collecting results, until it returns Nothing
loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act)
true1 :: Label -> Document -> Bool
-- ^ Is field's value a 1 or True (MongoDB use both Int and Bools for truth values). Error if field not in document or field not a Num or Bool.
true1 k doc = case valueAt k doc of

View file

@ -3,10 +3,10 @@
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, RankNTypes, ImpredicativeTypes #-}
module Database.MongoDB.Query (
-- * Connected
Connected, runConn, Conn, Failure(..),
-- * Access
access, Access, Action, runAction, Failure(..),
-- * Database
Database, allDatabases, DbConn, useDb, thisDatabase,
Database(..), allDatabases, DbAccess, use, thisDatabase,
-- ** Authentication
P.Username, P.Password, auth,
-- * Collection
@ -16,7 +16,7 @@ module Database.MongoDB.Query (
Select(select),
-- * Write
-- ** WriteMode
WriteMode(..), writeMode,
WriteMode(..), safe, GetLastError, writeMode,
-- ** Insert
insert, insert_, insertMany, insertMany_,
-- ** Update
@ -51,48 +51,57 @@ import Control.Concurrent.MVar
import Control.Pipeline (Resource(..))
import qualified Database.MongoDB.Internal.Protocol as P
import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call)
import Database.MongoDB.Connection (MasterOrSlaveOk(..))
import Database.MongoDB.Connection (MasterOrSlaveOk(..), Server(..))
import Data.Bson
import Data.Word
import Data.Int
import Data.Maybe (listToMaybe, catMaybes)
import Data.UString as U (dropWhile, any, tail)
import Database.MongoDB.Internal.Util (loop, (<.>), true1, MonadIO') -- plus Applicative instances of ErrorT & ReaderT
import Data.UString as U (dropWhile, any, tail, unpack)
import Control.Monad.Util (MonadIO', loop) -- plus Applicative instances of ErrorT & ReaderT
import Database.MongoDB.Internal.Util ((<.>), true1)
send :: (Context Connection m, Throw IOError m, MonadIO m) => [Notice] -> m ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send ns = throwLeft . liftIO . try . flip P.send ns =<< context
mapErrorIO :: (Throw e m, MonadIO m) => (e' -> e) -> ErrorT e' IO a -> m a
mapErrorIO f = throwLeft' f . liftIO . runErrorT
call :: (Context Connection m, Throw IOError m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw IOError n, MonadIO n) => n Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw IOError if connection fails on send, and promise will throw IOError if connection fails on receive.
send :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> m ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw 'ConnectionFailure' if pipe fails.
send ns = mapErrorIO ConnectionFailure . flip P.send ns =<< context
call :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw Failure n, MonadIO n) => n Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive.
call ns r = do
conn <- context
promise <- throwLeft . liftIO $ try (P.call conn ns r)
return (throwLeft . liftIO $ try promise)
pipe <- context
promise <- mapErrorIO ConnectionFailure (P.call pipe ns r)
return (mapErrorIO ConnectionFailure promise)
-- * Connected Monad
-- * Mongo Monad
newtype Connected m a = Connected (ErrorT Failure (ReaderT WriteMode (ReaderT MasterOrSlaveOk (ReaderT Connection m))) a)
deriving (Context Connection, Context MasterOrSlaveOk, Context WriteMode, Throw Failure, MonadIO, Monad, Applicative, Functor)
-- ^ Monad with access to a 'Connection', 'MasterOrSlaveOk', and 'WriteMode', and throws a 'Failure' on read/write failure and IOError on connection failure
access :: (Server s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> Connection s -> Action m a -> m (Either Failure a)
-- ^ Run action with access to server or replica set via one of the 'Pipe's (TCP connections) in given 'Connection' pool
access w mos conn act = do
ePipe <- liftIO . runErrorT $ getPipe mos conn
either (return . Left . ConnectionFailure) (runAction act w mos) ePipe
deriving instance (Throw IOError m) => Throw IOError (Connected m)
-- | A monad with access to a 'Pipe', 'MasterOrSlaveOk', and 'WriteMode', and throws 'Failure' on read, write, or pipe failure
class (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m) => Access m
instance (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m) => Access m
instance MonadTrans Connected where
lift = Connected . lift . lift . lift . lift
newtype Action m a = Action (ErrorT Failure (ReaderT WriteMode (ReaderT MasterOrSlaveOk (ReaderT Pipe m))) a)
deriving (Context Pipe, Context MasterOrSlaveOk, Context WriteMode, Throw Failure, MonadIO, Monad, Applicative, Functor)
-- ^ Monad with access to a 'Pipe', 'MasterOrSlaveOk', and 'WriteMode', and throws a 'Failure' on read, write or pipe failure
runConn :: Connected m a -> Connection -> m (Either Failure a)
-- ^ Run action with access to connection. It starts out assuming it is master (invoke 'slaveOk' inside it to change that) and that writes don't need to be check (invoke 'writeMode' to change that). Return Left Failure if error in execution. Throws IOError if connection fails during execution.
runConn (Connected action) = runReaderT (runReaderT (runReaderT (runErrorT action) Unsafe) Master)
instance MonadTrans Action where
lift = Action . lift . lift . lift . lift
-- | A monad with access to a 'Connection', 'MasterOrSlaveOk', and 'WriteMode', and throws 'Failure' on read/write failure and 'IOError' on connection failure
class (Context Connection m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, Throw IOError m, MonadIO' m) => Conn m
instance (Context Connection m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, Throw IOError m, MonadIO' m) => Conn m
runAction :: Action m a -> WriteMode -> MasterOrSlaveOk -> Pipe -> m (Either Failure a)
-- ^ Run action with access to pipe. It starts out assuming it is master (invoke 'slaveOk' inside it to change that) and that writes don't need to be check (invoke 'writeMode' to change that). Return Left Failure if error in execution. Throws IOError if pipe fails during execution.
runAction (Action action) w mos = runReaderT (runReaderT (runReaderT (runErrorT action) w) mos)
-- | Read or write exception like cursor expired or inserting a duplicate key.
-- Note, unexpected data from the server is not a Failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change.
data Failure =
CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server that the one you are currently connected to (perhaps a fail over happen between servers in a replica set)
ConnectionFailure IOError -- ^ TCP connection ('Pipe') failed. Make work if you try again on the same Mongo 'Connection' which will create a new Pipe.
| CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server that the one you are currently connected to (perhaps a fail over happen between servers in a replica set)
| QueryFailure String -- ^ Query failed for some reason as described in the string
| WriteFailure ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string
deriving (Show, Eq)
@ -102,29 +111,31 @@ instance Error Failure where strMsg = error
-- * Database
type Database = UString
newtype Database = Database {databaseName :: UString} deriving (Eq, Ord)
-- ^ Database name
-- | A 'Conn' monad with access to a 'Database'
class (Context Database m, Conn m) => DbConn m
instance (Context Database m, Conn m) => DbConn m
instance Show Database where show (Database x) = unpack x
allDatabases :: (Conn m) => m [Database]
-- | As 'Access' monad with access to a particular 'Database'
class (Context Database m, Access m) => DbAccess m
instance (Context Database m, Access m) => DbAccess m
allDatabases :: (Access m) => m [Database]
-- ^ List all databases residing on server
allDatabases = map (at "name") . at "databases" <$> useDb "admin" (runCommand1 "listDatabases")
allDatabases = map (Database . at "name") . at "databases" <$> use (Database "admin") (runCommand1 "listDatabases")
useDb :: Database -> ReaderT Database m a -> m a
use :: Database -> ReaderT Database m a -> m a
-- ^ Run Db action against given database
useDb = flip runReaderT
use = flip runReaderT
thisDatabase :: (DbConn m) => m Database
thisDatabase :: (DbAccess m) => m Database
-- ^ Current database in use
thisDatabase = context
-- * Authentication
auth :: (DbConn m) => Username -> Password -> m Bool
-- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new connection.
auth :: (DbAccess m) => Username -> Password -> m Bool
-- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe.
auth u p = do
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: u, "nonce" =: n, "key" =: pwKey n u p]
@ -134,7 +145,7 @@ auth u p = do
type Collection = UString
-- ^ Collection name (not prefixed with database)
allCollections :: (DbConn m) => m [Collection]
allCollections :: (DbAccess m) => m [Collection]
-- ^ List all collections in this database
allCollections = do
db <- thisDatabase
@ -142,17 +153,13 @@ allCollections = do
return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs
where
dropDbPrefix = U.tail . U.dropWhile (/= '.')
isSpecial db col = U.any (== '$') col && db <.> col /= "local.oplog.$main"
isSpecial (Database db) col = U.any (== '$') col && db <.> col /= "local.oplog.$main"
-- * Selection
data Selection = Select {selector :: Selector, coll :: Collection} deriving (Show, Eq)
-- ^ Selects documents in collection that match selector
{-select :: Selector -> Collection -> Selection
-- ^ Synonym for 'Select'
select = Select-}
type Selector = Document
-- ^ Filter for a query, analogous to the where clause in SQL. @[]@ matches all documents in collection. @[x =: a, y =: b]@ is analogous to @where x = a and y = b@ in SQL. See <http://www.mongodb.org/display/DOCS/Querying> for full selector syntax.
@ -177,30 +184,37 @@ instance Select Query where
-- | Default write-mode is 'Unsafe'
data WriteMode =
Unsafe -- ^ Submit writes without receiving acknowledgments. Fast. Assumes writes succeed even though they may not.
| Safe -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed.
| Safe GetLastError -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. This is acomplished by sending the getLastError command, with given 'GetLastError' parameters, after every write.
deriving (Show, Eq)
writeMode :: (Conn m) => WriteMode -> m a -> m a
type GetLastError = Document
-- ^ Parameters for getLastError command. For example ["w" =: 2] tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See "http://www.mongodb.org/display/DOCS/Last+Error+Commands" for more options.
safe :: WriteMode
-- ^ Safe []
safe = Safe []
writeMode :: (Access m) => WriteMode -> m a -> m a
-- ^ Run action with given 'WriteMode'
writeMode = push . const
write :: (DbConn m) => Notice -> m ()
write :: (DbAccess m) => Notice -> m ()
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error.
write notice = do
mode <- context
case mode of
Unsafe -> send [notice]
Safe -> do
me <- getLastError [notice]
Safe params -> do
me <- getLastError [notice] params
maybe (return ()) (throw . uncurry WriteFailure) me
type ErrorCode = Int
-- ^ Error code from getLastError
getLastError :: (DbConn m) => [Notice] -> m (Maybe (ErrorCode, String))
getLastError :: (DbAccess m) => [Notice] -> GetLastError -> m (Maybe (ErrorCode, String))
-- ^ Send notices (writes) then fetch what the last error was, Nothing means no error
getLastError writes = do
r <- runCommand' writes ["getlasterror" =: (1 :: Int)]
getLastError writes params = do
r <- runCommand' writes $ ("getlasterror" =: (1 :: Int)) : params
return $ (at "code" r,) <$> lookup "err" r
{-resetLastError :: (DbConn m) => m ()
@ -209,23 +223,23 @@ resetLastError = runCommand1 "reseterror" >> return ()-}
-- ** Insert
insert :: (DbConn m) => Collection -> Document -> m Value
insert :: (DbAccess m) => Collection -> Document -> m Value
-- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied
insert col doc = head <$> insertMany col [doc]
insert_ :: (DbConn m) => Collection -> Document -> m ()
insert_ :: (DbAccess m) => Collection -> Document -> m ()
-- ^ Same as 'insert' except don't return _id
insert_ col doc = insert col doc >> return ()
insertMany :: (DbConn m) => Collection -> [Document] -> m [Value]
insertMany :: (DbAccess m) => Collection -> [Document] -> m [Value]
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied
insertMany col docs = do
db <- thisDatabase
Database db <- thisDatabase
docs' <- liftIO $ mapM assignId docs
write (Insert (db <.> col) docs')
mapM (look "_id") docs'
insertMany_ :: (DbConn m) => Collection -> [Document] -> m ()
insertMany_ :: (DbAccess m) => Collection -> [Document] -> m ()
-- ^ Same as 'insertMany' except don't return _ids
insertMany_ col docs = insertMany col docs >> return ()
@ -237,54 +251,54 @@ assignId doc = if X.any (("_id" ==) . label) doc
-- ** Update
save :: (DbConn m) => Collection -> Document -> m ()
save :: (DbAccess m) => Collection -> Document -> m ()
-- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or update it if its not new (has \"_id\" field)
save col doc = case look "_id" doc of
Nothing -> insert_ col doc
Just i -> repsert (Select ["_id" := i] col) doc
replace :: (DbConn m) => Selection -> Document -> m ()
replace :: (DbAccess m) => Selection -> Document -> m ()
-- ^ Replace first document in selection with given document
replace = update []
repsert :: (DbConn m) => Selection -> Document -> m ()
repsert :: (DbAccess m) => Selection -> Document -> m ()
-- ^ Replace first document in selection with given document, or insert document if selection is empty
repsert = update [Upsert]
type Modifier = Document
-- ^ Update operations on fields in a document. See <http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations>
modify :: (DbConn m) => Selection -> Modifier -> m ()
modify :: (DbAccess m) => Selection -> Modifier -> m ()
-- ^ Update all documents in selection using given modifier
modify = update [MultiUpdate]
update :: (DbConn m) => [UpdateOption] -> Selection -> Document -> m ()
update :: (DbAccess m) => [UpdateOption] -> Selection -> Document -> m ()
-- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty.
update opts (Select sel col) up = do
db <- thisDatabase
Database db <- thisDatabase
write (Update (db <.> col) opts sel up)
-- ** Delete
delete :: (DbConn m) => Selection -> m ()
delete :: (DbAccess m) => Selection -> m ()
-- ^ Delete all documents in selection
delete = delete' []
deleteOne :: (DbConn m) => Selection -> m ()
deleteOne :: (DbAccess m) => Selection -> m ()
-- ^ Delete first document in selection
deleteOne = delete' [SingleRemove]
delete' :: (DbConn m) => [DeleteOption] -> Selection -> m ()
delete' :: (DbAccess m) => [DeleteOption] -> Selection -> m ()
-- ^ Delete all documents in selection unless 'SingleRemove' option is given then only delete first document in selection
delete' opts (Select sel col) = do
db <- thisDatabase
Database db <- thisDatabase
write (Delete (db <.> col) opts sel)
-- * Read
-- ** MasterOrSlaveOk
slaveOk :: (Conn m) => m a -> m a
slaveOk :: (Access m) => m a -> m a
-- ^ Ok to execute given action against slave, ie. eventually consistent reads
slaveOk = push (const SlaveOk)
@ -347,7 +361,7 @@ batchSizeRemainingLimit batchSize limit = if limit == 0
queryRequest :: Bool -> MasterOrSlaveOk -> Query -> Database -> (Request, Limit)
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
queryRequest isExplain mos Query{..} db = (P.Query{..}, remainingLimit) where
queryRequest isExplain mos Query{..} (Database db) = (P.Query{..}, remainingLimit) where
qOptions = msOption mos ++ map pOption options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
@ -360,79 +374,80 @@ queryRequest isExplain mos Query{..} db = (P.Query{..}, remainingLimit) where
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
runQuery :: (DbConn m) => Bool -> [Notice] -> Query -> m CursorState'
runQuery :: (DbAccess m) => Bool -> [Notice] -> Query -> m CursorState'
-- ^ Send query request and return cursor state
runQuery isExplain ns q = do
db <- thisDatabase
slaveOK <- context
call' ns (queryRequest isExplain slaveOK q db)
find :: (DbConn m) => Query -> m Cursor
find :: (DbAccess m) => Query -> m Cursor
-- ^ Fetch documents satisfying query
find q@Query{selection, batchSize} = do
db <- thisDatabase
cs' <- runQuery False [] q
newCursor db (coll selection) batchSize cs'
findOne' :: (DbConn m) => [Notice] -> Query -> m (Maybe Document)
findOne' :: (DbAccess m) => [Notice] -> Query -> m (Maybe Document)
-- ^ Send notices and fetch first document satisfying query or Nothing if none satisfy it
findOne' ns q = do
CS _ _ docs <- cursorState =<< runQuery False ns q {limit = 1}
return (listToMaybe docs)
findOne :: (DbConn m) => Query -> m (Maybe Document)
findOne :: (DbAccess m) => Query -> m (Maybe Document)
-- ^ Fetch first document satisfying query or Nothing if none satisfy it
findOne = findOne' []
explain :: (DbConn m) => Query -> m Document
explain :: (DbAccess m) => Query -> m Document
-- ^ Return performance stats of query execution
explain q = do -- same as findOne but with explain set to true
CS _ _ docs <- cursorState =<< runQuery True [] q {limit = 1}
return $ if null docs then error ("no explain: " ++ show q) else head docs
count :: (DbConn m) => Query -> m Int
count :: (DbAccess m) => Query -> m Int
-- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present)
count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
distinct :: (DbConn m) => Label -> Selection -> m [Value]
distinct :: (DbAccess m) => Label -> Selection -> m [Value]
-- ^ Fetch distinct values of field in selected documents
distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
-- *** Cursor
data Cursor = Cursor FullCollection BatchSize (MVar CursorState')
-- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a 'CursorNotFoundFailure'. Note, a cursor is not closed when the connection is closed, so you can open another connection to the same server and continue using the cursor.
-- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a 'CursorNotFoundFailure'. Note, a cursor is not closed when the pipe is closed, so you can open another pipe to the same server and continue using the cursor.
modifyCursorState' :: (Conn m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Connected (ErrorT IOError IO) (CursorState', a)) -> m a
modifyCursorState' :: (Access m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Action IO (CursorState', a)) -> m a
-- ^ Analogous to 'modifyMVar' but with Conn monad
modifyCursorState' (Cursor fcol batch var) act = do
conn <- context
wr <- context
mos <- context
pipe <- context
e <- liftIO . modifyMVar var $ \cs' -> do
ee <- runErrorT $ runConn (act fcol batch cs') conn
return $ case ee of
Right (Right (cs'', a)) -> (cs'', Right a)
Right (Left failure) -> (cs', Left $ throw failure)
Left ioerror -> (cs', Left $ throw ioerror)
e' <- runAction (act fcol batch cs') wr mos pipe
return $ case e' of
Right (cs'', a) -> (cs'', Right a)
Left failure -> (cs', Left $ throw failure)
either id return e
getCursorState :: (Conn m) => Cursor -> m CursorState
getCursorState :: (Access m) => Cursor -> m CursorState
-- ^ Extract current cursor status
getCursorState (Cursor _ _ var) = cursorState =<< liftIO (readMVar var)
data CursorState' =
Delayed (forall n. (Throw Failure n, Throw IOError n, MonadIO n) => n CursorState)
Delayed (forall n. (Throw Failure n, MonadIO n) => n CursorState)
| CursorState CursorState
-- ^ A cursor state or a promised cursor state which may fail
call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState'
call' :: (Access m) => [Notice] -> (Request, Limit) -> m CursorState'
-- ^ Send notices and request and return promised cursor state
call' ns (req, remainingLimit) = do
promise <- call ns req
return $ Delayed (fromReply remainingLimit =<< promise)
cursorState :: (Conn m) => CursorState' -> m CursorState
cursorState :: (Access m) => CursorState' -> m CursorState
-- ^ Convert promised cursor state to cursor state or failure
cursorState (Delayed promise) = promise
cursorState (CursorState cs) = return cs
@ -452,20 +467,22 @@ fromReply limit Reply{..} = do
CursorNotFound -> throw (CursorNotFoundFailure rCursorId)
QueryError -> throw (QueryFailure $ at "$err" $ head rDocuments)
newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor
newCursor :: (Access m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor
-- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected.
newCursor db col batch cs = do
conn <- context
newCursor (Database db) col batch cs = do
wr <- context
mos <- context
pipe <- context
var <- liftIO (newMVar cs)
let cursor = Cursor (db <.> col) batch var
liftIO . addMVarFinalizer var $ runErrorT (runConn (close cursor) conn :: ErrorT IOError IO (Either Failure ())) >> return ()
liftIO . addMVarFinalizer var $ runAction (close cursor) wr mos pipe >> return ()
return cursor
next :: (Conn m) => Cursor -> m (Maybe Document)
next :: (Access m) => Cursor -> m (Maybe Document)
-- ^ Return next document in query result, or Nothing if finished.
next cursor = modifyCursorState' cursor nextState where
-- Pre-fetch next batch promise from server when last one in current batch is returned.
nextState :: FullCollection -> BatchSize -> CursorState' -> Connected (ErrorT IOError IO) (CursorState', Maybe Document)
nextState :: FullCollection -> BatchSize -> CursorState' -> Action IO (CursorState', Maybe Document)
nextState fcol batch cs' = do
CS limit cid docs <- cursorState cs'
case docs of
@ -480,15 +497,15 @@ next cursor = modifyCursorState' cursor nextState where
nextBatch fcol batch limit cid = call' [] (GetMore fcol batchSize cid, remLimit)
where (batchSize, remLimit) = batchSizeRemainingLimit batch limit
nextN :: (Conn m) => Int -> Cursor -> m [Document]
nextN :: (Access m) => Int -> Cursor -> m [Document]
-- ^ Return next N documents or less if end is reached
nextN n c = catMaybes <$> replicateM n (next c)
rest :: (Conn m) => Cursor -> m [Document]
rest :: (Access m) => Cursor -> m [Document]
-- ^ Return remaining documents in query result
rest c = loop (next c)
instance (Conn m) => Resource m Cursor where
instance (Access m) => Resource m Cursor where
close cursor = modifyCursorState' cursor kill' where
kill' _ _ cs' = first CursorState <$> (kill =<< cursorState cs')
kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]]
@ -521,13 +538,13 @@ groupDocument Group{..} =
"initial" =: gInitial,
"cond" =: gCond ]
group :: (DbConn m) => Group -> m [Document]
group :: (DbAccess m) => Group -> m [Document]
-- ^ Execute group query and return resulting aggregate value for each distinct key
group g = at "retval" <$> runCommand ["group" =: groupDocument g]
-- ** MapReduce
-- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values from all lists to a single result. There are additional parameters that may be set to tweak this basic operation.
-- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values to a single result. There are additional parameters that may be set to tweak this basic operation.
data MapReduce = MapReduce {
rColl :: Collection,
rMap :: MapFun,
@ -536,7 +553,7 @@ data MapReduce = MapReduce {
rSort :: Order, -- ^ Default is [] meaning no sort
rLimit :: Limit, -- ^ Default is 0 meaning no limit
rOut :: Maybe Collection, -- ^ Output to given permanent collection, otherwise output to a new temporary collection whose name is returned.
rKeepTemp :: Bool, -- ^ If True, the temporary output collection is made permanent. If False, the temporary output collection persists for the life of the current connection only, however, other connections may read from it while the original one is still alive. Note, reading from a temporary collection after its original connection dies returns an empty result (not an error). The default for this attribute is False, unless 'rOut' is specified, then the collection permanent.
rKeepTemp :: Bool, -- ^ If True, the temporary output collection is made permanent. If False, the temporary output collection persists for the life of the current pipe only, however, other pipes may read from it while the original one is still alive. Note, reading from a temporary collection after its original pipe dies returns an empty result (not an error). The default for this attribute is False, unless 'rOut' is specified, then the collection permanent.
rFinalize :: Maybe FinalizeFun, -- ^ Function to apply to all the results when finished. Default is Nothing.
rScope :: Document, -- ^ Variables (environment) that can be accessed from map/reduce/finalize. Default is [].
rVerbose :: Bool -- ^ Provide statistics on job execution time. Default is False.
@ -546,7 +563,7 @@ type MapFun = Javascript
-- ^ @() -> void@. The map function references the variable @this@ to inspect the current object under consideration. The function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate.
type ReduceFun = Javascript
-- ^ @(key, value_array) -> value@. The reduce function receives a key and an array of values and returns an aggregate result value. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @for all k, vals : reduce(k, [reduce(k,vals)]) == reduce(k,vals)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible.
-- ^ @(key, [value]) -> value@. The reduce function receives a key and an array of values and returns an aggregate result value. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @reduce(k, [reduce(k,vs)]) == reduce(k,vs)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible.
type FinalizeFun = Javascript
-- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value.
@ -570,36 +587,36 @@ mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
-- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments.
mapReduce col map' red = MapReduce col map' red [] [] 0 Nothing False Nothing [] False
runMR :: (DbConn m) => MapReduce -> m Cursor
runMR :: (DbAccess m) => MapReduce -> m Cursor
-- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript)
-- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when connection closes.
-- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when pipe closes.
runMR mr = find . query [] =<< (at "result" <$> runMR' mr)
runMR' :: (DbConn m) => MapReduce -> m Document
runMR' :: (DbAccess m) => MapReduce -> m Document
-- ^ Run MapReduce and return a result document containing a "result" field holding the output Collection and additional statistic fields. Error if the map/reduce failed (because of bad Javascript).
runMR' mr = do
doc <- runCommand (mrDocument mr)
return $ if true1 "ok" doc then doc else error $ at "errmsg" doc ++ " in:\n" ++ show mr
return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr
-- * Command
type Command = Document
-- ^ A command is a special query or action against the database. See <http://www.mongodb.org/display/DOCS/Commands> for details.
runCommand' :: (DbConn m) => [Notice] -> Command -> m Document
runCommand' :: (DbAccess m) => [Notice] -> Command -> m Document
-- ^ Send notices then run command and return its result
runCommand' ns c = maybe err id <$> findOne' ns (query c "$cmd") where
err = error $ "Nothing returned for command: " ++ show c
runCommand :: (DbConn m) => Command -> m Document
runCommand :: (DbAccess m) => Command -> m Document
-- ^ Run command against the database and return its result
runCommand = runCommand' []
runCommand1 :: (DbConn m) => UString -> m Document
runCommand1 :: (DbAccess m) => UString -> m Document
-- ^ @runCommand1 foo = runCommand [foo =: 1]@
runCommand1 c = runCommand [c =: (1 :: Int)]
eval :: (DbConn m) => Javascript -> m Document
eval :: (DbAccess m) => Javascript -> m Document
-- ^ Run code on server
eval code = at "retval" <$> runCommand ["$eval" =: code]

View file

@ -4,11 +4,12 @@ mongoDB
About
-----
A mongoDB driver for Haskell, which lets you connect to MongoDB and do inserts, queries, updates, etc.
MongoDB driver for Haskell, which lets you connect to a MongoDB database management system and do inserts, queries, updates, etc.
Links
-----
* [MongoDB](http://www.mongodb.org)
* [mongoDB API reference](http://hackage.haskell.org/package/mongoDB)
* [tutorial](http://github.com/TonyGen/mongoDB-haskell/blob/master/tutorial.md)
* [map/reduce example](http://github.com/TonyGen/mongoDB-haskell/blob/master/map-reduce-example.md)

88
TODO
View file

@ -1,89 +1,53 @@
TODO
====
BSON
Bson
----
+ implement deprecated types (were left out)
+ on insert/update: reject keys that start with "$" or "."
+ data support for common mongo "$symbols"
+ convert from/to json
+ tie in with native regex like python does?
- on outgoing uncompile?
- on incoming automatically compile
+ more time convertibles
+ map operations for BsonDoc (or should it be applicative?)
+ tie regex type to type in regex library
+ Read instance for Documents that can read its Show representation
MongoDB
-------
+ support full level 0
- operations on database objects
* add_son_manipulators?
* dereference (dbref)
- database admin
* getProfilingInfo
- misc operations
* getCollectionOptions
- cursor object
* hasMore
- all commands listed on http://127.0.0.1:28017/_commands. (mongod --rest)
+ When one connection in a pool fails, close all other since they will likely fail too
+ on insert/update: reject keys that start with "$" or "."
+ dereference dbref
+ functions for every command, eg.
- findAndModify
- reIndex (http://www.mongodb.org/display/DOCS/Indexes#Indexes-ReIndex)
- safe write to two or more replicas
- Query attribute: timeout
- CreateIndex attributes: background, min, max
- CreateIndex Order [Asc, Dec, Geo2d]
- FindAndModify
- createIndex attributes: background, min, max
- createIndex Order [Asc, Dec, Geo2d]
- getIndexInfo
- logout
- collectionsInfo
- databasesInfo
- getLastError options
- Update If Current (http://www.mongodb.org/display/DOCS/Atomic+Operations)
- block write until written on N replicas
- lazyRest on cursor, although lazy I/O) is problematic and we may not want to support it.
- Upon client exit, send killCursors for all open cursors, otherwise server will keep them open for 10 minutes and keep NoCursorTimeout cursors open for hours.
-- Upon cursor finalize (garbage collect) send killCursor even if you have to create a new connection, because server keeps cursors open for 10 minutes (or more).
-- Query option Exhaust
optional:
- automatic reconnection
- buffer pooling
- connection pooling. Unsafe to shrink pool and close connections because map/reduce temp tables that were created on the connection will get deleted. Note, other connections can access a map/reduce temp table as long as the original connection is still alive. Also, other connections can access cursors created on other connections, even if those die. Cursors will be deleted on server only if idle for more than 10 minutes. Accessing a deleted cursor returns an error.
+ auto-destoy connection (how?/when?). Although, GHC will automatically close connection (Handle) when garbage collected.
+ don't read into cursor until needed, but have cursor send getMore before
it is actually out of docs (so network is finished by the time we're ready
to consume more)
Misc
----
+ learn more about haskelldb, anything we can learn from there
+ go through pymongo api and figure out what parts to adopt (also look
at other languages?)
+ kill prefix on data types "eg QO_*"?
+ javascript
- getProfileInfo
+ Query attribute: timeout
+ Update If Current (http://www.mongodb.org/display/DOCS/Atomic+Operations)
+ Upon client exit, send killCursors for all open cursors, otherwise server will keep them open for 10 minutes and keep NoCursorTimeout cursors open for hours.
+ Upon cursor finalize (garbage collect) send killCursor even if you have to create a new connection, because server keeps cursors open for 10 minutes (or more).
+ Query option Exhaust
+ Reconnect on replica set primary stepdown, so no exception raised to user
+ Reconnect on query ioerror re-query, so no exception raised to user. Can't be done for writes because it is not safe to re-execute a write.
+ tailable cursor support
- only close cursor when cursorID is 0
- have to create loop that sleeps and retries
- lazy list support
+ GridFS
Tests?
Documentation
- ref
GridFS
deep "lookup" function (other deep Map functions?)
Read instance for Documents that can read its Show representation
make sure NULLs aren't in created table names
update tutorial to match new python one
Tests - none currently
Misc
----
+ javascript DSL
+ update tutorial to match new python one
+ custom types (see python examples)
+ make BSON an instance of Binary (eg get/put)
Questions:
- In Mongo shell, db.foo.totalSize fetches storageSize of each index but does not use it
Notes:
- Remember that in the new version of MongoDB (>= 1.6), "ok" field can be a number (0 or 1) or boolean (False or True). Use 'true1' function defined in Database.MongoDB.Util
- A cursor will die on the server if not accessed (by any connection) within past 10 minutes (unless NoCursorTimeout option set). Accessing a dead (or non-existent) cursor raises a ServerFailure exception.
- A cursor will die on the server if not accessed (by any connection) within past 10 minutes (unless NoCursorTimeout option set). Accessing a dead (or non-existent) cursor raises a CursorNotFoundFailure.
- Unsafe to shrink pool and close connections because map/reduce temp tables that were created on the connection will get deleted. Note, other connections can access a map/reduce temp table as long as the original connection is still alive. Also, other connections can access cursors created on other connections, even if those die. Cursors will be deleted on server only if idle for more than 10 minutes. Accessing a deleted cursor returns an error.

View file

@ -18,7 +18,7 @@ MongoDB.Internal.Protocol defines the MongoDB Wire Protocol (http://www.mongodb.
MongoDB.Connection allows you to create a pipelined connection to a specific server or to a master/slave in a replica set.
MongoDB-Query defines the "connected" monad that has the current connection and database in context, and all the normal query and update operations you execute within this monad like find, findOne, count, insert, modify, delete, group, mapReduce, allDatabases, allCollections, runCommand, etc.
MongoDB-Query defines the "access" monad that has the current connection and database in context, and all the normal query and update operations you execute within this monad like find, findOne, count, insert, modify, delete, group, mapReduce, allDatabases, allCollections, runCommand, etc.
MongoDB-Admin defines all the administration operations like validateCollection, ensureIndex, dropIndex, addUser, copyDatabase, dbStats, setProfilingLevel, etc.

62
Var/Pool.hs Normal file
View file

@ -0,0 +1,62 @@
{- | Cycle through a set of resources (randomly), recreating them when they expire -}
{-# LANGUAGE RecordWildCards, NamedFieldPuns, FlexibleContexts #-}
module Var.Pool where
import Control.Applicative ((<$>), (<*>))
import Control.Monad.MVar
import Data.Array.IO
import Data.Maybe (catMaybes)
import Control.Monad.Error
import System.Random (randomRIO)
-- | Creator, destroyer, and checker of resources of type r. Creator may throw error or type e.
data Factory e r = Factory {
newResource :: ErrorT e IO r,
killResource :: r -> IO (),
isExpired :: r -> IO Bool }
newPool :: Factory e r -> Int -> IO (Pool e r)
-- ^ Create new pool of initial max size
newPool f n = do
arr <- newArray (0, n-1) Nothing
var <- newMVar arr
return (Pool f var)
data Pool e r = Pool {factory :: Factory e r, resources :: MVar (IOArray Int (Maybe r))}
-- ^ Pool of maximum N resources. Resources may expire on their own or be killed. Resources will initially be created on demand up N resources then recycled in random fashion. N may be changed by resizing the pool. Random is preferred to round-robin to distribute effect of pathological use cases that use every Xth resource the most and N is a multiple of X.
-- Resources *must* close/kill themselves when garbage collected ('resize' relies on this).
aResource :: (Error e) => Pool e r -> ErrorT e IO r
-- ^ Return a random live resource in pool or create new one if expired or not yet created
aResource Pool{..} = withMVar resources $ \array -> do
i <- liftIO $ randomRIO =<< getBounds array
mr <- liftIO $ readArray array i
r <- maybe (new array i) (check array i) mr
return r
where
new array i = do
r <- newResource factory
liftIO $ writeArray array i (Just r)
return r
check array i r = do
bad <- liftIO $ isExpired factory r
if bad then new array i else return r
poolSize :: Pool e r -> IO Int
-- ^ current max size of pool
poolSize Pool{resources} = withMVar resources (fmap rangeSize . getBounds)
resize :: Pool e r -> Int -> IO ()
-- ^ resize max size of pool. When shrinking some resource will be dropped without closing since they may still be in use. They are expected to close themselves when garbage collected.
resize Pool{resources} n = modifyMVar_ resources $ \array -> do
rs <- take n <$> getElems array
array <- newListArray (0, n-1) (rs ++ repeat Nothing)
return array
killAll :: Pool e r -> IO ()
-- ^ Kill all resources in pool so subsequent access creates new ones
killAll (Pool Factory{killResource} resources) = withMVar resources $ \array -> do
mapM_ killResource . catMaybes =<< getElems array
mapM_ (\i -> writeArray array i Nothing) . range =<< getBounds array

View file

@ -19,8 +19,8 @@ map/reduce queries on:
Prelude> :set prompt "> "
> :set -XOverloadedStrings
> import Database.MongoDB
> Right conn <- runNet $ connect $ host "localhost"
> let run act = runNet $ runConn (useDb "test" act) con
> conn <- connect 1 $ host "localhost"
> let run act = runConn safe Master conn $ use (Database "test") act
> :{
run $ insertMany "mr1" [
["x" =: 1, "tags" =: ["dog", "cat"]],

View file

@ -1,35 +1,51 @@
Name: mongoDB
Version: 0.7.1
License: OtherLicense
License-file: LICENSE
Maintainer: Tony Hannan <tony@10gen.com>
Author: Scott Parish <srp@srparish.net> & Tony Hannan <tony@10gen.com>
Copyright: Copyright (c) 2010-2010 Scott Parish & 10gen Inc.
homepage: http://github.com/TonyGen/mongoDB-haskell
Category: Database
Synopsis: A driver for MongoDB
Description: This module lets you connect to MongoDB, do inserts, queries, updates, etc.
Stability: alpha
Build-Depends:
base < 5,
containers,
mtl,
binary,
bytestring,
network,
nano-md5,
parsec,
bson
Build-Type: Simple
Exposed-modules:
Control.Monad.Context,
Control.Monad.Throw,
Control.Pipeline,
Database.MongoDB.Internal.Util,
Database.MongoDB.Internal.Protocol,
Database.MongoDB.Connection,
Database.MongoDB.Query,
Database.MongoDB.Admin,
Database.MongoDB
ghc-options: -Wall -O2
name: mongoDB
version: 0.8
cabal-version: >=1.4
build-type: Simple
license: OtherLicense
license-file: LICENSE
copyright: Copyright (c) 2010-2010 Scott Parish & 10gen Inc.
maintainer: Tony Hannan <tony@10gen.com>
build-depends: array -any, base <5, binary -any, bson -any,
bytestring -any, containers -any, mtl -any, nano-md5 -any,
network -any, parsec -any
stability: alpha
homepage: http://github.com/TonyGen/mongoDB-haskell
package-url:
bug-reports:
synopsis: A driver for MongoDB
description: This module lets you connect to MongoDB, do inserts, queries, updates, etc.
category: Database
author: Scott Parish <srp@srparish.net> & Tony Hannan <tony@10gen.com>
tested-with:
data-files:
data-dir: ""
extra-source-files:
extra-tmp-files:
exposed-modules: Control.Pipeline Control.Monad.Context
Control.Monad.Throw Database.MongoDB Database.MongoDB.Admin
Database.MongoDB.Connection Database.MongoDB.Query
Database.MongoDB.Internal.Protocol Database.MongoDB.Internal.Util
exposed: True
buildable: True
build-tools:
cpp-options:
cc-options:
ld-options:
pkgconfig-depends:
frameworks:
c-sources:
extensions:
extra-libraries:
extra-lib-dirs:
includes:
install-includes:
include-dirs:
hs-source-dirs: .
other-modules:
ghc-prof-options:
ghc-shared-options:
ghc-options: -Wall -O2
hugs-options:
nhc98-options:
jhc-options:

View file

@ -9,7 +9,7 @@ This is a mini tutorial to get you up and going with the basics
of the Haskell mongoDB drivers. It is modeled after the
[pymongo tutorial](http://api.mongodb.org/python/1.4%2B/tutorial.html).
You will need the mongoDB bindings installed as well as mongo itself installed.
You will need the mongoDB driver installed as well as mongo itself installed.
$ = command line prompt
> = ghci repl prompt
@ -35,13 +35,13 @@ Getting Ready
Start a MongoDB instance for us to play with:
$ mongod
$ mongod --dbpath <directory where Mongo will store the data>
Start up a haskell repl:
$ ghci
Now we'll need to bring in the MongoDB/BSON bindings and set
Now we'll need to bring in the MongoDB/Bson bindings and set
OverloadedStrings so literal strings are converted to UTF-8 automatically.
> import Database.MongoDB
@ -49,48 +49,58 @@ OverloadedStrings so literal strings are converted to UTF-8 automatically.
Making A Connection
-------------------
Open up a connection to your DB instance, using the standard port:
Open up a connection to your mongo server, using the standard port (27017):
> Right conn <- runNet $ connect $ host "127.0.0.1"
> conn <- connect 1 $ host "127.0.0.1"
or for a non-standard port
> Right conn <- runNet $ connect $ Host "127.0.0.1" (PortNumber 30000)
> conn <- connect 1 $ Host "127.0.0.1" (PortNumber 30000)
*connect* throws IOError if connection fails and *runNet* catches IOError and
returns it as Left. We are assuming above it won't fail. If it does you will get a
pattern match error.
*connect* takes the connection pool size and the host to connect to. It returns
a *Connection*, which is really a pool of TCP connections, initially created on demand.
So it is not possible to get a connection error until you try to use it.
Connected monad
Plain IO code in this driver never raises an exception unless it invokes third party IO
code that does. Driver code that may throw an exception says so in its Monad type,
for example, *ErrorT IOError IO a*.
Access monad
-------------------
The current connection is held in a Connected monad, and the current database
is held in a Reader monad on top of that. To run a connected monad, supply
it and a connection to *runConn*. To access a database within a connected
monad, call *useDb*.
A mongo query/update executes in an *Access* monad, which has access to a
*Pipe*, *WriteMode*, and *MasterSlaveOk* mode, and may throw a *Failure*. A Pipe
is a single TCP connection, while a Connection is a pool of Pipes.
To run an Access action (monad), supply WriteMode, MasterOrSlaveOk, Connection,
and action to *access*. For example, to get a list of all the database on the server:
> access safe Master conn allDatabases
Since we are working in ghci, which requires us to start from the
IO monad every time, we'll define a convenient *run* function that takes a
db-action and executes it against our "test" database on the server we
IO monad every time, we'll define a convenient *run* function that takes an
action and executes it against our "test" database on the server we
just connected to:
> let run action = runNet $ runConn (useDb "test" action) conn
> let run action = access safe Master conn $ use (Database "test") action
*runConn* return either Left Failure or Right result. Failure
means there was a read or write exception like cursor expired or duplicate key insert.
This combined with *runNet* means our *run* returns *(Either IOError (Either Failure a))*.
*access* return either Left Failure or Right result. Failure means there was a connection failure,
or a read or write exception like cursor expired or duplicate key insert.
*use* adds a *Database* to the action context, so query/update operations know which
database to operate on.
Databases and Collections
-----------------------------
A MongoDB can store multiple databases -- separate namespaces
MongoDB can store multiple databases -- separate namespaces
under which collections reside.
You can obtain the list of databases available on a connection:
> run allDatabases
The "test" database is ignored in this case because *allDatabases*
The "test" database in context is ignored in this case because *allDatabases*
is not a query on a specific database but on the server as a whole.
Databases and collections do not need to be created, just start using