Compare commits
3 commits
22537d87ee
...
63bba3a6d3
Author | SHA1 | Date | |
---|---|---|---|
63bba3a6d3 | |||
0afaf89e1d | |||
|
46643fd8ad |
2 changed files with 49 additions and 14 deletions
|
@ -581,6 +581,10 @@ putOpMsg cmd requestId flagBit params = do
|
|||
putInt32 (bit $ bitOpMsg $ ExhaustAllowed)
|
||||
putInt8 0
|
||||
putDocument pre
|
||||
Message{..} -> do
|
||||
putInt32 biT
|
||||
putInt8 0
|
||||
putDocument $ merge [ "$db" =: mDatabase ] mParams
|
||||
Kc k -> case k of
|
||||
KillC{..} -> do
|
||||
let n = T.splitOn "." kFullCollection
|
||||
|
@ -656,7 +660,11 @@ data Request =
|
|||
} | GetMore {
|
||||
gFullCollection :: FullCollection,
|
||||
gBatchSize :: Int32,
|
||||
gCursorId :: CursorId}
|
||||
gCursorId :: CursorId
|
||||
} | Message {
|
||||
mDatabase :: Text,
|
||||
mParams :: Document
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
data QueryOption =
|
||||
|
@ -676,6 +684,7 @@ data QueryOption =
|
|||
qOpcode :: Request -> Opcode
|
||||
qOpcode Query{} = 2004
|
||||
qOpcode GetMore{} = 2005
|
||||
qOpcode Message{} = 2013
|
||||
|
||||
opMsgOpcode :: Opcode
|
||||
opMsgOpcode = 2013
|
||||
|
@ -696,6 +705,10 @@ putRequest request requestId = do
|
|||
putCString gFullCollection
|
||||
putInt32 gBatchSize
|
||||
putInt64 gCursorId
|
||||
Message{..} -> do
|
||||
putInt32 0
|
||||
putInt8 0
|
||||
putDocument $ merge [ "$db" =: mDatabase ] mParams
|
||||
|
||||
qBit :: QueryOption -> Int32
|
||||
qBit TailableCursor = bit 1
|
||||
|
|
|
@ -1305,9 +1305,9 @@ find q@Query{selection, batchSize} = do
|
|||
qr <- queryRequestOpMsg False q
|
||||
let newQr =
|
||||
case fst qr of
|
||||
Req qry ->
|
||||
let (_db, coll) = splitDot (qFullCollection qry)
|
||||
in (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr)
|
||||
Req P.Query{..} ->
|
||||
let coll = last $ T.splitOn "." qFullCollection
|
||||
in (Req $ P.Query {qSelector = merge qSelector [ "find" =: coll ], ..}, snd qr)
|
||||
-- queryRequestOpMsg only returns Cmd types constructed via Req
|
||||
_ -> error "impossible"
|
||||
dBatch <- liftIO $ requestOpMsg pipe newQr []
|
||||
|
@ -1345,6 +1345,9 @@ findCommand q@Query{..} = do
|
|||
| predicate a = Just (f a)
|
||||
| otherwise = Nothing
|
||||
|
||||
isHandshake :: Document -> Bool
|
||||
isHandshake = (== ["isMaster" =: (1 :: Int32)])
|
||||
|
||||
findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
|
||||
-- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it
|
||||
findOne q = do
|
||||
|
@ -1354,8 +1357,7 @@ findOne q = do
|
|||
rq <- liftIO $ request pipe [] qr
|
||||
Batch _ _ docs <- liftDB $ fulfill rq
|
||||
return (listToMaybe docs)
|
||||
isHandshake = (== ["isMaster" =: (1 :: Int32)]) $ selector $ selection q :: Bool
|
||||
if isHandshake
|
||||
if isHandshake (selector $ selection q)
|
||||
then legacyQuery
|
||||
else do
|
||||
let sd = P.serverData pipe
|
||||
|
@ -1365,14 +1367,14 @@ findOne q = do
|
|||
qr <- queryRequestOpMsg False q {limit = 1}
|
||||
let newQr =
|
||||
case fst qr of
|
||||
Req qry ->
|
||||
let (_db, coll) = splitDot (qFullCollection qry)
|
||||
Req P.Query{..} ->
|
||||
let coll = last $ T.splitOn "." qFullCollection
|
||||
-- We have to understand whether findOne is called as
|
||||
-- command directly. This is necessary since findOne is used via
|
||||
-- runCommand as a vehicle to execute any type of commands and notices.
|
||||
labels = catMaybes $ map (\f -> look f $ qSelector qry) (noticeCommands ++ adminCommands) :: [Value]
|
||||
labels = catMaybes $ map (\f -> look f qSelector) (noticeCommands ++ adminCommands) :: [Value]
|
||||
in if null labels
|
||||
then (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr)
|
||||
then (Req P.Query {qSelector = merge qSelector [ "find" =: coll ], ..}, snd qr)
|
||||
else qr
|
||||
_ -> error "impossible"
|
||||
rq <- liftIO $ requestOpMsg pipe newQr []
|
||||
|
@ -1559,7 +1561,7 @@ requestOpMsg pipe (Req r, remainingLimit) params = do
|
|||
promise <- liftIOE ConnectionFailure $ P.callOpMsg pipe r Nothing params
|
||||
let protectedPromise = liftIOE ConnectionFailure promise
|
||||
return $ fromReply remainingLimit =<< protectedPromise
|
||||
requestOpMsg _ (Nc _, _) _ = error "requestOpMsg: Only messages of type Query are supported"
|
||||
requestOpMsg _ _ _ = error "requestOpMsg: Only messages of type Query are supported"
|
||||
|
||||
fromReply :: Maybe Limit -> Reply -> DelayedBatch
|
||||
-- ^ Convert Reply to Batch or Failure
|
||||
|
@ -1877,9 +1879,29 @@ type Command = Document
|
|||
-- ^ A command is a special query or action against the database. See <http://www.mongodb.org/display/DOCS/Commands> for details.
|
||||
|
||||
runCommand :: (MonadIO m) => Command -> Action m Document
|
||||
-- ^ Run command against the database and return its result
|
||||
runCommand c = fromMaybe err <$> findOne (query c "$cmd") where
|
||||
err = error $ "Nothing returned for command: " ++ show c
|
||||
runCommand params = do
|
||||
pipe <- asks mongoPipe
|
||||
if isHandshake params || maxWireVersion (P.serverData pipe) < 17
|
||||
then runCommandLegacy pipe params
|
||||
else runCommand' pipe params
|
||||
|
||||
runCommandLegacy :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document
|
||||
runCommandLegacy pipe params = do
|
||||
qr <- queryRequest False (query params "$cmd") {limit = 1}
|
||||
rq <- liftIO $ request pipe [] qr
|
||||
Batch _ _ docs <- liftDB $ fulfill rq
|
||||
case docs of
|
||||
[doc] -> pure doc
|
||||
_ -> error $ "Nothing returned for command: " <> show params
|
||||
|
||||
runCommand' :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document
|
||||
runCommand' pipe params = do
|
||||
ctx <- ask
|
||||
rq <- liftIO $ requestOpMsg pipe ( Req (P.Message (mongoDatabase ctx) params), Just 1) []
|
||||
Batch _ _ docs <- liftDB $ fulfill rq
|
||||
case docs of
|
||||
[doc] -> pure doc
|
||||
_ -> error $ "Nothing returned for command: " <> show params
|
||||
|
||||
runCommand1 :: (MonadIO m) => Text -> Action m Document
|
||||
-- ^ @runCommand1 foo = runCommand [foo =: 1]@
|
||||
|
|
Loading…
Reference in a new issue