diff --git a/Database/MongoDB.hs b/Database/MongoDB.hs index ff5f2e4..9801d3d 100644 --- a/Database/MongoDB.hs +++ b/Database/MongoDB.hs @@ -6,20 +6,20 @@ Simple example below. Use with language extension /OvererloadedStrings/. > {-# LANGUAGE OverloadedStrings #-} > > import Database.MongoDB -> import Data.UString (u) +> import Data.CompactString () -- Show and IsString instances of UString > import Control.Monad.Trans (liftIO) > > main = do -> pool <- newConnPool Internet 1 (host "127.0.0.1") +> pool <- newConnPool 1 (host "127.0.0.1") > e <- access safe Master pool run > print e > > run = use (Database "baseball") $ do > clearTeams > insertTeams -> print' "All Teams" =<< allTeams -> print' "National League Teams" =<< nationalLeagueTeams -> print' "New York Teams" =<< newYorkTeams +> printDocs "All Teams" =<< allTeams +> printDocs "National League Teams" =<< nationalLeagueTeams +> printDocs "New York Teams" =<< newYorkTeams > > clearTeams = delete (select [] "team") > @@ -35,7 +35,8 @@ Simple example below. Use with language extension /OvererloadedStrings/. > > newYorkTeams = rest =<< find (select ["home.state" =: u"NY"] "team") {project = ["name" =: (1 :: Int), "league" =: (1 :: Int)]} > -> print' title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs +> printDocs title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs +> -} module Database.MongoDB ( diff --git a/Database/MongoDB/Admin.hs b/Database/MongoDB/Admin.hs index 7ad49b5..8caa031 100644 --- a/Database/MongoDB/Admin.hs +++ b/Database/MongoDB/Admin.hs @@ -31,7 +31,7 @@ import Database.MongoDB.Internal.Protocol (pwHash, pwKey) import Database.MongoDB.Connection (Host, showHostPort) import Database.MongoDB.Query import Data.Bson -import Data.UString (pack, unpack, append, intercalate) +import Data.UString (pack, append, intercalate) import Control.Monad.Reader import qualified Data.HashTable as T import Data.IORef @@ -183,8 +183,8 @@ addUser :: (DbAccess m) => Bool -> Username -> Password -> m () -- ^ Add user with password with read-only access if bool is True or read-write access if bool is False addUser readOnly user pass = do mu <- findOne (select ["user" =: user] "system.users") - let u = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu) - save "system.users" u + let usr = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu) + save "system.users" usr removeUser :: (DbAccess m) => Username -> m () removeUser user = delete (select ["user" =: user] "system.users") @@ -205,9 +205,9 @@ copyDatabase (Database fromDb) fromHost mup (Database toDb) = do let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb] use admin $ case mup of Nothing -> runCommand c - Just (u, p) -> do + Just (usr, pss) -> do n <- at "nonce" <$> runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost] - runCommand $ c ++ ["username" =: u, "nonce" =: n, "key" =: pwKey n u p] + runCommand $ c ++ ["username" =: usr, "nonce" =: n, "key" =: pwKey n usr pss] dropDatabase :: (Access m) => Database -> m Document -- ^ Delete the given database! diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs index 572f897..2f5f07c 100644 --- a/Database/MongoDB/Connection.hs +++ b/Database/MongoDB/Connection.hs @@ -3,8 +3,8 @@ {-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes, FlexibleInstances #-} module Database.MongoDB.Connection ( - -- * Network - Network', ANetwork', Internet(..), + -- * Pipe + Pipe, -- * Host Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM, -- * ReplicaSet @@ -12,19 +12,19 @@ module Database.MongoDB.Connection ( -- * MasterOrSlaveOk MasterOrSlaveOk(..), -- * Connection Pool - Server(..), newConnPool', + Service(..), connHost, replicaSet ) where import Database.MongoDB.Internal.Protocol as X -import Network.Abstract (IOE, connect, ANetwork(..)) +import qualified Network.Abstract as C +import Network.Abstract (IOE, NetworkIO, ANetwork) import Data.Bson ((=:), at, UString) import Control.Pipeline as P import Control.Applicative ((<$>)) import Control.Exception (assert) import Control.Monad.Error import Control.Monad.MVar -import Control.Monad.Context import Network (HostName, PortID(..)) import Data.Bson (Document, look) import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>)) @@ -148,26 +148,23 @@ isMS SlaveOk i = isSecondary i || isPrimary i -} type Pool' = Pool IOError --- | A Server is a single server ('Host') or a replica set of servers ('ReplicaSet') -class Server t where +-- | A Service is a single server ('Host') or a replica set of servers ('ReplicaSet') +class Service t where data ConnPool t -- ^ A pool of TCP connections ('Pipe's) to a host or a replica set of hosts - newConnPool :: (Network' n, MonadIO' m) => n -> Int -> t -> m (ConnPool t) + newConnPool :: (NetworkIO m) => Int -> t -> m (ConnPool t) -- ^ Create a ConnectionPool to a host or a replica set of hosts. Actual TCP connection is not attempted until 'getPipe' request, so no IOError can be raised here. Up to N TCP connections will be established to each host. getPipe :: MasterOrSlaveOk -> ConnPool t -> IOE Pipe -- ^ Return a TCP connection (Pipe) to the master or a slave in the server. Master must connect to the master, SlaveOk may connect to a slave or master. To spread the load, SlaveOk requests are distributed amongst all hosts in the server. Throw IOError if failed to connect to right type of host (Master/SlaveOk). killPipes :: ConnPool t -> IO () -- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when they get garbage collected. -newConnPool' :: (Server t, MonadIO' m, Context ANetwork' m) => Int -> t -> m (ConnPool t) -newConnPool' poolSize' host' = context >>= \(ANetwork net :: ANetwork') -> newConnPool net poolSize' host' - -- ** ConnectionPool Host -instance Server Host where +instance Service Host where data ConnPool Host = HostConnPool {connHost :: Host, connPool :: Pool' Pipe} -- ^ A pool of TCP connections ('Pipe's) to a server, handed out in round-robin style. - newConnPool net poolSize' host' = liftIO $ newHostConnPool (ANetwork net) poolSize' host' + newConnPool poolSize' host' = liftIO . newHostConnPool poolSize' host' =<< C.network -- ^ Create a connection pool to server (host or replica set) getPipe _ = getHostPipe -- ^ Return a TCP connection (Pipe). If SlaveOk, connect to a slave if available. Round-robin if multiple slaves are available. Throw IOError if failed to connect. @@ -176,9 +173,9 @@ instance Server Host where instance Show (ConnPool Host) where show HostConnPool{connHost} = "ConnPool " ++ show connHost -newHostConnPool :: ANetwork' -> Int -> Host -> IO (ConnPool Host) +newHostConnPool :: Int -> Host -> ANetwork -> IO (ConnPool Host) -- ^ Create a pool of N 'Pipe's (TCP connections) to server. 'getHostPipe' will return one of those pipes, round-robin style. -newHostConnPool net poolSize' host' = HostConnPool host' <$> newPool Factory{..} poolSize' where +newHostConnPool poolSize' host' net = HostConnPool host' <$> newPool Factory{..} poolSize' where newResource = tcpConnect net host' killResource = P.close isExpired = P.isClosed @@ -187,18 +184,18 @@ getHostPipe :: ConnPool Host -> IOE Pipe -- ^ Return next pipe (TCP connection) in connection pool, round-robin style. Throw IOError if can't connect to host. getHostPipe (HostConnPool _ pool) = aResource pool -tcpConnect :: ANetwork' -> Host -> IOE Pipe +tcpConnect :: ANetwork -> Host -> IOE Pipe -- ^ Create a TCP connection (Pipe) to the given host. Throw IOError if can't connect. -tcpConnect net (Host hostname port) = newPipeline =<< connect net (hostname, port) +tcpConnect net (Host hostname port) = newPipeline =<< C.connect net (C.Server hostname port) -- ** Connection ReplicaSet -instance Server ReplicaSet where +instance Service ReplicaSet where data ConnPool ReplicaSet = ReplicaSetConnPool { - network :: ANetwork', + network :: ANetwork, repsetName :: Name, currentMembers :: MVar [ConnPool Host] } -- master at head after a refresh - newConnPool net poolSize' repset = liftIO $ newSetConnPool (ANetwork net) poolSize' repset + newConnPool poolSize' repset = liftIO . newSetConnPool poolSize' repset =<< C.network getPipe = getSetPipe killPipes ReplicaSetConnPool{..} = withMVar currentMembers (mapM_ killPipes) @@ -209,10 +206,10 @@ replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet -- ^ Return replicas set name with current members as seed list replicaSet ReplicaSetConnPool{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers -newSetConnPool :: ANetwork' -> Int -> ReplicaSet -> IO (ConnPool ReplicaSet) +newSetConnPool :: Int -> ReplicaSet -> ANetwork -> IO (ConnPool ReplicaSet) -- ^ Create a connection pool to each member of the replica set. -newSetConnPool net poolSize' repset = assert (not . null $ seedHosts repset) $ do - currentMembers <- newMVar =<< mapM (newHostConnPool net poolSize') (seedHosts repset) +newSetConnPool poolSize' repset net = assert (not . null $ seedHosts repset) $ do + currentMembers <- newMVar =<< mapM (\h -> newHostConnPool poolSize' h net) (seedHosts repset) return $ ReplicaSetConnPool net (setName repset) currentMembers getMembers :: Name -> [ConnPool Host] -> IOE [Host] @@ -220,13 +217,13 @@ getMembers :: Name -> [ConnPool Host] -> IOE [Host] -- TODO: Verify config for request replica set name and not some other replica set. ismaster config should include replica set name in result but currently does not. getMembers _repsetName connections = hosts <$> untilSuccess (getReplicaInfo <=< getHostPipe) connections -refreshMembers :: ANetwork' -> Name -> [ConnPool Host] -> IOE [ConnPool Host] +refreshMembers :: ANetwork -> Name -> [ConnPool Host] -> IOE [ConnPool Host] -- ^ Update current members with master at head. Reuse unchanged members. Throw IOError if can't connect to any and fetch config. Dropped connections are not closed in case they still have users; they will be closed when garbage collected. refreshMembers net repsetName connections = do n <- liftIO . poolSize . connPool $ head connections mapM (liftIO . connection n) =<< getMembers repsetName connections where - connection n host' = maybe (newHostConnPool net n host') return mc where + connection n host' = maybe (newHostConnPool n host' net) return mc where mc = find ((host' ==) . connHost) connections diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 6deef55..d260867 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -5,8 +5,6 @@ This module is not intended for direct use. Use the high-level interface at "Dat {-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts, TupleSections, TypeSynonymInstances, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-} module Database.MongoDB.Internal.Protocol ( - -- * Network - Network', ANetwork', Internet(..), -- * Pipe Pipe, send, call, -- * Message @@ -24,7 +22,6 @@ module Database.MongoDB.Internal.Protocol ( import Prelude as X import Control.Applicative ((<$>)) import Control.Arrow ((***)) -import System.IO (Handle) import Data.ByteString.Lazy as B (length, hPut) import qualified Control.Pipeline as P import Data.Bson (Document, UString) @@ -40,58 +37,13 @@ import Data.UString as U (pack, append, toByteString) import System.IO.Error as E (try) import Control.Monad.Error import Control.Monad.Util (whenJust) -import Network.Abstract (IOE, ANetwork, Network(..), Connection(Connection)) -import Network (connectTo) -import System.IO (hFlush, hClose) +import Network.Abstract hiding (send) +import System.IO (hFlush) import Database.MongoDB.Internal.Util (hGetN, bitOr) --- * Network - -- Network -> Server -> (Sink, Source) -- (Sink, Source) -> Pipeline -type Message = ([Notice], Maybe (Request, RequestId)) --- ^ Write notice(s), write notice(s) with getLastError request, or just query request --- Note, that requestId will be out of order because request ids will be generated for notices, after the request id supplied was generated. This is ok because the mongo server does not care about order they are just used as unique identifiers. - -type Response = (ResponseTo, Reply) - -class (Network n Message Response) => Network' n -instance (Network n Message Response) => Network' n - -type ANetwork' = ANetwork Message Response - -data Internet = Internet --- ^ Normal Network instance, i.e. no logging or replay - --- | Connect to server. Write messages and receive replies; not thread-safe! -instance Network Internet Message Response where - connect _ (hostname, portid) = ErrorT . E.try $ do - handle <- connectTo hostname portid - return $ Connection (sink handle) (source handle) (hClose handle) - where - sink h (notices, mRequest) = ErrorT . E.try $ do - forM_ notices $ \n -> writeReq h . (Left n,) =<< genRequestId - whenJust mRequest $ writeReq h . (Right *** id) - hFlush h - source h = ErrorT . E.try $ readResp h - -writeReq :: Handle -> (Either Notice Request, RequestId) -> IO () -writeReq handle (e, requestId) = do - hPut handle lenBytes - hPut handle bytes - where - bytes = runPut $ (either putNotice putRequest e) requestId - lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes - encodeSize = runPut . putInt32 . (+ 4) - -readResp :: Handle -> IO (ResponseTo, Reply) -readResp handle = do - len <- fromEnum . decodeSize <$> hGetN handle 4 - runGet getReply <$> hGetN handle len - where - decodeSize = subtract 4 . runGet getInt32 - -- * Pipe type Pipe = P.Pipeline Message Response @@ -111,7 +63,35 @@ call pipe notices request = do check requestId (responseTo, reply) = if requestId == responseTo then reply else error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" --- * Messages +-- * Message + +type Message = ([Notice], Maybe (Request, RequestId)) +-- ^ A write notice(s), write notice(s) with getLastError request, or just query request. +-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness. + +instance WriteMessage Message where + writeMessage handle (notices, mRequest) = ErrorT . E.try $ do + forM_ notices $ \n -> writeReq . (Left n,) =<< genRequestId + whenJust mRequest $ writeReq . (Right *** id) + hFlush handle + where + writeReq (e, requestId) = do + hPut handle lenBytes + hPut handle bytes + where + bytes = runPut $ (either putNotice putRequest e) requestId + lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes + encodeSize = runPut . putInt32 . (+ 4) + +type Response = (ResponseTo, Reply) +-- ^ Message received from a Mongo server in response to a Request + +instance ReadMessage Response where + readMessage handle = ErrorT . E.try $ readResp where + readResp = do + len <- fromEnum . decodeSize <$> hGetN handle 4 + runGet getReply <$> hGetN handle len + decodeSize = subtract 4 . runGet getInt32 type FullCollection = UString -- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\" diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 0f2fd49..baf531c 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -49,12 +49,12 @@ import Control.Monad.Throw import Control.Monad.MVar import qualified Database.MongoDB.Internal.Protocol as P import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call) -import Database.MongoDB.Connection (MasterOrSlaveOk(..), Server(..)) +import Database.MongoDB.Connection (MasterOrSlaveOk(..), Service(..)) import Data.Bson import Data.Word import Data.Int import Data.Maybe (listToMaybe, catMaybes) -import Data.UString as U (dropWhile, any, tail, unpack) +import Data.UString as U (dropWhile, any, tail) import Control.Monad.Util (MonadIO', loop) import Database.MongoDB.Internal.Util ((<.>), true1) @@ -63,7 +63,7 @@ mapErrorIO f = throwLeft' f . liftIO . runErrorT -- * Mongo Monad -access :: (Server s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> ConnPool s -> Action m a -> m (Either Failure a) +access :: (Service s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> ConnPool s -> Action m a -> m (Either Failure a) -- ^ Run action under given write and read mode against the server or replicaSet behind given connection pool. Return Left Failure if there is a connection failure or read/write error. access w mos pool act = do ePipe <- liftIO . runErrorT $ getPipe mos pool @@ -124,9 +124,9 @@ thisDatabase = context auth :: (DbAccess m) => Username -> Password -> m Bool -- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe. -auth u p = do +auth usr pss = do n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)] - true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: u, "nonce" =: n, "key" =: pwKey n u p] + true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss] -- * Collection diff --git a/Network/Abstract.hs b/Network/Abstract.hs index bab4366..80b79c3 100644 --- a/Network/Abstract.hs +++ b/Network/Abstract.hs @@ -1,20 +1,33 @@ -- | Generalize a network connection to a sink and source -{-# LANGUAGE MultiParamTypeClasses, ExistentialQuantification, FlexibleContexts, FlexibleInstances, UndecidableInstances #-} +{-# LANGUAGE MultiParamTypeClasses, ExistentialQuantification, FlexibleContexts, FlexibleInstances, OverlappingInstances, UndecidableInstances #-} module Network.Abstract where -import Network (HostName, PortID) +import System.IO (Handle, hClose) +import Network (HostName, PortID, connectTo) import Control.Monad.Error +import System.IO.Error (try) +import Control.Monad.Context +import Control.Monad.Util (MonadIO') type IOE = ErrorT IOError IO +-- ^ Be explicit about exception that may be raised. -type Server = (HostName, PortID) +data Server i o = Server HostName PortID +-- ^ A server receives messages of type i and returns messages of type o. + +-- | Serialize message over handle +class WriteMessage i where + writeMessage :: Handle -> i -> IOE () + +-- | Deserialize message from handle +class ReadMessage o where + readMessage :: Handle -> IOE o -- | A network controls connections to other hosts. It may want to overide to log messages or play them back. --- A server in the network accepts messages of type i and generates messages of type o. -class Network n i o where - connect :: n -> Server -> IOE (Connection i o) +class Network n where + connect :: (WriteMessage i, ReadMessage o) => n -> Server i o -> IOE (Connection i o) -- ^ Connect to Server returning the send sink and receive source, throw IOError if can't connect. data Connection i o = Connection { @@ -22,11 +35,28 @@ data Connection i o = Connection { receive :: IOE o, close :: IO () } -data ANetwork i o = forall n. (Network n i o) => ANetwork n +data ANetwork = forall n. (Network n) => ANetwork n -instance Network (ANetwork i o) i o where +instance Network (ANetwork) where connect (ANetwork n) = connect n +data Internet = Internet +-- ^ Normal Network instance, i.e. no logging or replay + +-- | Connect to server. Write messages and receive replies. Not thread-safe, must be wrapped in Pipeline or something. +instance Network Internet where + connect _ (Server hostname portid) = ErrorT . try $ do + handle <- connectTo hostname portid + return $ Connection (writeMessage handle) (readMessage handle) (hClose handle) + +class (MonadIO' m) => NetworkIO m where + network :: m ANetwork + +instance (Context ANetwork m, MonadIO' m) => NetworkIO m where + network = context + +instance NetworkIO IO where + network = return (ANetwork Internet) {- Authors: Tony Hannan Copyright 2010 10gen Inc. diff --git a/TODO b/TODO index 78b1e82..fdb67e1 100644 --- a/TODO +++ b/TODO @@ -10,6 +10,7 @@ Bson MongoDB ------- ++ Support MapReduce 1.8 version + When one connection in a pool fails, close all other since they will likely fail too + on insert/update: reject keys that start with "$" or "." + dereference dbref diff --git a/map-reduce-example.md b/map-reduce-example.md index 84ebbbd..5de2236 100644 --- a/map-reduce-example.md +++ b/map-reduce-example.md @@ -19,7 +19,8 @@ map/reduce queries on: Prelude> :set prompt "> " > :set -XOverloadedStrings > import Database.MongoDB - > conn <- newConnPool Internet 1 (host "localhost") + > import Data.CompactString () + > conn <- newConnPool 1 (host "localhost") > let run act = access safe Master conn $ use (Database "test") act > :{ run $ insertMany "mr1" [ diff --git a/mongoDB.cabal b/mongoDB.cabal index 7e209e8..0fcd59b 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -1,5 +1,5 @@ name: mongoDB -version: 0.9 +version: 0.9.1 build-type: Simple license: OtherLicense license-file: LICENSE @@ -16,7 +16,8 @@ build-depends: nano-md5 -any, network -any, parsec -any, - random -any + random -any, + compact-string-fix -any stability: alpha homepage: http://github.com/TonyGen/mongoDB-haskell package-url: diff --git a/tutorial.md b/tutorial.md index b0e046a..596c0b1 100644 --- a/tutorial.md +++ b/tutorial.md @@ -41,26 +41,23 @@ Import the MongoDB driver library, and set OverloadedStrings so literal strings are converted to UTF-8 automatically. > import Database.MongoDB + > import Data.CompactString () > :set -XOverloadedStrings Making A Connection ------------------- Create a connection pool for your mongo server, using the standard port (27017): - > pool <- newConnPool Internet 1 $ host "127.0.0.1" + > pool <- newConnPool 1 $ host "127.0.0.1" or for a non-standard port - > pool <- newConnPool Internet 1 $ Host "127.0.0.1" (PortNumber 30000) + > pool <- newConnPool 1 $ Host "127.0.0.1" (PortNumber 30000) -*newConnPool* takes the *network*, the connection pool size, and the host to connect to. It returns +*newConnPool* takes the connection pool size, and the host to connect to. It returns a *ConnPool*, which is a potential pool of TCP connections. They are not created until first access, so it is not possible to get a connection error here. -The network parameter allows you to override normal communications to, for example, log -or replay messages sent and received from servers. *Internet* is the normal communication mode -with no logging/replay. - Note, plain IO code in this driver never raises an exception unless it invokes third party IO code that does. Driver code that may throw an exception says so in its Monad type, for example, *ErrorT IOError IO a*.