Use Monad.MVar. Remove Delayed wrapper around promise.
This commit is contained in:
parent
630b558b93
commit
8da53a3fa3
3 changed files with 42 additions and 58 deletions
|
@ -7,6 +7,8 @@ module Control.Monad.Throw where
|
|||
import Prelude hiding (catch)
|
||||
import Control.Monad.Reader
|
||||
import Control.Monad.Error
|
||||
import Control.Arrow ((+++))
|
||||
import Control.Applicative ((<$>))
|
||||
|
||||
-- | Same as 'MonadError' but without functional dependency so the same monad can have multiple errors with different types
|
||||
class (Monad m) => Throw e m where
|
||||
|
@ -42,3 +44,7 @@ instance (Error e, Throw e m, Error x) => Throw e (ErrorT x m) where
|
|||
instance (Throw e m) => Throw e (ReaderT x m) where
|
||||
throw = lift . throw
|
||||
catch a h = ReaderT $ \x -> catch (runReaderT a x) (flip runReaderT x . h)
|
||||
|
||||
mapError :: (Functor m) => (e -> e') -> ErrorT e m a -> ErrorT e' m a
|
||||
-- ^ Convert error type
|
||||
mapError f (ErrorT m) = ErrorT $ (f +++ id) <$> m
|
||||
|
|
|
@ -46,7 +46,7 @@ import Control.Monad.Context
|
|||
import Control.Monad.Reader
|
||||
import Control.Monad.Error
|
||||
import Control.Monad.Throw
|
||||
import Control.Concurrent.MVar
|
||||
import Control.Monad.MVar
|
||||
import Control.Pipeline (Resource(..))
|
||||
import qualified Database.MongoDB.Internal.Protocol as P
|
||||
import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call)
|
||||
|
@ -56,7 +56,7 @@ import Data.Word
|
|||
import Data.Int
|
||||
import Data.Maybe (listToMaybe, catMaybes)
|
||||
import Data.UString as U (dropWhile, any, tail, unpack)
|
||||
import Control.Monad.Util (MonadIO', loop) -- plus Applicative instances of ErrorT & ReaderT
|
||||
import Control.Monad.Util (MonadIO', loop)
|
||||
import Database.MongoDB.Internal.Util ((<.>), true1)
|
||||
|
||||
mapErrorIO :: (Throw e m, MonadIO m) => (e' -> e) -> ErrorT e' IO a -> m a
|
||||
|
@ -71,11 +71,11 @@ access w mos pool act = do
|
|||
either (return . Left . ConnectionFailure) (runAction act w mos) ePipe
|
||||
|
||||
-- | A monad with access to a 'Pipe', 'MasterOrSlaveOk', and 'WriteMode', and throws 'Failure' on read, write, or pipe failure
|
||||
class (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m) => Access m
|
||||
instance (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m) => Access m
|
||||
class (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m, MonadMVar m) => Access m
|
||||
instance (Context Pipe m, Context MasterOrSlaveOk m, Context WriteMode m, Throw Failure m, MonadIO' m, MonadMVar m) => Access m
|
||||
|
||||
newtype Action m a = Action (ErrorT Failure (ReaderT WriteMode (ReaderT MasterOrSlaveOk (ReaderT Pipe m))) a)
|
||||
deriving (Context Pipe, Context MasterOrSlaveOk, Context WriteMode, Throw Failure, MonadIO, Monad, Applicative, Functor)
|
||||
deriving (Context Pipe, Context MasterOrSlaveOk, Context WriteMode, Throw Failure, MonadIO, MonadMVar, Monad, Applicative, Functor)
|
||||
-- ^ Monad with access to a 'Pipe', 'MasterOrSlaveOk', and 'WriteMode', and throws a 'Failure' on read, write or pipe failure
|
||||
|
||||
instance MonadTrans Action where
|
||||
|
@ -363,24 +363,24 @@ queryRequest isExplain mos Query{..} (Database db) = (P.Query{..}, remainingLimi
|
|||
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
|
||||
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
|
||||
|
||||
runQuery :: (DbAccess m) => Bool -> [Notice] -> Query -> m CursorState'
|
||||
runQuery :: (DbAccess m) => Bool -> [Notice] -> Query -> m DelayedCursorState
|
||||
-- ^ Send query request and return cursor state
|
||||
runQuery isExplain ns q = do
|
||||
db <- thisDatabase
|
||||
slaveOK <- context
|
||||
call' ns (queryRequest isExplain slaveOK q db)
|
||||
request ns (queryRequest isExplain slaveOK q db)
|
||||
|
||||
find :: (DbAccess m) => Query -> m Cursor
|
||||
-- ^ Fetch documents satisfying query
|
||||
find q@Query{selection, batchSize} = do
|
||||
db <- thisDatabase
|
||||
cs' <- runQuery False [] q
|
||||
newCursor db (coll selection) batchSize cs'
|
||||
dcs <- runQuery False [] q
|
||||
newCursor db (coll selection) batchSize dcs
|
||||
|
||||
findOne' :: (DbAccess m) => [Notice] -> Query -> m (Maybe Document)
|
||||
-- ^ Send notices and fetch first document satisfying query or Nothing if none satisfy it
|
||||
findOne' ns q = do
|
||||
CS _ _ docs <- cursorState =<< runQuery False ns q {limit = 1}
|
||||
CS _ _ docs <- mapErrorIO id =<< runQuery False ns q {limit = 1}
|
||||
return (listToMaybe docs)
|
||||
|
||||
findOne :: (DbAccess m) => Query -> m (Maybe Document)
|
||||
|
@ -390,7 +390,7 @@ findOne = findOne' []
|
|||
explain :: (DbAccess m) => Query -> m Document
|
||||
-- ^ Return performance stats of query execution
|
||||
explain q = do -- same as findOne but with explain set to true
|
||||
CS _ _ docs <- cursorState =<< runQuery True [] q {limit = 1}
|
||||
CS _ _ docs <- mapErrorIO id =<< runQuery True [] q {limit = 1}
|
||||
return $ if null docs then error ("no explain: " ++ show q) else head docs
|
||||
|
||||
count :: (DbAccess m) => Query -> m Int
|
||||
|
@ -405,41 +405,21 @@ distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "ke
|
|||
|
||||
-- *** Cursor
|
||||
|
||||
data Cursor = Cursor FullCollection BatchSize (MVar CursorState')
|
||||
data Cursor = Cursor FullCollection BatchSize (MVar DelayedCursorState)
|
||||
-- ^ Iterator over results of a query. Use 'next' to iterate or 'rest' to get all results. A cursor is closed when it is explicitly closed, all results have been read from it, garbage collected, or not used for over 10 minutes (unless 'NoCursorTimeout' option was specified in 'Query'). Reading from a closed cursor raises a 'CursorNotFoundFailure'. Note, a cursor is not closed when the pipe is closed, so you can open another pipe to the same server and continue using the cursor.
|
||||
|
||||
modifyCursorState' :: (Access m) => Cursor -> (FullCollection -> BatchSize -> CursorState' -> Action IO (CursorState', a)) -> m a
|
||||
-- ^ Analogous to 'modifyMVar' but with Conn monad
|
||||
modifyCursorState' (Cursor fcol batch var) act = do
|
||||
wr <- context
|
||||
mos <- context
|
||||
pipe <- context
|
||||
e <- liftIO . modifyMVar var $ \cs' -> do
|
||||
e' <- runAction (act fcol batch cs') wr mos pipe
|
||||
return $ case e' of
|
||||
Right (cs'', a) -> (cs'', Right a)
|
||||
Left failure -> (cs', Left $ throw failure)
|
||||
either id return e
|
||||
|
||||
getCursorState :: (Access m) => Cursor -> m CursorState
|
||||
-- ^ Extract current cursor status
|
||||
getCursorState (Cursor _ _ var) = cursorState =<< liftIO (readMVar var)
|
||||
getCursorState (Cursor _ _ var) = mapErrorIO id =<< readMVar var
|
||||
|
||||
data CursorState' =
|
||||
Delayed (forall n. (Throw Failure n, MonadIO n) => n CursorState)
|
||||
| CursorState CursorState
|
||||
-- ^ A cursor state or a promised cursor state which may fail
|
||||
type DelayedCursorState = ErrorT Failure IO CursorState
|
||||
-- ^ A promised cursor state which may fail
|
||||
|
||||
call' :: (Access m) => [Notice] -> (Request, Limit) -> m CursorState'
|
||||
request :: (Access m) => [Notice] -> (Request, Limit) -> m DelayedCursorState
|
||||
-- ^ Send notices and request and return promised cursor state
|
||||
call' ns (req, remainingLimit) = do
|
||||
request ns (req, remainingLimit) = do
|
||||
promise <- call ns req
|
||||
return $ Delayed (fromReply remainingLimit =<< promise)
|
||||
|
||||
cursorState :: (Access m) => CursorState' -> m CursorState
|
||||
-- ^ Convert promised cursor state to cursor state or failure
|
||||
cursorState (Delayed promise) = promise
|
||||
cursorState (CursorState cs) = return cs
|
||||
return $ fromReply remainingLimit =<< promise
|
||||
|
||||
data CursorState = CS Limit CursorId [Document]
|
||||
-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch.
|
||||
|
@ -456,34 +436,30 @@ fromReply limit Reply{..} = do
|
|||
CursorNotFound -> throw (CursorNotFoundFailure rCursorId)
|
||||
QueryError -> throw (QueryFailure $ at "$err" $ head rDocuments)
|
||||
|
||||
newCursor :: (Access m) => Database -> Collection -> BatchSize -> CursorState' -> m Cursor
|
||||
newCursor :: (Access m) => Database -> Collection -> BatchSize -> DelayedCursorState -> m Cursor
|
||||
-- ^ Create new cursor. If you don't read all results then close it. Cursor will be closed automatically when all results are read from it or when eventually garbage collected.
|
||||
newCursor (Database db) col batch cs = do
|
||||
wr <- context
|
||||
mos <- context
|
||||
pipe <- context
|
||||
var <- liftIO (newMVar cs)
|
||||
var <- newMVar cs
|
||||
let cursor = Cursor (db <.> col) batch var
|
||||
liftIO . addMVarFinalizer var $ runAction (close cursor) wr mos pipe >> return ()
|
||||
addMVarFinalizer var (close cursor)
|
||||
return cursor
|
||||
|
||||
next :: (Access m) => Cursor -> m (Maybe Document)
|
||||
-- ^ Return next document in query result, or Nothing if finished.
|
||||
next cursor = modifyCursorState' cursor nextState where
|
||||
next (Cursor fcol batch var) = modifyMVar var nextState where
|
||||
-- Pre-fetch next batch promise from server when last one in current batch is returned.
|
||||
nextState :: FullCollection -> BatchSize -> CursorState' -> Action IO (CursorState', Maybe Document)
|
||||
nextState fcol batch cs' = do
|
||||
CS limit cid docs <- cursorState cs'
|
||||
nextState dcs = do
|
||||
CS limit cid docs <- mapErrorIO id dcs
|
||||
case docs of
|
||||
doc : docs' -> do
|
||||
cs'' <- if null docs' && cid /= 0
|
||||
then nextBatch fcol batch limit cid
|
||||
else return $ CursorState (CS limit cid docs')
|
||||
return (cs'', Just doc)
|
||||
dcs' <- if null docs' && cid /= 0
|
||||
then nextBatch limit cid
|
||||
else return $ return (CS limit cid docs')
|
||||
return (dcs', Just doc)
|
||||
[] -> if cid == 0
|
||||
then return (CursorState $ CS 0 0 [], Nothing) -- finished
|
||||
then return (return $ CS 0 0 [], Nothing) -- finished
|
||||
else error $ "server returned empty batch but says more results on server"
|
||||
nextBatch fcol batch limit cid = call' [] (GetMore fcol batchSize cid, remLimit)
|
||||
nextBatch limit cid = request [] (GetMore fcol batchSize cid, remLimit)
|
||||
where (batchSize, remLimit) = batchSizeRemainingLimit batch limit
|
||||
|
||||
nextN :: (Access m) => Int -> Cursor -> m [Document]
|
||||
|
@ -495,8 +471,8 @@ rest :: (Access m) => Cursor -> m [Document]
|
|||
rest c = loop (next c)
|
||||
|
||||
instance (Access m) => Resource m Cursor where
|
||||
close cursor = modifyCursorState' cursor kill' where
|
||||
kill' _ _ cs' = first CursorState <$> (kill =<< cursorState cs')
|
||||
close (Cursor _ _ var) = modifyMVar var kill' where
|
||||
kill' dcs = first return <$> (kill =<< mapErrorIO id dcs)
|
||||
kill (CS _ cid _) = (CS 0 0 [],) <$> if cid == 0 then return () else send [KillCursors [cid]]
|
||||
isClosed cursor = do
|
||||
CS _ cid docs <- getCursorState cursor
|
||||
|
@ -613,7 +589,9 @@ eval code = at "retval" <$> runCommand ["$eval" =: code]
|
|||
|
||||
send :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> m ()
|
||||
-- ^ Send notices as a contiguous batch to server with no reply. Throw 'ConnectionFailure' if pipe fails.
|
||||
send ns = mapErrorIO ConnectionFailure . flip P.send ns =<< context
|
||||
send ns = do
|
||||
pipe <- context
|
||||
mapErrorIO ConnectionFailure (P.send pipe ns)
|
||||
|
||||
call :: (Context Pipe m, Throw Failure m, MonadIO m) => [Notice] -> Request -> m (forall n. (Throw Failure n, MonadIO n) => n Reply)
|
||||
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive.
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
name: mongoDB
|
||||
version: 0.8.1
|
||||
version: 0.8.2
|
||||
build-type: Simple
|
||||
license: OtherLicense
|
||||
license-file: LICENSE
|
||||
|
|
Loading…
Reference in a new issue