From f7ae5b7235597478df421c80afcf62012a02a95a Mon Sep 17 00:00:00 2001 From: Tony Hannan Date: Wed, 22 Jun 2011 17:18:32 -0400 Subject: [PATCH] MapReduce updated to work with MongoDB version >= 1.7.4 --- Database/MongoDB/Query.hs | 48 +++++++++++++++++++++++++++++++-------- map-reduce-example.md | 28 +++++++++++------------ mongoDB.cabal | 2 +- 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 26d46ba..e06fd8d 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -33,7 +33,7 @@ module Database.MongoDB.Query ( -- ** Group Group(..), GroupKey(..), group, -- ** MapReduce - MapReduce(..), MapFun, ReduceFun, FinalizeFun, mapReduce, runMR, runMR', + MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..), MRResult, mapReduce, runMR, runMR', -- * Command Command, runCommand, runCommand1, eval, @@ -531,6 +531,7 @@ group g = at "retval" <$> runCommand ["group" =: groupDocument g] -- ** MapReduce -- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values to a single result. There are additional parameters that may be set to tweak this basic operation. +-- This implements the latest version of map-reduce that requires MongoDB 1.7.4 or greater. To map-reduce against an older server use runCommand directly as described in http://www.mongodb.org/display/DOCS/MapReduce. data MapReduce = MapReduce { rColl :: Collection, rMap :: MapFun, @@ -538,8 +539,7 @@ data MapReduce = MapReduce { rSelect :: Selector, -- ^ Operate on only those documents selected. Default is [] meaning all documents. rSort :: Order, -- ^ Default is [] meaning no sort rLimit :: Limit, -- ^ Default is 0 meaning no limit - rOut :: Maybe Collection, -- ^ Output to given permanent collection, otherwise output to a new temporary collection whose name is returned. - rKeepTemp :: Bool, -- ^ If True, the temporary output collection is made permanent. If False, the temporary output collection persists for the life of the current pipe only, however, other pipes may read from it while the original one is still alive. Note, reading from a temporary collection after its original pipe dies returns an empty result (not an error). The default for this attribute is False, unless 'rOut' is specified, then the collection permanent. + rOut :: MROut, -- ^ Output to a collection with a certain merge policy. Default is no collection (Inline). Note, you don't want this default if your result set is large. rFinalize :: Maybe FinalizeFun, -- ^ Function to apply to all the results when finished. Default is Nothing. rScope :: Document, -- ^ Variables (environment) that can be accessed from map/reduce/finalize. Default is []. rVerbose :: Bool -- ^ Provide statistics on job execution time. Default is False. @@ -554,32 +554,60 @@ type ReduceFun = Javascript type FinalizeFun = Javascript -- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value. +data MROut = + Inline -- ^ Return results directly instead of writing them to an output collection. Results must fit within 16MB limit of a single document + | Output MRMerge Collection (Maybe Database) -- ^ Write results to given collection, in other database if specified. Follow merge policy when entry already exists + deriving (Show, Eq) + +data MRMerge = + Replace -- ^ Clear all old data and replace it with new data + | Merge -- ^ Leave old data but overwrite entries with the same key with new data + | Reduce -- ^ Leave old data but combine entries with the same key via MR's reduce function + deriving (Show, Eq) + +type MRResult = Document +-- ^ Result of running a MapReduce has some stats besides the output. See http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Resultobject + mrDocument :: MapReduce -> Document -- ^ Translate MapReduce data into expected document form mrDocument MapReduce{..} = ("mapreduce" =: rColl) : - ("out" =? rOut) ++ + ("out" =: mrOutDoc rOut) : ("finalize" =? rFinalize) ++ [ "map" =: rMap, "reduce" =: rReduce, "query" =: rSelect, "sort" =: rSort, "limit" =: (fromIntegral rLimit :: Int), - "keeptemp" =: rKeepTemp, "scope" =: rScope, "verbose" =: rVerbose ] +mrOutDoc :: MROut -> Document +-- ^ Translate MROut into expected document form +mrOutDoc Inline = ["inline" =: (1 :: Int)] +mrOutDoc (Output mrMerge coll mDB) = (mergeName mrMerge =: coll) : mdb mDB where + mergeName Replace = "replace" + mergeName Merge = "merge" + mergeName Reduce = "reduce" + mdb Nothing = [] + mdb (Just (Database db)) = ["db" =: db] + mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce -- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments. -mapReduce col map' red = MapReduce col map' red [] [] 0 Nothing False Nothing [] False +mapReduce col map' red = MapReduce col map' red [] [] 0 Inline Nothing [] False runMR :: (DbAccess m) => MapReduce -> m Cursor -- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript) --- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when pipe closes. -runMR mr = find . query [] =<< (at "result" <$> runMR' mr) +runMR mr = do + res <- runMR' mr + case look "result" res of + Just (String coll) -> find $ query [] coll + Just (Doc doc) -> use (Database $ at "db" doc) $ find $ query [] (at "collection" doc) + Just x -> error $ "unexpected map-reduce result field: " ++ show x + Nothing -> newCursor (Database "") "" 0 $ return $ CS 0 0 (at "results" res) -runMR' :: (DbAccess m) => MapReduce -> m Document --- ^ Run MapReduce and return a result document containing a "result" field holding the output Collection and additional statistic fields. Error if the map/reduce failed (because of bad Javascript). +runMR' :: (DbAccess m) => MapReduce -> m MRResult +-- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript). runMR' mr = do doc <- runCommand (mrDocument mr) return $ if true1 "ok" doc then doc else error $ "mapReduce error:\n" ++ show doc ++ "\nin:\n" ++ show mr diff --git a/map-reduce-example.md b/map-reduce-example.md index 5de2236..9a3f0d9 100644 --- a/map-reduce-example.md +++ b/map-reduce-example.md @@ -13,14 +13,13 @@ Setup To start, we'll insert some example data which we can perform map/reduce queries on: - $ ghci -package mongoDB - GHCi, version 6.12.1: http://www.haskell.org/ghc/ :? for help + $ ghci ... Prelude> :set prompt "> " > :set -XOverloadedStrings > import Database.MongoDB > import Data.CompactString () - > conn <- newConnPool 1 (host "localhost") + > conn <- newConnPool 1 (host "127.0.0.1") > let run act = access safe Master conn $ use (Database "test") act > :{ run $ insertMany "mr1" [ @@ -68,18 +67,17 @@ key: Note: We can't just return values.length as the reduce function might be called iteratively on the results of other reduce steps. -Finally, we run mapReduce and iterate over the result collection: - - > run $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest - Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]] - -Advanced Map/Reduce -------------------- - -MongoDB returns additional statistics in the map/reduce results. To -obtain them, use *runMR'* instead: +Finally, we run mapReduce, results by default will be return in an array in the result document (inlined): > run $ runMR' (mapReduce "mr1" mapFn reduceFn) - Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0] + Right [ results: [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]], timeMillis: 379, counts: [ input: 4, emit: 6, reduce: 2, output: 3], ok: 1.0] -You can then obtain the results from here by quering the result collection yourself. *runMR* (above) does this for you but discards the statistics. +Inlining only works if result set < 16MB. An alternative to inlining is outputing to a collection. But what to do if there is data already in the collection from a previous run of the same MapReduce? You have three alternatives in the MRMerge data type: Replace, Merge, and Reduce. See its documentation for details. To output to a collection, set the mOut field in MapReduce. + + > run $ runMR' (mapReduce "mr1" mapFn reduceFn) {rOut = Output Replace "mr1out" Nothing} + Right [ result: "mr1out", timeMillis: 379, counts: [ input: 4, emit: 6, reduce: 2, output: 3], ok: 1.0] + +You can now query the mr1out collection to see the result, or run another MapReduce on it! A shortcut for running the map-reduce then querying the result collection right away is `runMR`. + + > run $ rest =<< runMR (mapReduce "mr1" mapFn reduceFn) {rOut = Output Replace "mr1out" Nothing} + Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]] diff --git a/mongoDB.cabal b/mongoDB.cabal index 6384877..7e11b45 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -1,5 +1,5 @@ name: mongoDB -version: 0.9.5 +version: 0.10.0 build-type: Simple license: OtherLicense license-file: LICENSE