Use stm channels

This commit is contained in:
Victor Denisov 2018-04-28 15:14:30 -07:00
parent b66318d5ea
commit 9e0781dff5
3 changed files with 11 additions and 8 deletions

View file

@ -19,7 +19,7 @@ env:
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.4 STACKAGE=lts-11.6
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-9.21
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-11.6
- GHCVER=8.4.2 CABALVER=2.2.0.1 MONGO=3.6 STACKAGE=nightly
- GHCVER=8.4.2 CABALVER=2.2 MONGO=3.6 STACKAGE=nightly
before_install:

View file

@ -47,8 +47,9 @@ import System.IO.Unsafe (unsafePerformIO)
import Data.Maybe (maybeToList)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad (forever)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan, isEmptyChan)
import Control.Monad.STM (atomically)
import Control.Concurrent (ThreadId, killThread, forkFinally)
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan)
import Control.Exception.Lifted (onException, throwIO, try)
@ -87,7 +88,7 @@ mkWeakMVar = addMVarFinalizer
-- | Thread-safe and pipelined connection
data Pipeline = Pipeline
{ vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it
, responseQueue :: Chan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
, responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
, listenThread :: ThreadId
, finished :: MVar ()
, serverData :: ServerData
@ -106,14 +107,14 @@ data ServerData = ServerData
newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline serverData stream = do
vStream <- newMVar stream
responseQueue <- newChan
responseQueue <- atomically newTChan
finished <- newEmptyMVar
let drainReplies = do
chanEmpty <- isEmptyChan responseQueue
chanEmpty <- atomically $ isEmptyTChan responseQueue
if chanEmpty
then return ()
else do
var <- readChan responseQueue
var <- atomically $ readTChan responseQueue
putMVar var $ Left $ mkIOError
doesNotExistErrorType
"Handle has been closed"
@ -159,7 +160,7 @@ listen Pipeline{..} = do
stream <- readMVar vStream
forever $ do
e <- try $ readMessage stream
var <- readChan responseQueue
var <- atomically $ readTChan responseQueue
putMVar var e
case e of
Left err -> Tr.close stream >> ioError err -- close and stop looping
@ -182,7 +183,7 @@ pcall p@Pipeline{..} message = do
doCall stream = do
writeMessage stream message
var <- newEmptyMVar
liftIO $ writeChan responseQueue var
liftIO $ atomically $ writeTChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise
-- * Pipe

View file

@ -42,6 +42,7 @@ Library
, monad-control >= 0.3.1
, lifted-base >= 0.1.0.3
, pureMD5
, stm
, tagged
, tls >= 1.3.0
, time
@ -106,6 +107,7 @@ Benchmark bench
, cryptohash -any
, network -any
, nonce >= 1.0.5
, stm
, parsec -any
, random -any
, random-shuffle -any