Slight API refactoring. Fix spinning pipeline when other end disconnects. Handle response flags correctly

This commit is contained in:
Tony Hannan 2010-07-27 17:18:53 -04:00
parent 6435bc3cd9
commit 3a7f235246
13 changed files with 293 additions and 207 deletions

View file

@ -1,4 +1,4 @@
{- | This is just like Control.Monad.Reader.Class except you can access the context of any Reader in the monad stack instead of just the top one as long as the context types are different. If two or more readers in the stack have the same context type you get the context of the top one. -}
{- | This is just like "Control.Monad.Reader.Class" except you can access the context of any Reader in the monad stack instead of just the top one as long as the context types are different. If two or more readers in the stack have the same context type you get the context of the top one. -}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, OverlappingInstances #-}
@ -8,7 +8,7 @@ import Control.Monad.Reader
import Control.Monad.Error
-- | Same as 'MonadReader' but without functional dependency so the same monad can have multiple contexts with different types
class Context x m where
class (Monad m) => Context x m where
context :: m x
-- ^ Get the context in the Reader in the monad stack that has @x@ context type. Analogous to 'ask'.
push :: (x -> x) -> m a -> m a
@ -18,10 +18,10 @@ instance (Monad m) => Context x (ReaderT x m) where
context = ask
push = local
instance (Monad m, Context x m) => Context x (ReaderT r m) where
instance (Context x m) => Context x (ReaderT r m) where
context = lift context
push f m = ReaderT (push f . runReaderT m)
instance (Monad m, Context x m, Error e) => Context x (ErrorT e m) where
instance (Context x m, Error e) => Context x (ErrorT e m) where
context = lift context
push f = ErrorT . push f . runErrorT

36
Control/Monad/Throw.hs Normal file
View file

@ -0,0 +1,36 @@
{- | This is just like "Control.Monad.Error.Class" except you can throw/catch the error of any ErrorT in the monad stack instead of just the top one as long as the error types are different. If two or more ErrorTs in the stack have the same error type you get the error of the top one. -}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, OverlappingInstances #-}
module Control.Monad.Throw where
import Prelude hiding (catch)
import Control.Monad.Reader
import Control.Monad.Error
-- | Same as 'MonadError' but without functional dependency so the same monad can have multiple errors with different types
class (Monad m) => Throw e m where
throw :: e -> m a
-- ^ Abort action and throw give exception. Analogous to 'throwError'.
catch :: m a -> (e -> m a) -> m a
-- ^ If first action aborts with exception then execute second action. Analogous to 'catchError'
throwLeft :: (Throw e m) => m (Either e a) -> m a
-- ^ Execute action and throw exception if result is Left, otherwise return the Right result
throwLeft = (either throw return =<<)
instance (Error e) => Throw e (Either e) where
throw = throwError
catch = catchError
instance (Error e, Monad m) => Throw e (ErrorT e m) where
throw = throwError
catch = catchError
instance (Error e, Throw e m, Error x) => Throw e (ErrorT x m) where
throw = lift . throw
catch a h = ErrorT $ catch (runErrorT a) (runErrorT . h)
instance (Throw e m) => Throw e (ReaderT x m) where
throw = lift . throw
catch a h = ReaderT $ \x -> catch (runReaderT a x) (flip runReaderT x . h)

View file

@ -17,7 +17,7 @@ import Prelude hiding (length)
import Control.Applicative ((<$>))
import Control.Monad (forever)
import Control.Exception (assert)
import System.IO.Error (try)
import System.IO.Error (try, mkIOError, eofErrorType)
import System.IO (Handle, hFlush, hClose, hIsClosed)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
@ -43,7 +43,9 @@ instance Length L.ByteString where
class Resource m r where
close :: r -> m ()
-- ^ Close resource
isClosed :: r -> m Bool
-- ^ Is resource closed
instance Resource IO Handle where
close = hClose
@ -64,16 +66,16 @@ class (Length bytes, Monoid bytes, Flush handle) => Stream handle bytes where
put :: handle -> bytes -> IO ()
-- ^ Write bytes to handle
get :: handle -> Int -> IO bytes
-- ^ Read up to N bytes from handle, block until at least 1 byte is available
-- ^ Read up to N bytes from handle; if EOF return empty bytes, otherwise block until at least 1 byte is available
getN :: (Stream h b) => h -> Int -> IO b
-- ^ Read N bytes from hande, blocking until all N bytes are read. Unlike 'get' which only blocks if no bytes are available.
-- ^ Read N bytes from hande, blocking until all N bytes are read. If EOF is reached before N bytes then throw EOF exception.
getN h n = assert (n >= 0) $ do
bytes <- get h n
let x = length bytes
if x >= n then return bytes else do
remainingBytes <- getN h (n - x)
return (mappend bytes remainingBytes)
if x >= n then return bytes
else if x == 0 then ioError (mkIOError eofErrorType "Control.Pipeline" Nothing Nothing)
else mappend bytes <$> getN h (n - x)
instance Stream Handle S.ByteString where
put = S.hPut

View file

