diff --git a/Control/Monad/Util.hs b/Control/Monad/Util.hs index 502fd18..5a49eec 100644 --- a/Control/Monad/Util.hs +++ b/Control/Monad/Util.hs @@ -34,3 +34,6 @@ untilSuccess' _ f (x : xs) = catchError (f x) (\e -> untilSuccess' e f xs) mapError :: (Functor m) => (e' -> e) -> ErrorT e' m a -> ErrorT e m a -- ^ Convert error type thrown mapError f (ErrorT m) = ErrorT $ (f +++ id) <$> m + +whenJust :: (Monad m) => Maybe a -> (a -> m ()) -> m () +whenJust mVal act = maybe (return ()) act mVal diff --git a/Control/Pipeline.hs b/Control/Pipeline.hs index f377a52..554d121 100644 --- a/Control/Pipeline.hs +++ b/Control/Pipeline.hs @@ -2,169 +2,88 @@ A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. It also closes itself when garbage collected, or you can close it explicitly. -} -{-# LANGUAGE DoRec, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts #-} +{-# LANGUAGE DoRec, RecordWildCards, NamedFieldPuns, ScopedTypeVariables #-} module Control.Pipeline ( -- * Pipeline - Pipeline, newPipeline, send, call, - -- * Util - Size, - Length(..), - Resource(..), - Flush(..), - Stream(..), getN + Pipeline, newPipeline, send, call, close, isClosed ) where -import Prelude hiding (length) -import Control.Applicative ((<$>)) -import Control.Monad (forever) -import Control.Exception (assert, onException) -import System.IO.Error (try, mkIOError, eofErrorType) -import System.IO (Handle, hFlush, hClose, hIsClosed) -import qualified Data.ByteString as S -import qualified Data.ByteString.Lazy as L -import Data.Monoid (Monoid(..)) +import Control.Monad.Throw (onException) +import Control.Monad.Error import Control.Concurrent (ThreadId, forkIO, killThread) import GHC.Conc (ThreadStatus(..), threadStatus) -import Control.Concurrent.MVar +import Control.Monad.MVar import Control.Concurrent.Chan - --- * Length - -type Size = Int - -class Length list where - length :: list -> Size - -instance Length S.ByteString where - length = S.length - -instance Length L.ByteString where - length = fromEnum . L.length - --- * Resource - -class Resource m r where - close :: r -> m () - -- ^ Close resource - isClosed :: r -> m Bool - -- ^ Is resource closed - -instance Resource IO Handle where - close = hClose - isClosed = hIsClosed - --- * Flush - -class Flush handle where - flush :: handle -> IO () - -- ^ Flush written bytes to destination - -instance Flush Handle where - flush = hFlush - --- * Stream - -class (Length bytes, Monoid bytes, Flush handle) => Stream handle bytes where - put :: handle -> bytes -> IO () - -- ^ Write bytes to handle - get :: handle -> Int -> IO bytes - -- ^ Read up to N bytes from handle; if EOF return empty bytes, otherwise block until at least 1 byte is available - -getN :: (Stream h b) => h -> Int -> IO b --- ^ Read N bytes from hande, blocking until all N bytes are read. If EOF is reached before N bytes then throw EOF exception. -getN h n = assert (n >= 0) $ do - bytes <- get h n - let x = length bytes - if x >= n then return bytes - else if x == 0 then ioError (mkIOError eofErrorType "Control.Pipeline" Nothing Nothing) - else mappend bytes <$> getN h (n - x) - -instance Stream Handle S.ByteString where - put = S.hPut - get = S.hGet - -instance Stream Handle L.ByteString where - put = L.hPut - get = L.hGet +import Network.Abstract (IOE) +import qualified Network.Abstract as C -- * Pipeline --- | Thread-safe and pipelined socket -data Pipeline handle bytes = Pipeline { - encodeSize :: Size -> bytes, - decodeSize :: bytes -> Size, - vHandle :: MVar handle, -- ^ Mutex on handle, so only one thread at a time can write to it - responseQueue :: Chan (MVar (Either IOError bytes)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. +-- | Thread-safe and pipelined connection +data Pipeline i o = Pipeline { + vConn :: MVar (C.Connection i o), -- ^ Mutex on handle, so only one thread at a time can write to it + responseQueue :: Chan (MVar (Either IOError o)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. listenThread :: ThreadId } --- | Create new Pipeline with given encodeInt, decodeInt, and handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle. -newPipeline :: (Stream h b, Resource IO h) => - (Size -> b) -- ^ Convert Size to bytes of fixed length. Every Int must translate to same number of bytes. - -> (b -> Size) -- ^ Convert bytes of fixed length to Size. Must be exact inverse of encodeSize. - -> h -- ^ Underlying socket (handle) this pipeline will read/write from - -> IO (Pipeline h b) -newPipeline encodeSize decodeSize handle = do - vHandle <- newMVar handle +-- | Create new Pipeline on given connection. You should 'close' pipeline when finished, which will also close connection. If pipeline is not closed but eventually garbage collected, it will be closed along with connection. +newPipeline :: (MonadIO m) => C.Connection i o -> m (Pipeline i o) +newPipeline conn = liftIO $ do + vConn <- newMVar conn responseQueue <- newChan rec let pipe = Pipeline{..} listenThread <- forkIO (listen pipe) - addMVarFinalizer vHandle $ do + addMVarFinalizer vConn $ do killThread listenThread - close handle + C.close conn return pipe -instance (Resource IO h) => Resource IO (Pipeline h b) where - -- | Close pipe and underlying socket (handle) - close Pipeline{..} = do - killThread listenThread - close =<< readMVar vHandle - isClosed Pipeline{listenThread} = do - status <- threadStatus listenThread - return $ case status of - ThreadRunning -> False - ThreadFinished -> True - ThreadBlocked _ -> False - ThreadDied -> True - --isClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read +close :: (MonadIO m) => Pipeline i o -> m () +-- | Close pipe and underlying connection +close Pipeline{..} = liftIO $ do + killThread listenThread + C.close =<< readMVar vConn -listen :: (Stream h b, Resource IO h) => Pipeline h b -> IO () +isClosed :: (MonadIO m) => Pipeline i o -> m Bool +isClosed Pipeline{listenThread} = liftIO $ do + status <- threadStatus listenThread + return $ case status of + ThreadRunning -> False + ThreadFinished -> True + ThreadBlocked _ -> False + ThreadDied -> True +--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read + +listen :: Pipeline i o -> IO () -- ^ Listen for responses and supply them to waiting threads in order listen Pipeline{..} = do - let n = length (encodeSize 0) - h <- readMVar vHandle + conn <- readMVar vConn forever $ do - e <- try $ do - len <- decodeSize <$> getN h n - getN h len + e <- runErrorT $ C.receive conn var <- readChan responseQueue putMVar var e case e of - Left err -> close h >> fail (show err) -- close and stop looping + Left err -> C.close conn >> ioError err -- close and stop looping Right _ -> return () -send :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO () --- ^ Send messages all together to destination (no messages will be interleaved between them). None of the messages can induce a response, i.e. the destination must not reply to any of these messages (otherwise future 'call's will get these responses instead of their own). --- Each message is preceeded by its length when written to socket. --- Raises IOError and closes pipeline if send fails -send Pipeline{..} messages = withMVar vHandle (writeAll listenThread encodeSize messages) +send :: Pipeline i o -> i -> IOE () +-- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own). +-- Throw IOError and close pipeline if send fails +send p@Pipeline{..} message = withMVar vConn (flip C.send message) `onException` \(_ :: IOError) -> close p -call :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO (IO b) --- ^ Send messages all together to destination (no messages will be interleaved between them), and return /promise/ of response from one message only. One and only one message in the list must induce a response, i.e. the destination must reply to exactly one message only (otherwise promises will have the wrong responses in them). --- Each message is preceeded by its length when written to socket. Likewise, the response must be preceeded by its length. --- Raises IOError and closes pipeline if send fails, likewise for reply. -call Pipeline{..} messages = withMVar vHandle $ \h -> do - writeAll listenThread encodeSize messages h - var <- newEmptyMVar - writeChan responseQueue var - return (either ioError return =<< readMVar var) -- return promise +call :: Pipeline i o -> i -> IOE (IOE o) +-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). +-- Throw IOError and closes pipeline if send fails, likewise for promised response. +call p@Pipeline{..} message = withMVar vConn doCall `onException` \(_ :: IOError) -> close p where + doCall conn = do + C.send conn message + var <- newEmptyMVar + liftIO $ writeChan responseQueue var + return $ ErrorT (readMVar var) -- return promise -writeAll :: (Stream h b, Monoid b, Length b, Resource IO h) => ThreadId -> (Size -> b) -> [b] -> h -> IO () --- ^ Write messages to stream. On error, close pipeline and raise IOError. -writeAll listenThread encodeSize messages h = onException - (mapM_ write messages >> flush h) - (killThread listenThread >> close h) - where - write bytes = put h (mappend lenBytes bytes) where lenBytes = encodeSize (length bytes) + +{- Authors: Tony Hannan + Copyright 2010 10gen Inc. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -} diff --git a/Database/MongoDB.hs b/Database/MongoDB.hs index cf7b406..ff5f2e4 100644 --- a/Database/MongoDB.hs +++ b/Database/MongoDB.hs @@ -10,7 +10,7 @@ Simple example below. Use with language extension /OvererloadedStrings/. > import Control.Monad.Trans (liftIO) > > main = do -> pool <- newConnPool 1 (host "127.0.0.1") +> pool <- newConnPool Internet 1 (host "127.0.0.1") > e <- access safe Master pool run > print e > @@ -49,3 +49,8 @@ import Data.Bson import Database.MongoDB.Connection import Database.MongoDB.Query import Database.MongoDB.Admin + + +{- Authors: Tony Hannan + Copyright 2010 10gen Inc. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -} diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs index 82229c2..572f897 100644 --- a/Database/MongoDB/Connection.hs +++ b/Database/MongoDB/Connection.hs @@ -3,6 +3,8 @@ {-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes, FlexibleInstances #-} module Database.MongoDB.Connection ( + -- * Network + Network', ANetwork', Internet(..), -- * Host Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM, -- * ReplicaSet @@ -10,18 +12,20 @@ module Database.MongoDB.Connection ( -- * MasterOrSlaveOk MasterOrSlaveOk(..), -- * Connection Pool - Server(..), connHost, replicaSet + Server(..), newConnPool', + connHost, replicaSet ) where -import Database.MongoDB.Internal.Protocol +import Database.MongoDB.Internal.Protocol as X +import Network.Abstract (IOE, connect, ANetwork(..)) import Data.Bson ((=:), at, UString) -import Control.Pipeline (Resource(..)) +import Control.Pipeline as P import Control.Applicative ((<$>)) import Control.Exception (assert) -import System.IO.Error as E (try) import Control.Monad.Error import Control.Monad.MVar -import Network (HostName, PortID(..), connectTo) +import Control.Monad.Context +import Network (HostName, PortID(..)) import Data.Bson (Document, look) import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>)) import Control.Monad.Identity @@ -94,10 +98,10 @@ instance Eq ReplicaSet where ReplicaSet x _ == ReplicaSet y _ = x == y -- ** Replica Info -getReplicaInfo :: Pipe -> ErrorT IOError IO ReplicaInfo +getReplicaInfo :: Pipe -> IOE ReplicaInfo -- ^ Get replica info of the connected host. Throw IOError if connection fails or host is not part of a replica set (no /hosts/ and /primary/ field). getReplicaInfo pipe = do - promise <- call pipe [] (adminCommand ["ismaster" =: (1 :: Int)]) + promise <- X.call pipe [] (adminCommand ["ismaster" =: (1 :: Int)]) info <- commandReply "ismaster" <$> promise _ <- look "hosts" info _ <- look "primary" info @@ -148,19 +152,22 @@ type Pool' = Pool IOError class Server t where data ConnPool t -- ^ A pool of TCP connections ('Pipe's) to a host or a replica set of hosts - newConnPool :: (MonadIO' m) => Int -> t -> m (ConnPool t) + newConnPool :: (Network' n, MonadIO' m) => n -> Int -> t -> m (ConnPool t) -- ^ Create a ConnectionPool to a host or a replica set of hosts. Actual TCP connection is not attempted until 'getPipe' request, so no IOError can be raised here. Up to N TCP connections will be established to each host. - getPipe :: MasterOrSlaveOk -> ConnPool t -> ErrorT IOError IO Pipe + getPipe :: MasterOrSlaveOk -> ConnPool t -> IOE Pipe -- ^ Return a TCP connection (Pipe) to the master or a slave in the server. Master must connect to the master, SlaveOk may connect to a slave or master. To spread the load, SlaveOk requests are distributed amongst all hosts in the server. Throw IOError if failed to connect to right type of host (Master/SlaveOk). killPipes :: ConnPool t -> IO () - -- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when they are garbage collected. + -- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when they get garbage collected. + +newConnPool' :: (Server t, MonadIO' m, Context ANetwork' m) => Int -> t -> m (ConnPool t) +newConnPool' poolSize' host' = context >>= \(ANetwork net :: ANetwork') -> newConnPool net poolSize' host' -- ** ConnectionPool Host instance Server Host where data ConnPool Host = HostConnPool {connHost :: Host, connPool :: Pool' Pipe} -- ^ A pool of TCP connections ('Pipe's) to a server, handed out in round-robin style. - newConnPool poolSize' host' = liftIO (newHostConnPool poolSize' host') + newConnPool net poolSize' host' = liftIO $ newHostConnPool (ANetwork net) poolSize' host' -- ^ Create a connection pool to server (host or replica set) getPipe _ = getHostPipe -- ^ Return a TCP connection (Pipe). If SlaveOk, connect to a slave if available. Round-robin if multiple slaves are available. Throw IOError if failed to connect. @@ -169,28 +176,29 @@ instance Server Host where instance Show (ConnPool Host) where show HostConnPool{connHost} = "ConnPool " ++ show connHost -newHostConnPool :: Int -> Host -> IO (ConnPool Host) +newHostConnPool :: ANetwork' -> Int -> Host -> IO (ConnPool Host) -- ^ Create a pool of N 'Pipe's (TCP connections) to server. 'getHostPipe' will return one of those pipes, round-robin style. -newHostConnPool poolSize' host' = HostConnPool host' <$> newPool Factory{..} poolSize' where - newResource = tcpConnect host' - killResource = close - isExpired = isClosed +newHostConnPool net poolSize' host' = HostConnPool host' <$> newPool Factory{..} poolSize' where + newResource = tcpConnect net host' + killResource = P.close + isExpired = P.isClosed -getHostPipe :: ConnPool Host -> ErrorT IOError IO Pipe +getHostPipe :: ConnPool Host -> IOE Pipe -- ^ Return next pipe (TCP connection) in connection pool, round-robin style. Throw IOError if can't connect to host. getHostPipe (HostConnPool _ pool) = aResource pool -tcpConnect :: Host -> ErrorT IOError IO Pipe +tcpConnect :: ANetwork' -> Host -> IOE Pipe -- ^ Create a TCP connection (Pipe) to the given host. Throw IOError if can't connect. -tcpConnect (Host hostname port) = ErrorT . E.try $ mkPipe =<< connectTo hostname port +tcpConnect net (Host hostname port) = newPipeline =<< connect net (hostname, port) -- ** Connection ReplicaSet instance Server ReplicaSet where data ConnPool ReplicaSet = ReplicaSetConnPool { + network :: ANetwork', repsetName :: Name, currentMembers :: MVar [ConnPool Host] } -- master at head after a refresh - newConnPool poolSize' repset = liftIO (newSetConnPool poolSize' repset) + newConnPool net poolSize' repset = liftIO $ newSetConnPool (ANetwork net) poolSize' repset getPipe = getSetPipe killPipes ReplicaSetConnPool{..} = withMVar currentMembers (mapM_ killPipes) @@ -201,29 +209,31 @@ replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet -- ^ Return replicas set name with current members as seed list replicaSet ReplicaSetConnPool{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers -newSetConnPool :: Int -> ReplicaSet -> IO (ConnPool ReplicaSet) +newSetConnPool :: ANetwork' -> Int -> ReplicaSet -> IO (ConnPool ReplicaSet) -- ^ Create a connection pool to each member of the replica set. -newSetConnPool poolSize' repset = assert (not . null $ seedHosts repset) $ do - currentMembers <- newMVar =<< mapM (newConnPool poolSize') (seedHosts repset) - return $ ReplicaSetConnPool (setName repset) currentMembers +newSetConnPool net poolSize' repset = assert (not . null $ seedHosts repset) $ do + currentMembers <- newMVar =<< mapM (newHostConnPool net poolSize') (seedHosts repset) + return $ ReplicaSetConnPool net (setName repset) currentMembers -getMembers :: Name -> [ConnPool Host] -> ErrorT IOError IO [Host] +getMembers :: Name -> [ConnPool Host] -> IOE [Host] -- ^ Get members of replica set, master first. Query supplied connections until config found. -- TODO: Verify config for request replica set name and not some other replica set. ismaster config should include replica set name in result but currently does not. getMembers _repsetName connections = hosts <$> untilSuccess (getReplicaInfo <=< getHostPipe) connections -refreshMembers :: Name -> [ConnPool Host] -> ErrorT IOError IO [ConnPool Host] +refreshMembers :: ANetwork' -> Name -> [ConnPool Host] -> IOE [ConnPool Host] -- ^ Update current members with master at head. Reuse unchanged members. Throw IOError if can't connect to any and fetch config. Dropped connections are not closed in case they still have users; they will be closed when garbage collected. -refreshMembers repsetName connections = do +refreshMembers net repsetName connections = do n <- liftIO . poolSize . connPool $ head connections - mapM (connection n) =<< getMembers repsetName connections + mapM (liftIO . connection n) =<< getMembers repsetName connections where - connection n host' = maybe (newConnPool n host') return $ find ((host' ==) . connHost) connections + connection n host' = maybe (newHostConnPool net n host') return mc where + mc = find ((host' ==) . connHost) connections + -getSetPipe :: MasterOrSlaveOk -> ConnPool ReplicaSet -> ErrorT IOError IO Pipe +getSetPipe :: MasterOrSlaveOk -> ConnPool ReplicaSet -> IOE Pipe -- ^ Return a pipe to primary or a random secondary in replica set. Use primary for SlaveOk if and only if no secondaries. Note, refreshes members each time (makes ismaster call to primary). getSetPipe mos ReplicaSetConnPool{..} = modifyMVar currentMembers $ \conns -> do - connections <- refreshMembers repsetName conns -- master at head after refresh + connections <- refreshMembers network repsetName conns -- master at head after refresh pipe <- case mos of Master -> getHostPipe (head connections) SlaveOk -> do diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index b0cae95..6deef55 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -2,12 +2,13 @@ This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -} -{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts #-} +{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts, TupleSections, TypeSynonymInstances, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-} module Database.MongoDB.Internal.Protocol ( + -- * Network + Network', ANetwork', Internet(..), -- * Pipe - Pipe, mkPipe, - send, call, + Pipe, send, call, -- * Message FullCollection, -- ** Notice @@ -22,8 +23,9 @@ module Database.MongoDB.Internal.Protocol ( import Prelude as X import Control.Applicative ((<$>)) +import Control.Arrow ((***)) import System.IO (Handle) -import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Lazy as B (length, hPut) import qualified Control.Pipeline as P import Data.Bson (Document, UString) import Data.Bson.Binary @@ -31,45 +33,83 @@ import Data.Binary.Put import Data.Binary.Get import Data.Int import Data.Bits -import Database.MongoDB.Internal.Util (bitOr) import Data.IORef import System.IO.Unsafe (unsafePerformIO) import Data.Digest.OpenSSL.MD5 (md5sum) import Data.UString as U (pack, append, toByteString) import System.IO.Error as E (try) import Control.Monad.Error +import Control.Monad.Util (whenJust) +import Network.Abstract (IOE, ANetwork, Network(..), Connection(Connection)) +import Network (connectTo) +import System.IO (hFlush, hClose) +import Database.MongoDB.Internal.Util (hGetN, bitOr) + +-- * Network + +-- Network -> Server -> (Sink, Source) +-- (Sink, Source) -> Pipeline + +type Message = ([Notice], Maybe (Request, RequestId)) +-- ^ Write notice(s), write notice(s) with getLastError request, or just query request +-- Note, that requestId will be out of order because request ids will be generated for notices, after the request id supplied was generated. This is ok because the mongo server does not care about order they are just used as unique identifiers. + +type Response = (ResponseTo, Reply) + +class (Network n Message Response) => Network' n +instance (Network n Message Response) => Network' n + +type ANetwork' = ANetwork Message Response + +data Internet = Internet +-- ^ Normal Network instance, i.e. no logging or replay + +-- | Connect to server. Write messages and receive replies; not thread-safe! +instance Network Internet Message Response where + connect _ (hostname, portid) = ErrorT . E.try $ do + handle <- connectTo hostname portid + return $ Connection (sink handle) (source handle) (hClose handle) + where + sink h (notices, mRequest) = ErrorT . E.try $ do + forM_ notices $ \n -> writeReq h . (Left n,) =<< genRequestId + whenJust mRequest $ writeReq h . (Right *** id) + hFlush h + source h = ErrorT . E.try $ readResp h + +writeReq :: Handle -> (Either Notice Request, RequestId) -> IO () +writeReq handle (e, requestId) = do + hPut handle lenBytes + hPut handle bytes + where + bytes = runPut $ (either putNotice putRequest e) requestId + lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes + encodeSize = runPut . putInt32 . (+ 4) + +readResp :: Handle -> IO (ResponseTo, Reply) +readResp handle = do + len <- fromEnum . decodeSize <$> hGetN handle 4 + runGet getReply <$> hGetN handle len + where + decodeSize = subtract 4 . runGet getInt32 -- * Pipe -type Pipe = P.Pipeline Handle ByteString +type Pipe = P.Pipeline Message Response -- ^ Thread-safe TCP connection with pipelined requests -mkPipe :: Handle -> IO Pipe --- ^ New thread-safe pipelined connection over handle -mkPipe = P.newPipeline encodeSize decodeSize where - encodeSize = runPut . putInt32 . toEnum . (+ 4) - decodeSize = subtract 4 . fromEnum . runGet getInt32 - -send :: Pipe -> [Notice] -> ErrorT IOError IO () +send :: Pipe -> [Notice] -> IOE () -- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. -send conn notices = ErrorT . E.try $ P.send conn =<< mapM noticeBytes notices +send pipe notices = P.send pipe (notices, Nothing) -call :: Pipe -> [Notice] -> Request -> ErrorT IOError IO (ErrorT IOError IO Reply) +call :: Pipe -> [Notice] -> Request -> IOE (IOE Reply) -- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails. -call conn notices request = ErrorT . E.try $ do - nMessages <- mapM noticeBytes notices +call pipe notices request = do requestId <- genRequestId - let rMessage = runPut (putRequest request requestId) - promise <- P.call conn (nMessages ++ [rMessage]) - return (ErrorT . E.try $ bytesReply requestId <$> promise) - -noticeBytes :: Notice -> IO ByteString -noticeBytes notice = runPut . putNotice notice <$> genRequestId - -bytesReply :: RequestId -> ByteString -> Reply -bytesReply requestId bytes = if requestId == responseTo then reply else err where - (responseTo, reply) = runGet getReply bytes - err = error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" + promise <- P.call pipe (notices, Just (request, requestId)) + return $ check requestId <$> promise + where + check requestId (responseTo, reply) = if requestId == responseTo then reply else + error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" -- * Messages @@ -85,9 +125,9 @@ type RequestId = Int32 type ResponseTo = RequestId -genRequestId :: IO RequestId +genRequestId :: (MonadIO m) => m RequestId -- ^ Generate fresh request id -genRequestId = atomicModifyIORef counter $ \n -> (n + 1, n) where +genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where counter :: IORef RequestId counter = unsafePerformIO (newIORef 0) {-# NOINLINE counter #-} diff --git a/Database/MongoDB/Internal/Util.hs b/Database/MongoDB/Internal/Util.hs index 355f9c6..4c076b4 100644 --- a/Database/MongoDB/Internal/Util.hs +++ b/Database/MongoDB/Internal/Util.hs @@ -5,10 +5,15 @@ module Database.MongoDB.Internal.Util where import Prelude hiding (length) +import Control.Applicative ((<$>)) import Network (PortID(..)) import Data.UString as U (cons, append) import Data.Bits (Bits, (.|.)) import Data.Bson +import Data.ByteString.Lazy as S (ByteString, length, append, hGet) +import System.IO (Handle) +import System.IO.Error (mkIOError, eofErrorType) +import Control.Exception (assert) deriving instance Show PortID deriving instance Eq PortID @@ -30,3 +35,12 @@ true1 k doc = case valueAt k doc of Int32 n -> n == 1 Int64 n -> n == 1 _ -> error $ "expected " ++ show k ++ " to be Num or Bool in " ++ show doc + +hGetN :: Handle -> Int -> IO ByteString +-- ^ Read N bytes from hande, blocking until all N bytes are read. If EOF is reached before N bytes then raise EOF exception. +hGetN h n = assert (n >= 0) $ do + bytes <- hGet h n + let x = fromEnum $ length bytes + if x >= n then return bytes + else if x == 0 then ioError (mkIOError eofErrorType "hGetN" (Just h) Nothing) + else S.append bytes <$> hGetN h (n - x) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 6fb8dde..0f2fd49 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -1,6 +1,6 @@ -- | Query and update documents -{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, RankNTypes, ImpredicativeTypes #-} +{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, RankNTypes #-} module Database.MongoDB.Query ( -- * Access @@ -29,7 +29,7 @@ module Database.MongoDB.Query ( Query(..), QueryOption(..), Projector, Limit, Order, BatchSize, explain, find, findOne, count, distinct, -- *** Cursor - Cursor, next, nextN, rest, + Cursor, next, nextN, rest, closeCursor, isCursorClosed, -- ** Group Group(..), GroupKey(..), group, -- ** MapReduce @@ -47,7 +47,6 @@ import Control.Monad.Reader import Control.Monad.Error import Control.Monad.Throw import Control.Monad.MVar -import Control.Pipeline (Resource(..)) import qualified Database.MongoDB.Internal.Protocol as P import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call) import Database.MongoDB.Connection (MasterOrSlaveOk(..), Server(..)) @@ -441,7 +440,7 @@ newCursor :: (Access m) => Database -> Collection -> BatchSize -> DelayedCursorS newCursor (Database db) col batch cs = do var <- newMVar cs let cursor = Cursor (db <.> col) batch var - addMVarFinalizer var (close cursor) + addMVarFinalizer var (closeCursor cursor) return cursor next :: (Access m) => Cursor -> m (Maybe Document) @@ -470,11 +469,13 @@ rest :: (Access m) => Cursor -> m [Document] -- ^ Return remaining documents in query result rest c = loop (next c) -instance (Access m) => Resource m Cursor where - close (Cursor _ _ var) = modifyMVar var kill' where - kill' dcs = first return <$> (kill =<< mapErrorIO id dcs) - kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]] - isClosed cursor = do +closeCursor :: (Access m) => Cursor -> m () +closeCursor (Cursor _ _ var) = modifyMVar var kill' where + kill' dcs = first return <$> (kill =<< mapErrorIO id dcs) + kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]] + +isCursorClosed :: (Access m) => Cursor -> m Bool +isCursorClosed cursor = do CS _ cid docs <- getCursorState cursor return (cid == 0 && null docs) @@ -593,7 +594,8 @@ send ns = do pipe <- context mapErrorIO ConnectionFailure (P.send pipe ns) -call :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw Failure n, MonadIO n) => n Reply) +call :: (Context Pipe m, Throw Failure m, MonadIO m, Throw Failure n, MonadIO n) => + [Notice] -> Request -> m (n Reply) -- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive. call ns r = do pipe <- context diff --git a/Network/Abstract.hs b/Network/Abstract.hs new file mode 100644 index 0000000..bab4366 --- /dev/null +++ b/Network/Abstract.hs @@ -0,0 +1,33 @@ +-- | Generalize a network connection to a sink and source + +{-# LANGUAGE MultiParamTypeClasses, ExistentialQuantification, FlexibleContexts, FlexibleInstances, UndecidableInstances #-} + +module Network.Abstract where + +import Network (HostName, PortID) +import Control.Monad.Error + +type IOE = ErrorT IOError IO + +type Server = (HostName, PortID) + +-- | A network controls connections to other hosts. It may want to overide to log messages or play them back. +-- A server in the network accepts messages of type i and generates messages of type o. +class Network n i o where + connect :: n -> Server -> IOE (Connection i o) + -- ^ Connect to Server returning the send sink and receive source, throw IOError if can't connect. + +data Connection i o = Connection { + send :: i -> IOE (), + receive :: IOE o, + close :: IO () } + +data ANetwork i o = forall n. (Network n i o) => ANetwork n + +instance Network (ANetwork i o) i o where + connect (ANetwork n) = connect n + + +{- Authors: Tony Hannan + Copyright 2010 10gen Inc. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -} diff --git a/map-reduce-example.md b/map-reduce-example.md index 30e29cd..84ebbbd 100644 --- a/map-reduce-example.md +++ b/map-reduce-example.md @@ -19,8 +19,8 @@ map/reduce queries on: Prelude> :set prompt "> " > :set -XOverloadedStrings > import Database.MongoDB - > conn <- connect 1 $ host "localhost" - > let run act = runConn safe Master conn $ use (Database "test") act + > conn <- newConnPool Internet 1 (host "localhost") + > let run act = access safe Master conn $ use (Database "test") act > :{ run $ insertMany "mr1" [ ["x" =: 1, "tags" =: ["dog", "cat"]], @@ -70,7 +70,7 @@ be called iteratively on the results of other reduce steps. Finally, we run mapReduce and iterate over the result collection: > run $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest - Right (Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]]) + Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]] Advanced Map/Reduce ------------------- @@ -79,6 +79,6 @@ MongoDB returns additional statistics in the map/reduce results. To obtain them, use *runMR'* instead: > run $ runMR' (mapReduce "mr1" mapFn reduceFn) - Right (Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0]) + Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0] You can then obtain the results from here by quering the result collection yourself. *runMR* (above) does this for you but discards the statistics. diff --git a/mongoDB.cabal b/mongoDB.cabal index e671ac8..7e209e8 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -1,9 +1,9 @@ name: mongoDB -version: 0.8.2 +version: 0.9 build-type: Simple license: OtherLicense license-file: LICENSE -copyright: Copyright (c) 2010-2010 Scott Parish & 10gen Inc. +copyright: Copyright (c) 2010-2010 10gen Inc. & Scott Parish maintainer: Tony Hannan build-depends: array -any, @@ -24,7 +24,7 @@ bug-reports: synopsis: A driver for MongoDB description: This module lets you connect to MongoDB (www.mongodb.org) and do inserts, queries, updates, etc. category: Database -author: Scott Parish & Tony Hannan +author: Tony Hannan & Scott Parish tested-with: data-files: data-dir: "" @@ -42,6 +42,7 @@ exposed-modules: Database.MongoDB.Internal.Protocol Database.MongoDB.Internal.Util Database.MongoDB.Query + Network.Abstract Var.Pool exposed: True buildable: True diff --git a/tutorial.md b/tutorial.md index 66212c9..b0e046a 100644 --- a/tutorial.md +++ b/tutorial.md @@ -47,16 +47,20 @@ Making A Connection ------------------- Create a connection pool for your mongo server, using the standard port (27017): - > pool <- newConnPool 1 $ host "127.0.0.1" + > pool <- newConnPool Internet 1 $ host "127.0.0.1" or for a non-standard port - > pool <- newConnPool 1 $ Host "127.0.0.1" (PortNumber 30000) + > pool <- newConnPool Internet 1 $ Host "127.0.0.1" (PortNumber 30000) -*newConnPool* takes the connection pool size and the host to connect to. It returns +*newConnPool* takes the *network*, the connection pool size, and the host to connect to. It returns a *ConnPool*, which is a potential pool of TCP connections. They are not created until first access, so it is not possible to get a connection error here. +The network parameter allows you to override normal communications to, for example, log +or replay messages sent and received from servers. *Internet* is the normal communication mode +with no logging/replay. + Note, plain IO code in this driver never raises an exception unless it invokes third party IO code that does. Driver code that may throw an exception says so in its Monad type, for example, *ErrorT IOError IO a*. @@ -71,7 +75,7 @@ A Pipe is a single TCP connection. To run an Access action (monad), supply WriteMode, MasterOrSlaveOk, Connection, and action to *access*. For example, to get a list of all the database on the server: - > access safe Master conn allDatabases + > access safe Master pool allDatabases *access* return either Left Failure or Right result. Failure means there was a connection failure or a read or write exception like cursor expired or duplicate key insert.