Compare commits

..

4 commits

Author SHA1 Message Date
Fumiaki Kinoshita
46643fd8ad Make runCommand compatible with MongoDB 6.0 2023-05-02 16:56:38 +09:00
Pierre Mizrahi
995087e9a0 repair single document upserts when using OP_MSG
780df80cfc introduces support for the
OP_MSG protocol. Unfortunately, the upsert and multi options of the
update command still use flagBits to communicate the options, whereas
they must be provided directly into the command document,
alongside the "q" and "v" fields.

This commit:
 - introduces a test for a single-document upsert that, if isolated,
   succeeds against the reference MongoDB 3.6 container, but fails
   against an official 6.0 image.
 - provides a patch that sets the appropriate options.

The test is not perfect as the upsert operation is inherently racy and
this difficult to test. A comfortable threadDelay has been inserted as
a workaround to accomodate for medium workloads.
2023-02-13 14:05:56 +01:00
Victor Denisov
fb0d140aa4 Get rid of MonadFail constraints in MongoDB.Query
PR #141
2023-02-04 21:50:34 -08:00
Victor Denisov
6f1d842641 Add missing dependencies to benchmarks 2023-02-04 21:49:53 -08:00
4 changed files with 72 additions and 15 deletions

View file

@ -494,6 +494,9 @@ data FlagBit =
| ExhaustAllowed -- ^ The client is prepared for multiple replies to this request using the moreToCome bit. | ExhaustAllowed -- ^ The client is prepared for multiple replies to this request using the moreToCome bit.
deriving (Show, Eq, Enum) deriving (Show, Eq, Enum)
uOptDoc :: UpdateOption -> Document
uOptDoc Upsert = ["upsert" =: True]
uOptDoc MultiUpdate = ["multi" =: True]
{- {-
OP_MSG header == 16 byte OP_MSG header == 16 byte
@ -528,7 +531,7 @@ putOpMsg cmd requestId flagBit params = do
putCString "documents" -- identifier putCString "documents" -- identifier
mapM_ putDocument iDocuments -- payload mapM_ putDocument iDocuments -- payload
Update{..} -> do Update{..} -> do
let doc = ["q" =: uSelector, "u" =: uUpdater] let doc = ["q" =: uSelector, "u" =: uUpdater] <> concatMap uOptDoc uOptions
(sec0, sec1Size) = (sec0, sec1Size) =
prepSectionInfo prepSectionInfo
uFullCollection uFullCollection
@ -578,6 +581,10 @@ putOpMsg cmd requestId flagBit params = do
putInt32 (bit $ bitOpMsg $ ExhaustAllowed) putInt32 (bit $ bitOpMsg $ ExhaustAllowed)
putInt8 0 putInt8 0
putDocument pre putDocument pre
Message{..} -> do
putInt32 biT
putInt8 0
putDocument $ merge [ "$db" =: mDatabase ] mParams
Kc k -> case k of Kc k -> case k of
KillC{..} -> do KillC{..} -> do
let n = T.splitOn "." kFullCollection let n = T.splitOn "." kFullCollection
@ -653,7 +660,11 @@ data Request =
} | GetMore { } | GetMore {
gFullCollection :: FullCollection, gFullCollection :: FullCollection,
gBatchSize :: Int32, gBatchSize :: Int32,
gCursorId :: CursorId} gCursorId :: CursorId
} | Message {
mDatabase :: Text,
mParams :: Document
}
deriving (Show, Eq) deriving (Show, Eq)
data QueryOption = data QueryOption =
@ -673,6 +684,7 @@ data QueryOption =
qOpcode :: Request -> Opcode qOpcode :: Request -> Opcode
qOpcode Query{} = 2004 qOpcode Query{} = 2004
qOpcode GetMore{} = 2005 qOpcode GetMore{} = 2005
qOpcode Message{} = 2013
opMsgOpcode :: Opcode opMsgOpcode :: Opcode
opMsgOpcode = 2013 opMsgOpcode = 2013
@ -693,6 +705,10 @@ putRequest request requestId = do
putCString gFullCollection putCString gFullCollection
putInt32 gBatchSize putInt32 gBatchSize
putInt64 gCursorId putInt64 gCursorId
Message{..} -> do
putInt32 0
putInt8 0
putDocument $ merge [ "$db" =: mDatabase ] mParams
qBit :: QueryOption -> Int32 qBit :: QueryOption -> Int32
qBit TailableCursor = bit 1 qBit TailableCursor = bit 1