@ -28,7 +28,7 @@ module Database.MongoDB.Admin (
import Prelude hiding (lookup)
import Control.Applicative ((<$>))
import Database.MongoDB.Internal.Protocol (pwHash, pwKey)
import Database.MongoDB.Connection (Server, showHostPort)
import Database.MongoDB.Connection (Host, showHostPort)
import Database.MongoDB.Query
import Data.Bson
import Data.UString (pack, unpack, append, intercalate)
@ -191,12 +191,12 @@ removeUser user = delete (select ["user" =: user] "system.users")
-- ** Database
cloneDatabase :: (Conn m) => Database -> Server -> m Document
-- ^ Copy database from given server to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case).
cloneDatabase :: (Conn m) => Database -> Host -> m Document
-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case).
cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromHost]
copyDatabase :: (Conn m) => Database -> Server -> Maybe (Username, Password) -> Database -> m Document
-- ^ Copy database from given server to the server I am connected to. If username & password is supplied use them to read from given server.
copyDatabase :: (Conn m) => Database -> Host -> Maybe (Username, Password) -> Database -> m Document
-- ^ Copy database from given host to the server I am connected to. If username & password is supplied use them to read from given host.
copyDatabase fromDb fromHost mup toDb = do
let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb]
useDb "admin" $ case mup of

View file

