Unmask the forked thread in new pipeline

This commit is contained in:
Victor Denisov 2019-12-31 15:44:05 -08:00
commit 7b5d85cca8

View file

@ -48,10 +48,10 @@ import Data.Maybe (maybeToList)
import GHC.Conc (ThreadStatus(..), threadStatus) import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad (forever) import Control.Monad (forever)
import Control.Monad.STM (atomically) import Control.Monad.STM (atomically)
import Control.Concurrent (ThreadId, killThread, forkFinally) import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask)
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan) import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan)
import Control.Exception.Lifted (onException, throwIO, try) import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try)
import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy as L
@ -103,6 +103,12 @@ data ServerData = ServerData
, maxWriteBatchSize :: Int , maxWriteBatchSize :: Int
} }
-- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread.
forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally action and_then =
mask_ $ forkIOWithUnmask $ \unmask ->
try (unmask action) >>= and_then
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle. -- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: ServerData -> Transport -> IO Pipeline newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline serverData stream = do newPipeline serverData stream = do
@ -124,7 +130,7 @@ newPipeline serverData stream = do
rec rec
let pipe = Pipeline{..} let pipe = Pipeline{..}
listenThread <- forkFinally (listen pipe) $ \_ -> do listenThread <- forkUnmaskedFinally (listen pipe) $ \_ -> do
putMVar finished () putMVar finished ()
drainReplies drainReplies