View file

@ -1272,9 +1272,9 @@ find q@Query{selection, batchSize} = do
qr <- queryRequestOpMsg False q qr <- queryRequestOpMsg False q
let newQr = let newQr =
case fst qr of case fst qr of
Req qry -> Req P.Query{..} ->
let coll = last $ T.splitOn "." (qFullCollection qry) let coll = last $ T.splitOn "." qFullCollection
in (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr) in (Req $ P.Query {qSelector = merge qSelector [ "find" =: coll ], ..}, snd qr)
-- queryRequestOpMsg only returns Cmd types constructed via Req -- queryRequestOpMsg only returns Cmd types constructed via Req
_ -> error "impossible" _ -> error "impossible"
dBatch <- liftIO $ requestOpMsg pipe newQr [] dBatch <- liftIO $ requestOpMsg pipe newQr []
@ -1312,6 +1312,9 @@ findCommand q@Query{..} = do
| predicate a = Just (f a) | predicate a = Just (f a)
| otherwise = Nothing | otherwise = Nothing
isHandshake :: Document -> Bool
isHandshake = (== ["isMaster" =: (1 :: Int32)])
findOne :: (MonadIO m) => Query -> Action m (Maybe Document) findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
-- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it -- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it
findOne q = do findOne q = do
@ -1321,8 +1324,7 @@ findOne q = do
rq <- liftIO $ request pipe [] qr rq <- liftIO $ request pipe [] qr
Batch _ _ docs <- liftDB $ fulfill rq Batch _ _ docs <- liftDB $ fulfill rq
return (listToMaybe docs) return (listToMaybe docs)
isHandshake = (== ["isMaster" =: (1 :: Int32)]) $ selector $ selection q :: Bool if isHandshake (selector $ selection q)
if isHandshake
then legacyQuery then legacyQuery
else do else do
let sd = P.serverData pipe let sd = P.serverData pipe
@ -1332,14 +1334,14 @@ findOne q = do
qr <- queryRequestOpMsg False q {limit = 1} qr <- queryRequestOpMsg False q {limit = 1}
let newQr = let newQr =
case fst qr of case fst qr of
Req qry -> Req P.Query{..} ->
let coll = last $ T.splitOn "." (qFullCollection qry) let coll = last $ T.splitOn "." qFullCollection
-- We have to understand whether findOne is called as -- We have to understand whether findOne is called as
-- command directly. This is necessary since findOne is used via -- command directly. This is necessary since findOne is used via
-- runCommand as a vehicle to execute any type of commands and notices. -- runCommand as a vehicle to execute any type of commands and notices.
labels = catMaybes $ map (\f -> look f $ qSelector qry) (noticeCommands ++ adminCommands) :: [Value] labels = catMaybes $ map (\f -> look f qSelector) (noticeCommands ++ adminCommands) :: [Value]
in if null labels in if null labels
then (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr) then (Req P.Query {qSelector = merge qSelector [ "find" =: coll ], ..}, snd qr)
else qr else qr
_ -> error "impossible" _ -> error "impossible"
rq <- liftIO $ requestOpMsg pipe newQr [] rq <- liftIO $ requestOpMsg pipe newQr []
@ -1526,7 +1528,7 @@ requestOpMsg pipe (Req r, remainingLimit) params = do
promise <- liftIOE ConnectionFailure $ P.callOpMsg pipe r Nothing params promise <- liftIOE ConnectionFailure $ P.callOpMsg pipe r Nothing params
let protectedPromise = liftIOE ConnectionFailure promise let protectedPromise = liftIOE ConnectionFailure promise
return $ fromReply remainingLimit =<< protectedPromise return $ fromReply remainingLimit =<< protectedPromise
requestOpMsg _ (Nc _, _) _ = error "requestOpMsg: Only messages of type Query are supported" requestOpMsg _ _ _ = error "requestOpMsg: Only messages of type Query are supported"
fromReply :: Maybe Limit -> Reply -> DelayedBatch fromReply :: Maybe Limit -> Reply -> DelayedBatch
-- ^ Convert Reply to Batch or Failure -- ^ Convert Reply to Batch or Failure
@ -1844,9 +1846,29 @@ type Command = Document
-- ^ A command is a special query or action against the database. See <http://www.mongodb.org/display/DOCS/Commands> for details. -- ^ A command is a special query or action against the database. See <http://www.mongodb.org/display/DOCS/Commands> for details.
runCommand :: (MonadIO m) => Command -> Action m Document runCommand :: (MonadIO m) => Command -> Action m Document
-- ^ Run command against the database and return its result runCommand params = do
runCommand c = fromMaybe err <$> findOne (query c "$cmd") where pipe <- asks mongoPipe
err = error $ "Nothing returned for command: " ++ show c if isHandshake params || maxWireVersion (P.serverData pipe) < 17
then runCommandLegacy pipe params
else runCommand' pipe params
runCommandLegacy :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document
runCommandLegacy pipe params = do
qr <- queryRequest False (query params "$cmd") {limit = 1}
rq <- liftIO $ request pipe [] qr
Batch _ _ docs <- liftDB $ fulfill rq
case docs of
[doc] -> pure doc
_ -> error $ "Nothing returned for command: " <> show params
runCommand' :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document
runCommand' pipe params = do
ctx <- ask
rq <- liftIO $ requestOpMsg pipe ( Req (P.Message (mongoDatabase ctx) params), Just 1) []
Batch _ _ docs <- liftDB $ fulfill rq
case docs of
[doc] -> pure doc
_ -> error $ "Nothing returned for command: " <> show params
runCommand1 :: (MonadIO m) => Text -> Action m Document runCommand1 :: (MonadIO m) => Text -> Action m Document
-- ^ @runCommand1 foo = runCommand [foo =: 1]@ -- ^ @runCommand1 foo = runCommand [foo =: 1]@

