2010-10-27 20:13:23 +00:00
{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for its response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request; when invoked, the promise waits for the response if it has not already arrived. Multiple threads can send on the same pipeline (and get promises back); each thread's request is pipelined right away without waiting.
A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking 'isClosed'. -}
2010-06-21 15:06:20 +00:00
{- # LANGUAGE DoRec, RecordWildCards, MultiParamTypeClasses, FlexibleContexts # -}
module Control.Pipeline (
2010-10-27 20:13:23 +00:00
-- * Pipeline
Pipeline , newPipeline , send , call ,
2010-06-21 15:06:20 +00:00
-- * Util
Size ,
Length ( .. ) ,
Resource ( .. ) ,
Flush ( .. ) ,
Stream ( .. ) , getN
) where
import Prelude hiding ( length )
import Control.Applicative ( ( <$> ) )
import Control.Monad ( forever )
2010-10-27 20:13:23 +00:00
import Control.Exception ( assert , onException )
2010-07-27 21:18:53 +00:00
import System.IO.Error ( try , mkIOError , eofErrorType )
2010-06-21 15:06:20 +00:00
import System.IO ( Handle , hFlush , hClose , hIsClosed )
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import Data.Monoid ( Monoid ( .. ) )
import Control.Concurrent ( ThreadId , forkIO , killThread )
import Control.Concurrent.MVar
import Control.Concurrent.Chan
-- * Length
-- | A byte count.
type Size = Int

-- | Containers whose size can be measured as a 'Size'.
class Length list where
  -- | Number of elements (bytes, for the instances below)
  length :: list -> Size

-- | Strict byte strings report their length directly.
instance Length S.ByteString where
  length bytes = S.length bytes

-- | Lazy byte strings report a wider integer count, which is narrowed to 'Size' via 'fromEnum'.
instance Length L.ByteString where
  length bytes = fromEnum (L.length bytes)
-- * Resource
-- | Resources of type @r@ that can be closed from within monad @m@.
class Resource m r where
  -- | Close resource
  close :: r -> m ()
  -- | Is resource closed
  isClosed :: r -> m Bool

-- | A file 'Handle' is a resource in 'IO'; both operations delegate to "System.IO".
instance Resource IO Handle where
  close handle = hClose handle
  isClosed handle = hIsClosed handle
-- * Flush
-- | Handles whose buffered output can be pushed out on demand.
class Flush handle where
  -- | Flush written bytes to destination
  flush :: handle -> IO ()

-- | A file 'Handle' is flushed with 'hFlush'.
instance Flush Handle where
  flush handle = hFlush handle
-- * Stream
-- | A handle that byte sequences of type @bytes@ can be written to and read from.
class (Length bytes, Monoid bytes, Flush handle) => Stream handle bytes where
  -- | Write bytes to handle
  put :: handle -> bytes -> IO ()

  -- | Read up to N bytes from handle; if EOF return empty bytes, otherwise
  -- block until at least 1 byte is available
  get :: handle -> Int -> IO bytes
2010-06-21 15:06:20 +00:00
-- | Read exactly N bytes from the handle, blocking until all N bytes are read.
--
-- 'get' may return fewer bytes than requested, so the remainder is fetched
-- recursively and appended. If 'get' returns an empty result (its EOF signal)
-- before N bytes have been collected, an 'IOError' of type 'eofErrorType' is
-- thrown with location @Control.Pipeline@.
--
-- The @n >= 0@ precondition is checked with 'assert' only, so it is not a
-- runtime guarantee when assertions are disabled at compile time.
getN :: (Stream h b) => h -> Int -> IO b
getN h n = assert (n >= 0) $ get h n >>= finish
  where
    finish bytes
      -- Everything requested has arrived (also covers n == 0).
      | x >= n = return bytes
      -- Empty read while bytes are still outstanding: the stream ended early.
      -- The location string is unpadded so the rendered message reads cleanly.
      | x == 0 = ioError (mkIOError eofErrorType "Control.Pipeline" Nothing Nothing)
      -- Short read: keep what we have and fetch the rest.
      | otherwise = mappend bytes <$> getN h (n - x)
      where
        x = length bytes
2010-06-21 15:06:20 +00:00
-- | Strict byte strings are written to and read from a 'Handle' with the
-- strict ByteString handle operations.
instance Stream Handle S.ByteString where
  put handle chunk = S.hPut handle chunk
  get handle count = S.hGet handle count

-- | Lazy byte strings are written to and read from a 'Handle' with the lazy
-- ByteString handle operations.
instance Stream Handle L.ByteString where
  put handle chunk = L.hPut handle chunk
  get handle count = L.hGet handle count
2010-10-27 20:13:23 +00:00
-- * Pipeline
2010-06-21 15:06:20 +00:00
-- | Thread-safe and pipelined socket.
--
-- The fields are intentionally lazy: 'newPipeline' constructs this record
-- inside a @rec@ block, before 'listenThread' has been bound.
data Pipeline handle bytes = Pipeline
  { encodeSize :: Size -> bytes
    -- ^ Encodes a message length as the prefix written before each message
  , decodeSize :: bytes -> Size
    -- ^ Decodes the length prefix read in front of each response
  , vHandle :: MVar handle
    -- ^ Mutex on handle, so only one thread at a time can write to it
  , responseQueue :: Chan (MVar (Either IOError bytes))
    -- ^ Queue of threads waiting for responses. Every time a response
    -- arrives we pop the next thread and give it the response.
  , listenThread :: ThreadId
    -- ^ The background thread that reads responses off the handle
  }
2010-10-27 20:13:23 +00:00
-- | Create a new 'Pipeline' from the given size encoder, size decoder, and
-- handle. A listener thread is forked immediately to read responses. You
-- should 'close' the pipeline when finished, which will also close the handle.
-- A finalizer is attached to the handle's 'MVar'; when it runs it kills the
-- listener thread and closes the handle.
newPipeline :: (Stream h b, Resource IO h)
  => (Size -> b) -- ^ Convert Size to bytes of fixed length. Every Int must translate to same number of bytes.
  -> (b -> Size) -- ^ Convert bytes of fixed length to Size. Must be exact inverse of encodeSize.
  -> h -- ^ Underlying socket (handle) this pipeline will read/write from
  -> IO (Pipeline h b)
newPipeline encoder decoder handle = do
  handleVar <- newMVar handle
  queue <- newChan
  -- The record needs the listener's ThreadId and the listener needs the
  -- record, so the two are bound recursively.
  rec
    let pipe = Pipeline
          { encodeSize = encoder
          , decodeSize = decoder
          , vHandle = handleVar
          , responseQueue = queue
          , listenThread = listener
          }
    listener <- forkIO (listen pipe)
  addMVarFinalizer handleVar (killThread listener >> close handle)
  return pipe
2010-10-27 20:13:23 +00:00
-- | A pipeline is itself a resource; closing it also closes the underlying handle.
instance (Resource IO h) => Resource IO (Pipeline h b) where
  -- Close pipe and underlying socket (handle): stop the listener first, then
  -- close whatever handle is currently stored in the mutex.
  close Pipeline{listenThread = listener, vHandle = handleVar} = do
    killThread listener
    readMVar handleVar >>= close

  -- The pipeline counts as closed exactly when its underlying handle does.
  isClosed Pipeline{vHandle = handleVar} = readMVar handleVar >>= isClosed
2010-06-21 15:06:20 +00:00
2010-10-27 20:13:23 +00:00
-- | Listen for responses and supply them to waiting threads in order.
--
-- Each iteration reads one length-prefixed response from the handle, then
-- takes the next waiting slot off 'responseQueue' and fills it with either the
-- response or the 'IOError' raised while reading. On an error the handle is
-- closed and the original 'IOError' is rethrown, which ends this thread. Slots
-- queued behind the failed one are not filled by this loop.
listen :: (Stream h b, Resource IO h) => Pipeline h b -> IO ()
listen Pipeline{..} = do
  -- Width of the length prefix; relies on encodeSize producing a fixed number
  -- of bytes for every size.
  let n = length (encodeSize 0)
  h <- readMVar vHandle
  forever $ do
    e <- try $ do
      len <- decodeSize <$> getN h n
      getN h len
    -- Blocks until some caller has enqueued a slot for this response.
    var <- readChan responseQueue
    putMVar var e
    case e of
      -- Close and stop looping. Rethrow the error itself rather than a
      -- stringified copy, so its type and fields are preserved.
      Left err -> close h >> ioError err
      Right _ -> return ()
2010-06-21 15:06:20 +00:00
2010-10-27 20:13:23 +00:00
-- | Send messages all together to destination (no messages will be
-- interleaved between them). None of the messages may induce a response, i.e.
-- the destination must not reply to any of them (otherwise future 'call's
-- will get these responses instead of their own).
-- Each message is preceded by its length when written to the socket.
-- Raises IOError and closes pipeline if send fails.
send :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO ()
send Pipeline{vHandle = handleVar, listenThread = listener, encodeSize = encoder} messages =
  -- Hold the handle mutex for the whole batch so it is written contiguously.
  withMVar handleVar $ \h ->
    writeAll listener encoder messages h
2010-06-21 15:06:20 +00:00
2010-10-27 20:13:23 +00:00
-- | Send messages all together to destination (no messages will be
-- interleaved between them), and return a /promise/ of the response to one of
-- them. Exactly one message in the list must induce a response, i.e. the
-- destination must reply to exactly one message (otherwise promises will have
-- the wrong responses in them).
-- Each message is preceded by its length when written to the socket.
-- Likewise, the response must be preceded by its length.
-- Raises IOError and closes pipeline if send fails, likewise for reply.
call :: (Stream h b, Resource IO h) => Pipeline h b -> [b] -> IO (IO b)
call Pipeline{vHandle = handleVar, listenThread = listener, encodeSize = encoder, responseQueue = queue} messages =
  withMVar handleVar $ \h -> do
    writeAll listener encoder messages h
    -- Enqueue the reply slot while still holding the handle mutex, so slots
    -- are queued in the same order the requests were written.
    slot <- newEmptyMVar
    writeChan queue slot
    -- The promise: wait for the slot to be filled, rethrowing a stored error.
    return (readMVar slot >>= either ioError return)
2010-10-27 20:13:23 +00:00
-- | Write messages to stream, each prefixed with its encoded length, then
-- flush. If writing or flushing raises an exception, the listener thread is
-- killed and the handle closed before the exception propagates.
writeAll :: (Stream h b, Monoid b, Length b, Resource IO h) => ThreadId -> (Size -> b) -> [b] -> h -> IO ()
writeAll listenThread encodeSize messages h = transmit `onException` shutdown
  where
    -- Write every framed message, then push the buffer out.
    transmit = do
      mapM_ (put h . frame) messages
      flush h
    -- Tear the pipeline down after a failed write.
    shutdown = do
      killThread listenThread
      close h
    -- Prepend the encoded length to the payload.
    frame payload = encodeSize (length payload) `mappend` payload