Make current driver compatible with the OP_MSG protocol (#137)

* Make current driver compatible with the OP_MSG protocol

Starting with mongodb v6 the OP_MSG protocol is the only accepted message protocol that is accepted by mongodb.
All prior protocols are deprecated. This commit implements the protocol keeping the current client facing API intact.

See:
https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst
https://medium.com/@asayechemeda/communicating-with-mongodb-using-tcp-sockets-521490f981f

Co-authored-by: Doro Rose <doroerose@gmail.com>
This commit is contained in:
kfiz 2022-10-27 06:09:24 +02:00 committed by GitHub
parent 80c313362b
commit 780df80cfc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 700 additions and 82 deletions

View file

@ -20,31 +20,32 @@
module Database.MongoDB.Internal.Protocol ( module Database.MongoDB.Internal.Protocol (
FullCollection, FullCollection,
-- * Pipe -- * Pipe
Pipe, newPipe, newPipeWith, send, call, Pipe, newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg,
-- ** Notice -- ** Notice
Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId, Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
-- ** Request -- ** Request
Request(..), QueryOption(..), Request(..), QueryOption(..), Cmd (..), KillC(..),
-- ** Reply -- ** Reply
Reply(..), ResponseFlag(..), Reply(..), ResponseFlag(..), FlagBit(..),
-- * Authentication -- * Authentication
Username, Password, Nonce, pwHash, pwKey, Username, Password, Nonce, pwHash, pwKey,
isClosed, close, ServerData(..), Pipeline(..) isClosed, close, ServerData(..), Pipeline(..), putOpMsg,
bitOpMsg
) where ) where
#if !MIN_VERSION_base(4,8,0) #if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>)) import Control.Applicative ((<$>))
#endif #endif
import Control.Monad ( forM, replicateM, unless, forever ) import Control.Monad ( forM, replicateM, unless, forever )
import Data.Binary.Get (Get, runGet) import Data.Binary.Get (Get, runGet, getInt8)
import Data.Binary.Put (Put, runPut) import Data.Binary.Put (Put, runPut, putInt8)
import Data.Bits (bit, testBit) import Data.Bits (bit, testBit, zeroBits)
import Data.Int (Int32, Int64) import Data.Int (Int32, Int64)
import Data.IORef (IORef, newIORef, atomicModifyIORef) import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO (Handle) import System.IO (Handle)
import System.IO.Error (doesNotExistErrorType, mkIOError) import System.IO.Error (doesNotExistErrorType, mkIOError)
import System.IO.Unsafe (unsafePerformIO) import System.IO.Unsafe (unsafePerformIO)
import Data.Maybe (maybeToList) import Data.Maybe (maybeToList, fromJust)
import GHC.Conc (ThreadStatus(..), threadStatus) import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad.STM (atomically) import Control.Monad.STM (atomically)
import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask) import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask)
@ -55,7 +56,7 @@ import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try
import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy as L
import Control.Monad.Trans (MonadIO, liftIO) import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson (Document) import Data.Bson (Document, (=:), merge, cast, valueAt, look)
import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64, import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64,
putInt64, putCString) putInt64, putCString)
import Data.Text (Text) import Data.Text (Text)
@ -73,6 +74,8 @@ import qualified Database.MongoDB.Transport as Tr
#if MIN_VERSION_base(4,6,0) #if MIN_VERSION_base(4,6,0)
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
putMVar, readMVar, mkWeakMVar, isEmptyMVar) putMVar, readMVar, mkWeakMVar, isEmptyMVar)
import GHC.List (foldl1')
import Conduit (repeatWhileMC, (.|), runConduit, foldlC)
#else #else
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
putMVar, readMVar, addMVarFinalizer) putMVar, readMVar, addMVarFinalizer)
@ -89,7 +92,7 @@ mkWeakMVar = addMVarFinalizer
-- | Thread-safe and pipelined connection -- | Thread-safe and pipelined connection
data Pipeline = Pipeline data Pipeline = Pipeline
{ vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it { vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it
, responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. , responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrives we pop the next thread and give it the response.
, listenThread :: ThreadId , listenThread :: ThreadId
, finished :: MVar () , finished :: MVar ()
, serverData :: ServerData , serverData :: ServerData
@ -103,6 +106,7 @@ data ServerData = ServerData
, maxBsonObjectSize :: Int , maxBsonObjectSize :: Int
, maxWriteBatchSize :: Int , maxWriteBatchSize :: Int
} }
deriving Show
-- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread. -- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread.
forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
@ -159,6 +163,7 @@ isClosed Pipeline{listenThread} = do
ThreadFinished -> True ThreadFinished -> True
ThreadBlocked _ -> False ThreadBlocked _ -> False
ThreadDied -> True ThreadDied -> True
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read --isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
listen :: Pipeline -> IO () listen :: Pipeline -> IO ()
@ -178,6 +183,14 @@ psend :: Pipeline -> Message -> IO ()
-- Throw IOError and close pipeline if send fails -- Throw IOError and close pipeline if send fails
psend p@Pipeline{..} !message = withMVar vStream (flip writeMessage message) `onException` close p psend p@Pipeline{..} !message = withMVar vStream (flip writeMessage message) `onException` close p
psendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()-- IO (IO Response)
psendOpMsg p@Pipeline{..} commands flagBit params =
case flagBit of
Just f -> case f of
MoreToCome -> withMVar vStream (\t -> writeOpMsgMessage t (commands, Nothing) flagBit params) `onException` close p -- >> return (return (0, ReplyEmpty))
_ -> error "moreToCome has to be set if no response is expected"
_ -> error "moreToCome has to be set if no response is expected"
pcall :: Pipeline -> Message -> IO (IO Response) pcall :: Pipeline -> Message -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response. -- Throw IOError and closes pipeline if send fails, likewise for promised response.
@ -193,11 +206,28 @@ pcall p@Pipeline{..} message = do
liftIO $ atomically $ writeTChan responseQueue var liftIO $ atomically $ writeTChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise return $ readMVar var >>= either throwIO return -- return promise
pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcallOpMsg p@Pipeline{..} message flagbit params = do
listenerStopped <- isFinished p
if listenerStopped
then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing
else withMVar vStream doCall `onException` close p
where
doCall stream = do
writeOpMsgMessage stream ([], message) flagbit params
var <- newEmptyMVar
-- put var into the response-queue so that it can
-- fetch the latest response
liftIO $ atomically $ writeTChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise
-- * Pipe -- * Pipe
type Pipe = Pipeline type Pipe = Pipeline
-- ^ Thread-safe TCP connection with pipelined requests. In long-running applications the user is expected to use it as a "client": create a `Pipe` -- ^ Thread-safe TCP connection with pipelined requests. In long-running applications the user is expected to use it as a "client": create a `Pipe`
-- at startup, use it as long as possible, watch out for possible timeouts, and close it on shutdown. Bearing in mind that disconnections may be triggered by MongoDB service providers, the user is responsible for re-creating their `Pipe` whenever necessary. -- at startup, use it as long as possible, watch out for possible timeouts, and close it on shutdown. Bearing in mind that disconnections may be triggered by MongoDB service providers, the user is responsible for re-creating their `Pipe` whenever necessary.
newPipe :: ServerData -> Handle -> IO Pipe newPipe :: ServerData -> Handle -> IO Pipe
-- ^ Create pipe over handle -- ^ Create pipe over handle
@ -211,6 +241,12 @@ send :: Pipe -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. -- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send pipe notices = psend pipe (notices, Nothing) send pipe notices = psend pipe (notices, Nothing)
sendOpMsg :: Pipe -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
sendOpMsg pipe commands@(Nc _ : _) flagBit params = psendOpMsg pipe commands flagBit params
sendOpMsg pipe commands@(Kc _ : _) flagBit params = psendOpMsg pipe commands flagBit params
sendOpMsg _ _ _ _ = error "This function only supports Cmd types wrapped in Nc or Kc type constructors"
call :: Pipe -> [Notice] -> Request -> IO (IO Reply) call :: Pipe -> [Notice] -> Request -> IO (IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails. -- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
call pipe notices request = do call pipe notices request = do
@ -221,11 +257,73 @@ call pipe notices request = do
check requestId (responseTo, reply) = if requestId == responseTo then reply else check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply)
-- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
callOpMsg pipe request flagBit params = do
requestId <- genRequestId
promise <- pcallOpMsg pipe (Just (request, requestId)) flagBit params
promise' <- promise :: IO Response
return $ snd <$> produce requestId promise'
where
-- We need to perform streaming here as within the OP_MSG protocol mongoDB expects
-- our client to keep receiving messages after the MoreToCome flagbit was
-- set by the server until our client receives an empty flagbit. After the
-- first MoreToCome flagbit was set the responseTo field in the following
-- headers will reference the cursorId that was set in the previous message.
-- see:
-- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses
checkFlagBit p =
case p of
(_, r) ->
case r of
ReplyOpMsg{..} -> flagBits == [MoreToCome]
-- This is called by functions using the OP_MSG protocol,
-- so this has to be ReplyOpMsg
_ -> error "Impossible"
produce reqId p = runConduit $
case p of
(rt, r) ->
case r of
ReplyOpMsg{..} ->
if flagBits == [MoreToCome]
then yieldResponses .| foldlC mergeResponses p
else return $ (rt, check reqId p)
_ -> error "Impossible" -- see comment above
yieldResponses = repeatWhileMC
(do
var <- newEmptyMVar
liftIO $ atomically $ writeTChan (responseQueue pipe) var
readMVar var >>= either throwIO return :: IO Response
)
checkFlagBit
mergeResponses p@(rt,rep) p' =
case (p, p') of
((_, r), (_, r')) ->
case (r, r') of
(ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do
let (section, section') = (head sec, head sec')
(cur, cur') = (maybe Nothing cast $ look "cursor" section,
maybe Nothing cast $ look "cursor" section')
case (cur, cur') of
(Just doc, Just doc') -> do
let (docs, docs') =
( fromJust $ cast $ valueAt "nextBatch" doc :: [Document]
, fromJust $ cast $ valueAt "nextBatch" doc' :: [Document])
id' = fromJust $ cast $ valueAt "id" doc' :: Int32
(rt, check id' (rt, rep{ sections = docs' ++ docs })) -- todo: avoid (++)
-- Since we use this to process moreToCome messages, we
-- know that there will be a nextBatch key in the document
_ -> error "Impossible"
_ -> error "Impossible" -- see comment above
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
-- * Message -- * Message
type Message = ([Notice], Maybe (Request, RequestId)) type Message = ([Notice], Maybe (Request, RequestId))
-- ^ A write notice(s) with getLastError request, or just query request. -- ^ A write notice(s) with getLastError request, or just query request.
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness. -- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
type OpMsgMessage = ([Cmd], Maybe (Request, RequestId))
writeMessage :: Transport -> Message -> IO () writeMessage :: Transport -> Message -> IO ()
-- ^ Write message to connection -- ^ Write message to connection
@ -246,6 +344,25 @@ writeMessage conn (notices, mRequest) = do
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
encodeSize = runPut . putInt32 . (+ 4) encodeSize = runPut . putInt32 . (+ 4)
writeOpMsgMessage :: Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
-- ^ Write message to connection
writeOpMsgMessage conn (notices, mRequest) flagBit params = do
noticeStrings <- forM notices $ \n -> do
requestId <- genRequestId
let s = runPut $ putOpMsg n requestId flagBit params
return $ (lenBytes s) `L.append` s
let requestString = do
(request, requestId) <- mRequest
let s = runPut $ putOpMsg (Req request) requestId flagBit params
return $ (lenBytes s) `L.append` s
Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
Tr.flush conn
where
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
encodeSize = runPut . putInt32 . (+ 4)
type Response = (ResponseTo, Reply) type Response = (ResponseTo, Reply)
-- ^ Message received from a Mongo server in response to a Request -- ^ Message received from a Mongo server in response to a Request
@ -286,6 +403,13 @@ putHeader opcode requestId = do
putInt32 0 putInt32 0
putInt32 opcode putInt32 opcode
putOpMsgHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putOpMsgHeader opcode requestId = do
putInt32 requestId
putInt32 0
putInt32 opcode
getHeader :: Get (Opcode, ResponseTo) getHeader :: Get (Opcode, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already read -- ^ Note, does not read message length (first int32), assumes it was already read
getHeader = do getHeader = do
@ -360,6 +484,137 @@ putNotice notice requestId = do
putInt32 $ toEnum (length kCursorIds) putInt32 $ toEnum (length kCursorIds)
mapM_ putInt64 kCursorIds mapM_ putInt64 kCursorIds
data KillC = KillC { killCursor :: Notice, kFullCollection:: FullCollection} deriving Show
data Cmd = Nc Notice | Req Request | Kc KillC deriving Show
data FlagBit =
ChecksumPresent -- ^ The message ends with 4 bytes containing a CRC-32C checksum
| MoreToCome -- ^ Another message will follow this one without further action from the receiver.
| ExhaustAllowed -- ^ The client is prepared for multiple replies to this request using the moreToCome bit.
deriving (Show, Eq, Enum)
{-
OP_MSG header == 16 byte
+ 4 bytes flagBits
+ 1 byte payload type = 1
+ 1 byte payload type = 2
+ 4 byte size of payload
== 26 bytes opcode overhead
+ X Full command document {insert: "test", writeConcern: {...}}
+ Y command identifier ("documents", "deletes", "updates") ( + \0)
-}
putOpMsg :: Cmd -> RequestId -> Maybe FlagBit -> Document -> Put
putOpMsg cmd requestId flagBit params = do
let biT = maybe zeroBits (bit . bitOpMsg) flagBit:: Int32
putOpMsgHeader opMsgOpcode requestId -- header
case cmd of
Nc n -> case n of
Insert{..} -> do
let (sec0, sec1Size) =
prepSectionInfo
iFullCollection
(Just (iDocuments:: [Document]))
(Nothing:: Maybe Document)
("insert":: Text)
("documents":: Text)
params
putInt32 biT -- flagBit
putInt8 0 -- payload type 0
putDocument sec0 -- payload
putInt8 1 -- payload type 1
putInt32 sec1Size -- size of section
putCString "documents" -- identifier
mapM_ putDocument iDocuments -- payload
Update{..} -> do
let doc = ["q" =: uSelector, "u" =: uUpdater]
(sec0, sec1Size) =
prepSectionInfo
uFullCollection
(Nothing:: Maybe [Document])
(Just doc)
("update":: Text)
("updates":: Text)
params
putInt32 biT
putInt8 0
putDocument sec0
putInt8 1
putInt32 sec1Size
putCString "updates"
putDocument doc
Delete{..} -> do
-- Setting limit to 1 here is ok, since this is only used by deleteOne
let doc = ["q" =: dSelector, "limit" =: (1 :: Int32)]
(sec0, sec1Size) =
prepSectionInfo
dFullCollection
(Nothing:: Maybe [Document])
(Just doc)
("delete":: Text)
("deletes":: Text)
params
putInt32 biT
putInt8 0
putDocument sec0
putInt8 1
putInt32 sec1Size
putCString "deletes"
putDocument doc
_ -> error "The KillCursors command cannot be wrapped into a Nc type constructor. Please use the Kc type constructor"
Req r -> case r of
Query{..} -> do
let n = T.splitOn "." qFullCollection
db = head n
sec0 = foldl1' merge [qProjector, [ "$db" =: db ], qSelector]
putInt32 biT
putInt8 0
putDocument sec0
GetMore{..} -> do
let n = T.splitOn "." gFullCollection
(db, coll) = (head n, last n)
pre = ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize]
putInt32 (bit $ bitOpMsg $ ExhaustAllowed)
putInt8 0
putDocument pre
Kc k -> case k of
KillC{..} -> do
let n = T.splitOn "." kFullCollection
(db, coll) = (head n, last n)
case killCursor of
KillCursors{..} -> do
let doc = ["killCursors" =: coll, "cursors" =: kCursorIds, "$db" =: db]
putInt32 biT
putInt8 0
putDocument doc
-- Notices are already captured at the beginning, so all
-- other cases are impossible
_ -> error "impossible"
where
lenBytes bytes = toEnum . fromEnum $ L.length bytes:: Int32
prepSectionInfo fullCollection documents document command identifier ps =
let n = T.splitOn "." fullCollection
(db, coll) = (head n, last n)
in
case documents of
Just ds ->
let
sec0 = merge ps [command =: coll, "$db" =: db]
s = sum $ map (lenBytes . runPut . putDocument) ds
i = runPut $ putCString identifier
-- +4 bytes for the type 1 section size that has to be
-- transported in addition to the type 1 section document
sec1Size = s + lenBytes i + 4
in (sec0, sec1Size)
Nothing ->
let
sec0 = merge ps [command =: coll, "$db" =: db]
s = runPut $ putDocument $ fromJust document
i = runPut $ putCString identifier
sec1Size = lenBytes s + lenBytes i + 4
in (sec0, sec1Size)
iBit :: InsertOption -> Int32 iBit :: InsertOption -> Int32
iBit KeepGoing = bit 0 iBit KeepGoing = bit 0
@ -379,6 +634,11 @@ dBit SingleRemove = bit 0
dBits :: [DeleteOption] -> Int32 dBits :: [DeleteOption] -> Int32
dBits = bitOr . map dBit dBits = bitOr . map dBit
bitOpMsg :: FlagBit -> Int
bitOpMsg ChecksumPresent = 0
bitOpMsg MoreToCome = 1
bitOpMsg ExhaustAllowed = 16
-- ** Request -- ** Request
-- | A request is a message that is sent with a 'Reply' expected in return -- | A request is a message that is sent with a 'Reply' expected in return
@ -414,6 +674,9 @@ qOpcode :: Request -> Opcode
qOpcode Query{} = 2004 qOpcode Query{} = 2004
qOpcode GetMore{} = 2005 qOpcode GetMore{} = 2005
opMsgOpcode :: Opcode
opMsgOpcode = 2013
putRequest :: Request -> RequestId -> Put putRequest :: Request -> RequestId -> Put
putRequest request requestId = do putRequest request requestId = do
putHeader (qOpcode request) requestId putHeader (qOpcode request) requestId
@ -437,7 +700,7 @@ qBit SlaveOK = bit 2
qBit NoCursorTimeout = bit 4 qBit NoCursorTimeout = bit 4
qBit AwaitData = bit 5 qBit AwaitData = bit 5
--qBit Exhaust = bit 6 --qBit Exhaust = bit 6
qBit Partial = bit 7 qBit Database.MongoDB.Internal.Protocol.Partial = bit 7
qBits :: [QueryOption] -> Int32 qBits :: [QueryOption] -> Int32
qBits = bitOr . map qBit qBits = bitOr . map qBit
@ -450,7 +713,13 @@ data Reply = Reply {
rCursorId :: CursorId, -- ^ 0 = cursor finished rCursorId :: CursorId, -- ^ 0 = cursor finished
rStartingFrom :: Int32, rStartingFrom :: Int32,
rDocuments :: [Document] rDocuments :: [Document]
} deriving (Show, Eq) }
| ReplyOpMsg {
flagBits :: [FlagBit],
sections :: [Document],
checksum :: Maybe Int32
}
deriving (Show, Eq)
data ResponseFlag = data ResponseFlag =
CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results. CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
@ -466,17 +735,38 @@ replyOpcode = 1
getReply :: Get (ResponseTo, Reply) getReply :: Get (ResponseTo, Reply)
getReply = do getReply = do
(opcode, responseTo) <- getHeader (opcode, responseTo) <- getHeader
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode if opcode == 2013
rResponseFlags <- rFlags <$> getInt32 then do
rCursorId <- getInt64 -- Notes:
rStartingFrom <- getInt32 -- Checksum bits that are set by the server don't seem to be supported by official drivers.
numDocs <- fromIntegral <$> getInt32 -- See: https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/message.py#L1423
rDocuments <- replicateM numDocs getDocument flagBits <- rFlagsOpMsg <$> getInt32
return (responseTo, Reply{..}) _ <- getInt8
sec0 <- getDocument
let sections = [sec0]
checksum = Nothing
return (responseTo, ReplyOpMsg{..})
else do
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
rResponseFlags <- rFlags <$> getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- fromIntegral <$> getInt32
rDocuments <- replicateM numDocs getDocument
return (responseTo, Reply{..})
rFlags :: Int32 -> [ResponseFlag] rFlags :: Int32 -> [ResponseFlag]
rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..] rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..]
-- See https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#flagbits
rFlagsOpMsg :: Int32 -> [FlagBit]
rFlagsOpMsg bits = isValidFlag bits
where isValidFlag bt =
let setBits = map fst $ filter (\(_,b) -> b == True) $ zip ([0..31] :: [Int32]) $ map (testBit bt) [0 .. 31]
in if any (\n -> not $ elem n [0,1,16]) setBits
then error "Unsopported bit was set"
else filter (testBit bt . bitOpMsg) [ChecksumPresent ..]
rBit :: ResponseFlag -> Int rBit :: ResponseFlag -> Int
rBit CursorNotFound = 0 rBit CursorNotFound = 0
rBit QueryError = 1 rBit QueryError = 1

View file

@ -83,6 +83,8 @@ import Data.Bson
(!?), (!?),
(=:), (=:),
(=?), (=?),
merge,
cast
) )
import Data.Bson.Binary (putDocument) import Data.Bson.Binary (putDocument)
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
@ -97,7 +99,7 @@ import Data.Functor ((<&>))
import Data.Int (Int32, Int64) import Data.Int (Int32, Int64)
import Data.List (foldl1') import Data.List (foldl1')
import qualified Data.Map as Map import qualified Data.Map as Map
import Data.Maybe (catMaybes, fromMaybe, isNothing, listToMaybe, mapMaybe) import Data.Maybe (catMaybes, fromMaybe, isNothing, listToMaybe, mapMaybe, maybeToList, fromJust)
import Data.Text (Text) import Data.Text (Text)
import qualified Data.Text as T import qualified Data.Text as T
import Data.Typeable (Typeable) import Data.Typeable (Typeable)
@ -125,7 +127,9 @@ import Database.MongoDB.Internal.Protocol
ServerData (..), ServerData (..),
UpdateOption (..), UpdateOption (..),
Username, Username,
Cmd (..),
pwKey, pwKey,
FlagBit (..)
) )
import qualified Database.MongoDB.Internal.Protocol as P import qualified Database.MongoDB.Internal.Protocol as P
import Database.MongoDB.Internal.Util (liftIOE, loop, true1, (<.>)) import Database.MongoDB.Internal.Util (liftIOE, loop, true1, (<.>))
@ -343,11 +347,13 @@ parseSCRAM :: B.ByteString -> Map.Map B.ByteString B.ByteString
parseSCRAM = Map.fromList . fmap (cleanup . T.breakOn "=") . T.splitOn "," . T.pack . B.unpack parseSCRAM = Map.fromList . fmap (cleanup . T.breakOn "=") . T.splitOn "," . T.pack . B.unpack
where cleanup (t1, t2) = (B.pack $ T.unpack t1, B.pack . T.unpack $ T.drop 1 t2) where cleanup (t1, t2) = (B.pack $ T.unpack t1, B.pack . T.unpack $ T.drop 1 t2)
-- As long as server api is not requested OP_Query has to be used. See:
-- https://github.com/mongodb/specifications/blob/6dc6f80026f0f8d99a8c81f996389534b14f6602/source/mongodb-handshake/handshake.rst#specification
retrieveServerData :: (MonadIO m) => Action m ServerData retrieveServerData :: (MonadIO m) => Action m ServerData
retrieveServerData = do retrieveServerData = do
d <- runCommand1 "isMaster" d <- runCommand1 "isMaster"
let newSd = ServerData let newSd = ServerData
{ isMaster = fromMaybe False $ lookup "ismaster" d { isMaster = fromMaybe False $ lookup "isMaster" d
, minWireVersion = fromMaybe 0 $ lookup "minWireVersion" d , minWireVersion = fromMaybe 0 $ lookup "minWireVersion" d
, maxWireVersion = fromMaybe 0 $ lookup "maxWireVersion" d , maxWireVersion = fromMaybe 0 $ lookup "maxWireVersion" d
, maxMessageSizeBytes = fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d , maxMessageSizeBytes = fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d
@ -371,19 +377,29 @@ allCollections = do
db <- thisDatabase db <- thisDatabase
docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]} docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
(return . filter (not . isSpecial db)) (map (dropDbPrefix . at "name") docs) (return . filter (not . isSpecial db)) (map (dropDbPrefix . at "name") docs)
else do else
r <- runCommand1 "listCollections" if maxWireVersion sd < 17
let curData = do then do
(Doc curDoc) <- r !? "cursor" r <- runCommand1 "listCollections"
(curId :: Int64) <- curDoc !? "id" let curData = do
(curNs :: Text) <- curDoc !? "ns" (Doc curDoc) <- r !? "cursor"
(firstBatch :: [Value]) <- curDoc !? "firstBatch" (curId :: Int64) <- curDoc !? "id"
return (curId, curNs, mapMaybe cast' firstBatch :: [Document]) (curNs :: Text) <- curDoc !? "ns"
case curData of (firstBatch :: [Value]) <- curDoc !? "firstBatch"
Nothing -> return [] return (curId, curNs, mapMaybe cast' firstBatch :: [Document])
Just (curId, curNs, firstBatch) -> do case curData of
Nothing -> return []
Just (curId, curNs, firstBatch) -> do
db <- thisDatabase
nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch
docs <- rest nc
return $ mapMaybe (\d -> d !? "name") docs
else do
let q = Query [] (Select ["listCollections" =: (1 :: Int)] "$cmd") [] 0 0 [] False 0 []
qr <- queryRequestOpMsg False q
dBatch <- liftIO $ requestOpMsg p qr []
db <- thisDatabase db <- thisDatabase
nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch nc <- newCursor db "$cmd" 0 dBatch
docs <- rest nc docs <- rest nc
return $ mapMaybe (\d -> d !? "name") docs return $ mapMaybe (\d -> d !? "name") docs
where where
@ -493,7 +509,7 @@ insert' opts col docs = do
docs' <- liftIO $ mapM assignId docs docs' <- liftIO $ mapM assignId docs
mode <- asks mongoWriteMode mode <- asks mongoWriteMode
let writeConcern = case mode of let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)] NoConfirm -> ["w" =: (0 :: Int32)]
Confirm params -> params Confirm params -> params
let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern
let ordered = KeepGoing `notElem` opts let ordered = KeepGoing `notElem` opts
@ -544,11 +560,40 @@ insertBlock opts col (prevCount, docs) = do
case errorMessage of case errorMessage of
Just failure -> return $ Left failure Just failure -> return $ Left failure
Nothing -> return $ Right $ map (valueAt "_id") docs Nothing -> return $ Right $ map (valueAt "_id") docs
else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int32)]
Confirm params -> params
doc <- runCommand $ insertCommandDocument opts col docs writeConcern
case (look "writeErrors" doc, look "writeConcernError" doc) of
(Nothing, Nothing) -> return $ Right $ map (valueAt "_id") docs
(Just (Array errs), Nothing) -> do
let writeErrors = map (anyToWriteError prevCount) errs
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
return $ Left $ CompoundFailure errorsWithFailureIndex
(Nothing, Just err) -> do
return $ Left $ WriteFailure
prevCount
(fromMaybe 0 $ lookup "ok" doc)
(show err)
(Just (Array errs), Just writeConcernErr) -> do
let writeErrors = map (anyToWriteError prevCount) errs
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
return $ Left $ CompoundFailure $ WriteFailure
prevCount
(fromMaybe 0 $ lookup "ok" doc)
(show writeConcernErr) : errorsWithFailureIndex
(Just unknownValue, Nothing) -> do
return $ Left $ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
(Just unknownValue, Just writeConcernErr) -> do
return $ Left $ CompoundFailure [ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
, WriteFailure prevCount (fromMaybe 0 $ lookup "ok" doc) $ show writeConcernErr]
else do else do
mode <- asks mongoWriteMode mode <- asks mongoWriteMode
let writeConcern = case mode of let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)] NoConfirm -> ["w" =: (0 :: Int32)]
Confirm params -> params Confirm params -> merge params ["w" =: (1 :: Int32)]
doc <- runCommand $ insertCommandDocument opts col docs writeConcern doc <- runCommand $ insertCommandDocument opts col docs writeConcern
case (look "writeErrors" doc, look "writeConcernError" doc) of case (look "writeErrors" doc, look "writeConcernError" doc) of
(Nothing, Nothing) -> return $ Right $ map (valueAt "_id") docs (Nothing, Nothing) -> return $ Right $ map (valueAt "_id") docs
@ -643,9 +688,20 @@ update :: (MonadIO m)
=> [UpdateOption] -> Selection -> Document -> Action m () => [UpdateOption] -> Selection -> Document -> Action m ()
-- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty. -- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty.
update opts (Select sel col) up = do update opts (Select sel col) up = do
pipe <- asks mongoPipe
db <- thisDatabase db <- thisDatabase
ctx <- ask let sd = P.serverData pipe
liftIO $ runReaderT (void $ write (Update (db <.> col) opts sel up)) ctx if maxWireVersion sd < 17
then do
ctx <- ask
liftIO $ runReaderT (void $ write (Update (db <.> col) opts sel up)) ctx
else do
liftIOE ConnectionFailure $
P.sendOpMsg
pipe
[Nc (Update (db <.> col) opts sel up)]
(Just P.MoreToCome)
["writeConcern" =: ["w" =: (0 :: Int32)]]
updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document
updateCommandDocument col ordered updates writeConcern = updateCommandDocument col ordered updates writeConcern =
@ -700,7 +756,7 @@ update' ordered col updateDocs = do
ctx <- ask ctx <- ask
liftIO $ do liftIO $ do
let writeConcern = case mode of let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)] NoConfirm -> ["w" =: (0 :: Int32)]
Confirm params -> params Confirm params -> params
let docSize = sizeOfDocument $ updateCommandDocument let docSize = sizeOfDocument $ updateCommandDocument
col col
@ -752,10 +808,10 @@ updateBlock ordered col (prevCount, docs) = do
let sd = P.serverData p let sd = P.serverData p
if maxWireVersion sd < 2 if maxWireVersion sd < 2
then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6" then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6"
else do else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do
mode <- asks mongoWriteMode mode <- asks mongoWriteMode
let writeConcern = case mode of let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)] NoConfirm -> ["w" =: (0 :: Int32)]
Confirm params -> params Confirm params -> params
doc <- runCommand $ updateCommandDocument col ordered docs writeConcern doc <- runCommand $ updateCommandDocument col ordered docs writeConcern
@ -805,7 +861,59 @@ updateBlock ordered col (prevCount, docs) = do
let upsertedList = maybe [] (map docToUpserted) (doc !? "upserted") let upsertedList = maybe [] (map docToUpserted) (doc !? "upserted")
let successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] [] let successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] []
return $ foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults] return $ foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults]
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int32)]
Confirm params -> merge params ["w" =: (1 :: Int32)]
doc <- runCommand $ updateCommandDocument col ordered docs writeConcern
let n = fromMaybe 0 $ doc !? "n"
let writeErrorsResults =
case look "writeErrors" doc of
Nothing -> WriteResult False 0 (Just 0) 0 [] [] []
Just (Array err) -> WriteResult True 0 (Just 0) 0 [] (map (anyToWriteError prevCount) err) []
Just unknownErr -> WriteResult
True
0
(Just 0)
0
[]
[ ProtocolFailure
prevCount
$ "Expected array of error docs, but received: "
++ show unknownErr]
[]
let writeConcernResults =
case look "writeConcernError" doc of
Nothing -> WriteResult False 0 (Just 0) 0 [] [] []
Just (Doc err) -> WriteResult
True
0
(Just 0)
0
[]
[]
[ WriteConcernFailure
(fromMaybe (-1) $ err !? "code")
(fromMaybe "" $ err !? "errmsg")
]
Just unknownErr -> WriteResult
True
0
(Just 0)
0
[]
[]
[ ProtocolFailure
prevCount
$ "Expected doc in writeConcernError, but received: "
++ show unknownErr]
let upsertedList = maybe [] (map docToUpserted) (doc !? "upserted")
let successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] []
return $ foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults]
interruptibleFor :: (Monad m, Result b) => Bool -> [a] -> (a -> m b) -> m [b] interruptibleFor :: (Monad m, Result b) => Bool -> [a] -> (a -> m b) -> m [b]
interruptibleFor ordered = go [] interruptibleFor ordered = go []
@ -853,18 +961,41 @@ docToWriteError doc = WriteFailure ind code msg
delete :: (MonadIO m) delete :: (MonadIO m)
=> Selection -> Action m () => Selection -> Action m ()
-- ^ Delete all documents in selection -- ^ Delete all documents in selection
delete = deleteHelper [] delete s = do
pipe <- asks mongoPipe
let sd = P.serverData pipe
if maxWireVersion sd < 17
then deleteHelper [] s
else deleteMany (coll s) [([], [])] >> return ()
deleteOne :: (MonadIO m) deleteOne :: (MonadIO m)
=> Selection -> Action m () => Selection -> Action m ()
-- ^ Delete first document in selection -- ^ Delete first document in selection
deleteOne = deleteHelper [SingleRemove] deleteOne sel@((Select sel' col)) = do
pipe <- asks mongoPipe
let sd = P.serverData pipe
if maxWireVersion sd < 17
then deleteHelper [SingleRemove] sel
else do
-- Starting with v6 confirming writes via getLastError as it is
-- performed in the deleteHelper call via its call to write is
-- deprecated. To confirm writes now an appropriate writeConcern has to be
-- set. These confirmations were discarded in deleteHelper anyway so no
-- need to dispatch on the writeConcern as it is currently done in deleteHelper
-- via write for older versions
db <- thisDatabase
liftIOE ConnectionFailure $
P.sendOpMsg
pipe
[Nc (Delete (db <.> col) [] sel')]
(Just P.MoreToCome)
["writeConcern" =: ["w" =: (0 :: Int32)]]
deleteHelper :: (MonadIO m) deleteHelper :: (MonadIO m)
=> [DeleteOption] -> Selection -> Action m () => [DeleteOption] -> Selection -> Action m ()
deleteHelper opts (Select sel col) = do deleteHelper opts (Select sel col) = do
db <- thisDatabase
ctx <- ask ctx <- ask
db <- thisDatabase
liftIO $ runReaderT (void $ write (Delete (db <.> col) opts sel)) ctx liftIO $ runReaderT (void $ write (Delete (db <.> col) opts sel)) ctx
{-| Bulk delete operation. If one delete fails it will not delete the remaining {-| Bulk delete operation. If one delete fails it will not delete the remaining
@ -914,7 +1045,7 @@ delete' ordered col deleteDocs = do
mode <- asks mongoWriteMode mode <- asks mongoWriteMode
let writeConcern = case mode of let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)] NoConfirm -> ["w" =: (0 :: Int32)]
Confirm params -> params Confirm params -> params
let docSize = sizeOfDocument $ deleteCommandDocument col ordered [] writeConcern let docSize = sizeOfDocument $ deleteCommandDocument col ordered [] writeConcern
let chunks = splitAtLimit let chunks = splitAtLimit
@ -947,11 +1078,61 @@ deleteBlock ordered col (prevCount, docs) = do
let sd = P.serverData p let sd = P.serverData p
if maxWireVersion sd < 2 if maxWireVersion sd < 2
then liftIO $ ioError $ userError "deleteMany doesn't support mongodb older than 2.6" then liftIO $ ioError $ userError "deleteMany doesn't support mongodb older than 2.6"
else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int32)]
Confirm params -> params
doc <- runCommand $ deleteCommandDocument col ordered docs writeConcern
let n = fromMaybe 0 $ doc !? "n"
let successResults = WriteResult False 0 Nothing n [] [] []
let writeErrorsResults =
case look "writeErrors" doc of
Nothing -> WriteResult False 0 Nothing 0 [] [] []
Just (Array err) -> WriteResult True 0 Nothing 0 [] (map (anyToWriteError prevCount) err) []
Just unknownErr -> WriteResult
True
0
Nothing
0
[]
[ ProtocolFailure
prevCount
$ "Expected array of error docs, but received: "
++ show unknownErr]
[]
let writeConcernResults =
case look "writeConcernError" doc of
Nothing -> WriteResult False 0 Nothing 0 [] [] []
Just (Doc err) -> WriteResult
True
0
Nothing
0
[]
[]
[ WriteConcernFailure
(fromMaybe (-1) $ err !? "code")
(fromMaybe "" $ err !? "errmsg")
]
Just unknownErr -> WriteResult
True
0
Nothing
0
[]
[]
[ ProtocolFailure
prevCount
$ "Expected doc in writeConcernError, but received: "
++ show unknownErr]
return $ foldl1' mergeWriteResults [successResults, writeErrorsResults, writeConcernResults]
else do else do
mode <- asks mongoWriteMode mode <- asks mongoWriteMode
let writeConcern = case mode of let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)] NoConfirm -> ["w" =: (0 :: Int32)]
Confirm params -> params Confirm params -> merge params ["w" =: (1 :: Int32)]
doc <- runCommand $ deleteCommandDocument col ordered docs writeConcern doc <- runCommand $ deleteCommandDocument col ordered docs writeConcern
let n = fromMaybe 0 $ doc !? "n" let n = fromMaybe 0 $ doc !? "n"
@ -1040,6 +1221,37 @@ type Order = Document
type BatchSize = Word32 type BatchSize = Word32
-- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default.
-- noticeCommands and adminCommands are needed to identify whether
-- queryRequestOpMsg is called via runCommand or not. If not it will
-- behave like being called by a "find"-like command and add additional fields
-- specific to the find command into the selector, such as "filter", "projection" etc.
noticeCommands :: [Text]
noticeCommands = [ "aggregate"
, "count"
, "delete"
, "findAndModify"
, "insert"
, "listCollections"
, "update"
]
adminCommands :: [Text]
adminCommands = [ "buildinfo"
, "clone"
, "collstats"
, "copydb"
, "copydbgetnonce"
, "create"
, "dbstats"
, "deleteIndexes"
, "drop"
, "dropDatabase"
, "renameCollection"
, "repairDatabase"
, "serverStatus"
, "validate"
]
query :: Selector -> Collection -> Query query :: Selector -> Collection -> Query
-- ^ Selects documents in collection that match selector. It uses no query options, projects all fields, does not skip any documents, does not limit result size, uses default batch size, does not sort, does not hint, and does not snapshot. -- ^ Selects documents in collection that match selector. It uses no query options, projects all fields, does not skip any documents, does not limit result size, uses default batch size, does not sort, does not hint, and does not snapshot.
query sel col = Query [] (Select sel col) [] 0 0 [] False 0 [] query sel col = Query [] (Select sel col) [] 0 0 [] False 0 []
@ -1047,32 +1259,49 @@ query sel col = Query [] (Select sel col) [] 0 0 [] False 0 []
find :: MonadIO m => Query -> Action m Cursor find :: MonadIO m => Query -> Action m Cursor
-- ^ Fetch documents satisfying query -- ^ Fetch documents satisfying query
find q@Query{selection, batchSize} = do find q@Query{selection, batchSize} = do
db <- thisDatabase
pipe <- asks mongoPipe pipe <- asks mongoPipe
qr <- queryRequest False q db <- thisDatabase
dBatch <- liftIO $ request pipe [] qr let sd = P.serverData pipe
newCursor db (coll selection) batchSize dBatch if maxWireVersion sd < 17
then do
qr <- queryRequest False q
dBatch <- liftIO $ request pipe [] qr
newCursor db (coll selection) batchSize dBatch
else do
qr <- queryRequestOpMsg False q
let newQr =
case fst qr of
Req qry ->
let coll = last $ T.splitOn "." (qFullCollection qry)
in (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr)
-- queryRequestOpMsg only returns Cmd types constructed via Req
_ -> error "impossible"
dBatch <- liftIO $ requestOpMsg pipe newQr []
newCursor db (coll selection) batchSize dBatch
findCommand :: (MonadIO m, MonadFail m) => Query -> Action m Cursor findCommand :: (MonadIO m, MonadFail m) => Query -> Action m Cursor
-- ^ Fetch documents satisfying query using the command "find" -- ^ Fetch documents satisfying query using the command "find"
findCommand Query{..} = do findCommand q@Query{..} = do
let aColl = coll selection pipe <- asks mongoPipe
response <- runCommand $ let sd = P.serverData pipe
[ "find" =: aColl if maxWireVersion sd < 17
, "filter" =: selector selection then do
, "sort" =: sort let aColl = coll selection
, "projection" =: project response <- runCommand $
, "hint" =: hint [ "find" =: aColl
, "skip" =: toInt32 skip , "filter" =: selector selection
] , "sort" =: sort
++ mconcat -- optional fields. They should not be present if set to 0 and mongo will use defaults , "projection" =: project
[ "batchSize" =? toMaybe (/= 0) toInt32 batchSize , "hint" =: hint
, "limit" =? toMaybe (/= 0) toInt32 limit , "skip" =: toInt32 skip
] ]
++ mconcat -- optional fields. They should not be present if set to 0 and mongo will use defaults
getCursorFromResponse aColl response [ "batchSize" =? toMaybe (/= 0) toInt32 batchSize
>>= either (liftIO . throwIO . QueryFailure (at "code" response)) return , "limit" =? toMaybe (/= 0) toInt32 limit
]
getCursorFromResponse aColl response
>>= either (liftIO . throwIO . QueryFailure (at "code" response)) return
else find q
where where
toInt32 :: Integral a => a -> Int32 toInt32 :: Integral a => a -> Int32
toInt32 = fromIntegral toInt32 = fromIntegral
@ -1086,10 +1315,35 @@ findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
-- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it -- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it
findOne q = do findOne q = do
pipe <- asks mongoPipe pipe <- asks mongoPipe
qr <- queryRequest False q {limit = 1} let legacyQuery = do
rq <- liftIO $ request pipe [] qr qr <- queryRequest False q {limit = 1}
Batch _ _ docs <- liftDB $ fulfill rq rq <- liftIO $ request pipe [] qr
return (listToMaybe docs) Batch _ _ docs <- liftDB $ fulfill rq
return (listToMaybe docs)
isHandshake = (== ["isMaster" =: (1 :: Int32)]) $ selector $ selection q :: Bool
if isHandshake
then legacyQuery
else do
let sd = P.serverData pipe
if (maxWireVersion sd < 17)
then legacyQuery
else do
qr <- queryRequestOpMsg False q {limit = 1}
let newQr =
case fst qr of
Req qry ->
let coll = last $ T.splitOn "." (qFullCollection qry)
-- We have to understand whether findOne is called as
-- command directly. This is necessary since findOne is used via
-- runCommand as a vehicle to execute any type of commands and notices.
labels = catMaybes $ map (\f -> look f $ qSelector qry) (noticeCommands ++ adminCommands) :: [Value]
in if null labels
then (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr)
else qr
_ -> error "impossible"
rq <- liftIO $ requestOpMsg pipe newQr []
Batch _ _ docs <- liftDB $ fulfill rq
return (listToMaybe docs)
fetch :: (MonadIO m) => Query -> Action m Document fetch :: (MonadIO m) => Query -> Action m Document
-- ^ Same as 'findOne' except throw 'DocNotFound' if none match -- ^ Same as 'findOne' except throw 'DocNotFound' if none match
@ -1194,7 +1448,7 @@ distinct :: (MonadIO m) => Label -> Selection -> Action m [Value]
-- ^ Fetch distinct values of field in selected documents -- ^ Fetch distinct values of field in selected documents
distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel] distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit) queryRequest :: (Monad m, MonadIO m) => Bool -> Query -> Action m (Request, Maybe Limit)
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
queryRequest isExplain Query{..} = do queryRequest isExplain Query{..} = do
ctx <- ask ctx <- ask
@ -1213,6 +1467,33 @@ queryRequest isExplain Query{..} = do
special = catMaybes [mOrder, mSnapshot, mHint, mExplain] special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
queryRequestOpMsg :: (Monad m, MonadIO m) => Bool -> Query -> Action m (Cmd, Maybe Limit)
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
queryRequestOpMsg isExplain Query{..} = do
ctx <- ask
return $ queryRequest' (mongoReadMode ctx) (mongoDatabase ctx)
where
queryRequest' rm db = (Req P.Query{..}, remainingLimit) where
qOptions = readModeOption rm ++ options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit)
-- Check whether this query is not a command in disguise. If
-- isNotCommand is true, then we treat this as a find command and add
-- the relevant fields to the selector
isNotCommand = null $ catMaybes $ map (\l -> look l (selector selection)) (noticeCommands ++ adminCommands)
mOrder = if null sort then Nothing else Just ("sort" =: sort)
mSnapshot = if snapshot then Just ("snapshot" =: True) else Nothing
mHint = if null hint then Nothing else Just ("hint" =: hint)
mExplain = if isExplain then Just ("$explain" =: True) else Nothing
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qProjector = if isNotCommand then ["projection" =: project] else project
qSelector = if isNotCommand then c else s
where s = selector selection
bSize = if qBatchSize == 0 then Nothing else Just ("batchSize" =: qBatchSize)
mLimit = if limit == 0 then Nothing else maybe Nothing (\rL -> Just ("limit" =: (fromIntegral rL :: Int32))) remainingLimit
c = ("filter" =: s) : special ++ maybeToList bSize ++ maybeToList mLimit
batchSizeRemainingLimit :: BatchSize -> Maybe Limit -> (Int32, Maybe Limit) batchSizeRemainingLimit :: BatchSize -> Maybe Limit -> (Int32, Maybe Limit)
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit -- ^ Given batchSize and limit return P.qBatchSize and remaining limit
batchSizeRemainingLimit batchSize mLimit = batchSizeRemainingLimit batchSize mLimit =
@ -1238,6 +1519,14 @@ request pipe ns (req, remainingLimit) = do
let protectedPromise = liftIOE ConnectionFailure promise let protectedPromise = liftIOE ConnectionFailure promise
return $ fromReply remainingLimit =<< protectedPromise return $ fromReply remainingLimit =<< protectedPromise
requestOpMsg :: Pipe -> (Cmd, Maybe Limit) -> Document -> IO DelayedBatch
-- ^ Send notices and request and return promised batch
requestOpMsg pipe (Req r, remainingLimit) params = do
promise <- liftIOE ConnectionFailure $ P.callOpMsg pipe r Nothing params
let protectedPromise = liftIOE ConnectionFailure promise
return $ fromReply remainingLimit =<< protectedPromise
requestOpMsg _ (Nc _, _) _ = error "requestOpMsg: Only messages of type Query are supported"
fromReply :: Maybe Limit -> Reply -> DelayedBatch fromReply :: Maybe Limit -> Reply -> DelayedBatch
-- ^ Convert Reply to Batch or Failure -- ^ Convert Reply to Batch or Failure
fromReply limit Reply{..} = do fromReply limit Reply{..} = do
@ -1249,6 +1538,23 @@ fromReply limit Reply{..} = do
AwaitCapable -> return () AwaitCapable -> return ()
CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId
QueryError -> throwIO $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments) QueryError -> throwIO $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
fromReply limit ReplyOpMsg{..} = do
let section = head sections
cur = maybe Nothing cast $ look "cursor" section
case cur of
Nothing -> return (Batch limit 0 sections)
Just doc ->
case look "firstBatch" doc of
Just ar -> do
let docs = fromJust $ cast ar
id' = fromJust $ cast $ valueAt "id" doc
return (Batch limit id' docs)
-- A cursor without a firstBatch field, should be a reply to a
-- getMore query and thus have a nextBatch key
Nothing -> do
let docs = fromJust $ cast $ valueAt "nextBatch" doc
id' = fromJust $ cast $ valueAt "id" doc
return (Batch limit id' docs)
fulfill :: DelayedBatch -> Action IO Batch fulfill :: DelayedBatch -> Action IO Batch
-- ^ Demand and wait for result, raise failure if exception -- ^ Demand and wait for result, raise failure if exception
@ -1297,7 +1603,10 @@ fulfill' fcol batchSize dBatch = do
nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Maybe Limit -> CursorId -> Action m DelayedBatch nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Maybe Limit -> CursorId -> Action m DelayedBatch
nextBatch' fcol batchSize limit cid = do nextBatch' fcol batchSize limit cid = do
pipe <- asks mongoPipe pipe <- asks mongoPipe
liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit) let sd = P.serverData pipe
if maxWireVersion sd < 17
then liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit)
else liftIO $ requestOpMsg pipe (Req $ GetMore fcol batchSize' cid, remLimit) []
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
next :: MonadIO m => Cursor -> Action m (Maybe Document) next :: MonadIO m => Cursor -> Action m (Maybe Document)
@ -1320,7 +1629,10 @@ next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where
else return $ return (Batch newLimit cid docs') else return $ return (Batch newLimit cid docs')
when (newLimit == Just 0) $ unless (cid == 0) $ do when (newLimit == Just 0) $ unless (cid == 0) $ do
pipe <- asks mongoPipe pipe <- asks mongoPipe
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] let sd = P.serverData pipe
if maxWireVersion sd < 17
then liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
else liftIOE ConnectionFailure $ P.sendOpMsg pipe [Kc (P.KillC (KillCursors [cid]) fcol)] (Just MoreToCome) []
return (dBatch', Just doc) return (dBatch', Just doc)
[] -> if cid == 0 [] -> if cid == 0
then return (return $ Batch (Just 0) 0 [], Nothing) -- finished then return (return $ Batch (Just 0) 0 [], Nothing) -- finished
@ -1380,9 +1692,20 @@ aggregateCommand aColl agg AggregateConfig {..} =
aggregateCursor :: (MonadIO m, MonadFail m) => Collection -> Pipeline -> AggregateConfig -> Action m Cursor aggregateCursor :: (MonadIO m, MonadFail m) => Collection -> Pipeline -> AggregateConfig -> Action m Cursor
-- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details. -- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details.
aggregateCursor aColl agg cfg = do aggregateCursor aColl agg cfg = do
response <- runCommand (aggregateCommand aColl agg cfg) pipe <- asks mongoPipe
getCursorFromResponse aColl response let sd = P.serverData pipe
>>= either (liftIO . throwIO . AggregateFailure) return if maxWireVersion sd < 17
then do
response <- runCommand (aggregateCommand aColl agg cfg)
getCursorFromResponse aColl response
>>= either (liftIO . throwIO . AggregateFailure) return
else do
let q = select (aggregateCommand aColl agg cfg) aColl
qr <- queryRequestOpMsg False q
dBatch <- liftIO $ requestOpMsg pipe qr []
db <- thisDatabase
Right <$> newCursor db aColl 0 dBatch
>>= either (liftIO . throwIO . AggregateFailure) return
getCursorFromResponse getCursorFromResponse
:: (MonadIO m, MonadFail m) :: (MonadIO m, MonadFail m)
@ -1527,7 +1850,12 @@ runCommand1 c = runCommand [c =: (1 :: Int)]
eval :: (MonadIO m, Val v) => Javascript -> Action m v eval :: (MonadIO m, Val v) => Javascript -> Action m v
-- ^ Run code on server -- ^ Run code on server
eval code = at "retval" <$> runCommand ["$eval" =: code] eval code = do
p <- asks mongoPipe
let sd = P.serverData p
if maxWireVersion sd <= 7
then at "retval" <$> runCommand ["$eval" =: code]
else error "The command db.eval() has been removed since MongoDB 4.2"
modifyMVar :: MVar a -> (a -> Action IO (a, b)) -> Action IO b modifyMVar :: MVar a -> (a -> Action IO (a, b)) -> Action IO b
modifyMVar v f = do modifyMVar v f = do

Binary file not shown.