Drain remaining var in response queue

This commit is contained in:
Victor Denisov 2016-10-30 00:07:17 -07:00
parent 04e5dd3248
commit 746e670bf0

View file

@ -42,13 +42,13 @@ import Data.Bits (bit, testBit)
import Data.Int (Int32, Int64) import Data.Int (Int32, Int64)
import Data.IORef (IORef, newIORef, atomicModifyIORef) import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO (Handle) import System.IO (Handle)
import System.IO.Error (doesNotExistErrorType, mkIOError, ioError) import System.IO.Error (doesNotExistErrorType, mkIOError)
import System.IO.Unsafe (unsafePerformIO) import System.IO.Unsafe (unsafePerformIO)
import Data.Maybe (maybeToList) import Data.Maybe (maybeToList)
import GHC.Conc (ThreadStatus(..), threadStatus) import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad (forever) import Control.Monad (forever)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan, isEmptyChan)
import Control.Concurrent (ThreadId, forkIO, killThread, forkFinally) import Control.Concurrent (ThreadId, killThread, forkFinally)
import Control.Exception.Lifted (onException, throwIO, try) import Control.Exception.Lifted (onException, throwIO, try)
@ -108,9 +108,25 @@ newPipeline serverData stream = do
vStream <- newMVar stream vStream <- newMVar stream
responseQueue <- newChan responseQueue <- newChan
finished <- newEmptyMVar finished <- newEmptyMVar
let drainReplies = do
chanEmpty <- isEmptyChan responseQueue
if chanEmpty
then return ()
else do
var <- readChan responseQueue
putMVar var $ Left $ mkIOError
doesNotExistErrorType
"Handle has been closed"
Nothing
Nothing
drainReplies
rec rec
let pipe = Pipeline{..} let pipe = Pipeline{..}
listenThread <- forkFinally (listen pipe) (\_ -> putMVar finished ()) listenThread <- forkFinally (listen pipe) $ \_ -> do
putMVar finished ()
drainReplies
_ <- mkWeakMVar vStream $ do _ <- mkWeakMVar vStream $ do
killThread listenThread killThread listenThread
Tr.close stream Tr.close stream
@ -158,8 +174,8 @@ pcall :: Pipeline -> Message -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response. -- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcall p@Pipeline{..} message = do pcall p@Pipeline{..} message = do
finished <- isFinished p listenerStopped <- isFinished p
if finished if listenerStopped
then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing
else withMVar vStream doCall `onException` close p else withMVar vStream doCall `onException` close p
where where