Add find using the runCommand operation

This commit is contained in:
Diego Balseiro 2020-07-28 21:36:30 -05:00
parent 7f83416ddf
commit 9ad016c043

View file

@ -29,7 +29,7 @@ module Database.MongoDB.Query (
-- ** Query -- ** Query
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial), Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial),
Projector, Limit, Order, BatchSize, Projector, Limit, Order, BatchSize,
explain, find, findOne, fetch, explain, find, findCommand, findOne, fetch,
findAndModify, findAndModifyOpts, FindAndModifyOpts(..), defFamUpdateOpts, findAndModify, findAndModifyOpts, FindAndModifyOpts(..), defFamUpdateOpts,
count, distinct, count, distinct,
-- *** Cursor -- *** Cursor
@ -77,8 +77,8 @@ import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
import Control.Monad.Trans (MonadIO, liftIO) import Control.Monad.Trans (MonadIO, liftIO)
import Data.Binary.Put (runPut) import Data.Binary.Put (runPut)
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool), import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
Javascript, at, valueAt, lookup, look, genObjectId, (=:), Javascript, at, valueAt, lookup, look, genObjectId, merge,
(=?), (!?), Val(..), ObjectId, Value(..)) (=:), (=?), (!?), Val(..), ObjectId, Value(..))
import Data.Bson.Binary (putDocument) import Data.Bson.Binary (putDocument)
import Data.Text (Text) import Data.Text (Text)
import qualified Data.Text as T import qualified Data.Text as T
@ -130,6 +130,7 @@ data Failure =
| WriteConcernFailure Int String -- ^ Write concern error. It's reported only by insert, update, delete commands. Not by wire protocol. | WriteConcernFailure Int String -- ^ Write concern error. It's reported only by insert, update, delete commands. Not by wire protocol.
| DocNotFound Selection -- ^ 'fetch' found no document matching selection | DocNotFound Selection -- ^ 'fetch' found no document matching selection
| AggregateFailure String -- ^ 'aggregate' returned an error | AggregateFailure String -- ^ 'aggregate' returned an error
| FindFailure String -- ^ 'find' returned an error
| CompoundFailure [Failure] -- ^ When we need to aggregate several failures and report them. | CompoundFailure [Failure] -- ^ When we need to aggregate several failures and report them.
| ProtocolFailure Int String -- ^ The structure of the returned documents doesn't match what we expected | ProtocolFailure Int String -- ^ The structure of the returned documents doesn't match what we expected
deriving (Show, Eq, Typeable) deriving (Show, Eq, Typeable)
@ -1032,6 +1033,33 @@ find q@Query{selection, batchSize} = do
dBatch <- liftIO $ request pipe [] qr dBatch <- liftIO $ request pipe [] qr
newCursor db (coll selection) batchSize dBatch newCursor db (coll selection) batchSize dBatch
findCommand :: (MonadIO m, MonadFail m) => Query -> Action m Cursor
-- ^ Fetch documents satisfying query using the command "find"
findCommand Query{..} = do
let aColl = coll selection
response <- runCommand $
[ "find" =: aColl
, "filter" =: selector selection
, "sort" =: sort
, "projection" =: project
, "hint" =: hint
, "skip" =: toInt32 skip
]
++ mconcat -- optional fields
[ "batchSize" =? toMaybe (/= 0) toInt32 batchSize
, "limit" =? toMaybe (/= 0) toInt32 limit
]
getCursorFromResponse aColl response FindFailure
where
toInt32 :: Integral a => a -> Int32
toInt32 = fromIntegral
toMaybe :: (a -> Bool) -> (a -> b) -> a -> Maybe b
toMaybe predicate f a
| predicate a = Just (f a)
| otherwise = Nothing
findOne :: (MonadIO m) => Query -> Action m (Maybe Document) findOne :: (MonadIO m) => Query -> Action m (Maybe Document)
-- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it -- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it
findOne q = do findOne q = do
@ -1319,14 +1347,22 @@ aggregateCursor :: (MonadIO m, MonadFail m) => Collection -> Pipeline -> Aggrega
-- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details. -- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details.
aggregateCursor aColl agg _ = do aggregateCursor aColl agg _ = do
response <- runCommand ["aggregate" =: aColl, "pipeline" =: agg, "cursor" =: ([] :: Document)] response <- runCommand ["aggregate" =: aColl, "pipeline" =: agg, "cursor" =: ([] :: Document)]
case true1 "ok" response of getCursorFromResponse aColl response AggregateFailure
True -> do
cursor :: Document <- lookup "cursor" response getCursorFromResponse
firstBatch :: [Document] <- lookup "firstBatch" cursor :: (MonadIO m, MonadFail m)
cursorId :: Int64 <- lookup "id" cursor => Collection
-> Document
-> (String -> Failure)
-> Action m Cursor
getCursorFromResponse aColl response err
| true1 "ok" response = do
cursor <- lookup "cursor" response
firstBatch <- lookup "firstBatch" cursor
cursorId <- lookup "id" cursor
db <- thisDatabase db <- thisDatabase
newCursor db aColl 0 $ return $ Batch Nothing cursorId firstBatch newCursor db aColl 0 (return $ Batch Nothing cursorId firstBatch)
False -> liftIO $ throwIO $ AggregateFailure $ at "errmsg" response | otherwise = liftIO . throwIO . err $ at "errmsg" response
-- ** Group -- ** Group