From 8025ac4ec4137ecb257b5fe8696af940dde05ada Mon Sep 17 00:00:00 2001 From: Tony Hannan Date: Wed, 13 Jul 2011 15:34:52 -0400 Subject: [PATCH] Small edits to some comments. fix secondaryOk to return master only when no secondaries available --- Database/MongoDB/Connection.hs | 27 +++++++++++++++++---------- Database/MongoDB/Internal/Protocol.hs | 2 +- Database/MongoDB/Query.hs | 12 ++++++------ System/IO/Pipeline.hs | 5 +++-- mongoDB.cabal | 2 +- 5 files changed, 28 insertions(+), 20 deletions(-) diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs index f52c68d..096a7b3 100644 --- a/Database/MongoDB/Connection.hs +++ b/Database/MongoDB/Connection.hs @@ -3,13 +3,14 @@ {-# LANGUAGE CPP, OverloadedStrings, ScopedTypeVariables, TupleSections #-} module Database.MongoDB.Connection ( + -- * Util IOE, runIOE, -- * Connection Pipe, close, isClosed, - -- * Host - Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM, connect, + -- * Server + Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort, readHostPortM, connect, -- * Replica Set - ReplicaSetName, openReplicaSet, ReplicaSet, primary, secondaryOk + ReplicaSetName, openReplicaSet, ReplicaSet, primary, secondaryOk, closeReplicaSet ) where import Prelude hiding (lookup) @@ -28,7 +29,7 @@ import Data.UString (UString, unpack) import Data.Bson as D (Document, lookup, at, (=:)) import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand) import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle) -import Data.List as L (lookup, intersect, partition, (\\)) +import Data.List as L (lookup, intersect, partition, (\\), delete) adminCommand :: Command -> Pipe -> IOE Document -- ^ Run command against admin database on server connected to pipe. Fail if connection fails. @@ -43,10 +44,11 @@ adminCommand cmd pipe = data Host = Host HostName PortID deriving (Show, Eq, Ord) defaultPort :: PortID +-- ^ Default MongoDB port = 27017 defaultPort = PortNumber 27017 host :: HostName -> Host --- ^ Host on default MongoDB port +-- ^ Host on 'defaultPort' host hostname = Host hostname defaultPort showHostPort :: Host -> String @@ -61,7 +63,7 @@ showHostPort (Host hostname port) = hostname ++ ":" ++ portname where #endif readHostPortM :: (Monad m) => String -> m Host --- ^ Read string \"hostname:port\" as @Host hosthame port@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax. +-- ^ Read string \"hostname:port\" as @Host hosthame (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax. -- TODO: handle Service and UnixSocket port readHostPortM = either (fail . show) return . parse parser "readHostPort" where hostname = many1 (letter <|> digit <|> char '-' <|> char '.') @@ -75,7 +77,7 @@ readHostPortM = either (fail . show) return . parse parser "readHostPort" where return $ Host h (PortNumber $ fromIntegral port) readHostPort :: String -> Host --- ^ Read string \"hostname:port\" as @Host hostname port@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax. +-- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax. readHostPort = runIdentity . readHostPortM connect :: Host -> IOE Pipe @@ -98,8 +100,12 @@ openReplicaSet (rsName, seedList) = do _ <- updateMembers rs return rs +closeReplicaSet :: ReplicaSet -> IO () +-- ^ Close all connections to replica set +closeReplicaSet (ReplicaSet _ vMembers) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd) + primary :: ReplicaSet -> IOE Pipe --- ^ Return connection to current primary of replica set +-- ^ Return connection to current primary of replica set. Fail if no primary available. primary rs@(ReplicaSet rsName _) = do mHost <- statedPrimary <$> updateMembers rs case mHost of @@ -107,11 +113,12 @@ primary rs@(ReplicaSet rsName _) = do Nothing -> throwError $ userError $ "replica set " ++ unpack rsName ++ " has no primary" secondaryOk :: ReplicaSet -> IOE Pipe --- ^ Return connection to a random member (secondary or primary) +-- ^ Return connection to a random secondary, or primary if no secondaries available. secondaryOk rs = do info <- updateMembers rs hosts <- lift $ shuffle (possibleHosts info) - untilSuccess (connection rs Nothing) hosts + let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info) + untilSuccess (connection rs Nothing) hosts' type ReplicaInfo = (Host, Document) -- ^ Result of isMaster command on host in replica set. Returned fields are: setName, ismaster, secondary, hosts, [primary]. primary only present when ismaster = false diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index c9101cb..02eda37 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -221,7 +221,7 @@ data Request = data QueryOption = TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point – for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception. | SlaveOK -- ^ Allow query of replica slave. Normally these return an error except for namespace "local". - | NoCursorTimeout -- The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to prevent that. + | NoCursorTimeout -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed. | AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal. -- | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection. deriving (Show, Eq) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index f258b84..c74903b 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -25,7 +25,7 @@ module Database.MongoDB.Query ( delete, deleteOne, -- * Read -- ** Query - Query(..), QueryOption(..), Projector, Limit, Order, BatchSize, + Query(..), QueryOption(NoCursorTimeout), Projector, Limit, Order, BatchSize, explain, find, findOne, fetch, count, distinct, -- *** Cursor Cursor, next, nextN, rest, closeCursor, isCursorClosed, @@ -64,7 +64,7 @@ newtype Action m a = Action (ErrorT Failure (ReaderT Context m) a) instance MonadTrans Action where lift = Action . lift . lift access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m (Either Failure a) --- ^ Run action against database on server at other end of pipe. Use write mode for any writes and read mode for any reads. Return Left on connection or read/write failure. +-- ^ Run action against database on server at other end of pipe. Use access mode for any reads and writes. Return Left on connection failure or read/write failure. access myPipe myAccessMode myDatabase (Action action) = runReaderT (runErrorT action) Context{..} -- | A connection failure, or a read or write exception like cursor expired or inserting a duplicate key. @@ -115,9 +115,9 @@ writeMode (ConfirmWrites z) = Confirm z -- | Values needed when executing a db operation data Context = Context { - myPipe :: Pipe, -- | operations read/write to this pipelined TCP connection to a MongoDB server - myAccessMode :: AccessMode, -- | read/write operation will use this access mode - myDatabase :: Database } -- | operations query/update this database + myPipe :: Pipe, -- ^ operations read/write to this pipelined TCP connection to a MongoDB server + myAccessMode :: AccessMode, -- ^ read/write operation will use this access mode + myDatabase :: Database } -- ^ operations query/update this database myReadMode :: Context -> ReadMode myReadMode = readMode . myAccessMode @@ -138,7 +138,7 @@ call ns r = Action $ do promise <- liftIOE ConnectionFailure $ P.call pipe ns r return (liftIOE ConnectionFailure promise) --- | If you stack a monad on top of 'Action' then make it an instance of this class and use 'liftDB' to execute an DB Actions within it. Instances already exist for simple mtl transformers. +-- | If you stack a monad on top of 'Action' then make it an instance of this class and use 'liftDB' to execute a DB Action within it. Instances already exist for the basic mtl transformers. class (Monad m, MonadMVar (BaseMonad m), Applicative (BaseMonad m), Functor (BaseMonad m)) => MonadDB m where type BaseMonad m :: * -> * liftDB :: Action (BaseMonad m) a -> m a diff --git a/System/IO/Pipeline.hs b/System/IO/Pipeline.hs index 82aeab4..f2f6795 100644 --- a/System/IO/Pipeline.hs +++ b/System/IO/Pipeline.hs @@ -1,4 +1,4 @@ -{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will send each thread's request right away without waiting. +{- | Pipelining is sending multiple requests over a socket and receiving the responses later in the same order (a' la HTTP pipelining). This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will send each thread's request right away without waiting. A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. It also closes itself when garbage collected, or you can close it explicitly. -} @@ -27,6 +27,7 @@ onException (ErrorT action) releaser = ErrorT $ do return e type IOE = ErrorT IOError IO +-- ^ IO monad with explicit error -- * IOStream @@ -59,7 +60,7 @@ newPipeline stream = do return pipe close :: Pipeline i o -> IO () --- | Close pipe and underlying connection +-- ^ Close pipe and underlying connection close Pipeline{..} = do killThread listenThread closeStream =<< readMVar vStream diff --git a/mongoDB.cabal b/mongoDB.cabal index 94c2b16..faa7ba8 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -22,7 +22,7 @@ stability: alpha homepage: http://github.com/TonyGen/mongoDB-haskell package-url: bug-reports: -synopsis: Driver (client) for MongoDB, a free, scalable, fast, document database management system +synopsis: Driver (client) for MongoDB, a free, scalable, fast, document DBMS description: This package lets you connect to MongoDB servers and update/query their data. Please see the example in Database.MongoDB and the tutorial from the homepage. For information about MongoDB itself, see www.mongodb.org. category: Database author: Tony Hannan & Scott Parish