tailable cursor

This commit is contained in:
Tony Hannan 2011-07-22 10:31:24 -04:00
parent 58f83838de
commit 17ceb53234
3 changed files with 12 additions and 11 deletions

View file

@ -4,7 +4,7 @@
module Database.MongoDB.Connection ( module Database.MongoDB.Connection (
-- * Util -- * Util
IOE, runIOE, Secs, Secs, IOE, runIOE,
-- * Connection -- * Connection
Pipe, close, isClosed, Pipe, close, isClosed,
-- * Server -- * Server
@ -16,10 +16,9 @@ module Database.MongoDB.Connection (
) where ) where
import Prelude hiding (lookup) import Prelude hiding (lookup)
import Database.MongoDB.Internal.Protocol (Pipe, writeMessage, readMessage) import Database.MongoDB.Internal.Protocol (Pipe, newPipe)
import System.IO.Pipeline (IOE, IOStream(..), newPipeline, close, isClosed) import System.IO.Pipeline (IOE, close, isClosed)
import System.IO.Error as E (try) import System.IO.Error as E (try)
import System.IO (hClose)
import Network (HostName, PortID(..), connectTo) import Network (HostName, PortID(..), connectTo)
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>)) import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import Control.Monad.Identity (runIdentity) import Control.Monad.Identity (runIdentity)
@ -102,7 +101,7 @@ connect' timeoutSecs (Host hostname port) = do
handle <- ErrorT . E.try $ do handle <- ErrorT . E.try $ do
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port) mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
maybe (ioError $ userError "connect timed out") return mh maybe (ioError $ userError "connect timed out") return mh
lift $ newPipeline $ IOStream (writeMessage handle) (readMessage handle) (hClose handle) lift $ newPipe handle
-- * Replica Set -- * Replica Set

View file

@ -7,9 +7,7 @@ This module is not intended for direct use. Use the high-level interface at "Dat
module Database.MongoDB.Internal.Protocol ( module Database.MongoDB.Internal.Protocol (
FullCollection, FullCollection,
-- * Pipe -- * Pipe
Pipe, send, call, Pipe, newPipe, send, call,
-- * Message
writeMessage, readMessage,
-- ** Notice -- ** Notice
Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId, Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
-- ** Request -- ** Request
@ -24,9 +22,9 @@ import Prelude as X
import Control.Applicative ((<$>)) import Control.Applicative ((<$>))
import Control.Arrow ((***)) import Control.Arrow ((***))
import Data.ByteString.Lazy as B (length, hPut) import Data.ByteString.Lazy as B (length, hPut)
import System.IO.Pipeline (IOE, Pipeline) import System.IO.Pipeline (IOE, Pipeline, newPipeline, IOStream(..))
import qualified System.IO.Pipeline as P (send, call) import qualified System.IO.Pipeline as P (send, call)
import System.IO (Handle) import System.IO (Handle, hClose)
import Data.Bson (Document, UString) import Data.Bson (Document, UString)
import Data.Bson.Binary import Data.Bson.Binary
import Data.Binary.Put import Data.Binary.Put
@ -47,6 +45,10 @@ import Database.MongoDB.Internal.Util (whenJust, hGetN, bitOr, byteStringHex)
type Pipe = Pipeline Response Message type Pipe = Pipeline Response Message
-- ^ Thread-safe TCP connection with pipelined requests -- ^ Thread-safe TCP connection with pipelined requests
newPipe :: Handle -> IO Pipe
-- ^ Create pipe over handle
newPipe handle = newPipeline $ IOStream (writeMessage handle) (readMessage handle) (hClose handle)
send :: Pipe -> [Notice] -> IOE () send :: Pipe -> [Notice] -> IOE ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. -- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send pipe notices = P.send pipe (notices, Nothing) send pipe notices = P.send pipe (notices, Nothing)

View file

@ -25,7 +25,7 @@ module Database.MongoDB.Query (
delete, deleteOne, delete, deleteOne,
-- * Read -- * Read
-- ** Query -- ** Query
Query(..), QueryOption(NoCursorTimeout), Projector, Limit, Order, BatchSize, Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData), Projector, Limit, Order, BatchSize,
explain, find, findOne, fetch, count, distinct, explain, find, findOne, fetch, count, distinct,
-- *** Cursor -- *** Cursor
Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed, Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed,