2010-11-01 00:38:38 +00:00
-- | Query and update documents
2010-06-15 03:14:40 +00:00
2022-06-17 17:16:02 +00:00
{- # LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, TypeFamilies, CPP, DeriveDataTypeable, ScopedTypeVariables, BangPatterns # -}
2010-06-15 03:14:40 +00:00
module Database.MongoDB.Query (
2013-12-26 14:57:33 +00:00
-- * Monad
Action , access , Failure ( .. ) , ErrorCode ,
2014-04-12 13:17:47 +00:00
AccessMode ( .. ) , GetLastError , master , slaveOk , accessMode ,
2013-12-26 15:28:44 +00:00
liftDB ,
2015-05-14 12:53:08 +00:00
MongoContext ( .. ) , HasMongoContext ( .. ) ,
2013-12-26 14:57:33 +00:00
-- * Database
Database , allDatabases , useDb , thisDatabase ,
-- ** Authentication
2015-11-01 16:05:39 +00:00
Username , Password , auth , authMongoCR , authSCRAMSHA1 ,
2013-12-26 14:57:33 +00:00
-- * Collection
Collection , allCollections ,
-- ** Selection
Selection ( .. ) , Selector , whereJS ,
Select ( select ) ,
-- * Write
-- ** Insert
insert , insert_ , insertMany , insertMany_ , insertAll , insertAll_ ,
-- ** Update
2016-06-20 02:19:40 +00:00
save , replace , repsert , upsert , Modifier , modify , updateMany , updateAll ,
2017-01-07 20:36:07 +00:00
WriteResult ( .. ) , UpdateOption ( .. ) , Upserted ( .. ) ,
2013-12-26 14:57:33 +00:00
-- ** Delete
2017-01-07 20:36:07 +00:00
delete , deleteOne , deleteMany , deleteAll , DeleteOption ( .. ) ,
2013-12-26 14:57:33 +00:00
-- * Read
-- ** Query
Query ( .. ) , QueryOption ( NoCursorTimeout , TailableCursor , AwaitData , Partial ) ,
2012-06-10 19:47:14 +00:00
Projector , Limit , Order , BatchSize ,
2020-07-29 02:36:30 +00:00
explain , find , findCommand , findOne , fetch ,
2014-07-20 02:32:33 +00:00
findAndModify , findAndModifyOpts , FindAndModifyOpts ( .. ) , defFamUpdateOpts ,
2014-07-08 19:36:23 +00:00
count , distinct ,
2013-12-26 14:57:33 +00:00
-- *** Cursor
Cursor , nextBatch , next , nextN , rest , closeCursor , isCursorClosed ,
-- ** Aggregate
2018-02-04 22:38:58 +00:00
Pipeline , AggregateConfig ( .. ) , aggregate , aggregateCursor ,
2013-12-26 14:57:33 +00:00
-- ** Group
Group ( .. ) , GroupKey ( .. ) , group ,
-- ** MapReduce
MapReduce ( .. ) , MapFun , ReduceFun , FinalizeFun , MROut ( .. ) , MRMerge ( .. ) ,
2012-06-10 19:47:14 +00:00
MRResult , mapReduce , runMR , runMR' ,
2013-12-26 14:57:33 +00:00
-- * Command
Command , runCommand , runCommand1 ,
2017-01-23 02:57:07 +00:00
eval , retrieveServerData , ServerData ( .. )
2010-06-15 03:14:40 +00:00
) where
2017-05-09 05:47:47 +00:00
import qualified Control.Concurrent.MVar as MV
2022-06-17 17:16:02 +00:00
import Control.Concurrent.MVar.Lifted
( MVar ,
readMVar ,
)
import Control.Exception ( Exception , catch , throwIO )
import Control.Monad
( liftM2 ,
replicateM ,
unless ,
void ,
when ,
)
import Control.Monad.Reader ( MonadReader , ReaderT , ask , asks , local , runReaderT )
2023-01-12 05:25:48 +00:00
import Control.Monad.Trans ( MonadIO , liftIO , lift )
2022-06-17 17:16:02 +00:00
import qualified Crypto.Hash.MD5 as MD5
import qualified Crypto.Hash.SHA1 as SHA1
import qualified Crypto.MAC.HMAC as HMAC
import qualified Crypto.Nonce as Nonce
2016-05-30 01:21:31 +00:00
import Data.Binary.Put ( runPut )
2022-06-17 17:16:02 +00:00
import Data.Bits ( xor )
import Data.Bson
( Document ,
Field ( .. ) ,
Javascript ,
Label ,
ObjectId ,
Val ( .. ) ,
Value ( .. ) ,
at ,
genObjectId ,
look ,
lookup ,
valueAt ,
( !? ) ,
( =: ) ,
( =? ) ,
2022-10-27 04:09:24 +00:00
merge ,
cast
2022-06-17 17:16:02 +00:00
)
2016-05-30 01:21:31 +00:00
import Data.Bson.Binary ( putDocument )
2015-11-01 16:05:39 +00:00
import qualified Data.ByteString as BS
import qualified Data.ByteString.Base16 as B16
import qualified Data.ByteString.Base64 as B64
import qualified Data.ByteString.Char8 as B
2022-06-17 17:16:02 +00:00
import qualified Data.ByteString.Lazy as LBS
import Data.Default.Class ( Default ( .. ) )
import Data.Either ( lefts , rights )
2017-05-21 07:32:13 +00:00
import qualified Data.Either as E
2022-06-17 17:16:02 +00:00
import Data.Functor ( ( <&> ) )
import Data.Int ( Int32 , Int64 )
import Data.List ( foldl1' )
2015-11-01 16:05:39 +00:00
import qualified Data.Map as Map
2022-10-27 04:09:24 +00:00
import Data.Maybe ( catMaybes , fromMaybe , isNothing , listToMaybe , mapMaybe , maybeToList , fromJust )
2022-06-17 17:16:02 +00:00
import Data.Text ( Text )
import qualified Data.Text as T
import Data.Typeable ( Typeable )
import Data.Word ( Word32 )
import Database.MongoDB.Internal.Protocol
( CursorId ,
DeleteOption ( .. ) ,
FullCollection ,
InsertOption ( .. ) ,
Notice ( .. ) ,
Password ,
Pipe ,
QueryOption ( .. ) ,
Reply ( .. ) ,
Request
( GetMore ,
qBatchSize ,
qFullCollection ,
qOptions ,
qProjector ,
qSelector ,
qSkip
) ,
ResponseFlag ( .. ) ,
ServerData ( .. ) ,
UpdateOption ( .. ) ,
Username ,
2022-10-27 04:09:24 +00:00
Cmd ( .. ) ,
2022-06-17 17:16:02 +00:00
pwKey ,
2022-10-27 04:09:24 +00:00
FlagBit ( .. )
2022-06-17 17:16:02 +00:00
)
2023-01-12 05:25:48 +00:00
import Control.Monad.Trans.Except
2022-06-17 17:16:02 +00:00
import qualified Database.MongoDB.Internal.Protocol as P
import Database.MongoDB.Internal.Util ( liftIOE , loop , true1 , ( <.> ) )
import System.Mem.Weak ( Weak )
2015-11-01 16:05:39 +00:00
import Text.Read ( readMaybe )
2022-06-17 17:16:02 +00:00
import Prelude hiding ( lookup )
2015-11-01 16:05:39 +00:00
2011-07-05 14:37:01 +00:00
-- * Monad
2013-12-26 15:32:21 +00:00
type Action = ReaderT MongoContext
2020-04-01 14:53:37 +00:00
-- ^ A monad on top of m (which must be a MonadIO) that may access the database and may fail with a DB 'Failure'
2011-07-09 02:13:47 +00:00
2013-12-26 15:23:02 +00:00
access :: ( MonadIO m ) => Pipe -> AccessMode -> Database -> Action m a -> m a
2017-05-07 06:55:27 +00:00
-- ^ Run action against database on server at other end of pipe. Use access mode for any reads and writes.
-- Throw 'Failure' in case of any error.
2015-05-15 07:58:25 +00:00
access mongoPipe mongoAccessMode mongoDatabase action = runReaderT action MongoContext { .. }
2010-06-21 15:06:20 +00:00
2010-11-01 00:38:38 +00:00
-- | A connection failure, or a read or write exception like cursor expired or inserting a duplicate key.
2010-07-27 21:18:53 +00:00
-- Note, unexpected data from the server is not a Failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change.
2010-06-21 15:06:20 +00:00
data Failure =
2013-12-26 14:57:33 +00:00
ConnectionFailure IOError -- ^ TCP connection ('Pipeline') failed. May work if you try again on the same Mongo 'Connection' which will create a new Pipe.
| CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server that the one you are currently connected to (perhaps a fail over happen between servers in a replica set)
| QueryFailure ErrorCode String -- ^ Query failed for some reason as described in the string
2016-11-06 22:39:04 +00:00
| WriteFailure Int ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string, index of failed document is the first argument
2017-04-08 19:39:32 +00:00
| WriteConcernFailure Int String -- ^ Write concern error. It's reported only by insert, update, delete commands. Not by wire protocol.
2013-12-26 14:57:33 +00:00
| DocNotFound Selection -- ^ 'fetch' found no document matching selection
| AggregateFailure String -- ^ 'aggregate' returned an error
2017-03-05 08:55:31 +00:00
| CompoundFailure [ Failure ] -- ^ When we need to aggregate several failures and report them.
2017-04-08 19:31:24 +00:00
| ProtocolFailure Int String -- ^ The structure of the returned documents doesn't match what we expected
2013-12-26 15:23:02 +00:00
deriving ( Show , Eq , Typeable )
instance Exception Failure
2010-06-21 15:06:20 +00:00
2011-07-05 14:37:01 +00:00
type ErrorCode = Int
2020-04-01 13:11:17 +00:00
-- ^ Error code from @getLastError@ or query failure.
2011-07-05 14:37:01 +00:00
2020-04-01 13:11:17 +00:00
-- | Type of reads and writes to perform.
2011-07-09 02:13:47 +00:00
data AccessMode =
2013-12-26 14:57:33 +00:00
ReadStaleOk -- ^ Read-only action, reading stale data from a slave is OK.
| UnconfirmedWrites -- ^ Read-write action, slave not OK, every write is fire & forget.
2020-04-01 13:11:17 +00:00
| ConfirmWrites GetLastError -- ^ Read-write action, slave not OK, every write is confirmed with @getLastError@.
2012-01-27 15:48:33 +00:00
deriving Show
2011-07-09 02:13:47 +00:00
type GetLastError = Document
2020-04-01 13:11:17 +00:00
-- ^ Parameters for @getLastError@ command. For example @[\"w\" =: 2]@ tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See <http://www.mongodb.org/display/DOCS/Last+Error+Commands> for more options.
2011-07-09 02:13:47 +00:00
2016-11-01 23:34:47 +00:00
class Result a where
isFailed :: a -> Bool
2017-01-07 20:36:07 +00:00
data WriteResult = WriteResult
2016-09-09 04:23:51 +00:00
{ failed :: Bool
, nMatched :: Int
2016-08-23 04:24:06 +00:00
, nModified :: Maybe Int
2017-01-07 20:36:07 +00:00
, nRemoved :: Int
2017-05-21 07:31:49 +00:00
-- ^ Mongodb server before 2.6 doesn't allow to calculate this value.
-- This field is meaningless if we can't calculate the number of modified documents.
2016-07-20 07:46:16 +00:00
, upserted :: [ Upserted ]
2016-11-06 22:39:04 +00:00
, writeErrors :: [ Failure ]
2017-04-08 19:39:32 +00:00
, writeConcernErrors :: [ Failure ]
2016-07-20 07:46:16 +00:00
} deriving Show
2017-01-07 20:36:07 +00:00
instance Result WriteResult where
2016-11-01 23:34:47 +00:00
isFailed = failed
2016-11-21 00:18:49 +00:00
instance Result ( Either a b ) where
isFailed ( Left _ ) = True
isFailed _ = False
2016-07-20 07:46:16 +00:00
data Upserted = Upserted
{ upsertedIndex :: Int
, upsertedId :: ObjectId
} deriving Show
2016-06-18 20:33:24 +00:00
2011-07-09 02:13:47 +00:00
master :: AccessMode
2011-07-14 22:47:14 +00:00
-- ^ Same as 'ConfirmWrites' []
2011-07-09 02:13:47 +00:00
master = ConfirmWrites []
slaveOk :: AccessMode
2011-07-14 22:47:14 +00:00
-- ^ Same as 'ReadStaleOk'
2011-07-09 02:13:47 +00:00
slaveOk = ReadStaleOk
accessMode :: ( Monad m ) => AccessMode -> Action m a -> Action m a
-- ^ Run action with given 'AccessMode'
2022-06-17 17:16:02 +00:00
accessMode mode = local ( \ ctx -> ctx { mongoAccessMode = mode } )
2011-07-09 02:13:47 +00:00
readMode :: AccessMode -> ReadMode
readMode ReadStaleOk = StaleOk
readMode _ = Fresh
writeMode :: AccessMode -> WriteMode
writeMode ReadStaleOk = Confirm []
writeMode UnconfirmedWrites = NoConfirm
writeMode ( ConfirmWrites z ) = Confirm z
2010-06-15 03:14:40 +00:00
2011-07-05 14:37:01 +00:00
-- | Values needed when executing a db operation
2013-12-26 15:24:15 +00:00
data MongoContext = MongoContext {
2020-04-01 14:53:37 +00:00
mongoPipe :: Pipe , -- ^ operations read/write to this pipelined TCP connection to a MongoDB server
mongoAccessMode :: AccessMode , -- ^ read/write operation will use this access mode
mongoDatabase :: Database -- ^ operations query/update this database
2020-04-01 13:11:17 +00:00
}
2010-06-15 03:14:40 +00:00
2015-05-15 07:58:25 +00:00
mongoReadMode :: MongoContext -> ReadMode
mongoReadMode = readMode . mongoAccessMode
2011-07-09 02:13:47 +00:00
2015-05-15 07:58:25 +00:00
mongoWriteMode :: MongoContext -> WriteMode
mongoWriteMode = writeMode . mongoAccessMode
2010-06-15 03:14:40 +00:00
2013-12-26 15:28:44 +00:00
class HasMongoContext env where
mongoContext :: env -> MongoContext
instance HasMongoContext MongoContext where
mongoContext = id
liftDB :: ( MonadReader env m , HasMongoContext env , MonadIO m )
=> Action IO a
-> m a
2013-12-26 15:32:21 +00:00
liftDB m = do
2013-12-26 15:28:44 +00:00
env <- ask
liftIO $ runReaderT m ( mongoContext env )
2011-07-09 02:13:47 +00:00
2011-07-05 14:37:01 +00:00
-- * Database
2010-06-15 03:14:40 +00:00
2012-05-08 15:13:25 +00:00
type Database = Text
2011-07-05 14:37:01 +00:00
2013-12-27 11:39:22 +00:00
allDatabases :: ( MonadIO m ) => Action m [ Database ]
2011-07-05 14:37:01 +00:00
-- ^ List all databases residing on server
2022-06-17 17:16:02 +00:00
allDatabases = map ( at " name " ) . at " databases " <$> useDb " admin " ( runCommand1 " listDatabases " )
2010-06-15 03:14:40 +00:00
2011-07-05 14:37:01 +00:00
thisDatabase :: ( Monad m ) => Action m Database
2010-06-15 03:14:40 +00:00
-- ^ Current database in use
2015-05-15 07:58:25 +00:00
thisDatabase = asks mongoDatabase
2011-07-05 14:37:01 +00:00
useDb :: ( Monad m ) => Database -> Action m a -> Action m a
-- ^ Run action against given database
2022-06-17 17:16:02 +00:00
useDb db = local ( \ ctx -> ctx { mongoDatabase = db } )
2010-06-15 03:14:40 +00:00
-- * Authentication
2015-11-01 16:05:39 +00:00
auth :: MonadIO m => Username -> Password -> Action m Bool
-- ^ Authenticate with the current database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe. SCRAM-SHA-1 will be used for server versions 3.0+, MONGO-CR for lower versions.
auth un pw = do
2022-06-17 17:16:02 +00:00
let serverVersion = fmap ( at " version " ) $ useDb " admin " $ runCommand [ " buildinfo " =: ( 1 :: Int ) ]
mmv <- readMaybe . T . unpack . head . T . splitOn " . " <$> serverVersion
2015-11-01 16:05:39 +00:00
maybe ( return False ) performAuth mmv
where
performAuth majorVersion =
2022-06-17 17:16:02 +00:00
if majorVersion >= ( 3 :: Int )
then authSCRAMSHA1 un pw
else authMongoCR un pw
2015-11-01 16:05:39 +00:00
authMongoCR :: ( MonadIO m ) => Username -> Password -> Action m Bool
-- ^ Authenticate with the current database, using the MongoDB-CR authentication mechanism (default in MongoDB server < 3.0)
authMongoCR usr pss = do
2022-06-17 17:16:02 +00:00
n <- at " nonce " <$> runCommand [ " getnonce " =: ( 1 :: Int ) ]
true1 " ok " <$> runCommand [ " authenticate " =: ( 1 :: Int ) , " user " =: usr , " nonce " =: n , " key " =: pwKey n usr pss ]
2010-06-15 03:14:40 +00:00
2015-11-01 16:05:39 +00:00
authSCRAMSHA1 :: MonadIO m => Username -> Password -> Action m Bool
-- ^ Authenticate with the current database, using the SCRAM-SHA-1 authentication mechanism (default in MongoDB server >= 3.0)
authSCRAMSHA1 un pw = do
let hmac = HMAC . hmac SHA1 . hash 64
2022-06-17 17:16:02 +00:00
nonce <- liftIO ( Nonce . withGenerator Nonce . nonce128 <&> B64 . encode )
let firstBare = B . concat [ B . pack $ " n= " ++ T . unpack un ++ " ,r= " , nonce ]
2015-11-01 16:05:39 +00:00
let client1 = [ " saslStart " =: ( 1 :: Int ) , " mechanism " =: ( " SCRAM-SHA-1 " :: String ) , " payload " =: ( B . unpack . B64 . encode $ B . concat [ B . pack " n,, " , firstBare ] ) , " autoAuthorize " =: ( 1 :: Int ) ]
server1 <- runCommand client1
2015-11-07 18:37:40 +00:00
2015-11-01 16:05:39 +00:00
shortcircuit ( true1 " ok " server1 ) $ do
let serverPayload1 = B64 . decodeLenient . B . pack . at " payload " $ server1
let serverData1 = parseSCRAM serverPayload1
let iterations = read . B . unpack $ Map . findWithDefault " 1 " " i " serverData1
let salt = B64 . decodeLenient $ Map . findWithDefault " " " s " serverData1
let snonce = Map . findWithDefault " " " r " serverData1
2015-11-07 18:37:40 +00:00
2015-11-01 16:05:39 +00:00
shortcircuit ( B . isInfixOf nonce snonce ) $ do
let withoutProof = B . concat [ B . pack " c=biws,r= " , snonce ]
let digestS = B . pack $ T . unpack un ++ " :mongo: " ++ T . unpack pw
let digest = B16 . encode $ MD5 . hash digestS
let saltedPass = scramHI digest salt iterations
let clientKey = hmac saltedPass ( B . pack " Client Key " )
let storedKey = SHA1 . hash clientKey
let authMsg = B . concat [ firstBare , B . pack " , " , serverPayload1 , B . pack " , " , withoutProof ]
let clientSig = hmac storedKey authMsg
let pval = B64 . encode . BS . pack $ BS . zipWith xor clientKey clientSig
let clientFinal = B . concat [ withoutProof , B . pack " ,p= " , pval ]
let serverKey = hmac saltedPass ( B . pack " Server Key " )
let serverSig = B64 . encode $ hmac serverKey authMsg
2022-06-17 17:16:02 +00:00
let client2 = [ " saslContinue " =: ( 1 :: Int ) , " conversationId " =: ( at " conversationId " server1 :: Int ) , " payload " =: B . unpack ( B64 . encode clientFinal ) ]
2015-11-01 16:05:39 +00:00
server2 <- runCommand client2
2015-11-07 18:37:40 +00:00
2015-11-01 16:05:39 +00:00
shortcircuit ( true1 " ok " server2 ) $ do
let serverPayload2 = B64 . decodeLenient . B . pack $ at " payload " server2
let serverData2 = parseSCRAM serverPayload2
let serverSigComp = Map . findWithDefault " " " v " serverData2
2015-11-30 12:55:20 +00:00
shortcircuit ( serverSig == serverSigComp ) $ do
let done = true1 " done " server2
if done
then return True
else do
2016-06-20 02:27:18 +00:00
let client2Step2 = [ " saslContinue " =: ( 1 :: Int )
, " conversationId " =: ( at " conversationId " server1 :: Int )
, " payload " =: String " " ]
server3 <- runCommand client2Step2
2015-11-30 12:55:20 +00:00
shortcircuit ( true1 " ok " server3 ) $ do
return True
2015-11-01 16:05:39 +00:00
where
shortcircuit True f = f
shortcircuit False _ = return False
scramHI :: B . ByteString -> B . ByteString -> Int -> B . ByteString
scramHI digest salt iters = snd $ foldl com ( u1 , u1 ) [ 1 .. ( iters - 1 ) ]
where
hmacd = HMAC . hmac SHA1 . hash 64 digest
u1 = hmacd ( B . concat [ salt , BS . pack [ 0 , 0 , 0 , 1 ] ] )
com ( u , uc ) _ = let u' = hmacd u in ( u' , BS . pack $ BS . zipWith xor uc u' )
parseSCRAM :: B . ByteString -> Map . Map B . ByteString B . ByteString
2022-06-17 17:16:02 +00:00
parseSCRAM = Map . fromList . fmap ( cleanup . T . breakOn " = " ) . T . splitOn " , " . T . pack . B . unpack
2015-11-01 16:05:39 +00:00
where cleanup ( t1 , t2 ) = ( B . pack $ T . unpack t1 , B . pack . T . unpack $ T . drop 1 t2 )
2022-10-27 04:09:24 +00:00
-- As long as server api is not requested OP_Query has to be used. See:
-- https://github.com/mongodb/specifications/blob/6dc6f80026f0f8d99a8c81f996389534b14f6602/source/mongodb-handshake/handshake.rst#specification
2016-05-20 04:44:42 +00:00
retrieveServerData :: ( MonadIO m ) => Action m ServerData
retrieveServerData = do
d <- runCommand1 " isMaster "
let newSd = ServerData
2022-10-27 04:09:24 +00:00
{ isMaster = fromMaybe False $ lookup " isMaster " d
2022-06-17 17:16:02 +00:00
, minWireVersion = fromMaybe 0 $ lookup " minWireVersion " d
, maxWireVersion = fromMaybe 0 $ lookup " maxWireVersion " d
, maxMessageSizeBytes = fromMaybe 48000000 $ lookup " maxMessageSizeBytes " d
, maxBsonObjectSize = fromMaybe ( 16 * 1024 * 1024 ) $ lookup " maxBsonObjectSize " d
, maxWriteBatchSize = fromMaybe 1000 $ lookup " maxWriteBatchSize " d
2016-05-20 04:44:42 +00:00
}
return newSd
2010-06-15 03:14:40 +00:00
-- * Collection
2012-05-08 15:13:25 +00:00
type Collection = Text
2010-06-15 03:14:40 +00:00
-- ^ Collection name (not prefixed with database)
2017-05-09 06:12:26 +00:00
allCollections :: MonadIO m => Action m [ Collection ]
2010-06-15 03:14:40 +00:00
-- ^ List all collections in this database
allCollections = do
2016-05-20 06:24:37 +00:00
p <- asks mongoPipe
let sd = P . serverData p
2022-06-17 17:16:02 +00:00
if maxWireVersion sd <= 2
2016-05-20 06:24:37 +00:00
then do
db <- thisDatabase
docs <- rest =<< find ( query [] " system.namespaces " ) { sort = [ " name " =: ( 1 :: Int ) ] }
2022-06-17 17:16:02 +00:00
( return . filter ( not . isSpecial db ) ) ( map ( dropDbPrefix . at " name " ) docs )
2022-10-27 04:09:24 +00:00
else
if maxWireVersion sd < 17
then do
r <- runCommand1 " listCollections "
let curData = do
( Doc curDoc ) <- r !? " cursor "
( curId :: Int64 ) <- curDoc !? " id "
( curNs :: Text ) <- curDoc !? " ns "
( firstBatch :: [ Value ] ) <- curDoc !? " firstBatch "
return ( curId , curNs , mapMaybe cast' firstBatch :: [ Document ] )
case curData of
Nothing -> return []
Just ( curId , curNs , firstBatch ) -> do
db <- thisDatabase
nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch
docs <- rest nc
return $ mapMaybe ( \ d -> d !? " name " ) docs
else do
let q = Query [] ( Select [ " listCollections " =: ( 1 :: Int ) ] " $cmd " ) [] 0 0 [] False 0 []
qr <- queryRequestOpMsg False q
dBatch <- liftIO $ requestOpMsg p qr []
2016-05-20 06:24:37 +00:00
db <- thisDatabase
2022-10-27 04:09:24 +00:00
nc <- newCursor db " $cmd " 0 dBatch
2016-05-20 06:24:37 +00:00
docs <- rest nc
2022-06-17 17:16:02 +00:00
return $ mapMaybe ( \ d -> d !? " name " ) docs
2010-06-15 03:14:40 +00:00
where
2013-12-26 14:57:33 +00:00
dropDbPrefix = T . tail . T . dropWhile ( /= '.' )
isSpecial db col = T . any ( == '$' ) col && db <.> col /= " local.oplog.$main "
2010-06-15 03:14:40 +00:00
-- * Selection
data Selection = Select { selector :: Selector , coll :: Collection } deriving ( Show , Eq )
-- ^ Selects documents in collection that match selector
type Selector = Document
2011-07-14 22:47:14 +00:00
-- ^ Filter for a query, analogous to the where clause in SQL. @[]@ matches all documents in collection. @[\"x\" =: a, \"y\" =: b]@ is analogous to @where x = a and y = b@ in SQL. See <http://www.mongodb.org/display/DOCS/Querying> for full selector syntax.
2010-06-15 03:14:40 +00:00
whereJS :: Selector -> Javascript -> Selector
-- ^ Add Javascript predicate to selector, in which case a document must match both selector and predicate
whereJS sel js = ( " $where " =: js ) : sel
2010-06-21 15:06:20 +00:00
class Select aQueryOrSelection where
2015-05-15 13:23:40 +00:00
select :: Selector -> Collection -> aQueryOrSelection
-- ^ 'Query' or 'Selection' that selects documents in collection that match selector. The choice of type depends on use, for example, in @'find' (select sel col)@ it is a Query, and in @'delete' (select sel col)@ it is a Selection.
2010-06-21 15:06:20 +00:00
instance Select Selection where
2013-12-26 14:57:33 +00:00
select = Select
2010-06-21 15:06:20 +00:00
instance Select Query where
2013-12-26 14:57:33 +00:00
select = query
2010-06-21 15:06:20 +00:00
2010-06-15 03:14:40 +00:00
-- * Write
2010-06-21 15:06:20 +00:00
data WriteMode =
2013-12-26 14:57:33 +00:00
NoConfirm -- ^ Submit writes without receiving acknowledgments. Fast. Assumes writes succeed even though they may not.
| Confirm GetLastError -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. This is acomplished by sending the getLastError command, with given 'GetLastError' parameters, after every write.
deriving ( Show , Eq )
2010-06-21 15:06:20 +00:00
2016-07-20 07:46:16 +00:00
write :: Notice -> Action IO ( Maybe Document )
2010-07-27 21:18:53 +00:00
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error.
2015-05-15 07:58:25 +00:00
write notice = asks mongoWriteMode >>= \ mode -> case mode of
2016-08-06 03:29:20 +00:00
NoConfirm -> do
pipe <- asks mongoPipe
liftIOE ConnectionFailure $ P . send pipe [ notice ]
2016-07-20 07:46:16 +00:00
return Nothing
2013-12-26 14:57:33 +00:00
Confirm params -> do
let q = query ( ( " getlasterror " =: ( 1 :: Int ) ) : params ) " $cmd "
2016-08-05 05:58:25 +00:00
pipe <- asks mongoPipe
Batch _ _ [ doc ] <- do
r <- queryRequest False q { limit = 1 }
rr <- liftIO $ request pipe [ notice ] r
fulfill rr
2016-07-20 07:46:16 +00:00
return $ Just doc
2010-06-21 15:06:20 +00:00
2010-06-15 03:14:40 +00:00
-- ** Insert
2013-12-27 11:39:22 +00:00
insert :: ( MonadIO m ) => Collection -> Document -> Action m Value
2020-04-01 14:53:37 +00:00
-- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied
2016-11-21 00:18:49 +00:00
insert col doc = do
2017-02-12 19:59:15 +00:00
doc' <- liftIO $ assignId doc
res <- insertBlock [] col ( 0 , [ doc' ] )
2016-11-21 00:18:49 +00:00
case res of
Left failure -> liftIO $ throwIO failure
Right r -> return $ head r
2010-06-15 03:14:40 +00:00
2013-12-27 11:39:22 +00:00
insert_ :: ( MonadIO m ) => Collection -> Document -> Action m ()
2020-04-01 14:53:37 +00:00
-- ^ Same as 'insert' except don't return _id
2010-06-15 03:14:40 +00:00
insert_ col doc = insert col doc >> return ()
2011-07-05 14:37:01 +00:00
insertMany :: ( MonadIO m ) => Collection -> [ Document ] -> Action m [ Value ]
2020-04-01 14:53:37 +00:00
-- ^ Insert documents into collection and return their \"_id\" values,
2016-11-24 22:18:00 +00:00
-- which are created automatically if not supplied.
-- If a document fails to be inserted (eg. due to duplicate key)
2020-04-01 13:11:17 +00:00
-- then remaining docs are aborted, and @LastError@ is set.
2016-11-24 22:18:00 +00:00
-- An exception will be throw if any error occurs.
2011-07-21 20:39:19 +00:00
insertMany = insert' []
2010-06-15 03:14:40 +00:00
2011-07-05 14:37:01 +00:00
insertMany_ :: ( MonadIO m ) => Collection -> [ Document ] -> Action m ()
2020-04-01 14:53:37 +00:00
-- ^ Same as 'insertMany' except don't return _ids
2010-06-15 03:14:40 +00:00
insertMany_ col docs = insertMany col docs >> return ()
2011-07-21 20:39:19 +00:00
insertAll :: ( MonadIO m ) => Collection -> [ Document ] -> Action m [ Value ]
2020-04-01 14:53:37 +00:00
-- ^ Insert documents into collection and return their \"_id\" values,
2016-11-24 22:18:00 +00:00
-- which are created automatically if not supplied. If a document fails
-- to be inserted (eg. due to duplicate key) then remaining docs
-- are still inserted.
2011-07-21 20:39:19 +00:00
insertAll = insert' [ KeepGoing ]
insertAll_ :: ( MonadIO m ) => Collection -> [ Document ] -> Action m ()
2020-04-01 14:53:37 +00:00
-- ^ Same as 'insertAll' except don't return _ids
2011-07-21 20:39:19 +00:00
insertAll_ col docs = insertAll col docs >> return ()
2016-06-19 05:30:32 +00:00
insertCommandDocument :: [ InsertOption ] -> Collection -> [ Document ] -> Document -> Document
insertCommandDocument opts col docs writeConcern =
2016-05-30 01:21:31 +00:00
[ " insert " =: col
, " ordered " =: ( KeepGoing ` notElem ` opts )
, " documents " =: docs
2016-06-19 05:30:32 +00:00
, " writeConcern " =: writeConcern
2016-05-30 01:21:31 +00:00
]
2016-11-17 08:15:01 +00:00
takeRightsUpToLeft :: [ Either a b ] -> [ b ]
2017-05-21 07:32:13 +00:00
takeRightsUpToLeft l = E . rights $ takeWhile E . isRight l
2016-11-17 08:15:01 +00:00
2016-05-30 01:21:31 +00:00
insert' :: ( MonadIO m )
=> [ InsertOption ] -> Collection -> [ Document ] -> Action m [ Value ]
2020-04-01 14:53:37 +00:00
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied
2011-07-21 20:39:19 +00:00
insert' opts col docs = do
2016-05-30 01:21:31 +00:00
p <- asks mongoPipe
let sd = P . serverData p
2017-02-12 19:59:15 +00:00
docs' <- liftIO $ mapM assignId docs
2016-06-19 05:30:32 +00:00
mode <- asks mongoWriteMode
let writeConcern = case mode of
2022-10-27 04:09:24 +00:00
NoConfirm -> [ " w " =: ( 0 :: Int32 ) ]
2016-06-19 05:30:32 +00:00
Confirm params -> params
let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern
2022-06-17 17:16:02 +00:00
let ordered = KeepGoing ` notElem ` opts
2016-11-17 08:15:01 +00:00
let preChunks = splitAtLimit
2016-05-30 01:21:31 +00:00
( maxBsonObjectSize sd - docSize )
2016-06-18 20:33:24 +00:00
-- size of auxiliary part of insert
2016-05-30 01:21:31 +00:00
-- document should be subtracted from
-- the overall size
( maxWriteBatchSize sd )
2017-02-12 19:59:15 +00:00
docs'
2016-11-17 08:15:01 +00:00
let chunks =
if ordered
then takeRightsUpToLeft preChunks
else rights preChunks
2016-11-21 03:18:14 +00:00
let lens = map length chunks
2022-06-17 17:16:02 +00:00
let lSums = 0 : zipWith ( + ) lSums lens
2016-11-21 03:18:14 +00:00
chunkResults <- interruptibleFor ordered ( zip lSums chunks ) $ insertBlock opts col
2016-11-20 21:55:40 +00:00
let lchunks = lefts preChunks
2016-11-24 22:18:00 +00:00
when ( not $ null lchunks ) $ do
2016-11-20 21:55:40 +00:00
liftIO $ throwIO $ head lchunks
2016-11-21 00:18:49 +00:00
let lresults = lefts chunkResults
when ( not $ null lresults ) $ liftIO $ throwIO $ head lresults
return $ concat $ rights chunkResults
2016-05-30 01:21:31 +00:00
insertBlock :: ( MonadIO m )
2016-11-21 03:18:14 +00:00
=> [ InsertOption ] -> Collection -> ( Int , [ Document ] ) -> Action m ( Either Failure [ Value ] )
2016-05-30 01:21:31 +00:00
-- ^ This will fail if the list of documents is bigger than restrictions
2016-11-21 03:18:14 +00:00
insertBlock _ _ ( _ , [] ) = return $ Right []
insertBlock opts col ( prevCount , docs ) = do
2013-12-26 14:57:33 +00:00
db <- thisDatabase
2016-05-23 00:38:07 +00:00
p <- asks mongoPipe
let sd = P . serverData p
2022-06-17 17:16:02 +00:00
if maxWireVersion sd < 2
2016-05-23 00:38:07 +00:00
then do
2017-02-12 19:59:15 +00:00
res <- liftDB $ write ( Insert ( db <.> col ) opts docs )
2016-11-21 00:18:49 +00:00
let errorMessage = do
jRes <- res
em <- lookup " err " jRes
2022-06-17 17:16:02 +00:00
return $ WriteFailure prevCount ( fromMaybe 0 $ lookup " code " jRes ) em
2016-11-21 03:18:14 +00:00
-- In older versions of ^^ the protocol we can't really say which document failed.
-- So we just report the accumulated number of documents in the previous blocks.
2016-11-21 00:18:49 +00:00
case errorMessage of
Just failure -> return $ Left failure
2017-02-12 19:59:15 +00:00
Nothing -> return $ Right $ map ( valueAt " _id " ) docs
2022-10-27 04:09:24 +00:00
else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do
2016-06-19 05:30:32 +00:00
mode <- asks mongoWriteMode
let writeConcern = case mode of
2022-10-27 04:09:24 +00:00
NoConfirm -> [ " w " =: ( 0 :: Int32 ) ]
2016-06-19 05:30:32 +00:00
Confirm params -> params
2017-02-12 19:59:15 +00:00
doc <- runCommand $ insertCommandDocument opts col docs writeConcern
2016-05-23 00:38:07 +00:00
case ( look " writeErrors " doc , look " writeConcernError " doc ) of
2017-02-12 19:59:15 +00:00
( Nothing , Nothing ) -> return $ Right $ map ( valueAt " _id " ) docs
2017-03-05 08:55:31 +00:00
( Just ( Array errs ) , Nothing ) -> do
2022-06-17 17:16:02 +00:00
let writeErrors = map ( anyToWriteError prevCount ) errs
2017-03-05 08:55:31 +00:00
let errorsWithFailureIndex = map ( addFailureIndex prevCount ) writeErrors
return $ Left $ CompoundFailure errorsWithFailureIndex
2016-05-23 00:38:07 +00:00
( Nothing , Just err ) -> do
2017-02-04 22:47:33 +00:00
return $ Left $ WriteFailure
2017-03-05 08:55:31 +00:00
prevCount
2022-06-17 17:16:02 +00:00
( fromMaybe 0 $ lookup " ok " doc )
2016-05-30 01:21:31 +00:00
( show err )
2017-03-05 08:55:31 +00:00
( Just ( Array errs ) , Just writeConcernErr ) -> do
2022-06-17 17:16:02 +00:00
let writeErrors = map ( anyToWriteError prevCount ) errs
2017-03-05 08:55:31 +00:00
let errorsWithFailureIndex = map ( addFailureIndex prevCount ) writeErrors
2022-06-17 17:16:02 +00:00
return $ Left $ CompoundFailure $ WriteFailure
2017-03-05 08:55:31 +00:00
prevCount
2022-06-17 17:16:02 +00:00
( fromMaybe 0 $ lookup " ok " doc )
( show writeConcernErr ) : errorsWithFailureIndex
2017-04-08 19:31:24 +00:00
( Just unknownValue , Nothing ) -> do
return $ Left $ ProtocolFailure prevCount $ " Expected array of errors. Received: " ++ show unknownValue
( Just unknownValue , Just writeConcernErr ) -> do
2022-06-17 17:16:02 +00:00
return $ Left $ CompoundFailure [ ProtocolFailure prevCount $ " Expected array of errors. Received: " ++ show unknownValue
, WriteFailure prevCount ( fromMaybe 0 $ lookup " ok " doc ) $ show writeConcernErr ]
2022-10-27 04:09:24 +00:00
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> [ " w " =: ( 0 :: Int32 ) ]
Confirm params -> merge params [ " w " =: ( 1 :: Int32 ) ]
doc <- runCommand $ insertCommandDocument opts col docs writeConcern
case ( look " writeErrors " doc , look " writeConcernError " doc ) of
( Nothing , Nothing ) -> return $ Right $ map ( valueAt " _id " ) docs
( Just ( Array errs ) , Nothing ) -> do
let writeErrors = map ( anyToWriteError prevCount ) errs
let errorsWithFailureIndex = map ( addFailureIndex prevCount ) writeErrors
return $ Left $ CompoundFailure errorsWithFailureIndex
( Nothing , Just err ) -> do
return $ Left $ WriteFailure
prevCount
( fromMaybe 0 $ lookup " ok " doc )
( show err )
( Just ( Array errs ) , Just writeConcernErr ) -> do
let writeErrors = map ( anyToWriteError prevCount ) errs
let errorsWithFailureIndex = map ( addFailureIndex prevCount ) writeErrors
return $ Left $ CompoundFailure $ WriteFailure
prevCount
( fromMaybe 0 $ lookup " ok " doc )
( show writeConcernErr ) : errorsWithFailureIndex
( Just unknownValue , Nothing ) -> do
return $ Left $ ProtocolFailure prevCount $ " Expected array of errors. Received: " ++ show unknownValue
( Just unknownValue , Just writeConcernErr ) -> do
return $ Left $ CompoundFailure [ ProtocolFailure prevCount $ " Expected array of errors. Received: " ++ show unknownValue
, WriteFailure prevCount ( fromMaybe 0 $ lookup " ok " doc ) $ show writeConcernErr ]
2016-05-30 01:21:31 +00:00
2016-11-17 08:15:01 +00:00
splitAtLimit :: Int -> Int -> [ Document ] -> [ Either Failure [ Document ] ]
splitAtLimit maxSize maxCount list = chop ( go 0 0 [] ) list
2016-05-30 01:21:31 +00:00
where
2022-06-17 17:16:02 +00:00
go :: Int -> Int -> [ Document ] -> [ Document ] -> ( Either Failure [ Document ] , [ Document ] )
2016-11-17 08:15:01 +00:00
go _ _ res [] = ( Right $ reverse res , [] )
2016-05-30 01:21:31 +00:00
go curSize curCount [] ( x : xs ) |
2022-06-17 17:16:02 +00:00
( curSize + sizeOfDocument x + 2 + curCount ) > maxSize =
2016-11-17 08:15:01 +00:00
( Left $ WriteFailure 0 0 " One document is too big for the message " , xs )
2016-05-30 01:21:31 +00:00
go curSize curCount res ( x : xs ) =
2022-06-17 17:16:02 +00:00
if ( ( curSize + sizeOfDocument x + 2 + curCount ) > maxSize )
2016-05-30 01:21:31 +00:00
-- we have ^ 2 brackets and curCount commas in
-- the document that we need to take into
-- account
2022-06-17 17:16:02 +00:00
|| ( ( curCount + 1 ) > maxCount )
2016-05-30 01:21:31 +00:00
then
2016-11-17 08:15:01 +00:00
( Right $ reverse res , x : xs )
2016-05-30 01:21:31 +00:00
else
2022-06-17 17:16:02 +00:00
go ( curSize + sizeOfDocument x ) ( curCount + 1 ) ( x : res ) xs
2016-05-30 01:21:31 +00:00
chop :: ( [ a ] -> ( b , [ a ] ) ) -> [ a ] -> [ b ]
chop _ [] = []
chop f as = let ( b , as' ) = f as in b : chop f as'
sizeOfDocument :: Document -> Int
sizeOfDocument d = fromIntegral $ LBS . length $ runPut $ putDocument d
2011-07-21 20:39:19 +00:00
2010-06-15 03:14:40 +00:00
assignId :: Document -> IO Document
2020-04-01 14:53:37 +00:00
-- ^ Assign a unique value to _id field if missing
2012-06-10 19:47:14 +00:00
assignId doc = if any ( ( " _id " == ) . label ) doc
2013-12-26 14:57:33 +00:00
then return doc
2022-06-17 17:16:02 +00:00
else ( \ oid -> ( " _id " =: oid ) : doc ) <$> genObjectId
2010-06-15 03:14:40 +00:00
2014-04-12 13:17:47 +00:00
-- ** Update
2010-06-15 03:14:40 +00:00
2016-08-06 22:17:03 +00:00
save :: ( MonadIO m )
2016-06-08 07:09:33 +00:00
=> Collection -> Document -> Action m ()
2020-04-01 14:53:37 +00:00
-- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or upsert it if its not new (has \"_id\" field)
2010-06-15 03:14:40 +00:00
save col doc = case look " _id " doc of
2013-12-26 14:57:33 +00:00
Nothing -> insert_ col doc
2014-07-24 15:12:15 +00:00
Just i -> upsert ( Select [ " _id " := i ] col ) doc
2010-06-15 03:14:40 +00:00
2016-08-06 22:17:03 +00:00
replace :: ( MonadIO m )
2016-06-08 07:09:33 +00:00
=> Selection -> Document -> Action m ()
2010-06-15 03:14:40 +00:00
-- ^ Replace first document in selection with given document
replace = update []
2016-08-06 22:17:03 +00:00
repsert :: ( MonadIO m )
2016-06-08 07:09:33 +00:00
=> Selection -> Document -> Action m ()
2010-06-15 03:14:40 +00:00
-- ^ Replace first document in selection with given document, or insert document if selection is empty
repsert = update [ Upsert ]
2014-07-24 15:12:15 +00:00
{- # DEPRECATED repsert "use upsert instead" # -}
2016-08-06 22:17:03 +00:00
upsert :: ( MonadIO m )
2016-06-08 07:09:33 +00:00
=> Selection -> Document -> Action m ()
2014-07-24 15:12:15 +00:00
-- ^ Update first document in selection with given document, or insert document if selection is empty
upsert = update [ Upsert ]
2010-06-15 03:14:40 +00:00
type Modifier = Document
2018-10-31 14:03:23 +00:00
-- ^ Update operations on fields in a document. See <https://docs.mongodb.com/manual/reference/operator/update/>
2010-06-15 03:14:40 +00:00
2016-08-06 22:17:03 +00:00
modify :: ( MonadIO m )
2016-06-08 07:09:33 +00:00
=> Selection -> Modifier -> Action m ()
2010-06-15 03:14:40 +00:00
-- ^ Update all documents in selection using given modifier
modify = update [ MultiUpdate ]
2016-08-06 22:17:03 +00:00
update :: ( MonadIO m )
2016-06-08 07:09:33 +00:00
=> [ UpdateOption ] -> Selection -> Document -> Action m ()
2010-06-15 03:14:40 +00:00
-- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty.
2016-08-06 22:17:03 +00:00
update opts ( Select sel col ) up = do
2022-10-27 04:09:24 +00:00
pipe <- asks mongoPipe
2017-01-22 01:16:59 +00:00
db <- thisDatabase
2022-10-27 04:09:24 +00:00
let sd = P . serverData pipe
if maxWireVersion sd < 17
then do
ctx <- ask
liftIO $ runReaderT ( void $ write ( Update ( db <.> col ) opts sel up ) ) ctx
else do
liftIOE ConnectionFailure $
P . sendOpMsg
pipe
[ Nc ( Update ( db <.> col ) opts sel up ) ]
( Just P . MoreToCome )
[ " writeConcern " =: [ " w " =: ( 0 :: Int32 ) ] ]
2016-06-08 07:09:33 +00:00
updateCommandDocument :: Collection -> Bool -> [ Document ] -> Document -> Document
updateCommandDocument col ordered updates writeConcern =
[ " update " =: col
, " ordered " =: ordered
, " updates " =: updates
, " writeConcern " =: writeConcern
]
2016-06-20 00:03:58 +00:00
{- | Bulk update operation. If one update fails it will not update the remaining
2020-04-01 13:11:17 +00:00
documents . Current returned value is only a place holder . With mongodb server
before 2.6 it will send update requests one by one . In order to receive
error messages in versions under 2.6 you need to user confirmed writes .
Otherwise even if the errors had place the list of errors will be empty and
the result will be success . After 2.6 it will use bulk update feature in
mongodb .
2016-06-08 07:09:33 +00:00
- }
2016-08-06 22:17:03 +00:00
updateMany :: ( MonadIO m )
2016-06-08 07:09:33 +00:00
=> Collection
-> [ ( Selector , Document , [ UpdateOption ] ) ]
2017-01-07 20:36:07 +00:00
-> Action m WriteResult
2016-06-08 07:09:33 +00:00
updateMany = update' True
2016-06-20 00:03:58 +00:00
{- | Bulk update operation. If one update fails it will proceed with the
2020-04-01 13:11:17 +00:00
remaining documents . With mongodb server before 2.6 it will send update
requests one by one . In order to receive error messages in versions under
2.6 you need to use confirmed writes . Otherwise even if the errors had
place the list of errors will be empty and the result will be success .
After 2.6 it will use bulk update feature in mongodb .
2016-06-08 07:09:33 +00:00
- }
2016-08-06 22:17:03 +00:00
updateAll :: ( MonadIO m )
2016-06-08 07:09:33 +00:00
=> Collection
-> [ ( Selector , Document , [ UpdateOption ] ) ]
2017-01-07 20:36:07 +00:00
-> Action m WriteResult
2016-06-08 07:09:33 +00:00
updateAll = update' False
2016-08-06 22:17:03 +00:00
update' :: ( MonadIO m )
2016-06-08 07:09:33 +00:00
=> Bool
-> Collection
-> [ ( Selector , Document , [ UpdateOption ] ) ]
2017-01-07 20:36:07 +00:00
-> Action m WriteResult
2016-06-08 07:09:33 +00:00
update' ordered col updateDocs = do
p <- asks mongoPipe
let sd = P . serverData p
let updates = map ( \ ( s , d , os ) -> [ " q " =: s
, " u " =: d
, " upsert " =: ( Upsert ` elem ` os )
, " multi " =: ( MultiUpdate ` elem ` os ) ] )
updateDocs
mode <- asks mongoWriteMode
2016-10-24 04:52:10 +00:00
ctx <- ask
2016-11-07 02:41:33 +00:00
liftIO $ do
let writeConcern = case mode of
2022-10-27 04:09:24 +00:00
NoConfirm -> [ " w " =: ( 0 :: Int32 ) ]
2016-11-07 02:41:33 +00:00
Confirm params -> params
2016-11-19 18:47:47 +00:00
let docSize = sizeOfDocument $ updateCommandDocument
col
ordered
[]
writeConcern
2016-11-17 08:15:01 +00:00
let preChunks = splitAtLimit
2016-11-07 02:41:33 +00:00
( maxBsonObjectSize sd - docSize )
-- size of auxiliary part of update
-- document should be subtracted from
-- the overall size
( maxWriteBatchSize sd )
updates
2016-11-17 08:15:01 +00:00
let chunks =
if ordered
then takeRightsUpToLeft preChunks
else rights preChunks
2016-11-07 02:41:33 +00:00
let lens = map length chunks
2022-06-17 17:16:02 +00:00
let lSums = 0 : zipWith ( + ) lSums lens
2016-11-07 02:41:33 +00:00
blocks <- interruptibleFor ordered ( zip lSums chunks ) $ \ b -> do
2022-06-17 17:16:02 +00:00
runReaderT ( updateBlock ordered col b ) ctx
2016-11-07 02:41:33 +00:00
` catch ` \ ( e :: Failure ) -> do
2017-01-07 20:36:07 +00:00
return $ WriteResult True 0 Nothing 0 [] [ e ] []
2022-06-17 17:16:02 +00:00
let failedTotal = any failed blocks
2016-11-07 02:41:33 +00:00
let updatedTotal = sum $ map nMatched blocks
let modifiedTotal =
2022-06-17 17:16:02 +00:00
if all ( isNothing . nModified ) blocks
2016-11-07 02:41:33 +00:00
then Nothing
2022-06-17 17:16:02 +00:00
else Just $ sum $ mapMaybe nModified blocks
let totalWriteErrors = concatMap writeErrors blocks
let totalWriteConcernErrors = concatMap writeConcernErrors blocks
2016-11-07 02:41:33 +00:00
2022-06-17 17:16:02 +00:00
let upsertedTotal = concatMap upserted blocks
2017-01-07 20:36:07 +00:00
return $ WriteResult
2016-11-19 18:47:47 +00:00
failedTotal
updatedTotal
modifiedTotal
2017-01-07 20:36:07 +00:00
0 -- nRemoved
2016-11-19 18:47:47 +00:00
upsertedTotal
totalWriteErrors
totalWriteConcernErrors
2016-11-07 02:41:33 +00:00
2017-01-07 20:36:07 +00:00
` catch ` \ ( e :: Failure ) -> return $ WriteResult True 0 Nothing 0 [] [ e ] []
2016-06-08 07:09:33 +00:00
2016-08-06 22:17:03 +00:00
updateBlock :: ( MonadIO m )
2017-01-07 20:36:07 +00:00
=> Bool -> Collection -> ( Int , [ Document ] ) -> Action m WriteResult
2016-07-20 07:46:16 +00:00
updateBlock ordered col ( prevCount , docs ) = do
2016-06-08 07:09:33 +00:00
p <- asks mongoPipe
let sd = P . serverData p
2022-06-17 17:16:02 +00:00
if maxWireVersion sd < 2
2017-01-22 01:27:24 +00:00
then liftIO $ ioError $ userError " updateMany doesn't support mongodb older than 2.6 "
2022-10-27 04:09:24 +00:00
else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do
2016-06-08 07:09:33 +00:00
mode <- asks mongoWriteMode
let writeConcern = case mode of
2022-10-27 04:09:24 +00:00
NoConfirm -> [ " w " =: ( 0 :: Int32 ) ]
2016-06-08 07:09:33 +00:00
Confirm params -> params
doc <- runCommand $ updateCommandDocument col ordered docs writeConcern
2017-05-02 02:57:43 +00:00
let n = fromMaybe 0 $ doc !? " n "
let writeErrorsResults =
case look " writeErrors " doc of
2017-05-29 19:58:39 +00:00
Nothing -> WriteResult False 0 ( Just 0 ) 0 [] [] []
Just ( Array err ) -> WriteResult True 0 ( Just 0 ) 0 [] ( map ( anyToWriteError prevCount ) err ) []
2017-05-02 02:57:43 +00:00
Just unknownErr -> WriteResult
True
0
2017-05-29 19:58:39 +00:00
( Just 0 )
0
2017-05-02 02:57:43 +00:00
[]
[ ProtocolFailure
prevCount
$ " Expected array of error docs, but received: "
2022-06-17 17:16:02 +00:00
++ show unknownErr ]
2017-05-02 02:57:43 +00:00
[]
let writeConcernResults =
case look " writeConcernError " doc of
2017-05-29 19:58:39 +00:00
Nothing -> WriteResult False 0 ( Just 0 ) 0 [] [] []
2017-05-02 02:57:43 +00:00
Just ( Doc err ) -> WriteResult
True
0
2017-05-29 19:58:39 +00:00
( Just 0 )
0
2017-05-02 02:57:43 +00:00
[]
[]
[ WriteConcernFailure
( fromMaybe ( - 1 ) $ err !? " code " )
( fromMaybe " " $ err !? " errmsg " )
]
Just unknownErr -> WriteResult
True
0
2017-05-29 19:58:39 +00:00
( Just 0 )
0
2017-05-02 02:57:43 +00:00
[]
[]
[ ProtocolFailure
prevCount
$ " Expected doc in writeConcernError, but received: "
2022-06-17 17:16:02 +00:00
++ show unknownErr ]
2017-05-02 02:57:43 +00:00
2022-06-17 17:16:02 +00:00
let upsertedList = maybe [] ( map docToUpserted ) ( doc !? " upserted " )
2017-05-29 19:58:39 +00:00
let successResults = WriteResult False n ( doc !? " nModified " ) 0 upsertedList [] []
return $ foldl1' mergeWriteResults [ writeErrorsResults , writeConcernResults , successResults ]
2022-10-27 04:09:24 +00:00
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> [ " w " =: ( 0 :: Int32 ) ]
Confirm params -> merge params [ " w " =: ( 1 :: Int32 ) ]
doc <- runCommand $ updateCommandDocument col ordered docs writeConcern
let n = fromMaybe 0 $ doc !? " n "
let writeErrorsResults =
case look " writeErrors " doc of
Nothing -> WriteResult False 0 ( Just 0 ) 0 [] [] []
Just ( Array err ) -> WriteResult True 0 ( Just 0 ) 0 [] ( map ( anyToWriteError prevCount ) err ) []
Just unknownErr -> WriteResult
True
0
( Just 0 )
0
[]
[ ProtocolFailure
prevCount
$ " Expected array of error docs, but received: "
++ show unknownErr ]
[]
let writeConcernResults =
case look " writeConcernError " doc of
Nothing -> WriteResult False 0 ( Just 0 ) 0 [] [] []
Just ( Doc err ) -> WriteResult
True
0
( Just 0 )
0
[]
[]
[ WriteConcernFailure
( fromMaybe ( - 1 ) $ err !? " code " )
( fromMaybe " " $ err !? " errmsg " )
]
Just unknownErr -> WriteResult
True
0
( Just 0 )
0
[]
[]
[ ProtocolFailure
prevCount
$ " Expected doc in writeConcernError, but received: "
++ show unknownErr ]
2016-07-20 07:46:16 +00:00
2022-10-27 04:09:24 +00:00
let upsertedList = maybe [] ( map docToUpserted ) ( doc !? " upserted " )
let successResults = WriteResult False n ( doc !? " nModified " ) 0 upsertedList [] []
return $ foldl1' mergeWriteResults [ writeErrorsResults , writeConcernResults , successResults ]
2016-11-01 23:34:47 +00:00
2016-11-21 00:18:49 +00:00
interruptibleFor :: ( Monad m , Result b ) => Bool -> [ a ] -> ( a -> m b ) -> m [ b ]
2016-11-01 23:34:47 +00:00
interruptibleFor ordered = go []
where
go ! res [] _ = return $ reverse res
go ! res ( x : xs ) f = do
y <- f x
if isFailed y && ordered
then return $ reverse ( y : res )
else go ( y : res ) xs f
2017-01-07 20:36:07 +00:00
mergeWriteResults :: WriteResult -> WriteResult -> WriteResult
mergeWriteResults
( WriteResult failed1 nMatched1 nModified1 nDeleted1 upserted1 writeErrors1 writeConcernErrors1 )
( WriteResult failed2 nMatched2 nModified2 nDeleted2 upserted2 writeErrors2 writeConcernErrors2 ) =
2022-06-17 17:16:02 +00:00
WriteResult
2016-09-14 05:32:13 +00:00
( failed1 || failed2 )
( nMatched1 + nMatched2 )
2022-06-17 17:16:02 +00:00
( liftM2 ( + ) nModified1 nModified2 )
2017-01-07 20:36:07 +00:00
( nDeleted1 + nDeleted2 )
2016-11-26 06:33:58 +00:00
-- This function is used in foldl1' function. The first argument is the accumulator.
-- The list in the accumulator is usually longer than the subsequent value which goes in the second argument.
-- So, changing the order of list concatenation allows us to keep linear complexity of the
-- whole list accumulation process.
( upserted2 ++ upserted1 )
( writeErrors2 ++ writeErrors1 )
( writeConcernErrors2 ++ writeConcernErrors1 )
2016-09-14 05:32:13 +00:00
2016-09-09 04:23:51 +00:00
2016-07-20 07:46:16 +00:00
docToUpserted :: Document -> Upserted
docToUpserted doc = Upserted ind uid
where
ind = at " index " doc
uid = at " _id " doc
2016-11-06 22:39:04 +00:00
docToWriteError :: Document -> Failure
docToWriteError doc = WriteFailure ind code msg
2016-08-23 05:50:00 +00:00
where
ind = at " index " doc
code = at " code " doc
msg = at " errmsg " doc
2010-06-15 03:14:40 +00:00
-- ** Delete
2016-08-06 22:17:03 +00:00
delete :: ( MonadIO m )
2016-06-18 20:33:24 +00:00
=> Selection -> Action m ()
2010-06-15 03:14:40 +00:00
-- ^ Delete all documents in selection
2022-10-27 04:09:24 +00:00
delete s = do
pipe <- asks mongoPipe
let sd = P . serverData pipe
if maxWireVersion sd < 17
then deleteHelper [] s
else deleteMany ( coll s ) [ ( [] , [] ) ] >> return ()
2010-06-15 03:14:40 +00:00
2016-08-06 22:17:03 +00:00
deleteOne :: ( MonadIO m )
2016-06-18 20:33:24 +00:00
=> Selection -> Action m ()
2010-06-15 03:14:40 +00:00
-- ^ Delete first document in selection
2022-10-27 04:09:24 +00:00
deleteOne sel @ ( ( Select sel' col ) ) = do
pipe <- asks mongoPipe
let sd = P . serverData pipe
if maxWireVersion sd < 17
then deleteHelper [ SingleRemove ] sel
else do
-- Starting with v6 confirming writes via getLastError as it is
-- performed in the deleteHelper call via its call to write is
-- deprecated. To confirm writes now an appropriate writeConcern has to be
-- set. These confirmations were discarded in deleteHelper anyway so no
-- need to dispatch on the writeConcern as it is currently done in deleteHelper
-- via write for older versions
db <- thisDatabase
liftIOE ConnectionFailure $
P . sendOpMsg
pipe
[ Nc ( Delete ( db <.> col ) [] sel' ) ]
( Just P . MoreToCome )
[ " writeConcern " =: [ " w " =: ( 0 :: Int32 ) ] ]
2010-06-21 15:06:20 +00:00
2016-08-06 22:17:03 +00:00
deleteHelper :: ( MonadIO m )
2016-06-18 20:33:24 +00:00
=> [ DeleteOption ] -> Selection -> Action m ()
2016-08-06 22:17:03 +00:00
deleteHelper opts ( Select sel col ) = do
2017-01-22 01:21:41 +00:00
ctx <- ask
2022-10-27 04:09:24 +00:00
db <- thisDatabase
2017-01-22 01:21:41 +00:00
liftIO $ runReaderT ( void $ write ( Delete ( db <.> col ) opts sel ) ) ctx
2016-06-18 20:33:24 +00:00
{- | Bulk delete operation. If one delete fails it will not delete the remaining
2020-04-01 13:11:17 +00:00
documents . Current returned value is only a place holder . With mongodb server
before 2.6 it will send delete requests one by one . After 2.6 it will use
bulk delete feature in mongodb .
2016-06-18 20:33:24 +00:00
- }
2016-08-06 22:17:03 +00:00
deleteMany :: ( MonadIO m )
2016-06-18 20:33:24 +00:00
=> Collection
-> [ ( Selector , [ DeleteOption ] ) ]
2017-01-07 20:36:07 +00:00
-> Action m WriteResult
2016-06-18 20:33:24 +00:00
deleteMany = delete' True
{- | Bulk delete operation. If one delete fails it will proceed with the
2020-04-01 13:11:17 +00:00
remaining documents . Current returned value is only a place holder . With
mongodb server before 2.6 it will send delete requests one by one . After 2.6
it will use bulk delete feature in mongodb .
2016-06-18 20:33:24 +00:00
- }
2016-08-06 22:17:03 +00:00
deleteAll :: ( MonadIO m )
2016-06-18 20:33:24 +00:00
=> Collection
-> [ ( Selector , [ DeleteOption ] ) ]
2017-01-07 20:36:07 +00:00
-> Action m WriteResult
2016-06-18 20:33:24 +00:00
deleteAll = delete' False
deleteCommandDocument :: Collection -> Bool -> [ Document ] -> Document -> Document
deleteCommandDocument col ordered deletes writeConcern =
[ " delete " =: col
, " ordered " =: ordered
, " deletes " =: deletes
, " writeConcern " =: writeConcern
]
2016-08-06 22:17:03 +00:00
delete' :: ( MonadIO m )
2016-06-18 20:33:24 +00:00
=> Bool
-> Collection
-> [ ( Selector , [ DeleteOption ] ) ]
2017-01-07 20:36:07 +00:00
-> Action m WriteResult
2016-06-18 20:33:24 +00:00
delete' ordered col deleteDocs = do
p <- asks mongoPipe
let sd = P . serverData p
let deletes = map ( \ ( s , os ) -> [ " q " =: s
, " limit " =: if SingleRemove ` elem ` os
then ( 1 :: Int ) -- Remove only one matching
else ( 0 :: Int ) -- Remove all matching
] )
deleteDocs
mode <- asks mongoWriteMode
let writeConcern = case mode of
2022-10-27 04:09:24 +00:00
NoConfirm -> [ " w " =: ( 0 :: Int32 ) ]
2016-06-18 20:33:24 +00:00
Confirm params -> params
let docSize = sizeOfDocument $ deleteCommandDocument col ordered [] writeConcern
2020-04-03 10:47:39 +00:00
let chunks = splitAtLimit
2016-06-18 20:33:24 +00:00
( maxBsonObjectSize sd - docSize )
-- size of auxiliary part of delete
-- document should be subtracted from
-- the overall size
( maxWriteBatchSize sd )
deletes
2017-01-21 23:03:14 +00:00
ctx <- ask
2020-04-03 10:47:39 +00:00
let lens = map ( either ( const 1 ) length ) chunks
2022-06-17 17:16:02 +00:00
let lSums = 0 : zipWith ( + ) lSums lens
2020-04-03 10:47:39 +00:00
let failureResult e = return $ WriteResult True 0 Nothing 0 [] [ e ] []
let doChunk b = runReaderT ( deleteBlock ordered col b ) ctx ` catch ` failureResult
blockResult <- liftIO $ interruptibleFor ordered ( zip lSums chunks ) $ \ ( n , c ) ->
case c of
Left e -> failureResult e
Right b -> doChunk ( n , b )
2017-01-07 20:36:07 +00:00
return $ foldl1' mergeWriteResults blockResult
2016-06-18 20:33:24 +00:00
2017-01-21 23:25:28 +00:00
addFailureIndex :: Int -> Failure -> Failure
2017-01-22 06:25:27 +00:00
addFailureIndex i ( WriteFailure ind code s ) = WriteFailure ( ind + i ) code s
addFailureIndex _ f = f
2016-06-18 20:33:24 +00:00
2016-08-06 22:17:03 +00:00
deleteBlock :: ( MonadIO m )
2017-01-21 23:03:14 +00:00
=> Bool -> Collection -> ( Int , [ Document ] ) -> Action m WriteResult
deleteBlock ordered col ( prevCount , docs ) = do
2016-06-18 20:33:24 +00:00
p <- asks mongoPipe
let sd = P . serverData p
2022-06-17 17:16:02 +00:00
if maxWireVersion sd < 2
2017-01-22 01:27:24 +00:00
then liftIO $ ioError $ userError " deleteMany doesn't support mongodb older than 2.6 "
2022-10-27 04:09:24 +00:00
else if maxWireVersion sd == 2 && maxWireVersion sd < 17 then do
2016-06-18 20:33:24 +00:00
mode <- asks mongoWriteMode
let writeConcern = case mode of
2022-10-27 04:09:24 +00:00
NoConfirm -> [ " w " =: ( 0 :: Int32 ) ]
2016-06-18 20:33:24 +00:00
Confirm params -> params
doc <- runCommand $ deleteCommandDocument col ordered docs writeConcern
2017-01-15 02:39:43 +00:00
let n = fromMaybe 0 $ doc !? " n "
2017-05-28 19:58:29 +00:00
2022-10-27 04:09:24 +00:00
let successResults = WriteResult False 0 Nothing n [] [] []
let writeErrorsResults =
case look " writeErrors " doc of
Nothing -> WriteResult False 0 Nothing 0 [] [] []
Just ( Array err ) -> WriteResult True 0 Nothing 0 [] ( map ( anyToWriteError prevCount ) err ) []
Just unknownErr -> WriteResult
True
0
Nothing
0
[]
[ ProtocolFailure
prevCount
$ " Expected array of error docs, but received: "
++ show unknownErr ]
[]
let writeConcernResults =
case look " writeConcernError " doc of
Nothing -> WriteResult False 0 Nothing 0 [] [] []
Just ( Doc err ) -> WriteResult
True
0
Nothing
0
[]
[]
[ WriteConcernFailure
( fromMaybe ( - 1 ) $ err !? " code " )
( fromMaybe " " $ err !? " errmsg " )
]
Just unknownErr -> WriteResult
True
0
Nothing
0
[]
[]
[ ProtocolFailure
prevCount
$ " Expected doc in writeConcernError, but received: "
++ show unknownErr ]
return $ foldl1' mergeWriteResults [ successResults , writeErrorsResults , writeConcernResults ]
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> [ " w " =: ( 0 :: Int32 ) ]
Confirm params -> merge params [ " w " =: ( 1 :: Int32 ) ]
doc <- runCommand $ deleteCommandDocument col ordered docs writeConcern
let n = fromMaybe 0 $ doc !? " n "
2017-05-28 19:58:29 +00:00
let successResults = WriteResult False 0 Nothing n [] [] []
2017-04-10 05:23:34 +00:00
let writeErrorsResults =
case look " writeErrors " doc of
2017-05-28 19:58:29 +00:00
Nothing -> WriteResult False 0 Nothing 0 [] [] []
Just ( Array err ) -> WriteResult True 0 Nothing 0 [] ( map ( anyToWriteError prevCount ) err ) []
2017-04-10 05:23:34 +00:00
Just unknownErr -> WriteResult
True
0
Nothing
2017-05-28 19:58:29 +00:00
0
2017-04-10 05:23:34 +00:00
[]
[ ProtocolFailure
prevCount
$ " Expected array of error docs, but received: "
2022-06-17 17:16:02 +00:00
++ show unknownErr ]
2017-04-10 05:23:34 +00:00
[]
let writeConcernResults =
case look " writeConcernError " doc of
2017-05-28 19:58:29 +00:00
Nothing -> WriteResult False 0 Nothing 0 [] [] []
2017-04-10 05:23:34 +00:00
Just ( Doc err ) -> WriteResult
True
0
Nothing
2017-05-28 19:58:29 +00:00
0
2017-04-10 05:23:34 +00:00
[]
[]
[ WriteConcernFailure
( fromMaybe ( - 1 ) $ err !? " code " )
( fromMaybe " " $ err !? " errmsg " )
]
Just unknownErr -> WriteResult
True
0
Nothing
2017-05-28 19:58:29 +00:00
0
2017-04-10 05:23:34 +00:00
[]
[]
[ ProtocolFailure
prevCount
$ " Expected doc in writeConcernError, but received: "
2022-06-17 17:16:02 +00:00
++ show unknownErr ]
2017-05-28 19:58:29 +00:00
return $ foldl1' mergeWriteResults [ successResults , writeErrorsResults , writeConcernResults ]
2010-06-15 03:14:40 +00:00
2017-01-23 02:57:07 +00:00
anyToWriteError :: Int -> Value -> Failure
2017-05-02 02:57:43 +00:00
anyToWriteError _ ( Doc d ) = docToWriteError d
anyToWriteError ind _ = ProtocolFailure ind " Unknown bson value "
2010-06-15 03:14:40 +00:00
-- * Read
2011-07-09 02:13:47 +00:00
data ReadMode =
2013-12-26 14:57:33 +00:00
Fresh -- ^ read from master only
| StaleOk -- ^ read from slave ok
deriving ( Show , Eq )
2010-07-27 21:18:53 +00:00
2011-07-09 02:13:47 +00:00
readModeOption :: ReadMode -> [ QueryOption ]
readModeOption Fresh = []
readModeOption StaleOk = [ SlaveOK ]
2010-07-27 21:18:53 +00:00
2010-06-15 03:14:40 +00:00
-- ** Query
2010-06-21 15:06:20 +00:00
-- | Use 'select' to create a basic query with defaults, then modify if desired. For example, @(select sel col) {limit = 10}@
2010-06-15 03:14:40 +00:00
data Query = Query {
2020-04-01 13:11:17 +00:00
options :: [ QueryOption ] , -- ^ Default = @[]@
2013-12-26 14:57:33 +00:00
selection :: Selection ,
2020-04-01 13:11:17 +00:00
project :: Projector , -- ^ @[]@ = all fields. Default = @[]@
2013-12-26 14:57:33 +00:00
skip :: Word32 , -- ^ Number of initial matching documents to skip. Default = 0
limit :: Limit , -- ^ Maximum number of documents to return, 0 = no limit. Default = 0
2020-04-01 13:11:17 +00:00
sort :: Order , -- ^ Sort results by this order, @[]@ = no sort. Default = @[]@
snapshot :: Bool , -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. Default = @False@
2013-12-26 14:57:33 +00:00
batchSize :: BatchSize , -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Default = 0
2020-04-01 13:11:17 +00:00
hint :: Order -- ^ Force MongoDB to use this index, @[]@ = no hint. Default = @[]@
2013-12-26 14:57:33 +00:00
} deriving ( Show , Eq )
2010-06-15 03:14:40 +00:00
type Projector = Document
2011-07-14 22:47:14 +00:00
-- ^ Fields to return, analogous to the select clause in SQL. @[]@ means return whole document (analogous to * in SQL). @[\"x\" =: 1, \"y\" =: 1]@ means return only @x@ and @y@ fields of each document. @[\"x\" =: 0]@ means return all fields except @x@.
2010-06-15 03:14:40 +00:00
type Limit = Word32
-- ^ Maximum number of documents to return, i.e. cursor will close after iterating over this number of documents. 0 means no limit.
type Order = Document
2011-07-14 22:47:14 +00:00
-- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. @[\"x\" =: 1, \"y\" =: -1]@ means sort by @x@ ascending then @y@ descending
2010-06-15 03:14:40 +00:00
type BatchSize = Word32
-- ^ The number of document to return in each batch response from the server. 0 means use Mongo default.
2022-10-27 04:09:24 +00:00
-- noticeCommands and adminCommands are needed to identify whether
-- queryRequestOpMsg is called via runCommand or not. If not it will
-- behave like being called by a "find"-like command and add additional fields
-- specific to the find command into the selector, such as "filter", "projection" etc.
noticeCommands :: [ Text ]
noticeCommands = [ " aggregate "
, " count "
, " delete "
, " findAndModify "
, " insert "
, " listCollections "
, " update "
]
adminCommands :: [ Text ]
adminCommands = [ " buildinfo "
, " clone "
, " collstats "
, " copydb "
, " copydbgetnonce "
, " create "
, " dbstats "
, " deleteIndexes "
, " drop "
, " dropDatabase "
, " renameCollection "
, " repairDatabase "
, " serverStatus "
, " validate "
]
2010-06-15 03:14:40 +00:00
query :: Selector -> Collection -> Query
-- ^ Selects documents in collection that match selector. It uses no query options, projects all fields, does not skip any documents, does not limit result size, uses default batch size, does not sort, does not hint, and does not snapshot.
query sel col = Query [] ( Select sel col ) [] 0 0 [] False 0 []
2017-05-09 06:12:26 +00:00
find :: MonadIO m => Query -> Action m Cursor
2010-06-15 03:14:40 +00:00
-- ^ Fetch documents satisfying query
2010-06-21 15:06:20 +00:00
find q @ Query { selection , batchSize } = do
2016-08-05 05:58:25 +00:00
pipe <- asks mongoPipe
2022-10-27 04:09:24 +00:00
db <- thisDatabase
let sd = P . serverData pipe
if maxWireVersion sd < 17
then do
qr <- queryRequest False q
dBatch <- liftIO $ request pipe [] qr
newCursor db ( coll selection ) batchSize dBatch
else do
qr <- queryRequestOpMsg False q
let newQr =
case fst qr of
2023-04-17 07:46:57 +00:00
Req P . Query { .. } ->
let coll = last $ T . splitOn " . " qFullCollection
in ( Req $ P . Query { qSelector = merge qSelector [ " find " =: coll ] , .. } , snd qr )
2022-10-27 04:09:24 +00:00
-- queryRequestOpMsg only returns Cmd types constructed via Req
_ -> error " impossible "
dBatch <- liftIO $ requestOpMsg pipe newQr []
newCursor db ( coll selection ) batchSize dBatch
2010-06-15 03:14:40 +00:00
2023-01-12 05:25:48 +00:00
findCommand :: ( MonadIO m ) => Query -> Action m Cursor
2020-07-29 02:36:30 +00:00
-- ^ Fetch documents satisfying query using the command "find"
2022-10-27 04:09:24 +00:00
findCommand q @ Query { .. } = do
pipe <- asks mongoPipe
let sd = P . serverData pipe
if maxWireVersion sd < 17
then do
let aColl = coll selection
response <- runCommand $
[ " find " =: aColl
, " filter " =: selector selection
, " sort " =: sort
, " projection " =: project
, " hint " =: hint
, " skip " =: toInt32 skip
]
++ mconcat -- optional fields. They should not be present if set to 0 and mongo will use defaults
[ " batchSize " =? toMaybe ( /= 0 ) toInt32 batchSize
, " limit " =? toMaybe ( /= 0 ) toInt32 limit
]
getCursorFromResponse aColl response
>>= either ( liftIO . throwIO . QueryFailure ( at " code " response ) ) return
else find q
2020-07-29 02:36:30 +00:00
where
toInt32 :: Integral a => a -> Int32
toInt32 = fromIntegral
toMaybe :: ( a -> Bool ) -> ( a -> b ) -> a -> Maybe b
toMaybe predicate f a
| predicate a = Just ( f a )
| otherwise = Nothing
2023-04-17 07:46:57 +00:00
isHandshake :: Document -> Bool
isHandshake = ( == [ " isMaster " =: ( 1 :: Int32 ) ] )
2011-07-05 14:37:01 +00:00
findOne :: ( MonadIO m ) => Query -> Action m ( Maybe Document )
2020-04-01 13:11:17 +00:00
-- ^ Fetch first document satisfying query or @Nothing@ if none satisfy it
2011-07-05 14:37:01 +00:00
findOne q = do
2016-08-05 05:58:25 +00:00
pipe <- asks mongoPipe
2022-10-27 04:09:24 +00:00
let legacyQuery = do
qr <- queryRequest False q { limit = 1 }
rq <- liftIO $ request pipe [] qr
Batch _ _ docs <- liftDB $ fulfill rq
return ( listToMaybe docs )
2023-04-17 07:46:57 +00:00
if isHandshake ( selector $ selection q )
2022-10-27 04:09:24 +00:00
then legacyQuery
else do
let sd = P . serverData pipe
if ( maxWireVersion sd < 17 )
then legacyQuery
else do
qr <- queryRequestOpMsg False q { limit = 1 }
let newQr =
case fst qr of
2023-04-17 07:46:57 +00:00
Req P . Query { .. } ->
let coll = last $ T . splitOn " . " qFullCollection
2022-10-27 04:09:24 +00:00
-- We have to understand whether findOne is called as
-- command directly. This is necessary since findOne is used via
-- runCommand as a vehicle to execute any type of commands and notices.
2023-04-17 07:46:57 +00:00
labels = catMaybes $ map ( \ f -> look f qSelector ) ( noticeCommands ++ adminCommands ) :: [ Value ]
2022-10-27 04:09:24 +00:00
in if null labels
2023-04-17 07:46:57 +00:00
then ( Req P . Query { qSelector = merge qSelector [ " find " =: coll ] , .. } , snd qr )
2022-10-27 04:09:24 +00:00
else qr
_ -> error " impossible "
rq <- liftIO $ requestOpMsg pipe newQr []
Batch _ _ docs <- liftDB $ fulfill rq
return ( listToMaybe docs )
2010-06-21 15:06:20 +00:00
2011-07-09 02:13:47 +00:00
fetch :: ( MonadIO m ) => Query -> Action m Document
2011-07-09 02:33:52 +00:00
-- ^ Same as 'findOne' except throw 'DocNotFound' if none match
2013-12-26 15:23:02 +00:00
fetch q = findOne q >>= maybe ( liftIO $ throwIO $ DocNotFound $ selection q ) return
2011-07-09 02:13:47 +00:00
2020-04-01 13:11:17 +00:00
-- | Options for @findAndModify@
data FindAndModifyOpts
= FamRemove Bool -- ^ remove the selected document when the boolean is @True@
| FamUpdate
2020-04-01 14:01:59 +00:00
{ famUpdate :: Document -- ^ the update instructions, or a replacement document
2020-04-01 13:11:17 +00:00
, famNew :: Bool -- ^ return the document with the modifications made on the update
, famUpsert :: Bool -- ^ create a new document if no documents match the query
}
deriving Show
-- | Default options used by 'findAndModify'.
2014-07-20 02:32:33 +00:00
defFamUpdateOpts :: Document -> FindAndModifyOpts
defFamUpdateOpts ups = FamUpdate
{ famNew = True
, famUpsert = False
, famUpdate = ups
}
2014-07-08 19:36:23 +00:00
2020-04-01 13:11:17 +00:00
-- | Run the @findAndModify@ command as an update without an upsert and new set to @True@.
-- Return a single updated document (@new@ option is set to @True@).
2014-07-08 19:36:23 +00:00
--
2020-04-01 13:11:17 +00:00
-- See 'findAndModifyOpts' for more options.
2023-01-12 05:25:48 +00:00
findAndModify :: ( MonadIO m )
2013-06-06 15:00:00 +00:00
=> Query
-> Document -- ^ updates
-> Action m ( Either String Document )
2014-07-08 19:36:23 +00:00
findAndModify q ups = do
2014-07-20 02:32:33 +00:00
eres <- findAndModifyOpts q ( defFamUpdateOpts ups )
2014-07-08 19:36:23 +00:00
return $ case eres of
Left l -> Left l
Right r -> case r of
2014-07-24 15:12:44 +00:00
-- only possible when upsert is True and new is False
2014-07-08 19:36:23 +00:00
Nothing -> Left " findAndModify: impossible null result "
Just doc -> Right doc
2020-04-01 13:11:17 +00:00
-- | Run the @findAndModify@ command
-- (allows more options than 'findAndModify')
2023-01-12 05:25:48 +00:00
findAndModifyOpts :: ( MonadIO m )
2014-07-08 19:36:23 +00:00
=> Query
2020-04-01 13:11:17 +00:00
-> FindAndModifyOpts
2014-07-08 19:36:23 +00:00
-> Action m ( Either String ( Maybe Document ) )
2022-06-17 17:16:02 +00:00
findAndModifyOpts Query {
2013-06-06 15:00:00 +00:00
selection = Select sel collection
, project = project
, sort = sort
2022-06-17 17:16:02 +00:00
} famOpts = do
2013-08-29 18:57:07 +00:00
result <- runCommand
2014-07-08 19:36:23 +00:00
( [ " findAndModify " := String collection
2013-08-29 18:57:07 +00:00
, " query " := Doc sel
, " fields " := Doc project
, " sort " := Doc sort
2014-07-08 19:36:23 +00:00
] ++
case famOpts of
FamRemove shouldRemove -> [ " remove " := Bool shouldRemove ]
FamUpdate { .. } ->
[ " update " := Doc famUpdate
, " new " := Bool famNew -- return updated document, not original document
, " upsert " := Bool famUpsert -- insert if nothing is found
] )
2014-07-24 15:12:44 +00:00
return $ case lookupErr result of
Just e -> leftErr e
Nothing -> case lookup " value " result of
2019-10-04 16:10:24 +00:00
Nothing -> leftErr " no document found "
Just mdoc -> case mdoc of
2014-07-24 15:12:44 +00:00
Just doc @ ( _ : _ ) -> Right ( Just doc )
Just [] -> case famOpts of
FamUpdate { famUpsert = True , famNew = False } -> Right Nothing
_ -> leftErr $ show result
_ -> leftErr $ show result
2013-08-29 18:57:07 +00:00
where
2014-07-24 15:12:44 +00:00
leftErr err = Left $ " findAndModify " ` mappend ` show collection
` mappend ` " \ n from query: " ` mappend ` show sel
` mappend ` " \ n error: " ` mappend ` err
2013-08-29 18:57:07 +00:00
-- return Nothing means ok, Just is the error message
2019-10-04 16:10:24 +00:00
lookupErr :: Document -> Maybe String
lookupErr result = do
errObject <- lookup " lastErrorObject " result
lookup " err " errObject
2013-06-06 15:00:00 +00:00
2011-07-05 14:37:01 +00:00
explain :: ( MonadIO m ) => Query -> Action m Document
2010-06-15 03:14:40 +00:00
-- ^ Return performance stats of query execution
2010-06-21 15:06:20 +00:00
explain q = do -- same as findOne but with explain set to true
2016-08-05 05:58:25 +00:00
pipe <- asks mongoPipe
qr <- queryRequest True q { limit = 1 }
r <- liftIO $ request pipe [] qr
2017-05-09 05:47:47 +00:00
Batch _ _ docs <- liftDB $ fulfill r
2013-12-26 14:57:33 +00:00
return $ if null docs then error ( " no explain: " ++ show q ) else head docs
2010-06-15 03:14:40 +00:00
2013-12-27 11:39:22 +00:00
count :: ( MonadIO m ) => Query -> Action m Int
2010-06-15 03:14:40 +00:00
-- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present)
2022-06-17 17:16:02 +00:00
count Query { selection = Select sel col , skip , limit } = at " n " <$> runCommand
2013-12-26 14:57:33 +00:00
( [ " count " =: col , " query " =: sel , " skip " =: ( fromIntegral skip :: Int32 ) ]
++ ( " limit " =? if limit == 0 then Nothing else Just ( fromIntegral limit :: Int32 ) ) )
2010-06-15 03:14:40 +00:00
2013-12-27 11:39:22 +00:00
distinct :: ( MonadIO m ) => Label -> Selection -> Action m [ Value ]
2010-06-15 03:14:40 +00:00
-- ^ Fetch distinct values of field in selected documents
2022-06-17 17:16:02 +00:00
distinct k ( Select sel col ) = at " values " <$> runCommand [ " distinct " =: col , " key " =: k , " query " =: sel ]
2010-06-15 03:14:40 +00:00
2022-10-27 04:09:24 +00:00
queryRequest :: ( Monad m , MonadIO m ) => Bool -> Query -> Action m ( Request , Maybe Limit )
2011-07-05 14:37:01 +00:00
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
queryRequest isExplain Query { .. } = do
2013-12-26 15:32:21 +00:00
ctx <- ask
2015-05-15 07:58:25 +00:00
return $ queryRequest' ( mongoReadMode ctx ) ( mongoDatabase ctx )
2011-07-05 14:37:01 +00:00
where
2013-12-26 14:57:33 +00:00
queryRequest' rm db = ( P . Query { .. } , remainingLimit ) where
qOptions = readModeOption rm ++ options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
2015-06-17 07:24:12 +00:00
( qBatchSize , remainingLimit ) = batchSizeRemainingLimit batchSize ( if limit == 0 then Nothing else Just limit )
2013-12-26 14:57:33 +00:00
qProjector = project
mOrder = if null sort then Nothing else Just ( " $orderby " =: sort )
mSnapshot = if snapshot then Just ( " $snapshot " =: True ) else Nothing
mHint = if null hint then Nothing else Just ( " $hint " =: hint )
mExplain = if isExplain then Just ( " $explain " =: True ) else Nothing
special = catMaybes [ mOrder , mSnapshot , mHint , mExplain ]
qSelector = if null special then s else ( " $query " =: s ) : special where s = selector selection
2010-06-15 03:14:40 +00:00
2022-10-27 04:09:24 +00:00
queryRequestOpMsg :: ( Monad m , MonadIO m ) => Bool -> Query -> Action m ( Cmd , Maybe Limit )
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
queryRequestOpMsg isExplain Query { .. } = do
ctx <- ask
return $ queryRequest' ( mongoReadMode ctx ) ( mongoDatabase ctx )
where
queryRequest' rm db = ( Req P . Query { .. } , remainingLimit ) where
qOptions = readModeOption rm ++ options
qFullCollection = db <.> coll selection
qSkip = fromIntegral skip
( qBatchSize , remainingLimit ) = batchSizeRemainingLimit batchSize ( if limit == 0 then Nothing else Just limit )
-- Check whether this query is not a command in disguise. If
-- isNotCommand is true, then we treat this as a find command and add
-- the relevant fields to the selector
isNotCommand = null $ catMaybes $ map ( \ l -> look l ( selector selection ) ) ( noticeCommands ++ adminCommands )
mOrder = if null sort then Nothing else Just ( " sort " =: sort )
mSnapshot = if snapshot then Just ( " snapshot " =: True ) else Nothing
mHint = if null hint then Nothing else Just ( " hint " =: hint )
mExplain = if isExplain then Just ( " $explain " =: True ) else Nothing
special = catMaybes [ mOrder , mSnapshot , mHint , mExplain ]
qProjector = if isNotCommand then [ " projection " =: project ] else project
qSelector = if isNotCommand then c else s
where s = selector selection
bSize = if qBatchSize == 0 then Nothing else Just ( " batchSize " =: qBatchSize )
mLimit = if limit == 0 then Nothing else maybe Nothing ( \ rL -> Just ( " limit " =: ( fromIntegral rL :: Int32 ) ) ) remainingLimit
c = ( " filter " =: s ) : special ++ maybeToList bSize ++ maybeToList mLimit
2022-06-17 17:16:02 +00:00
batchSizeRemainingLimit :: BatchSize -> Maybe Limit -> ( Int32 , Maybe Limit )
2011-07-05 14:37:01 +00:00
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit
2015-06-17 07:24:12 +00:00
batchSizeRemainingLimit batchSize mLimit =
2015-07-31 10:25:01 +00:00
let remaining =
case mLimit of
Nothing -> batchSize
Just limit ->
if 0 < batchSize && batchSize < limit
then batchSize
else limit
in ( fromIntegral remaining , mLimit )
2010-06-15 03:14:40 +00:00
2013-12-26 15:23:02 +00:00
type DelayedBatch = IO Batch
2011-07-05 14:37:01 +00:00
-- ^ A promised batch which may fail
2010-06-21 15:06:20 +00:00
2015-06-17 07:24:12 +00:00
data Batch = Batch ( Maybe Limit ) CursorId [ Document ]
-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is number of documents to return. Nothing means no limit.
2010-06-21 15:06:20 +00:00
2016-08-05 05:58:25 +00:00
request :: Pipe -> [ Notice ] -> ( Request , Maybe Limit ) -> IO DelayedBatch
2011-07-05 14:37:01 +00:00
-- ^ Send notices and request and return promised batch
2016-08-05 05:58:25 +00:00
request pipe ns ( req , remainingLimit ) = do
2016-08-05 05:23:30 +00:00
promise <- liftIOE ConnectionFailure $ P . call pipe ns req
let protectedPromise = liftIOE ConnectionFailure promise
return $ fromReply remainingLimit =<< protectedPromise
2010-06-21 15:06:20 +00:00
2022-10-27 04:09:24 +00:00
requestOpMsg :: Pipe -> ( Cmd , Maybe Limit ) -> Document -> IO DelayedBatch
-- ^ Send notices and request and return promised batch
requestOpMsg pipe ( Req r , remainingLimit ) params = do
promise <- liftIOE ConnectionFailure $ P . callOpMsg pipe r Nothing params
let protectedPromise = liftIOE ConnectionFailure promise
return $ fromReply remainingLimit =<< protectedPromise
2023-04-17 07:46:57 +00:00
requestOpMsg _ _ _ = error " requestOpMsg: Only messages of type Query are supported "
2022-10-27 04:09:24 +00:00
2015-06-17 07:24:12 +00:00
fromReply :: Maybe Limit -> Reply -> DelayedBatch
2011-07-05 14:37:01 +00:00
-- ^ Convert Reply to Batch or Failure
2010-07-27 21:18:53 +00:00
fromReply limit Reply { .. } = do
2013-12-26 14:57:33 +00:00
mapM_ checkResponseFlag rResponseFlags
return ( Batch limit rCursorId rDocuments )
2010-07-03 17:15:30 +00:00
where
2013-12-26 14:57:33 +00:00
-- If response flag indicates failure then throw it, otherwise do nothing
checkResponseFlag flag = case flag of
AwaitCapable -> return ()
2013-12-26 15:23:02 +00:00
CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId
QueryError -> throwIO $ QueryFailure ( at " code " $ head rDocuments ) ( at " $err " $ head rDocuments )
2022-10-27 04:09:24 +00:00
fromReply limit ReplyOpMsg { .. } = do
let section = head sections
cur = maybe Nothing cast $ look " cursor " section
case cur of
Nothing -> return ( Batch limit 0 sections )
Just doc ->
case look " firstBatch " doc of
Just ar -> do
let docs = fromJust $ cast ar
id' = fromJust $ cast $ valueAt " id " doc
return ( Batch limit id' docs )
-- A cursor without a firstBatch field, should be a reply to a
-- getMore query and thus have a nextBatch key
Nothing -> do
let docs = fromJust $ cast $ valueAt " nextBatch " doc
id' = fromJust $ cast $ valueAt " id " doc
return ( Batch limit id' docs )
2011-07-05 14:37:01 +00:00
2017-05-09 05:47:47 +00:00
fulfill :: DelayedBatch -> Action IO Batch
2011-07-05 14:37:01 +00:00
-- ^ Demand and wait for result, raise failure if exception
2013-12-26 15:32:21 +00:00
fulfill = liftIO
2011-07-05 14:37:01 +00:00
-- *** Cursor
data Cursor = Cursor FullCollection BatchSize ( MVar DelayedBatch )
-- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a 'CursorNotFoundFailure'. Note, a cursor is not closed when the pipe is closed, so you can open another pipe to the same server and continue using the cursor.
2010-06-15 03:14:40 +00:00
2017-05-09 06:12:26 +00:00
newCursor :: MonadIO m => Database -> Collection -> BatchSize -> DelayedBatch -> Action m Cursor
2010-06-21 15:06:20 +00:00
-- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected.
2011-07-05 14:37:01 +00:00
newCursor db col batchSize dBatch = do
2017-05-09 06:12:26 +00:00
var <- liftIO $ MV . newMVar dBatch
2013-12-26 14:57:33 +00:00
let cursor = Cursor ( db <.> col ) batchSize var
2017-05-09 06:12:26 +00:00
_ <- liftDB $ mkWeakMVar var ( closeCursor cursor )
2013-12-26 14:57:33 +00:00
return cursor
2010-06-15 03:14:40 +00:00
2017-05-09 05:47:47 +00:00
nextBatch :: MonadIO m => Cursor -> Action m [ Document ]
2011-07-21 22:50:52 +00:00
-- ^ Return next batch of documents in query result, which will be empty if finished.
2017-05-09 05:47:47 +00:00
nextBatch ( Cursor fcol batchSize var ) = liftDB $ modifyMVar var $ \ dBatch -> do
2013-12-26 14:57:33 +00:00
-- Pre-fetch next batch promise from server and return current batch.
2017-05-09 05:47:47 +00:00
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
2015-06-17 07:24:12 +00:00
let newLimit = do
limit <- mLimit
2022-06-17 17:16:02 +00:00
return $ limit - min limit ( fromIntegral $ length docs )
2015-07-31 10:25:01 +00:00
let emptyBatch = return $ Batch ( Just 0 ) 0 []
let getNextBatch = nextBatch' fcol batchSize newLimit cid
2022-06-17 17:16:02 +00:00
let resultDocs = maybe id ( take . fromIntegral ) mLimit docs
2015-07-31 10:25:01 +00:00
case ( cid , newLimit ) of
( 0 , _ ) -> return ( emptyBatch , resultDocs )
( _ , Just 0 ) -> do
2016-08-06 03:29:20 +00:00
pipe <- asks mongoPipe
liftIOE ConnectionFailure $ P . send pipe [ KillCursors [ cid ] ]
2015-07-31 10:25:01 +00:00
return ( emptyBatch , resultDocs )
( _ , _ ) -> ( , resultDocs ) <$> getNextBatch
2012-01-24 00:45:42 +00:00
2017-05-09 05:47:47 +00:00
fulfill' :: FullCollection -> BatchSize -> DelayedBatch -> Action IO Batch
2012-01-24 00:45:42 +00:00
-- Discard pre-fetched batch if empty with nonzero cid.
fulfill' fcol batchSize dBatch = do
2013-12-26 14:57:33 +00:00
b @ ( Batch limit cid docs ) <- fulfill dBatch
2022-06-17 17:16:02 +00:00
if cid /= 0 && null docs && ( limit > Just 0 )
2013-12-26 14:57:33 +00:00
then nextBatch' fcol batchSize limit cid >>= fulfill
else return b
2012-01-24 00:45:42 +00:00
2022-06-17 17:16:02 +00:00
nextBatch' :: ( MonadIO m ) => FullCollection -> BatchSize -> Maybe Limit -> CursorId -> Action m DelayedBatch
2015-06-17 07:24:12 +00:00
nextBatch' fcol batchSize limit cid = do
2016-08-05 05:58:25 +00:00
pipe <- asks mongoPipe
2022-10-27 04:09:24 +00:00
let sd = P . serverData pipe
if maxWireVersion sd < 17
then liftIO $ request pipe [] ( GetMore fcol batchSize' cid , remLimit )
else liftIO $ requestOpMsg pipe ( Req $ GetMore fcol batchSize' cid , remLimit ) []
2013-12-26 14:57:33 +00:00
where ( batchSize' , remLimit ) = batchSizeRemainingLimit batchSize limit
2011-07-21 22:50:52 +00:00
2017-05-09 05:47:47 +00:00
next :: MonadIO m => Cursor -> Action m ( Maybe Document )
2010-06-15 20:15:37 +00:00
-- ^ Return next document in query result, or Nothing if finished.
2017-05-09 05:47:47 +00:00
next ( Cursor fcol batchSize var ) = liftDB $ modifyMVar var nextState where
2013-12-26 14:57:33 +00:00
-- Pre-fetch next batch promise from server when last one in current batch is returned.
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
nextState dBatch = do
2017-05-09 05:47:47 +00:00
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
2022-06-17 17:16:02 +00:00
if mLimit == Just 0
2015-06-17 07:24:12 +00:00
then return ( return $ Batch ( Just 0 ) 0 [] , Nothing )
else
case docs of
doc : docs' -> do
let newLimit = do
limit <- mLimit
return $ limit - 1
2022-06-17 17:16:02 +00:00
dBatch' <- if null docs' && cid /= 0 && ( ( newLimit > Just 0 ) || isNothing newLimit )
2015-06-17 07:24:12 +00:00
then nextBatch' fcol batchSize newLimit cid
else return $ return ( Batch newLimit cid docs' )
2022-06-17 17:16:02 +00:00
when ( newLimit == Just 0 ) $ unless ( cid == 0 ) $ do
2016-08-06 03:29:20 +00:00
pipe <- asks mongoPipe
2022-10-27 04:09:24 +00:00
let sd = P . serverData pipe
if maxWireVersion sd < 17
then liftIOE ConnectionFailure $ P . send pipe [ KillCursors [ cid ] ]
else liftIOE ConnectionFailure $ P . sendOpMsg pipe [ Kc ( P . KillC ( KillCursors [ cid ] ) fcol ) ] ( Just MoreToCome ) []
2015-06-17 07:24:12 +00:00
return ( dBatch' , Just doc )
[] -> if cid == 0
then return ( return $ Batch ( Just 0 ) 0 [] , Nothing ) -- finished
2017-05-09 05:47:47 +00:00
else do
nb <- nextBatch' fcol batchSize mLimit cid
return ( nb , Nothing )
2010-06-15 03:14:40 +00:00
2017-05-09 05:47:47 +00:00
nextN :: MonadIO m => Int -> Cursor -> Action m [ Document ]
2010-06-15 03:14:40 +00:00
-- ^ Return next N documents or less if end is reached
2022-06-17 17:16:02 +00:00
nextN n c = catMaybes <$> replicateM n ( next c )
2010-06-15 03:14:40 +00:00
2017-05-09 05:47:47 +00:00
rest :: MonadIO m => Cursor -> Action m [ Document ]
2010-06-15 03:14:40 +00:00
-- ^ Return remaining documents in query result
rest c = loop ( next c )
2017-05-09 05:47:47 +00:00
closeCursor :: MonadIO m => Cursor -> Action m ()
closeCursor ( Cursor _ _ var ) = liftDB $ modifyMVar var $ \ dBatch -> do
2013-12-26 14:57:33 +00:00
Batch _ cid _ <- fulfill dBatch
2016-08-06 03:29:20 +00:00
unless ( cid == 0 ) $ do
pipe <- asks mongoPipe
liftIOE ConnectionFailure $ P . send pipe [ KillCursors [ cid ] ]
2022-06-17 17:16:02 +00:00
return ( return $ Batch ( Just 0 ) 0 [] , () )
2010-12-20 02:08:53 +00:00
2017-05-09 05:47:47 +00:00
isCursorClosed :: MonadIO m => Cursor -> Action m Bool
2011-07-05 14:37:01 +00:00
isCursorClosed ( Cursor _ _ var ) = do
2017-05-09 05:47:47 +00:00
Batch _ cid docs <- liftDB $ fulfill =<< readMVar var
2013-12-26 14:57:33 +00:00
return ( cid == 0 && null docs )
2010-06-15 03:14:40 +00:00
2013-05-23 14:47:57 +00:00
-- ** Aggregate
type Pipeline = [ Document ]
-- ^ The Aggregate Pipeline
2023-01-12 05:25:48 +00:00
aggregate :: ( MonadIO m ) => Collection -> Pipeline -> Action m [ Document ]
2013-05-23 14:47:57 +00:00
-- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details.
aggregate aColl agg = do
2018-02-04 22:38:58 +00:00
aggregateCursor aColl agg def >>= rest
2020-12-15 14:36:10 +00:00
data AggregateConfig = AggregateConfig
{ allowDiskUse :: Bool -- ^ Enable writing to temporary files (aggregations have a 100Mb RAM limit)
}
deriving Show
2018-02-04 22:38:58 +00:00
instance Default AggregateConfig where
2020-12-15 14:36:10 +00:00
def = AggregateConfig
{ allowDiskUse = False
}
aggregateCommand :: Collection -> Pipeline -> AggregateConfig -> Document
aggregateCommand aColl agg AggregateConfig { .. } =
[ " aggregate " =: aColl
, " pipeline " =: agg
, " cursor " =: ( [] :: Document )
, " allowDiskUse " =: allowDiskUse
]
2018-02-04 22:38:58 +00:00
2023-01-12 05:25:48 +00:00
aggregateCursor :: ( MonadIO m ) => Collection -> Pipeline -> AggregateConfig -> Action m Cursor
2018-02-04 22:38:58 +00:00
-- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details.
2020-12-15 14:36:10 +00:00
aggregateCursor aColl agg cfg = do
2022-10-27 04:09:24 +00:00
pipe <- asks mongoPipe
let sd = P . serverData pipe
if maxWireVersion sd < 17
then do
response <- runCommand ( aggregateCommand aColl agg cfg )
getCursorFromResponse aColl response
>>= either ( liftIO . throwIO . AggregateFailure ) return
else do
let q = select ( aggregateCommand aColl agg cfg ) aColl
qr <- queryRequestOpMsg False q
dBatch <- liftIO $ requestOpMsg pipe qr []
db <- thisDatabase
Right <$> newCursor db aColl 0 dBatch
>>= either ( liftIO . throwIO . AggregateFailure ) return
2020-07-29 02:36:30 +00:00
getCursorFromResponse
2023-01-12 05:25:48 +00:00
:: ( MonadIO m )
2020-07-29 02:36:30 +00:00
=> Collection
-> Document
2020-07-30 18:25:19 +00:00
-> Action m ( Either String Cursor )
getCursorFromResponse aColl response
2023-01-12 05:25:48 +00:00
| true1 " ok " response = runExceptT $ do
cursor <- lookup " cursor " response ?? " cursor is missing "
firstBatch <- lookup " firstBatch " cursor ?? " firstBatch is missing "
cursorId <- lookup " id " cursor ?? " id is missing "
db <- lift thisDatabase
lift $ newCursor db aColl 0 ( return $ Batch Nothing cursorId firstBatch )
2020-07-30 18:25:19 +00:00
| otherwise = return $ Left $ at " errmsg " response
2023-01-12 05:25:48 +00:00
where
Nothing ?? e = throwE e
Just a ?? _ = pure a
2013-05-23 14:47:57 +00:00
2010-06-15 03:14:40 +00:00
-- ** Group
2010-07-03 17:15:30 +00:00
-- | Groups documents in collection by key then reduces (aggregates) each group
2010-06-15 03:14:40 +00:00
data Group = Group {
2013-12-26 14:57:33 +00:00
gColl :: Collection ,
gKey :: GroupKey , -- ^ Fields to group by
gReduce :: Javascript , -- ^ @(doc, agg) -> ()@. The reduce function reduces (aggregates) the objects iterated. Typical operations of a reduce function include summing and counting. It takes two arguments, the current document being iterated over and the aggregation value, and updates the aggregate value.
gInitial :: Document , -- ^ @agg@. Initial aggregation value supplied to reduce
2020-04-01 13:11:17 +00:00
gCond :: Selector , -- ^ Condition that must be true for a row to be considered. @[]@ means always true.
gFinalize :: Maybe Javascript -- ^ @agg -> () | result@. An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just @_id@ and average fields).
2013-12-26 14:57:33 +00:00
} deriving ( Show , Eq )
2010-06-15 03:14:40 +00:00
data GroupKey = Key [ Label ] | KeyF Javascript deriving ( Show , Eq )
2020-04-01 13:11:17 +00:00
-- ^ Fields to group by, or function (@doc -> key@) returning a "key object" to be used as the grouping key. Use 'KeyF' instead of 'Key' to specify a key that is not an existing member of the object (or, to access embedded members).
2010-06-15 03:14:40 +00:00
groupDocument :: Group -> Document
-- ^ Translate Group data into expected document form
groupDocument Group { .. } =
2013-12-26 14:57:33 +00:00
( " finalize " =? gFinalize ) ++ [
" ns " =: gColl ,
case gKey of Key k -> " key " =: map ( =: True ) k ; KeyF f -> " $keyf " =: f ,
" $reduce " =: gReduce ,
" initial " =: gInitial ,
" cond " =: gCond ]
2010-06-15 03:14:40 +00:00
2013-12-27 11:39:22 +00:00
group :: ( MonadIO m ) => Group -> Action m [ Document ]
2010-06-15 03:14:40 +00:00
-- ^ Execute group query and return resulting aggregate value for each distinct key
2022-06-17 17:16:02 +00:00
group g = at " retval " <$> runCommand [ " group " =: groupDocument g ]
2010-06-15 03:14:40 +00:00
-- ** MapReduce
2010-10-27 20:13:23 +00:00
-- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values to a single result. There are additional parameters that may be set to tweak this basic operation.
2020-04-01 13:11:17 +00:00
-- This implements the latest version of map-reduce that requires MongoDB 1.7.4 or greater. To map-reduce against an older server use 'runCommand' directly as described in http://www.mongodb.org/display/DOCS/MapReduce.
2010-06-15 03:14:40 +00:00
data MapReduce = MapReduce {
2013-12-26 14:57:33 +00:00
rColl :: Collection ,
rMap :: MapFun ,
rReduce :: ReduceFun ,
2020-04-01 13:11:17 +00:00
rSelect :: Selector , -- ^ Operate on only those documents selected. Default is @[]@ meaning all documents.
rSort :: Order , -- ^ Default is @[]@ meaning no sort
2013-12-26 14:57:33 +00:00
rLimit :: Limit , -- ^ Default is 0 meaning no limit
rOut :: MROut , -- ^ Output to a collection with a certain merge policy. Default is no collection ('Inline'). Note, you don't want this default if your result set is large.
rFinalize :: Maybe FinalizeFun , -- ^ Function to apply to all the results when finished. Default is Nothing.
2020-04-01 13:11:17 +00:00
rScope :: Document , -- ^ Variables (environment) that can be accessed from map/reduce/finalize. Default is @[]@.
2013-12-26 14:57:33 +00:00
rVerbose :: Bool -- ^ Provide statistics on job execution time. Default is False.
} deriving ( Show , Eq )
2010-06-15 03:14:40 +00:00
type MapFun = Javascript
2010-07-03 17:15:30 +00:00
-- ^ @() -> void@. The map function references the variable @this@ to inspect the current object under consideration. The function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate.
2010-06-15 03:14:40 +00:00
type ReduceFun = Javascript
2010-10-27 20:13:23 +00:00
-- ^ @(key, [value]) -> value@. The reduce function receives a key and an array of values and returns an aggregate result value. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @reduce(k, [reduce(k,vs)]) == reduce(k,vs)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible.
2010-06-15 03:14:40 +00:00
type FinalizeFun = Javascript
-- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value.
2011-06-22 21:18:32 +00:00
data MROut =
2013-12-26 14:57:33 +00:00
Inline -- ^ Return results directly instead of writing them to an output collection. Results must fit within 16MB limit of a single document
| Output MRMerge Collection ( Maybe Database ) -- ^ Write results to given collection, in other database if specified. Follow merge policy when entry already exists
deriving ( Show , Eq )
2011-06-22 21:18:32 +00:00
data MRMerge =
2013-12-26 14:57:33 +00:00
Replace -- ^ Clear all old data and replace it with new data
| Merge -- ^ Leave old data but overwrite entries with the same key with new data
| Reduce -- ^ Leave old data but combine entries with the same key via MR's reduce function
deriving ( Show , Eq )
2011-06-22 21:18:32 +00:00
type MRResult = Document
-- ^ Result of running a MapReduce has some stats besides the output. See http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Resultobject
2010-06-15 03:14:40 +00:00
mrDocument :: MapReduce -> Document
-- ^ Translate MapReduce data into expected document form
mrDocument MapReduce { .. } =
2013-12-26 14:57:33 +00:00
( " mapreduce " =: rColl ) :
( " out " =: mrOutDoc rOut ) :
( " finalize " =? rFinalize ) ++ [
" map " =: rMap ,
" reduce " =: rReduce ,
" query " =: rSelect ,
" sort " =: rSort ,
" limit " =: ( fromIntegral rLimit :: Int ) ,
" scope " =: rScope ,
" verbose " =: rVerbose ]
2010-06-15 03:14:40 +00:00
2011-06-22 21:18:32 +00:00
mrOutDoc :: MROut -> Document
-- ^ Translate MROut into expected document form
mrOutDoc Inline = [ " inline " =: ( 1 :: Int ) ]
mrOutDoc ( Output mrMerge coll mDB ) = ( mergeName mrMerge =: coll ) : mdb mDB where
2013-12-26 14:57:33 +00:00
mergeName Replace = " replace "
mergeName Merge = " merge "
mergeName Reduce = " reduce "
mdb Nothing = []
mdb ( Just db ) = [ " db " =: db ]
2011-06-22 21:18:32 +00:00
2010-06-15 03:14:40 +00:00
mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
-- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments.
2011-06-22 21:18:32 +00:00
mapReduce col map' red = MapReduce col map' red [] [] 0 Inline Nothing [] False
2010-06-15 03:14:40 +00:00
2017-05-09 06:12:26 +00:00
runMR :: MonadIO m => MapReduce -> Action m Cursor
2010-06-15 03:14:40 +00:00
-- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript)
2011-06-22 21:18:32 +00:00
runMR mr = do
2013-12-26 14:57:33 +00:00
res <- runMR' mr
case look " result " res of
Just ( String coll ) -> find $ query [] coll
Just ( Doc doc ) -> useDb ( at " db " doc ) $ find $ query [] ( at " collection " doc )
Just x -> error $ " unexpected map-reduce result field: " ++ show x
2015-06-17 07:24:12 +00:00
Nothing -> newCursor " " " " 0 $ return $ Batch ( Just 0 ) 0 ( at " results " res )
2011-06-22 21:18:32 +00:00
2013-12-27 11:39:22 +00:00
runMR' :: ( MonadIO m ) => MapReduce -> Action m MRResult
2011-06-22 21:18:32 +00:00
-- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript).
2010-06-15 03:14:40 +00:00
runMR' mr = do
2013-12-26 14:57:33 +00:00
doc <- runCommand ( mrDocument mr )
return $ if true1 " ok " doc then doc else error $ " mapReduce error: \ n " ++ show doc ++ " \ n in: \ n " ++ show mr
2010-06-15 03:14:40 +00:00
-- * Command
type Command = Document
-- ^ A command is a special query or action against the database. See <http://www.mongodb.org/display/DOCS/Commands> for details.
2013-12-27 11:39:22 +00:00
runCommand :: ( MonadIO m ) => Command -> Action m Document
2023-04-17 07:46:57 +00:00
runCommand params = do
pipe <- asks mongoPipe
if isHandshake params || maxWireVersion ( P . serverData pipe ) < 17
then runCommandLegacy pipe params
else runCommand' pipe params
runCommandLegacy :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document
runCommandLegacy pipe params = do
qr <- queryRequest False ( query params " $cmd " ) { limit = 1 }
rq <- liftIO $ request pipe [] qr
Batch _ _ docs <- liftDB $ fulfill rq
case docs of
[ doc ] -> pure doc
_ -> error $ " Nothing returned for command: " <> show params
runCommand' :: MonadIO m => Pipe -> Selector -> ReaderT MongoContext m Document
runCommand' pipe params = do
ctx <- ask
rq <- liftIO $ requestOpMsg pipe ( Req ( P . Message ( mongoDatabase ctx ) params ) , Just 1 ) []
Batch _ _ docs <- liftDB $ fulfill rq
case docs of
[ doc ] -> pure doc
_ -> error $ " Nothing returned for command: " <> show params
2010-06-15 03:14:40 +00:00
2013-12-27 11:39:22 +00:00
runCommand1 :: ( MonadIO m ) => Text -> Action m Document
2010-06-21 15:06:20 +00:00
-- ^ @runCommand1 foo = runCommand [foo =: 1]@
2010-06-15 03:14:40 +00:00
runCommand1 c = runCommand [ c =: ( 1 :: Int ) ]
2013-12-27 11:39:22 +00:00
eval :: ( MonadIO m , Val v ) => Javascript -> Action m v
2010-06-15 03:14:40 +00:00
-- ^ Run code on server
2022-10-27 04:09:24 +00:00
eval code = do
p <- asks mongoPipe
let sd = P . serverData p
if maxWireVersion sd <= 7
then at " retval " <$> runCommand [ " $eval " =: code ]
else error " The command db.eval() has been removed since MongoDB 4.2 "
2010-06-15 03:14:40 +00:00
2017-05-09 05:47:47 +00:00
modifyMVar :: MVar a -> ( a -> Action IO ( a , b ) ) -> Action IO b
modifyMVar v f = do
ctx <- ask
liftIO $ MV . modifyMVar v ( \ x -> runReaderT ( f x ) ctx )
2017-05-09 06:12:26 +00:00
mkWeakMVar :: MVar a -> Action IO () -> Action IO ( Weak ( MVar a ) )
mkWeakMVar m closing = do
ctx <- ask
2022-06-17 17:16:02 +00:00
2017-05-09 06:12:26 +00:00
# if MIN_VERSION_base ( 4 , 6 , 0 )
liftIO $ MV . mkWeakMVar m $ runReaderT closing ctx
# else
liftIO $ MV . addMVarFinalizer m $ runReaderT closing ctx
# endif
2010-06-15 03:14:40 +00:00
{- Authors: Tony Hannan <tony@10gen.com>
2011-07-05 14:37:01 +00:00
Copyright 2011 10 gen Inc .
2010-06-15 03:14:40 +00:00
Licensed under the Apache License , Version 2.0 ( the " License " ) ; you may not use this file except in compliance with the License . You may obtain a copy of the License at : http :// www . apache . org / licenses / LICENSE - 2.0 . Unless required by applicable law or agreed to in writing , software distributed under the License is distributed on an " AS IS " BASIS , WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied . See the License for the specific language governing permissions and limitations under the License . - }