MapReduce updated to work with MongoDB version >= 1.7.4

This commit is contained in:
Tony Hannan 2011-06-22 17:18:32 -04:00
parent 3db3cc9999
commit f7ae5b7235
3 changed files with 52 additions and 26 deletions

View file

@ -33,7 +33,7 @@ module Database.MongoDB.Query (
-- ** Group
Group(..), GroupKey(..), group,
-- ** MapReduce
MapReduce(..), MapFun, ReduceFun, FinalizeFun, mapReduce, runMR, runMR',
MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..), MRResult, mapReduce, runMR, runMR',
-- * Command
Command, runCommand, runCommand1,
eval,
@ -531,6 +531,7 @@ group g = at "retval" <$> runCommand ["group" =: groupDocument g]
-- ** MapReduce
-- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values to a single result. There are additional parameters that may be set to tweak this basic operation.
-- This implements the latest version of map-reduce that requires MongoDB 1.7.4 or greater. To map-reduce against an older server use runCommand directly as described in http://www.mongodb.org/display/DOCS/MapReduce.
data MapReduce = MapReduce {
rColl :: Collection,
rMap :: MapFun,
@ -538,8 +539,7 @@ data MapReduce = MapReduce {
rSelect :: Selector, -- ^ Operate on only those documents selected. Default is [] meaning all documents.
rSort :: Order, -- ^ Default is [] meaning no sort
rLimit :: Limit, -- ^ Default is 0 meaning no limit
rOut :: Maybe Collection, -- ^ Output to given permanent collection, otherwise output to a new temporary collection whose name is returned.
rKeepTemp :: Bool, -- ^ If True, the temporary output collection is made permanent. If False, the temporary output collection persists for the life of the current pipe only, however, other pipes may read from it while the original one is still alive. Note, reading from a temporary collection after its original pipe dies returns an empty result (not an error). The default for this attribute is False, unless 'rOut' is specified, then the collection permanent.
rOut :: MROut, -- ^ Output to a collection with a certain merge policy. Default is no collection (Inline). Note, you don't want this default if your result set is large.
rFinalize :: Maybe FinalizeFun, -- ^ Function to apply to all the results when finished. Default is Nothing.
rScope :: Document, -- ^ Variables (environment) that can be accessed from map/reduce/finalize. Default is [].
rVerbose :: Bool -- ^ Provide statistics on job execution time. Default is False.
@ -554,32 +554,60 @@ type ReduceFun = Javascript
type FinalizeFun = Javascript
-- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value.
data MROut =
Inline -- ^ Return results directly instead of writing them to an output collection. Results must fit within 16MB limit of a single document
| Output MRMerge Collection (Maybe Database) -- ^ Write results to given collection, in other database if specified. Follow merge policy when entry already exists
deriving (Show, Eq)
data MRMerge =
Replace -- ^ Clear all old data and replace it with new data
| Merge -- ^ Leave old data but overwrite entries with the same key with new data
| Reduce -- ^ Leave old data but combine entries with the same key via MR's reduce function
deriving (Show, Eq)
type MRResult = Document
-- ^ Result of running a MapReduce has some stats besides the output. See http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Resultobject
mrDocument :: MapReduce -> Document
-- ^ Translate MapReduce data into expected document form
mrDocument MapReduce{..} =
("mapreduce" =: rColl) :
("out" =? rOut) ++
("out" =: mrOutDoc rOut) :
("finalize" =? rFinalize) ++ [
"map" =: rMap,
"reduce" =: rReduce,
"query" =: rSelect,
"sort" =: rSort,
"limit" =: (fromIntegral rLimit :: Int),
"keeptemp" =: rKeepTemp,
"scope" =: rScope,
"verbose" =: rVerbose ]
mrOutDoc :: MROut -> Document
-- ^ Translate MROut into expected document form
mrOutDoc Inline = ["inline" =: (1 :: Int)]
mrOutDoc (Output mrMerge coll mDB) = (mergeName mrMerge =: coll) : mdb mDB where
mergeName Replace = "replace"
mergeName Merge = "merge"
mergeName Reduce = "reduce"
mdb Nothing = []
mdb (Just (Database db)) = ["db" =: db]
mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce
-- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments.
mapReduce col map' red = MapReduce col map' red [] [] 0 Nothing False Nothing [] False
mapReduce col map' red = MapReduce col map' red [] [] 0 Inline Nothing [] False
runMR :: (DbAccess m) => MapReduce -> m Cursor
-- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript)
-- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when pipe closes.
runMR mr = find . query [] =<< (at "result" <$> runMR' mr)
runMR mr = do
res <- runMR' mr
case look "result" res of
Just (String coll) -> find $ query [] coll
Just (Doc doc) -> use (Database $ at "db" doc) $ find $ query [] (at "collection" doc)
Just x -> error $ "unexpected map-reduce result field: " ++ show x
Nothing -> newCursor (Database "") "" 0 $ return $ CS 0 0 (at "results" res)
runMR' :: (DbAccess m) => MapReduce -> m Document
-- ^ Run MapReduce and return a result document containing a "result" field holding the output Collection and additional statistic fields. Error if the map/reduce failed (because of bad Javascript).
runMR' :: (DbAccess m) => MapReduce -> m MRResult
-- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript).
runMR' mr = do
doc <- runCommand (mrDocument mr)
return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr

View file

@ -13,14 +13,13 @@ Setup
To start, we'll insert some example data which we can perform
map/reduce queries on:
$ ghci -package mongoDB
GHCi, version 6.12.1: http://www.haskell.org/ghc/ :? for help
$ ghci
...
Prelude> :set prompt "> "
> :set -XOverloadedStrings
> import Database.MongoDB
> import Data.CompactString ()
> conn <- newConnPool 1 (host "localhost")
> conn <- newConnPool 1 (host "127.0.0.1")
> let run act = access safe Master conn $ use (Database "test") act
> :{
run $ insertMany "mr1" [
@ -68,18 +67,17 @@ key:
Note: We can't just return values.length as the reduce function might
be called iteratively on the results of other reduce steps.
Finally, we run mapReduce and iterate over the result collection:
> run $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest
Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]]
Advanced Map/Reduce
-------------------
MongoDB returns additional statistics in the map/reduce results. To
obtain them, use *runMR'* instead:
Finally, we run mapReduce, results by default will be return in an array in the result document (inlined):
> run $ runMR' (mapReduce "mr1" mapFn reduceFn)
Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0]
Right [ results: [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]], timeMillis: 379, counts: [ input: 4, emit: 6, reduce: 2, output: 3], ok: 1.0]
You can then obtain the results from here by quering the result collection yourself. *runMR* (above) does this for you but discards the statistics.
Inlining only works if result set < 16MB. An alternative to inlining is outputing to a collection. But what to do if there is data already in the collection from a previous run of the same MapReduce? You have three alternatives in the MRMerge data type: Replace, Merge, and Reduce. See its documentation for details. To output to a collection, set the mOut field in MapReduce.
> run $ runMR' (mapReduce "mr1" mapFn reduceFn) {rOut = Output Replace "mr1out" Nothing}
Right [ result: "mr1out", timeMillis: 379, counts: [ input: 4, emit: 6, reduce: 2, output: 3], ok: 1.0]
You can now query the mr1out collection to see the result, or run another MapReduce on it! A shortcut for running the map-reduce then querying the result collection right away is `runMR`.
> run $ rest =<< runMR (mapReduce "mr1" mapFn reduceFn) {rOut = Output Replace "mr1out" Nothing}
Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]]

View file

@ -1,5 +1,5 @@
name: mongoDB
version: 0.9.5
version: 0.10.0
build-type: Simple
license: OtherLicense
license-file: LICENSE