diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index b2d9c03..5368f35 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -48,10 +48,10 @@ import Data.Maybe (maybeToList) import GHC.Conc (ThreadStatus(..), threadStatus) import Control.Monad (forever) import Control.Monad.STM (atomically) -import Control.Concurrent (ThreadId, killThread, forkFinally) +import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask) import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan) -import Control.Exception.Lifted (onException, throwIO, try) +import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try) import qualified Data.ByteString.Lazy as L @@ -103,6 +103,12 @@ data ServerData = ServerData , maxWriteBatchSize :: Int } +-- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread. +forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId +forkUnmaskedFinally action and_then = + mask_ $ forkIOWithUnmask $ \unmask -> + try (unmask action) >>= and_then + -- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle. newPipeline :: ServerData -> Transport -> IO Pipeline newPipeline serverData stream = do @@ -124,9 +130,9 @@ newPipeline serverData stream = do rec let pipe = Pipeline{..} - listenThread <- forkFinally (listen pipe) $ \_ -> do - putMVar finished () - drainReplies + listenThread <- forkUnmaskedFinally (listen pipe) $ \_ -> do + putMVar finished () + drainReplies _ <- mkWeakMVar vStream $ do killThread listenThread