diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index f087730..384c82e 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -42,13 +42,13 @@ import Data.Bits (bit, testBit) import Data.Int (Int32, Int64) import Data.IORef (IORef, newIORef, atomicModifyIORef) import System.IO (Handle) -import System.IO.Error (doesNotExistErrorType, mkIOError, ioError) +import System.IO.Error (doesNotExistErrorType, mkIOError) import System.IO.Unsafe (unsafePerformIO) import Data.Maybe (maybeToList) import GHC.Conc (ThreadStatus(..), threadStatus) import Control.Monad (forever) -import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) -import Control.Concurrent (ThreadId, forkIO, killThread, forkFinally) +import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan, isEmptyChan) +import Control.Concurrent (ThreadId, killThread, forkFinally) import Control.Exception.Lifted (onException, throwIO, try) @@ -108,9 +108,25 @@ newPipeline serverData stream = do vStream <- newMVar stream responseQueue <- newChan finished <- newEmptyMVar + let drainReplies = do + chanEmpty <- isEmptyChan responseQueue + if chanEmpty + then return () + else do + var <- readChan responseQueue + putMVar var $ Left $ mkIOError + doesNotExistErrorType + "Handle has been closed" + Nothing + Nothing + drainReplies + rec let pipe = Pipeline{..} - listenThread <- forkFinally (listen pipe) (\_ -> putMVar finished ()) + listenThread <- forkFinally (listen pipe) $ \_ -> do + putMVar finished () + drainReplies + _ <- mkWeakMVar vStream $ do killThread listenThread Tr.close stream @@ -158,8 +174,8 @@ pcall :: Pipeline -> Message -> IO (IO Response) -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- Throw IOError and closes pipeline if send fails, likewise for promised response. pcall p@Pipeline{..} message = do - finished <- isFinished p - if finished + listenerStopped <- isFinished p + if listenerStopped then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing else withMVar vStream doCall `onException` close p where