Compare commits
4 commits
master
...
runCommand
Author | SHA1 | Date | |
---|---|---|---|
|
46643fd8ad | ||
|
995087e9a0 | ||
|
fb0d140aa4 | ||
|
6f1d842641 |
4 changed files with 72 additions and 15 deletions
|
@ -494,6 +494,9 @@ data FlagBit =
|
||||||
| ExhaustAllowed -- ^ The client is prepared for multiple replies to this request using the moreToCome bit.
|
| ExhaustAllowed -- ^ The client is prepared for multiple replies to this request using the moreToCome bit.
|
||||||
deriving (Show, Eq, Enum)
|
deriving (Show, Eq, Enum)
|
||||||
|
|
||||||
|
uOptDoc :: UpdateOption -> Document
|
||||||
|
uOptDoc Upsert = ["upsert" =: True]
|
||||||
|
uOptDoc MultiUpdate = ["multi" =: True]
|
||||||
|
|
||||||
{-
|
{-
|
||||||
OP_MSG header == 16 byte
|
OP_MSG header == 16 byte
|
||||||
|
@ -528,7 +531,7 @@ putOpMsg cmd requestId flagBit params = do
|
||||||
putCString "documents" -- identifier
|
putCString "documents" -- identifier
|
||||||
mapM_ putDocument iDocuments -- payload
|
mapM_ putDocument iDocuments -- payload
|
||||||
Update{..} -> do
|
Update{..} -> do
|
||||||
let doc = ["q" =: uSelector, "u" =: uUpdater]
|
let doc = ["q" =: uSelector, "u" =: uUpdater] <> concatMap uOptDoc uOptions
|
||||||
(sec0, sec1Size) =
|
(sec0, sec1Size) =
|
||||||
prepSectionInfo
|
prepSectionInfo
|
||||||
uFullCollection
|
uFullCollection
|
||||||
|
@ -578,6 +581,10 @@ putOpMsg cmd requestId flagBit params = do
|
||||||
putInt32 (bit $ bitOpMsg $ ExhaustAllowed)
|
putInt32 (bit $ bitOpMsg $ ExhaustAllowed)
|
||||||
putInt8 0
|
putInt8 0
|
||||||
putDocument pre
|
putDocument pre
|
||||||
|
Message{..} -> do
|
||||||
|
putInt32 biT
|
||||||
|
putInt8 0
|
||||||
|
putDocument $ merge [ "$db" =: mDatabase ] mParams
|
||||||
Kc k -> case k of
|
Kc k -> case k of
|
||||||
KillC{..} -> do
|
KillC{..} -> do
|
||||||
let n = T.splitOn "." kFullCollection
|
let n = T.splitOn "." kFullCollection
|
||||||
|
@ -653,7 +660,11 @@ data Request =
|
||||||
} | GetMore {
|
} | GetMore {
|
||||||
gFullCollection :: FullCollection,
|
gFullCollection :: FullCollection,
|
||||||
gBatchSize :: Int32,
|
gBatchSize :: Int32,
|
||||||
gCursorId :: CursorId}
|
gCursorId :: CursorId
|
||||||
|
} | Message {
|
||||||
|
mDatabase :: Text,
|
||||||
|
mParams :: Document
|
||||||
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
data QueryOption =
|
data QueryOption =
|
||||||
|
@ -673,6 +684,7 @@ data QueryOption =
|
||||||
qOpcode :: Request -> Opcode
|
qOpcode :: Request -> Opcode
|
||||||
qOpcode Query{} = 2004
|
qOpcode Query{} = 2004
|
||||||
qOpcode GetMore{} = 2005
|
qOpcode GetMore{} = 2005
|
||||||
|
qOpcode Message{} = 2013
|
||||||
|
|
||||||
opMsgOpcode :: Opcode
|
opMsgOpcode :: Opcode
|
||||||
opMsgOpcode = 2013
|
opMsgOpcode = 2013
|
||||||
|
@ -693,6 +705,10 @@ putRequest request requestId = do
|
||||||
putCString gFullCollection
|
putCString gFullCollection
|
||||||
putInt32 gBatchSize
|
putInt32 gBatchSize
|
||||||
putInt64 gCursorId
|
putInt64 gCursorId
|
||||||
|
Message{..} -> do
|
||||||
|
putInt32 0
|
||||||
|
putInt8 0
|
||||||
|
putDocument $ merge [ "$db" =: mDatabase ] mParams
|
||||||
|
|
||||||
qBit :: QueryOption -> Int32
|
qBit :: QueryOption -> Int32
|
||||||
qBit TailableCursor = bit 1
|
qBit TailableCursor = bit 1
|
||||||
|
|
|
@ -1272,9 +1272,9 @@ find q@Query{selection, batchSize} = do
|
||||||
qr <- queryRequestOpMsg False q
|
qr <- queryRequestOpMsg False q
|
||||||
let newQr =
|
let newQr =
|
||||||
case fst qr of
|
case fst qr of
|
||||||
Req qry ->
|
Req P.Query{..} ->
|
||||||
let coll = last $ T.splitOn "." (qFullCollection qry)
|
let coll = last $ T.splitOn "." qFullCollection
|
||||||
in (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr)
|
in (Req $ P.Query {qSelector = merge qSelector [ "find" =: coll ], ..}, snd qr)
|
||||||
-- queryRequestOpMsg only returns Cmd types constructed via Req
|
-- queryRequestOpMsg only returns Cmd types constructed via Req
|
||||||
_ -> error "impossible"
|
_ -> error "impossible"
|
||||||
dBatch <- liftIO $ requestOpMsg pipe newQr []
|
dBatch <- liftIO $ requestOpMsg pipe newQr []
|
||||||
|
@ -1312,6 +1312,9 @@ findCommand q@Query{..} = do
|
||||||
| predicate a = Just (f a)
|
| predicate a = Just (f a)
|
||||||
| otherwise = Nothing
|
| otherwise = Nothing
|
||||||
|
|
||||||
|
isHandshake :: Document -> Bool
|
||||||
|
isHandshake = (== ["isMaster" =: (1 :: Int32)])
|
||||||
|
|
||||||
findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
|
findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
|
||||||
-- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it
|
-- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it
|
||||||
findOne q = do
|
findOne q = do
|
||||||
|
@ -1321,8 +1324,7 @@ findOne q = do
|
||||||
rq <- liftIO $ request pipe [] qr
|
rq <- liftIO $ request pipe [] qr
|
||||||
Batch _ _ docs <- liftDB $ fulfill rq
|
Batch _ _ docs <- liftDB $ fulfill rq
|
||||||
return (listToMaybe docs)
|
return (listToMaybe docs)
|
||||||
isHandshake = (== ["isMaster" =: (1 :: Int32)]) $ selector $ selection q :: Bool
|
if isHandshake (selector $ selection q)
|
||||||
if isHandshake
|
|
||||||
then legacyQuery
|
then legacyQuery
|
||||||
else do
|
else do
|
||||||
let sd = P.serverData pipe
|
let sd = P.serverData pipe
|
||||||
|
@ -1332,14 +1334,14 @@ findOne q = do
|
||||||
qr <- queryRequestOpMsg False q {limit = 1}
|
qr <- queryRequestOpMsg False q {limit = 1}
|
||||||
let newQr =
|
let newQr =
|
||||||
case fst qr of
|
case fst qr of
|
||||||
Req qry ->
|
Req P.Query{..} ->
|
||||||
let coll = last $ T.splitOn "." (qFullCollection qry)
|
let coll = last $ T.splitOn "." qFullCollection
|
||||||
-- We have to understand whether findOne is called as
|
-- We have to understand whether findOne is called as
|
||||||
-- command directly. This is necessary since findOne is used via
|
-- command directly. This is necessary since findOne is used via
|
||||||
-- runCommand as a vehicle to execute any type of commands and notices.
|
-- runCommand as a vehicle to execute any type of commands and notices.
|
||||||
labels = catMaybes $ map (\f -> look f $ qSelector qry) (noticeCommands ++ adminCommands) :: [Value]
|
labels = catMaybes $ map (\f -> look f qSelector) (noticeCommands ++ adminCommands) :: [Value]
|
||||||
in if null labels
|
in if null labels
|
||||||
then (Req $ qry {qSelector = merge (qSelector qry) [ "find" =: coll ]}, snd qr)
|
then (Req P.Query {qSelector = merge qSelector [ "find" =: coll ], ..}, snd qr)
|
||||||
else qr
|
else qr
|
||||||
_ -> error "impossible"
|
_ -> error "impossible"
|
||||||
rq <- liftIO $ requestOpMsg pipe newQr []
|
rq <- liftIO $ requestOpMsg pipe newQr []
|
||||||
|
@ -1526,7 +1528,7 @@ requestOpMsg pipe (Req r, remainingLimit) params = do
|
||||||
promise <- liftIOE ConnectionFailure $ P.callOpMsg pipe r Nothing params
|
promise <- liftIOE ConnectionFailure $ P.callOpMsg pipe r Nothing params
|
||||||
let protectedPromise = liftIOE ConnectionFailure promise
|
let protectedPromise = liftIOE ConnectionFailure promise
|
||||||
return $ fromReply remainingLimit =<< protectedPromise
|
return $ fromReply remainingLimit =<< protectedPromise
|
||||||
requestOpMsg _ (Nc _, _) _ = error "requestOpMsg: Only messages of type Query are supported"
|
requestOpMsg _ _ _ = error "requestOpMsg: Only messages of type Query are supported"
|
||||||
|
|
||||||
fromReply :: Maybe Limit -> Reply -> DelayedBatch
|
fromReply :: Maybe Limit -> Reply -> DelayedBatch
|
||||||
-- ^ Convert Reply to Batch or Failure
|
-- ^ Convert Reply to Batch or Failure
|
||||||
|
@ -1844,9 +1846,29 @@ type Command = Document
|
||||||
-- ^ A command is a special query or action against the database. See <http://www.mongodb.org/display/DOCS/Commands> for details.
|
-- ^ A command is a special query or action against the database. See <http://www.mongodb.org/display/DOCS/Commands> for details.
|
||||||
|
|
||||||
runCommand :: (MonadIO m) => Command -> Action m Document
|
runCommand :: (MonadIO m) => Command -> Action m Document
|
||||||
-- ^ Run command against the database and return its result
|
runCommand params = do
|
||||||
runCommand c = fromMaybe err <$> findOne (query c "$cmd") where
|
pipe <- asks mongoPipe
|
||||||
err = error $ "Nothing returned for command: " ++ show c
|
if isHandshake params || maxWireVersion (P.serverData pipe) < 17
|
||||||
|
then runCommandLegacy pipe params
|
||||||
|
else runCommand' pipe params
|
||||||
|
|
||||||
|
runCommandLegacy :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document
|
||||||
|
runCommandLegacy pipe params = do
|
||||||
|
qr <- queryRequest False (query params "$cmd") {limit = 1}
|
||||||
|
rq <- liftIO $ request pipe [] qr
|
||||||
|
Batch _ _ docs <- liftDB $ fulfill rq
|
||||||
|
case docs of
|
||||||
|
[doc] -> pure doc
|
||||||
|
_ -> error $ "Nothing returned for command: " <> show params
|
||||||
|
|
||||||
|
runCommand' :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document
|
||||||
|
runCommand' pipe params = do
|
||||||
|
ctx <- ask
|
||||||
|
rq <- liftIO $ requestOpMsg pipe ( Req (P.Message (mongoDatabase ctx) params), Just 1) []
|
||||||
|
Batch _ _ docs <- liftDB $ fulfill rq
|
||||||
|
case docs of
|
||||||
|
[doc] -> pure doc
|
||||||
|
_ -> error $ "Nothing returned for command: " <> show params
|
||||||
|
|
||||||
runCommand1 :: (MonadIO m) => Text -> Action m Document
|
runCommand1 :: (MonadIO m) => Text -> Action m Document
|
||||||
-- ^ @runCommand1 foo = runCommand [foo =: 1]@
|
-- ^ @runCommand1 foo = runCommand [foo =: 1]@
|
||||||
|
|
|
@ -115,6 +115,8 @@ Benchmark bench
|
||||||
, base16-bytestring
|
, base16-bytestring
|
||||||
, binary -any
|
, binary -any
|
||||||
, bson >= 0.3 && < 0.5
|
, bson >= 0.3 && < 0.5
|
||||||
|
, conduit
|
||||||
|
, conduit-extra
|
||||||
, data-default-class -any
|
, data-default-class -any
|
||||||
, text
|
, text
|
||||||
, bytestring -any
|
, bytestring -any
|
||||||
|
@ -128,6 +130,7 @@ Benchmark bench
|
||||||
, random-shuffle -any
|
, random-shuffle -any
|
||||||
, monad-control >= 0.3.1
|
, monad-control >= 0.3.1
|
||||||
, lifted-base >= 0.1.0.3
|
, lifted-base >= 0.1.0.3
|
||||||
|
, transformers
|
||||||
, transformers-base >= 0.4.1
|
, transformers-base >= 0.4.1
|
||||||
, hashtables >= 1.1.2.0
|
, hashtables >= 1.1.2.0
|
||||||
, fail
|
, fail
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
module QuerySpec (spec) where
|
module QuerySpec (spec) where
|
||||||
import Data.String (IsString(..))
|
import Data.String (IsString(..))
|
||||||
import TestImport
|
import TestImport
|
||||||
|
import Control.Concurrent (threadDelay)
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Control.Monad (forM_, when)
|
import Control.Monad (forM_, when)
|
||||||
import System.Environment (getEnv)
|
import System.Environment (getEnv)
|
||||||
|
@ -87,6 +88,21 @@ spec = around withCleanDatabase $ do
|
||||||
db (count $ select ["name" =: "Yankees", "league" =: "American"] "team") `shouldReturn` 1
|
db (count $ select ["name" =: "Yankees", "league" =: "American"] "team") `shouldReturn` 1
|
||||||
_id `shouldBe` ()
|
_id `shouldBe` ()
|
||||||
|
|
||||||
|
describe "upsert" $ do
|
||||||
|
it "upserts a document twice with the same spec" $ do
|
||||||
|
let q = select ["name" =: "jack"] "users"
|
||||||
|
db $ upsert q ["color" =: "blue", "name" =: "jack"]
|
||||||
|
-- since there is no way to ask for a ack, we must wait for "a sufficient time"
|
||||||
|
-- for the write to be visible
|
||||||
|
threadDelay 10000
|
||||||
|
db (rest =<< find (select [] "users")) >>= print
|
||||||
|
db (count $ select ["name" =: "jack"] "users") `shouldReturn` 1
|
||||||
|
db $ upsert q ["color" =: "red", "name" =: "jack"]
|
||||||
|
threadDelay 10000
|
||||||
|
db (count $ select ["name" =: "jack"] "users") `shouldReturn` 1
|
||||||
|
Just doc <- db $ findOne (select ["name" =: "jack"] "users")
|
||||||
|
doc !? "color" `shouldBe` Just "red"
|
||||||
|
|
||||||
describe "insertMany" $ do
|
describe "insertMany" $ do
|
||||||
it "inserts documents to the collection and returns their _ids" $ do
|
it "inserts documents to the collection and returns their _ids" $ do
|
||||||
(_id1:_id2:_) <- db $ insertMany "team" [ ["name" =: "Yankees", "league" =: "American"]
|
(_id1:_id2:_) <- db $ insertMany "team" [ ["name" =: "Yankees", "league" =: "American"]
|
||||||
|
|
Loading…
Reference in a new issue