@ -1,168 +1,186 @@
{- | A replica set is a set of servers that mirror each other (a non-replicated server can act like a replica set of one). One server in a replica set is the master and the rest are slaves. When the master goes down, one of the slaves becomes master. The ReplicaSet object in this client maintains a list of servers that it currently knows are in the set. It refreshes this list every time it establishes a new connection with one of the servers in the set. Each server in the set knows who the other member in the set are, and who is master. The user asks the ReplicaSet object for a new master or slave connection. When a connection fails, the user must ask the ReplicaSet for a new connection (which most likely will connect to another server since the previous one failed). When connecting to a new server you loose all session state that was stored with the old server, which includes open cursors and temporary map-reduce output collections. Attempting to read from a lost cursor on a new server will raise a ServerFailure exception. Attempting to read a lost map-reduce temp output on a new server will return an empty set (not an error, like it maybe should). -}
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, MultiParamTypeClasses, FlexibleContexts #-}
module Database.MongoDB.Connection (
-- * Server
Server(..), PortID(..), server, showHostPort, readHostPort, readHostPortF,
runNet,
-- * Host
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM,
-- * ReplicaSet
ReplicaSet, replicaSet, replicaServers,
MasterOrSlave(..), FailedToConnect, newConnection,
ReplicaSet, replicaSet, replicas,
newConnection,
-- * MasterOrSlaveOk
MasterOrSlaveOk(..),
-- * Connection
Connection, connect,
-- * Resource
Resource(..)
) where
import Database.MongoDB.Internal.Protocol (Connection, mkConnection)
import Database.MongoDB.Query (Failure(..), Conn, runConn, useDb, runCommand1)
import Database.MongoDB.Internal.Protocol
import Data.Bson ((=:), at)
import Control.Pipeline (Resource(..))
import Control.Applicative ((<$>))
import Control.Arrow ((+++), left)
import Control.Exception (assert)
import System.IO.Error as E (try)
import Control.Monad.Error
import Control.Monad.Throw
import Data.IORef
import Network (HostName, PortID(..), connectTo)
import Data.Bson (Document, look, typed)
import Text.ParserCombinators.Parsec as P (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import Control.Monad.Identity
import Database.MongoDB.Internal.Util (true1) -- PortID instances
import Database.MongoDB.Internal.Util (true1, MonadIO') -- PortID instances
-- * Server
runNet :: ErrorT IOError m a -> m (Either IOError a)
-- ^ Execute action that raises IOError only on network problem. Other IOErrors like file access errors are not caught by this.
runNet = runErrorT
data Server = Server HostName PortID deriving (Show, Eq, Ord)
adminCommand :: Document -> Request
-- ^ Convert command to request
adminCommand cmd = Query{..} where
qOptions = [SlaveOK]
qFullCollection = "admin.$cmd"
qSkip = 0
qBatchSize = 0
qSelector = cmd
qProjector = []
commandReply :: String -> Reply -> Document
-- ^ Extract first document from reply. Error if query error, using given string as prefix error message.
commandReply title Reply{..} = if elem QueryError rResponseFlags
then error $ title ++ ": " ++ at "$err" (head rDocuments)
else head rDocuments
-- * Host
data Host = Host HostName PortID deriving (Show, Eq, Ord)
defaultPort :: PortID
defaultPort = PortNumber 27017
server :: HostName -> Server
-- ^ Server on default MongoDB port
server host = Server host defaultPort
host :: HostName -> Host
-- ^ Host on default MongoDB port
host hostname = Host hostname defaultPort
showHostPort :: Server -> String
-- ^ Display server as \"host:port\"
showHostPort (Server host port) = host ++ ":" ++ (case port of
showHostPort :: Host -> String
-- ^ Display host as \"host:port\"
showHostPort (Host hostname port) = hostname ++ ":" ++ (case port of
Service s -> s
PortNumber p -> show p
UnixSocket s -> s)
readHostPortF :: (Monad m) => String -> m Server
-- ^ Read string \"host:port\" as 'Server host port' or \"host\" as 'server host' (default port). Fail if string does not match either syntax.
readHostPortF = either (fail . show) return . parse parser "readHostPort" where
readHostPortM :: (Monad m) => String -> m Host
-- ^ Read string \"hostname:port\" as @Host hosthame port@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax.
readHostPortM = either (fail . show) return . parse parser "readHostPort" where
hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
parser = do
spaces
host <- hostname
P.try (spaces >> eof >> return (server host)) <|> do
h <- hostname
T.try (spaces >> eof >> return (host h)) <|> do
_ <- char ':'
port :: Int <- read <$> many1 digit
spaces >> eof
return $ Server host (PortNumber $ fromIntegral port)
return $ Host h (PortNumber $ fromIntegral port)
readHostPort :: String -> Server
-- ^ Read string \"host:port\" as 'Server host port' or \"host\" as 'server host' (default port). Error if string does not match either syntax.
readHostPort = runIdentity . readHostPortF
readHostPort :: String -> Host
-- ^ Read string \"hostname:port\" as @Host hostname port@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
readHostPort = runIdentity . readHostPortM
-- * Replica Set
-- ** Replica Set
newtype ReplicaSet = ReplicaSet (IORef [Server])
-- ^ Reference to a replica set of servers. Ok if really not a replica set and just a stand-alone server, in which case it acts like a replica set of one.
newtype ReplicaSet = ReplicaSet (IORef [Host])
-- ^ Reference to a replica set of hosts. Ok if really not a replica set and just a stand-alone server, in which case it acts like a replica set of one.
replicaSet :: [Server] -> IO ReplicaSet
-- ^ Create a reference to a replica set with servers as the initial seed list (a subset of the servers in the replica set)
replicaSet :: [Host] -> IO ReplicaSet
-- ^ Create a reference to a replica set with given hosts as the initial seed list (a subset of the hosts in the replica set)
replicaSet s = assert (not $ null s) (ReplicaSet <$> newIORef s)
replicaServers :: ReplicaSet -> IO [Server]
-- ^ Return current list of known servers in replica set. This list is updated on every 'newConnection'.
replicaServers (ReplicaSet ref) = readIORef ref
replicas :: ReplicaSet -> IO [Host]
-- ^ Return current list of known hosts in replica set. This list is updated on every 'newConnection'.
replicas (ReplicaSet ref) = readIORef ref
-- * Replica Info
data ReplicaInfo = ReplicaInfo Server Document deriving (Show, Eq)
-- ^ Configuration info of a server in a replica set. Contains all the servers in the replica set plus its role in that set (master, slave, or arbiter)
data ReplicaInfo = ReplicaInfo Host Document deriving (Show, Eq)
-- ^ Configuration info of a host in a replica set. Contains all the hosts in the replica set plus its role in that set (master, slave, or arbiter)
isMaster :: ReplicaInfo -> Bool
-- ^ Is the replica server described by this info a master/primary (not slave or arbiter)?
-- ^ Is the replica described by this info a master/primary (not slave or arbiter)?
isMaster (ReplicaInfo _ i) = true1 "ismaster" i
isSlave :: ReplicaInfo -> Bool
-- ^ Is the replica server described by this info a slave/secondary (not master or arbiter)
-- ^ Is the replica described by this info a slave/secondary (not master or arbiter)
isSlave = not . isMaster -- TODO: distinguish between slave and arbiter
allReplicas :: ReplicaInfo -> [Server]
allReplicas :: ReplicaInfo -> [Host]
-- ^ All replicas in set according to this replica configuration info.
-- If server is stand-alone then it won't have \"hosts\" in it configuration, in which case we return the server by itself.
allReplicas (ReplicaInfo s i) = maybe [s] (map readHostPort . typed) (look "hosts" i)
-- If host is stand-alone then it won't have \"hosts\" in its configuration, in which case we return the host by itself.
allReplicas (ReplicaInfo h i) = maybe [h] (map readHostPort . typed) (look "hosts" i)
sortedReplicas :: ReplicaInfo -> IO [Server]
sortedReplicas :: ReplicaInfo -> IO [Host]
-- ^ All replicas in set sorted by distance from this client. TODO
sortedReplicas = return . allReplicas
getReplicaInfo :: (Server, Connection) -> IO (Either IOError ReplicaInfo)
-- ^ Get replica info of the connected server. Return Left IOError if connection fails
getReplicaInfo (serv, conn) = left err <$> runConn (ReplicaInfo serv <$> getReplicaInfoDoc) conn where
err (ConnectionFailure e) = e
err (ServerFailure e) = userError e
getReplicaInfo :: (Throw IOError m, MonadIO' m) => Host -> Connection -> m ReplicaInfo
-- ^ Get replica info of the connected host. Throw IOError if connection fails.
getReplicaInfo host' conn = do
promise <- throwLeft . liftIO . E.try $ call conn [] (adminCommand ["ismaster" =: (1 :: Int)])
fmap (ReplicaInfo host' . commandReply "ismaster") . throwLeft . liftIO $ E.try promise
getReplicaInfoDoc :: (Conn m) => m Document
-- ^ Get replica info of connected server
getReplicaInfoDoc = useDb "admin" (runCommand1 "ismaster")
-- * MasterOrSlaveOk
-- * MasterOrSlave
data MasterOrSlave =
data MasterOrSlaveOk =
Master -- ^ connect to master only
| SlaveOk -- ^ connect to a slave, or master if no slave available
deriving (Show, Eq)
isMS :: MasterOrSlave -> ReplicaInfo -> Bool
-- ^ Does the server (as described by its info) match the master/slave type
isMS :: MasterOrSlaveOk -> ReplicaInfo -> Bool
-- ^ Does the host (as described by its replica-info) match the master/slave type
isMS Master i = isMaster i
isMS SlaveOk i = isSlave i || isMaster i
-- * Connection
type FailedToConnect = [(Server, IOError)]
-- ^ All servers tried in replica set along with reason why each failed to connect
newConnection :: MasterOrSlave -> ReplicaSet -> IO (Either FailedToConnect Connection)
-- ^ Create a connection to a master or slave in the replica set. Don't forget to close connection when you are done using it even if Failure exception is raised when using it. newConnection returns Left if failed to connect to any server in replica set.
newConnection :: (Throw IOError m, MonadIO' m) => MasterOrSlaveOk -> ReplicaSet -> m Connection
-- ^ Create a connection to a master or slave in the replica set. Throw IOError if failed to connect to any host in replica set that is the right master/slave type. 'close' connection when you are done using it even if a failure is raised. Garbage collected connections will be closed automatically (but don't rely on this when creating many connections).
-- TODO: prefer slave over master when SlaveOk and both are available.
newConnection mos (ReplicaSet vServers) = do
servers <- readIORef vServers
e <- connectFirst mos servers
newConnection mos (ReplicaSet vHosts) = throwLeft . liftIO $ left (userError . show) <$> do
hosts <- readIORef vHosts
e <- connectFirst mos hosts
case e of
Right (conn, info) -> do
writeIORef vServers =<< sortedReplicas info
writeIORef vHosts =<< sortedReplicas info
return (Right conn)
Left (fs, is) -> if null is
then return (Left fs)
else do
replicas <- sortedReplicas (head is)
writeIORef vServers replicas
writeIORef vHosts replicas
-- try again in case new replicas in info
(fst +++ fst) <$> connectFirst mos replicas
connectFirst :: MasterOrSlave -> [Server] -> IO (Either ([(Server, IOError)], [ReplicaInfo]) (Connection, ReplicaInfo))
-- ^ Connect to first server that succeeds and is master/slave, otherwise return list of failed connections plus info of connections that succeeded but were not master/slave.
connectFirst :: MasterOrSlaveOk -> [Host] -> IO (Either ([(Host, IOError)], [ReplicaInfo]) (Connection, ReplicaInfo))
-- ^ Connect to first host that succeeds and is master/slave, otherwise return list of failed connections plus info of connections that succeeded but were not master/slave.
connectFirst mos = connectFirst' ([], []) where
connectFirst' (fs, is) [] = return $ Left (fs, is)
connectFirst' (fs, is) (s : ss) = do
connectFirst' (fs, is) (h : hs) = do
e <- runErrorT $ do
c <- ErrorT (connect s)
i <- ErrorT (getReplicaInfo (s, c))
c <- connect h
i <- getReplicaInfo h c
return (c, i)
case e of
Left f -> connectFirst' ((s, f) : fs, is) ss
Left f -> connectFirst' ((h, f) : fs, is) hs
Right (c, i) -> if isMS mos i
then return $ Right (c, i)
else do
close c
connectFirst' ((s, userError $ "not a " ++ show mos) : fs, i : is) ss
connectFirst' ((h, userError $ "not a " ++ show mos) : fs, i : is) hs
connect :: Server -> IO (Either IOError Connection)
-- ^ Create a connection to the given server (as opposed to connecting to some server in a replica set via 'newConnection'). Return Left IOError if failed to connect.
connect (Server host port) = E.try (mkConnection =<< connectTo host port)
connect :: (Throw IOError m, MonadIO' m) => Host -> m Connection
-- ^ Create a connection to the given host (as opposed to connecting to some host in a replica set via 'newConnection'). Throw IOError if can't connect.
connect (Host hostname port) = throwLeft . liftIO $ E.try (mkConnection =<< connectTo hostname port)
{- Authors: Tony Hannan <tony@10gen.com>

View file

@ -1,6 +1,6 @@
{-| Low-level messaging between this client and the MongoDB server. See Mongo Wire Protocol (<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>).
This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" instead. -}
This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -}
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-}
@ -198,9 +198,11 @@ data Request =
deriving (Show, Eq)
data QueryOption =
TailableCursor |
SlaveOK |
NoCursorTimeout -- Never timeout the cursor. When not set, the cursor will die if idle for more than 10 minutes.
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception.
| SlaveOK -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
| NoCursorTimeout -- The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to prevent that.
| AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
-- | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
deriving (Show, Eq)
-- *** Binary format
@ -230,6 +232,8 @@ qBit :: QueryOption -> Int32
qBit TailableCursor = bit 1
qBit SlaveOK = bit 2
qBit NoCursorTimeout = bit 4
qBit AwaitData = bit 5
--qBit Exhaust = bit 6
qBits :: [QueryOption] -> Int32
qBits = bitOr . map qBit
@ -246,7 +250,7 @@ data Reply = Reply {
data ResponseFlag =
CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
| QueryError -- ^ Server error. Results contains one document containing an "$err" field holding the error message.
| QueryError -- ^ Query error. Returned with one document containing an "$err" field holding the error message.
| AwaitCapable -- ^ For backward compatability: Set when the server supports the AwaitData query option. if it doesn't, a replica slave client should sleep a little between getMore's
deriving (Show, Eq, Enum)

View file

@ -1,6 +1,6 @@
-- | Miscellaneous general functions
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE StandaloneDeriving, FlexibleInstances, UndecidableInstances #-}
module Database.MongoDB.Internal.Util where
@ -25,6 +25,9 @@ instance (Monad m, Error e) => Applicative (ErrorT e m) where
pure = return
(<*>) = ap
class (MonadIO m, Applicative m, Functor m) => MonadIO' m
instance (MonadIO m, Applicative m, Functor m) => MonadIO' m
ignore :: (Monad m) => a -> m ()
ignore _ = return ()

View file

@ -1,9 +1,9 @@
-- | Query and update documents residing on a MongoDB server(s)
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, RankNTypes, ImpredicativeTypes #-}
module Database.MongoDB.Query (
-- * Connection
-- * Connected
Connected, runConn, Conn, Failure(..),
-- * Database
Database, allDatabases, DbConn, useDb, thisDatabase,
@ -24,8 +24,9 @@ module Database.MongoDB.Query (
-- ** Delete
delete, deleteOne,
-- * Read
slaveOk,
-- ** Query
Query(..), P.QueryOption(..), Projector, Limit, Order, BatchSize,
Query(..), QueryOption(..), Projector, Limit, Order, BatchSize,
explain, find, findOne, count, distinct,
-- *** Cursor
Cursor, next, nextN, rest,
@ -40,64 +41,64 @@ module Database.MongoDB.Query (
import Prelude as X hiding (lookup)
import Control.Applicative ((<$>), Applicative(..))
import Control.Arrow (left, first, second)
import Control.Arrow (first)
import Control.Monad.Context
import Control.Monad.Reader
import Control.Monad.Error
import Control.Monad.Throw
import System.IO.Error (try)
import Control.Concurrent.MVar
import Control.Pipeline (Resource(..))
import qualified Database.MongoDB.Internal.Protocol as P
import Database.MongoDB.Internal.Protocol hiding (Query, send, call)
import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call)
import Database.MongoDB.Connection (MasterOrSlaveOk(..))
import Data.Bson
import Data.Word
import Data.Int
import Data.Maybe (listToMaybe, catMaybes, mapMaybe)
import Data.Maybe (listToMaybe, catMaybes)
import Data.UString as U (dropWhile, any, tail)
import Database.MongoDB.Internal.Util (loop, (<.>), true1) -- plus Applicative instances of ErrorT & ReaderT
import Database.MongoDB.Internal.Util (loop, (<.>), true1, MonadIO') -- plus Applicative instances of ErrorT & ReaderT
-- * Connected
send :: (Context Connection m, Throw IOError m, MonadIO m) => [Notice] -> m ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send ns = throwLeft . liftIO . try . flip P.send ns =<< context
newtype Connected m a = Connected (ErrorT Failure (ReaderT WriteMode (ReaderT Connection m)) a)
deriving (Context Connection, Context WriteMode, MonadError Failure, MonadIO, Monad, Applicative, Functor)
-- ^ Monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure
instance MonadTrans Connected where
lift = Connected . lift . lift . lift
runConn :: Connected m a -> Connection -> m (Either Failure a)
-- ^ Run action with access to connection. Return Left Failure if connection or server fails during execution.
runConn (Connected action) = runReaderT (runReaderT (runErrorT action) Unsafe)
-- | A monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure
class (Context Connection m, Context WriteMode m, MonadError Failure m, MonadIO m, Applicative m, Functor m) => Conn m
instance (Context Connection m, Context WriteMode m, MonadError Failure m, MonadIO m, Applicative m, Functor m) => Conn m
-- | Connection or Server failure like network problem or disk full
data Failure =
ConnectionFailure IOError
-- ^ Error during sending or receiving bytes over a 'Connection'. The connection is not automatically closed when this error happens; the user must close it. Any other IOErrors raised during a Task or Op are not caught. The user is responsible for these other types of errors not related to sending/receiving bytes over the connection.
| ServerFailure String
-- ^ Failure on server, like disk full, which is usually observed using getLastError. Calling 'fail' inside a connected monad raises this failure. Do not call 'fail' unless it is a temporary server failure, like disk full. For example, receiving unexpected data from the server is not a server failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change.
deriving (Show, Eq)
instance Error Failure where strMsg = ServerFailure
send :: (Conn m) => [Notice] -> m ()
-- ^ Send notices as a contiguous batch to server with no reply. Raise Failure if connection fails.
send ns = do
conn <- context
e <- liftIO $ try (P.send conn ns)
either (throwError . ConnectionFailure) return e
call :: (Conn m) => [Notice] -> Request -> m (IO (Either Failure Reply))
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will raise Failure if connection fails send, and promise will return Failure if connection fails receive.
call :: (Context Connection m, Throw IOError m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw IOError n, MonadIO n) => n Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw IOError if connection fails on send, and promise will throw IOError if connection fails on receive.
call ns r = do
conn <- context
e <- liftIO $ try (P.call conn ns r)
case e of
Left err -> throwError (ConnectionFailure err)
Right promise -> return (left ConnectionFailure <$> try promise)
promise <- throwLeft . liftIO $ try (P.call conn ns r)
return (throwLeft . liftIO $ try promise)
-- * Connected Monad
newtype Connected m a = Connected (ErrorT Failure (ReaderT WriteMode (ReaderT MasterOrSlaveOk (ReaderT Connection m))) a)
deriving (Context Connection, Context MasterOrSlaveOk, Context WriteMode, Throw Failure, MonadIO, Monad, Applicative, Functor)
-- ^ Monad with access to a 'Connection', 'MasterOrSlaveOk', and 'WriteMode', and throws a 'Failure' on read/write failure and IOError on connection failure
deriving instance (Throw IOError m) => Throw IOError (Connected m)
instance MonadTrans Connected where
lift = Connected . lift . lift . lift . lift
runConn :: Connected m a -> Connection -> m (Either Failure a)
-- ^ Run action with access to connection. It starts out assuming it is master (invoke 'slaveOk' inside it to change that) and that writes don't need to be check (invoke 'writeMode' to change that). Return Left Failure if error in execution. Throws IOError if connection fails during execution.
runConn (Connected action) = runReaderT (runReaderT (runReaderT (runErrorT action) Unsafe) Master)
-- | A monad with access to a 'Connection', 'MasterOrSlaveOk', and 'WriteMode', and throws 'Failure' on read/write failure and 'IOError' on connection failure
class (Context Connection m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, Throw IOError m, MonadIO' m) => Conn m
instance (Context Connection m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, Throw IOError m, MonadIO' m) => Conn m
-- | Read or write exception like cursor expired or inserting a duplicate key.
-- Note, unexpected data from the server is not a Failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change.
data Failure =
CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server that the one you are currently connected to (perhaps a fail over happen between servers in a replica set)
| QueryFailure String -- ^ Query failed for some reason as described in the string
| WriteFailure ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string
deriving (Show, Eq)
instance Error Failure where strMsg = error
-- ^ 'fail' is treated the same as 'error'. In other words, don't use it.
-- * Database
@ -184,14 +185,14 @@ writeMode :: (Conn m) => WriteMode -> m a -> m a
writeMode = push . const
write :: (DbConn m) => Notice -> m ()
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'ServerFailure' if it reports an error.
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error.
write notice = do
mode <- context
case mode of
Unsafe -> send [notice]
Safe -> do
me <- getLastError [notice]
maybe (return ()) (throwError . ServerFailure . show) me
maybe (return ()) (throw . uncurry WriteFailure) me
type ErrorCode = Int
-- ^ Error code from getLastError
@ -281,6 +282,16 @@ delete' opts (Select sel col) = do
-- * Read
-- ** MasterOrSlaveOk
slaveOk :: (Conn m) => m a -> m a
-- ^ Ok to execute given action against slave, ie. eventually consistent reads
slaveOk = push (const SlaveOk)
msOption :: MasterOrSlaveOk -> [P.QueryOption]
msOption Master = []
msOption SlaveOk = [P.SlaveOK]
-- ** Query
-- | Use 'select' to create a basic query with defaults, then modify if desired. For example, @(select sel col) {limit = 10}@
@ -296,6 +307,18 @@ data Query = Query {
hint :: Order -- ^ Force MongoDB to use this index, [] = no hint. Default = []
} deriving (Show, Eq)
data QueryOption =
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception.
| NoCursorTimeout -- The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to prevent that.
| AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
deriving (Show, Eq)
pOption :: QueryOption -> P.QueryOption
-- ^ Convert to protocol query option
pOption TailableCursor = P.TailableCursor
pOption NoCursorTimeout = P.NoCursorTimeout
pOption AwaitData = P.AwaitData
type Projector = Document
-- ^ Fields to return, analogous to the select clause in SQL. @[]@ means return whole document (analogous to * in SQL). @[x =: 1, y =: 1]@ means return only @x@ and @y@ fields of each document. @[x =: 0]@ means return all fields except @x@.
@ -322,10 +345,10 @@ batchSizeRemainingLimit batchSize limit = if limit == 0
where batchSize' = if batchSize == 1 then 2 else batchSize
-- batchSize 1 is broken because server converts 1 to -1 meaning limit 1
queryRequest :: Bool -> Query -> Database -> (Request, Limit)
queryRequest :: Bool -> MasterOrSlaveOk -> Query -> Database -> (Request, Limit)
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
queryRequest isExplain Query{..} db = (P.Query{..}, remainingLimit) where
qOptions = options
queryRequest isExplain mos Query{..} db = (P.Query{..}, remainingLimit) where
qOptions = msOption mos ++ map pOption options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit
@ -339,13 +362,10 @@ queryRequest isExplain Query{..} db = (P.Query{..}, remainingLimit) where
runQuery :: (DbConn m) => Bool -> [Notice] -> Query -> m CursorState'
-- ^ Send query request and return cursor state
runQuery isExplain ns q = call' ns . queryRequest isExplain q =<< thisDatabase
call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState'
-- ^ Send notices and request and return promised cursor state
call' ns (req, remainingLimit) = do
promise <- call ns req
return $ Delayed (fmap (fromReply remainingLimit =<<) promise)
runQuery isExplain ns q = do
db <- thisDatabase
slaveOk <- context
call' ns (queryRequest isExplain slaveOk q db)
find :: (DbConn m) => Query -> m Cursor
-- ^ Fetch documents satisfying query
@ -383,43 +403,54 @@ distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "ke
-- *** Cursor
data Cursor = Cursor FullCollection BatchSize (MVar CursorState')
-- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a ServerFailure exception. Note, a cursor is not closed when the connection is closed, so you can open another connection to the same server and continue using the cursor.
-- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a 'CursorNotFoundFailure'. Note, a cursor is not closed when the connection is closed, so you can open another connection to the same server and continue using the cursor.
modifyCursorState' :: (Conn m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Connected IO (CursorState', a)) -> m a
modifyCursorState' :: (Conn m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Connected (ErrorT IOError IO) (CursorState', a)) -> m a
-- ^ Analogous to 'modifyMVar' but with Conn monad
modifyCursorState' (Cursor fcol batch var) act = do
conn <- context
e <- liftIO . modifyMVar var $ \cs' ->
either ((cs',) . Left) (second Right) <$> runConn (act fcol batch cs') conn
either throwError return e
e <- liftIO . modifyMVar var $ \cs' -> do
ee <- runErrorT $ runConn (act fcol batch cs') conn
return $ case ee of
Right (Right (cs'', a)) -> (cs'', Right a)
Right (Left failure) -> (cs', Left $ throw failure)
Left ioerror -> (cs', Left $ throw ioerror)
either id return e
getCursorState :: (Conn m) => Cursor -> m CursorState
-- ^ Extract current cursor status
getCursorState (Cursor _ _ var) = cursorState =<< liftIO (readMVar var)
data CursorState' = Delayed (IO (Either Failure CursorState)) | CursorState CursorState
data CursorState' =
Delayed (forall n. (Throw Failure n, Throw IOError n, MonadIO n) => n CursorState)
| CursorState CursorState
-- ^ A cursor state or a promised cursor state which may fail
call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState'
-- ^ Send notices and request and return promised cursor state
call' ns (req, remainingLimit) = do
promise <- call ns req
return $ Delayed (fromReply remainingLimit =<< promise)
cursorState :: (Conn m) => CursorState' -> m CursorState
-- ^ Convert promised cursor state to cursor state or raise Failure
cursorState (Delayed promise) = either throwError return =<< liftIO promise
-- ^ Convert promised cursor state to cursor state or failure
cursorState (Delayed promise) = promise
cursorState (CursorState cs) = return cs
data CursorState = CS Limit CursorId [Document]
-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch.
fromReply :: Limit -> Reply -> Either Failure CursorState
fromReply :: (Throw Failure m) => Limit -> Reply -> m CursorState
-- ^ Convert Reply to CursorState or Failure
fromReply limit Reply{..} = case mapMaybe fromResponseFlag rResponseFlags of
[] -> Right (CS limit rCursorId rDocuments)
err : _ -> Left err
fromReply limit Reply{..} = do
mapM_ checkResponseFlag rResponseFlags
return (CS limit rCursorId rDocuments)
where
fromResponseFlag :: ResponseFlag -> Maybe Failure
-- ^ If response flag indicate failure then Just Failure, otherwise Nothing
fromResponseFlag x = case x of
AwaitCapable -> Nothing
CursorNotFound -> Just . ServerFailure $ "Cursor " ++ show rCursorId ++ " not found"
QueryError -> Just . ServerFailure $ "Query failure " ++ show rDocuments
-- If response flag indicates failure then throw it, otherwise do nothing
checkResponseFlag flag = case flag of
AwaitCapable -> return ()
CursorNotFound -> throw (CursorNotFoundFailure rCursorId)
QueryError -> throw (QueryFailure $ at "$err" $ head rDocuments)
newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor
-- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected.
@ -427,14 +458,14 @@ newCursor db col batch cs = do
conn <- context
var <- liftIO (newMVar cs)
let cursor = Cursor (db <.> col) batch var
liftIO . addMVarFinalizer var $ runConn (close cursor) conn >> return ()
liftIO . addMVarFinalizer var $ runErrorT (runConn (close cursor) conn :: ErrorT IOError IO (Either Failure ())) >> return ()
return cursor
next :: (Conn m) => Cursor -> m (Maybe Document)
-- ^ Return next document in query result, or Nothing if finished.
next cursor = modifyCursorState' cursor nextState where
-- Pre-fetch next batch promise from server when last one in current batch is returned.
nextState :: FullCollection -> BatchSize -> CursorState' -> Connected IO (CursorState', Maybe Document)
nextState :: FullCollection -> BatchSize -> CursorState' -> Connected (ErrorT IOError IO) (CursorState', Maybe Document)
nextState fcol batch cs' = do
CS limit cid docs <- cursorState cs'
case docs of

View file

@ -4,16 +4,11 @@ mongoDB
About
-----
A mongoDB driver for Haskell.
This driver lets you connect to MongoDB, do inserts, queries, updates,
etc. Also has many convience functions inspired by HDBC such as more
easily converting between the Bson.Value types and native Haskell
types.
A mongoDB driver for Haskell, which lets you connect to MongoDB and do inserts, queries, updates, etc.
Links
-----
* [mongoDB API reference](http://hackage.haskell.org/package/mongoDB)
* [tutorial](http://github.com/srp/mongoDB/blob/master/tutorial.md)
* [map/reduce example](http://github.com/srp/mongoDB/blob/master/map-reduce-example.md)
* [tutorial](http://github.com/TonyGen/mongoDB-haskell/blob/master/tutorial.md)
* [map/reduce example](http://github.com/TonyGen/mongoDB-haskell/blob/master/map-reduce-example.md)

4
TODO
View file

@ -26,6 +26,9 @@ MongoDB
- cursor object
* hasMore
- all commands listed on http://127.0.0.1:28017/_commands. (mongod --rest)
- reIndex (http://www.mongodb.org/display/DOCS/Indexes#Indexes-ReIndex)
- safe write to two or more replicas
- Query attribute: timeout
- CreateIndex attributes: background, min, max
- CreateIndex Order [Asc, Dec, Geo2d]
@ -40,6 +43,7 @@ MongoDB
- lazyRest on cursor, although lazy I/O) is problematic and we may not want to support it.
- Upon client exit, send killCursors for all open cursors, otherwise server will keep them open for 10 minutes and keep NoCursorTimeout cursors open for hours.
-- Upon cursor finalize (garbage collect) send killCursor even if you have to create a new connection, because server keeps cursors open for 10 minutes (or more).
-- Query option Exhaust
optional:
- automatic reconnection

View file

@ -19,8 +19,8 @@ map/reduce queries on:
Prelude> :set prompt "> "
> :set -XOverloadedStrings
> import Database.MongoDB
> Right conn <- connect (server "localhost")
> let run act = runConn (useDb "test" act) con
> Right conn <- runNet $ connect $ host "localhost"
> let run act = runNet $ runConn (useDb "test" act) con
> :{
run $ insertMany "mr1" [
["x" =: 1, "tags" =: ["dog", "cat"]],
@ -69,8 +69,8 @@ be called iteratively on the results of other reduce steps.
Finally, we run mapReduce and iterate over the result collection:
> runDb "test" $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest
Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]]
> run $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest
Right (Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]])
Advanced Map/Reduce
-------------------
@ -78,7 +78,7 @@ Advanced Map/Reduce
MongoDB returns additional statistics in the map/reduce results. To
obtain them, use *runMR'* instead:
> runDb "test" $ runMR' (mapReduce "mr1" mapFn reduceFn)
Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0]
> run $ runMR' (mapReduce "mr1" mapFn reduceFn)
Right (Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0])
You can then obtain the results from here by quering the result collection yourself. *runMR* (above) does this for you but discards the statistics.

View file

@ -1,11 +1,11 @@
Name: mongoDB
Version: 0.6.1
Version: 0.7
License: OtherLicense
License-file: LICENSE
Maintainer: Tony Hannan <tony@10gen.com>
Author: Scott Parish <srp@srparish.net> & Tony Hannan <tony@10gen.com>
Copyright: Copyright (c) 2010-2010 Scott Parish & 10gen Inc.
homepage: http://github.com/TonyGen/mongoDB
homepage: http://github.com/TonyGen/mongoDB-haskell
Category: Database
Synopsis: A driver for MongoDB
Description: This module lets you connect to MongoDB, do inserts, queries, updates, etc.
@ -23,6 +23,7 @@ Build-Depends:
Build-Type: Simple
Exposed-modules:
Control.Monad.Context,
Control.Monad.Throw,
Control.Pipeline,
Database.MongoDB.Internal.Util,
Database.MongoDB.Internal.Protocol,

View file

@ -51,14 +51,15 @@ Making A Connection
-------------------
Open up a connection to your DB instance, using the standard port:
> Right con <- connect $ server "127.0.0.1"
> Right conn <- runNet $ connect $ host "127.0.0.1"
or for a non-standard port
> Right con <- connect $ Server "127.0.0.1" (PortNumber 666)
> Right conn <- runNet $ connect $ Host "127.0.0.1" (PortNumber 30000)
*connect* returns Left IOError if connection fails. We are assuming above
it won't fail. If it does you will get a pattern match error.
*connect* throws IOError if connection fails and *runNet* catches IOError and
returns it as Left. We are assuming above it won't fail. If it does you will get a
pattern match error.
Connected monad
-------------------
@ -73,11 +74,11 @@ IO monad every time, we'll define a convenient *run* function that takes a
db-action and executes it against our "test" database on the server we
just connected to:
> let run act = runConn (useDb "test" act) con
> let run action = runNet $ runConn (useDb "test" action) conn
*run* (*runConn*) will return either Left Failure or Right result. Failure
means the connection failed (eg. network problem) or the server failed
(eg. disk full).
*runConn* return either Left Failure or Right result. Failure
means there was a read or write exception like cursor expired or duplicate key insert.
This combined with *runNet* means our *run* returns *(Either IOError (Either Failure a))*.
Databases and Collections
-----------------------------
@ -87,10 +88,6 @@ under which collections reside.
You can obtain the list of databases available on a connection:
> runConn allDatabases con
You can also use the *run* function we just created:
> run allDatabases
The "test" database is ignored in this case because *allDatabases*
@ -132,7 +129,6 @@ Inserting a Document
To insert a document into a collection we can use the *insert* function:
> run $ insert "posts" post
Right (Oid 4c16d355 c80c560858000000)
When a document is inserted a special field, *_id*, is automatically
added if the document doesn't already contain that field. The value
@ -160,7 +156,6 @@ match. Here we use *findOne* to get the first document from the posts
collection:
> run $ findOne (select [] "posts")
Right (Just [ _id: Oid 4c16d355 c80c560858000000, author: "Mike", text: "My first blog post!", tags: ["mongoDB","Haskell"], date: 2010-06-15 01:09:28.364 UTC])
The result is a document matching the one that we inserted previously.
@ -172,12 +167,10 @@ resulting document must match. To limit our results to a document with
author "Mike" we do:
> run $ findOne (select ["author" =: "Mike"] "posts")
Right (Just [ _id: Oid 4c16d355 c80c560858000000, author: "Mike", text: "My first blog post!", tags: ["mongoDB","Haskell"], date: 2010-06-15 01:09:28.364 UTC])
If we try with a different author, like "Eliot", we'll get no result:
> run $ findOne (select ["author" =: "Eliot"] "posts")
Right Nothing
Bulk Inserts
------------
@ -202,7 +195,6 @@ command to the server:
"date" =: now]
:}
> run $ insertMany "posts" [post1, post2]
Right [Oid 4c16d67e c80c560858000001,Oid 4c16d67e c80c560858000002]
* Note that post2 has a different shape than the other posts - there
is no "tags" field and we've added a new field, "title". This is what we