View file

@ -115,6 +115,8 @@ Benchmark bench
, base16-bytestring , base16-bytestring
, binary -any , binary -any
, bson >= 0.3 && < 0.5 , bson >= 0.3 && < 0.5
, conduit
, conduit-extra
, data-default-class -any , data-default-class -any
, text , text
, bytestring -any , bytestring -any
@ -128,6 +130,7 @@ Benchmark bench
, random-shuffle -any , random-shuffle -any
, monad-control >= 0.3.1 , monad-control >= 0.3.1
, lifted-base >= 0.1.0.3 , lifted-base >= 0.1.0.3
, transformers
, transformers-base >= 0.4.1 , transformers-base >= 0.4.1
, hashtables >= 1.1.2.0 , hashtables >= 1.1.2.0
, fail , fail

View file

@ -4,6 +4,7 @@
module QuerySpec (spec) where module QuerySpec (spec) where
import Data.String (IsString(..)) import Data.String (IsString(..))
import TestImport import TestImport
import Control.Concurrent (threadDelay)
import Control.Exception import Control.Exception
import Control.Monad (forM_, when) import Control.Monad (forM_, when)
import System.Environment (getEnv) import System.Environment (getEnv)
@ -87,6 +88,21 @@ spec = around withCleanDatabase $ do
db (count $ select ["name" =: "Yankees", "league" =: "American"] "team") `shouldReturn` 1 db (count $ select ["name" =: "Yankees", "league" =: "American"] "team") `shouldReturn` 1
_id `shouldBe` () _id `shouldBe` ()
describe "upsert" $ do
it "upserts a document twice with the same spec" $ do
let q = select ["name" =: "jack"] "users"
db $ upsert q ["color" =: "blue", "name" =: "jack"]
-- since there is no way to ask for a ack, we must wait for "a sufficient time"
-- for the write to be visible
threadDelay 10000
db (rest =<< find (select [] "users")) >>= print
db (count $ select ["name" =: "jack"] "users") `shouldReturn` 1
db $ upsert q ["color" =: "red", "name" =: "jack"]
threadDelay 10000
db (count $ select ["name" =: "jack"] "users") `shouldReturn` 1
Just doc <- db $ findOne (select ["name" =: "jack"] "users")
doc !? "color" `shouldBe` Just "red"
describe "insertMany" $ do describe "insertMany" $ do
it "inserts documents to the collection and returns their _ids" $ do it "inserts documents to the collection and returns their _ids" $ do
(_id1:_id2:_) <- db $ insertMany "team" [ ["name" =: "Yankees", "league" =: "American"] (_id1:_id2:_) <- db $ insertMany "team" [ ["name" =: "Yankees", "league" =: "American"]