commit
62ae52ecd2
2 changed files with 28 additions and 5 deletions
|
@ -1,6 +1,6 @@
|
|||
-- | Query and update documents
|
||||
|
||||
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies #-}
|
||||
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP #-}
|
||||
|
||||
module Database.MongoDB.Query (
|
||||
-- * Monad
|
||||
|
@ -47,8 +47,13 @@ import Data.Int (Int32)
|
|||
import Data.Maybe (listToMaybe, catMaybes)
|
||||
import Data.Word (Word32)
|
||||
|
||||
#if MIN_VERSION_base(4,6,0)
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar,
|
||||
readMVar, modifyMVar)
|
||||
#else
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
|
||||
readMVar, modifyMVar)
|
||||
#endif
|
||||
import Control.Monad.Base (MonadBase(liftBase))
|
||||
import Control.Monad.Error (ErrorT, Error(..), MonadError, runErrorT,
|
||||
throwError)
|
||||
|
@ -78,6 +83,10 @@ import Database.MongoDB.Internal.Protocol (Reply(..), QueryOption(..),
|
|||
import Database.MongoDB.Internal.Util (MonadIO', loop, liftIOE, true1, (<.>))
|
||||
import qualified Database.MongoDB.Internal.Protocol as P
|
||||
|
||||
#if !MIN_VERSION_base(4,6,0)
|
||||
--mkWeakMVar = addMVarFinalizer
|
||||
#endif
|
||||
|
||||
-- * Monad
|
||||
|
||||
newtype Action m a = Action {unAction :: ErrorT Failure (ReaderT Context m) a}
|
||||
|
@ -509,8 +518,11 @@ newCursor :: (MonadIO m, MonadBaseControl IO m) => Database -> Collection -> Bat
|
|||
newCursor db col batchSize dBatch = do
|
||||
var <- newMVar dBatch
|
||||
let cursor = Cursor (db <.> col) batchSize var
|
||||
addMVarFinalizer var (closeCursor cursor)
|
||||
mkWeakMVar var (closeCursor cursor)
|
||||
return cursor
|
||||
#if !MIN_VERSION_base(4,6,0)
|
||||
where mkWeakMVar = addMVarFinalizer
|
||||
#endif
|
||||
|
||||
nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
|
||||
-- ^ Return next batch of documents in query result, which will be empty if finished.
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. It also closes itself when garbage collected, or you can close it explicitly. -}
|
||||
|
||||
{-# LANGUAGE DoRec, RecordWildCards, NamedFieldPuns, ScopedTypeVariables #-}
|
||||
{-# LANGUAGE CPP #-}
|
||||
|
||||
module System.IO.Pipeline (
|
||||
IOE,
|
||||
|
@ -19,10 +20,20 @@ import Control.Monad (forever)
|
|||
import GHC.Conc (ThreadStatus(..), threadStatus)
|
||||
|
||||
import Control.Monad.Trans (liftIO)
|
||||
#if MIN_VERSION_base(4,6,0)
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
||||
putMVar, readMVar, mkWeakMVar)
|
||||
#else
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
||||
putMVar, readMVar, addMVarFinalizer)
|
||||
#endif
|
||||
import Control.Monad.Error (ErrorT(ErrorT), runErrorT)
|
||||
|
||||
#if !MIN_VERSION_base(4,6,0)
|
||||
mkWeakMVar :: MVar a -> IO () -> IO ()
|
||||
mkWeakMVar = addMVarFinalizer
|
||||
#endif
|
||||
|
||||
onException :: (Monad m) => ErrorT e m a -> m () -> ErrorT e m a
|
||||
-- ^ If first action throws an exception then run second action then re-throw
|
||||
onException (ErrorT action) releaser = ErrorT $ do
|
||||
|
@ -58,7 +69,7 @@ newPipeline stream = do
|
|||
rec
|
||||
let pipe = Pipeline{..}
|
||||
listenThread <- forkIO (listen pipe)
|
||||
addMVarFinalizer vStream $ do
|
||||
mkWeakMVar vStream $ do
|
||||
killThread listenThread
|
||||
closeStream stream
|
||||
return pipe
|
||||
|
|
Loading…
Reference in a new issue