Add allowDiskUse option for aggregates

Aggregation pipeline stages have a limit of 100Mb of RAM.
In case of large datasets, one can cross that limit by setting
allowDiskUse = True and making stages write data
to temporary files.

See also https://docs.mongodb.com/manual/reference/method/db.collection.aggregate .
This commit is contained in:
Andrea Condoluci 2020-12-15 14:36:10 +00:00
parent a9e1868d10
commit 9a048f2f85

View file

@ -1338,16 +1338,28 @@ aggregate :: (MonadIO m, MonadFail m) => Collection -> Pipeline -> Action m [Doc
aggregate aColl agg = do aggregate aColl agg = do
aggregateCursor aColl agg def >>= rest aggregateCursor aColl agg def >>= rest
data AggregateConfig = AggregateConfig {} data AggregateConfig = AggregateConfig
{ allowDiskUse :: Bool -- ^ Enable writing to temporary files (aggregations have a 100Mb RAM limit)
}
deriving Show deriving Show
instance Default AggregateConfig where instance Default AggregateConfig where
def = AggregateConfig {} def = AggregateConfig
{ allowDiskUse = False
}
aggregateCommand :: Collection -> Pipeline -> AggregateConfig -> Document
aggregateCommand aColl agg AggregateConfig {..} =
[ "aggregate" =: aColl
, "pipeline" =: agg
, "cursor" =: ([] :: Document)
, "allowDiskUse" =: allowDiskUse
]
aggregateCursor :: (MonadIO m, MonadFail m) => Collection -> Pipeline -> AggregateConfig -> Action m Cursor aggregateCursor :: (MonadIO m, MonadFail m) => Collection -> Pipeline -> AggregateConfig -> Action m Cursor
-- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details. -- ^ Runs an aggregate and unpacks the result. See <http://docs.mongodb.org/manual/core/aggregation/> for details.
aggregateCursor aColl agg _ = do aggregateCursor aColl agg cfg = do
response <- runCommand ["aggregate" =: aColl, "pipeline" =: agg, "cursor" =: ([] :: Document)] response <- runCommand (aggregateCommand aColl agg cfg)
getCursorFromResponse aColl response getCursorFromResponse aColl response
>>= either (liftIO . throwIO . AggregateFailure) return >>= either (liftIO . throwIO . AggregateFailure) return