Small edits to some comments. fix secondaryOk to return master only when no secondaries available
This commit is contained in:
parent
8672652395
commit
8025ac4ec4
5 changed files with 28 additions and 20 deletions
|
@ -3,13 +3,14 @@
|
|||
{-# LANGUAGE CPP, OverloadedStrings, ScopedTypeVariables, TupleSections #-}
|
||||
|
||||
module Database.MongoDB.Connection (
|
||||
-- * Util
|
||||
IOE, runIOE,
|
||||
-- * Connection
|
||||
Pipe, close, isClosed,
|
||||
-- * Host
|
||||
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM, connect,
|
||||
-- * Server
|
||||
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort, readHostPortM, connect,
|
||||
-- * Replica Set
|
||||
ReplicaSetName, openReplicaSet, ReplicaSet, primary, secondaryOk
|
||||
ReplicaSetName, openReplicaSet, ReplicaSet, primary, secondaryOk, closeReplicaSet
|
||||
) where
|
||||
|
||||
import Prelude hiding (lookup)
|
||||
|
@ -28,7 +29,7 @@ import Data.UString (UString, unpack)
|
|||
import Data.Bson as D (Document, lookup, at, (=:))
|
||||
import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand)
|
||||
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle)
|
||||
import Data.List as L (lookup, intersect, partition, (\\))
|
||||
import Data.List as L (lookup, intersect, partition, (\\), delete)
|
||||
|
||||
adminCommand :: Command -> Pipe -> IOE Document
|
||||
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
|
||||
|
@ -43,10 +44,11 @@ adminCommand cmd pipe =
|
|||
data Host = Host HostName PortID deriving (Show, Eq, Ord)
|
||||
|
||||
defaultPort :: PortID
|
||||
-- ^ Default MongoDB port = 27017
|
||||
defaultPort = PortNumber 27017
|
||||
|
||||
host :: HostName -> Host
|
||||
-- ^ Host on default MongoDB port
|
||||
-- ^ Host on 'defaultPort'
|
||||
host hostname = Host hostname defaultPort
|
||||
|
||||
showHostPort :: Host -> String
|
||||
|
@ -61,7 +63,7 @@ showHostPort (Host hostname port) = hostname ++ ":" ++ portname where
|
|||
#endif
|
||||
|
||||
readHostPortM :: (Monad m) => String -> m Host
|
||||
-- ^ Read string \"hostname:port\" as @Host hosthame port@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax.
|
||||
-- ^ Read string \"hostname:port\" as @Host hosthame (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax.
|
||||
-- TODO: handle Service and UnixSocket port
|
||||
readHostPortM = either (fail . show) return . parse parser "readHostPort" where
|
||||
hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
|
||||
|
@ -75,7 +77,7 @@ readHostPortM = either (fail . show) return . parse parser "readHostPort" where
|
|||
return $ Host h (PortNumber $ fromIntegral port)
|
||||
|
||||
readHostPort :: String -> Host
|
||||
-- ^ Read string \"hostname:port\" as @Host hostname port@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
|
||||
-- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
|
||||
readHostPort = runIdentity . readHostPortM
|
||||
|
||||
connect :: Host -> IOE Pipe
|
||||
|
@ -98,8 +100,12 @@ openReplicaSet (rsName, seedList) = do
|
|||
_ <- updateMembers rs
|
||||
return rs
|
||||
|
||||
closeReplicaSet :: ReplicaSet -> IO ()
|
||||
-- ^ Close all connections to replica set
|
||||
closeReplicaSet (ReplicaSet _ vMembers) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
|
||||
|
||||
primary :: ReplicaSet -> IOE Pipe
|
||||
-- ^ Return connection to current primary of replica set
|
||||
-- ^ Return connection to current primary of replica set. Fail if no primary available.
|
||||
primary rs@(ReplicaSet rsName _) = do
|
||||
mHost <- statedPrimary <$> updateMembers rs
|
||||
case mHost of
|
||||
|
@ -107,11 +113,12 @@ primary rs@(ReplicaSet rsName _) = do
|
|||
Nothing -> throwError $ userError $ "replica set " ++ unpack rsName ++ " has no primary"
|
||||
|
||||
secondaryOk :: ReplicaSet -> IOE Pipe
|
||||
-- ^ Return connection to a random member (secondary or primary)
|
||||
-- ^ Return connection to a random secondary, or primary if no secondaries available.
|
||||
secondaryOk rs = do
|
||||
info <- updateMembers rs
|
||||
hosts <- lift $ shuffle (possibleHosts info)
|
||||
untilSuccess (connection rs Nothing) hosts
|
||||
let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info)
|
||||
untilSuccess (connection rs Nothing) hosts'
|
||||
|
||||
type ReplicaInfo = (Host, Document)
|
||||
-- ^ Result of isMaster command on host in replica set. Returned fields are: setName, ismaster, secondary, hosts, [primary]. primary only present when ismaster = false
|
||||
|
|
|
@ -221,7 +221,7 @@ data Request =
|
|||
data QueryOption =
|
||||
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point – for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception.
|
||||
| SlaveOK -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
|
||||
| NoCursorTimeout -- The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to prevent that.
|
||||
| NoCursorTimeout -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed.
|
||||
| AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
|
||||
-- | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
|
||||
deriving (Show, Eq)
|
||||
|
|
|
@ -25,7 +25,7 @@ module Database.MongoDB.Query (
|
|||
delete, deleteOne,
|
||||
-- * Read
|
||||
-- ** Query
|
||||
Query(..), QueryOption(..), Projector, Limit, Order, BatchSize,
|
||||
Query(..), QueryOption(NoCursorTimeout), Projector, Limit, Order, BatchSize,
|
||||
explain, find, findOne, fetch, count, distinct,
|
||||
-- *** Cursor
|
||||
Cursor, next, nextN, rest, closeCursor, isCursorClosed,
|
||||
|
@ -64,7 +64,7 @@ newtype Action m a = Action (ErrorT Failure (ReaderT Context m) a)
|
|||
instance MonadTrans Action where lift = Action . lift . lift
|
||||
|
||||
access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m (Either Failure a)
|
||||
-- ^ Run action against database on server at other end of pipe. Use write mode for any writes and read mode for any reads. Return Left on connection or read/write failure.
|
||||
-- ^ Run action against database on server at other end of pipe. Use access mode for any reads and writes. Return Left on connection failure or read/write failure.
|
||||
access myPipe myAccessMode myDatabase (Action action) = runReaderT (runErrorT action) Context{..}
|
||||
|
||||
-- | A connection failure, or a read or write exception like cursor expired or inserting a duplicate key.
|
||||
|
@ -115,9 +115,9 @@ writeMode (ConfirmWrites z) = Confirm z
|
|||
|
||||
-- | Values needed when executing a db operation
|
||||
data Context = Context {
|
||||
myPipe :: Pipe, -- | operations read/write to this pipelined TCP connection to a MongoDB server
|
||||
myAccessMode :: AccessMode, -- | read/write operation will use this access mode
|
||||
myDatabase :: Database } -- | operations query/update this database
|
||||
myPipe :: Pipe, -- ^ operations read/write to this pipelined TCP connection to a MongoDB server
|
||||
myAccessMode :: AccessMode, -- ^ read/write operation will use this access mode
|
||||
myDatabase :: Database } -- ^ operations query/update this database
|
||||
|
||||
myReadMode :: Context -> ReadMode
|
||||
myReadMode = readMode . myAccessMode
|
||||
|
@ -138,7 +138,7 @@ call ns r = Action $ do
|
|||
promise <- liftIOE ConnectionFailure $ P.call pipe ns r
|
||||
return (liftIOE ConnectionFailure promise)
|
||||
|
||||
-- | If you stack a monad on top of 'Action' then make it an instance of this class and use 'liftDB' to execute an DB Actions within it. Instances already exist for simple mtl transformers.
|
||||
-- | If you stack a monad on top of 'Action' then make it an instance of this class and use 'liftDB' to execute a DB Action within it. Instances already exist for the basic mtl transformers.
|
||||
class (Monad m, MonadMVar (BaseMonad m), Applicative (BaseMonad m), Functor (BaseMonad m)) => MonadDB m where
|
||||
type BaseMonad m :: * -> *
|
||||
liftDB :: Action (BaseMonad m) a -> m a
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will send each thread's request right away without waiting.
|
||||
{- | Pipelining is sending multiple requests over a socket and receiving the responses later in the same order (a' la HTTP pipelining). This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will send each thread's request right away without waiting.
|
||||
|
||||
A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. It also closes itself when garbage collected, or you can close it explicitly. -}
|
||||
|
||||
|
@ -27,6 +27,7 @@ onException (ErrorT action) releaser = ErrorT $ do
|
|||
return e
|
||||
|
||||
type IOE = ErrorT IOError IO
|
||||
-- ^ IO monad with explicit error
|
||||
|
||||
-- * IOStream
|
||||
|
||||
|
@ -59,7 +60,7 @@ newPipeline stream = do
|
|||
return pipe
|
||||
|
||||
close :: Pipeline i o -> IO ()
|
||||
-- | Close pipe and underlying connection
|
||||
-- ^ Close pipe and underlying connection
|
||||
close Pipeline{..} = do
|
||||
killThread listenThread
|
||||
closeStream =<< readMVar vStream
|
||||
|
|
|
@ -22,7 +22,7 @@ stability: alpha
|
|||
homepage: http://github.com/TonyGen/mongoDB-haskell
|
||||
package-url:
|
||||
bug-reports:
|
||||
synopsis: Driver (client) for MongoDB, a free, scalable, fast, document database management system
|
||||
synopsis: Driver (client) for MongoDB, a free, scalable, fast, document DBMS
|
||||
description: This package lets you connect to MongoDB servers and update/query their data. Please see the example in Database.MongoDB and the tutorial from the homepage. For information about MongoDB itself, see www.mongodb.org.
|
||||
category: Database
|
||||
author: Tony Hannan <tony@10gen.com> & Scott Parish <srp@srparish.net>
|
||||
|
|
Loading…
Reference in a new issue