diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs index f8e8631..9db3acb 100644 --- a/Database/MongoDB/Connection.hs +++ b/Database/MongoDB/Connection.hs @@ -4,7 +4,7 @@ module Database.MongoDB.Connection ( -- * Util - Secs, IOE, runIOE, + Secs, -- * Connection Pipe, close, isClosed, -- * Server @@ -25,12 +25,11 @@ import System.IO.Unsafe (unsafePerformIO) import System.Timeout (timeout) import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, eof, spaces, try, (<|>)) -import qualified Control.Exception as E import qualified Data.List as List import Control.Monad.Identity (runIdentity) -import Control.Monad.Error (ErrorT(..), lift, throwError) +import Control.Monad.Error (throwError) import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar, readMVar) import Data.Bson (Document, at, (=:)) @@ -40,16 +39,16 @@ import qualified Data.Bson as B import qualified Data.Text as T import Database.MongoDB.Internal.Protocol (Pipe, newPipe) -import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, +import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, updateAssocs, shuffle, mergesortM) import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access, slaveOk, runCommand) -import System.IO.Pipeline (IOE, close, isClosed) +import System.IO.Pipeline (close, isClosed) -adminCommand :: Command -> Pipe -> IOE Document +adminCommand :: Command -> Pipe -> IO Document -- ^ Run command against admin database on server connected to pipe. Fail if connection fails. adminCommand cmd pipe = - liftIOE failureToIOError . ErrorT $ access pipe slaveOk "admin" $ runCommand cmd + liftIOE failureToIOError $ access pipe slaveOk "admin" $ runCommand cmd where failureToIOError (ConnectionFailure e) = e failureToIOError e = userError $ show e @@ -102,17 +101,16 @@ globalConnectTimeout :: IORef Secs globalConnectTimeout = unsafePerformIO (newIORef 6) {-# NOINLINE globalConnectTimeout #-} -connect :: Host -> IOE Pipe +connect :: Host -> IO Pipe -- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within 'globalConnectTimeout'. -connect h = lift (readIORef globalConnectTimeout) >>= flip connect' h +connect h = readIORef globalConnectTimeout >>= flip connect' h -connect' :: Secs -> Host -> IOE Pipe +connect' :: Secs -> Host -> IO Pipe -- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within given number of seconds. connect' timeoutSecs (Host hostname port) = do - handle <- ErrorT . E.try $ do - mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port) - maybe (ioError $ userError "connect timed out") return mh - lift $ newPipe handle + mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port) + handle <- maybe (ioError $ userError "connect timed out") return mh + newPipe handle -- * Replica Set @@ -125,11 +123,11 @@ replSetName :: ReplicaSet -> Text -- ^ name of connected replica set replSetName (ReplicaSet rsName _ _) = rsName -openReplicaSet :: (ReplicaSetName, [Host]) -> IOE ReplicaSet +openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet -- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSet\'' instead. -openReplicaSet rsSeed = lift (readIORef globalConnectTimeout) >>= flip openReplicaSet' rsSeed +openReplicaSet rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSet' rsSeed -openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IOE ReplicaSet +openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet -- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members. openReplicaSet' timeoutSecs (rsName, seedList) = do vMembers <- newMVar (map (, Nothing) seedList) @@ -141,7 +139,7 @@ closeReplicaSet :: ReplicaSet -> IO () -- ^ Close all connections to replica set closeReplicaSet (ReplicaSet _ vMembers _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd) -primary :: ReplicaSet -> IOE Pipe +primary :: ReplicaSet -> IO Pipe -- ^ Return connection to current primary of replica set. Fail if no primary available. primary rs@(ReplicaSet rsName _ _) = do mHost <- statedPrimary <$> updateMembers rs @@ -149,19 +147,19 @@ primary rs@(ReplicaSet rsName _ _) = do Just host' -> connection rs Nothing host' Nothing -> throwError $ userError $ "replica set " ++ T.unpack rsName ++ " has no primary" -secondaryOk :: ReplicaSet -> IOE Pipe +secondaryOk :: ReplicaSet -> IO Pipe -- ^ Return connection to a random secondary, or primary if no secondaries available. secondaryOk rs = do info <- updateMembers rs - hosts <- lift $ shuffle (possibleHosts info) + hosts <- shuffle (possibleHosts info) let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info) untilSuccess (connection rs Nothing) hosts' -routedHost :: ((Host, Bool) -> (Host, Bool) -> IOE Ordering) -> ReplicaSet -> IOE Pipe +routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO Pipe -- ^ Return a connection to a host using a user-supplied sorting function, which sorts based on a tuple containing the host and a boolean indicating whether the host is primary. routedHost f rs = do info <- updateMembers rs - hosts <- lift $ shuffle (possibleHosts info) + hosts <- shuffle (possibleHosts info) let addIsPrimary h = (h, if Just h == statedPrimary info then True else False) hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts untilSuccess (connection rs Nothing) hosts' @@ -177,13 +175,13 @@ possibleHosts :: ReplicaInfo -> [Host] -- ^ Non-arbiter, non-hidden members of replica set possibleHosts (_, info) = map readHostPort $ at "hosts" info -updateMembers :: ReplicaSet -> IOE ReplicaInfo +updateMembers :: ReplicaSet -> IO ReplicaInfo -- ^ Fetch replica info from any server and update members accordingly updateMembers rs@(ReplicaSet _ vMembers _) = do (host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers modifyMVar vMembers $ \members -> do let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members - lift $ forM_ old $ \(_, mPipe) -> maybe (return ()) close mPipe + forM_ old $ \(_, mPipe) -> maybe (return ()) close mPipe return (members' ++ map (, Nothing) new, (host', info)) where intersection :: (Eq k) => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k]) @@ -191,7 +189,7 @@ updateMembers rs@(ReplicaSet _ vMembers _) = do assocKeys = map fst assocs inKeys = intersect keys assocKeys -fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IOE ReplicaInfo +fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo -- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set. fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do pipe <- connection rs mPipe host' @@ -201,15 +199,15 @@ fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do Just setName | setName /= rsName -> throwError $ userError $ show host' ++ " not a member of replica set " ++ T.unpack rsName ++ ": " ++ show info Just _ -> return (host', info) -connection :: ReplicaSet -> Maybe Pipe -> Host -> IOE Pipe +connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe -- ^ Return new or existing connection to member of replica set. If pipe is already known for host it is given, but we still test if it is open. connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' = - maybe conn (\p -> lift (isClosed p) >>= \bad -> if bad then conn else return p) mPipe + maybe conn (\p -> isClosed p >>= \bad -> if bad then conn else return p) mPipe where conn = modifyMVar vMembers $ \members -> do let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe) case List.lookup host' members of - Just (Just pipe) -> lift (isClosed pipe) >>= \bad -> if bad then new else return (members, pipe) + Just (Just pipe) -> isClosed pipe >>= \bad -> if bad then new else return (members, pipe) _ -> new diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 48120ec..0305ae8 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -24,7 +24,6 @@ module Database.MongoDB.Internal.Protocol ( import Control.Applicative ((<$>)) import Control.Arrow ((***)) -import Control.Exception (try) import Control.Monad (forM_, replicateM, unless) import Data.Binary.Get (Get, runGet) import Data.Binary.Put (Put, runPut) @@ -36,7 +35,6 @@ import System.IO.Unsafe (unsafePerformIO) import qualified Data.ByteString.Lazy as L -import Control.Monad.Error (ErrorT(..)) import Control.Monad.Trans (MonadIO, liftIO) import Data.Bson (Document) import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64, @@ -48,7 +46,7 @@ import qualified Data.Text as T import qualified Data.Text.Encoding as TE import Database.MongoDB.Internal.Util (whenJust, hGetN, bitOr, byteStringHex) -import System.IO.Pipeline (IOE, Pipeline, newPipeline, IOStream(..)) +import System.IO.Pipeline (Pipeline, newPipeline, IOStream(..)) import qualified System.IO.Pipeline as P @@ -61,11 +59,11 @@ newPipe :: Handle -> IO Pipe -- ^ Create pipe over handle newPipe handle = newPipeline $ IOStream (writeMessage handle) (readMessage handle) (hClose handle) -send :: Pipe -> [Notice] -> IOE () +send :: Pipe -> [Notice] -> IO () -- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. send pipe notices = P.send pipe (notices, Nothing) -call :: Pipe -> [Notice] -> Request -> IOE (IOE Reply) +call :: Pipe -> [Notice] -> Request -> IO (IO Reply) -- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails. call pipe notices request = do requestId <- genRequestId @@ -81,9 +79,9 @@ type Message = ([Notice], Maybe (Request, RequestId)) -- ^ A write notice(s) with getLastError request, or just query request. -- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness. -writeMessage :: Handle -> Message -> IOE () +writeMessage :: Handle -> Message -> IO () -- ^ Write message to socket -writeMessage handle (notices, mRequest) = ErrorT . try $ do +writeMessage handle (notices, mRequest) = do forM_ notices $ \n -> writeReq . (Left n,) =<< genRequestId whenJust mRequest $ writeReq . (Right *** id) hFlush handle @@ -99,9 +97,9 @@ writeMessage handle (notices, mRequest) = ErrorT . try $ do type Response = (ResponseTo, Reply) -- ^ Message received from a Mongo server in response to a Request -readMessage :: Handle -> IOE Response +readMessage :: Handle -> IO Response -- ^ read response from socket -readMessage handle = ErrorT $ try readResp where +readMessage handle = readResp where readResp = do len <- fromEnum . decodeSize <$> hGetN handle 4 runGet getReply <$> hGetN handle len diff --git a/Database/MongoDB/Internal/Util.hs b/Database/MongoDB/Internal/Util.hs index b9675c6..b1bb7d9 100644 --- a/Database/MongoDB/Internal/Util.hs +++ b/Database/MongoDB/Internal/Util.hs @@ -8,8 +8,7 @@ module Database.MongoDB.Internal.Util where import Control.Applicative (Applicative(..), (<$>)) -import Control.Arrow (left) -import Control.Exception (assert) +import Control.Exception (assert, handle, throwIO, Exception) import Control.Monad (liftM, liftM2) import Data.Bits (Bits, (.|.)) import Data.Word (Word8) @@ -23,7 +22,7 @@ import System.Random.Shuffle (shuffle') import qualified Data.ByteString.Lazy as L import qualified Data.ByteString as S -import Control.Monad.Error (MonadError(..), ErrorT(..), Error(..)) +import Control.Monad.Error (MonadError(..), Error(..)) import Control.Monad.Trans (MonadIO, liftIO) import Data.Bson import Data.Text (Text) @@ -87,13 +86,9 @@ untilSuccess' _ f (x : xs) = catchError (f x) (\e -> untilSuccess' e f xs) whenJust :: (Monad m) => Maybe a -> (a -> m ()) -> m () whenJust mVal act = maybe (return ()) act mVal -liftIOE :: (MonadIO m) => (e -> e') -> ErrorT e IO a -> ErrorT e' m a +liftIOE :: (MonadIO m, Exception e, Exception e') => (e -> e') -> IO a -> m a -- ^ lift IOE monad to ErrorT monad over some MonadIO m -liftIOE f = ErrorT . liftIO . fmap (left f) . runErrorT - -runIOE :: ErrorT IOError IO a -> IO a --- ^ Run action while catching explicit error and rethrowing in IO monad -runIOE (ErrorT action) = action >>= either ioError return +liftIOE f = liftIO . handle (throwIO . f) updateAssocs :: (Eq k) => k -> v -> [(k, v)] -> [(k, v)] -- ^ Change or insert value of key in association list diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 7887ff0..68b9ddf 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -1,6 +1,6 @@ -- | Query and update documents -{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP #-} +{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, DeriveDataTypeable #-} module Database.MongoDB.Query ( -- * Monad @@ -44,11 +44,13 @@ module Database.MongoDB.Query ( import Prelude hiding (lookup) import Control.Applicative (Applicative, (<$>)) +import Control.Exception (Exception, throwIO) import Control.Monad (unless, replicateM, liftM) import Data.Int (Int32) import Data.Maybe (listToMaybe, catMaybes) import Data.Word (Word32) import Data.Monoid (mappend) +import Data.Typeable (Typeable) #if MIN_VERSION_base(4,6,0) import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar, @@ -58,8 +60,7 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer, readMVar, modifyMVar) #endif import Control.Monad.Base (MonadBase(liftBase)) -import Control.Monad.Error (ErrorT, Error(..), MonadError, runErrorT, - throwError) +import Control.Monad.Error (ErrorT, Error(..)) import Control.Monad.Reader (ReaderT, runReaderT, ask, asks, local) import Control.Monad.RWS (RWST) import Control.Monad.State (StateT) @@ -92,8 +93,8 @@ import qualified Database.MongoDB.Internal.Protocol as P -- * Monad -newtype Action m a = Action {unAction :: ErrorT Failure (ReaderT Context m) a} - deriving (Functor, Applicative, Monad, MonadIO, MonadError Failure) +newtype Action m a = Action {unAction :: ReaderT Context m a} + deriving (Functor, Applicative, Monad, MonadIO) -- ^ A monad on top of m (which must be a MonadIO) that may access the database and may fail with a DB 'Failure' instance MonadBase b m => MonadBase b (Action m) where @@ -105,18 +106,17 @@ instance (MonadIO m, MonadBaseControl b m) => MonadBaseControl b (Action m) wher restoreM = defaultRestoreM unStMT instance MonadTrans Action where - lift = Action . lift . lift + lift = Action . lift instance MonadTransControl Action where - newtype StT Action a = StActionT {unStAction :: StT (ReaderT Context) (StT (ErrorT Failure) a)} - liftWith f = Action $ liftWith $ \runError -> - liftWith $ \runReader' -> - f (liftM StActionT . runReader' . runError . unAction) - restoreT = Action . restoreT . restoreT . liftM unStAction + newtype StT Action a = StActionT {unStAction :: StT (ReaderT Context) a} + liftWith f = Action $ liftWith $ \runReader' -> + f (liftM StActionT . runReader' . unAction) + restoreT = Action . restoreT . liftM unStAction -access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m (Either Failure a) +access :: (MonadIO m) => Pipe -> AccessMode -> Database -> Action m a -> m a -- ^ Run action against database on server at other end of pipe. Use access mode for any reads and writes. Return Left on connection failure or read/write failure. -access myPipe myAccessMode myDatabase (Action action) = runReaderT (runErrorT action) Context{..} +access myPipe myAccessMode myDatabase (Action action) = runReaderT action Context{..} -- | A connection failure, or a read or write exception like cursor expired or inserting a duplicate key. -- Note, unexpected data from the server is not a Failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change. @@ -127,7 +127,8 @@ data Failure = | WriteFailure ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string | DocNotFound Selection -- ^ 'fetch' found no document matching selection | AggregateFailure String -- ^ 'aggregate' returned an error - deriving (Show, Eq) + deriving (Show, Eq, Typeable) +instance Exception Failure type ErrorCode = Int -- ^ Error code from getLastError or query failure @@ -184,7 +185,7 @@ send ns = Action $ do pipe <- asks myPipe liftIOE ConnectionFailure $ P.send pipe ns -call :: (MonadIO m) => [Notice] -> Request -> Action m (ErrorT Failure IO Reply) +call :: (MonadIO m) => [Notice] -> Request -> Action m (IO Reply) -- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call will throw 'ConnectionFailure' if pipe fails on send, and promise will throw 'ConnectionFailure' if pipe fails on receive. call ns r = Action $ do pipe <- asks myPipe @@ -293,7 +294,7 @@ write notice = Action (asks myWriteMode) >>= \mode -> case mode of Batch _ _ [doc] <- fulfill =<< request [notice] =<< queryRequest False q {limit = 1} case lookup "err" doc of Nothing -> return () - Just err -> throwError $ WriteFailure (maybe 0 id $ lookup "code" doc) err + Just err -> liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "code" doc) err -- ** Insert @@ -437,7 +438,7 @@ findOne q = do fetch :: (MonadIO m) => Query -> Action m Document -- ^ Same as 'findOne' except throw 'DocNotFound' if none match -fetch q = findOne q >>= maybe (throwError $ DocNotFound $ selection q) return +fetch q = findOne q >>= maybe (liftIO $ throwIO $ DocNotFound $ selection q) return -- | runs the findAndModify command. -- Returns a single updated document (new option is set to true). @@ -523,7 +524,7 @@ batchSizeRemainingLimit batchSize limit = if limit == 0 where batchSize' = if batchSize == 1 then 2 else batchSize -- batchSize 1 is broken because server converts 1 to -1 meaning limit 1 -type DelayedBatch = ErrorT Failure IO Batch +type DelayedBatch = IO Batch -- ^ A promised batch which may fail data Batch = Batch Limit CursorId [Document] @@ -544,12 +545,12 @@ fromReply limit Reply{..} = do -- If response flag indicates failure then throw it, otherwise do nothing checkResponseFlag flag = case flag of AwaitCapable -> return () - CursorNotFound -> throwError $ CursorNotFoundFailure rCursorId - QueryError -> throwError $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments) + CursorNotFound -> throwIO $ CursorNotFoundFailure rCursorId + QueryError -> throwIO $ QueryFailure (at "code" $ head rDocuments) (at "$err" $ head rDocuments) fulfill :: (MonadIO m) => DelayedBatch -> Action m Batch -- ^ Demand and wait for result, raise failure if exception -fulfill = Action . liftIOE id +fulfill = Action . liftIO -- *** Cursor @@ -634,7 +635,7 @@ aggregate aColl agg = do response <- runCommand ["aggregate" =: aColl, "pipeline" =: agg] case true1 "ok" response of True -> lookup "result" response - False -> throwError $ AggregateFailure $ at "errmsg" response + False -> liftIO $ throwIO $ AggregateFailure $ at "errmsg" response -- ** Group diff --git a/System/IO/Pipeline.hs b/System/IO/Pipeline.hs index 2904a0d..ce2de75 100644 --- a/System/IO/Pipeline.hs +++ b/System/IO/Pipeline.hs @@ -12,7 +12,6 @@ A pipeline closes itself when a read or write causes an error, so you can detect #endif module System.IO.Pipeline ( - IOE, -- * IOStream IOStream(..), -- * Pipeline @@ -33,29 +32,19 @@ import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, putMVar, readMVar, addMVarFinalizer) #endif -import Control.Monad.Error (ErrorT(ErrorT), runErrorT) +import Control.Exception.Lifted (onException, throwIO, try) #if !MIN_VERSION_base(4,6,0) mkWeakMVar :: MVar a -> IO () -> IO () mkWeakMVar = addMVarFinalizer #endif -onException :: (Monad m) => ErrorT e m a -> m () -> ErrorT e m a --- ^ If first action throws an exception then run second action then re-throw -onException (ErrorT action) releaser = ErrorT $ do - e <- action - either (const releaser) (const $ return ()) e - return e - -type IOE = ErrorT IOError IO --- ^ IO monad with explicit error - -- * IOStream -- | An IO sink and source where value of type @o@ are sent and values of type @i@ are received. data IOStream i o = IOStream { - writeStream :: o -> IOE (), - readStream :: IOE i, + writeStream :: o -> IO (), + readStream :: IO i, closeStream :: IO () } -- * Pipeline @@ -101,19 +90,19 @@ listen :: Pipeline i o -> IO () listen Pipeline{..} = do stream <- readMVar vStream forever $ do - e <- runErrorT $ readStream stream + e <- try $ readStream stream var <- readChan responseQueue putMVar var e case e of Left err -> closeStream stream >> ioError err -- close and stop looping Right _ -> return () -send :: Pipeline i o -> o -> IOE () +send :: Pipeline i o -> o -> IO () -- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own). -- Throw IOError and close pipeline if send fails send p@Pipeline{..} message = withMVar vStream (flip writeStream message) `onException` close p -call :: Pipeline i o -> o -> IOE (IOE i) +call :: Pipeline i o -> o -> IO (IO i) -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- Throw IOError and closes pipeline if send fails, likewise for promised response. call p@Pipeline{..} message = withMVar vStream doCall `onException` close p where @@ -121,7 +110,7 @@ call p@Pipeline{..} message = withMVar vStream doCall `onException` close p whe writeStream stream message var <- newEmptyMVar liftIO $ writeChan responseQueue var - return $ ErrorT (readMVar var) -- return promise + return $ readMVar var >>= either throwIO return -- return promise {- Authors: Tony Hannan