From dd6c5057f572ae074d363c956f390fc11fcc16a1 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Mon, 8 May 2017 22:47:47 -0700 Subject: [PATCH] Add modifyMVar for Action monad --- Database/MongoDB/Internal/Util.hs | 4 +-- Database/MongoDB/Query.hs | 47 ++++++++++++++++++------------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/Database/MongoDB/Internal/Util.hs b/Database/MongoDB/Internal/Util.hs index ac21356..2c05138 100644 --- a/Database/MongoDB/Internal/Util.hs +++ b/Database/MongoDB/Internal/Util.hs @@ -65,9 +65,9 @@ shuffle :: [a] -> IO [a] -- ^ Randomly shuffle items in list shuffle list = shuffle' list (length list) <$> newStdGen -loop :: (Functor m, Monad m) => m (Maybe a) -> m [a] +loop :: Monad m => m (Maybe a) -> m [a] -- ^ Repeatedy execute action, collecting results, until it returns Nothing -loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act) +loop act = act >>= maybe (return []) (\a -> (a :) `liftM` loop act) untilSuccess :: (MonadError e m, Error e) => (a -> m b) -> [a] -> m b -- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw 'strMsg' error if list is empty. diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 4dec503..2b5840b 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -57,17 +57,17 @@ import Data.Monoid (mappend) #endif import Data.Typeable (Typeable) +import qualified Control.Concurrent.MVar as MV #if MIN_VERSION_base(4,6,0) import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar, - readMVar, modifyMVar) + readMVar) #else import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer, - readMVar, modifyMVar) + readMVar) #endif import Control.Applicative ((<$>)) import Control.Exception (SomeException, catch) import Control.Monad (when) -import Control.Monad.Base (MonadBase) import Control.Monad.Error (Error(..)) import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local) import Control.Monad.Trans (MonadIO, liftIO) @@ -840,7 +840,7 @@ findOne q = do pipe <- asks mongoPipe qr <- queryRequest False q {limit = 1} rq <- liftIO $ request pipe [] qr - Batch _ _ docs <- fulfill rq + Batch _ _ docs <- liftDB $ fulfill rq return (listToMaybe docs) fetch :: (MonadIO m) => Query -> Action m Document @@ -929,7 +929,7 @@ explain q = do -- same as findOne but with explain set to true pipe <- asks mongoPipe qr <- queryRequest True q {limit = 1} r <- liftIO $ request pipe [] qr - Batch _ _ docs <- fulfill r + Batch _ _ docs <- liftDB $ fulfill r return $ if null docs then error ("no explain: " ++ show q) else head docs count :: (MonadIO m) => Query -> Action m Int @@ -998,7 +998,7 @@ fromReply limit Reply{..} = do CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId QueryError -> throwIO $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments) -fulfill :: (MonadIO m) => DelayedBatch -> Action m Batch +fulfill :: DelayedBatch -> Action IO Batch -- ^ Demand and wait for result, raise failure if exception fulfill = liftIO @@ -1018,11 +1018,11 @@ newCursor db col batchSize dBatch = do where mkWeakMVar = addMVarFinalizer #endif -nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document] +nextBatch :: MonadIO m => Cursor -> Action m [Document] -- ^ Return next batch of documents in query result, which will be empty if finished. -nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do +nextBatch (Cursor fcol batchSize var) = liftDB $ modifyMVar var $ \dBatch -> do -- Pre-fetch next batch promise from server and return current batch. - Batch mLimit cid docs <- fulfill' fcol batchSize dBatch + Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch let newLimit = do limit <- mLimit return $ limit - (min limit $ fromIntegral $ length docs) @@ -1037,7 +1037,7 @@ nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do return (emptyBatch, resultDocs) (_, _) -> (, resultDocs) <$> getNextBatch -fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch +fulfill' :: FullCollection -> BatchSize -> DelayedBatch -> Action IO Batch -- Discard pre-fetched batch if empty with nonzero cid. fulfill' fcol batchSize dBatch = do b@(Batch limit cid docs) <- fulfill dBatch @@ -1051,13 +1051,13 @@ nextBatch' fcol batchSize limit cid = do liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit) where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit -next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document) +next :: MonadIO m => Cursor -> Action m (Maybe Document) -- ^ Return next document in query result, or Nothing if finished. -next (Cursor fcol batchSize var) = modifyMVar var nextState where +next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where -- Pre-fetch next batch promise from server when last one in current batch is returned. -- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document) nextState dBatch = do - Batch mLimit cid docs <- fulfill' fcol batchSize dBatch + Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch if mLimit == (Just 0) then return (return $ Batch (Just 0) 0 [], Nothing) else @@ -1075,27 +1075,29 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where return (dBatch', Just doc) [] -> if cid == 0 then return (return $ Batch (Just 0) 0 [], Nothing) -- finished - else fmap (,Nothing) $ nextBatch' fcol batchSize mLimit cid + else do + nb <- nextBatch' fcol batchSize mLimit cid + return (nb, Nothing) -nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document] +nextN :: MonadIO m => Int -> Cursor -> Action m [Document] -- ^ Return next N documents or less if end is reached nextN n c = catMaybes `liftM` replicateM n (next c) -rest :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document] +rest :: MonadIO m => Cursor -> Action m [Document] -- ^ Return remaining documents in query result rest c = loop (next c) -closeCursor :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m () -closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do +closeCursor :: MonadIO m => Cursor -> Action m () +closeCursor (Cursor _ _ var) = liftDB $ modifyMVar var $ \dBatch -> do Batch _ cid _ <- fulfill dBatch unless (cid == 0) $ do pipe <- asks mongoPipe liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]] return $ (return $ Batch (Just 0) 0 [], ()) -isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool +isCursorClosed :: MonadIO m => Cursor -> Action m Bool isCursorClosed (Cursor _ _ var) = do - Batch _ cid docs <- fulfill =<< readMVar var + Batch _ cid docs <- liftDB $ fulfill =<< readMVar var return (cid == 0 && null docs) -- ** Aggregate @@ -1242,6 +1244,11 @@ eval :: (MonadIO m, Val v) => Javascript -> Action m v -- ^ Run code on server eval code = at "retval" `liftM` runCommand ["$eval" =: code] +modifyMVar :: MVar a -> (a -> Action IO (a, b)) -> Action IO b +modifyMVar v f = do + ctx <- ask + liftIO $ MV.modifyMVar v (\x -> runReaderT (f x) ctx) + {- Authors: Tony Hannan Copyright 2011 10gen Inc.