Add modifyMVar for Action monad
This commit is contained in:
parent
ef819eb1aa
commit
dd6c5057f5
2 changed files with 29 additions and 22 deletions
|
@ -65,9 +65,9 @@ shuffle :: [a] -> IO [a]
|
||||||
-- ^ Randomly shuffle items in list
|
-- ^ Randomly shuffle items in list
|
||||||
shuffle list = shuffle' list (length list) <$> newStdGen
|
shuffle list = shuffle' list (length list) <$> newStdGen
|
||||||
|
|
||||||
loop :: (Functor m, Monad m) => m (Maybe a) -> m [a]
|
loop :: Monad m => m (Maybe a) -> m [a]
|
||||||
-- ^ Repeatedy execute action, collecting results, until it returns Nothing
|
-- ^ Repeatedy execute action, collecting results, until it returns Nothing
|
||||||
loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act)
|
loop act = act >>= maybe (return []) (\a -> (a :) `liftM` loop act)
|
||||||
|
|
||||||
untilSuccess :: (MonadError e m, Error e) => (a -> m b) -> [a] -> m b
|
untilSuccess :: (MonadError e m, Error e) => (a -> m b) -> [a] -> m b
|
||||||
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw 'strMsg' error if list is empty.
|
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw 'strMsg' error if list is empty.
|
||||||
|
|
|
@ -57,17 +57,17 @@ import Data.Monoid (mappend)
|
||||||
#endif
|
#endif
|
||||||
import Data.Typeable (Typeable)
|
import Data.Typeable (Typeable)
|
||||||
|
|
||||||
|
import qualified Control.Concurrent.MVar as MV
|
||||||
#if MIN_VERSION_base(4,6,0)
|
#if MIN_VERSION_base(4,6,0)
|
||||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar,
|
import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar,
|
||||||
readMVar, modifyMVar)
|
readMVar)
|
||||||
#else
|
#else
|
||||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
|
import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
|
||||||
readMVar, modifyMVar)
|
readMVar)
|
||||||
#endif
|
#endif
|
||||||
import Control.Applicative ((<$>))
|
import Control.Applicative ((<$>))
|
||||||
import Control.Exception (SomeException, catch)
|
import Control.Exception (SomeException, catch)
|
||||||
import Control.Monad (when)
|
import Control.Monad (when)
|
||||||
import Control.Monad.Base (MonadBase)
|
|
||||||
import Control.Monad.Error (Error(..))
|
import Control.Monad.Error (Error(..))
|
||||||
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
|
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
|
||||||
import Control.Monad.Trans (MonadIO, liftIO)
|
import Control.Monad.Trans (MonadIO, liftIO)
|
||||||
|
@ -840,7 +840,7 @@ findOne q = do
|
||||||
pipe <- asks mongoPipe
|
pipe <- asks mongoPipe
|
||||||
qr <- queryRequest False q {limit = 1}
|
qr <- queryRequest False q {limit = 1}
|
||||||
rq <- liftIO $ request pipe [] qr
|
rq <- liftIO $ request pipe [] qr
|
||||||
Batch _ _ docs <- fulfill rq
|
Batch _ _ docs <- liftDB $ fulfill rq
|
||||||
return (listToMaybe docs)
|
return (listToMaybe docs)
|
||||||
|
|
||||||
fetch :: (MonadIO m) => Query -> Action m Document
|
fetch :: (MonadIO m) => Query -> Action m Document
|
||||||
|
@ -929,7 +929,7 @@ explain q = do -- same as findOne but with explain set to true
|
||||||
pipe <- asks mongoPipe
|
pipe <- asks mongoPipe
|
||||||
qr <- queryRequest True q {limit = 1}
|
qr <- queryRequest True q {limit = 1}
|
||||||
r <- liftIO $ request pipe [] qr
|
r <- liftIO $ request pipe [] qr
|
||||||
Batch _ _ docs <- fulfill r
|
Batch _ _ docs <- liftDB $ fulfill r
|
||||||
return $ if null docs then error ("no explain: " ++ show q) else head docs
|
return $ if null docs then error ("no explain: " ++ show q) else head docs
|
||||||
|
|
||||||
count :: (MonadIO m) => Query -> Action m Int
|
count :: (MonadIO m) => Query -> Action m Int
|
||||||
|
@ -998,7 +998,7 @@ fromReply limit Reply{..} = do
|
||||||
CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId
|
CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId
|
||||||
QueryError -> throwIO $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
|
QueryError -> throwIO $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments)
|
||||||
|
|
||||||
fulfill :: (MonadIO m) => DelayedBatch -> Action m Batch
|
fulfill :: DelayedBatch -> Action IO Batch
|
||||||
-- ^ Demand and wait for result, raise failure if exception
|
-- ^ Demand and wait for result, raise failure if exception
|
||||||
fulfill = liftIO
|
fulfill = liftIO
|
||||||
|
|
||||||
|
@ -1018,11 +1018,11 @@ newCursor db col batchSize dBatch = do
|
||||||
where mkWeakMVar = addMVarFinalizer
|
where mkWeakMVar = addMVarFinalizer
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
|
nextBatch :: MonadIO m => Cursor -> Action m [Document]
|
||||||
-- ^ Return next batch of documents in query result, which will be empty if finished.
|
-- ^ Return next batch of documents in query result, which will be empty if finished.
|
||||||
nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
|
nextBatch (Cursor fcol batchSize var) = liftDB $ modifyMVar var $ \dBatch -> do
|
||||||
-- Pre-fetch next batch promise from server and return current batch.
|
-- Pre-fetch next batch promise from server and return current batch.
|
||||||
Batch mLimit cid docs <- fulfill' fcol batchSize dBatch
|
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
|
||||||
let newLimit = do
|
let newLimit = do
|
||||||
limit <- mLimit
|
limit <- mLimit
|
||||||
return $ limit - (min limit $ fromIntegral $ length docs)
|
return $ limit - (min limit $ fromIntegral $ length docs)
|
||||||
|
@ -1037,7 +1037,7 @@ nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
|
||||||
return (emptyBatch, resultDocs)
|
return (emptyBatch, resultDocs)
|
||||||
(_, _) -> (, resultDocs) <$> getNextBatch
|
(_, _) -> (, resultDocs) <$> getNextBatch
|
||||||
|
|
||||||
fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch
|
fulfill' :: FullCollection -> BatchSize -> DelayedBatch -> Action IO Batch
|
||||||
-- Discard pre-fetched batch if empty with nonzero cid.
|
-- Discard pre-fetched batch if empty with nonzero cid.
|
||||||
fulfill' fcol batchSize dBatch = do
|
fulfill' fcol batchSize dBatch = do
|
||||||
b@(Batch limit cid docs) <- fulfill dBatch
|
b@(Batch limit cid docs) <- fulfill dBatch
|
||||||
|
@ -1051,13 +1051,13 @@ nextBatch' fcol batchSize limit cid = do
|
||||||
liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit)
|
liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit)
|
||||||
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
|
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
|
||||||
|
|
||||||
next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document)
|
next :: MonadIO m => Cursor -> Action m (Maybe Document)
|
||||||
-- ^ Return next document in query result, or Nothing if finished.
|
-- ^ Return next document in query result, or Nothing if finished.
|
||||||
next (Cursor fcol batchSize var) = modifyMVar var nextState where
|
next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where
|
||||||
-- Pre-fetch next batch promise from server when last one in current batch is returned.
|
-- Pre-fetch next batch promise from server when last one in current batch is returned.
|
||||||
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
|
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
|
||||||
nextState dBatch = do
|
nextState dBatch = do
|
||||||
Batch mLimit cid docs <- fulfill' fcol batchSize dBatch
|
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
|
||||||
if mLimit == (Just 0)
|
if mLimit == (Just 0)
|
||||||
then return (return $ Batch (Just 0) 0 [], Nothing)
|
then return (return $ Batch (Just 0) 0 [], Nothing)
|
||||||
else
|
else
|
||||||
|
@ -1075,27 +1075,29 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where
|
||||||
return (dBatch', Just doc)
|
return (dBatch', Just doc)
|
||||||
[] -> if cid == 0
|
[] -> if cid == 0
|
||||||
then return (return $ Batch (Just 0) 0 [], Nothing) -- finished
|
then return (return $ Batch (Just 0) 0 [], Nothing) -- finished
|
||||||
else fmap (,Nothing) $ nextBatch' fcol batchSize mLimit cid
|
else do
|
||||||
|
nb <- nextBatch' fcol batchSize mLimit cid
|
||||||
|
return (nb, Nothing)
|
||||||
|
|
||||||
nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document]
|
nextN :: MonadIO m => Int -> Cursor -> Action m [Document]
|
||||||
-- ^ Return next N documents or less if end is reached
|
-- ^ Return next N documents or less if end is reached
|
||||||
nextN n c = catMaybes `liftM` replicateM n (next c)
|
nextN n c = catMaybes `liftM` replicateM n (next c)
|
||||||
|
|
||||||
rest :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
|
rest :: MonadIO m => Cursor -> Action m [Document]
|
||||||
-- ^ Return remaining documents in query result
|
-- ^ Return remaining documents in query result
|
||||||
rest c = loop (next c)
|
rest c = loop (next c)
|
||||||
|
|
||||||
closeCursor :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m ()
|
closeCursor :: MonadIO m => Cursor -> Action m ()
|
||||||
closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do
|
closeCursor (Cursor _ _ var) = liftDB $ modifyMVar var $ \dBatch -> do
|
||||||
Batch _ cid _ <- fulfill dBatch
|
Batch _ cid _ <- fulfill dBatch
|
||||||
unless (cid == 0) $ do
|
unless (cid == 0) $ do
|
||||||
pipe <- asks mongoPipe
|
pipe <- asks mongoPipe
|
||||||
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
|
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
|
||||||
return $ (return $ Batch (Just 0) 0 [], ())
|
return $ (return $ Batch (Just 0) 0 [], ())
|
||||||
|
|
||||||
isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool
|
isCursorClosed :: MonadIO m => Cursor -> Action m Bool
|
||||||
isCursorClosed (Cursor _ _ var) = do
|
isCursorClosed (Cursor _ _ var) = do
|
||||||
Batch _ cid docs <- fulfill =<< readMVar var
|
Batch _ cid docs <- liftDB $ fulfill =<< readMVar var
|
||||||
return (cid == 0 && null docs)
|
return (cid == 0 && null docs)
|
||||||
|
|
||||||
-- ** Aggregate
|
-- ** Aggregate
|
||||||
|
@ -1242,6 +1244,11 @@ eval :: (MonadIO m, Val v) => Javascript -> Action m v
|
||||||
-- ^ Run code on server
|
-- ^ Run code on server
|
||||||
eval code = at "retval" `liftM` runCommand ["$eval" =: code]
|
eval code = at "retval" `liftM` runCommand ["$eval" =: code]
|
||||||
|
|
||||||
|
modifyMVar :: MVar a -> (a -> Action IO (a, b)) -> Action IO b
|
||||||
|
modifyMVar v f = do
|
||||||
|
ctx <- ask
|
||||||
|
liftIO $ MV.modifyMVar v (\x -> runReaderT (f x) ctx)
|
||||||
|
|
||||||
|
|
||||||
{- Authors: Tony Hannan <tony@10gen.com>
|
{- Authors: Tony Hannan <tony@10gen.com>
|
||||||
Copyright 2011 10gen Inc.
|
Copyright 2011 10gen Inc.
|
||||||
|
|
Loading…
Reference in a new issue