From 8da53a3fa3449abbcaaf5216058f531f6903359e Mon Sep 17 00:00:00 2001 From: Tony Hannan Date: Mon, 1 Nov 2010 15:35:13 -0400 Subject: [PATCH] Use Monad.MVar. Remove Delayed wrapper around promise. --- Control/Monad/Throw.hs | 6 +++ Database/MongoDB/Query.hs | 92 +++++++++++++++------------------------ mongoDB.cabal | 2 +- 3 files changed, 42 insertions(+), 58 deletions(-) diff --git a/Control/Monad/Throw.hs b/Control/Monad/Throw.hs index ef989e3..6d6b85e 100644 --- a/Control/Monad/Throw.hs +++ b/Control/Monad/Throw.hs @@ -7,6 +7,8 @@ module Control.Monad.Throw where import Prelude hiding (catch) import Control.Monad.Reader import Control.Monad.Error +import Control.Arrow ((+++)) +import Control.Applicative ((<$>)) -- | Same as 'MonadError' but without functional dependency so the same monad can have multiple errors with different types class (Monad m) => Throw e m where @@ -42,3 +44,7 @@ instance (Error e, Throw e m, Error x) => Throw e (ErrorT x m) where instance (Throw e m) => Throw e (ReaderT x m) where throw = lift . throw catch a h = ReaderT $ \x -> catch (runReaderT a x) (flip runReaderT x . h) + +mapError :: (Functor m) => (e -> e') -> ErrorT e m a -> ErrorT e' m a +-- ^ Convert error type +mapError f (ErrorT m) = ErrorT $ (f +++ id) <$> m diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 94c3aeb..6fb8dde 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -46,7 +46,7 @@ import Control.Monad.Context import Control.Monad.Reader import Control.Monad.Error import Control.Monad.Throw -import Control.Concurrent.MVar +import Control.Monad.MVar import Control.Pipeline (Resource(..)) import qualified Database.MongoDB.Internal.Protocol as P import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call) @@ -56,7 +56,7 @@ import Data.Word import Data.Int import Data.Maybe (listToMaybe, catMaybes) import Data.UString as U (dropWhile, any, tail, unpack) -import Control.Monad.Util (MonadIO', loop) -- plus Applicative instances of ErrorT & ReaderT +import Control.Monad.Util (MonadIO', loop) import Database.MongoDB.Internal.Util ((<.>), true1) mapErrorIO :: (Throw e m, MonadIO m) => (e' -> e) -> ErrorT e' IO a -> m a @@ -71,11 +71,11 @@ access w mos pool act = do either (return . Left . ConnectionFailure) (runAction act w mos) ePipe -- | A monad with access to a 'Pipe', 'MasterOrSlaveOk', and 'WriteMode', and throws 'Failure' on read, write, or pipe failure -class (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m) => Access m -instance (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m) => Access m +class (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m, MonadMVar m) => Access m +instance (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m, MonadMVar m) => Access m newtype Action m a = Action (ErrorT Failure (ReaderT WriteMode (ReaderT MasterOrSlaveOk (ReaderT Pipe m))) a) - deriving (Context Pipe, Context MasterOrSlaveOk, Context WriteMode, Throw Failure, MonadIO, Monad, Applicative, Functor) + deriving (Context Pipe, Context MasterOrSlaveOk, Context WriteMode, Throw Failure, MonadIO, MonadMVar, Monad, Applicative, Functor) -- ^ Monad with access to a 'Pipe', 'MasterOrSlaveOk', and 'WriteMode', and throws a 'Failure' on read, write or pipe failure instance MonadTrans Action where @@ -363,24 +363,24 @@ queryRequest isExplain mos Query{..} (Database db) = (P.Query{..}, remainingLimi special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection -runQuery :: (DbAccess m) => Bool -> [Notice] -> Query -> m CursorState' +runQuery :: (DbAccess m) => Bool -> [Notice] -> Query -> m DelayedCursorState -- ^ Send query request and return cursor state runQuery isExplain ns q = do db <- thisDatabase slaveOK <- context - call' ns (queryRequest isExplain slaveOK q db) + request ns (queryRequest isExplain slaveOK q db) find :: (DbAccess m) => Query -> m Cursor -- ^ Fetch documents satisfying query find q@Query{selection, batchSize} = do db <- thisDatabase - cs' <- runQuery False [] q - newCursor db (coll selection) batchSize cs' + dcs <- runQuery False [] q + newCursor db (coll selection) batchSize dcs findOne' :: (DbAccess m) => [Notice] -> Query -> m (Maybe Document) -- ^ Send notices and fetch first document satisfying query or Nothing if none satisfy it findOne' ns q = do - CS _ _ docs <- cursorState =<< runQuery False ns q {limit = 1} + CS _ _ docs <- mapErrorIO id =<< runQuery False ns q {limit = 1} return (listToMaybe docs) findOne :: (DbAccess m) => Query -> m (Maybe Document) @@ -390,7 +390,7 @@ findOne = findOne' [] explain :: (DbAccess m) => Query -> m Document -- ^ Return performance stats of query execution explain q = do -- same as findOne but with explain set to true - CS _ _ docs <- cursorState =<< runQuery True [] q {limit = 1} + CS _ _ docs <- mapErrorIO id =<< runQuery True [] q {limit = 1} return $ if null docs then error ("no explain: " ++ show q) else head docs count :: (DbAccess m) => Query -> m Int @@ -405,41 +405,21 @@ distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "ke -- *** Cursor -data Cursor = Cursor FullCollection BatchSize (MVar CursorState') +data Cursor = Cursor FullCollection BatchSize (MVar DelayedCursorState) -- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a 'CursorNotFoundFailure'. Note, a cursor is not closed when the pipe is closed, so you can open another pipe to the same server and continue using the cursor. -modifyCursorState' :: (Access m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Action IO (CursorState', a)) -> m a --- ^ Analogous to 'modifyMVar' but with Conn monad -modifyCursorState' (Cursor fcol batch var) act = do - wr <- context - mos <- context - pipe <- context - e <- liftIO . modifyMVar var $ \cs' -> do - e' <- runAction (act fcol batch cs') wr mos pipe - return $ case e' of - Right (cs'', a) -> (cs'', Right a) - Left failure -> (cs', Left $ throw failure) - either id return e - getCursorState :: (Access m) => Cursor -> m CursorState -- ^ Extract current cursor status -getCursorState (Cursor _ _ var) = cursorState =<< liftIO (readMVar var) +getCursorState (Cursor _ _ var) = mapErrorIO id =<< readMVar var -data CursorState' = - Delayed (forall n. (Throw Failure n, MonadIO n) => n CursorState) - | CursorState CursorState --- ^ A cursor state or a promised cursor state which may fail +type DelayedCursorState = ErrorT Failure IO CursorState +-- ^ A promised cursor state which may fail -call' :: (Access m) => [Notice] -> (Request, Limit) -> m CursorState' +request :: (Access m) => [Notice] -> (Request, Limit) -> m DelayedCursorState -- ^ Send notices and request and return promised cursor state -call' ns (req, remainingLimit) = do +request ns (req, remainingLimit) = do promise <- call ns req - return $ Delayed (fromReply remainingLimit =<< promise) - -cursorState :: (Access m) => CursorState' -> m CursorState --- ^ Convert promised cursor state to cursor state or failure -cursorState (Delayed promise) = promise -cursorState (CursorState cs) = return cs + return $ fromReply remainingLimit =<< promise data CursorState = CS Limit CursorId [Document] -- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch. @@ -456,34 +436,30 @@ fromReply limit Reply{..} = do CursorNotFound -> throw (CursorNotFoundFailure rCursorId) QueryError -> throw (QueryFailure $ at "$err" $ head rDocuments) -newCursor :: (Access m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor +newCursor :: (Access m) => Database -> Collection -> BatchSize -> DelayedCursorState -> m Cursor -- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected. newCursor (Database db) col batch cs = do - wr <- context - mos <- context - pipe <- context - var <- liftIO (newMVar cs) + var <- newMVar cs let cursor = Cursor (db <.> col) batch var - liftIO . addMVarFinalizer var $ runAction (close cursor) wr mos pipe >> return () + addMVarFinalizer var (close cursor) return cursor next :: (Access m) => Cursor -> m (Maybe Document) -- ^ Return next document in query result, or Nothing if finished. -next cursor = modifyCursorState' cursor nextState where +next (Cursor fcol batch var) = modifyMVar var nextState where -- Pre-fetch next batch promise from server when last one in current batch is returned. - nextState :: FullCollection -> BatchSize -> CursorState' -> Action IO (CursorState', Maybe Document) - nextState fcol batch cs' = do - CS limit cid docs <- cursorState cs' + nextState dcs = do + CS limit cid docs <- mapErrorIO id dcs case docs of doc : docs' -> do - cs'' <- if null docs' && cid /= 0 - then nextBatch fcol batch limit cid - else return $ CursorState (CS limit cid docs') - return (cs'', Just doc) + dcs' <- if null docs' && cid /= 0 + then nextBatch limit cid + else return $ return (CS limit cid docs') + return (dcs', Just doc) [] -> if cid == 0 - then return (CursorState $ CS 0 0 [], Nothing) -- finished + then return (return $ CS 0 0 [], Nothing) -- finished else error $ "server returned empty batch but says more results on server" - nextBatch fcol batch limit cid = call' [] (GetMore fcol batchSize cid, remLimit) + nextBatch limit cid = request [] (GetMore fcol batchSize cid, remLimit) where (batchSize, remLimit) = batchSizeRemainingLimit batch limit nextN :: (Access m) => Int -> Cursor -> m [Document] @@ -495,8 +471,8 @@ rest :: (Access m) => Cursor -> m [Document] rest c = loop (next c) instance (Access m) => Resource m Cursor where - close cursor = modifyCursorState' cursor kill' where - kill' _ _ cs' = first CursorState <$> (kill =<< cursorState cs') + close (Cursor _ _ var) = modifyMVar var kill' where + kill' dcs = first return <$> (kill =<< mapErrorIO id dcs) kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]] isClosed cursor = do CS _ cid docs <- getCursorState cursor @@ -613,7 +589,9 @@ eval code = at "retval" <$> runCommand ["$eval" =: code] send :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> m () -- ^ Send notices as a contiguous batch to server with no reply. Throw 'ConnectionFailure' if pipe fails. -send ns = mapErrorIO ConnectionFailure . flip P.send ns =<< context +send ns = do + pipe <- context + mapErrorIO ConnectionFailure (P.send pipe ns) call :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw Failure n, MonadIO n) => n Reply) -- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive. diff --git a/mongoDB.cabal b/mongoDB.cabal index fe12b10..e671ac8 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -1,5 +1,5 @@ name: mongoDB -version: 0.8.1 +version: 0.8.2 build-type: Simple license: OtherLicense license-file: LICENSE