From 04e5dd32488b471668c1eda2811cdfa576259418 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Sun, 23 Oct 2016 23:19:49 -0700 Subject: [PATCH 1/3] Return error if listening thread is closed --- Database/MongoDB/Internal/Protocol.hs | 39 ++++++++++++++++++--------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 324513d..f087730 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -42,12 +42,13 @@ import Data.Bits (bit, testBit) import Data.Int (Int32, Int64) import Data.IORef (IORef, newIORef, atomicModifyIORef) import System.IO (Handle) +import System.IO.Error (doesNotExistErrorType, mkIOError, ioError) import System.IO.Unsafe (unsafePerformIO) import Data.Maybe (maybeToList) import GHC.Conc (ThreadStatus(..), threadStatus) import Control.Monad (forever) import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) -import Control.Concurrent (ThreadId, forkIO, killThread) +import Control.Concurrent (ThreadId, forkIO, killThread, forkFinally) import Control.Exception.Lifted (onException, throwIO, try) @@ -66,11 +67,11 @@ import qualified Data.Text.Encoding as TE import Database.MongoDB.Internal.Util (bitOr, byteStringHex) import Database.MongoDB.Transport (Transport) -import qualified Database.MongoDB.Transport as T +import qualified Database.MongoDB.Transport as Tr #if MIN_VERSION_base(4,6,0) import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, - putMVar, readMVar, mkWeakMVar) + putMVar, readMVar, mkWeakMVar, isEmptyMVar) #else import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, putMVar, readMVar, addMVarFinalizer) @@ -88,6 +89,7 @@ data Pipeline = Pipeline { vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it , responseQueue :: Chan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. , listenThread :: ThreadId + , finished :: MVar () , serverData :: ServerData } @@ -105,19 +107,25 @@ newPipeline :: ServerData -> Transport -> IO Pipeline newPipeline serverData stream = do vStream <- newMVar stream responseQueue <- newChan + finished <- newEmptyMVar rec let pipe = Pipeline{..} - listenThread <- forkIO (listen pipe) + listenThread <- forkFinally (listen pipe) (\_ -> putMVar finished ()) _ <- mkWeakMVar vStream $ do killThread listenThread - T.close stream + Tr.close stream return pipe +isFinished :: Pipeline -> IO Bool +isFinished Pipeline {finished} = do + empty <- isEmptyMVar finished + return $ not empty + close :: Pipeline -> IO () -- ^ Close pipe and underlying connection close Pipeline{..} = do killThread listenThread - T.close =<< readMVar vStream + Tr.close =<< readMVar vStream isClosed :: Pipeline -> IO Bool isClosed Pipeline{listenThread} = do @@ -138,7 +146,7 @@ listen Pipeline{..} = do var <- readChan responseQueue putMVar var e case e of - Left err -> T.close stream >> ioError err -- close and stop looping + Left err -> Tr.close stream >> ioError err -- close and stop looping Right _ -> return () psend :: Pipeline -> Message -> IO () @@ -149,7 +157,12 @@ psend p@Pipeline{..} !message = withMVar vStream (flip writeMessage message) `on pcall :: Pipeline -> Message -> IO (IO Response) -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- Throw IOError and closes pipeline if send fails, likewise for promised response. -pcall p@Pipeline{..} message = withMVar vStream doCall `onException` close p where +pcall p@Pipeline{..} message = do + finished <- isFinished p + if finished + then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing + else withMVar vStream doCall `onException` close p + where doCall stream = do writeMessage stream message var <- newEmptyMVar @@ -163,7 +176,7 @@ type Pipe = Pipeline newPipe :: ServerData -> Handle -> IO Pipe -- ^ Create pipe over handle -newPipe sd handle = T.fromHandle handle >>= (newPipeWith sd) +newPipe sd handle = Tr.fromHandle handle >>= (newPipeWith sd) newPipeWith :: ServerData -> Transport -> IO Pipe -- ^ Create pipe over connection @@ -202,8 +215,8 @@ writeMessage conn (notices, mRequest) = do let s = runPut $ putRequest request requestId return $ (lenBytes s) `L.append` s - T.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString) - T.flush conn + Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString) + Tr.flush conn where lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes encodeSize = runPut . putInt32 . (+ 4) @@ -215,8 +228,8 @@ readMessage :: Transport -> IO Response -- ^ read response from a connection readMessage conn = readResp where readResp = do - len <- fromEnum . decodeSize . L.fromStrict <$> T.read conn 4 - runGet getReply . L.fromStrict <$> T.read conn len + len <- fromEnum . decodeSize . L.fromStrict <$> Tr.read conn 4 + runGet getReply . L.fromStrict <$> Tr.read conn len decodeSize = subtract 4 . runGet getInt32 type FullCollection = Text From 746e670bf0039764ab0be92e952a346fa413b032 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Sun, 30 Oct 2016 00:07:17 -0700 Subject: [PATCH 2/3] Drain remaining var in response queue --- Database/MongoDB/Internal/Protocol.hs | 28 +++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index f087730..384c82e 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -42,13 +42,13 @@ import Data.Bits (bit, testBit) import Data.Int (Int32, Int64) import Data.IORef (IORef, newIORef, atomicModifyIORef) import System.IO (Handle) -import System.IO.Error (doesNotExistErrorType, mkIOError, ioError) +import System.IO.Error (doesNotExistErrorType, mkIOError) import System.IO.Unsafe (unsafePerformIO) import Data.Maybe (maybeToList) import GHC.Conc (ThreadStatus(..), threadStatus) import Control.Monad (forever) -import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) -import Control.Concurrent (ThreadId, forkIO, killThread, forkFinally) +import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan, isEmptyChan) +import Control.Concurrent (ThreadId, killThread, forkFinally) import Control.Exception.Lifted (onException, throwIO, try) @@ -108,9 +108,25 @@ newPipeline serverData stream = do vStream <- newMVar stream responseQueue <- newChan finished <- newEmptyMVar + let drainReplies = do + chanEmpty <- isEmptyChan responseQueue + if chanEmpty + then return () + else do + var <- readChan responseQueue + putMVar var $ Left $ mkIOError + doesNotExistErrorType + "Handle has been closed" + Nothing + Nothing + drainReplies + rec let pipe = Pipeline{..} - listenThread <- forkFinally (listen pipe) (\_ -> putMVar finished ()) + listenThread <- forkFinally (listen pipe) $ \_ -> do + putMVar finished () + drainReplies + _ <- mkWeakMVar vStream $ do killThread listenThread Tr.close stream @@ -158,8 +174,8 @@ pcall :: Pipeline -> Message -> IO (IO Response) -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- Throw IOError and closes pipeline if send fails, likewise for promised response. pcall p@Pipeline{..} message = do - finished <- isFinished p - if finished + listenerStopped <- isFinished p + if listenerStopped then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing else withMVar vStream doCall `onException` close p where From af793261e8f18eb670da6def5642dfae68920d47 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Sun, 30 Oct 2016 00:23:08 -0700 Subject: [PATCH 3/3] Add changelog entry --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c6d515..b42eec5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ All notable changes to this project will be documented in this file. This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Package_versioning_policy). +## [Unreleased] - unreleased + +### Fixed +- Write functions hang when the connection is lost. + ## [2.1.1] - 2016-08-13 ### Changed