Merge branch 'pr-20'
Conflicts: Database/MongoDB/Query.hs
This commit is contained in:
commit
adb57dce72
9 changed files with 684 additions and 805 deletions
|
@ -30,7 +30,7 @@ module Database.MongoDB.Admin (
|
|||
import Prelude hiding (lookup)
|
||||
import Control.Applicative ((<$>))
|
||||
import Control.Concurrent (forkIO, threadDelay)
|
||||
import Control.Monad (forever, unless)
|
||||
import Control.Monad (forever, unless, liftM)
|
||||
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
|
||||
import Data.Set (Set)
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
|
@ -47,7 +47,7 @@ import qualified Data.Text as T
|
|||
|
||||
import Database.MongoDB.Connection (Host, showHostPort)
|
||||
import Database.MongoDB.Internal.Protocol (pwHash, pwKey)
|
||||
import Database.MongoDB.Internal.Util (MonadIO', (<.>), true1)
|
||||
import Database.MongoDB.Internal.Util ((<.>), true1)
|
||||
import Database.MongoDB.Query (Action, Database, Collection, Username, Password,
|
||||
Order, Query(..), accessMode, master, runCommand,
|
||||
useDb, thisDatabase, rest, select, find, findOne,
|
||||
|
@ -64,17 +64,17 @@ coptElem Capped = "capped" =: True
|
|||
coptElem (MaxByteSize n) = "size" =: n
|
||||
coptElem (MaxItems n) = "max" =: n
|
||||
|
||||
createCollection :: (MonadIO' m) => [CollectionOption] -> Collection -> Action m Document
|
||||
createCollection :: (MonadIO m) => [CollectionOption] -> Collection -> Action m Document
|
||||
-- ^ Create collection with given options. You only need to call this to set options, otherwise a collection is created automatically on first use with no options.
|
||||
createCollection opts col = runCommand $ ["create" =: col] ++ map coptElem opts
|
||||
|
||||
renameCollection :: (MonadIO' m) => Collection -> Collection -> Action m Document
|
||||
renameCollection :: (MonadIO m) => Collection -> Collection -> Action m Document
|
||||
-- ^ Rename first collection to second collection
|
||||
renameCollection from to = do
|
||||
db <- thisDatabase
|
||||
useDb admin $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
|
||||
|
||||
dropCollection :: (MonadIO' m) => Collection -> Action m Bool
|
||||
dropCollection :: (MonadIO m) => Collection -> Action m Bool
|
||||
-- ^ Delete the given collection! Return True if collection existed (and was deleted); return False if collection did not exist (and no action).
|
||||
dropCollection coll = do
|
||||
resetIndexCache
|
||||
|
@ -83,7 +83,7 @@ dropCollection coll = do
|
|||
if at "errmsg" r == ("ns not found" :: Text) then return False else
|
||||
fail $ "dropCollection failed: " ++ show r
|
||||
|
||||
validateCollection :: (MonadIO' m) => Collection -> Action m Document
|
||||
validateCollection :: (MonadIO m) => Collection -> Action m Document
|
||||
-- ^ This operation takes a while
|
||||
validateCollection coll = runCommand ["validate" =: coll]
|
||||
|
||||
|
@ -115,7 +115,7 @@ genName :: Order -> IndexName
|
|||
genName keys = T.intercalate "_" (map f keys) where
|
||||
f (k := v) = k `T.append` "_" `T.append` T.pack (show v)
|
||||
|
||||
ensureIndex :: (MonadIO' m) => Index -> Action m ()
|
||||
ensureIndex :: (MonadIO m) => Index -> Action m ()
|
||||
-- ^ Create index if we did not already create one. May be called repeatedly with practically no performance hit, because we remember if we already called this for the same index (although this memory gets wiped out every 15 minutes, in case another client drops the index and we want to create it again).
|
||||
ensureIndex idx = let k = (iColl idx, iName idx) in do
|
||||
icache <- fetchIndexCache
|
||||
|
@ -124,11 +124,11 @@ ensureIndex idx = let k = (iColl idx, iName idx) in do
|
|||
accessMode master (createIndex idx)
|
||||
liftIO $ writeIORef icache (Set.insert k set)
|
||||
|
||||
createIndex :: (MonadIO' m) => Index -> Action m ()
|
||||
createIndex :: (MonadIO m) => Index -> Action m ()
|
||||
-- ^ Create index on the server. This call goes to the server every time.
|
||||
createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase
|
||||
|
||||
dropIndex :: (MonadIO' m) => Collection -> IndexName -> Action m Document
|
||||
dropIndex :: (MonadIO m) => Collection -> IndexName -> Action m Document
|
||||
-- ^ Remove the index
|
||||
dropIndex coll idxName = do
|
||||
resetIndexCache
|
||||
|
@ -140,7 +140,7 @@ getIndexes coll = do
|
|||
db <- thisDatabase
|
||||
rest =<< find (select ["ns" =: db <.> coll] "system.indexes")
|
||||
|
||||
dropIndexes :: (MonadIO' m) => Collection -> Action m Document
|
||||
dropIndexes :: (MonadIO m) => Collection -> Action m Document
|
||||
-- ^ Drop all indexes on this collection
|
||||
dropIndexes coll = do
|
||||
resetIndexCache
|
||||
|
@ -192,7 +192,7 @@ allUsers :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Document]
|
|||
allUsers = map (exclude ["_id"]) <$> (rest =<< find
|
||||
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
|
||||
|
||||
addUser :: (MonadIO' m) => Bool -> Username -> Password -> Action m ()
|
||||
addUser :: (MonadIO m) => Bool -> Username -> Password -> Action m ()
|
||||
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
|
||||
addUser readOnly user pass = do
|
||||
mu <- findOne (select ["user" =: user] "system.users")
|
||||
|
@ -208,76 +208,76 @@ admin :: Database
|
|||
-- ^ \"admin\" database
|
||||
admin = "admin"
|
||||
|
||||
cloneDatabase :: (MonadIO' m) => Database -> Host -> Action m Document
|
||||
cloneDatabase :: (MonadIO m) => Database -> Host -> Action m Document
|
||||
-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case).
|
||||
cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromHost]
|
||||
|
||||
copyDatabase :: (MonadIO' m) => Database -> Host -> Maybe (Username, Password) -> Database -> Action m Document
|
||||
copyDatabase :: (MonadIO m) => Database -> Host -> Maybe (Username, Password) -> Database -> Action m Document
|
||||
-- ^ Copy database from given host to the server I am connected to. If username & password is supplied use them to read from given host.
|
||||
copyDatabase fromDb fromHost mup toDb = do
|
||||
let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb]
|
||||
useDb admin $ case mup of
|
||||
Nothing -> runCommand c
|
||||
Just (usr, pss) -> do
|
||||
n <- at "nonce" <$> runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost]
|
||||
n <- at "nonce" `liftM` runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost]
|
||||
runCommand $ c ++ ["username" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
|
||||
|
||||
dropDatabase :: (MonadIO' m) => Database -> Action m Document
|
||||
dropDatabase :: (MonadIO m) => Database -> Action m Document
|
||||
-- ^ Delete the given database!
|
||||
dropDatabase db = useDb db $ runCommand ["dropDatabase" =: (1 :: Int)]
|
||||
|
||||
repairDatabase :: (MonadIO' m) => Database -> Action m Document
|
||||
repairDatabase :: (MonadIO m) => Database -> Action m Document
|
||||
-- ^ Attempt to fix any corrupt records. This operation takes a while.
|
||||
repairDatabase db = useDb db $ runCommand ["repairDatabase" =: (1 :: Int)]
|
||||
|
||||
-- ** Server
|
||||
|
||||
serverBuildInfo :: (MonadIO' m) => Action m Document
|
||||
serverBuildInfo :: (MonadIO m) => Action m Document
|
||||
serverBuildInfo = useDb admin $ runCommand ["buildinfo" =: (1 :: Int)]
|
||||
|
||||
serverVersion :: (MonadIO' m) => Action m Text
|
||||
serverVersion = at "version" <$> serverBuildInfo
|
||||
serverVersion :: (MonadIO m) => Action m Text
|
||||
serverVersion = at "version" `liftM` serverBuildInfo
|
||||
|
||||
-- * Diagnostics
|
||||
|
||||
-- ** Collection
|
||||
|
||||
collectionStats :: (MonadIO' m) => Collection -> Action m Document
|
||||
collectionStats :: (MonadIO m) => Collection -> Action m Document
|
||||
collectionStats coll = runCommand ["collstats" =: coll]
|
||||
|
||||
dataSize :: (MonadIO' m) => Collection -> Action m Int
|
||||
dataSize c = at "size" <$> collectionStats c
|
||||
dataSize :: (MonadIO m) => Collection -> Action m Int
|
||||
dataSize c = at "size" `liftM` collectionStats c
|
||||
|
||||
storageSize :: (MonadIO' m) => Collection -> Action m Int
|
||||
storageSize c = at "storageSize" <$> collectionStats c
|
||||
storageSize :: (MonadIO m) => Collection -> Action m Int
|
||||
storageSize c = at "storageSize" `liftM` collectionStats c
|
||||
|
||||
totalIndexSize :: (MonadIO' m) => Collection -> Action m Int
|
||||
totalIndexSize c = at "totalIndexSize" <$> collectionStats c
|
||||
totalIndexSize :: (MonadIO m) => Collection -> Action m Int
|
||||
totalIndexSize c = at "totalIndexSize" `liftM` collectionStats c
|
||||
|
||||
totalSize :: (MonadIO m, MonadBaseControl IO m, MonadIO' m) => Collection -> Action m Int
|
||||
totalSize :: (MonadIO m, MonadBaseControl IO m) => Collection -> Action m Int
|
||||
totalSize coll = do
|
||||
x <- storageSize coll
|
||||
xs <- mapM isize =<< getIndexes coll
|
||||
return (foldl (+) x xs)
|
||||
where
|
||||
isize idx = at "storageSize" <$> collectionStats (coll `T.append` ".$" `T.append` at "name" idx)
|
||||
isize idx = at "storageSize" `liftM` collectionStats (coll `T.append` ".$" `T.append` at "name" idx)
|
||||
|
||||
-- ** Profiling
|
||||
|
||||
data ProfilingLevel = Off | Slow | All deriving (Show, Enum, Eq)
|
||||
|
||||
getProfilingLevel :: (MonadIO' m) => Action m ProfilingLevel
|
||||
getProfilingLevel = toEnum . at "was" <$> runCommand ["profile" =: (-1 :: Int)]
|
||||
getProfilingLevel :: (MonadIO m) => Action m ProfilingLevel
|
||||
getProfilingLevel = (toEnum . at "was") `liftM` runCommand ["profile" =: (-1 :: Int)]
|
||||
|
||||
type MilliSec = Int
|
||||
|
||||
setProfilingLevel :: (MonadIO' m) => ProfilingLevel -> Maybe MilliSec -> Action m ()
|
||||
setProfilingLevel :: (MonadIO m) => ProfilingLevel -> Maybe MilliSec -> Action m ()
|
||||
setProfilingLevel p mSlowMs =
|
||||
runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return ()
|
||||
|
||||
-- ** Database
|
||||
|
||||
dbStats :: (MonadIO' m) => Action m Document
|
||||
dbStats :: (MonadIO m) => Action m Document
|
||||
dbStats = runCommand ["dbstats" =: (1 :: Int)]
|
||||
|
||||
currentOp :: (MonadIO m) => Action m (Maybe Document)
|
||||
|
@ -291,7 +291,7 @@ killOp op = findOne (select ["op" =: op] "$cmd.sys.killop")
|
|||
|
||||
-- ** Server
|
||||
|
||||
serverStatus :: (MonadIO' m) => Action m Document
|
||||
serverStatus :: (MonadIO m) => Action m Document
|
||||
serverStatus = useDb admin $ runCommand ["serverStatus" =: (1 :: Int)]
|
||||
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
module Database.MongoDB.Connection (
|
||||
-- * Util
|
||||
Secs, IOE, runIOE,
|
||||
Secs,
|
||||
-- * Connection
|
||||
Pipe, close, isClosed,
|
||||
-- * Server
|
||||
|
@ -25,12 +25,11 @@ import System.IO.Unsafe (unsafePerformIO)
|
|||
import System.Timeout (timeout)
|
||||
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, eof,
|
||||
spaces, try, (<|>))
|
||||
import qualified Control.Exception as E
|
||||
import qualified Data.List as List
|
||||
|
||||
|
||||
import Control.Monad.Identity (runIdentity)
|
||||
import Control.Monad.Error (ErrorT(..), lift, throwError)
|
||||
import Control.Monad.Error (throwError)
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
|
||||
readMVar)
|
||||
import Data.Bson (Document, at, (=:))
|
||||
|
@ -40,16 +39,16 @@ import qualified Data.Bson as B
|
|||
import qualified Data.Text as T
|
||||
|
||||
import Database.MongoDB.Internal.Protocol (Pipe, newPipe)
|
||||
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE,
|
||||
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE,
|
||||
updateAssocs, shuffle, mergesortM)
|
||||
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
|
||||
slaveOk, runCommand)
|
||||
import System.IO.Pipeline (IOE, close, isClosed)
|
||||
import System.IO.Pipeline (close, isClosed)
|
||||
|
||||
adminCommand :: Command -> Pipe -> IOE Document
|
||||
adminCommand :: Command -> Pipe -> IO Document
|
||||
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
|
||||
adminCommand cmd pipe =
|
||||
liftIOE failureToIOError . ErrorT $ access pipe slaveOk "admin" $ runCommand cmd
|
||||
liftIOE failureToIOError $ access pipe slaveOk "admin" $ runCommand cmd
|
||||
where
|
||||
failureToIOError (ConnectionFailure e) = e
|
||||
failureToIOError e = userError $ show e
|
||||
|
@ -102,17 +101,16 @@ globalConnectTimeout :: IORef Secs
|
|||
globalConnectTimeout = unsafePerformIO (newIORef 6)
|
||||
{-# NOINLINE globalConnectTimeout #-}
|
||||
|
||||
connect :: Host -> IOE Pipe
|
||||
connect :: Host -> IO Pipe
|
||||
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within 'globalConnectTimeout'.
|
||||
connect h = lift (readIORef globalConnectTimeout) >>= flip connect' h
|
||||
connect h = readIORef globalConnectTimeout >>= flip connect' h
|
||||
|
||||
connect' :: Secs -> Host -> IOE Pipe
|
||||
connect' :: Secs -> Host -> IO Pipe
|
||||
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within given number of seconds.
|
||||
connect' timeoutSecs (Host hostname port) = do
|
||||
handle <- ErrorT . E.try $ do
|
||||
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
|
||||
maybe (ioError $ userError "connect timed out") return mh
|
||||
lift $ newPipe handle
|
||||
handle <- maybe (ioError $ userError "connect timed out") return mh
|
||||
newPipe handle
|
||||
|
||||
-- * Replica Set
|
||||
|
||||
|
@ -125,11 +123,11 @@ replSetName :: ReplicaSet -> Text
|
|||
-- ^ name of connected replica set
|
||||
replSetName (ReplicaSet rsName _ _) = rsName
|
||||
|
||||
openReplicaSet :: (ReplicaSetName, [Host]) -> IOE ReplicaSet
|
||||
openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet
|
||||
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSet\'' instead.
|
||||
openReplicaSet rsSeed = lift (readIORef globalConnectTimeout) >>= flip openReplicaSet' rsSeed
|
||||
openReplicaSet rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSet' rsSeed
|
||||
|
||||
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IOE ReplicaSet
|
||||
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
|
||||
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members.
|
||||
openReplicaSet' timeoutSecs (rsName, seedList) = do
|
||||
vMembers <- newMVar (map (, Nothing) seedList)
|
||||
|
@ -141,7 +139,7 @@ closeReplicaSet :: ReplicaSet -> IO ()
|
|||
-- ^ Close all connections to replica set
|
||||
closeReplicaSet (ReplicaSet _ vMembers _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
|
||||
|
||||
primary :: ReplicaSet -> IOE Pipe
|
||||
primary :: ReplicaSet -> IO Pipe
|
||||
-- ^ Return connection to current primary of replica set. Fail if no primary available.
|
||||
primary rs@(ReplicaSet rsName _ _) = do
|
||||
mHost <- statedPrimary <$> updateMembers rs
|
||||
|
@ -149,19 +147,19 @@ primary rs@(ReplicaSet rsName _ _) = do
|
|||
Just host' -> connection rs Nothing host'
|
||||
Nothing -> throwError $ userError $ "replica set " ++ T.unpack rsName ++ " has no primary"
|
||||
|
||||
secondaryOk :: ReplicaSet -> IOE Pipe
|
||||
secondaryOk :: ReplicaSet -> IO Pipe
|
||||
-- ^ Return connection to a random secondary, or primary if no secondaries available.
|
||||
secondaryOk rs = do
|
||||
info <- updateMembers rs
|
||||
hosts <- lift $ shuffle (possibleHosts info)
|
||||
hosts <- shuffle (possibleHosts info)
|
||||
let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info)
|
||||
untilSuccess (connection rs Nothing) hosts'
|
||||
|
||||
routedHost :: ((Host, Bool) -> (Host, Bool) -> IOE Ordering) -> ReplicaSet -> IOE Pipe
|
||||
routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO Pipe
|
||||
-- ^ Return a connection to a host using a user-supplied sorting function, which sorts based on a tuple containing the host and a boolean indicating whether the host is primary.
|
||||
routedHost f rs = do
|
||||
info <- updateMembers rs
|
||||
hosts <- lift $ shuffle (possibleHosts info)
|
||||
hosts <- shuffle (possibleHosts info)
|
||||
let addIsPrimary h = (h, if Just h == statedPrimary info then True else False)
|
||||
hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
|
||||
untilSuccess (connection rs Nothing) hosts'
|
||||
|
@ -177,13 +175,13 @@ possibleHosts :: ReplicaInfo -> [Host]
|
|||
-- ^ Non-arbiter, non-hidden members of replica set
|
||||
possibleHosts (_, info) = map readHostPort $ at "hosts" info
|
||||
|
||||
updateMembers :: ReplicaSet -> IOE ReplicaInfo
|
||||
updateMembers :: ReplicaSet -> IO ReplicaInfo
|
||||
-- ^ Fetch replica info from any server and update members accordingly
|
||||
updateMembers rs@(ReplicaSet _ vMembers _) = do
|
||||
(host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
|
||||
modifyMVar vMembers $ \members -> do
|
||||
let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
|
||||
lift $ forM_ old $ \(_, mPipe) -> maybe (return ()) close mPipe
|
||||
forM_ old $ \(_, mPipe) -> maybe (return ()) close mPipe
|
||||
return (members' ++ map (, Nothing) new, (host', info))
|
||||
where
|
||||
intersection :: (Eq k) => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
|
||||
|
@ -191,7 +189,7 @@ updateMembers rs@(ReplicaSet _ vMembers _) = do
|
|||
assocKeys = map fst assocs
|
||||
inKeys = intersect keys assocKeys
|
||||
|
||||
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IOE ReplicaInfo
|
||||
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
|
||||
-- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set.
|
||||
fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
|
||||
pipe <- connection rs mPipe host'
|
||||
|
@ -201,15 +199,15 @@ fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
|
|||
Just setName | setName /= rsName -> throwError $ userError $ show host' ++ " not a member of replica set " ++ T.unpack rsName ++ ": " ++ show info
|
||||
Just _ -> return (host', info)
|
||||
|
||||
connection :: ReplicaSet -> Maybe Pipe -> Host -> IOE Pipe
|
||||
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
|
||||
-- ^ Return new or existing connection to member of replica set. If pipe is already known for host it is given, but we still test if it is open.
|
||||
connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' =
|
||||
maybe conn (\p -> lift (isClosed p) >>= \bad -> if bad then conn else return p) mPipe
|
||||
maybe conn (\p -> isClosed p >>= \bad -> if bad then conn else return p) mPipe
|
||||
where
|
||||
conn = modifyMVar vMembers $ \members -> do
|
||||
let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
|
||||
case List.lookup host' members of
|
||||
Just (Just pipe) -> lift (isClosed pipe) >>= \bad -> if bad then new else return (members, pipe)
|
||||
Just (Just pipe) -> isClosed pipe >>= \bad -> if bad then new else return (members, pipe)
|
||||
_ -> new
|
||||
|
||||
|
||||
|
|
|
@ -24,7 +24,6 @@ module Database.MongoDB.Internal.Protocol (
|
|||
|
||||
import Control.Applicative ((<$>))
|
||||
import Control.Arrow ((***))
|
||||
import Control.Exception (try)
|
||||
import Control.Monad (forM_, replicateM, unless)
|
||||
import Data.Binary.Get (Get, runGet)
|
||||
import Data.Binary.Put (Put, runPut)
|
||||
|
@ -36,7 +35,6 @@ import System.IO.Unsafe (unsafePerformIO)
|
|||
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
|
||||
import Control.Monad.Error (ErrorT(..))
|
||||
import Control.Monad.Trans (MonadIO, liftIO)
|
||||
import Data.Bson (Document)
|
||||
import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64,
|
||||
|
@ -48,7 +46,7 @@ import qualified Data.Text as T
|
|||
import qualified Data.Text.Encoding as TE
|
||||
|
||||
import Database.MongoDB.Internal.Util (whenJust, hGetN, bitOr, byteStringHex)
|
||||
import System.IO.Pipeline (IOE, Pipeline, newPipeline, IOStream(..))
|
||||
import System.IO.Pipeline (Pipeline, newPipeline, IOStream(..))
|
||||
|
||||
import qualified System.IO.Pipeline as P
|
||||
|
||||
|
@ -61,11 +59,11 @@ newPipe :: Handle -> IO Pipe
|
|||
-- ^ Create pipe over handle
|
||||
newPipe handle = newPipeline $ IOStream (writeMessage handle) (readMessage handle) (hClose handle)
|
||||
|
||||
send :: Pipe -> [Notice] -> IOE ()
|
||||
send :: Pipe -> [Notice] -> IO ()
|
||||
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
|
||||
send pipe notices = P.send pipe (notices, Nothing)
|
||||
|
||||
call :: Pipe -> [Notice] -> Request -> IOE (IOE Reply)
|
||||
call :: Pipe -> [Notice] -> Request -> IO (IO Reply)
|
||||
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
|
||||
call pipe notices request = do
|
||||
requestId <- genRequestId
|
||||
|
@ -81,9 +79,9 @@ type Message = ([Notice], Maybe (Request, RequestId))
|
|||
-- ^ A write notice(s) with getLastError request, or just query request.
|
||||
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
|
||||
|
||||
writeMessage :: Handle -> Message -> IOE ()
|
||||
writeMessage :: Handle -> Message -> IO ()
|
||||
-- ^ Write message to socket
|
||||
writeMessage handle (notices, mRequest) = ErrorT . try $ do
|
||||
writeMessage handle (notices, mRequest) = do
|
||||
forM_ notices $ \n -> writeReq . (Left n,) =<< genRequestId
|
||||
whenJust mRequest $ writeReq . (Right *** id)
|
||||
hFlush handle
|
||||
|
@ -99,9 +97,9 @@ writeMessage handle (notices, mRequest) = ErrorT . try $ do
|
|||
type Response = (ResponseTo, Reply)
|
||||
-- ^ Message received from a Mongo server in response to a Request
|
||||
|
||||
readMessage :: Handle -> IOE Response
|
||||
readMessage :: Handle -> IO Response
|
||||
-- ^ read response from socket
|
||||
readMessage handle = ErrorT $ try readResp where
|
||||
readMessage handle = readResp where
|
||||
readResp = do
|
||||
len <- fromEnum . decodeSize <$> hGetN handle 4
|
||||
runGet getReply <$> hGetN handle len
|
||||
|
|
|
@ -7,9 +7,8 @@
|
|||
|
||||
module Database.MongoDB.Internal.Util where
|
||||
|
||||
import Control.Applicative (Applicative(..), (<$>))
|
||||
import Control.Arrow (left)
|
||||
import Control.Exception (assert)
|
||||
import Control.Applicative ((<$>))
|
||||
import Control.Exception (assert, handle, throwIO, Exception)
|
||||
import Control.Monad (liftM, liftM2)
|
||||
import Data.Bits (Bits, (.|.))
|
||||
import Data.Word (Word8)
|
||||
|
@ -23,7 +22,7 @@ import System.Random.Shuffle (shuffle')
|
|||
import qualified Data.ByteString.Lazy as L
|
||||
import qualified Data.ByteString as S
|
||||
|
||||
import Control.Monad.Error (MonadError(..), ErrorT(..), Error(..))
|
||||
import Control.Monad.Error (MonadError(..), Error(..))
|
||||
import Control.Monad.Trans (MonadIO, liftIO)
|
||||
import Data.Bson
|
||||
import Data.Text (Text)
|
||||
|
@ -36,10 +35,6 @@ deriving instance Eq PortID
|
|||
#endif
|
||||
deriving instance Ord PortID
|
||||
|
||||
-- | MonadIO with extra Applicative and Functor superclasses
|
||||
class (MonadIO m, Applicative m, Functor m) => MonadIO' m
|
||||
instance (MonadIO m, Applicative m, Functor m) => MonadIO' m
|
||||
|
||||
-- | A monadic sort implementation derived from the non-monadic one in ghc's Prelude
|
||||
mergesortM :: Monad m => (a -> a -> m Ordering) -> [a] -> m [a]
|
||||
mergesortM cmp = mergesortM' cmp . map wrap
|
||||
|
@ -87,13 +82,9 @@ untilSuccess' _ f (x : xs) = catchError (f x) (\e -> untilSuccess' e f xs)
|
|||
whenJust :: (Monad m) => Maybe a -> (a -> m ()) -> m ()
|
||||
whenJust mVal act = maybe (return ()) act mVal
|
||||
|
||||
liftIOE :: (MonadIO m) => (e -> e') -> ErrorT e IO a -> ErrorT e' m a
|
||||
liftIOE :: (MonadIO m, Exception e, Exception e') => (e -> e') -> IO a -> m a
|
||||
-- ^ lift IOE monad to ErrorT monad over some MonadIO m
|
||||
liftIOE f = ErrorT . liftIO . fmap (left f) . runErrorT
|
||||
|
||||
runIOE :: ErrorT IOError IO a -> IO a
|
||||
-- ^ Run action while catching explicit error and rethrowing in IO monad
|
||||
runIOE (ErrorT action) = action >>= either ioError return
|
||||
liftIOE f = liftIO . handle (throwIO . f)
|
||||
|
||||
updateAssocs :: (Eq k) => k -> v -> [(k, v)] -> [(k, v)]
|
||||
-- ^ Change or insert value of key in association list
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
-- | Query and update documents
|
||||
|
||||
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP #-}
|
||||
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, DeriveDataTypeable #-}
|
||||
|
||||
module Database.MongoDB.Query (
|
||||
-- * Monad
|
||||
Action, access, Failure(..), ErrorCode,
|
||||
AccessMode(..), GetLastError, master, slaveOk, accessMode,
|
||||
MonadDB(..),
|
||||
liftDB,
|
||||
MongoContext, HasMongoContext(..),
|
||||
-- * Database
|
||||
Database, allDatabases, useDb, thisDatabase,
|
||||
-- ** Authentication
|
||||
|
@ -43,12 +44,13 @@ module Database.MongoDB.Query (
|
|||
) where
|
||||
|
||||
import Prelude hiding (lookup)
|
||||
import Control.Applicative (Applicative, (<$>))
|
||||
import Control.Exception (Exception, throwIO)
|
||||
import Control.Monad (unless, replicateM, liftM)
|
||||
import Data.Int (Int32)
|
||||
import Data.Maybe (listToMaybe, catMaybes)
|
||||
import Data.Word (Word32)
|
||||
import Data.Monoid (mappend)
|
||||
import Data.Typeable (Typeable)
|
||||
|
||||
#if MIN_VERSION_base(4,6,0)
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar,
|
||||
|
@ -57,17 +59,11 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar,
|
|||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
|
||||
readMVar, modifyMVar)
|
||||
#endif
|
||||
import Control.Monad.Base (MonadBase(liftBase))
|
||||
import Control.Monad.Error (ErrorT, Error(..), MonadError, runErrorT,
|
||||
throwError)
|
||||
import Control.Monad.Reader (ReaderT, runReaderT, ask, asks, local)
|
||||
import Control.Monad.RWS (RWST)
|
||||
import Control.Monad.State (StateT)
|
||||
import Control.Monad.Trans (MonadIO, MonadTrans, lift, liftIO)
|
||||
import Control.Monad.Trans.Control (ComposeSt, MonadBaseControl(..),
|
||||
MonadTransControl(..), StM, StT,
|
||||
defaultLiftBaseWith, defaultRestoreM)
|
||||
import Control.Monad.Writer (WriterT, Monoid)
|
||||
import Control.Monad.Base (MonadBase)
|
||||
import Control.Monad.Error (Error(..))
|
||||
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
|
||||
import Control.Monad.Trans (MonadIO, liftIO)
|
||||
import Control.Monad.Trans.Control (MonadBaseControl(..))
|
||||
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
|
||||
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
|
||||
(=?))
|
||||
|
@ -83,7 +79,7 @@ import Database.MongoDB.Internal.Protocol (Reply(..), QueryOption(..),
|
|||
qFullCollection, qBatchSize,
|
||||
qSelector, qProjector),
|
||||
pwKey)
|
||||
import Database.MongoDB.Internal.Util (MonadIO', loop, liftIOE, true1, (<.>))
|
||||
import Database.MongoDB.Internal.Util (loop, liftIOE, true1, (<.>))
|
||||
import qualified Database.MongoDB.Internal.Protocol as P
|
||||
|
||||
#if !MIN_VERSION_base(4,6,0)
|
||||
|
@ -92,31 +88,12 @@ import qualified Database.MongoDB.Internal.Protocol as P
|
|||
|
||||
-- * Monad
|
||||
|
||||
newtype Action m a = Action {unAction :: ErrorT Failure (ReaderT Context m) a}
|
||||
deriving (Functor, Applicative, Monad, MonadIO, MonadError Failure)
|
||||
type Action = ReaderT MongoContext
|
||||
-- ^ A monad on top of m (which must be a MonadIO) that may access the database and may fail with a DB 'Failure'
|
||||
|
||||
instance MonadBase b m => MonadBase b (Action m) where
|
||||
liftBase = Action . liftBase
|
||||
|
||||
instance (MonadIO m, MonadBaseControl b m) => MonadBaseControl b (Action m) where
|
||||
newtype StM (Action m) a = StMT {unStMT :: ComposeSt Action m a}
|
||||
liftBaseWith = defaultLiftBaseWith StMT
|
||||
restoreM = defaultRestoreM unStMT
|
||||
|
||||
instance MonadTrans Action where
|
||||
lift = Action . lift . lift
|
||||
|
||||
instance MonadTransControl Action where
|
||||
newtype StT Action a = StActionT {unStAction :: StT (ReaderT Context) (StT (ErrorT Failure) a)}
|
||||
liftWith f = Action $ liftWith $ \runError ->
|
||||
liftWith $ \runReader' ->
|
||||
f (liftM StActionT . runReader' . runError . unAction)
|
||||
restoreT = Action . restoreT . restoreT . liftM unStAction
|
||||
|
||||
access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m (Either Failure a)
|
||||
access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m a
|
||||
-- ^ Run action against database on server at other end of pipe. Use access mode for any reads and writes. Return Left on connection failure or read/write failure.
|
||||
access myPipe myAccessMode myDatabase (Action action) = runReaderT (runErrorT action) Context{..}
|
||||
access myPipe myAccessMode myDatabase action = runReaderT action MongoContext{..}
|
||||
|
||||
-- | A connection failure, or a read or write exception like cursor expired or inserting a duplicate key.
|
||||
-- Note, unexpected data from the server is not a Failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change.
|
||||
|
@ -127,7 +104,8 @@ data Failure =
|
|||
| WriteFailure ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string
|
||||
| DocNotFound Selection -- ^ 'fetch' found no document matching selection
|
||||
| AggregateFailure String -- ^ 'aggregate' returned an error
|
||||
deriving (Show, Eq)
|
||||
deriving (Show, Eq, Typeable)
|
||||
instance Exception Failure
|
||||
|
||||
type ErrorCode = Int
|
||||
-- ^ Error code from getLastError or query failure
|
||||
|
@ -155,7 +133,7 @@ slaveOk = ReadStaleOk
|
|||
|
||||
accessMode :: (Monad m) => AccessMode -> Action m a -> Action m a
|
||||
-- ^ Run action with given 'AccessMode'
|
||||
accessMode mode (Action act) = Action $ local (\ctx -> ctx {myAccessMode = mode}) act
|
||||
accessMode mode act = local (\ctx -> ctx {myAccessMode = mode}) act
|
||||
|
||||
readMode :: AccessMode -> ReadMode
|
||||
readMode ReadStaleOk = StaleOk
|
||||
|
@ -167,85 +145,72 @@ writeMode UnconfirmedWrites = NoConfirm
|
|||
writeMode (ConfirmWrites z) = Confirm z
|
||||
|
||||
-- | Values needed when executing a db operation
|
||||
data Context = Context {
|
||||
data MongoContext = MongoContext {
|
||||
myPipe :: Pipe, -- ^ operations read/write to this pipelined TCP connection to a MongoDB server
|
||||
myAccessMode :: AccessMode, -- ^ read/write operation will use this access mode
|
||||
myDatabase :: Database } -- ^ operations query/update this database
|
||||
|
||||
myReadMode :: Context -> ReadMode
|
||||
myReadMode :: MongoContext -> ReadMode
|
||||
myReadMode = readMode . myAccessMode
|
||||
|
||||
myWriteMode :: Context -> WriteMode
|
||||
myWriteMode :: MongoContext -> WriteMode
|
||||
myWriteMode = writeMode . myAccessMode
|
||||
|
||||
send :: (MonadIO m) => [Notice] -> Action m ()
|
||||
-- ^ Send notices as a contiguous batch to server with no reply. Throw 'ConnectionFailure' if pipe fails.
|
||||
send ns = Action $ do
|
||||
send ns = do
|
||||
pipe <- asks myPipe
|
||||
liftIOE ConnectionFailure $ P.send pipe ns
|
||||
|
||||
call :: (MonadIO m) => [Notice] -> Request -> Action m (ErrorT Failure IO Reply)
|
||||
call :: (MonadIO m) => [Notice] -> Request -> Action m (IO Reply)
|
||||
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive.
|
||||
call ns r = Action $ do
|
||||
call ns r = do
|
||||
pipe <- asks myPipe
|
||||
promise <- liftIOE ConnectionFailure $ P.call pipe ns r
|
||||
return (liftIOE ConnectionFailure promise)
|
||||
|
||||
-- | If you stack a monad on top of 'Action' then make it an instance of this class and use 'liftDB' to execute a DB Action within it. Instances already exist for the basic mtl transformers.
|
||||
class (Monad m, MonadBaseControl IO (BaseMonad m), Applicative (BaseMonad m), Functor (BaseMonad m)) => MonadDB m where
|
||||
type BaseMonad m :: * -> *
|
||||
liftDB :: Action (BaseMonad m) a -> m a
|
||||
class HasMongoContext env where
|
||||
mongoContext :: env -> MongoContext
|
||||
instance HasMongoContext MongoContext where
|
||||
mongoContext = id
|
||||
|
||||
instance (MonadBaseControl IO m, Applicative m, Functor m) => MonadDB (Action m) where
|
||||
type BaseMonad (Action m) = m
|
||||
liftDB = id
|
||||
|
||||
instance (MonadDB m, Error e) => MonadDB (ErrorT e m) where
|
||||
type BaseMonad (ErrorT e m) = BaseMonad m
|
||||
liftDB = lift . liftDB
|
||||
instance (MonadDB m) => MonadDB (ReaderT r m) where
|
||||
type BaseMonad (ReaderT r m) = BaseMonad m
|
||||
liftDB = lift . liftDB
|
||||
instance (MonadDB m) => MonadDB (StateT s m) where
|
||||
type BaseMonad (StateT s m) = BaseMonad m
|
||||
liftDB = lift . liftDB
|
||||
instance (MonadDB m, Monoid w) => MonadDB (WriterT w m) where
|
||||
type BaseMonad (WriterT w m) = BaseMonad m
|
||||
liftDB = lift . liftDB
|
||||
instance (MonadDB m, Monoid w) => MonadDB (RWST r w s m) where
|
||||
type BaseMonad (RWST r w s m) = BaseMonad m
|
||||
liftDB = lift . liftDB
|
||||
liftDB :: (MonadReader env m, HasMongoContext env, MonadIO m)
|
||||
=> Action IO a
|
||||
-> m a
|
||||
liftDB m = do
|
||||
env <- ask
|
||||
liftIO $ runReaderT m (mongoContext env)
|
||||
|
||||
-- * Database
|
||||
|
||||
type Database = Text
|
||||
|
||||
allDatabases :: (MonadIO' m) => Action m [Database]
|
||||
allDatabases :: (MonadIO m) => Action m [Database]
|
||||
-- ^ List all databases residing on server
|
||||
allDatabases = map (at "name") . at "databases" <$> useDb "admin" (runCommand1 "listDatabases")
|
||||
allDatabases = (map (at "name") . at "databases") `liftM` useDb "admin" (runCommand1 "listDatabases")
|
||||
|
||||
thisDatabase :: (Monad m) => Action m Database
|
||||
-- ^ Current database in use
|
||||
thisDatabase = Action $ asks myDatabase
|
||||
thisDatabase = asks myDatabase
|
||||
|
||||
useDb :: (Monad m) => Database -> Action m a -> Action m a
|
||||
-- ^ Run action against given database
|
||||
useDb db (Action act) = Action $ local (\ctx -> ctx {myDatabase = db}) act
|
||||
useDb db act = local (\ctx -> ctx {myDatabase = db}) act
|
||||
|
||||
-- * Authentication
|
||||
|
||||
auth :: (MonadIO' m) => Username -> Password -> Action m Bool
|
||||
auth :: (MonadIO m) => Username -> Password -> Action m Bool
|
||||
-- ^ Authenticate with the current database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe.
|
||||
auth usr pss = do
|
||||
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
|
||||
true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
|
||||
n <- at "nonce" `liftM` runCommand ["getnonce" =: (1 :: Int)]
|
||||
true1 "ok" `liftM` runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
|
||||
|
||||
-- * Collection
|
||||
|
||||
type Collection = Text
|
||||
-- ^ Collection name (not prefixed with database)
|
||||
|
||||
allCollections :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Collection]
|
||||
allCollections :: (MonadIO m, MonadBaseControl IO m) => Action m [Collection]
|
||||
-- ^ List all collections in this database
|
||||
allCollections = do
|
||||
db <- thisDatabase
|
||||
|
@ -268,8 +233,13 @@ whereJS :: Selector -> Javascript -> Selector
|
|||
whereJS sel js = ("$where" =: js) : sel
|
||||
|
||||
class Select aQueryOrSelection where
|
||||
<<<<<<< HEAD
|
||||
select :: Selector -> Collection -> aQueryOrSelection
|
||||
-- ^ 'Query' or 'Selection' that selects documents in collection that match selector. The choice of type depends on use, for example, in @'find' (select sel col)@ it is a Query, and in @'delete' (select sel col)@ it is a Selection.
|
||||
=======
|
||||
select :: Selector -> Collection -> aQueryOrSelection
|
||||
-- ^ 'Query' or 'Selection' that selects documents in collection that match selector. The choice of type depends on use, for example, in @find (select sel col)@ it is a Query, and in @delete (select sel col)@ it is a Selection.
|
||||
>>>>>>> refs/heads/pr-20
|
||||
|
||||
instance Select Selection where
|
||||
select = Select
|
||||
|
@ -286,22 +256,22 @@ data WriteMode =
|
|||
|
||||
write :: (MonadIO m) => Notice -> Action m ()
|
||||
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error.
|
||||
write notice = Action (asks myWriteMode) >>= \mode -> case mode of
|
||||
write notice = asks myWriteMode >>= \mode -> case mode of
|
||||
NoConfirm -> send [notice]
|
||||
Confirm params -> do
|
||||
let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
|
||||
Batch _ _ [doc] <- fulfill =<< request [notice] =<< queryRequest False q {limit = 1}
|
||||
case lookup "err" doc of
|
||||
Nothing -> return ()
|
||||
Just err -> throwError $ WriteFailure (maybe 0 id $ lookup "code" doc) err
|
||||
Just err -> liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "code" doc) err
|
||||
|
||||
-- ** Insert
|
||||
|
||||
insert :: (MonadIO' m) => Collection -> Document -> Action m Value
|
||||
insert :: (MonadIO m) => Collection -> Document -> Action m Value
|
||||
-- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied
|
||||
insert col doc = head <$> insertMany col [doc]
|
||||
insert col doc = head `liftM` insertMany col [doc]
|
||||
|
||||
insert_ :: (MonadIO' m) => Collection -> Document -> Action m ()
|
||||
insert_ :: (MonadIO m) => Collection -> Document -> Action m ()
|
||||
-- ^ Same as 'insert' except don't return _id
|
||||
insert_ col doc = insert col doc >> return ()
|
||||
|
||||
|
@ -333,11 +303,11 @@ assignId :: Document -> IO Document
|
|||
-- ^ Assign a unique value to _id field if missing
|
||||
assignId doc = if any (("_id" ==) . label) doc
|
||||
then return doc
|
||||
else (\oid -> ("_id" =: oid) : doc) <$> genObjectId
|
||||
else (\oid -> ("_id" =: oid) : doc) `liftM` genObjectId
|
||||
|
||||
-- ** Update
|
||||
|
||||
save :: (MonadIO' m) => Collection -> Document -> Action m ()
|
||||
save :: (MonadIO m) => Collection -> Document -> Action m ()
|
||||
-- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or update it if its not new (has \"_id\" field)
|
||||
save col doc = case look "_id" doc of
|
||||
Nothing -> insert_ col doc
|
||||
|
@ -437,12 +407,12 @@ findOne q = do
|
|||
|
||||
fetch :: (MonadIO m) => Query -> Action m Document
|
||||
-- ^ Same as 'findOne' except throw 'DocNotFound' if none match
|
||||
fetch q = findOne q >>= maybe (throwError $ DocNotFound $ selection q) return
|
||||
fetch q = findOne q >>= maybe (liftIO $ throwIO $ DocNotFound $ selection q) return
|
||||
|
||||
-- | runs the findAndModify command.
|
||||
-- Returns a single updated document (new option is set to true).
|
||||
-- Currently this API does not allow setting the remove option
|
||||
findAndModify :: (Applicative m, MonadIO m)
|
||||
findAndModify :: MonadIO m
|
||||
=> Query
|
||||
-> Document -- ^ updates
|
||||
-> Action m (Either String Document)
|
||||
|
@ -484,20 +454,20 @@ explain q = do -- same as findOne but with explain set to true
|
|||
Batch _ _ docs <- fulfill =<< request [] =<< queryRequest True q {limit = 1}
|
||||
return $ if null docs then error ("no explain: " ++ show q) else head docs
|
||||
|
||||
count :: (MonadIO' m) => Query -> Action m Int
|
||||
count :: (MonadIO m) => Query -> Action m Int
|
||||
-- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present)
|
||||
count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand
|
||||
count Query{selection = Select sel col, skip, limit} = at "n" `liftM` runCommand
|
||||
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
|
||||
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
|
||||
|
||||
distinct :: (MonadIO' m) => Label -> Selection -> Action m [Value]
|
||||
distinct :: (MonadIO m) => Label -> Selection -> Action m [Value]
|
||||
-- ^ Fetch distinct values of field in selected documents
|
||||
distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
|
||||
distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
|
||||
|
||||
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Limit)
|
||||
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
|
||||
queryRequest isExplain Query{..} = do
|
||||
ctx <- Action ask
|
||||
ctx <- ask
|
||||
return $ queryRequest' (myReadMode ctx) (myDatabase ctx)
|
||||
where
|
||||
queryRequest' rm db = (P.Query{..}, remainingLimit) where
|
||||
|
@ -523,7 +493,7 @@ batchSizeRemainingLimit batchSize limit = if limit == 0
|
|||
where batchSize' = if batchSize == 1 then 2 else batchSize
|
||||
-- batchSize 1 is broken because server converts 1 to -1 meaning limit 1
|
||||
|
||||
type DelayedBatch = ErrorT Failure IO Batch
|
||||
type DelayedBatch = IO Batch
|
||||
-- ^ A promised batch which may fail
|
||||
|
||||
data Batch = Batch Limit CursorId [Document]
|
||||
|
@ -544,12 +514,12 @@ fromReply limit Reply{..} = do
|
|||
-- If response flag indicates failure then throw it, otherwise do nothing
|
||||
checkResponseFlag flag = case flag of
|
||||
AwaitCapable -> return ()
|
||||
CursorNotFound -> throwError $ CursorNotFoundFailure rCursorId
|
||||
QueryError -> throwError $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
|
||||
CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId
|
||||
QueryError -> throwIO $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
|
||||
|
||||
fulfill :: (MonadIO m) => DelayedBatch -> Action m Batch
|
||||
-- ^ Demand and wait for result, raise failure if exception
|
||||
fulfill = Action . liftIOE id
|
||||
fulfill = liftIO
|
||||
|
||||
-- *** Cursor
|
||||
|
||||
|
@ -604,11 +574,11 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where
|
|||
then return (return $ Batch 0 0 [], Nothing) -- finished
|
||||
else fmap (,Nothing) $ nextBatch' fcol batchSize limit cid
|
||||
|
||||
nextN :: (MonadIO m, MonadBaseControl IO m, Functor m) => Int -> Cursor -> Action m [Document]
|
||||
nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document]
|
||||
-- ^ Return next N documents or less if end is reached
|
||||
nextN n c = catMaybes <$> replicateM n (next c)
|
||||
nextN n c = catMaybes `liftM` replicateM n (next c)
|
||||
|
||||
rest :: (MonadIO m, MonadBaseControl IO m, Functor m) => Cursor -> Action m [Document]
|
||||
rest :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
|
||||
-- ^ Return remaining documents in query result
|
||||
rest c = loop (next c)
|
||||
|
||||
|
@ -628,13 +598,13 @@ isCursorClosed (Cursor _ _ var) = do
|
|||
type Pipeline = [Document]
|
||||
-- ^ The Aggregate Pipeline
|
||||
|
||||
aggregate :: MonadIO' m => Collection -> Pipeline -> Action m [Document]
|
||||
aggregate :: MonadIO m => Collection -> Pipeline -> Action m [Document]
|
||||
-- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details.
|
||||
aggregate aColl agg = do
|
||||
response <- runCommand ["aggregate" =: aColl, "pipeline" =: agg]
|
||||
case true1 "ok" response of
|
||||
True -> lookup "result" response
|
||||
False -> throwError $ AggregateFailure $ at "errmsg" response
|
||||
False -> liftIO $ throwIO $ AggregateFailure $ at "errmsg" response
|
||||
|
||||
-- ** Group
|
||||
|
||||
|
@ -661,9 +631,9 @@ groupDocument Group{..} =
|
|||
"initial" =: gInitial,
|
||||
"cond" =: gCond ]
|
||||
|
||||
group :: (MonadIO' m) => Group -> Action m [Document]
|
||||
group :: (MonadIO m) => Group -> Action m [Document]
|
||||
-- ^ Execute group query and return resulting aggregate value for each distinct key
|
||||
group g = at "retval" <$> runCommand ["group" =: groupDocument g]
|
||||
group g = at "retval" `liftM` runCommand ["group" =: groupDocument g]
|
||||
|
||||
-- ** MapReduce
|
||||
|
||||
|
@ -733,7 +703,7 @@ mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
|
|||
-- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments.
|
||||
mapReduce col map' red = MapReduce col map' red [] [] 0 Inline Nothing [] False
|
||||
|
||||
runMR :: (MonadIO m, MonadBaseControl IO m, Applicative m) => MapReduce -> Action m Cursor
|
||||
runMR :: (MonadIO m, MonadBaseControl IO m) => MapReduce -> Action m Cursor
|
||||
-- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript)
|
||||
runMR mr = do
|
||||
res <- runMR' mr
|
||||
|
@ -743,7 +713,7 @@ runMR mr = do
|
|||
Just x -> error $ "unexpected map-reduce result field: " ++ show x
|
||||
Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res)
|
||||
|
||||
runMR' :: (MonadIO' m) => MapReduce -> Action m MRResult
|
||||
runMR' :: (MonadIO m) => MapReduce -> Action m MRResult
|
||||
-- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript).
|
||||
runMR' mr = do
|
||||
doc <- runCommand (mrDocument mr)
|
||||
|
@ -754,18 +724,18 @@ runMR' mr = do
|
|||
type Command = Document
|
||||
-- ^ A command is a special query or action against the database. See <http://www.mongodb.org/display/DOCS/Commands> for details.
|
||||
|
||||
runCommand :: (MonadIO' m) => Command -> Action m Document
|
||||
runCommand :: (MonadIO m) => Command -> Action m Document
|
||||
-- ^ Run command against the database and return its result
|
||||
runCommand c = maybe err id <$> findOne (query c "$cmd") where
|
||||
runCommand c = maybe err id `liftM` findOne (query c "$cmd") where
|
||||
err = error $ "Nothing returned for command: " ++ show c
|
||||
|
||||
runCommand1 :: (MonadIO' m) => Text -> Action m Document
|
||||
runCommand1 :: (MonadIO m) => Text -> Action m Document
|
||||
-- ^ @runCommand1 foo = runCommand [foo =: 1]@
|
||||
runCommand1 c = runCommand [c =: (1 :: Int)]
|
||||
|
||||
eval :: (MonadIO' m, Val v) => Javascript -> Action m v
|
||||
eval :: (MonadIO m, Val v) => Javascript -> Action m v
|
||||
-- ^ Run code on server
|
||||
eval code = at "retval" <$> runCommand ["$eval" =: code]
|
||||
eval code = at "retval" `liftM` runCommand ["$eval" =: code]
|
||||
|
||||
|
||||
{- Authors: Tony Hannan <tony@10gen.com>
|
||||
|
|
|
@ -12,7 +12,6 @@ A pipeline closes itself when a read or write causes an error, so you can detect
|
|||
#endif
|
||||
|
||||
module System.IO.Pipeline (
|
||||
IOE,
|
||||
-- * IOStream
|
||||
IOStream(..),
|
||||
-- * Pipeline
|
||||
|
@ -33,29 +32,19 @@ import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
|||
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
||||
putMVar, readMVar, addMVarFinalizer)
|
||||
#endif
|
||||
import Control.Monad.Error (ErrorT(ErrorT), runErrorT)
|
||||
import Control.Exception.Lifted (onException, throwIO, try)
|
||||
|
||||
#if !MIN_VERSION_base(4,6,0)
|
||||
mkWeakMVar :: MVar a -> IO () -> IO ()
|
||||
mkWeakMVar = addMVarFinalizer
|
||||
#endif
|
||||
|
||||
onException :: (Monad m) => ErrorT e m a -> m () -> ErrorT e m a
|
||||
-- ^ If first action throws an exception then run second action then re-throw
|
||||
onException (ErrorT action) releaser = ErrorT $ do
|
||||
e <- action
|
||||
either (const releaser) (const $ return ()) e
|
||||
return e
|
||||
|
||||
type IOE = ErrorT IOError IO
|
||||
-- ^ IO monad with explicit error
|
||||
|
||||
-- * IOStream
|
||||
|
||||
-- | An IO sink and source where value of type @o@ are sent and values of type @i@ are received.
|
||||
data IOStream i o = IOStream {
|
||||
writeStream :: o -> IOE (),
|
||||
readStream :: IOE i,
|
||||
writeStream :: o -> IO (),
|
||||
readStream :: IO i,
|
||||
closeStream :: IO () }
|
||||
|
||||
-- * Pipeline
|
||||
|
@ -101,19 +90,19 @@ listen :: Pipeline i o -> IO ()
|
|||
listen Pipeline{..} = do
|
||||
stream <- readMVar vStream
|
||||
forever $ do
|
||||
e <- runErrorT $ readStream stream
|
||||
e <- try $ readStream stream
|
||||
var <- readChan responseQueue
|
||||
putMVar var e
|
||||
case e of
|
||||
Left err -> closeStream stream >> ioError err -- close and stop looping
|
||||
Right _ -> return ()
|
||||
|
||||
send :: Pipeline i o -> o -> IOE ()
|
||||
send :: Pipeline i o -> o -> IO ()
|
||||
-- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own).
|
||||
-- Throw IOError and close pipeline if send fails
|
||||
send p@Pipeline{..} message = withMVar vStream (flip writeStream message) `onException` close p
|
||||
|
||||
call :: Pipeline i o -> o -> IOE (IOE i)
|
||||
call :: Pipeline i o -> o -> IO (IO i)
|
||||
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
|
||||
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
|
||||
call p@Pipeline{..} message = withMVar vStream doCall `onException` close p where
|
||||
|
@ -121,7 +110,7 @@ call p@Pipeline{..} message = withMVar vStream doCall `onException` close p whe
|
|||
writeStream stream message
|
||||
var <- newEmptyMVar
|
||||
liftIO $ writeChan responseQueue var
|
||||
return $ ErrorT (readMVar var) -- return promise
|
||||
return $ readMVar var >>= either throwIO return -- return promise
|
||||
|
||||
|
||||
{- Authors: Tony Hannan <tony@10gen.com>
|
||||
|
|
|
@ -1,66 +0,0 @@
|
|||
{- | Cycle through a set of resources (randomly), recreating them when they expire -}
|
||||
|
||||
{-# LANGUAGE RecordWildCards, NamedFieldPuns, FlexibleContexts #-}
|
||||
|
||||
module System.IO.Pool where
|
||||
|
||||
import Control.Applicative ((<$>))
|
||||
import Control.Exception (assert)
|
||||
import Data.Array.IO (IOArray, readArray, writeArray, newArray, newListArray,
|
||||
getElems, getBounds, rangeSize, range)
|
||||
import Data.Maybe (catMaybes)
|
||||
import System.Random (randomRIO)
|
||||
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar_)
|
||||
import Control.Monad.Error (ErrorT, Error)
|
||||
import Control.Monad.Trans (liftIO)
|
||||
|
||||
-- | Creator, destroyer, and checker of resources of type r. Creator may throw error or type e.
|
||||
data Factory e r = Factory {
|
||||
newResource :: ErrorT e IO r,
|
||||
killResource :: r -> IO (),
|
||||
isExpired :: r -> IO Bool }
|
||||
|
||||
newPool :: Factory e r -> Int -> IO (Pool e r)
|
||||
-- ^ Create new pool of initial max size, which must be >= 1
|
||||
newPool f n = assert (n > 0) $ do
|
||||
arr <- newArray (0, n-1) Nothing
|
||||
var <- newMVar arr
|
||||
return (Pool f var)
|
||||
|
||||
data Pool e r = Pool {factory :: Factory e r, resources :: MVar (IOArray Int (Maybe r))}
|
||||
-- ^ Pool of maximum N resources. Resources may expire on their own or be killed. Resources will initially be created on demand up N resources then recycled in random fashion. N may be changed by resizing the pool. Random is preferred to round-robin to distribute effect of pathological use cases that use every Xth resource the most and N is a multiple of X.
|
||||
-- Resources *must* close/kill themselves when garbage collected ('resize' relies on this).
|
||||
|
||||
aResource :: (Error e) => Pool e r -> ErrorT e IO r
|
||||
-- ^ Return a random live resource in pool or create new one if expired or not yet created
|
||||
aResource Pool{..} = withMVar resources $ \array -> do
|
||||
i <- liftIO $ randomRIO =<< getBounds array
|
||||
mr <- liftIO $ readArray array i
|
||||
r <- maybe (new array i) (check array i) mr
|
||||
return r
|
||||
where
|
||||
new array i = do
|
||||
r <- newResource factory
|
||||
liftIO $ writeArray array i (Just r)
|
||||
return r
|
||||
check array i r = do
|
||||
bad <- liftIO $ isExpired factory r
|
||||
if bad then new array i else return r
|
||||
|
||||
poolSize :: Pool e r -> IO Int
|
||||
-- ^ current max size of pool
|
||||
poolSize Pool{resources} = withMVar resources (fmap rangeSize . getBounds)
|
||||
|
||||
resize :: Pool e r -> Int -> IO ()
|
||||
-- ^ resize max size of pool. When shrinking some resource will be dropped without closing since they may still be in use. They are expected to close themselves when garbage collected.
|
||||
resize Pool{resources} n = modifyMVar_ resources $ \array -> do
|
||||
rs <- take n <$> getElems array
|
||||
array' <- newListArray (0, n-1) (rs ++ repeat Nothing)
|
||||
return array'
|
||||
|
||||
killAll :: Pool e r -> IO ()
|
||||
-- ^ Kill all resources in pool so subsequent access creates new ones
|
||||
killAll (Pool Factory{killResource} resources) = withMVar resources $ \array -> do
|
||||
mapM_ killResource . catMaybes =<< getElems array
|
||||
mapM_ (\i -> writeArray array i Nothing) . range =<< getBounds array
|
|
@ -46,4 +46,3 @@ Library
|
|||
Database.MongoDB.Internal.Util
|
||||
Database.MongoDB.Query
|
||||
System.IO.Pipeline
|
||||
System.IO.Pool
|
||||
|
|
Loading…
Reference in a new issue