From 780df80cfc0781a21a2c1e397de26c00c8878488 Mon Sep 17 00:00:00 2001 From: kfiz Date: Thu, 27 Oct 2022 06:09:24 +0200 Subject: [PATCH] Make current driver compatible with the OP_MSG protocol (#137) * Make current driver compatible with the OP_MSG protocol Starting with mongodb v6 the OP_MSG protocol is the only accepted message protocol that is accepted by mongodb. All prior protocols are deprecated. This commit implements the protocol keeping the current client facing API intact. See: https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst https://medium.com/@asayechemeda/communicating-with-mongodb-using-tcp-sockets-521490f981f Co-authored-by: Doro Rose --- Database/MongoDB/Internal/Protocol.hs | 332 +++++++++++++++++-- Database/MongoDB/Query.hs | 450 ++++++++++++++++++++++---- dist-newstyle/cache/config | Bin 11362 -> 0 bytes 3 files changed, 700 insertions(+), 82 deletions(-) delete mode 100644 dist-newstyle/cache/config diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index ed99a0c..26d190c 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -20,31 +20,32 @@ module Database.MongoDB.Internal.Protocol ( FullCollection, -- * Pipe - Pipe, newPipe, newPipeWith, send, call, + Pipe, newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg, -- ** Notice Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId, -- ** Request - Request(..), QueryOption(..), + Request(..), QueryOption(..), Cmd (..), KillC(..), -- ** Reply - Reply(..), ResponseFlag(..), + Reply(..), ResponseFlag(..), FlagBit(..), -- * Authentication Username, Password, Nonce, pwHash, pwKey, - isClosed, close, ServerData(..), Pipeline(..) + isClosed, close, ServerData(..), Pipeline(..), putOpMsg, + bitOpMsg ) where #if !MIN_VERSION_base(4,8,0) import Control.Applicative ((<$>)) #endif import Control.Monad ( forM, replicateM, unless, forever ) -import Data.Binary.Get (Get, runGet) -import Data.Binary.Put (Put, runPut) -import Data.Bits (bit, testBit) +import Data.Binary.Get (Get, runGet, getInt8) +import Data.Binary.Put (Put, runPut, putInt8) +import Data.Bits (bit, testBit, zeroBits) import Data.Int (Int32, Int64) import Data.IORef (IORef, newIORef, atomicModifyIORef) import System.IO (Handle) import System.IO.Error (doesNotExistErrorType, mkIOError) import System.IO.Unsafe (unsafePerformIO) -import Data.Maybe (maybeToList) +import Data.Maybe (maybeToList, fromJust) import GHC.Conc (ThreadStatus(..), threadStatus) import Control.Monad.STM (atomically) import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask) @@ -55,7 +56,7 @@ import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try import qualified Data.ByteString.Lazy as L import Control.Monad.Trans (MonadIO, liftIO) -import Data.Bson (Document) +import Data.Bson (Document, (=:), merge, cast, valueAt, look) import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64, putInt64, putCString) import Data.Text (Text) @@ -73,6 +74,8 @@ import qualified Database.MongoDB.Transport as Tr #if MIN_VERSION_base(4,6,0) import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, putMVar, readMVar, mkWeakMVar, isEmptyMVar) +import GHC.List (foldl1') +import Conduit (repeatWhileMC, (.|), runConduit, foldlC) #else import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, putMVar, readMVar, addMVarFinalizer) @@ -89,7 +92,7 @@ mkWeakMVar = addMVarFinalizer -- | Thread-safe and pipelined connection data Pipeline = Pipeline { vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it - , responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. + , responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrives we pop the next thread and give it the response. , listenThread :: ThreadId , finished :: MVar () , serverData :: ServerData @@ -103,6 +106,7 @@ data ServerData = ServerData , maxBsonObjectSize :: Int , maxWriteBatchSize :: Int } + deriving Show -- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread. forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId @@ -159,6 +163,7 @@ isClosed Pipeline{listenThread} = do ThreadFinished -> True ThreadBlocked _ -> False ThreadDied -> True + --isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read listen :: Pipeline -> IO () @@ -178,6 +183,14 @@ psend :: Pipeline -> Message -> IO () -- Throw IOError and close pipeline if send fails psend p@Pipeline{..} !message = withMVar vStream (flip writeMessage message) `onException` close p +psendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()-- IO (IO Response) +psendOpMsg p@Pipeline{..} commands flagBit params = + case flagBit of + Just f -> case f of + MoreToCome -> withMVar vStream (\t -> writeOpMsgMessage t (commands, Nothing) flagBit params) `onException` close p -- >> return (return (0, ReplyEmpty)) + _ -> error "moreToCome has to be set if no response is expected" + _ -> error "moreToCome has to be set if no response is expected" + pcall :: Pipeline -> Message -> IO (IO Response) -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- Throw IOError and closes pipeline if send fails, likewise for promised response. @@ -193,11 +206,28 @@ pcall p@Pipeline{..} message = do liftIO $ atomically $ writeTChan responseQueue var return $ readMVar var >>= either throwIO return -- return promise +pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response) +-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). +-- Throw IOError and closes pipeline if send fails, likewise for promised response. +pcallOpMsg p@Pipeline{..} message flagbit params = do + listenerStopped <- isFinished p + if listenerStopped + then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing + else withMVar vStream doCall `onException` close p + where + doCall stream = do + writeOpMsgMessage stream ([], message) flagbit params + var <- newEmptyMVar + -- put var into the response-queue so that it can + -- fetch the latest response + liftIO $ atomically $ writeTChan responseQueue var + return $ readMVar var >>= either throwIO return -- return promise + -- * Pipe type Pipe = Pipeline --- ^ Thread-safe TCP connection with pipelined requests. In long-running applications the user is expected to use it as a "client": create a `Pipe` --- at startup, use it as long as possible, watch out for possible timeouts, and close it on shutdown. Bearing in mind that disconnections may be triggered by MongoDB service providers, the user is responsible for re-creating their `Pipe` whenever necessary. +-- ^ Thread-safe TCP connection with pipelined requests. In long-running applications the user is expected to use it as a "client": create a `Pipe` +-- at startup, use it as long as possible, watch out for possible timeouts, and close it on shutdown. Bearing in mind that disconnections may be triggered by MongoDB service providers, the user is responsible for re-creating their `Pipe` whenever necessary. newPipe :: ServerData -> Handle -> IO Pipe -- ^ Create pipe over handle @@ -211,6 +241,12 @@ send :: Pipe -> [Notice] -> IO () -- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. send pipe notices = psend pipe (notices, Nothing) +sendOpMsg :: Pipe -> [Cmd] -> Maybe FlagBit -> Document -> IO () +-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. +sendOpMsg pipe commands@(Nc _ : _) flagBit params = psendOpMsg pipe commands flagBit params +sendOpMsg pipe commands@(Kc _ : _) flagBit params = psendOpMsg pipe commands flagBit params +sendOpMsg _ _ _ _ = error "This function only supports Cmd types wrapped in Nc or Kc type constructors" + call :: Pipe -> [Notice] -> Request -> IO (IO Reply) -- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails. call pipe notices request = do @@ -221,11 +257,73 @@ call pipe notices request = do check requestId (responseTo, reply) = if requestId == responseTo then reply else error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" +callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply) +-- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails. +callOpMsg pipe request flagBit params = do + requestId <- genRequestId + promise <- pcallOpMsg pipe (Just (request, requestId)) flagBit params + promise' <- promise :: IO Response + return $ snd <$> produce requestId promise' + where + -- We need to perform streaming here as within the OP_MSG protocol mongoDB expects + -- our client to keep receiving messages after the MoreToCome flagbit was + -- set by the server until our client receives an empty flagbit. After the + -- first MoreToCome flagbit was set the responseTo field in the following + -- headers will reference the cursorId that was set in the previous message. + -- see: + -- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses + checkFlagBit p = + case p of + (_, r) -> + case r of + ReplyOpMsg{..} -> flagBits == [MoreToCome] + -- This is called by functions using the OP_MSG protocol, + -- so this has to be ReplyOpMsg + _ -> error "Impossible" + produce reqId p = runConduit $ + case p of + (rt, r) -> + case r of + ReplyOpMsg{..} -> + if flagBits == [MoreToCome] + then yieldResponses .| foldlC mergeResponses p + else return $ (rt, check reqId p) + _ -> error "Impossible" -- see comment above + yieldResponses = repeatWhileMC + (do + var <- newEmptyMVar + liftIO $ atomically $ writeTChan (responseQueue pipe) var + readMVar var >>= either throwIO return :: IO Response + ) + checkFlagBit + mergeResponses p@(rt,rep) p' = + case (p, p') of + ((_, r), (_, r')) -> + case (r, r') of + (ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do + let (section, section') = (head sec, head sec') + (cur, cur') = (maybe Nothing cast $ look "cursor" section, + maybe Nothing cast $ look "cursor" section') + case (cur, cur') of + (Just doc, Just doc') -> do + let (docs, docs') = + ( fromJust $ cast $ valueAt "nextBatch" doc :: [Document] + , fromJust $ cast $ valueAt "nextBatch" doc' :: [Document]) + id' = fromJust $ cast $ valueAt "id" doc' :: Int32 + (rt, check id' (rt, rep{ sections = docs' ++ docs })) -- todo: avoid (++) + -- Since we use this to process moreToCome messages, we + -- know that there will be a nextBatch key in the document + _ -> error "Impossible" + _ -> error "Impossible" -- see comment above + check requestId (responseTo, reply) = if requestId == responseTo then reply else + error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" + -- * Message type Message = ([Notice], Maybe (Request, RequestId)) -- ^ A write notice(s) with getLastError request, or just query request. -- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness. +type OpMsgMessage = ([Cmd], Maybe (Request, RequestId)) writeMessage :: Transport -> Message -> IO () -- ^ Write message to connection @@ -246,6 +344,25 @@ writeMessage conn (notices, mRequest) = do lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes encodeSize = runPut . putInt32 . (+ 4) +writeOpMsgMessage :: Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO () +-- ^ Write message to connection +writeOpMsgMessage conn (notices, mRequest) flagBit params = do + noticeStrings <- forM notices $ \n -> do + requestId <- genRequestId + let s = runPut $ putOpMsg n requestId flagBit params + return $ (lenBytes s) `L.append` s + + let requestString = do + (request, requestId) <- mRequest + let s = runPut $ putOpMsg (Req request) requestId flagBit params + return $ (lenBytes s) `L.append` s + + Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString) + Tr.flush conn + where + lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes + encodeSize = runPut . putInt32 . (+ 4) + type Response = (ResponseTo, Reply) -- ^ Message received from a Mongo server in response to a Request @@ -286,6 +403,13 @@ putHeader opcode requestId = do putInt32 0 putInt32 opcode +putOpMsgHeader :: Opcode -> RequestId -> Put +-- ^ Note, does not write message length (first int32), assumes caller will write it +putOpMsgHeader opcode requestId = do + putInt32 requestId + putInt32 0 + putInt32 opcode + getHeader :: Get (Opcode, ResponseTo) -- ^ Note, does not read message length (first int32), assumes it was already read getHeader = do @@ -360,6 +484,137 @@ putNotice notice requestId = do putInt32 $ toEnum (length kCursorIds) mapM_ putInt64 kCursorIds +data KillC = KillC { killCursor :: Notice, kFullCollection:: FullCollection} deriving Show + +data Cmd = Nc Notice | Req Request | Kc KillC deriving Show + +data FlagBit = + ChecksumPresent -- ^ The message ends with 4 bytes containing a CRC-32C checksum + | MoreToCome -- ^ Another message will follow this one without further action from the receiver. + | ExhaustAllowed -- ^ The client is prepared for multiple replies to this request using the moreToCome bit. + deriving (Show, Eq, Enum) + + +{- + OP_MSG header == 16 byte + + 4 bytes flagBits + + 1 byte payload type = 1 + + 1 byte payload type = 2 + + 4 byte size of payload + == 26 bytes opcode overhead + + X Full command document {insert: "test", writeConcern: {...}} + + Y command identifier ("documents", "deletes", "updates") ( + \0) +-} +putOpMsg :: Cmd -> RequestId -> Maybe FlagBit -> Document -> Put +putOpMsg cmd requestId flagBit params = do + let biT = maybe zeroBits (bit . bitOpMsg) flagBit:: Int32 + putOpMsgHeader opMsgOpcode requestId -- header + case cmd of + Nc n -> case n of + Insert{..} -> do + let (sec0, sec1Size) = + prepSectionInfo + iFullCollection + (Just (iDocuments:: [Document])) + (Nothing:: Maybe Document) + ("insert":: Text) + ("documents":: Text) + params + putInt32 biT -- flagBit + putInt8 0 -- payload type 0 + putDocument sec0 -- payload + putInt8 1 -- payload type 1 + putInt32 sec1Size -- size of section + putCString "documents" -- identifier + mapM_ putDocument iDocuments -- payload + Update{..} -> do + let doc = ["q" =: uSelector, "u" =: uUpdater] + (sec0, sec1Size) = + prepSectionInfo + uFullCollection + (Nothing:: Maybe [Document]) + (Just doc) + ("update":: Text) + ("updates":: Text) + params + putInt32 biT + putInt8 0 + putDocument sec0 + putInt8 1 + putInt32 sec1Size + putCString "updates" + putDocument doc + Delete{..} -> do + -- Setting limit to 1 here is ok, since this is only used by deleteOne + let doc = ["q" =: dSelector, "limit" =: (1 :: Int32)] + (sec0, sec1Size) = + prepSectionInfo + dFullCollection + (Nothing:: Maybe [Document]) + (Just doc) + ("delete":: Text) + ("deletes":: Text) + params + putInt32 biT + putInt8 0 + putDocument sec0 + putInt8 1 + putInt32 sec1Size + putCString "deletes" + putDocument doc + _ -> error "The KillCursors command cannot be wrapped into a Nc type constructor. Please use the Kc type constructor" + Req r -> case r of + Query{..} -> do + let n = T.splitOn "." qFullCollection + db = head n + sec0 = foldl1' merge [qProjector, [ "$db" =: db ], qSelector] + putInt32 biT + putInt8 0 + putDocument sec0 + GetMore{..} -> do + let n = T.splitOn "." gFullCollection + (db, coll) = (head n, last n) + pre = ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize] + putInt32 (bit $ bitOpMsg $ ExhaustAllowed) + putInt8 0 + putDocument pre + Kc k -> case k of + KillC{..} -> do + let n = T.splitOn "." kFullCollection + (db, coll) = (head n, last n) + case killCursor of + KillCursors{..} -> do + let doc = ["killCursors" =: coll, "cursors" =: kCursorIds, "$db" =: db] + putInt32 biT + putInt8 0 + putDocument doc + -- Notices are already captured at the beginning, so all + -- other cases are impossible + _ -> error "impossible" + where + lenBytes bytes = toEnum . fromEnum $ L.length bytes:: Int32 + prepSectionInfo fullCollection documents document command identifier ps = + let n = T.splitOn "." fullCollection + (db, coll) = (head n, last n) + in + case documents of + Just ds -> + let + sec0 = merge ps [command =: coll, "$db" =: db] + s = sum $ map (lenBytes . runPut . putDocument) ds + i = runPut $ putCString identifier + -- +4 bytes for the type 1 section size that has to be + -- transported in addition to the type 1 section document + sec1Size = s + lenBytes i + 4 + in (sec0, sec1Size) + Nothing -> + let + sec0 = merge ps [command =: coll, "$db" =: db] + s = runPut $ putDocument $ fromJust document + i = runPut $ putCString identifier + sec1Size = lenBytes s + lenBytes i + 4 + in (sec0, sec1Size) + iBit :: InsertOption -> Int32 iBit KeepGoing = bit 0 @@ -379,6 +634,11 @@ dBit SingleRemove = bit 0 dBits :: [DeleteOption] -> Int32 dBits = bitOr . map dBit +bitOpMsg :: FlagBit -> Int +bitOpMsg ChecksumPresent = 0 +bitOpMsg MoreToCome = 1 +bitOpMsg ExhaustAllowed = 16 + -- ** Request -- | A request is a message that is sent with a 'Reply' expected in return @@ -414,6 +674,9 @@ qOpcode :: Request -> Opcode qOpcode Query{} = 2004 qOpcode GetMore{} = 2005 +opMsgOpcode :: Opcode +opMsgOpcode = 2013 + putRequest :: Request -> RequestId -> Put putRequest request requestId = do putHeader (qOpcode request) requestId @@ -437,7 +700,7 @@ qBit SlaveOK = bit 2 qBit NoCursorTimeout = bit 4 qBit AwaitData = bit 5 --qBit Exhaust = bit 6 -qBit Partial = bit 7 +qBit Database.MongoDB.Internal.Protocol.Partial = bit 7 qBits :: [QueryOption] -> Int32 qBits = bitOr . map qBit @@ -450,7 +713,13 @@ data Reply = Reply { rCursorId :: CursorId, -- ^ 0 = cursor finished rStartingFrom :: Int32, rDocuments :: [Document] - } deriving (Show, Eq) + } + | ReplyOpMsg { + flagBits :: [FlagBit], + sections :: [Document], + checksum :: Maybe Int32 + } + deriving (Show, Eq) data ResponseFlag = CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results. @@ -466,17 +735,38 @@ replyOpcode = 1 getReply :: Get (ResponseTo, Reply) getReply = do (opcode, responseTo) <- getHeader - unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode - rResponseFlags <- rFlags <$> getInt32 - rCursorId <- getInt64 - rStartingFrom <- getInt32 - numDocs <- fromIntegral <$> getInt32 - rDocuments <- replicateM numDocs getDocument - return (responseTo, Reply{..}) + if opcode == 2013 + then do + -- Notes: + -- Checksum bits that are set by the server don't seem to be supported by official drivers. + -- See: https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/message.py#L1423 + flagBits <- rFlagsOpMsg <$> getInt32 + _ <- getInt8 + sec0 <- getDocument + let sections = [sec0] + checksum = Nothing + return (responseTo, ReplyOpMsg{..}) + else do + unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode + rResponseFlags <- rFlags <$> getInt32 + rCursorId <- getInt64 + rStartingFrom <- getInt32 + numDocs <- fromIntegral <$> getInt32 + rDocuments <- replicateM numDocs getDocument + return (responseTo, Reply{..}) rFlags :: Int32 -> [ResponseFlag] rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..] +-- See https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#flagbits +rFlagsOpMsg :: Int32 -> [FlagBit] +rFlagsOpMsg bits = isValidFlag bits + where isValidFlag bt = + let setBits = map fst $ filter (\(_,b) -> b == True) $ zip ([0..31] :: [Int32]) $ map (testBit bt) [0 .. 31] + in if any (\n -> not $ elem n [0,1,16]) setBits + then error "Unsopported bit was set" + else filter (testBit bt . bitOpMsg) [ChecksumPresent ..] + rBit :: ResponseFlag -> Int rBit CursorNotFound = 0 rBit QueryError = 1 diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index a68c8c5..a8ec1dc 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -83,6 +83,8 @@ import Data.Bson (!?), (=:), (=?), + merge, + cast ) import Data.Bson.Binary (putDocument) import qualified Data.ByteString as BS @@ -97,7 +99,7 @@ import Data.Functor ((<&>)) import Data.Int (Int32, Int64) import Data.List (foldl1') import qualified Data.Map as Map -import Data.Maybe (catMaybes, fromMaybe, isNothing, listToMaybe, mapMaybe) +import Data.Maybe (catMaybes, fromMaybe, isNothing, listToMaybe, mapMaybe, maybeToList, fromJust) import Data.Text (Text) import qualified Data.Text as T import Data.Typeable (Typeable) @@ -125,7 +127,9 @@ import Database.MongoDB.Internal.Protocol ServerData (..), UpdateOption (..), Username, + Cmd (..), pwKey, + FlagBit (..) ) import qualified Database.MongoDB.Internal.Protocol as P import Database.MongoDB.Internal.Util (liftIOE, loop, true1, (<.>)) @@ -343,11 +347,13 @@ parseSCRAM :: B.ByteString -> Map.Map B.ByteString B.ByteString parseSCRAM = Map.fromList . fmap (cleanup . T.breakOn "=") . T.splitOn "," . T.pack . B.unpack where cleanup (t1, t2) = (B.pack $ T.unpack t1, B.pack . T.unpack $ T.drop 1 t2) +-- As long as server api is not requested OP_Query has to be used. See: +-- https://github.com/mongodb/specifications/blob/6dc6f80026f0f8d99a8c81f996389534b14f6602/source/mongodb-handshake/handshake.rst#specification retrieveServerData :: (MonadIO m) => Action m ServerData retrieveServerData = do d <- runCommand1 "isMaster" let newSd = ServerData - { isMaster = fromMaybe False $ lookup "ismaster" d + { isMaster = fromMaybe False $ lookup "isMaster" d , minWireVersion = fromMaybe 0 $ lookup "minWireVersion" d , maxWireVersion = fromMaybe 0 $ lookup "maxWireVersion" d , maxMessageSizeBytes = fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d @@ -371,19 +377,29 @@ allCollections = do db <- thisDatabase docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]} (return . filter (not . isSpecial db)) (map (dropDbPrefix . at "name") docs) - else do - r <- runCommand1 "listCollections" - let curData = do - (Doc curDoc) <- r !? "cursor" - (curId :: Int64) <- curDoc !? "id" - (curNs :: Text) <- curDoc !? "ns" - (firstBatch :: [Value]) <- curDoc !? "firstBatch" - return (curId, curNs, mapMaybe cast' firstBatch :: [Document]) - case curData of - Nothing -> return [] - Just (curId, curNs, firstBatch) -> do + else + if maxWireVersion sd < 17 + then do + r <- runCommand1 "listCollections" + let curData = do + (Doc curDoc) <- r !? "cursor" + (curId :: Int64) <- curDoc !? "id" + (curNs :: Text) <- curDoc !? "ns" + (firstBatch :: [Value]) <- curDoc !? "firstBatch" + return (curId, curNs, mapMaybe cast' firstBatch :: [Document]) + case curData of + Nothing -> return [] + Just (curId, curNs, firstBatch) -> do + db <- thisDatabase + nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch + docs <- rest nc + return $ mapMaybe (\d -> d !? "name") docs + else do + let q = Query [] (Select ["listCollections" =: (1 :: Int)] "$cmd") [] 0 0 [] False 0 [] + qr <- queryRequestOpMsg False q + dBatch <- liftIO $ requestOpMsg p qr [] db <- thisDatabase - nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch + nc <- newCursor db "$cmd" 0 dBatch docs <- rest nc return $ mapMaybe (\d -> d !? "name") docs where @@ -493,7 +509,7 @@ insert' opts col docs = do docs' <- liftIO $ mapM assignId docs mode <- asks mongoWriteMode let writeConcern = case mode of - NoConfirm -> ["w" =: (0 :: Int)] + NoConfirm -> ["w" =: (0 :: Int32)] Confirm params -> params let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern let ordered = KeepGoing `notElem` opts @@ -544,11 +560,40 @@ insertBlock opts col (prevCount, docs) = do case errorMessage of Just failure -> return $ Left failure Nothing -> return $ Right $ map (valueAt "_id") docs + else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do + mode <- asks mongoWriteMode + let writeConcern = case mode of + NoConfirm -> ["w" =: (0 :: Int32)] + Confirm params -> params + doc <- runCommand $ insertCommandDocument opts col docs writeConcern + case (look "writeErrors" doc, look "writeConcernError" doc) of + (Nothing, Nothing) -> return $ Right $ map (valueAt "_id") docs + (Just (Array errs), Nothing) -> do + let writeErrors = map (anyToWriteError prevCount) errs + let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors + return $ Left $ CompoundFailure errorsWithFailureIndex + (Nothing, Just err) -> do + return $ Left $ WriteFailure + prevCount + (fromMaybe 0 $ lookup "ok" doc) + (show err) + (Just (Array errs), Just writeConcernErr) -> do + let writeErrors = map (anyToWriteError prevCount) errs + let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors + return $ Left $ CompoundFailure $ WriteFailure + prevCount + (fromMaybe 0 $ lookup "ok" doc) + (show writeConcernErr) : errorsWithFailureIndex + (Just unknownValue, Nothing) -> do + return $ Left $ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue + (Just unknownValue, Just writeConcernErr) -> do + return $ Left $ CompoundFailure [ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue + , WriteFailure prevCount (fromMaybe 0 $ lookup "ok" doc) $ show writeConcernErr] else do mode <- asks mongoWriteMode let writeConcern = case mode of - NoConfirm -> ["w" =: (0 :: Int)] - Confirm params -> params + NoConfirm -> ["w" =: (0 :: Int32)] + Confirm params -> merge params ["w" =: (1 :: Int32)] doc <- runCommand $ insertCommandDocument opts col docs writeConcern case (look "writeErrors" doc, look "writeConcernError" doc) of (Nothing, Nothing) -> return $ Right $ map (valueAt "_id") docs @@ -643,9 +688,20 @@ update :: (MonadIO m) => [UpdateOption] -> Selection -> Document -> Action m () -- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty. update opts (Select sel col) up = do + pipe <- asks mongoPipe db <- thisDatabase - ctx <- ask - liftIO $ runReaderT (void $ write (Update (db <.> col) opts sel up)) ctx + let sd = P.serverData pipe + if maxWireVersion sd < 17 + then do + ctx <- ask + liftIO $ runReaderT (void $ write (Update (db <.> col) opts sel up)) ctx + else do + liftIOE ConnectionFailure $ + P.sendOpMsg + pipe + [Nc (Update (db <.> col) opts sel up)] + (Just P.MoreToCome) + ["writeConcern" =: ["w" =: (0 :: Int32)]] updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document updateCommandDocument col ordered updates writeConcern = @@ -700,7 +756,7 @@ update' ordered col updateDocs = do ctx <- ask liftIO $ do let writeConcern = case mode of - NoConfirm -> ["w" =: (0 :: Int)] + NoConfirm -> ["w" =: (0 :: Int32)] Confirm params -> params let docSize = sizeOfDocument $ updateCommandDocument col @@ -752,10 +808,10 @@ updateBlock ordered col (prevCount, docs) = do let sd = P.serverData p if maxWireVersion sd < 2 then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6" - else do + else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do mode <- asks mongoWriteMode let writeConcern = case mode of - NoConfirm -> ["w" =: (0 :: Int)] + NoConfirm -> ["w" =: (0 :: Int32)] Confirm params -> params doc <- runCommand $ updateCommandDocument col ordered docs writeConcern @@ -805,7 +861,59 @@ updateBlock ordered col (prevCount, docs) = do let upsertedList = maybe [] (map docToUpserted) (doc !? "upserted") let successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] [] return $ foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults] + else do + mode <- asks mongoWriteMode + let writeConcern = case mode of + NoConfirm -> ["w" =: (0 :: Int32)] + Confirm params -> merge params ["w" =: (1 :: Int32)] + doc <- runCommand $ updateCommandDocument col ordered docs writeConcern + let n = fromMaybe 0 $ doc !? "n" + let writeErrorsResults = + case look "writeErrors" doc of + Nothing -> WriteResult False 0 (Just 0) 0 [] [] [] + Just (Array err) -> WriteResult True 0 (Just 0) 0 [] (map (anyToWriteError prevCount) err) [] + Just unknownErr -> WriteResult + True + 0 + (Just 0) + 0 + [] + [ ProtocolFailure + prevCount + $ "Expected array of error docs, but received: " + ++ show unknownErr] + [] + + let writeConcernResults = + case look "writeConcernError" doc of + Nothing -> WriteResult False 0 (Just 0) 0 [] [] [] + Just (Doc err) -> WriteResult + True + 0 + (Just 0) + 0 + [] + [] + [ WriteConcernFailure + (fromMaybe (-1) $ err !? "code") + (fromMaybe "" $ err !? "errmsg") + ] + Just unknownErr -> WriteResult + True + 0 + (Just 0) + 0 + [] + [] + [ ProtocolFailure + prevCount + $ "Expected doc in writeConcernError, but received: " + ++ show unknownErr] + + let upsertedList = maybe [] (map docToUpserted) (doc !? "upserted") + let successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] [] + return $ foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults] interruptibleFor :: (Monad m, Result b) => Bool -> [a] -> (a -> m b) -> m [b] interruptibleFor ordered = go [] @@ -853,18 +961,41 @@ docToWriteError doc = WriteFailure ind code msg delete :: (MonadIO m) => Selection -> Action m () -- ^ Delete all documents in selection -delete = deleteHelper [] +delete s = do + pipe <- asks mongoPipe + let sd = P.serverData pipe + if maxWireVersion sd < 17 + then deleteHelper [] s + else deleteMany (coll s) [([], [])] >> return () deleteOne :: (MonadIO m) => Selection -> Action m () -- ^ Delete first document in selection -deleteOne = deleteHelper [SingleRemove] +deleteOne sel@((Select sel' col)) = do + pipe <- asks mongoPipe + let sd = P.serverData pipe + if maxWireVersion sd < 17 + then deleteHelper [SingleRemove] sel + else do + -- Starting with v6 confirming writes via getLastError as it is + -- performed in the deleteHelper call via its call to write is + -- deprecated. To confirm writes now an appropriate writeConcern has to be + -- set. These confirmations were discarded in deleteHelper anyway so no + -- need to dispatch on the writeConcern as it is currently done in deleteHelper + -- via write for older versions + db <- thisDatabase + liftIOE ConnectionFailure $ + P.sendOpMsg + pipe + [Nc (Delete (db <.> col) [] sel')] + (Just P.MoreToCome) + ["writeConcern" =: ["w" =: (0 :: Int32)]] deleteHelper :: (MonadIO m) => [DeleteOption] -> Selection -> Action m () deleteHelper opts (Select sel col) = do - db <- thisDatabase ctx <- ask + db <- thisDatabase liftIO $ runReaderT (void $ write (Delete (db <.> col) opts sel)) ctx {-| Bulk delete operation. If one delete fails it will not delete the remaining @@ -914,7 +1045,7 @@ delete' ordered col deleteDocs = do mode <- asks mongoWriteMode let writeConcern = case mode of - NoConfirm -> ["w" =: (0 :: Int)] + NoConfirm -> ["w" =: (0 :: Int32)] Confirm params -> params let docSize = sizeOfDocument $ deleteCommandDocument col ordered [] writeConcern let chunks = splitAtLimit @@ -947,11 +1078,61 @@ deleteBlock ordered col (prevCount, docs) = do let sd = P.serverData p if maxWireVersion sd < 2 then liftIO $ ioError $ userError "deleteMany doesn't support mongodb older than 2.6" + else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do + mode <- asks mongoWriteMode + let writeConcern = case mode of + NoConfirm -> ["w" =: (0 :: Int32)] + Confirm params -> params + doc <- runCommand $ deleteCommandDocument col ordered docs writeConcern + let n = fromMaybe 0 $ doc !? "n" + + let successResults = WriteResult False 0 Nothing n [] [] [] + let writeErrorsResults = + case look "writeErrors" doc of + Nothing -> WriteResult False 0 Nothing 0 [] [] [] + Just (Array err) -> WriteResult True 0 Nothing 0 [] (map (anyToWriteError prevCount) err) [] + Just unknownErr -> WriteResult + True + 0 + Nothing + 0 + [] + [ ProtocolFailure + prevCount + $ "Expected array of error docs, but received: " + ++ show unknownErr] + [] + let writeConcernResults = + case look "writeConcernError" doc of + Nothing -> WriteResult False 0 Nothing 0 [] [] [] + Just (Doc err) -> WriteResult + True + 0 + Nothing + 0 + [] + [] + [ WriteConcernFailure + (fromMaybe (-1) $ err !? "code") + (fromMaybe "" $ err !? "errmsg") + ] + Just unknownErr -> WriteResult + True + 0 + Nothing + 0 + [] + [] + [ ProtocolFailure + prevCount + $ "Expected doc in writeConcernError, but received: " + ++ show unknownErr] + return $ foldl1' mergeWriteResults [successResults, writeErrorsResults, writeConcernResults] else do mode <- asks mongoWriteMode let writeConcern = case mode of - NoConfirm -> ["w" =: (0 :: Int)] - Confirm params -> params + NoConfirm -> ["w" =: (0 :: Int32)] + Confirm params -> merge params ["w" =: (1 :: Int32)] doc <- runCommand $ deleteCommandDocument col ordered docs writeConcern let n = fromMaybe 0 $ doc !? "n" @@ -1040,6 +1221,37 @@ type Order = Document type BatchSize = Word32 -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. +-- noticeCommands and adminCommands are needed to identify whether +-- queryRequestOpMsg is called via runCommand or not. If not it will +-- behave like being called by a "find"-like command and add additional fields +-- specific to the find command into the selector, such as "filter", "projection" etc. +noticeCommands :: [Text] +noticeCommands = [ "aggregate" + , "count" + , "delete" + , "findAndModify" + , "insert" + , "listCollections" + , "update" + ] + +adminCommands :: [Text] +adminCommands = [ "buildinfo" + , "clone" + , "collstats" + , "copydb" + , "copydbgetnonce" + , "create" + , "dbstats" + , "deleteIndexes" + , "drop" + , "dropDatabase" + , "renameCollection" + , "repairDatabase" + , "serverStatus" + , "validate" + ] + query :: Selector -> Collection -> Query -- ^ Selects documents in collection that match selector. It uses no query options, projects all fields, does not skip any documents, does not limit result size, uses default batch size, does not sort, does not hint, and does not snapshot. query sel col = Query [] (Select sel col) [] 0 0 [] False 0 [] @@ -1047,32 +1259,49 @@ query sel col = Query [] (Select sel col) [] 0 0 [] False 0 [] find :: MonadIO m => Query -> Action m Cursor -- ^ Fetch documents satisfying query find q@Query{selection, batchSize} = do - db <- thisDatabase pipe <- asks mongoPipe - qr <- queryRequest False q - dBatch <- liftIO $ request pipe [] qr - newCursor db (coll selection) batchSize dBatch + db <- thisDatabase + let sd = P.serverData pipe + if maxWireVersion sd < 17 + then do + qr <- queryRequest False q + dBatch <- liftIO $ request pipe [] qr + newCursor db (coll selection) batchSize dBatch + else do + qr <- queryRequestOpMsg False q + let newQr = + case fst qr of + Req qry -> + let coll = last $ T.splitOn "." (qFullCollection qry) + in (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr) + -- queryRequestOpMsg only returns Cmd types constructed via Req + _ -> error "impossible" + dBatch <- liftIO $ requestOpMsg pipe newQr [] + newCursor db (coll selection) batchSize dBatch findCommand :: (MonadIO m, MonadFail m) => Query -> Action m Cursor -- ^ Fetch documents satisfying query using the command "find" -findCommand Query{..} = do - let aColl = coll selection - response <- runCommand $ - [ "find" =: aColl - , "filter" =: selector selection - , "sort" =: sort - , "projection" =: project - , "hint" =: hint - , "skip" =: toInt32 skip - ] - ++ mconcat -- optional fields. They should not be present if set to 0 and mongo will use defaults - [ "batchSize" =? toMaybe (/= 0) toInt32 batchSize - , "limit" =? toMaybe (/= 0) toInt32 limit - ] - - getCursorFromResponse aColl response - >>= either (liftIO . throwIO . QueryFailure (at "code" response)) return - +findCommand q@Query{..} = do + pipe <- asks mongoPipe + let sd = P.serverData pipe + if maxWireVersion sd < 17 + then do + let aColl = coll selection + response <- runCommand $ + [ "find" =: aColl + , "filter" =: selector selection + , "sort" =: sort + , "projection" =: project + , "hint" =: hint + , "skip" =: toInt32 skip + ] + ++ mconcat -- optional fields. They should not be present if set to 0 and mongo will use defaults + [ "batchSize" =? toMaybe (/= 0) toInt32 batchSize + , "limit" =? toMaybe (/= 0) toInt32 limit + ] + getCursorFromResponse aColl response + >>= either (liftIO . throwIO . QueryFailure (at "code" response)) return + else find q where toInt32 :: Integral a => a -> Int32 toInt32 = fromIntegral @@ -1086,10 +1315,35 @@ findOne :: (MonadIO m) => Query -> Action m (Maybe Document) -- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it findOne q = do pipe <- asks mongoPipe - qr <- queryRequest False q {limit = 1} - rq <- liftIO $ request pipe [] qr - Batch _ _ docs <- liftDB $ fulfill rq - return (listToMaybe docs) + let legacyQuery = do + qr <- queryRequest False q {limit = 1} + rq <- liftIO $ request pipe [] qr + Batch _ _ docs <- liftDB $ fulfill rq + return (listToMaybe docs) + isHandshake = (== ["isMaster" =: (1 :: Int32)]) $ selector $ selection q :: Bool + if isHandshake + then legacyQuery + else do + let sd = P.serverData pipe + if (maxWireVersion sd < 17) + then legacyQuery + else do + qr <- queryRequestOpMsg False q {limit = 1} + let newQr = + case fst qr of + Req qry -> + let coll = last $ T.splitOn "." (qFullCollection qry) + -- We have to understand whether findOne is called as + -- command directly. This is necessary since findOne is used via + -- runCommand as a vehicle to execute any type of commands and notices. + labels = catMaybes $ map (\f -> look f $ qSelector qry) (noticeCommands ++ adminCommands) :: [Value] + in if null labels + then (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr) + else qr + _ -> error "impossible" + rq <- liftIO $ requestOpMsg pipe newQr [] + Batch _ _ docs <- liftDB $ fulfill rq + return (listToMaybe docs) fetch :: (MonadIO m) => Query -> Action m Document -- ^ Same as 'findOne' except throw 'DocNotFound' if none match @@ -1194,7 +1448,7 @@ distinct :: (MonadIO m) => Label -> Selection -> Action m [Value] -- ^ Fetch distinct values of field in selected documents distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel] -queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit) +queryRequest :: (Monad m, MonadIO m) => Bool -> Query -> Action m (Request, Maybe Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. queryRequest isExplain Query{..} = do ctx <- ask @@ -1213,6 +1467,33 @@ queryRequest isExplain Query{..} = do special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection +queryRequestOpMsg :: (Monad m, MonadIO m) => Bool -> Query -> Action m (Cmd, Maybe Limit) +-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. +queryRequestOpMsg isExplain Query{..} = do + ctx <- ask + return $ queryRequest' (mongoReadMode ctx) (mongoDatabase ctx) + where + queryRequest' rm db = (Req P.Query{..}, remainingLimit) where + qOptions = readModeOption rm ++ options + qFullCollection = db <.> coll selection + qSkip = fromIntegral skip + (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit) + -- Check whether this query is not a command in disguise. If + -- isNotCommand is true, then we treat this as a find command and add + -- the relevant fields to the selector + isNotCommand = null $ catMaybes $ map (\l -> look l (selector selection)) (noticeCommands ++ adminCommands) + mOrder = if null sort then Nothing else Just ("sort" =: sort) + mSnapshot = if snapshot then Just ("snapshot" =: True) else Nothing + mHint = if null hint then Nothing else Just ("hint" =: hint) + mExplain = if isExplain then Just ("$explain" =: True) else Nothing + special = catMaybes [mOrder, mSnapshot, mHint, mExplain] + qProjector = if isNotCommand then ["projection" =: project] else project + qSelector = if isNotCommand then c else s + where s = selector selection + bSize = if qBatchSize == 0 then Nothing else Just ("batchSize" =: qBatchSize) + mLimit = if limit == 0 then Nothing else maybe Nothing (\rL -> Just ("limit" =: (fromIntegral rL :: Int32))) remainingLimit + c = ("filter" =: s) : special ++ maybeToList bSize ++ maybeToList mLimit + batchSizeRemainingLimit :: BatchSize -> Maybe Limit -> (Int32, Maybe Limit) -- ^ Given batchSize and limit return P.qBatchSize and remaining limit batchSizeRemainingLimit batchSize mLimit = @@ -1238,6 +1519,14 @@ request pipe ns (req, remainingLimit) = do let protectedPromise = liftIOE ConnectionFailure promise return $ fromReply remainingLimit =<< protectedPromise +requestOpMsg :: Pipe -> (Cmd, Maybe Limit) -> Document -> IO DelayedBatch +-- ^ Send notices and request and return promised batch +requestOpMsg pipe (Req r, remainingLimit) params = do + promise <- liftIOE ConnectionFailure $ P.callOpMsg pipe r Nothing params + let protectedPromise = liftIOE ConnectionFailure promise + return $ fromReply remainingLimit =<< protectedPromise +requestOpMsg _ (Nc _, _) _ = error "requestOpMsg: Only messages of type Query are supported" + fromReply :: Maybe Limit -> Reply -> DelayedBatch -- ^ Convert Reply to Batch or Failure fromReply limit Reply{..} = do @@ -1249,6 +1538,23 @@ fromReply limit Reply{..} = do AwaitCapable -> return () CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId QueryError -> throwIO $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments) +fromReply limit ReplyOpMsg{..} = do + let section = head sections + cur = maybe Nothing cast $ look "cursor" section + case cur of + Nothing -> return (Batch limit 0 sections) + Just doc -> + case look "firstBatch" doc of + Just ar -> do + let docs = fromJust $ cast ar + id' = fromJust $ cast $ valueAt "id" doc + return (Batch limit id' docs) + -- A cursor without a firstBatch field, should be a reply to a + -- getMore query and thus have a nextBatch key + Nothing -> do + let docs = fromJust $ cast $ valueAt "nextBatch" doc + id' = fromJust $ cast $ valueAt "id" doc + return (Batch limit id' docs) fulfill :: DelayedBatch -> Action IO Batch -- ^ Demand and wait for result, raise failure if exception @@ -1297,7 +1603,10 @@ fulfill' fcol batchSize dBatch = do nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Maybe Limit -> CursorId -> Action m DelayedBatch nextBatch' fcol batchSize limit cid = do pipe <- asks mongoPipe - liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit) + let sd = P.serverData pipe + if maxWireVersion sd < 17 + then liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit) + else liftIO $ requestOpMsg pipe (Req $ GetMore fcol batchSize' cid, remLimit) [] where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit next :: MonadIO m => Cursor -> Action m (Maybe Document) @@ -1320,7 +1629,10 @@ next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where else return $ return (Batch newLimit cid docs') when (newLimit == Just 0) $ unless (cid == 0) $ do pipe <- asks mongoPipe - liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + let sd = P.serverData pipe + if maxWireVersion sd < 17 + then liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] + else liftIOE ConnectionFailure $ P.sendOpMsg pipe [Kc (P.KillC (KillCursors [cid]) fcol)] (Just MoreToCome) [] return (dBatch', Just doc) [] -> if cid == 0 then return (return $ Batch (Just 0) 0 [], Nothing) -- finished @@ -1380,9 +1692,20 @@ aggregateCommand aColl agg AggregateConfig {..} = aggregateCursor :: (MonadIO m, MonadFail m) => Collection -> Pipeline -> AggregateConfig -> Action m Cursor -- ^ Runs an aggregate and unpacks the result. See for details. aggregateCursor aColl agg cfg = do - response <- runCommand (aggregateCommand aColl agg cfg) - getCursorFromResponse aColl response - >>= either (liftIO . throwIO . AggregateFailure) return + pipe <- asks mongoPipe + let sd = P.serverData pipe + if maxWireVersion sd < 17 + then do + response <- runCommand (aggregateCommand aColl agg cfg) + getCursorFromResponse aColl response + >>= either (liftIO . throwIO . AggregateFailure) return + else do + let q = select (aggregateCommand aColl agg cfg) aColl + qr <- queryRequestOpMsg False q + dBatch <- liftIO $ requestOpMsg pipe qr [] + db <- thisDatabase + Right <$> newCursor db aColl 0 dBatch + >>= either (liftIO . throwIO . AggregateFailure) return getCursorFromResponse :: (MonadIO m, MonadFail m) @@ -1527,7 +1850,12 @@ runCommand1 c = runCommand [c =: (1 :: Int)] eval :: (MonadIO m, Val v) => Javascript -> Action m v -- ^ Run code on server -eval code = at "retval" <$> runCommand ["$eval" =: code] +eval code = do + p <- asks mongoPipe + let sd = P.serverData p + if maxWireVersion sd <= 7 + then at "retval" <$> runCommand ["$eval" =: code] + else error "The command db.eval() has been removed since MongoDB 4.2" modifyMVar :: MVar a -> (a -> Action IO (a, b)) -> Action IO b modifyMVar v f = do diff --git a/dist-newstyle/cache/config b/dist-newstyle/cache/config deleted file mode 100644 index feaef74ba0a122eb212c47d91eea6c588bfe4f45..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 11362 zcmeHN&97TW75C?Rd45SlDQZP3&}|o}De?WXeQj4&l_oDuBbuaDUSWym&djxMbA7M5 zcifi;NJZ=s-LOUMD!~p3vEdJ3fy4^IiY*drSXABg%=Mh{wf$_rBrhR=z3`lw`JFT8 zoH_Gx#{T5z#NKEq}p)U7lu<^O|=TlBpoP7eJI5E4FsaBH4F$TTS#H4 zwMW&j2E5gBI=1WCUe9&|PN>u29jDFuexH+GE8x86dqKd1uG4CD+TA{Jxz}U8HeuBB zZ6*1t!%5#KEths&k2#d~9pZx^pS8R;rybtsJ?eX12l(4vr`L4@kGo!A6W;B%SldN8 zF7;^3bGttCXus$7`?lNaw|m_H%y;Y#>Di9U+U+*$b#2;0IX3Cg zE{Cxwb9|>0*iO(1Iu2>I2xFwny-vrcc3}Iq=X88Rdfe?gL7Re&4tE`faykyMdXC?> zJ=<~o4(R8BBf6gN5Z;HfYct~YTdwOkE#kI4V)uid-K8E_*moTzr>du~5NWC}-wCk) z;;*2!tTzWq;|Hk~0jgk#LM~lm$9i_T64VDdp^D^UiBJtGAahc8luEVoKC;95cme5} zT-I2r%rMh-5K<6{uubJmEkqXdz$Pqb;FBrDttho~cGbRHDpgD8N|jOt*Vs}=c@J!= z&KZVA?!I^H`i(pHZm7Xty`GGZ({L~p#%uJl;o6SfgfG`{>;aFBTQM~i^0n`VR3xc! zz~eAWJ~rNDQ-a3?ny(Hr&UVOfJv z3@5%6fd_PR;lM@~RXV5Vw8AW#=xtg?zxL{*jeqSR4L{~7I4uf!EG`=XxaRisPIAQ{ zhP)D7F*3MO5I+JcAep#gz$QK!0laZ=?e;xo#a|DHVP>d<#E7`ajN@ctz}X+et;7%s ziZwEx!jPFkV%C_984=w2!Gx#Bh8XfNHK2i*#`_T`83!zv%?5u!M&pPZVca-?ejoZw zrKXgVW5fg|FeDKuos9Apd1o5Kxn>&gBxxgz!MG6-VG^RCbMwY~cW;^_+)$U~Y4G=xh&5v_j*|4g zLfyNb#3D^1<7mjiU8!{U1l;##ej>uZ8i#VxxBp83);${fQe87URr|OOhfh##@4d@4`je7=1WV=hbf7(agqv!_~vSa zc^G91rHg{)y6xE@s!EMlmJGTT3!cU#GVja<#;dD=@281KXoB~vWglsgABqrB+6{W& z(JfDjz-3Txevd>;YCWP7hT^M}G$qH7bm1t|c3rE zM`& zg8^r2*hM5Kx&lM%L1$D=grhb7_$q^QwaIuuCXr~;h-7Pm<~#&KoXO*KZEQVXgw4ED zKgov;85wZ0M|^5SrEu}n);upg?0C(E=!ZLG3D2l`tk<5$%84oK;Wf4%Coz@Ik^Qs9 zt@~>M+zQt?p2cgN)sTm9Qyh86I~+`3jD zW-uf9x+p5H`y28RE&r)>O-{5S3Ez-W;kf^Q7NeKzK9A`T{Q%|sLO*{^c8^GU{~28E z1+C}F|KO6HSLD%amlW$`CYgCQzL;U-+)szlwH29r(FTn@!)LUIdezvl+|TaK?F?SZ zHoaJEcuIPD?`-m47H9W{cIGciPyS_RkES8K&BKo!YtFK@FL$^|w#ybhAxphoY?rNe z#m<>Mk8hXlV=vnc4=GFch(bsg<9th>y$5i?V4A%-Ykmk0gwFn(F*88-&6>8O6RZ$y?^`VubJnRWmG