diff --git a/.travis.yml b/.travis.yml index d003a84..474e0cf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,7 @@ env: - GHCVER=8.2.2 CABALVER=1.24 MONGO=3.4 STACKAGE=lts-11.6 - GHCVER=8.0.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-9.21 - GHCVER=8.2.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-11.6 - - GHCVER=8.4.2 CABALVER=2.2.0.1 MONGO=3.6 STACKAGE=nightly + - GHCVER=8.4.2 CABALVER=2.2 MONGO=3.6 STACKAGE=nightly before_install: diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 384c82e..b2d9c03 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -47,8 +47,9 @@ import System.IO.Unsafe (unsafePerformIO) import Data.Maybe (maybeToList) import GHC.Conc (ThreadStatus(..), threadStatus) import Control.Monad (forever) -import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan, isEmptyChan) +import Control.Monad.STM (atomically) import Control.Concurrent (ThreadId, killThread, forkFinally) +import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan) import Control.Exception.Lifted (onException, throwIO, try) @@ -87,7 +88,7 @@ mkWeakMVar = addMVarFinalizer -- | Thread-safe and pipelined connection data Pipeline = Pipeline { vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it - , responseQueue :: Chan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. + , responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. , listenThread :: ThreadId , finished :: MVar () , serverData :: ServerData @@ -106,14 +107,14 @@ data ServerData = ServerData newPipeline :: ServerData -> Transport -> IO Pipeline newPipeline serverData stream = do vStream <- newMVar stream - responseQueue <- newChan + responseQueue <- atomically newTChan finished <- newEmptyMVar let drainReplies = do - chanEmpty <- isEmptyChan responseQueue + chanEmpty <- atomically $ isEmptyTChan responseQueue if chanEmpty then return () else do - var <- readChan responseQueue + var <- atomically $ readTChan responseQueue putMVar var $ Left $ mkIOError doesNotExistErrorType "Handle has been closed" @@ -159,7 +160,7 @@ listen Pipeline{..} = do stream <- readMVar vStream forever $ do e <- try $ readMessage stream - var <- readChan responseQueue + var <- atomically $ readTChan responseQueue putMVar var e case e of Left err -> Tr.close stream >> ioError err -- close and stop looping @@ -182,7 +183,7 @@ pcall p@Pipeline{..} message = do doCall stream = do writeMessage stream message var <- newEmptyMVar - liftIO $ writeChan responseQueue var + liftIO $ atomically $ writeTChan responseQueue var return $ readMVar var >>= either throwIO return -- return promise -- * Pipe diff --git a/mongoDB.cabal b/mongoDB.cabal index 2436a08..bf1c6e7 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -42,6 +42,7 @@ Library , monad-control >= 0.3.1 , lifted-base >= 0.1.0.3 , pureMD5 + , stm , tagged , tls >= 1.3.0 , time @@ -106,6 +107,7 @@ Benchmark bench , cryptohash -any , network -any , nonce >= 1.0.5 + , stm , parsec -any , random -any , random-shuffle -any