From 17ceb532344a14a112c01a88a8379e8baae70a01 Mon Sep 17 00:00:00 2001 From: Tony Hannan Date: Fri, 22 Jul 2011 10:31:24 -0400 Subject: [PATCH] tailable cursor --- Database/MongoDB/Connection.hs | 9 ++++----- Database/MongoDB/Internal/Protocol.hs | 12 +++++++----- Database/MongoDB/Query.hs | 2 +- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs index 6c7d2f1..5e45264 100644 --- a/Database/MongoDB/Connection.hs +++ b/Database/MongoDB/Connection.hs @@ -4,7 +4,7 @@ module Database.MongoDB.Connection ( -- * Util - IOE, runIOE, Secs, + Secs, IOE, runIOE, -- * Connection Pipe, close, isClosed, -- * Server @@ -16,10 +16,9 @@ module Database.MongoDB.Connection ( ) where import Prelude hiding (lookup) -import Database.MongoDB.Internal.Protocol (Pipe, writeMessage, readMessage) -import System.IO.Pipeline (IOE, IOStream(..), newPipeline, close, isClosed) +import Database.MongoDB.Internal.Protocol (Pipe, newPipe) +import System.IO.Pipeline (IOE, close, isClosed) import System.IO.Error as E (try) -import System.IO (hClose) import Network (HostName, PortID(..), connectTo) import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>)) import Control.Monad.Identity (runIdentity) @@ -102,7 +101,7 @@ connect' timeoutSecs (Host hostname port) = do handle <- ErrorT . E.try $ do mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port) maybe (ioError $ userError "connect timed out") return mh - lift $ newPipeline $ IOStream (writeMessage handle) (readMessage handle) (hClose handle) + lift $ newPipe handle -- * Replica Set diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 63855d5..3f0dbb4 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -7,9 +7,7 @@ This module is not intended for direct use. Use the high-level interface at "Dat module Database.MongoDB.Internal.Protocol ( FullCollection, -- * Pipe - Pipe, send, call, - -- * Message - writeMessage, readMessage, + Pipe, newPipe, send, call, -- ** Notice Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId, -- ** Request @@ -24,9 +22,9 @@ import Prelude as X import Control.Applicative ((<$>)) import Control.Arrow ((***)) import Data.ByteString.Lazy as B (length, hPut) -import System.IO.Pipeline (IOE, Pipeline) +import System.IO.Pipeline (IOE, Pipeline, newPipeline, IOStream(..)) import qualified System.IO.Pipeline as P (send, call) -import System.IO (Handle) +import System.IO (Handle, hClose) import Data.Bson (Document, UString) import Data.Bson.Binary import Data.Binary.Put @@ -47,6 +45,10 @@ import Database.MongoDB.Internal.Util (whenJust, hGetN, bitOr, byteStringHex) type Pipe = Pipeline Response Message -- ^ Thread-safe TCP connection with pipelined requests +newPipe :: Handle -> IO Pipe +-- ^ Create pipe over handle +newPipe handle = newPipeline $ IOStream (writeMessage handle) (readMessage handle) (hClose handle) + send :: Pipe -> [Notice] -> IOE () -- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. send pipe notices = P.send pipe (notices, Nothing) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index f8c80d7..187b426 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -25,7 +25,7 @@ module Database.MongoDB.Query ( delete, deleteOne, -- * Read -- ** Query - Query(..), QueryOption(NoCursorTimeout), Projector, Limit, Order, BatchSize, + Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData), Projector, Limit, Order, BatchSize, explain, find, findOne, fetch, count, distinct, -- *** Cursor Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed,