Replace hard tabs with spaces

This commit is contained in:
Michael Snoyman 2013-12-26 16:57:33 +02:00
parent b128bc7a09
commit 756c9181cb
7 changed files with 575 additions and 575 deletions

View file

@ -40,10 +40,10 @@ Simple example below. Use with language extensions /OvererloadedStrings/ & /Exte
-}
module Database.MongoDB (
module Data.Bson,
module Database.MongoDB.Connection,
module Database.MongoDB.Query,
module Database.MongoDB.Admin
module Data.Bson,
module Database.MongoDB.Connection,
module Database.MongoDB.Query,
module Database.MongoDB.Admin
) where
import Data.Bson

View file

@ -3,28 +3,28 @@
{-# LANGUAGE FlexibleContexts, OverloadedStrings, RecordWildCards #-}
module Database.MongoDB.Admin (
-- * Admin
-- ** Collection
CollectionOption(..), createCollection, renameCollection, dropCollection,
-- * Admin
-- ** Collection
CollectionOption(..), createCollection, renameCollection, dropCollection,
validateCollection,
-- ** Index
Index(..), IndexName, index, ensureIndex, createIndex, dropIndex,
-- ** Index
Index(..), IndexName, index, ensureIndex, createIndex, dropIndex,
getIndexes, dropIndexes,
-- ** User
allUsers, addUser, removeUser,
-- ** Database
admin, cloneDatabase, copyDatabase, dropDatabase, repairDatabase,
-- ** Server
serverBuildInfo, serverVersion,
-- * Diagnotics
-- ** Collection
collectionStats, dataSize, storageSize, totalIndexSize, totalSize,
-- ** Profiling
ProfilingLevel(..), getProfilingLevel, MilliSec, setProfilingLevel,
-- ** Database
dbStats, OpNum, currentOp, killOp,
-- ** Server
serverStatus
-- ** User
allUsers, addUser, removeUser,
-- ** Database
admin, cloneDatabase, copyDatabase, dropDatabase, repairDatabase,
-- ** Server
serverBuildInfo, serverVersion,
-- * Diagnotics
-- ** Collection
collectionStats, dataSize, storageSize, totalIndexSize, totalSize,
-- ** Profiling
ProfilingLevel(..), getProfilingLevel, MilliSec, setProfilingLevel,
-- ** Database
dbStats, OpNum, currentOp, killOp,
-- ** Server
serverStatus
) where
import Prelude hiding (lookup)
@ -71,17 +71,17 @@ createCollection opts col = runCommand $ ["create" =: col] ++ map coptElem opts
renameCollection :: (MonadIO' m) => Collection -> Collection -> Action m Document
-- ^ Rename first collection to second collection
renameCollection from to = do
db <- thisDatabase
useDb admin $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
db <- thisDatabase
useDb admin $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
dropCollection :: (MonadIO' m) => Collection -> Action m Bool
-- ^ Delete the given collection! Return True if collection existed (and was deleted); return False if collection did not exist (and no action).
dropCollection coll = do
resetIndexCache
r <- runCommand ["drop" =: coll]
if true1 "ok" r then return True else do
if at "errmsg" r == ("ns not found" :: Text) then return False else
fail $ "dropCollection failed: " ++ show r
resetIndexCache
r <- runCommand ["drop" =: coll]
if true1 "ok" r then return True else do
if at "errmsg" r == ("ns not found" :: Text) then return False else
fail $ "dropCollection failed: " ++ show r
validateCollection :: (MonadIO' m) => Collection -> Action m Document
-- ^ This operation takes a while
@ -92,20 +92,20 @@ validateCollection coll = runCommand ["validate" =: coll]
type IndexName = Text
data Index = Index {
iColl :: Collection,
iKey :: Order,
iName :: IndexName,
iUnique :: Bool,
iDropDups :: Bool
} deriving (Show, Eq)
iColl :: Collection,
iKey :: Order,
iName :: IndexName,
iUnique :: Bool,
iDropDups :: Bool
} deriving (Show, Eq)
idxDocument :: Index -> Database -> Document
idxDocument Index{..} db = [
"ns" =: db <.> iColl,
"key" =: iKey,
"name" =: iName,
"unique" =: iUnique,
"dropDups" =: iDropDups ]
"ns" =: db <.> iColl,
"key" =: iKey,
"name" =: iName,
"unique" =: iUnique,
"dropDups" =: iDropDups ]
index :: Collection -> Order -> Index
-- ^ Spec of index of ordered keys on collection. Name is generated from keys. Unique and dropDups are False.
@ -113,16 +113,16 @@ index coll keys = Index coll keys (genName keys) False False
genName :: Order -> IndexName
genName keys = T.intercalate "_" (map f keys) where
f (k := v) = k `T.append` "_" `T.append` T.pack (show v)
f (k := v) = k `T.append` "_" `T.append` T.pack (show v)
ensureIndex :: (MonadIO' m) => Index -> Action m ()
-- ^ Create index if we did not already create one. May be called repeatedly with practically no performance hit, because we remember if we already called this for the same index (although this memory gets wiped out every 15 minutes, in case another client drops the index and we want to create it again).
ensureIndex idx = let k = (iColl idx, iName idx) in do
icache <- fetchIndexCache
set <- liftIO (readIORef icache)
unless (Set.member k set) $ do
accessMode master (createIndex idx)
liftIO $ writeIORef icache (Set.insert k set)
icache <- fetchIndexCache
set <- liftIO (readIORef icache)
unless (Set.member k set) $ do
accessMode master (createIndex idx)
liftIO $ writeIORef icache (Set.insert k set)
createIndex :: (MonadIO' m) => Index -> Action m ()
-- ^ Create index on the server. This call goes to the server every time.
@ -131,20 +131,20 @@ createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase
dropIndex :: (MonadIO' m) => Collection -> IndexName -> Action m Document
-- ^ Remove the index
dropIndex coll idxName = do
resetIndexCache
runCommand ["deleteIndexes" =: coll, "index" =: idxName]
resetIndexCache
runCommand ["deleteIndexes" =: coll, "index" =: idxName]
getIndexes :: (MonadIO m, MonadBaseControl IO m, Functor m) => Collection -> Action m [Document]
-- ^ Get all indexes on this collection
getIndexes coll = do
db <- thisDatabase
rest =<< find (select ["ns" =: db <.> coll] "system.indexes")
db <- thisDatabase
rest =<< find (select ["ns" =: db <.> coll] "system.indexes")
dropIndexes :: (MonadIO' m) => Collection -> Action m Document
-- ^ Drop all indexes on this collection
dropIndexes coll = do
resetIndexCache
runCommand ["deleteIndexes" =: coll, "index" =: ("*" :: Text)]
resetIndexCache
runCommand ["deleteIndexes" =: coll, "index" =: ("*" :: Text)]
-- *** Index cache
@ -156,48 +156,48 @@ type IndexCache = IORef (Set (Collection, IndexName))
dbIndexCache :: DbIndexCache
-- ^ initialize cache and fork thread that clears it every 15 minutes
dbIndexCache = unsafePerformIO $ do
table <- H.new
_ <- forkIO . forever $ threadDelay 900000000 >> clearDbIndexCache
return table
table <- H.new
_ <- forkIO . forever $ threadDelay 900000000 >> clearDbIndexCache
return table
{-# NOINLINE dbIndexCache #-}
clearDbIndexCache :: IO ()
clearDbIndexCache = do
keys <- map fst <$> H.toList dbIndexCache
mapM_ (H.delete dbIndexCache) keys
keys <- map fst <$> H.toList dbIndexCache
mapM_ (H.delete dbIndexCache) keys
fetchIndexCache :: (MonadIO m) => Action m IndexCache
-- ^ Get index cache for current database
fetchIndexCache = do
db <- thisDatabase
liftIO $ do
mc <- H.lookup dbIndexCache db
maybe (newIdxCache db) return mc
db <- thisDatabase
liftIO $ do
mc <- H.lookup dbIndexCache db
maybe (newIdxCache db) return mc
where
newIdxCache db = do
idx <- newIORef Set.empty
H.insert dbIndexCache db idx
return idx
newIdxCache db = do
idx <- newIORef Set.empty
H.insert dbIndexCache db idx
return idx
resetIndexCache :: (MonadIO m) => Action m ()
-- ^ reset index cache for current database
resetIndexCache = do
icache <- fetchIndexCache
liftIO (writeIORef icache Set.empty)
icache <- fetchIndexCache
liftIO (writeIORef icache Set.empty)
-- ** User
allUsers :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Document]
-- ^ Fetch all users of this database
allUsers = map (exclude ["_id"]) <$> (rest =<< find
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
addUser :: (MonadIO' m) => Bool -> Username -> Password -> Action m ()
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
addUser readOnly user pass = do
mu <- findOne (select ["user" =: user] "system.users")
let usr = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
save "system.users" usr
mu <- findOne (select ["user" =: user] "system.users")
let usr = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
save "system.users" usr
removeUser :: (MonadIO m) => Username -> Action m ()
removeUser user = delete (select ["user" =: user] "system.users")
@ -215,12 +215,12 @@ cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromH
copyDatabase :: (MonadIO' m) => Database -> Host -> Maybe (Username, Password) -> Database -> Action m Document
-- ^ Copy database from given host to the server I am connected to. If username & password is supplied use them to read from given host.
copyDatabase fromDb fromHost mup toDb = do
let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb]
useDb admin $ case mup of
Nothing -> runCommand c
Just (usr, pss) -> do
n <- at "nonce" <$> runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost]
runCommand $ c ++ ["username" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb]
useDb admin $ case mup of
Nothing -> runCommand c
Just (usr, pss) -> do
n <- at "nonce" <$> runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost]
runCommand $ c ++ ["username" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
dropDatabase :: (MonadIO' m) => Database -> Action m Document
-- ^ Delete the given database!
@ -256,11 +256,11 @@ totalIndexSize c = at "totalIndexSize" <$> collectionStats c
totalSize :: (MonadIO m, MonadBaseControl IO m, MonadIO' m) => Collection -> Action m Int
totalSize coll = do
x <- storageSize coll
xs <- mapM isize =<< getIndexes coll
return (foldl (+) x xs)
x <- storageSize coll
xs <- mapM isize =<< getIndexes coll
return (foldl (+) x xs)
where
isize idx = at "storageSize" <$> collectionStats (coll `T.append` ".$" `T.append` at "name" idx)
isize idx = at "storageSize" <$> collectionStats (coll `T.append` ".$" `T.append` at "name" idx)
-- ** Profiling
@ -273,7 +273,7 @@ type MilliSec = Int
setProfilingLevel :: (MonadIO' m) => ProfilingLevel -> Maybe MilliSec -> Action m ()
setProfilingLevel p mSlowMs =
runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return ()
runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return ()
-- ** Database

View file

@ -3,16 +3,16 @@
{-# LANGUAGE CPP, OverloadedStrings, ScopedTypeVariables, TupleSections #-}
module Database.MongoDB.Connection (
-- * Util
Secs, IOE, runIOE,
-- * Connection
Pipe, close, isClosed,
-- * Server
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
-- * Util
Secs, IOE, runIOE,
-- * Connection
Pipe, close, isClosed,
-- * Server
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
readHostPortM, globalConnectTimeout, connect, connect',
-- * Replica Set
ReplicaSetName, openReplicaSet, openReplicaSet',
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
-- * Replica Set
ReplicaSetName, openReplicaSet, openReplicaSet',
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
) where
import Prelude hiding (lookup)
@ -49,10 +49,10 @@ import System.IO.Pipeline (IOE, close, isClosed)
adminCommand :: Command -> Pipe -> IOE Document
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
adminCommand cmd pipe =
liftIOE failureToIOError . ErrorT $ access pipe slaveOk "admin" $ runCommand cmd
liftIOE failureToIOError . ErrorT $ access pipe slaveOk "admin" $ runCommand cmd
where
failureToIOError (ConnectionFailure e) = e
failureToIOError e = userError $ show e
failureToIOError (ConnectionFailure e) = e
failureToIOError e = userError $ show e
-- * Host
@ -70,26 +70,26 @@ showHostPort :: Host -> String
-- ^ Display host as \"host:port\"
-- TODO: Distinguish Service and UnixSocket port
showHostPort (Host hostname port) = hostname ++ ":" ++ portname where
portname = case port of
Service s -> s
PortNumber p -> show p
portname = case port of
Service s -> s
PortNumber p -> show p
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
UnixSocket s -> s
UnixSocket s -> s
#endif
readHostPortM :: (Monad m) => String -> m Host
-- ^ Read string \"hostname:port\" as @Host hosthame (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax.
-- TODO: handle Service and UnixSocket port
readHostPortM = either (fail . show) return . parse parser "readHostPort" where
hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
parser = do
spaces
h <- hostname
try (spaces >> eof >> return (host h)) <|> do
_ <- char ':'
port :: Int <- read <$> many1 digit
spaces >> eof
return $ Host h (PortNumber $ fromIntegral port)
hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
parser = do
spaces
h <- hostname
try (spaces >> eof >> return (host h)) <|> do
_ <- char ':'
port :: Int <- read <$> many1 digit
spaces >> eof
return $ Host h (PortNumber $ fromIntegral port)
readHostPort :: String -> Host
-- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
@ -109,10 +109,10 @@ connect h = lift (readIORef globalConnectTimeout) >>= flip connect' h
connect' :: Secs -> Host -> IOE Pipe
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within given number of seconds.
connect' timeoutSecs (Host hostname port) = do
handle <- ErrorT . E.try $ do
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
maybe (ioError $ userError "connect timed out") return mh
lift $ newPipe handle
handle <- ErrorT . E.try $ do
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
maybe (ioError $ userError "connect timed out") return mh
lift $ newPipe handle
-- * Replica Set
@ -132,10 +132,10 @@ openReplicaSet rsSeed = lift (readIORef globalConnectTimeout) >>= flip openRepli
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IOE ReplicaSet
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members.
openReplicaSet' timeoutSecs (rsName, seedList) = do
vMembers <- newMVar (map (, Nothing) seedList)
let rs = ReplicaSet rsName vMembers timeoutSecs
_ <- updateMembers rs
return rs
vMembers <- newMVar (map (, Nothing) seedList)
let rs = ReplicaSet rsName vMembers timeoutSecs
_ <- updateMembers rs
return rs
closeReplicaSet :: ReplicaSet -> IO ()
-- ^ Close all connections to replica set
@ -144,18 +144,18 @@ closeReplicaSet (ReplicaSet _ vMembers _) = withMVar vMembers $ mapM_ (maybe (re
primary :: ReplicaSet -> IOE Pipe
-- ^ Return connection to current primary of replica set. Fail if no primary available.
primary rs@(ReplicaSet rsName _ _) = do
mHost <- statedPrimary <$> updateMembers rs
case mHost of
Just host' -> connection rs Nothing host'
Nothing -> throwError $ userError $ "replica set " ++ T.unpack rsName ++ " has no primary"
mHost <- statedPrimary <$> updateMembers rs
case mHost of
Just host' -> connection rs Nothing host'
Nothing -> throwError $ userError $ "replica set " ++ T.unpack rsName ++ " has no primary"
secondaryOk :: ReplicaSet -> IOE Pipe
-- ^ Return connection to a random secondary, or primary if no secondaries available.
secondaryOk rs = do
info <- updateMembers rs
hosts <- lift $ shuffle (possibleHosts info)
let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info)
untilSuccess (connection rs Nothing) hosts'
info <- updateMembers rs
hosts <- lift $ shuffle (possibleHosts info)
let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info)
untilSuccess (connection rs Nothing) hosts'
routedHost :: ((Host, Bool) -> (Host, Bool) -> IOE Ordering) -> ReplicaSet -> IOE Pipe
-- ^ Return a connection to a host using a user-supplied sorting function, which sorts based on a tuple containing the host and a boolean indicating whether the host is primary.
@ -180,37 +180,37 @@ possibleHosts (_, info) = map readHostPort $ at "hosts" info
updateMembers :: ReplicaSet -> IOE ReplicaInfo
-- ^ Fetch replica info from any server and update members accordingly
updateMembers rs@(ReplicaSet _ vMembers _) = do
(host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
modifyMVar vMembers $ \members -> do
let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
lift $ forM_ old $ \(_, mPipe) -> maybe (return ()) close mPipe
return (members' ++ map (, Nothing) new, (host', info))
(host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
modifyMVar vMembers $ \members -> do
let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
lift $ forM_ old $ \(_, mPipe) -> maybe (return ()) close mPipe
return (members' ++ map (, Nothing) new, (host', info))
where
intersection :: (Eq k) => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
intersection keys assocs = (partition (flip elem inKeys . fst) assocs, keys \\ inKeys) where
assocKeys = map fst assocs
inKeys = intersect keys assocKeys
intersection :: (Eq k) => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
intersection keys assocs = (partition (flip elem inKeys . fst) assocs, keys \\ inKeys) where
assocKeys = map fst assocs
inKeys = intersect keys assocKeys
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IOE ReplicaInfo
-- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set.
fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
pipe <- connection rs mPipe host'
info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
case B.lookup "setName" info of
Nothing -> throwError $ userError $ show host' ++ " not a member of any replica set, including " ++ T.unpack rsName ++ ": " ++ show info
Just setName | setName /= rsName -> throwError $ userError $ show host' ++ " not a member of replica set " ++ T.unpack rsName ++ ": " ++ show info
Just _ -> return (host', info)
pipe <- connection rs mPipe host'
info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
case B.lookup "setName" info of
Nothing -> throwError $ userError $ show host' ++ " not a member of any replica set, including " ++ T.unpack rsName ++ ": " ++ show info
Just setName | setName /= rsName -> throwError $ userError $ show host' ++ " not a member of replica set " ++ T.unpack rsName ++ ": " ++ show info
Just _ -> return (host', info)
connection :: ReplicaSet -> Maybe Pipe -> Host -> IOE Pipe
-- ^ Return new or existing connection to member of replica set. If pipe is already known for host it is given, but we still test if it is open.
connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' =
maybe conn (\p -> lift (isClosed p) >>= \bad -> if bad then conn else return p) mPipe
maybe conn (\p -> lift (isClosed p) >>= \bad -> if bad then conn else return p) mPipe
where
conn = modifyMVar vMembers $ \members -> do
let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
case List.lookup host' members of
Just (Just pipe) -> lift (isClosed pipe) >>= \bad -> if bad then new else return (members, pipe)
_ -> new
conn = modifyMVar vMembers $ \members -> do
let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
case List.lookup host' members of
Just (Just pipe) -> lift (isClosed pipe) >>= \bad -> if bad then new else return (members, pipe)
_ -> new
{- Authors: Tony Hannan <tony@10gen.com>

View file

@ -9,17 +9,17 @@
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
module Database.MongoDB.Internal.Protocol (
FullCollection,
-- * Pipe
Pipe, newPipe, send, call,
-- ** Notice
Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
-- ** Request
Request(..), QueryOption(..),
-- ** Reply
Reply(..), ResponseFlag(..),
-- * Authentication
Username, Password, Nonce, pwHash, pwKey
FullCollection,
-- * Pipe
Pipe, newPipe, send, call,
-- ** Notice
Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
-- ** Request
Request(..), QueryOption(..),
-- ** Reply
Reply(..), ResponseFlag(..),
-- * Authentication
Username, Password, Nonce, pwHash, pwKey
) where
import Control.Applicative ((<$>))
@ -68,12 +68,12 @@ send pipe notices = P.send pipe (notices, Nothing)
call :: Pipe -> [Notice] -> Request -> IOE (IOE Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
call pipe notices request = do
requestId <- genRequestId
promise <- P.call pipe (notices, Just (request, requestId))
return $ check requestId <$> promise
requestId <- genRequestId
promise <- P.call pipe (notices, Just (request, requestId))
return $ check requestId <$> promise
where
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
-- * Message
@ -84,17 +84,17 @@ type Message = ([Notice], Maybe (Request, RequestId))
writeMessage :: Handle -> Message -> IOE ()
-- ^ Write message to socket
writeMessage handle (notices, mRequest) = ErrorT . try $ do
forM_ notices $ \n -> writeReq . (Left n,) =<< genRequestId
whenJust mRequest $ writeReq . (Right *** id)
hFlush handle
forM_ notices $ \n -> writeReq . (Left n,) =<< genRequestId
whenJust mRequest $ writeReq . (Right *** id)
hFlush handle
where
writeReq (e, requestId) = do
L.hPut handle lenBytes
L.hPut handle bytes
where
bytes = runPut $ (either putNotice putRequest e) requestId
lenBytes = encodeSize . toEnum . fromEnum $ L.length bytes
encodeSize = runPut . putInt32 . (+ 4)
writeReq (e, requestId) = do
L.hPut handle lenBytes
L.hPut handle bytes
where
bytes = runPut $ (either putNotice putRequest e) requestId
lenBytes = encodeSize . toEnum . fromEnum $ L.length bytes
encodeSize = runPut . putInt32 . (+ 4)
type Response = (ResponseTo, Reply)
-- ^ Message received from a Mongo server in response to a Request
@ -102,10 +102,10 @@ type Response = (ResponseTo, Reply)
readMessage :: Handle -> IOE Response
-- ^ read response from socket
readMessage handle = ErrorT $ try readResp where
readResp = do
len <- fromEnum . decodeSize <$> hGetN handle 4
runGet getReply <$> hGetN handle len
decodeSize = subtract 4 . runGet getInt32
readResp = do
len <- fromEnum . decodeSize <$> hGetN handle 4
runGet getReply <$> hGetN handle len
decodeSize = subtract 4 . runGet getInt32
type FullCollection = Text
-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"
@ -122,58 +122,58 @@ type ResponseTo = RequestId
genRequestId :: (MonadIO m) => m RequestId
-- ^ Generate fresh request id
genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where
counter :: IORef RequestId
counter = unsafePerformIO (newIORef 0)
{-# NOINLINE counter #-}
counter :: IORef RequestId
counter = unsafePerformIO (newIORef 0)
{-# NOINLINE counter #-}
-- *** Binary format
putHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putHeader opcode requestId = do
putInt32 requestId
putInt32 0
putInt32 opcode
putInt32 requestId
putInt32 0
putInt32 opcode
getHeader :: Get (Opcode, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already read
getHeader = do
_requestId <- getInt32
responseTo <- getInt32
opcode <- getInt32
return (opcode, responseTo)
_requestId <- getInt32
responseTo <- getInt32
opcode <- getInt32
return (opcode, responseTo)
-- ** Notice
-- | A notice is a message that is sent with no reply
data Notice =
Insert {
iFullCollection :: FullCollection,
iOptions :: [InsertOption],
iDocuments :: [Document]}
| Update {
uFullCollection :: FullCollection,
uOptions :: [UpdateOption],
uSelector :: Document,
uUpdater :: Document}
| Delete {
dFullCollection :: FullCollection,
dOptions :: [DeleteOption],
dSelector :: Document}
| KillCursors {
kCursorIds :: [CursorId]}
deriving (Show, Eq)
Insert {
iFullCollection :: FullCollection,
iOptions :: [InsertOption],
iDocuments :: [Document]}
| Update {
uFullCollection :: FullCollection,
uOptions :: [UpdateOption],
uSelector :: Document,
uUpdater :: Document}
| Delete {
dFullCollection :: FullCollection,
dOptions :: [DeleteOption],
dSelector :: Document}
| KillCursors {
kCursorIds :: [CursorId]}
deriving (Show, Eq)
data InsertOption = KeepGoing -- ^ If set, the database will not stop processing a bulk insert if one fails (eg due to duplicate IDs). This makes bulk insert behave similarly to a series of single inserts, except lastError will be set if any insert fails, not just the last one. (new in 1.9.1)
deriving (Show, Eq)
deriving (Show, Eq)
data UpdateOption =
Upsert -- ^ If set, the database will insert the supplied object into the collection if no matching document is found
| MultiUpdate -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc
deriving (Show, Eq)
Upsert -- ^ If set, the database will insert the supplied object into the collection if no matching document is found
| MultiUpdate -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc
deriving (Show, Eq)
data DeleteOption = SingleRemove -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed
deriving (Show, Eq)
deriving (Show, Eq)
type CursorId = Int64
@ -187,27 +187,27 @@ nOpcode KillCursors{} = 2007
putNotice :: Notice -> RequestId -> Put
putNotice notice requestId = do
putHeader (nOpcode notice) requestId
case notice of
Insert{..} -> do
putInt32 (iBits iOptions)
putCString iFullCollection
mapM_ putDocument iDocuments
Update{..} -> do
putInt32 0
putCString uFullCollection
putInt32 (uBits uOptions)
putDocument uSelector
putDocument uUpdater
Delete{..} -> do
putInt32 0
putCString dFullCollection
putInt32 (dBits dOptions)
putDocument dSelector
KillCursors{..} -> do
putInt32 0
putInt32 $ toEnum (length kCursorIds)
mapM_ putInt64 kCursorIds
putHeader (nOpcode notice) requestId
case notice of
Insert{..} -> do
putInt32 (iBits iOptions)
putCString iFullCollection
mapM_ putDocument iDocuments
Update{..} -> do
putInt32 0
putCString uFullCollection
putInt32 (uBits uOptions)
putDocument uSelector
putDocument uUpdater
Delete{..} -> do
putInt32 0
putCString dFullCollection
putInt32 (dBits dOptions)
putDocument dSelector
KillCursors{..} -> do
putInt32 0
putInt32 $ toEnum (length kCursorIds)
mapM_ putInt64 kCursorIds
iBit :: InsertOption -> Int32
iBit KeepGoing = bit 0
@ -232,28 +232,28 @@ dBits = bitOr . map dBit
-- | A request is a message that is sent with a 'Reply' expected in return
data Request =
Query {
qOptions :: [QueryOption],
qFullCollection :: FullCollection,
qSkip :: Int32, -- ^ Number of initial matching documents to skip
qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
qSelector :: Document, -- ^ \[\] = return all documents in collection
qProjector :: Document -- ^ \[\] = return whole document
} | GetMore {
gFullCollection :: FullCollection,
gBatchSize :: Int32,
gCursorId :: CursorId}
deriving (Show, Eq)
Query {
qOptions :: [QueryOption],
qFullCollection :: FullCollection,
qSkip :: Int32, -- ^ Number of initial matching documents to skip
qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
qSelector :: Document, -- ^ \[\] = return all documents in collection
qProjector :: Document -- ^ \[\] = return whole document
} | GetMore {
gFullCollection :: FullCollection,
gBatchSize :: Int32,
gCursorId :: CursorId}
deriving (Show, Eq)
data QueryOption =
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception.
| SlaveOK -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
| NoCursorTimeout -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed.
| AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
-- | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception.
| SlaveOK -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
| NoCursorTimeout -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed.
| AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
-- | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
-- Exhaust commented out because not compatible with current `Pipeline` implementation
| Partial -- ^ Get partial results from a _mongos_ if some shards are down, instead of throwing an error.
deriving (Show, Eq)
| Partial -- ^ Get partial results from a _mongos_ if some shards are down, instead of throwing an error.
deriving (Show, Eq)
-- *** Binary format
@ -263,20 +263,20 @@ qOpcode GetMore{} = 2005
putRequest :: Request -> RequestId -> Put
putRequest request requestId = do
putHeader (qOpcode request) requestId
case request of
Query{..} -> do
putInt32 (qBits qOptions)
putCString qFullCollection
putInt32 qSkip
putInt32 qBatchSize
putDocument qSelector
unless (null qProjector) (putDocument qProjector)
GetMore{..} -> do
putInt32 0
putCString gFullCollection
putInt32 gBatchSize
putInt64 gCursorId
putHeader (qOpcode request) requestId
case request of
Query{..} -> do
putInt32 (qBits qOptions)
putCString qFullCollection
putInt32 qSkip
putInt32 qBatchSize
putDocument qSelector
unless (null qProjector) (putDocument qProjector)
GetMore{..} -> do
putInt32 0
putCString gFullCollection
putInt32 gBatchSize
putInt64 gCursorId
qBit :: QueryOption -> Int32
qBit TailableCursor = bit 1
@ -293,17 +293,17 @@ qBits = bitOr . map qBit
-- | A reply is a message received in response to a 'Request'
data Reply = Reply {
rResponseFlags :: [ResponseFlag],
rCursorId :: CursorId, -- ^ 0 = cursor finished
rStartingFrom :: Int32,
rDocuments :: [Document]
} deriving (Show, Eq)
rResponseFlags :: [ResponseFlag],
rCursorId :: CursorId, -- ^ 0 = cursor finished
rStartingFrom :: Int32,
rDocuments :: [Document]
} deriving (Show, Eq)
data ResponseFlag =
CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
| QueryError -- ^ Query error. Returned with one document containing an "$err" field holding the error message.
| AwaitCapable -- ^ For backward compatability: Set when the server supports the AwaitData query option. if it doesn't, a replica slave client should sleep a little between getMore's
deriving (Show, Eq, Enum)
CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
| QueryError -- ^ Query error. Returned with one document containing an "$err" field holding the error message.
| AwaitCapable -- ^ For backward compatability: Set when the server supports the AwaitData query option. if it doesn't, a replica slave client should sleep a little between getMore's
deriving (Show, Eq, Enum)
-- * Binary format
@ -312,14 +312,14 @@ replyOpcode = 1
getReply :: Get (ResponseTo, Reply)
getReply = do
(opcode, responseTo) <- getHeader
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
rResponseFlags <- rFlags <$> getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- fromIntegral <$> getInt32
rDocuments <- replicateM numDocs getDocument
return (responseTo, Reply{..})
(opcode, responseTo) <- getHeader
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
rResponseFlags <- rFlags <$> getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- fromIntegral <$> getInt32
rDocuments <- replicateM numDocs getDocument
return (responseTo, Reply{..})
rFlags :: Int32 -> [ResponseFlag]
rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..]

View file

@ -98,7 +98,7 @@ runIOE (ErrorT action) = action >>= either ioError return
updateAssocs :: (Eq k) => k -> v -> [(k, v)] -> [(k, v)]
-- ^ Change or insert value of key in association list
updateAssocs key valu assocs = case back of [] -> (key, valu) : front; _ : back' -> front ++ (key, valu) : back'
where (front, back) = break ((key ==) . fst) assocs
where (front, back) = break ((key ==) . fst) assocs
bitOr :: (Num a, Bits a) => [a] -> a
-- ^ bit-or all numbers together
@ -111,20 +111,20 @@ a <.> b = T.append a (T.cons '.' b)
true1 :: Label -> Document -> Bool
-- ^ Is field's value a 1 or True (MongoDB use both Int and Bools for truth values). Error if field not in document or field not a Num or Bool.
true1 k doc = case valueAt k doc of
Bool b -> b
Float n -> n == 1
Int32 n -> n == 1
Int64 n -> n == 1
_ -> error $ "expected " ++ show k ++ " to be Num or Bool in " ++ show doc
Bool b -> b
Float n -> n == 1
Int32 n -> n == 1
Int64 n -> n == 1
_ -> error $ "expected " ++ show k ++ " to be Num or Bool in " ++ show doc
hGetN :: Handle -> Int -> IO L.ByteString
-- ^ Read N bytes from hande, blocking until all N bytes are read. If EOF is reached before N bytes then raise EOF exception.
hGetN h n = assert (n >= 0) $ do
bytes <- L.hGet h n
let x = fromEnum $ L.length bytes
if x >= n then return bytes
else if x == 0 then ioError (mkIOError eofErrorType "hGetN" (Just h) Nothing)
else L.append bytes <$> hGetN h (n - x)
bytes <- L.hGet h n
let x = fromEnum $ L.length bytes
if x >= n then return bytes
else if x == 0 then ioError (mkIOError eofErrorType "hGetN" (Just h) Nothing)
else L.append bytes <$> hGetN h (n - x)
byteStringHex :: S.ByteString -> String
-- ^ Hexadecimal string representation of a byte string. Each byte yields two hexadecimal characters.

View file

@ -3,43 +3,43 @@
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP #-}
module Database.MongoDB.Query (
-- * Monad
Action, access, Failure(..), ErrorCode,
AccessMode(..), GetLastError, master, slaveOk, accessMode,
MonadDB(..),
-- * Database
Database, allDatabases, useDb, thisDatabase,
-- ** Authentication
Username, Password, auth,
-- * Collection
Collection, allCollections,
-- ** Selection
Selection(..), Selector, whereJS,
Select(select),
-- * Write
-- ** Insert
insert, insert_, insertMany, insertMany_, insertAll, insertAll_,
-- ** Update
save, replace, repsert, Modifier, modify,
-- ** Delete
delete, deleteOne,
-- * Read
-- ** Query
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial),
-- * Monad
Action, access, Failure(..), ErrorCode,
AccessMode(..), GetLastError, master, slaveOk, accessMode,
MonadDB(..),
-- * Database
Database, allDatabases, useDb, thisDatabase,
-- ** Authentication
Username, Password, auth,
-- * Collection
Collection, allCollections,
-- ** Selection
Selection(..), Selector, whereJS,
Select(select),
-- * Write
-- ** Insert
insert, insert_, insertMany, insertMany_, insertAll, insertAll_,
-- ** Update
save, replace, repsert, Modifier, modify,
-- ** Delete
delete, deleteOne,
-- * Read
-- ** Query
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial),
Projector, Limit, Order, BatchSize,
explain, find, findOne, fetch, findAndModify, count, distinct,
-- *** Cursor
Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed,
-- ** Aggregate
Pipeline, aggregate,
-- ** Group
Group(..), GroupKey(..), group,
-- ** MapReduce
MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..),
explain, find, findOne, fetch, findAndModify, count, distinct,
-- *** Cursor
Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed,
-- ** Aggregate
Pipeline, aggregate,
-- ** Group
Group(..), GroupKey(..), group,
-- ** MapReduce
MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..),
MRResult, mapReduce, runMR, runMR',
-- * Command
Command, runCommand, runCommand1,
eval,
-- * Command
Command, runCommand, runCommand1,
eval,
) where
import Prelude hiding (lookup)
@ -93,7 +93,7 @@ import qualified Database.MongoDB.Internal.Protocol as P
-- * Monad
newtype Action m a = Action {unAction :: ErrorT Failure (ReaderT Context m) a}
deriving (Functor, Applicative, Monad, MonadIO, MonadError Failure)
deriving (Functor, Applicative, Monad, MonadIO, MonadError Failure)
-- ^ A monad on top of m (which must be a MonadIO) that may access the database and may fail with a DB 'Failure'
instance MonadBase b m => MonadBase b (Action m) where
@ -121,13 +121,13 @@ access myPipe myAccessMode myDatabase (Action action) = runReaderT (runErrorT ac
-- | A connection failure, or a read or write exception like cursor expired or inserting a duplicate key.
-- Note, unexpected data from the server is not a Failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change.
data Failure =
ConnectionFailure IOError -- ^ TCP connection ('Pipeline') failed. May work if you try again on the same Mongo 'Connection' which will create a new Pipe.
| CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server that the one you are currently connected to (perhaps a fail over happen between servers in a replica set)
| QueryFailure ErrorCode String -- ^ Query failed for some reason as described in the string
| WriteFailure ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string
| DocNotFound Selection -- ^ 'fetch' found no document matching selection
| AggregateFailure String -- ^ 'aggregate' returned an error
deriving (Show, Eq)
ConnectionFailure IOError -- ^ TCP connection ('Pipeline') failed. May work if you try again on the same Mongo 'Connection' which will create a new Pipe.
| CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server that the one you are currently connected to (perhaps a fail over happen between servers in a replica set)
| QueryFailure ErrorCode String -- ^ Query failed for some reason as described in the string
| WriteFailure ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string
| DocNotFound Selection -- ^ 'fetch' found no document matching selection
| AggregateFailure String -- ^ 'aggregate' returned an error
deriving (Show, Eq)
type ErrorCode = Int
-- ^ Error code from getLastError or query failure
@ -137,9 +137,9 @@ instance Error Failure where strMsg = error
-- | Type of reads and writes to perform
data AccessMode =
ReadStaleOk -- ^ Read-only action, reading stale data from a slave is OK.
| UnconfirmedWrites -- ^ Read-write action, slave not OK, every write is fire & forget.
| ConfirmWrites GetLastError -- ^ Read-write action, slave not OK, every write is confirmed with getLastError.
ReadStaleOk -- ^ Read-only action, reading stale data from a slave is OK.
| UnconfirmedWrites -- ^ Read-write action, slave not OK, every write is fire & forget.
| ConfirmWrites GetLastError -- ^ Read-write action, slave not OK, every write is confirmed with getLastError.
deriving Show
type GetLastError = Document
@ -168,9 +168,9 @@ writeMode (ConfirmWrites z) = Confirm z
-- | Values needed when executing a db operation
data Context = Context {
myPipe :: Pipe, -- ^ operations read/write to this pipelined TCP connection to a MongoDB server
myAccessMode :: AccessMode, -- ^ read/write operation will use this access mode
myDatabase :: Database } -- ^ operations query/update this database
myPipe :: Pipe, -- ^ operations read/write to this pipelined TCP connection to a MongoDB server
myAccessMode :: AccessMode, -- ^ read/write operation will use this access mode
myDatabase :: Database } -- ^ operations query/update this database
myReadMode :: Context -> ReadMode
myReadMode = readMode . myAccessMode
@ -181,40 +181,40 @@ myWriteMode = writeMode . myAccessMode
send :: (MonadIO m) => [Notice] -> Action m ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw 'ConnectionFailure' if pipe fails.
send ns = Action $ do
pipe <- asks myPipe
liftIOE ConnectionFailure $ P.send pipe ns
pipe <- asks myPipe
liftIOE ConnectionFailure $ P.send pipe ns
call :: (MonadIO m) => [Notice] -> Request -> Action m (ErrorT Failure IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive.
call ns r = Action $ do
pipe <- asks myPipe
promise <- liftIOE ConnectionFailure $ P.call pipe ns r
return (liftIOE ConnectionFailure promise)
pipe <- asks myPipe
promise <- liftIOE ConnectionFailure $ P.call pipe ns r
return (liftIOE ConnectionFailure promise)
-- | If you stack a monad on top of 'Action' then make it an instance of this class and use 'liftDB' to execute a DB Action within it. Instances already exist for the basic mtl transformers.
class (Monad m, MonadBaseControl IO (BaseMonad m), Applicative (BaseMonad m), Functor (BaseMonad m)) => MonadDB m where
type BaseMonad m :: * -> *
liftDB :: Action (BaseMonad m) a -> m a
type BaseMonad m :: * -> *
liftDB :: Action (BaseMonad m) a -> m a
instance (MonadBaseControl IO m, Applicative m, Functor m) => MonadDB (Action m) where
type BaseMonad (Action m) = m
liftDB = id
type BaseMonad (Action m) = m
liftDB = id
instance (MonadDB m, Error e) => MonadDB (ErrorT e m) where
type BaseMonad (ErrorT e m) = BaseMonad m
liftDB = lift . liftDB
type BaseMonad (ErrorT e m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m) => MonadDB (ReaderT r m) where
type BaseMonad (ReaderT r m) = BaseMonad m
liftDB = lift . liftDB
type BaseMonad (ReaderT r m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m) => MonadDB (StateT s m) where
type BaseMonad (StateT s m) = BaseMonad m
liftDB = lift . liftDB
type BaseMonad (StateT s m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m, Monoid w) => MonadDB (WriterT w m) where
type BaseMonad (WriterT w m) = BaseMonad m
liftDB = lift . liftDB
type BaseMonad (WriterT w m) = BaseMonad m
liftDB = lift . liftDB
instance (MonadDB m, Monoid w) => MonadDB (RWST r w s m) where
type BaseMonad (RWST r w s m) = BaseMonad m
liftDB = lift . liftDB
type BaseMonad (RWST r w s m) = BaseMonad m
liftDB = lift . liftDB
-- * Database
@ -237,8 +237,8 @@ useDb db (Action act) = Action $ local (\ctx -> ctx {myDatabase = db}) act
auth :: (MonadIO' m) => Username -> Password -> Action m Bool
-- ^ Authenticate with the current database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe.
auth usr pss = do
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
-- * Collection
@ -248,12 +248,12 @@ type Collection = Text
allCollections :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Collection]
-- ^ List all collections in this database
allCollections = do
db <- thisDatabase
docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs
db <- thisDatabase
docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs
where
dropDbPrefix = T.tail . T.dropWhile (/= '.')
isSpecial db col = T.any (== '$') col && db <.> col /= "local.oplog.$main"
dropDbPrefix = T.tail . T.dropWhile (/= '.')
isSpecial db col = T.any (== '$') col && db <.> col /= "local.oplog.$main"
-- * Selection
@ -268,32 +268,32 @@ whereJS :: Selector -> Javascript -> Selector
whereJS sel js = ("$where" =: js) : sel
class Select aQueryOrSelection where
select :: Selector -> Collection -> aQueryOrSelection
-- ^ 'Query' or 'Selection' that selects documents in collection that match selector. The choice of type depends on use, for example, in @find (select sel col)@ it is a Query, and in @delete (select sel col)@ it is a Selection.
select :: Selector -> Collection -> aQueryOrSelection
-- ^ 'Query' or 'Selection' that selects documents in collection that match selector. The choice of type depends on use, for example, in @find (select sel col)@ it is a Query, and in @delete (select sel col)@ it is a Selection.
instance Select Selection where
select = Select
select = Select
instance Select Query where
select = query
select = query
-- * Write
data WriteMode =
NoConfirm -- ^ Submit writes without receiving acknowledgments. Fast. Assumes writes succeed even though they may not.
| Confirm GetLastError -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. This is acomplished by sending the getLastError command, with given 'GetLastError' parameters, after every write.
deriving (Show, Eq)
NoConfirm -- ^ Submit writes without receiving acknowledgments. Fast. Assumes writes succeed even though they may not.
| Confirm GetLastError -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. This is acomplished by sending the getLastError command, with given 'GetLastError' parameters, after every write.
deriving (Show, Eq)
write :: (MonadIO m) => Notice -> Action m ()
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error.
write notice = Action (asks myWriteMode) >>= \mode -> case mode of
NoConfirm -> send [notice]
Confirm params -> do
let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
Batch _ _ [doc] <- fulfill =<< request [notice] =<< queryRequest False q {limit = 1}
case lookup "err" doc of
Nothing -> return ()
Just err -> throwError $ WriteFailure (maybe 0 id $ lookup "code" doc) err
NoConfirm -> send [notice]
Confirm params -> do
let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
Batch _ _ [doc] <- fulfill =<< request [notice] =<< queryRequest False q {limit = 1}
case lookup "err" doc of
Nothing -> return ()
Just err -> throwError $ WriteFailure (maybe 0 id $ lookup "code" doc) err
-- ** Insert
@ -324,24 +324,24 @@ insertAll_ col docs = insertAll col docs >> return ()
insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value]
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied
insert' opts col docs = do
db <- thisDatabase
docs' <- liftIO $ mapM assignId docs
write (Insert (db <.> col) opts docs')
return $ map (valueAt "_id") docs'
db <- thisDatabase
docs' <- liftIO $ mapM assignId docs
write (Insert (db <.> col) opts docs')
return $ map (valueAt "_id") docs'
assignId :: Document -> IO Document
-- ^ Assign a unique value to _id field if missing
assignId doc = if any (("_id" ==) . label) doc
then return doc
else (\oid -> ("_id" =: oid) : doc) <$> genObjectId
then return doc
else (\oid -> ("_id" =: oid) : doc) <$> genObjectId
-- ** Update
save :: (MonadIO' m) => Collection -> Document -> Action m ()
-- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or update it if its not new (has \"_id\" field)
save col doc = case look "_id" doc of
Nothing -> insert_ col doc
Just i -> repsert (Select ["_id" := i] col) doc
Nothing -> insert_ col doc
Just i -> repsert (Select ["_id" := i] col) doc
replace :: (MonadIO m) => Selection -> Document -> Action m ()
-- ^ Replace first document in selection with given document
@ -361,8 +361,8 @@ modify = update [MultiUpdate]
update :: (MonadIO m) => [UpdateOption] -> Selection -> Document -> Action m ()
-- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty.
update opts (Select sel col) up = do
db <- thisDatabase
write (Update (db <.> col) opts sel up)
db <- thisDatabase
write (Update (db <.> col) opts sel up)
-- ** Delete
@ -377,15 +377,15 @@ deleteOne = delete' [SingleRemove]
delete' :: (MonadIO m) => [DeleteOption] -> Selection -> Action m ()
-- ^ Delete all documents in selection unless 'SingleRemove' option is given then only delete first document in selection
delete' opts (Select sel col) = do
db <- thisDatabase
write (Delete (db <.> col) opts sel)
db <- thisDatabase
write (Delete (db <.> col) opts sel)
-- * Read
data ReadMode =
Fresh -- ^ read from master only
| StaleOk -- ^ read from slave ok
deriving (Show, Eq)
Fresh -- ^ read from master only
| StaleOk -- ^ read from slave ok
deriving (Show, Eq)
readModeOption :: ReadMode -> [QueryOption]
readModeOption Fresh = []
@ -395,16 +395,16 @@ readModeOption StaleOk = [SlaveOK]
-- | Use 'select' to create a basic query with defaults, then modify if desired. For example, @(select sel col) {limit = 10}@
data Query = Query {
options :: [QueryOption], -- ^ Default = []
selection :: Selection,
project :: Projector, -- ^ \[\] = all fields. Default = []
skip :: Word32, -- ^ Number of initial matching documents to skip. Default = 0
limit :: Limit, -- ^ Maximum number of documents to return, 0 = no limit. Default = 0
sort :: Order, -- ^ Sort results by this order, [] = no sort. Default = []
snapshot :: Bool, -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = False
batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Default = 0
hint :: Order -- ^ Force MongoDB to use this index, [] = no hint. Default = []
} deriving (Show, Eq)
options :: [QueryOption], -- ^ Default = []
selection :: Selection,
project :: Projector, -- ^ \[\] = all fields. Default = []
skip :: Word32, -- ^ Number of initial matching documents to skip. Default = 0
limit :: Limit, -- ^ Maximum number of documents to return, 0 = no limit. Default = 0
sort :: Order, -- ^ Sort results by this order, [] = no sort. Default = []
snapshot :: Bool, -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = False
batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Default = 0
hint :: Order -- ^ Force MongoDB to use this index, [] = no hint. Default = []
} deriving (Show, Eq)
type Projector = Document
-- ^ Fields to return, analogous to the select clause in SQL. @[]@ means return whole document (analogous to * in SQL). @[\"x\" =: 1, \"y\" =: 1]@ means return only @x@ and @y@ fields of each document. @[\"x\" =: 0]@ means return all fields except @x@.
@ -425,15 +425,15 @@ query sel col = Query [] (Select sel col) [] 0 0 [] False 0 []
find :: (MonadIO m, MonadBaseControl IO m) => Query -> Action m Cursor
-- ^ Fetch documents satisfying query
find q@Query{selection, batchSize} = do
db <- thisDatabase
dBatch <- request [] =<< queryRequest False q
newCursor db (coll selection) batchSize dBatch
db <- thisDatabase
dBatch <- request [] =<< queryRequest False q
newCursor db (coll selection) batchSize dBatch
findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
-- ^ Fetch first document satisfying query or Nothing if none satisfy it
findOne q = do
Batch _ _ docs <- fulfill =<< request [] =<< queryRequest False q {limit = 1}
return (listToMaybe docs)
Batch _ _ docs <- fulfill =<< request [] =<< queryRequest False q {limit = 1}
return (listToMaybe docs)
fetch :: (MonadIO m) => Query -> Action m Document
-- ^ Same as 'findOne' except throw 'DocNotFound' if none match
@ -481,14 +481,14 @@ findAndModify (Query {
explain :: (MonadIO m) => Query -> Action m Document
-- ^ Return performance stats of query execution
explain q = do -- same as findOne but with explain set to true
Batch _ _ docs <- fulfill =<< request [] =<< queryRequest True q {limit = 1}
return $ if null docs then error ("no explain: " ++ show q) else head docs
Batch _ _ docs <- fulfill =<< request [] =<< queryRequest True q {limit = 1}
return $ if null docs then error ("no explain: " ++ show q) else head docs
count :: (MonadIO' m) => Query -> Action m Int
-- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present)
count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
distinct :: (MonadIO' m) => Label -> Selection -> Action m [Value]
-- ^ Fetch distinct values of field in selected documents
@ -497,31 +497,31 @@ distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "ke
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Limit)
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
queryRequest isExplain Query{..} = do
ctx <- Action ask
return $ queryRequest' (myReadMode ctx) (myDatabase ctx)
ctx <- Action ask
return $ queryRequest' (myReadMode ctx) (myDatabase ctx)
where
queryRequest' rm db = (P.Query{..}, remainingLimit) where
qOptions = readModeOption rm ++ options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit
qProjector = project
mOrder = if null sort then Nothing else Just ("$orderby" =: sort)
mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing
mHint = if null hint then Nothing else Just ("$hint" =: hint)
mExplain = if isExplain then Just ("$explain" =: True) else Nothing
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
queryRequest' rm db = (P.Query{..}, remainingLimit) where
qOptions = readModeOption rm ++ options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit
qProjector = project
mOrder = if null sort then Nothing else Just ("$orderby" =: sort)
mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing
mHint = if null hint then Nothing else Just ("$hint" =: hint)
mExplain = if isExplain then Just ("$explain" =: True) else Nothing
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit)
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit
batchSizeRemainingLimit batchSize limit = if limit == 0
then (fromIntegral batchSize', 0) -- no limit
else if 0 < batchSize' && batchSize' < limit
then (fromIntegral batchSize', limit - batchSize')
else (- fromIntegral limit, 1)
then (fromIntegral batchSize', 0) -- no limit
else if 0 < batchSize' && batchSize' < limit
then (fromIntegral batchSize', limit - batchSize')
else (- fromIntegral limit, 1)
where batchSize' = if batchSize == 1 then 2 else batchSize
-- batchSize 1 is broken because server converts 1 to -1 meaning limit 1
-- batchSize 1 is broken because server converts 1 to -1 meaning limit 1
type DelayedBatch = ErrorT Failure IO Batch
-- ^ A promised batch which may fail
@ -532,20 +532,20 @@ data Batch = Batch Limit CursorId [Document]
request :: (MonadIO m) => [Notice] -> (Request, Limit) -> Action m DelayedBatch
-- ^ Send notices and request and return promised batch
request ns (req, remainingLimit) = do
promise <- call ns req
return $ fromReply remainingLimit =<< promise
promise <- call ns req
return $ fromReply remainingLimit =<< promise
fromReply :: Limit -> Reply -> DelayedBatch
-- ^ Convert Reply to Batch or Failure
fromReply limit Reply{..} = do
mapM_ checkResponseFlag rResponseFlags
return (Batch limit rCursorId rDocuments)
mapM_ checkResponseFlag rResponseFlags
return (Batch limit rCursorId rDocuments)
where
-- If response flag indicates failure then throw it, otherwise do nothing
checkResponseFlag flag = case flag of
AwaitCapable -> return ()
CursorNotFound -> throwError $ CursorNotFoundFailure rCursorId
QueryError -> throwError $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
-- If response flag indicates failure then throw it, otherwise do nothing
checkResponseFlag flag = case flag of
AwaitCapable -> return ()
CursorNotFound -> throwError $ CursorNotFoundFailure rCursorId
QueryError -> throwError $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
fulfill :: (MonadIO m) => DelayedBatch -> Action m Batch
-- ^ Demand and wait for result, raise failure if exception
@ -559,10 +559,10 @@ data Cursor = Cursor FullCollection BatchSize (MVar DelayedBatch)
newCursor :: (MonadIO m, MonadBaseControl IO m) => Database -> Collection -> BatchSize -> DelayedBatch -> Action m Cursor
-- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected.
newCursor db col batchSize dBatch = do
var <- newMVar dBatch
let cursor = Cursor (db <.> col) batchSize var
_ <- mkWeakMVar var (closeCursor cursor)
return cursor
var <- newMVar dBatch
let cursor = Cursor (db <.> col) batchSize var
_ <- mkWeakMVar var (closeCursor cursor)
return cursor
#if !MIN_VERSION_base(4,6,0)
where mkWeakMVar = addMVarFinalizer
#endif
@ -570,39 +570,39 @@ newCursor db col batchSize dBatch = do
nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
-- ^ Return next batch of documents in query result, which will be empty if finished.
nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
-- Pre-fetch next batch promise from server and return current batch.
Batch limit cid docs <- fulfill' fcol batchSize dBatch
dBatch' <- if cid /= 0 then nextBatch' fcol batchSize limit cid else return $ return (Batch 0 0 [])
return (dBatch', docs)
-- Pre-fetch next batch promise from server and return current batch.
Batch limit cid docs <- fulfill' fcol batchSize dBatch
dBatch' <- if cid /= 0 then nextBatch' fcol batchSize limit cid else return $ return (Batch 0 0 [])
return (dBatch', docs)
fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch
-- Discard pre-fetched batch if empty with nonzero cid.
fulfill' fcol batchSize dBatch = do
b@(Batch limit cid docs) <- fulfill dBatch
if cid /= 0 && null docs
then nextBatch' fcol batchSize limit cid >>= fulfill
else return b
b@(Batch limit cid docs) <- fulfill dBatch
if cid /= 0 && null docs
then nextBatch' fcol batchSize limit cid >>= fulfill
else return b
nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Limit -> CursorId -> Action m DelayedBatch
nextBatch' fcol batchSize limit cid = request [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document)
-- ^ Return next document in query result, or Nothing if finished.
next (Cursor fcol batchSize var) = modifyMVar var nextState where
-- Pre-fetch next batch promise from server when last one in current batch is returned.
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
nextState dBatch = do
Batch limit cid docs <- fulfill' fcol batchSize dBatch
case docs of
doc : docs' -> do
dBatch' <- if null docs' && cid /= 0
then nextBatch' fcol batchSize limit cid
else return $ return (Batch limit cid docs')
return (dBatch', Just doc)
[] -> if cid == 0
then return (return $ Batch 0 0 [], Nothing) -- finished
else fmap (,Nothing) $ nextBatch' fcol batchSize limit cid
-- Pre-fetch next batch promise from server when last one in current batch is returned.
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
nextState dBatch = do
Batch limit cid docs <- fulfill' fcol batchSize dBatch
case docs of
doc : docs' -> do
dBatch' <- if null docs' && cid /= 0
then nextBatch' fcol batchSize limit cid
else return $ return (Batch limit cid docs')
return (dBatch', Just doc)
[] -> if cid == 0
then return (return $ Batch 0 0 [], Nothing) -- finished
else fmap (,Nothing) $ nextBatch' fcol batchSize limit cid
nextN :: (MonadIO m, MonadBaseControl IO m, Functor m) => Int -> Cursor -> Action m [Document]
-- ^ Return next N documents or less if end is reached
@ -614,14 +614,14 @@ rest c = loop (next c)
closeCursor :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m ()
closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do
Batch _ cid _ <- fulfill dBatch
unless (cid == 0) $ send [KillCursors [cid]]
return $ (return $ Batch 0 0 [], ())
Batch _ cid _ <- fulfill dBatch
unless (cid == 0) $ send [KillCursors [cid]]
return $ (return $ Batch 0 0 [], ())
isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool
isCursorClosed (Cursor _ _ var) = do
Batch _ cid docs <- fulfill =<< readMVar var
return (cid == 0 && null docs)
Batch _ cid docs <- fulfill =<< readMVar var
return (cid == 0 && null docs)
-- ** Aggregate
@ -631,22 +631,22 @@ type Pipeline = [Document]
aggregate :: MonadIO' m => Collection -> Pipeline -> Action m [Document]
-- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details.
aggregate aColl agg = do
response <- runCommand ["aggregate" =: aColl, "pipeline" =: agg]
case true1 "ok" response of
True -> lookup "result" response
False -> throwError $ AggregateFailure $ at "errmsg" response
response <- runCommand ["aggregate" =: aColl, "pipeline" =: agg]
case true1 "ok" response of
True -> lookup "result" response
False -> throwError $ AggregateFailure $ at "errmsg" response
-- ** Group
-- | Groups documents in collection by key then reduces (aggregates) each group
data Group = Group {
gColl :: Collection,
gKey :: GroupKey, -- ^ Fields to group by
gReduce :: Javascript, -- ^ @(doc, agg) -> ()@. The reduce function reduces (aggregates) the objects iterated. Typical operations of a reduce function include summing and counting. It takes two arguments, the current document being iterated over and the aggregation value, and updates the aggregate value.
gInitial :: Document, -- ^ @agg@. Initial aggregation value supplied to reduce
gCond :: Selector, -- ^ Condition that must be true for a row to be considered. [] means always true.
gFinalize :: Maybe Javascript -- ^ @agg -> () | result@. An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just _id and average fields).
} deriving (Show, Eq)
gColl :: Collection,
gKey :: GroupKey, -- ^ Fields to group by
gReduce :: Javascript, -- ^ @(doc, agg) -> ()@. The reduce function reduces (aggregates) the objects iterated. Typical operations of a reduce function include summing and counting. It takes two arguments, the current document being iterated over and the aggregation value, and updates the aggregate value.
gInitial :: Document, -- ^ @agg@. Initial aggregation value supplied to reduce
gCond :: Selector, -- ^ Condition that must be true for a row to be considered. [] means always true.
gFinalize :: Maybe Javascript -- ^ @agg -> () | result@. An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just _id and average fields).
} deriving (Show, Eq)
data GroupKey = Key [Label] | KeyF Javascript deriving (Show, Eq)
-- ^ Fields to group by, or function (@doc -> key@) returning a "key object" to be used as the grouping key. Use KeyF instead of Key to specify a key that is not an existing member of the object (or, to access embedded members).
@ -654,12 +654,12 @@ data GroupKey = Key [Label] | KeyF Javascript deriving (Show, Eq)
groupDocument :: Group -> Document
-- ^ Translate Group data into expected document form
groupDocument Group{..} =
("finalize" =? gFinalize) ++ [
"ns" =: gColl,
case gKey of Key k -> "key" =: map (=: True) k; KeyF f -> "$keyf" =: f,
"$reduce" =: gReduce,
"initial" =: gInitial,
"cond" =: gCond ]
("finalize" =? gFinalize) ++ [
"ns" =: gColl,
case gKey of Key k -> "key" =: map (=: True) k; KeyF f -> "$keyf" =: f,
"$reduce" =: gReduce,
"initial" =: gInitial,
"cond" =: gCond ]
group :: (MonadIO' m) => Group -> Action m [Document]
-- ^ Execute group query and return resulting aggregate value for each distinct key
@ -670,17 +670,17 @@ group g = at "retval" <$> runCommand ["group" =: groupDocument g]
-- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values to a single result. There are additional parameters that may be set to tweak this basic operation.
-- This implements the latest version of map-reduce that requires MongoDB 1.7.4 or greater. To map-reduce against an older server use runCommand directly as described in http://www.mongodb.org/display/DOCS/MapReduce.
data MapReduce = MapReduce {
rColl :: Collection,
rMap :: MapFun,
rReduce :: ReduceFun,
rSelect :: Selector, -- ^ Operate on only those documents selected. Default is [] meaning all documents.
rSort :: Order, -- ^ Default is [] meaning no sort
rLimit :: Limit, -- ^ Default is 0 meaning no limit
rOut :: MROut, -- ^ Output to a collection with a certain merge policy. Default is no collection ('Inline'). Note, you don't want this default if your result set is large.
rFinalize :: Maybe FinalizeFun, -- ^ Function to apply to all the results when finished. Default is Nothing.
rScope :: Document, -- ^ Variables (environment) that can be accessed from map/reduce/finalize. Default is [].
rVerbose :: Bool -- ^ Provide statistics on job execution time. Default is False.
} deriving (Show, Eq)
rColl :: Collection,
rMap :: MapFun,
rReduce :: ReduceFun,
rSelect :: Selector, -- ^ Operate on only those documents selected. Default is [] meaning all documents.
rSort :: Order, -- ^ Default is [] meaning no sort
rLimit :: Limit, -- ^ Default is 0 meaning no limit
rOut :: MROut, -- ^ Output to a collection with a certain merge policy. Default is no collection ('Inline'). Note, you don't want this default if your result set is large.
rFinalize :: Maybe FinalizeFun, -- ^ Function to apply to all the results when finished. Default is Nothing.
rScope :: Document, -- ^ Variables (environment) that can be accessed from map/reduce/finalize. Default is [].
rVerbose :: Bool -- ^ Provide statistics on job execution time. Default is False.
} deriving (Show, Eq)
type MapFun = Javascript
-- ^ @() -> void@. The map function references the variable @this@ to inspect the current object under consideration. The function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate.
@ -692,15 +692,15 @@ type FinalizeFun = Javascript
-- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value.
data MROut =
Inline -- ^ Return results directly instead of writing them to an output collection. Results must fit within 16MB limit of a single document
| Output MRMerge Collection (Maybe Database) -- ^ Write results to given collection, in other database if specified. Follow merge policy when entry already exists
deriving (Show, Eq)
Inline -- ^ Return results directly instead of writing them to an output collection. Results must fit within 16MB limit of a single document
| Output MRMerge Collection (Maybe Database) -- ^ Write results to given collection, in other database if specified. Follow merge policy when entry already exists
deriving (Show, Eq)
data MRMerge =
Replace -- ^ Clear all old data and replace it with new data
| Merge -- ^ Leave old data but overwrite entries with the same key with new data
| Reduce -- ^ Leave old data but combine entries with the same key via MR's reduce function
deriving (Show, Eq)
Replace -- ^ Clear all old data and replace it with new data
| Merge -- ^ Leave old data but overwrite entries with the same key with new data
| Reduce -- ^ Leave old data but combine entries with the same key via MR's reduce function
deriving (Show, Eq)
type MRResult = Document
-- ^ Result of running a MapReduce has some stats besides the output. See http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Resultobject
@ -708,26 +708,26 @@ type MRResult = Document
mrDocument :: MapReduce -> Document
-- ^ Translate MapReduce data into expected document form
mrDocument MapReduce{..} =
("mapreduce" =: rColl) :
("out" =: mrOutDoc rOut) :
("finalize" =? rFinalize) ++ [
"map" =: rMap,
"reduce" =: rReduce,
"query" =: rSelect,
"sort" =: rSort,
"limit" =: (fromIntegral rLimit :: Int),
"scope" =: rScope,
"verbose" =: rVerbose ]
("mapreduce" =: rColl) :
("out" =: mrOutDoc rOut) :
("finalize" =? rFinalize) ++ [
"map" =: rMap,
"reduce" =: rReduce,
"query" =: rSelect,
"sort" =: rSort,
"limit" =: (fromIntegral rLimit :: Int),
"scope" =: rScope,
"verbose" =: rVerbose ]
mrOutDoc :: MROut -> Document
-- ^ Translate MROut into expected document form
mrOutDoc Inline = ["inline" =: (1 :: Int)]
mrOutDoc (Output mrMerge coll mDB) = (mergeName mrMerge =: coll) : mdb mDB where
mergeName Replace = "replace"
mergeName Merge = "merge"
mergeName Reduce = "reduce"
mdb Nothing = []
mdb (Just db) = ["db" =: db]
mergeName Replace = "replace"
mergeName Merge = "merge"
mergeName Reduce = "reduce"
mdb Nothing = []
mdb (Just db) = ["db" =: db]
mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
-- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments.
@ -736,18 +736,18 @@ mapReduce col map' red = MapReduce col map' red [] [] 0 Inline Nothing [] False
runMR :: (MonadIO m, MonadBaseControl IO m, Applicative m) => MapReduce -> Action m Cursor
-- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript)
runMR mr = do
res <- runMR' mr
case look "result" res of
Just (String coll) -> find $ query [] coll
Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc)
Just x -> error $ "unexpected map-reduce result field: " ++ show x
Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res)
res <- runMR' mr
case look "result" res of
Just (String coll) -> find $ query [] coll
Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc)
Just x -> error $ "unexpected map-reduce result field: " ++ show x
Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res)
runMR' :: (MonadIO' m) => MapReduce -> Action m MRResult
-- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript).
runMR' mr = do
doc <- runCommand (mrDocument mr)
return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr
doc <- runCommand (mrDocument mr)
return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr
-- * Command
@ -757,7 +757,7 @@ type Command = Document
runCommand :: (MonadIO' m) => Command -> Action m Document
-- ^ Run command against the database and return its result
runCommand c = maybe err id <$> findOne (query c "$cmd") where
err = error $ "Nothing returned for command: " ++ show c
err = error $ "Nothing returned for command: " ++ show c
runCommand1 :: (MonadIO' m) => Text -> Action m Document
-- ^ @runCommand1 foo = runCommand [foo =: 1]@

View file

@ -12,11 +12,11 @@ A pipeline closes itself when a read or write causes an error, so you can detect
#endif
module System.IO.Pipeline (
IOE,
-- * IOStream
IOStream(..),
-- * Pipeline
Pipeline, newPipeline, send, call, close, isClosed
IOE,
-- * IOStream
IOStream(..),
-- * Pipeline
Pipeline, newPipeline, send, call, close, isClosed
) where
import Prelude hiding (length)
@ -43,9 +43,9 @@ mkWeakMVar = addMVarFinalizer
onException :: (Monad m) => ErrorT e m a -> m () -> ErrorT e m a
-- ^ If first action throws an exception then run second action then re-throw
onException (ErrorT action) releaser = ErrorT $ do
e <- action
either (const releaser) (const $ return ()) e
return e
e <- action
either (const releaser) (const $ return ()) e
return e
type IOE = ErrorT IOError IO
-- ^ IO monad with explicit error
@ -54,59 +54,59 @@ type IOE = ErrorT IOError IO
-- | An IO sink and source where value of type @o@ are sent and values of type @i@ are received.
data IOStream i o = IOStream {
writeStream :: o -> IOE (),
readStream :: IOE i,
closeStream :: IO () }
writeStream :: o -> IOE (),
readStream :: IOE i,
closeStream :: IO () }
-- * Pipeline
-- | Thread-safe and pipelined connection
data Pipeline i o = Pipeline {
vStream :: MVar (IOStream i o), -- ^ Mutex on handle, so only one thread at a time can write to it
responseQueue :: Chan (MVar (Either IOError i)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
listenThread :: ThreadId
}
vStream :: MVar (IOStream i o), -- ^ Mutex on handle, so only one thread at a time can write to it
responseQueue :: Chan (MVar (Either IOError i)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
listenThread :: ThreadId
}
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: IOStream i o -> IO (Pipeline i o)
newPipeline stream = do
vStream <- newMVar stream
responseQueue <- newChan
rec
let pipe = Pipeline{..}
listenThread <- forkIO (listen pipe)
_ <- mkWeakMVar vStream $ do
killThread listenThread
closeStream stream
return pipe
vStream <- newMVar stream
responseQueue <- newChan
rec
let pipe = Pipeline{..}
listenThread <- forkIO (listen pipe)
_ <- mkWeakMVar vStream $ do
killThread listenThread
closeStream stream
return pipe
close :: Pipeline i o -> IO ()
-- ^ Close pipe and underlying connection
close Pipeline{..} = do
killThread listenThread
closeStream =<< readMVar vStream
killThread listenThread
closeStream =<< readMVar vStream
isClosed :: Pipeline i o -> IO Bool
isClosed Pipeline{listenThread} = do
status <- threadStatus listenThread
return $ case status of
ThreadRunning -> False
ThreadFinished -> True
ThreadBlocked _ -> False
ThreadDied -> True
status <- threadStatus listenThread
return $ case status of
ThreadRunning -> False
ThreadFinished -> True
ThreadBlocked _ -> False
ThreadDied -> True
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
listen :: Pipeline i o -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order
listen Pipeline{..} = do
stream <- readMVar vStream
forever $ do
e <- runErrorT $ readStream stream
var <- readChan responseQueue
putMVar var e
case e of
Left err -> closeStream stream >> ioError err -- close and stop looping
Right _ -> return ()
stream <- readMVar vStream
forever $ do
e <- runErrorT $ readStream stream
var <- readChan responseQueue
putMVar var e
case e of
Left err -> closeStream stream >> ioError err -- close and stop looping
Right _ -> return ()
send :: Pipeline i o -> o -> IOE ()
-- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own).
@ -117,11 +117,11 @@ call :: Pipeline i o -> o -> IOE (IOE i)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
call p@Pipeline{..} message = withMVar vStream doCall `onException` close p where
doCall stream = do
writeStream stream message
var <- newEmptyMVar
liftIO $ writeChan responseQueue var
return $ ErrorT (readMVar var) -- return promise
doCall stream = do
writeStream stream message
var <- newEmptyMVar
liftIO $ writeChan responseQueue var
return $ ErrorT (readMVar var) -- return promise
{- Authors: Tony Hannan <tony@10gen.com>