isClosed Pipeline used to hang because it was waiting on listen loop's read to finish. Now isClosed tests if listen loop has ended

This commit is contained in:
Tony Hannan 2010-10-31 20:36:32 -04:00
parent 583f8330c7
commit ad13914862

View file

@ -1,8 +1,8 @@
{- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will pipeline each thread's request right away without waiting. {- | Pipelining is sending multiple requests over a socket and receiving the responses later, in the same order. This is faster than sending one request, waiting for the response, then sending the next request, and so on. This implementation returns a /promise (future)/ response for each request that when invoked waits for the response if not already arrived. Multiple threads can send on the same pipeline (and get promises back); it will pipeline each thread's request right away without waiting.
A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. -} A pipeline closes itself when a read or write causes an error, so you can detect a broken pipeline by checking isClosed. It also closes itself when garbage collected, or you can close it explicitly. -}
{-# LANGUAGE DoRec, RecordWildCards, MultiParamTypeClasses, FlexibleContexts #-} {-# LANGUAGE DoRec, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts #-}
module Control.Pipeline ( module Control.Pipeline (
-- * Pipeline -- * Pipeline
@ -25,6 +25,7 @@ import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy as L
import Data.Monoid (Monoid(..)) import Data.Monoid (Monoid(..))
import Control.Concurrent (ThreadId, forkIO, killThread) import Control.Concurrent (ThreadId, forkIO, killThread)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Concurrent.MVar import Control.Concurrent.MVar
import Control.Concurrent.Chan import Control.Concurrent.Chan
@ -120,7 +121,14 @@ instance (Resource IO h) => Resource IO (Pipeline h b) where
close Pipeline{..} = do close Pipeline{..} = do
killThread listenThread killThread listenThread
close =<< readMVar vHandle close =<< readMVar vHandle
isClosed Pipeline{..} = isClosed =<< readMVar vHandle isClosed Pipeline{listenThread} = do
status <- threadStatus listenThread
return $ case status of
ThreadRunning -> False
ThreadFinished -> True
ThreadBlocked _ -> False
ThreadDied -> True
--isClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
listen :: (Stream h b, Resource IO h) => Pipeline h b -> IO () listen :: (Stream h b, Resource IO h) => Pipeline h b -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order -- ^ Listen for responses and supply them to waiting threads in order