2012-06-10 19:47:14 +00:00
-- | Low-level messaging between this client and the MongoDB server, see Mongo
-- Wire Protocol (<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>).
--
-- This module is not intended for direct use. Use the high-level interface at
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.
2010-06-15 03:14:40 +00:00
2012-06-10 19:47:14 +00:00
{- # LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings # -}
2015-05-15 13:23:40 +00:00
{- # LANGUAGE CPP, FlexibleContexts, TupleSections, TypeSynonymInstances # -}
2012-06-10 19:47:14 +00:00
{- # LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances # -}
2016-06-02 14:46:24 +00:00
{- # LANGUAGE BangPatterns # -}
2010-06-15 03:14:40 +00:00
2016-04-11 00:45:30 +00:00
{- # LANGUAGE NamedFieldPuns, ScopedTypeVariables # -}
# if ( __GLASGOW_HASKELL__ >= 706 )
{- # LANGUAGE RecursiveDo # -}
# else
{- # LANGUAGE DoRec # -}
# endif
2010-06-15 03:14:40 +00:00
module Database.MongoDB.Internal.Protocol (
2013-12-26 14:57:33 +00:00
FullCollection ,
-- * Pipe
2015-03-05 19:20:02 +00:00
Pipe , newPipe , newPipeWith , send , call ,
2013-12-26 14:57:33 +00:00
-- ** Notice
Notice ( .. ) , InsertOption ( .. ) , UpdateOption ( .. ) , DeleteOption ( .. ) , CursorId ,
-- ** Request
Request ( .. ) , QueryOption ( .. ) ,
-- ** Reply
Reply ( .. ) , ResponseFlag ( .. ) ,
-- * Authentication
2016-04-11 00:45:30 +00:00
Username , Password , Nonce , pwHash , pwKey ,
2016-05-20 04:44:42 +00:00
isClosed , close , ServerData ( .. ) , Pipeline ( .. )
2010-06-15 03:14:40 +00:00
) where
2015-05-15 13:23:40 +00:00
# if ! MIN_VERSION_base ( 4 , 8 , 0 )
2010-06-21 15:06:20 +00:00
import Control.Applicative ( ( <$> ) )
2015-05-15 13:23:40 +00:00
# endif
2015-08-29 22:18:34 +00:00
import Control.Monad ( forM , replicateM , unless )
2012-06-10 19:47:14 +00:00
import Data.Binary.Get ( Get , runGet )
import Data.Binary.Put ( Put , runPut )
import Data.Bits ( bit , testBit )
import Data.Int ( Int32 , Int64 )
import Data.IORef ( IORef , newIORef , atomicModifyIORef )
2015-03-05 19:20:02 +00:00
import System.IO ( Handle )
2016-10-30 07:07:17 +00:00
import System.IO.Error ( doesNotExistErrorType , mkIOError )
2010-06-15 03:14:40 +00:00
import System.IO.Unsafe ( unsafePerformIO )
2015-08-29 22:18:34 +00:00
import Data.Maybe ( maybeToList )
2016-04-11 00:45:30 +00:00
import GHC.Conc ( ThreadStatus ( .. ) , threadStatus )
import Control.Monad ( forever )
2018-04-28 22:14:30 +00:00
import Control.Monad.STM ( atomically )
Unmask the forked thread in newPipeline
The `newPipeline` function, used as part of `connect`, forks a
listener thread. Before this commit, the thread is forked with
`forkFinally`, where the thread action is run in the same mask as the
parent thread. The thread is then killed by a `killThread` when
closing a connection.
This is typically not a problem if the mask is “masked” (or,
obviously, “unmasked”), because the listener is generally blocked on a
channel at some time or other, and therefore will accept the
asynchronous exception thrown by `killThread`, and terminate.
However, if the mask is “masked uninterruptible”, then the listener
definitely doesn't receive asynchronous exceptions, and the
`killThread` calls hangs, and never returns.
One should probably never call `connect` in a “masked uninterruptible”
action. However, it sounds better to protect the mongoDB library
against the user accidentally doing so than to add a big warning
saying that calling `connect` in “masked uninterruptible” will cause
the program to hang down the line.
Therefore, this commit uses `forkIOWithUnmask`, in order to run the
thread action always in an “unmasked” state. In which case we can be
sure that we can always kill the listener thread regardless of the
client code.
2019-10-29 08:25:31 +00:00
import Control.Concurrent ( ThreadId , killThread , forkIOWithUnmask )
2018-04-28 22:14:30 +00:00
import Control.Concurrent.STM.TChan ( TChan , newTChan , readTChan , writeTChan , isEmptyTChan )
2016-04-11 00:45:30 +00:00
Unmask the forked thread in newPipeline
The `newPipeline` function, used as part of `connect`, forks a
listener thread. Before this commit, the thread is forked with
`forkFinally`, where the thread action is run in the same mask as the
parent thread. The thread is then killed by a `killThread` when
closing a connection.
This is typically not a problem if the mask is “masked” (or,
obviously, “unmasked”), because the listener is generally blocked on a
channel at some time or other, and therefore will accept the
asynchronous exception thrown by `killThread`, and terminate.
However, if the mask is “masked uninterruptible”, then the listener
definitely doesn't receive asynchronous exceptions, and the
`killThread` calls hangs, and never returns.
One should probably never call `connect` in a “masked uninterruptible”
action. However, it sounds better to protect the mongoDB library
against the user accidentally doing so than to add a big warning
saying that calling `connect` in “masked uninterruptible” will cause
the program to hang down the line.
Therefore, this commit uses `forkIOWithUnmask`, in order to run the
thread action always in an “unmasked” state. In which case we can be
sure that we can always kill the listener thread regardless of the
client code.
2019-10-29 08:25:31 +00:00
import Control.Exception.Lifted ( SomeException , mask_ , onException , throwIO , try )
2012-06-10 19:47:14 +00:00
import qualified Data.ByteString.Lazy as L
import Control.Monad.Trans ( MonadIO , liftIO )
import Data.Bson ( Document )
import Data.Bson.Binary ( getDocument , putDocument , getInt32 , putInt32 , getInt64 ,
putInt64 , putCString )
2012-05-08 15:13:25 +00:00
import Data.Text ( Text )
2012-06-10 19:47:14 +00:00
import qualified Crypto.Hash.MD5 as MD5
2012-05-08 15:13:25 +00:00
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
2012-06-10 19:47:14 +00:00
2016-05-02 02:11:02 +00:00
import Database.MongoDB.Internal.Util ( bitOr , byteStringHex )
2010-12-20 02:08:53 +00:00
2016-05-03 04:30:00 +00:00
import Database.MongoDB.Transport ( Transport )
2016-10-24 06:19:49 +00:00
import qualified Database.MongoDB.Transport as Tr
2015-03-05 19:20:02 +00:00
2016-04-11 00:45:30 +00:00
# if MIN_VERSION_base ( 4 , 6 , 0 )
import Control.Concurrent.MVar.Lifted ( MVar , newEmptyMVar , newMVar , withMVar ,
2016-10-24 06:19:49 +00:00
putMVar , readMVar , mkWeakMVar , isEmptyMVar )
2016-04-11 00:45:30 +00:00
# else
import Control.Concurrent.MVar.Lifted ( MVar , newEmptyMVar , newMVar , withMVar ,
putMVar , readMVar , addMVarFinalizer )
# endif
#if !MIN_VERSION_base(4,6,0)
-- | Compatibility shim: when built against @base@ older than 4.6 the import
-- list above brings in 'addMVarFinalizer' rather than @mkWeakMVar@, so the
-- newer name is defined here in terms of the older one.
mkWeakMVar :: MVar a -> IO () -> IO ()
mkWeakMVar var finalizer = addMVarFinalizer var finalizer
#endif
-- * Pipeline
-- | Thread-safe and pipelined connection
data Pipeline = Pipeline
    { vStream :: MVar Transport
      -- ^ Mutex on handle, so only one thread at a time can write to it
    , responseQueue :: TChan (MVar (Either IOError Response))
      -- ^ Queue of threads waiting for responses. Every time a response
      -- arrives we pop the next thread and give it the response.
    , listenThread :: ThreadId
      -- ^ Thread that runs the response listener
    , finished :: MVar ()
      -- ^ Empty while the listener runs; filled once the listener has stopped
    , serverData :: ServerData
      -- ^ Server properties supplied when the pipeline was created
    }
2016-05-20 04:44:42 +00:00
-- | Properties of the server a 'Pipeline' is connected to.
--
-- NOTE(review): the field names match fields of MongoDB's @isMaster@ command
-- reply; presumably they are filled in from that reply by the connection
-- code -- confirm against the caller.
data ServerData = ServerData
    { isMaster :: Bool
    , minWireVersion :: Int
    , maxWireVersion :: Int
    , maxMessageSizeBytes :: Int
    , maxBsonObjectSize :: Int
    , maxWriteBatchSize :: Int
    }
Unmask the forked thread in newPipeline
The `newPipeline` function, used as part of `connect`, forks a
listener thread. Before this commit, the thread is forked with
`forkFinally`, where the thread action is run in the same mask as the
parent thread. The thread is then killed by a `killThread` when
closing a connection.
This is typically not a problem if the mask is “masked” (or,
obviously, “unmasked”), because the listener is generally blocked on a
channel at some time or other, and therefore will accept the
asynchronous exception thrown by `killThread`, and terminate.
However, if the mask is “masked uninterruptible”, then the listener
definitely doesn't receive asynchronous exceptions, and the
`killThread` calls hangs, and never returns.
One should probably never call `connect` in a “masked uninterruptible”
action. However, it sounds better to protect the mongoDB library
against the user accidentally doing so than to add a big warning
saying that calling `connect` in “masked uninterruptible” will cause
the program to hang down the line.
Therefore, this commit uses `forkIOWithUnmask`, in order to run the
thread action always in an “unmasked” state. In which case we can be
sure that we can always kill the listener thread regardless of the
client code.
2019-10-29 08:25:31 +00:00
-- | @'forkUnmaskedFinally' action and_then@ behaves the same as
-- @'forkFinally' action and_then@, except that @action@ is run completely
-- unmasked, whereas with 'forkFinally', @action@ is run with the same mask as
-- the parent thread.
--
-- The fork itself happens under 'mask_', so @and_then@ runs masked in the new
-- thread and receives either the exception that ended @action@ or its result.
forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally action and_then =
    mask_ . forkIOWithUnmask $ \restore -> do
        outcome <- try (restore action)
        and_then outcome
2016-04-11 00:45:30 +00:00
-- | Create new Pipeline over given handle. You should 'close' pipeline when
-- finished, which will also close handle. If pipeline is not closed but
-- eventually garbage collected, it will be closed along with handle.
newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline serverData stream = do
    vStream <- newMVar stream
    responseQueue <- atomically newTChan
    finished <- newEmptyMVar
    let -- Error handed to every caller still waiting once the listener is gone.
        closedError =
            mkIOError doesNotExistErrorType " Handle has been closed " Nothing Nothing
        -- Pop each pending promise off the queue and fail it, until the
        -- queue is empty.
        drainReplies = do
            chanEmpty <- atomically $ isEmptyTChan responseQueue
            unless chanEmpty $ do
                var <- atomically $ readTChan responseQueue
                putMVar var $ Left closedError
                drainReplies
    -- 'rec' because the pipe record needs the listener's ThreadId while the
    -- listener needs the pipe record.
    rec
        let pipe = Pipeline{..}
        -- The listener is forked unmasked so that 'killThread' can always
        -- interrupt it, even if this function is called in a
        -- masked-uninterruptible state (where the kill would otherwise hang).
        -- When the listener ends, for whatever reason, mark the pipeline as
        -- finished and fail all outstanding promises.
        listenThread <- forkUnmaskedFinally (listen pipe) $ \_ -> do
            putMVar finished ()
            drainReplies
    -- Finalizer: stop the listener and close the transport when 'vStream'
    -- is garbage collected.
    _ <- mkWeakMVar vStream $ do
        killThread listenThread
        Tr.close stream
    return pipe
2016-10-24 06:19:49 +00:00
-- | True once the pipeline's @finished@ variable has been filled, i.e. it is
-- no longer empty.
isFinished :: Pipeline -> IO Bool
isFinished Pipeline{finished} = fmap not (isEmptyMVar finished)
2016-04-11 00:55:58 +00:00
close :: Pipeline -> IO ()
-- ^ Close pipe and underlying connection: kill the listener thread first,
-- then close the transport held in 'vStream'.
close Pipeline{listenThread, vStream} = do
    killThread listenThread
    readMVar vStream >>= Tr.close
2016-04-11 00:45:30 +00:00
2016-04-11 00:55:58 +00:00
-- | A pipeline counts as closed when its listener thread has terminated,
-- either normally ('ThreadFinished') or by an exception ('ThreadDied').
-- A running or blocked listener means the pipeline is still open.
isClosed :: Pipeline -> IO Bool
isClosed Pipeline{listenThread} = fmap terminated (threadStatus listenThread)
  where
    terminated ThreadFinished = True
    terminated ThreadDied = True
    terminated ThreadRunning = False
    terminated (ThreadBlocked _) = False
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
2016-04-11 00:55:58 +00:00
listen :: Pipeline -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order.
-- Each iteration reads one message, hands the outcome (response or IOError)
-- to the next waiter in the queue, and on error closes the stream and
-- rethrows, which ends the loop.
listen Pipeline{vStream, responseQueue} = do
    stream <- readMVar vStream
    forever $ do
        outcome <- try (readMessage stream)
        waiter <- atomically (readTChan responseQueue)
        putMVar waiter outcome
        -- close and stop looping on failure; carry on otherwise
        either (\err -> Tr.close stream >> ioError err) (const (return ())) outcome
2016-04-11 00:55:58 +00:00
psend :: Pipeline -> Message -> IO ()
-- ^ Send message to destination; the destination must not respond (otherwise
-- future 'call's will get these responses instead of their own).
-- Throw IOError and close pipeline if send fails.
psend p@Pipeline{vStream} !message =
    withMVar vStream (\stream -> writeMessage stream message) `onException` close p
2016-04-11 00:45:30 +00:00
2016-04-11 00:55:58 +00:00
pcall :: Pipeline -> Message -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one
-- message only. The destination must reply to the message (otherwise
-- promises will have the wrong responses in them).
-- Throw IOError and close pipeline if send fails, likewise for promised
-- response. Fails immediately with an IOError if the listener has already
-- stopped.
pcall p@Pipeline{vStream, responseQueue} message =
    isFinished p >>= \listenerStopped ->
        if listenerStopped
            then ioError closedError
            else withMVar vStream sendAndQueue `onException` close p
  where
    closedError =
        mkIOError doesNotExistErrorType " Handle has been closed " Nothing Nothing
    -- Runs while holding the stream lock, so the order of writes matches the
    -- order of entries in the response queue.
    sendAndQueue stream = do
        writeMessage stream message
        slot <- newEmptyMVar
        atomically (writeTChan responseQueue slot)
        return (readMVar slot >>= either throwIO return) -- return promise
2010-10-27 20:13:23 +00:00
-- * Pipe
2010-06-15 03:14:40 +00:00
2016-04-11 00:55:58 +00:00
-- | Thread-safe TCP connection with pipelined requests
type Pipe = Pipeline
2010-06-15 03:14:40 +00:00
2016-05-20 04:44:42 +00:00
newPipe :: ServerData -> Handle -> IO Pipe
-- ^ Create pipe over handle: the handle is first wrapped as a transport,
-- which is then passed to 'newPipeWith'.
newPipe sd handle = newPipeWith sd =<< Tr.fromHandle handle
2015-03-05 19:20:02 +00:00
2016-05-20 04:44:42 +00:00
newPipeWith :: ServerData -> Transport -> IO Pipe
-- ^ Create pipe over connection
newPipeWith = newPipeline
2011-07-22 14:31:24 +00:00
2013-12-26 15:23:02 +00:00
send :: Pipe -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError
-- if connection fails.
send pipe notices = psend pipe batch
  where
    -- notices only, no request attached
    batch = (notices, Nothing)
2010-06-15 03:14:40 +00:00
2013-12-26 15:23:02 +00:00
call :: Pipe -> [Notice] -> Request -> IO (IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply
-- promise, which will block when invoked until reply arrives. This call and
-- resulting promise will throw IOError if connection fails. The reply
-- obtained from the promise raises an 'error' when evaluated if its response
-- id does not match the id generated for the request.
call pipe notices request = do
    requestId <- genRequestId
    promise <- pcall pipe (notices, Just (request, requestId))
    return (fmap (verify requestId) promise)
  where
    verify expected (responseTo, reply)
        | expected == responseTo = reply
        | otherwise =
            error $ " expected response id ( " ++ show responseTo ++ " ) to match request id ( " ++ show expected ++ " ) "
2010-06-15 03:14:40 +00:00
2010-12-27 05:23:02 +00:00
-- * Message
-- | A write notice(s) with getLastError request, or just query request.
-- Note that request ids will be out of order, because ids for the notices
-- are generated after the supplied request id was generated. This is ok
-- because the mongo server does not care about order, just uniqueness.
type Message = ([Notice], Maybe (Request, RequestId))
2016-05-03 04:30:00 +00:00
writeMessage :: Transport -> Message -> IO ()
-- ^ Write message to connection. Every notice (each with a freshly generated
-- request id) and the optional request are serialised, each prefixed with its
-- length, concatenated, written in a single write and then flushed.
writeMessage conn (notices, mRequest) = do
    noticeStrings <- forM notices $ \notice ->
        fmap (frame . runPut . putNotice notice) genRequestId
    let requestString =
            fmap (\(request, requestId) -> frame (runPut (putRequest request requestId))) mRequest
    Tr.write conn (L.toStrict (L.concat (noticeStrings ++ maybeToList requestString)))
    Tr.flush conn
  where
    -- Prepend the int32 length field to a serialised message.
    frame payload = sizePrefix payload `L.append` payload
    -- The length field counts itself, hence the extra 4 bytes.
    sizePrefix payload = runPut (putInt32 (toEnum (fromEnum (L.length payload)) + 4))
2010-12-27 05:23:02 +00:00
-- | Message received from a Mongo server in response to a Request
type Response = (ResponseTo, Reply)
2016-05-03 04:30:00 +00:00
readMessage :: Transport -> IO Response
-- ^ Read response from a connection: first the 4-byte length field, then the
-- rest of the message, which is decoded with 'getReply'.
readMessage conn = do
    sizeBytes <- Tr.read conn 4
    -- The length field includes its own 4 bytes, which were just consumed.
    let len = fromEnum (runGet getInt32 (L.fromStrict sizeBytes) - 4)
    body <- Tr.read conn len
    return (runGet getReply (L.fromStrict body))
2010-06-15 03:14:40 +00:00
2012-05-08 15:13:25 +00:00
-- | Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"
type FullCollection = Text
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- ** Header
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Numeric code identifying the kind of a wire-protocol message
type Opcode = Int32

-- | A fresh request id is generated for every message
type RequestId = Int32

-- | Request id that a reply refers to
type ResponseTo = RequestId
2010-12-20 02:08:53 +00:00
genRequestId :: (MonadIO m) => m RequestId
-- ^ Generate fresh request id: atomically returns the current value of the
-- process-wide counter and advances the counter by one.
genRequestId = liftIO $ atomicModifyIORef requestIdCounter $ \n -> (n + 1, n)

-- | Process-wide request id counter, starting at 0.
--
-- This is deliberately a top-level binding: a @where@-bound counter inside
-- the overloaded 'genRequestId' sits under the @MonadIO@ dictionary argument,
-- so it would only be a single shared 'IORef' if the optimiser happened to
-- float it out. As a top-level CAF it is created once regardless of
-- optimisation level. NOINLINE keeps the 'unsafePerformIO' call from being
-- duplicated at use sites.
requestIdCounter :: IORef RequestId
requestIdCounter = unsafePerformIO (newIORef 0)
{-# NOINLINE requestIdCounter #-}
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- *** Binary format
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
putHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will
-- write it. Writes, in order: the request id, a zero, and the opcode.
putHeader opcode requestId = mapM_ putInt32 [requestId, 0, opcode]
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
getHeader :: Get (Opcode, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already
-- read. Reads three int32s; the first (the message's own request id) is
-- discarded.
getHeader = do
    _ <- getInt32
    replyTo <- getInt32
    op <- getInt32
    return (op, replyTo)
2010-06-21 15:06:20 +00:00
-- ** Notice
-- | A notice is a message that is sent with no reply
data Notice
    = Insert
        { iFullCollection :: FullCollection
        , iOptions :: [InsertOption]
        , iDocuments :: [Document]
        }
    | Update
        { uFullCollection :: FullCollection
        , uOptions :: [UpdateOption]
        , uSelector :: Document
        , uUpdater :: Document
        }
    | Delete
        { dFullCollection :: FullCollection
        , dOptions :: [DeleteOption]
        , dSelector :: Document
        }
    | KillCursors
        { kCursorIds :: [CursorId]
        }
    deriving (Show, Eq)
2010-06-15 03:14:40 +00:00
2011-07-21 20:39:19 +00:00
-- | Options for an 'Insert' notice
data InsertOption
    = KeepGoing
      -- ^ If set, the database will not stop processing a bulk insert if one
      -- fails (eg due to duplicate IDs). This makes bulk insert behave
      -- similarly to a series of single inserts, except lastError will be set
      -- if any insert fails, not just the last one. (new in 1.9.1)
    deriving (Show, Eq)
2011-07-21 20:39:19 +00:00
2010-06-15 03:14:40 +00:00
-- | Options for an 'Update' notice
data UpdateOption
    = Upsert
      -- ^ If set, the database will insert the supplied object into the
      -- collection if no matching document is found
    | MultiUpdate
      -- ^ If set, the database will update all matching objects in the
      -- collection. Otherwise only updates first matching doc
    deriving (Show, Eq)
2010-06-15 03:14:40 +00:00
-- | Options for a 'Delete' notice
data DeleteOption
    = SingleRemove
      -- ^ If set, the database will remove only the first matching document
      -- in the collection. Otherwise all matching documents will be removed
    deriving (Show, Eq)
2010-06-15 03:14:40 +00:00
-- | 64-bit identifier of a cursor
type CursorId = Int64
2010-06-21 15:06:20 +00:00
-- *** Binary format
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Wire-protocol opcode for each kind of notice
nOpcode :: Notice -> Opcode
nOpcode notice = case notice of
    Update{} -> 2001
    Insert{} -> 2002
    Delete{} -> 2006
    KillCursors{} -> 2007
-- | Serialise a notice: the header (with the notice's opcode and the given
-- request id) followed by the opcode-specific body. The message length is not
-- written here.
putNotice :: Notice -> RequestId -> Put
putNotice notice requestId = putHeader (nOpcode notice) requestId >> putBody notice
  where
    -- Insert carries its option bits in the first int32.
    putBody Insert{..} = do
        putInt32 (iBits iOptions)
        putCString iFullCollection
        mapM_ putDocument iDocuments
    -- Update and Delete start with a zero int32 and carry their option bits
    -- after the collection name.
    putBody Update{..} = do
        putInt32 0
        putCString uFullCollection
        putInt32 (uBits uOptions)
        putDocument uSelector
        putDocument uUpdater
    putBody Delete{..} = do
        putInt32 0
        putCString dFullCollection
        putInt32 (dBits dOptions)
        putDocument dSelector
    -- KillCursors: zero int32, the number of cursor ids, then the ids.
    putBody KillCursors{..} = do
        putInt32 0
        putInt32 (toEnum (length kCursorIds))
        mapM_ putInt64 kCursorIds
2010-06-15 03:14:40 +00:00
2011-07-21 20:39:19 +00:00
-- | Flag bit of a single insert option
iBit :: InsertOption -> Int32
iBit option = case option of
    KeepGoing -> bit 0
-- | Combine a list of insert options into one flags word
iBits :: [InsertOption] -> Int32
iBits options = bitOr (map iBit options)
2010-06-15 03:14:40 +00:00
-- | Flag bit of a single update option
uBit :: UpdateOption -> Int32
uBit option = case option of
    Upsert -> bit 0
    MultiUpdate -> bit 1
-- | Combine a list of update options into one flags word
uBits :: [UpdateOption] -> Int32
uBits options = bitOr (map uBit options)
-- | Flag bit of a single delete option
dBit :: DeleteOption -> Int32
dBit option = case option of
    SingleRemove -> bit 0
-- | Combine a list of delete options into one flags word
dBits :: [DeleteOption] -> Int32
dBits options = bitOr (map dBit options)
2010-06-21 15:06:20 +00:00
-- ** Request
2011-07-14 22:47:14 +00:00
-- | A request is a message that is sent with a 'Reply' expected in return
data Request
    = Query
        { qOptions :: [QueryOption]
        , qFullCollection :: FullCollection
        , qSkip :: Int32
          -- ^ Number of initial matching documents to skip
        , qBatchSize :: Int32
          -- ^ The number of document to return in each batch response from
          -- the server. 0 means use Mongo default. Negative means close
          -- cursor after first batch and use absolute value as batch size.
        , qSelector :: Document
          -- ^ @[]@ = return all documents in collection
        , qProjector :: Document
          -- ^ @[]@ = return whole document
        }
    | GetMore
        { gFullCollection :: FullCollection
        , gBatchSize :: Int32
        , gCursorId :: CursorId
        }
    deriving (Show, Eq)
2010-06-21 15:06:20 +00:00
-- | Options for a 'Query' request
data QueryOption
    = TailableCursor
      -- ^ Tailable means cursor is not closed when the last data is
      -- retrieved. Rather, the cursor marks the final object's position. You
      -- can resume using the cursor later, from where it was located, if more
      -- data were received. Like any "latent cursor", the cursor may become
      -- invalid at some point – for example if the final object it references
      -- were deleted. Thus, you should be prepared to requery on
      -- @CursorNotFound@ exception.
    | SlaveOK
      -- ^ Allow query of replica slave. Normally these return an error except
      -- for namespace "local".
    | NoCursorTimeout
      -- ^ The server normally times out idle cursors after 10 minutes to
      -- prevent a memory leak in case a client forgets to close a cursor. Set
      -- this option to allow a cursor to live forever until it is closed.
    | AwaitData
      -- ^ Use with TailableCursor. If we are at the end of the data, block
      -- for a while rather than returning no data. After a timeout period, we
      -- do return as normal.
    -- Exhaust commented out because not compatible with current `Pipeline` implementation:
    -- (disabled) | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
    | Partial
      -- ^ Get partial results from a /mongos/ if some shards are down,
      -- instead of throwing an error.
    deriving (Show, Eq)
2010-06-21 15:06:20 +00:00
-- *** Binary format
-- | Wire-protocol opcode for each kind of request
qOpcode :: Request -> Opcode
qOpcode request = case request of
    Query{} -> 2004
    GetMore{} -> 2005
-- | Serialise a request: the header (with the request's opcode and the given
-- request id) followed by the opcode-specific body. The message length is not
-- written here.
putRequest :: Request -> RequestId -> Put
putRequest request requestId = putHeader (qOpcode request) requestId >> putBody request
  where
    putBody Query{..} = do
        putInt32 (qBits qOptions)
        putCString qFullCollection
        putInt32 qSkip
        putInt32 qBatchSize
        putDocument qSelector
        -- the projector is only written when it is non-empty
        unless (null qProjector) (putDocument qProjector)
    putBody GetMore{..} = do
        putInt32 0
        putCString gFullCollection
        putInt32 gBatchSize
        putInt64 gCursorId
2010-06-15 03:14:40 +00:00
-- | Flag bit of a single query option. Bits 0, 3 and 6 are not produced
-- here; bit 6 belongs to the disabled Exhaust option.
qBit :: QueryOption -> Int32
qBit option = case option of
    TailableCursor -> bit 1
    SlaveOK -> bit 2
    NoCursorTimeout -> bit 4
    AwaitData -> bit 5
    -- Exhaust -> bit 6
    Partial -> bit 7
2010-06-15 03:14:40 +00:00
-- | Combine a list of query options into one flags word
qBits :: [QueryOption] -> Int32
qBits options = bitOr (map qBit options)
2010-06-21 15:06:20 +00:00
-- ** Reply
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | A reply is a message received in response to a 'Request'
data Reply = Reply
    { rResponseFlags :: [ResponseFlag]
    , rCursorId :: CursorId
      -- ^ 0 = cursor finished
    , rStartingFrom :: Int32
    , rDocuments :: [Document]
    }
    deriving (Show, Eq)
2010-06-21 15:06:20 +00:00
2010-07-03 17:15:30 +00:00
-- | Flags carried in a 'Reply'
data ResponseFlag
    = CursorNotFound
      -- ^ Set when getMore is called but the cursor id is not valid at the
      -- server. Returned with zero results.
    | QueryError
      -- ^ Query error. Returned with one document containing an "$err" field
      -- holding the error message.
    | AwaitCapable
      -- ^ For backward compatibility: Set when the server supports the
      -- AwaitData query option. If it doesn't, a replica slave client should
      -- sleep a little between getMore's
    deriving (Show, Eq, Enum)
2010-07-03 17:15:30 +00:00
2010-06-21 15:06:20 +00:00
-- * Binary format
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Opcode that 'getReply' requires in the header of a reply message
replyOpcode :: Opcode
replyOpcode = 1
-- | Decode a reply message (without its leading length field): the header,
-- then flags, cursor id, starting offset, document count and that many
-- documents. Fails if the header's opcode is not 'replyOpcode'.
getReply :: Get (ResponseTo, Reply)
getReply = do
    (opcode, responseTo) <- getHeader
    unless (opcode == replyOpcode) $ fail $ " expected reply opcode (1) but got " ++ show opcode
    flags <- fmap rFlags getInt32
    cursorId <- getInt64
    startingFrom <- getInt32
    numDocs <- fmap fromIntegral getInt32
    documents <- replicateM numDocs getDocument
    return
        ( responseTo
        , Reply
            { rResponseFlags = flags
            , rCursorId = cursorId
            , rStartingFrom = startingFrom
            , rDocuments = documents
            }
        )
2010-06-15 03:14:40 +00:00
2010-07-03 17:15:30 +00:00
-- | Decode a flags word into the list of response flags whose bit is set
rFlags :: Int32 -> [ResponseFlag]
rFlags bits = [flag | flag <- [CursorNotFound ..], testBit bits (rBit flag)]
-- | Bit position of each response flag in the flags word. Note that
-- 'AwaitCapable' is bit 3, so the positions are not contiguous.
rBit :: ResponseFlag -> Int
rBit flag = case flag of
    CursorNotFound -> 0
    QueryError -> 1
    AwaitCapable -> 3
2010-06-21 15:06:20 +00:00
-- * Authentication
2010-06-15 03:14:40 +00:00
2012-05-08 15:13:25 +00:00
-- | Name of the user to authenticate as
type Username = Text

-- | Plain-text password of the user
type Password = Text

-- | Nonce value that 'pwKey' mixes into the authentication key
type Nonce = Text
2010-06-15 03:14:40 +00:00
2012-05-08 15:13:25 +00:00
-- | Password digest: the hex-encoded MD5 of @username:mongo:password@
-- (UTF-8 encoded).
--
-- The separator must be exactly @:mongo:@ with no surrounding spaces; any
-- extra character changes the digest and so would not match the digest the
-- server computes.
pwHash :: Username -> Password -> Text
pwHash u p = T.pack . byteStringHex . MD5.hash . TE.encodeUtf8 $ T.concat [u, ":mongo:", p]
2010-06-15 03:14:40 +00:00
2012-05-08 15:13:25 +00:00
-- | Authentication key: the hex-encoded MD5 of the nonce, the username and
-- the password digest ('pwHash') concatenated in that order (UTF-8 encoded).
pwKey :: Nonce -> Username -> Password -> Text
pwKey n u p = T.pack (byteStringHex (MD5.hash (TE.encodeUtf8 keyInput)))
  where
    keyInput = T.concat [n, u, pwHash u p]
2011-03-11 16:11:14 +00:00
2011-07-05 14:37:01 +00:00
{- Authors: Tony Hannan <tony@10gen.com>
   Copyright 2011 10gen Inc.
   Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}