From 6435bc3cd99447c6ff6b4b1f00c5189ad5c6fbfe Mon Sep 17 00:00:00 2001 From: Tony Hannan Date: Sat, 3 Jul 2010 13:15:30 -0400 Subject: [PATCH] Handle response flags correctly, plus some comment changes --- Database/MongoDB.hs | 40 ++++++++++++- Database/MongoDB/Internal/Protocol.hs | 20 ++++++- Database/MongoDB/Query.hs | 82 ++++++++++++++++----------- mongoDB.cabal | 2 +- tutorial.md | 4 +- 5 files changed, 107 insertions(+), 41 deletions(-) diff --git a/Database/MongoDB.hs b/Database/MongoDB.hs index 99235ea..c8ea5d9 100644 --- a/Database/MongoDB.hs +++ b/Database/MongoDB.hs @@ -1,4 +1,42 @@ --- | Client interface to MongoDB server(s) +{- | +Client interface to MongoDB server(s). + +Simple example: + +> +> {-# LANGUAGE OverloadedStrings #-} +> +> import Database.MongoDB +> +> main = do +> e <- connect (server "127.0.0.1") +> conn <- either (fail . show) return e +> e <- runConn run conn +> either (fail . show) return e +> +> run = useDb "baseball" $ do +> clearTeams +> insertTeams +> print' "All Teams" =<< allTeams +> print' "National League Teams" =<< nationalLeagueTeams +> print' "New York Teams" =<< newYorkTeams +> +> clearTeams = delete (select [] "team") +> +> insertTeams = insertMany "team" [ +> ["name" =: "Yankees", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "American"], +> ["name" =: "Mets", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "National"], +> ["name" =: "Phillies", "home" =: ["city" =: "Philadelphia", "state" =: "PA"], "league" =: "National"], +> ["name" =: "Red Sox", "home" =: ["city" =: "Boston", "state" =: "MA"], "league" =: "American"] ] +> +> allTeams = rest =<< find (select [] "team") {sort = ["city" =: 1]} +> +> nationalLeagueTeams = rest =<< find (select ["league" =: "National"] "team") +> +> newYorkTeams = rest =<< find (select ["home.state" =: "NY"] "team") {project = ["name" =: 1, "league" =: 1]} +> +> print' title docs = liftIO $ putStrLn title >> mapM_ print docs +-} module Database.MongoDB ( module Data.Bson, diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index ed2ec44..0130325 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -15,7 +15,7 @@ module Database.MongoDB.Internal.Protocol ( -- ** Request Request(..), QueryOption(..), -- ** Reply - Reply(..), + Reply(..), ResponseFlag(..), -- * Authentication Username, Password, Nonce, pwHash, pwKey ) where @@ -238,12 +238,18 @@ qBits = bitOr . map qBit -- | A reply is a message received in response to a 'Request' data Reply = Reply { - rResponseFlag :: Int32, -- ^ 0 = success, non-zero = failure + rResponseFlags :: [ResponseFlag], rCursorId :: CursorId, -- ^ 0 = cursor finished rStartingFrom :: Int32, rDocuments :: [Document] } deriving (Show, Eq) +data ResponseFlag = + CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results. + | QueryError -- ^ Server error. Results contains one document containing an "$err" field holding the error message. + | AwaitCapable -- ^ For backward compatability: Set when the server supports the AwaitData query option. if it doesn't, a replica slave client should sleep a little between getMore's + deriving (Show, Eq, Enum) + -- * Binary format replyOpcode :: Opcode @@ -253,13 +259,21 @@ getReply :: Get (ResponseTo, Reply) getReply = do (opcode, responseTo) <- getHeader unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode - rResponseFlag <- getInt32 + rResponseFlags <- rFlags <$> getInt32 rCursorId <- getInt64 rStartingFrom <- getInt32 numDocs <- fromIntegral <$> getInt32 rDocuments <- replicateM numDocs getDocument return (responseTo, Reply{..}) +rFlags :: Int32 -> [ResponseFlag] +rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..] + +rBit :: ResponseFlag -> Int +rBit CursorNotFound = 0 +rBit QueryError = 1 +rBit AwaitCapable = 3 + -- * Authentication type Username = UString diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index c210f10..f0afc6d 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -1,10 +1,10 @@ -- | Query and update documents residing on a MongoDB server(s) -{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving #-} module Database.MongoDB.Query ( -- * Connection - Failure(..), Conn, Connected, runConn, + Connected, runConn, Conn, Failure(..), -- * Database Database, allDatabases, DbConn, useDb, thisDatabase, -- ** Authentication @@ -52,11 +52,22 @@ import Database.MongoDB.Internal.Protocol hiding (Query, send, call) import Data.Bson import Data.Word import Data.Int -import Data.Maybe (listToMaybe, catMaybes) +import Data.Maybe (listToMaybe, catMaybes, mapMaybe) import Data.UString as U (dropWhile, any, tail) import Database.MongoDB.Internal.Util (loop, (<.>), true1) -- plus Applicative instances of ErrorT & ReaderT --- * Connection +-- * Connected + +newtype Connected m a = Connected (ErrorT Failure (ReaderT WriteMode (ReaderT Connection m)) a) + deriving (Context Connection, Context WriteMode, MonadError Failure, MonadIO, Monad, Applicative, Functor) +-- ^ Monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure + +instance MonadTrans Connected where + lift = Connected . lift . lift . lift + +runConn :: Connected m a -> Connection -> m (Either Failure a) +-- ^ Run action with access to connection. Return Left Failure if connection or server fails during execution. +runConn (Connected action) = runReaderT (runReaderT (runErrorT action) Unsafe) -- | A monad with access to a 'Connection' and 'WriteMode' and throws a 'Failure' on connection or server failure class (Context Connection m, Context WriteMode m, MonadError Failure m, MonadIO m, Applicative m, Functor m) => Conn m @@ -72,12 +83,6 @@ data Failure = instance Error Failure where strMsg = ServerFailure -type Connected m = ErrorT Failure (ReaderT WriteMode (ReaderT Connection m)) - -runConn :: Connected m a -> Connection -> m (Either Failure a) --- ^ Run action with access to connection. Return Left Failure if connection or server fails during execution. -runConn action = runReaderT (runReaderT (runErrorT action) Unsafe) - send :: (Conn m) => [Notice] -> m () -- ^ Send notices as a contiguous batch to server with no reply. Raise Failure if connection fails. send ns = do @@ -298,7 +303,7 @@ type Limit = Word32 -- ^ Maximum number of documents to return, i.e. cursor will close after iterating over this number of documents. 0 means no limit. type Order = Document --- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. @[x =: 1, y =: (-1)]@ means sort by @x@ ascending then @y@ descending +-- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. @[x =: 1, y =: -1]@ means sort by @x@ ascending then @y@ descending type BatchSize = Word32 -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. @@ -310,10 +315,12 @@ query sel col = Query [] (Select sel col) [] 0 0 [] False 0 [] batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit) -- ^ Given batchSize and limit return P.qBatchSize and remaining limit batchSizeRemainingLimit batchSize limit = if limit == 0 - then (fromIntegral batchSize, 0) -- no limit - else if 0 < batchSize && batchSize < limit - then (fromIntegral batchSize, limit - batchSize) + then (fromIntegral batchSize', 0) -- no limit + else if 0 < batchSize' && batchSize' < limit + then (fromIntegral batchSize', limit - batchSize') else (- fromIntegral limit, 1) + where batchSize' = if batchSize == 1 then 2 else batchSize + -- batchSize 1 is broken because server converts 1 to -1 meaning limit 1 queryRequest :: Bool -> Query -> Database -> (Request, Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. @@ -334,6 +341,12 @@ runQuery :: (DbConn m) => Bool -> [Notice] -> Query -> m CursorState' -- ^ Send query request and return cursor state runQuery isExplain ns q = call' ns . queryRequest isExplain q =<< thisDatabase +call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState' +-- ^ Send notices and request and return promised cursor state +call' ns (req, remainingLimit) = do + promise <- call ns req + return $ Delayed (fmap (fromReply remainingLimit =<<) promise) + find :: (DbConn m) => Query -> m Cursor -- ^ Fetch documents satisfying query find q@Query{selection, batchSize} = do @@ -397,15 +410,16 @@ data CursorState = CS Limit CursorId [Document] fromReply :: Limit -> Reply -> Either Failure CursorState -- ^ Convert Reply to CursorState or Failure -fromReply limit Reply{..} = if rResponseFlag == 0 - then Right (CS limit rCursorId rDocuments) - else Left . ServerFailure $ "Query failure " ++ show rResponseFlag ++ " " ++ show rDocuments - -call' :: (Conn m) => [Notice] -> (Request, Limit) -> m CursorState' --- ^ Send notices and request and return promised cursor state -call' ns (req, remainingLimit) = do - promise <- call ns req - return $ Delayed (fmap (fromReply remainingLimit =<<) promise) +fromReply limit Reply{..} = case mapMaybe fromResponseFlag rResponseFlags of + [] -> Right (CS limit rCursorId rDocuments) + err : _ -> Left err + where + fromResponseFlag :: ResponseFlag -> Maybe Failure + -- ^ If response flag indicate failure then Just Failure, otherwise Nothing + fromResponseFlag x = case x of + AwaitCapable -> Nothing + CursorNotFound -> Just . ServerFailure $ "Cursor " ++ show rCursorId ++ " not found" + QueryError -> Just . ServerFailure $ "Query failure " ++ show rDocuments newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor -- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected. @@ -432,9 +446,8 @@ next cursor = modifyCursorState' cursor nextState where [] -> if cid == 0 then return (CursorState $ CS 0 0 [], Nothing) -- finished else error $ "server returned empty batch but says more results on server" - nextBatch fcol batch limit cid = let - (batchSize, remLimit) = batchSizeRemainingLimit batch limit - in call' [] (GetMore fcol batchSize cid, remLimit) + nextBatch fcol batch limit cid = call' [] (GetMore fcol batchSize cid, remLimit) + where (batchSize, remLimit) = batchSizeRemainingLimit batch limit nextN :: (Conn m) => Int -> Cursor -> m [Document] -- ^ Return next N documents or less if end is reached @@ -454,17 +467,18 @@ instance (Conn m) => Resource m Cursor where -- ** Group +-- | Groups documents in collection by key then reduces (aggregates) each group data Group = Group { gColl :: Collection, gKey :: GroupKey, -- ^ Fields to group by - gReduce :: Javascript, -- ^ The reduce function aggregates (reduces) the objects iterated. Typical operations of a reduce function include summing and counting. reduce takes two arguments: the current document being iterated over and the aggregation value. - gInitial :: Document, -- ^ Initial aggregation value supplied to reduce + gReduce :: Javascript, -- ^ @(doc, agg) -> ()@. The reduce function reduces (aggregates) the objects iterated. Typical operations of a reduce function include summing and counting. It takes two arguments, the current document being iterated over and the aggregation value, and updates the aggregate value. + gInitial :: Document, -- ^ @agg@. Initial aggregation value supplied to reduce gCond :: Selector, -- ^ Condition that must be true for a row to be considered. [] means always true. - gFinalize :: Maybe Javascript -- ^ An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just _id and average fields). + gFinalize :: Maybe Javascript -- ^ @agg -> () | result@. An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just _id and average fields). } deriving (Show, Eq) data GroupKey = Key [Label] | KeyF Javascript deriving (Show, Eq) --- ^ Fields to group by, or function returning a "key object" to be used as the grouping key. Use this instead of key to specify a key that is not an existing member of the object (or, to access embedded members). +-- ^ Fields to group by, or function (@doc -> key@) returning a "key object" to be used as the grouping key. Use KeyF instead of Key to specify a key that is not an existing member of the object (or, to access embedded members). groupDocument :: Group -> Document -- ^ Translate Group data into expected document form @@ -482,12 +496,12 @@ group g = at "retval" <$> runCommand ["group" =: groupDocument g] -- ** MapReduce --- | Maps every document in collection to a (key, value) pair, then for each unique key reduces all its associated values to a result. Therefore, the final output is a list of (key, result) pairs, where every key is unique. This is the basic description. There are additional nuances that may be used. See for details. +-- | Maps every document in collection to a list of (key, value) pairs, then for each unique key reduces all its associated values from all lists to a single result. There are additional parameters that may be set to tweak this basic operation. data MapReduce = MapReduce { rColl :: Collection, rMap :: MapFun, rReduce :: ReduceFun, - rSelect :: Selector, -- ^ Default is [] + rSelect :: Selector, -- ^ Operate on only those documents selected. Default is [] meaning all documents. rSort :: Order, -- ^ Default is [] meaning no sort rLimit :: Limit, -- ^ Default is 0 meaning no limit rOut :: Maybe Collection, -- ^ Output to given permanent collection, otherwise output to a new temporary collection whose name is returned. @@ -498,10 +512,10 @@ data MapReduce = MapReduce { } deriving (Show, Eq) type MapFun = Javascript --- ^ @() -> void@. The map function references the variable this to inspect the current object under consideration. A map function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate. +-- ^ @() -> void@. The map function references the variable @this@ to inspect the current object under consideration. The function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate. type ReduceFun = Javascript --- ^ @(key, value_array) -> value@. The reduce function receives a key and an array of values. To use, reduce the received values, and return a result. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @for all k, vals : reduce(k, [reduce(k,vals)]) == reduce(k,vals)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible. +-- ^ @(key, value_array) -> value@. The reduce function receives a key and an array of values and returns an aggregate result value. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @for all k, vals : reduce(k, [reduce(k,vals)]) == reduce(k,vals)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible. type FinalizeFun = Javascript -- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value. diff --git a/mongoDB.cabal b/mongoDB.cabal index ba20e20..0643df5 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -1,5 +1,5 @@ Name: mongoDB -Version: 0.6 +Version: 0.6.1 License: OtherLicense License-file: LICENSE Maintainer: Tony Hannan diff --git a/tutorial.md b/tutorial.md index 6feeaf1..8bf03cf 100644 --- a/tutorial.md +++ b/tutorial.md @@ -55,12 +55,12 @@ Open up a connection to your DB instance, using the standard port: or for a non-standard port - > Right con <- connect $ server "127.0.0.1" (PortNumber 666) + > Right con <- connect $ Server "127.0.0.1" (PortNumber 666) *connect* returns Left IOError if connection fails. We are assuming above it won't fail. If it does you will get a pattern match error. -Task and Db monad +Connected monad ------------------- The current connection is held in a Connected monad, and the current database