2011-07-13 19:34:52 +00:00
{- | Pipelining is sending multiple requests over a socket and receiving the responses later in the same order (a' la HTTP pipelining). This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will send each thread's request right away without waiting.
2010-10-27 20:13:23 +00:00
2010-11-01 00:36:32 +00:00
A pipeline closes itself when a read or write causes an error , so you can detect a broken pipeline by checking isClosed . It also closes itself when garbage collected , or you can close it explicitly . - }
2010-06-21 15:06:20 +00:00
2010-12-20 02:08:53 +00:00
{- # LANGUAGE DoRec, RecordWildCards, NamedFieldPuns, ScopedTypeVariables # -}
2010-06-21 15:06:20 +00:00
2011-07-05 14:37:01 +00:00
module System.IO.Pipeline (
IOE ,
-- * IOStream
IOStream ( .. ) ,
2010-10-27 20:13:23 +00:00
-- * Pipeline
2010-12-20 02:08:53 +00:00
Pipeline , newPipeline , send , call , close , isClosed
2010-06-21 15:06:20 +00:00
) where
2011-07-05 14:37:01 +00:00
import Prelude hiding ( length )
2010-11-01 00:36:32 +00:00
import GHC.Conc ( ThreadStatus ( .. ) , threadStatus )
2011-07-05 14:37:01 +00:00
import Control.Concurrent ( ThreadId , forkIO , killThread )
2010-06-21 15:06:20 +00:00
import Control.Concurrent.Chan
2011-07-05 14:37:01 +00:00
import Control.Monad.MVar
import Control.Monad.Error
onException :: ( Monad m ) => ErrorT e m a -> m () -> ErrorT e m a
-- ^ If first action throws an exception then run second action then re-throw
onException ( ErrorT action ) releaser = ErrorT $ do
e <- action
either ( const releaser ) ( const $ return () ) e
return e
type IOE = ErrorT IOError IO
2011-07-13 19:34:52 +00:00
-- ^ IO monad with explicit error
2011-07-05 14:37:01 +00:00
-- * IOStream
-- | An IO sink and source where value of type @o@ are sent and values of type @i@ are received.
data IOStream i o = IOStream {
writeStream :: o -> IOE () ,
readStream :: IOE i ,
closeStream :: IO () }
2010-06-21 15:06:20 +00:00
2010-10-27 20:13:23 +00:00
-- * Pipeline
2010-06-21 15:06:20 +00:00
2010-12-20 02:08:53 +00:00
-- | Thread-safe and pipelined connection
data Pipeline i o = Pipeline {
2011-07-05 14:37:01 +00:00
vStream :: MVar ( IOStream i o ) , -- ^ Mutex on handle, so only one thread at a time can write to it
responseQueue :: Chan ( MVar ( Either IOError i ) ) , -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
2010-06-21 15:06:20 +00:00
listenThread :: ThreadId
}
2011-07-05 14:37:01 +00:00
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: IOStream i o -> IO ( Pipeline i o )
newPipeline stream = do
vStream <- newMVar stream
2010-06-21 15:06:20 +00:00
responseQueue <- newChan
rec
2010-10-27 20:13:23 +00:00
let pipe = Pipeline { .. }
2010-06-21 15:06:20 +00:00
listenThread <- forkIO ( listen pipe )
2011-07-05 14:37:01 +00:00
addMVarFinalizer vStream $ do
2010-06-21 15:06:20 +00:00
killThread listenThread
2011-07-05 14:37:01 +00:00
closeStream stream
2010-06-21 15:06:20 +00:00
return pipe
2011-07-05 14:37:01 +00:00
close :: Pipeline i o -> IO ()
2011-07-13 19:34:52 +00:00
-- ^ Close pipe and underlying connection
2011-07-05 14:37:01 +00:00
close Pipeline { .. } = do
2010-12-20 02:08:53 +00:00
killThread listenThread
2011-07-05 14:37:01 +00:00
closeStream =<< readMVar vStream
2010-12-20 02:08:53 +00:00
2011-07-05 14:37:01 +00:00
isClosed :: Pipeline i o -> IO Bool
isClosed Pipeline { listenThread } = do
2010-12-20 02:08:53 +00:00
status <- threadStatus listenThread
return $ case status of
ThreadRunning -> False
ThreadFinished -> True
ThreadBlocked _ -> False
ThreadDied -> True
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
listen :: Pipeline i o -> IO ()
2010-06-21 15:06:20 +00:00
-- ^ Listen for responses and supply them to waiting threads in order
2010-10-27 20:13:23 +00:00
listen Pipeline { .. } = do
2011-07-05 14:37:01 +00:00
stream <- readMVar vStream
2010-06-21 15:06:20 +00:00
forever $ do
2011-07-05 14:37:01 +00:00
e <- runErrorT $ readStream stream
2010-06-21 15:06:20 +00:00
var <- readChan responseQueue
putMVar var e
2010-10-27 20:13:23 +00:00
case e of
2011-07-05 14:37:01 +00:00
Left err -> closeStream stream >> ioError err -- close and stop looping
2010-10-27 20:13:23 +00:00
Right _ -> return ()
2010-06-21 15:06:20 +00:00
2011-07-05 14:37:01 +00:00
send :: Pipeline i o -> o -> IOE ()
2010-12-20 02:08:53 +00:00
-- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own).
-- Throw IOError and close pipeline if send fails
2011-07-05 14:37:01 +00:00
send p @ Pipeline { .. } message = withMVar vStream ( flip writeStream message ) ` onException ` close p
2010-12-20 02:08:53 +00:00
2011-07-05 14:37:01 +00:00
call :: Pipeline i o -> o -> IOE ( IOE i )
2010-12-20 02:08:53 +00:00
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
2011-07-05 14:37:01 +00:00
call p @ Pipeline { .. } message = withMVar vStream doCall ` onException ` close p where
doCall stream = do
writeStream stream message
2010-12-20 02:08:53 +00:00
var <- newEmptyMVar
liftIO $ writeChan responseQueue var
return $ ErrorT ( readMVar var ) -- return promise
{- Authors: Tony Hannan <tony@10gen.com>
2011-07-05 14:37:01 +00:00
Copyright 2011 10 gen Inc .
2010-12-20 02:08:53 +00:00
Licensed under the Apache License , Version 2.0 ( the " License " ) ; you may not use this file except in compliance with the License . You may obtain a copy of the License at : http :// www . apache . org / licenses / LICENSE - 2.0 . Unless required by applicable law or agreed to in writing , software distributed under the License is distributed on an " AS IS " BASIS , WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied . See the License for the specific language governing permissions and limitations under the License . - }