Abstract network communication so we can capture and replay messages if desired. Also, remove dependence on deprecated ImpredicativeTypes.

This commit is contained in:
Tony Hannan 2010-12-19 21:08:53 -05:00
parent 8da53a3fa3
commit 111d9a2f72
11 changed files with 248 additions and 217 deletions

View file

@ -34,3 +34,6 @@ untilSuccess' _ f (x : xs) = catchError (f x) (\e -> untilSuccess' e f xs)
mapError :: (Functor m) => (e' -> e) -> ErrorT e' m a -> ErrorT e m a
-- ^ Convert error type thrown
mapError f (ErrorT m) = ErrorT $ (f +++ id) <$> m
whenJust :: (Monad m) => Maybe a -> (a -> m ()) -> m ()
whenJust mVal act = maybe (return ()) act mVal

View file

@ -2,169 +2,88 @@
A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. It also closes itself when garbage collected, or you can close it explicitly. -}
{-# LANGUAGE DoRec, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts #-}
{-# LANGUAGE DoRec, RecordWildCards, NamedFieldPuns, ScopedTypeVariables #-}
module Control.Pipeline (
-- * Pipeline
Pipeline, newPipeline, send, call,
-- * Util
Size,
Length(..),
Resource(..),
Flush(..),
Stream(..), getN
Pipeline, newPipeline, send, call, close, isClosed
) where
import Prelude hiding (length)
import Control.Applicative ((<$>))
import Control.Monad (forever)
import Control.Exception (assert, onException)
import System.IO.Error (try, mkIOError, eofErrorType)
import System.IO (Handle, hFlush, hClose, hIsClosed)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import Data.Monoid (Monoid(..))
import Control.Monad.Throw (onException)
import Control.Monad.Error
import Control.Concurrent (ThreadId, forkIO, killThread)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Concurrent.MVar
import Control.Monad.MVar
import Control.Concurrent.Chan
-- * Length
type Size = Int
class Length list where
length :: list -> Size
instance Length S.ByteString where
length = S.length
instance Length L.ByteString where
length = fromEnum . L.length
-- * Resource
class Resource m r where
close :: r -> m ()
-- ^ Close resource
isClosed :: r -> m Bool
-- ^ Is resource closed
instance Resource IO Handle where
close = hClose
isClosed = hIsClosed
-- * Flush
class Flush handle where
flush :: handle -> IO ()
-- ^ Flush written bytes to destination
instance Flush Handle where
flush = hFlush
-- * Stream
class (Length bytes, Monoid bytes, Flush handle) => Stream handle bytes where
put :: handle -> bytes -> IO ()
-- ^ Write bytes to handle
get :: handle -> Int -> IO bytes
-- ^ Read up to N bytes from handle; if EOF return empty bytes, otherwise block until at least 1 byte is available
getN :: (Stream h b) => h -> Int -> IO b
-- ^ Read N bytes from hande, blocking until all N bytes are read. If EOF is reached before N bytes then throw EOF exception.
getN h n = assert (n >= 0) $ do
bytes <- get h n
let x = length bytes
if x >= n then return bytes
else if x == 0 then ioError (mkIOError eofErrorType "Control.Pipeline" Nothing Nothing)
else mappend bytes <$> getN h (n - x)
instance Stream Handle S.ByteString where
put = S.hPut
get = S.hGet
instance Stream Handle L.ByteString where
put = L.hPut
get = L.hGet
import Network.Abstract (IOE)
import qualified Network.Abstract as C
-- * Pipeline
-- | Thread-safe and pipelined socket
data Pipeline handle bytes = Pipeline {
encodeSize :: Size -> bytes,
decodeSize :: bytes -> Size,
vHandle :: MVar handle, -- ^ Mutex on handle, so only one thread at a time can write to it
responseQueue :: Chan (MVar (Either IOError bytes)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
-- | Thread-safe and pipelined connection
data Pipeline i o = Pipeline {
vConn :: MVar (C.Connection i o), -- ^ Mutex on handle, so only one thread at a time can write to it
responseQueue :: Chan (MVar (Either IOError o)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
listenThread :: ThreadId
}
-- | Create new Pipeline with given encodeInt, decodeInt, and handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: (Stream h b, Resource IO h) =>
(Size -> b) -- ^ Convert Size to bytes of fixed length. Every Int must translate to same number of bytes.
-> (b -> Size) -- ^ Convert bytes of fixed length to Size. Must be exact inverse of encodeSize.
-> h -- ^ Underlying socket (handle) this pipeline will read/write from
-> IO (Pipeline h b)
newPipeline encodeSize decodeSize handle = do
vHandle <- newMVar handle
-- | Create new Pipeline on given connection. You should 'close' pipeline when finished, which will also close connection. If pipeline is not closed but eventually garbage collected, it will be closed along with connection.
newPipeline :: (MonadIO m) => C.Connection i o -> m (Pipeline i o)
newPipeline conn = liftIO $ do
vConn <- newMVar conn
responseQueue <- newChan
rec
let pipe = Pipeline{..}
listenThread <- forkIO (listen pipe)
addMVarFinalizer vHandle $ do
addMVarFinalizer vConn $ do
killThread listenThread
close handle
C.close conn
return pipe
instance (Resource IO h) => Resource IO (Pipeline h b) where
-- | Close pipe and underlying socket (handle)
close Pipeline{..} = do
killThread listenThread
close =<< readMVar vHandle
isClosed Pipeline{listenThread} = do
status <- threadStatus listenThread
return $ case status of
ThreadRunning -> False
ThreadFinished -> True
ThreadBlocked _ -> False
ThreadDied -> True
--isClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
close :: (MonadIO m) => Pipeline i o -> m ()
-- | Close pipe and underlying connection
close Pipeline{..} = liftIO $ do
killThread listenThread
C.close =<< readMVar vConn
listen :: (Stream h b, Resource IO h) => Pipeline h b -> IO ()
isClosed :: (MonadIO m) => Pipeline i o -> m Bool
isClosed Pipeline{listenThread} = liftIO $ do
status <- threadStatus listenThread
return $ case status of
ThreadRunning -> False
ThreadFinished -> True
ThreadBlocked _ -> False
ThreadDied -> True
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
listen :: Pipeline i o -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order
listen Pipeline{..} = do
let n = length (encodeSize 0)
h <- readMVar vHandle
conn <- readMVar vConn
forever $ do
e <- try $ do
len <- decodeSize <$> getN h n
getN h len
e <- runErrorT $ C.receive conn
var <- readChan responseQueue
putMVar var e
case e of
Left err -> close h >> fail (show err) -- close and stop looping
Left err -> C.close conn >> ioError err -- close and stop looping
Right _ -> return ()
send :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO ()
-- ^ Send messages all together to destination (no messages will be interleaved between them). None of the messages can induce a response, i.e. the destination must not reply to any of these messages (otherwise future 'call's will get these responses instead of their own).
-- Each message is preceeded by its length when written to socket.
-- Raises IOError and closes pipeline if send fails
send Pipeline{..} messages = withMVar vHandle (writeAll listenThread encodeSize messages)
send :: Pipeline i o -> i -> IOE ()
-- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own).
-- Throw IOError and close pipeline if send fails
send p@Pipeline{..} message = withMVar vConn (flip C.send message) `onException` \(_ :: IOError) -> close p
call :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO (IO b)
-- ^ Send messages all together to destination (no messages will be interleaved between them), and return /promise/ of response from one message only. One and only one message in the list must induce a response, i.e. the destination must reply to exactly one message only (otherwise promises will have the wrong responses in them).
-- Each message is preceeded by its length when written to socket. Likewise, the response must be preceeded by its length.
-- Raises IOError and closes pipeline if send fails, likewise for reply.
call Pipeline{..} messages = withMVar vHandle $ \h -> do
writeAll listenThread encodeSize messages h
var <- newEmptyMVar
writeChan responseQueue var
return (either ioError return =<< readMVar var) -- return promise
call :: Pipeline i o -> i -> IOE (IOE o)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
call p@Pipeline{..} message = withMVar vConn doCall `onException` \(_ :: IOError) -> close p where
doCall conn = do
C.send conn message
var <- newEmptyMVar
liftIO $ writeChan responseQueue var
return $ ErrorT (readMVar var) -- return promise
writeAll :: (Stream h b, Monoid b, Length b, Resource IO h) => ThreadId -> (Size -> b) -> [b] -> h -> IO ()
-- ^ Write messages to stream. On error, close pipeline and raise IOError.
writeAll listenThread encodeSize messages h = onException
(mapM_ write messages >> flush h)
(killThread listenThread >> close h)
where
write bytes = put h (mappend lenBytes bytes) where lenBytes = encodeSize (length bytes)
{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10gen Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}

View file

@ -10,7 +10,7 @@ Simple example below. Use with language extension /OvererloadedStrings/.
> import Control.Monad.Trans (liftIO)
>
> main = do
> pool <- newConnPool 1 (host "127.0.0.1")
> pool <- newConnPool Internet 1 (host "127.0.0.1")
> e <- access safe Master pool run
> print e
>
@ -49,3 +49,8 @@ import Data.Bson
import Database.MongoDB.Connection
import Database.MongoDB.Query
import Database.MongoDB.Admin
{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10gen Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}

View file

@ -3,6 +3,8 @@
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes, FlexibleInstances #-}
module Database.MongoDB.Connection (
-- * Network
Network', ANetwork', Internet(..),
-- * Host
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM,
-- * ReplicaSet
@ -10,18 +12,20 @@ module Database.MongoDB.Connection (
-- * MasterOrSlaveOk
MasterOrSlaveOk(..),
-- * Connection Pool
Server(..), connHost, replicaSet
Server(..), newConnPool',
connHost, replicaSet
) where
import Database.MongoDB.Internal.Protocol
import Database.MongoDB.Internal.Protocol as X
import Network.Abstract (IOE, connect, ANetwork(..))
import Data.Bson ((=:), at, UString)
import Control.Pipeline (Resource(..))
import Control.Pipeline as P
import Control.Applicative ((<$>))
import Control.Exception (assert)
import System.IO.Error as E (try)
import Control.Monad.Error
import Control.Monad.MVar
import Network (HostName, PortID(..), connectTo)
import Control.Monad.Context
import Network (HostName, PortID(..))
import Data.Bson (Document, look)
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import Control.Monad.Identity
@ -94,10 +98,10 @@ instance Eq ReplicaSet where ReplicaSet x _ == ReplicaSet y _ = x == y
-- ** Replica Info
getReplicaInfo :: Pipe -> ErrorT IOError IO ReplicaInfo
getReplicaInfo :: Pipe -> IOE ReplicaInfo
-- ^ Get replica info of the connected host. Throw IOError if connection fails or host is not part of a replica set (no /hosts/ and /primary/ field).
getReplicaInfo pipe = do
promise <- call pipe [] (adminCommand ["ismaster" =: (1 :: Int)])
promise <- X.call pipe [] (adminCommand ["ismaster" =: (1 :: Int)])
info <- commandReply "ismaster" <$> promise
_ <- look "hosts" info
_ <- look "primary" info
@ -148,19 +152,22 @@ type Pool' = Pool IOError
class Server t where
data ConnPool t
-- ^ A pool of TCP connections ('Pipe's) to a host or a replica set of hosts
newConnPool :: (MonadIO' m) => Int -> t -> m (ConnPool t)
newConnPool :: (Network' n, MonadIO' m) => n -> Int -> t -> m (ConnPool t)
-- ^ Create a ConnectionPool to a host or a replica set of hosts. Actual TCP connection is not attempted until 'getPipe' request, so no IOError can be raised here. Up to N TCP connections will be established to each host.
getPipe :: MasterOrSlaveOk -> ConnPool t -> ErrorT IOError IO Pipe
getPipe :: MasterOrSlaveOk -> ConnPool t -> IOE Pipe
-- ^ Return a TCP connection (Pipe) to the master or a slave in the server. Master must connect to the master, SlaveOk may connect to a slave or master. To spread the load, SlaveOk requests are distributed amongst all hosts in the server. Throw IOError if failed to connect to right type of host (Master/SlaveOk).
killPipes :: ConnPool t -> IO ()
-- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when they are garbage collected.
-- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when they get garbage collected.
newConnPool' :: (Server t, MonadIO' m, Context ANetwork' m) => Int -> t -> m (ConnPool t)
newConnPool' poolSize' host' = context >>= \(ANetwork net :: ANetwork') -> newConnPool net poolSize' host'
-- ** ConnectionPool Host
instance Server Host where
data ConnPool Host = HostConnPool {connHost :: Host, connPool :: Pool' Pipe}
-- ^ A pool of TCP connections ('Pipe's) to a server, handed out in round-robin style.
newConnPool poolSize' host' = liftIO (newHostConnPool poolSize' host')
newConnPool net poolSize' host' = liftIO $ newHostConnPool (ANetwork net) poolSize' host'
-- ^ Create a connection pool to server (host or replica set)
getPipe _ = getHostPipe
-- ^ Return a TCP connection (Pipe). If SlaveOk, connect to a slave if available. Round-robin if multiple slaves are available. Throw IOError if failed to connect.
@ -169,28 +176,29 @@ instance Server Host where
instance Show (ConnPool Host) where
show HostConnPool{connHost} = "ConnPool " ++ show connHost
newHostConnPool :: Int -> Host -> IO (ConnPool Host)
newHostConnPool :: ANetwork' -> Int -> Host -> IO (ConnPool Host)
-- ^ Create a pool of N 'Pipe's (TCP connections) to server. 'getHostPipe' will return one of those pipes, round-robin style.
newHostConnPool poolSize' host' = HostConnPool host' <$> newPool Factory{..} poolSize' where
newResource = tcpConnect host'
killResource = close
isExpired = isClosed
newHostConnPool net poolSize' host' = HostConnPool host' <$> newPool Factory{..} poolSize' where
newResource = tcpConnect net host'
killResource = P.close
isExpired = P.isClosed
getHostPipe :: ConnPool Host -> ErrorT IOError IO Pipe
getHostPipe :: ConnPool Host -> IOE Pipe
-- ^ Return next pipe (TCP connection) in connection pool, round-robin style. Throw IOError if can't connect to host.
getHostPipe (HostConnPool _ pool) = aResource pool
tcpConnect :: Host -> ErrorT IOError IO Pipe
tcpConnect :: ANetwork' -> Host -> IOE Pipe
-- ^ Create a TCP connection (Pipe) to the given host. Throw IOError if can't connect.
tcpConnect (Host hostname port) = ErrorT . E.try $ mkPipe =<< connectTo hostname port
tcpConnect net (Host hostname port) = newPipeline =<< connect net (hostname, port)
-- ** Connection ReplicaSet
instance Server ReplicaSet where
data ConnPool ReplicaSet = ReplicaSetConnPool {
network :: ANetwork',
repsetName :: Name,
currentMembers :: MVar [ConnPool Host] } -- master at head after a refresh
newConnPool poolSize' repset = liftIO (newSetConnPool poolSize' repset)
newConnPool net poolSize' repset = liftIO $ newSetConnPool (ANetwork net) poolSize' repset
getPipe = getSetPipe
killPipes ReplicaSetConnPool{..} = withMVar currentMembers (mapM_ killPipes)
@ -201,29 +209,31 @@ replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet
-- ^ Return replicas set name with current members as seed list
replicaSet ReplicaSetConnPool{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers
newSetConnPool :: Int -> ReplicaSet -> IO (ConnPool ReplicaSet)
newSetConnPool :: ANetwork' -> Int -> ReplicaSet -> IO (ConnPool ReplicaSet)
-- ^ Create a connection pool to each member of the replica set.
newSetConnPool poolSize' repset = assert (not . null $ seedHosts repset) $ do
currentMembers <- newMVar =<< mapM (newConnPool poolSize') (seedHosts repset)
return $ ReplicaSetConnPool (setName repset) currentMembers
newSetConnPool net poolSize' repset = assert (not . null $ seedHosts repset) $ do
currentMembers <- newMVar =<< mapM (newHostConnPool net poolSize') (seedHosts repset)
return $ ReplicaSetConnPool net (setName repset) currentMembers
getMembers :: Name -> [ConnPool Host] -> ErrorT IOError IO [Host]
getMembers :: Name -> [ConnPool Host] -> IOE [Host]
-- ^ Get members of replica set, master first. Query supplied connections until config found.
-- TODO: Verify config for request replica set name and not some other replica set. ismaster config should include replica set name in result but currently does not.
getMembers _repsetName connections = hosts <$> untilSuccess (getReplicaInfo <=< getHostPipe) connections
refreshMembers :: Name -> [ConnPool Host] -> ErrorT IOError IO [ConnPool Host]
refreshMembers :: ANetwork' -> Name -> [ConnPool Host] -> IOE [ConnPool Host]
-- ^ Update current members with master at head. Reuse unchanged members. Throw IOError if can't connect to any and fetch config. Dropped connections are not closed in case they still have users; they will be closed when garbage collected.
refreshMembers repsetName connections = do
refreshMembers net repsetName connections = do
n <- liftIO . poolSize . connPool $ head connections
mapM (connection n) =<< getMembers repsetName connections
mapM (liftIO . connection n) =<< getMembers repsetName connections
where
connection n host' = maybe (newConnPool n host') return $ find ((host' ==) . connHost) connections
connection n host' = maybe (newHostConnPool net n host') return mc where
mc = find ((host' ==) . connHost) connections
getSetPipe :: MasterOrSlaveOk -> ConnPool ReplicaSet -> ErrorT IOError IO Pipe
getSetPipe :: MasterOrSlaveOk -> ConnPool ReplicaSet -> IOE Pipe
-- ^ Return a pipe to primary or a random secondary in replica set. Use primary for SlaveOk if and only if no secondaries. Note, refreshes members each time (makes ismaster call to primary).
getSetPipe mos ReplicaSetConnPool{..} = modifyMVar currentMembers $ \conns -> do
connections <- refreshMembers repsetName conns -- master at head after refresh
connections <- refreshMembers network repsetName conns -- master at head after refresh
pipe <- case mos of
Master -> getHostPipe (head connections)
SlaveOk -> do

View file

@ -2,12 +2,13 @@
This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -}
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts #-}
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts, TupleSections, TypeSynonymInstances, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
module Database.MongoDB.Internal.Protocol (
-- * Network
Network', ANetwork', Internet(..),
-- * Pipe
Pipe, mkPipe,
send, call,
Pipe, send, call,
-- * Message
FullCollection,
-- ** Notice
@ -22,8 +23,9 @@ module Database.MongoDB.Internal.Protocol (
import Prelude as X
import Control.Applicative ((<$>))
import Control.Arrow ((***))
import System.IO (Handle)
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy as B (length, hPut)
import qualified Control.Pipeline as P
import Data.Bson (Document, UString)
import Data.Bson.Binary
@ -31,45 +33,83 @@ import Data.Binary.Put
import Data.Binary.Get
import Data.Int
import Data.Bits
import Database.MongoDB.Internal.Util (bitOr)
import Data.IORef
import System.IO.Unsafe (unsafePerformIO)
import Data.Digest.OpenSSL.MD5 (md5sum)
import Data.UString as U (pack, append, toByteString)
import System.IO.Error as E (try)
import Control.Monad.Error
import Control.Monad.Util (whenJust)
import Network.Abstract (IOE, ANetwork, Network(..), Connection(Connection))
import Network (connectTo)
import System.IO (hFlush, hClose)
import Database.MongoDB.Internal.Util (hGetN, bitOr)
-- * Network
-- Network -> Server -> (Sink, Source)
-- (Sink, Source) -> Pipeline
type Message = ([Notice], Maybe (Request, RequestId))
-- ^ Write notice(s), write notice(s) with getLastError request, or just query request
-- Note, that requestId will be out of order because request ids will be generated for notices, after the request id supplied was generated. This is ok because the mongo server does not care about order they are just used as unique identifiers.
type Response = (ResponseTo, Reply)
class (Network n Message Response) => Network' n
instance (Network n Message Response) => Network' n
type ANetwork' = ANetwork Message Response
data Internet = Internet
-- ^ Normal Network instance, i.e. no logging or replay
-- | Connect to server. Write messages and receive replies; not thread-safe!
instance Network Internet Message Response where
connect _ (hostname, portid) = ErrorT . E.try $ do
handle <- connectTo hostname portid
return $ Connection (sink handle) (source handle) (hClose handle)
where
sink h (notices, mRequest) = ErrorT . E.try $ do
forM_ notices $ \n -> writeReq h . (Left n,) =<< genRequestId
whenJust mRequest $ writeReq h . (Right *** id)
hFlush h
source h = ErrorT . E.try $ readResp h
writeReq :: Handle -> (Either Notice Request, RequestId) -> IO ()
writeReq handle (e, requestId) = do
hPut handle lenBytes
hPut handle bytes
where
bytes = runPut $ (either putNotice putRequest e) requestId
lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes
encodeSize = runPut . putInt32 . (+ 4)
readResp :: Handle -> IO (ResponseTo, Reply)
readResp handle = do
len <- fromEnum . decodeSize <$> hGetN handle 4
runGet getReply <$> hGetN handle len
where
decodeSize = subtract 4 . runGet getInt32
-- * Pipe
type Pipe = P.Pipeline Handle ByteString
type Pipe = P.Pipeline Message Response
-- ^ Thread-safe TCP connection with pipelined requests
mkPipe :: Handle -> IO Pipe
-- ^ New thread-safe pipelined connection over handle
mkPipe = P.newPipeline encodeSize decodeSize where
encodeSize = runPut . putInt32 . toEnum . (+ 4)
decodeSize = subtract 4 . fromEnum . runGet getInt32
send :: Pipe -> [Notice] -> ErrorT IOError IO ()
send :: Pipe -> [Notice] -> IOE ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send conn notices = ErrorT . E.try $ P.send conn =<< mapM noticeBytes notices
send pipe notices = P.send pipe (notices, Nothing)
call :: Pipe -> [Notice] -> Request -> ErrorT IOError IO (ErrorT IOError IO Reply)
call :: Pipe -> [Notice] -> Request -> IOE (IOE Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
call conn notices request = ErrorT . E.try $ do
nMessages <- mapM noticeBytes notices
call pipe notices request = do
requestId <- genRequestId
let rMessage = runPut (putRequest request requestId)
promise <- P.call conn (nMessages ++ [rMessage])
return (ErrorT . E.try $ bytesReply requestId <$> promise)
noticeBytes :: Notice -> IO ByteString
noticeBytes notice = runPut . putNotice notice <$> genRequestId
bytesReply :: RequestId -> ByteString -> Reply
bytesReply requestId bytes = if requestId == responseTo then reply else err where
(responseTo, reply) = runGet getReply bytes
err = error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
promise <- P.call pipe (notices, Just (request, requestId))
return $ check requestId <$> promise
where
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
-- * Messages
@ -85,9 +125,9 @@ type RequestId = Int32
type ResponseTo = RequestId
genRequestId :: IO RequestId
genRequestId :: (MonadIO m) => m RequestId
-- ^ Generate fresh request id
genRequestId = atomicModifyIORef counter $ \n -> (n + 1, n) where
genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where
counter :: IORef RequestId
counter = unsafePerformIO (newIORef 0)
{-# NOINLINE counter #-}

View file

@ -5,10 +5,15 @@
module Database.MongoDB.Internal.Util where
import Prelude hiding (length)
import Control.Applicative ((<$>))
import Network (PortID(..))
import Data.UString as U (cons, append)
import Data.Bits (Bits, (.|.))
import Data.Bson
import Data.ByteString.Lazy as S (ByteString, length, append, hGet)
import System.IO (Handle)
import System.IO.Error (mkIOError, eofErrorType)
import Control.Exception (assert)
deriving instance Show PortID
deriving instance Eq PortID
@ -30,3 +35,12 @@ true1 k doc = case valueAt k doc of
Int32 n -> n == 1
Int64 n -> n == 1
_ -> error $ "expected " ++ show k ++ " to be Num or Bool in " ++ show doc
hGetN :: Handle -> Int -> IO ByteString
-- ^ Read N bytes from hande, blocking until all N bytes are read. If EOF is reached before N bytes then raise EOF exception.
hGetN h n = assert (n >= 0) $ do
bytes <- hGet h n
let x = fromEnum $ length bytes
if x >= n then return bytes
else if x == 0 then ioError (mkIOError eofErrorType "hGetN" (Just h) Nothing)
else S.append bytes <$> hGetN h (n - x)

View file

@ -1,6 +1,6 @@
-- | Query and update documents
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, RankNTypes, ImpredicativeTypes #-}
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, RankNTypes #-}
module Database.MongoDB.Query (
-- * Access
@ -29,7 +29,7 @@ module Database.MongoDB.Query (
Query(..), QueryOption(..), Projector, Limit, Order, BatchSize,
explain, find, findOne, count, distinct,
-- *** Cursor
Cursor, next, nextN, rest,
Cursor, next, nextN, rest, closeCursor, isCursorClosed,
-- ** Group
Group(..), GroupKey(..), group,
-- ** MapReduce
@ -47,7 +47,6 @@ import Control.Monad.Reader
import Control.Monad.Error
import Control.Monad.Throw
import Control.Monad.MVar
import Control.Pipeline (Resource(..))
import qualified Database.MongoDB.Internal.Protocol as P
import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call)
import Database.MongoDB.Connection (MasterOrSlaveOk(..), Server(..))
@ -441,7 +440,7 @@ newCursor :: (Access m) => Database -> Collection -> BatchSize -> DelayedCursorS
newCursor (Database db) col batch cs = do
var <- newMVar cs
let cursor = Cursor (db <.> col) batch var
addMVarFinalizer var (close cursor)
addMVarFinalizer var (closeCursor cursor)
return cursor
next :: (Access m) => Cursor -> m (Maybe Document)
@ -470,11 +469,13 @@ rest :: (Access m) => Cursor -> m [Document]
-- ^ Return remaining documents in query result
rest c = loop (next c)
instance (Access m) => Resource m Cursor where
close (Cursor _ _ var) = modifyMVar var kill' where
kill' dcs = first return <$> (kill =<< mapErrorIO id dcs)
kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]]
isClosed cursor = do
closeCursor :: (Access m) => Cursor -> m ()
closeCursor (Cursor _ _ var) = modifyMVar var kill' where
kill' dcs = first return <$> (kill =<< mapErrorIO id dcs)
kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]]
isCursorClosed :: (Access m) => Cursor -> m Bool
isCursorClosed cursor = do
CS _ cid docs <- getCursorState cursor
return (cid == 0 && null docs)
@ -593,7 +594,8 @@ send ns = do
pipe <- context
mapErrorIO ConnectionFailure (P.send pipe ns)
call :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw Failure n, MonadIO n) => n Reply)
call :: (Context Pipe m, Throw Failure m, MonadIO m, Throw Failure n, MonadIO n) =>
[Notice] -> Request -> m (n Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive.
call ns r = do
pipe <- context

33
Network/Abstract.hs Normal file
View file

@ -0,0 +1,33 @@
-- | Generalize a network connection to a sink and source
{-# LANGUAGE MultiParamTypeClasses, ExistentialQuantification, FlexibleContexts, FlexibleInstances, UndecidableInstances #-}
module Network.Abstract where
import Network (HostName, PortID)
import Control.Monad.Error
type IOE = ErrorT IOError IO
type Server = (HostName, PortID)
-- | A network controls connections to other hosts. It may want to overide to log messages or play them back.
-- A server in the network accepts messages of type i and generates messages of type o.
class Network n i o where
connect :: n -> Server -> IOE (Connection i o)
-- ^ Connect to Server returning the send sink and receive source, throw IOError if can't connect.
data Connection i o = Connection {
send :: i -> IOE (),
receive :: IOE o,
close :: IO () }
data ANetwork i o = forall n. (Network n i o) => ANetwork n
instance Network (ANetwork i o) i o where
connect (ANetwork n) = connect n
{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10gen Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}

View file

@ -19,8 +19,8 @@ map/reduce queries on:
Prelude> :set prompt "> "
> :set -XOverloadedStrings
> import Database.MongoDB
> conn <- connect 1 $ host "localhost"
> let run act = runConn safe Master conn $ use (Database "test") act
> conn <- newConnPool Internet 1 (host "localhost")
> let run act = access safe Master conn $ use (Database "test") act
> :{
run $ insertMany "mr1" [
["x" =: 1, "tags" =: ["dog", "cat"]],
@ -70,7 +70,7 @@ be called iteratively on the results of other reduce steps.
Finally, we run mapReduce and iterate over the result collection:
> run $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest
Right (Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]])
Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]]
Advanced Map/Reduce
-------------------
@ -79,6 +79,6 @@ MongoDB returns additional statistics in the map/reduce results. To
obtain them, use *runMR'* instead:
> run $ runMR' (mapReduce "mr1" mapFn reduceFn)
Right (Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0])
Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0]
You can then obtain the results from here by quering the result collection yourself. *runMR* (above) does this for you but discards the statistics.

View file

@ -1,9 +1,9 @@
name: mongoDB
version: 0.8.2
version: 0.9
build-type: Simple
license: OtherLicense
license-file: LICENSE
copyright: Copyright (c) 2010-2010 Scott Parish & 10gen Inc.
copyright: Copyright (c) 2010-2010 10gen Inc. & Scott Parish
maintainer: Tony Hannan <tony@10gen.com>
build-depends:
array -any,
@ -24,7 +24,7 @@ bug-reports:
synopsis: A driver for MongoDB
description: This module lets you connect to MongoDB (www.mongodb.org) and do inserts, queries, updates, etc.
category: Database
author: Scott Parish <srp@srparish.net> & Tony Hannan <tony@10gen.com>
author: Tony Hannan <tony@10gen.com> & Scott Parish <srp@srparish.net>
tested-with:
data-files:
data-dir: ""
@ -42,6 +42,7 @@ exposed-modules:
Database.MongoDB.Internal.Protocol
Database.MongoDB.Internal.Util
Database.MongoDB.Query
Network.Abstract
Var.Pool
exposed: True
buildable: True

View file

@ -47,16 +47,20 @@ Making A Connection
-------------------
Create a connection pool for your mongo server, using the standard port (27017):
> pool <- newConnPool 1 $ host "127.0.0.1"
> pool <- newConnPool Internet 1 $ host "127.0.0.1"
or for a non-standard port
> pool <- newConnPool 1 $ Host "127.0.0.1" (PortNumber 30000)
> pool <- newConnPool Internet 1 $ Host "127.0.0.1" (PortNumber 30000)
*newConnPool* takes the connection pool size and the host to connect to. It returns
*newConnPool* takes the *network*, the connection pool size, and the host to connect to. It returns
a *ConnPool*, which is a potential pool of TCP connections. They are not created until first
access, so it is not possible to get a connection error here.
The network parameter allows you to override normal communications to, for example, log
or replay messages sent and received from servers. *Internet* is the normal communication mode
with no logging/replay.
Note, plain IO code in this driver never raises an exception unless it invokes third party IO
code that does. Driver code that may throw an exception says so in its Monad type,
for example, *ErrorT IOError IO a*.
@ -71,7 +75,7 @@ A Pipe is a single TCP connection.
To run an Access action (monad), supply WriteMode, MasterOrSlaveOk, Connection,
and action to *access*. For example, to get a list of all the database on the server:
> access safe Master conn allDatabases
> access safe Master pool allDatabases
*access* return either Left Failure or Right result. Failure means there was a connection failure
or a read or write exception like cursor expired or duplicate key insert.