From 73dfdb0b7fcb78707165074b04418282d78eff3e Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Sun, 10 Apr 2016 17:55:58 -0700 Subject: [PATCH] Remove IOStream from Internal.Protocol --- Database/MongoDB/Internal/Connection.hs | 7 ---- Database/MongoDB/Internal/Protocol.hs | 44 ++++++++++--------------- 2 files changed, 17 insertions(+), 34 deletions(-) diff --git a/Database/MongoDB/Internal/Connection.hs b/Database/MongoDB/Internal/Connection.hs index c0abdd7..a55f0a6 100644 --- a/Database/MongoDB/Internal/Connection.hs +++ b/Database/MongoDB/Internal/Connection.hs @@ -4,20 +4,13 @@ module Database.MongoDB.Internal.Connection ( Connection(..), - readExactly, fromHandle, ) where import Prelude hiding (read) -import Data.Monoid -import Data.IORef import Data.ByteString (ByteString) import qualified Data.ByteString as ByteString -import qualified Data.ByteString.Lazy as Lazy (ByteString) -import qualified Data.ByteString.Lazy as Lazy.ByteString -import Control.Monad import System.IO -import System.IO.Error (mkIOError, eofErrorType) -- | Abstract connection interface -- diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 8385659..fffbb88 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -83,25 +83,17 @@ mkWeakMVar :: MVar a -> IO () -> IO () mkWeakMVar = addMVarFinalizer #endif --- * IOStream - --- | An IO sink and source where value of type @o@ are sent and values of type @i@ are received. -data IOStream i o = IOStream { - writeStream :: o -> IO (), - readStream :: IO i, - closeStream :: IO () } - -- * Pipeline -- | Thread-safe and pipelined connection -data Pipeline i o = Pipeline { - vStream :: MVar (IOStream i o), -- ^ Mutex on handle, so only one thread at a time can write to it - responseQueue :: Chan (MVar (Either IOError i)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. +data Pipeline = Pipeline { + vStream :: MVar Connection, -- ^ Mutex on handle, so only one thread at a time can write to it + responseQueue :: Chan (MVar (Either IOError Response)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. listenThread :: ThreadId } -- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle. -newPipeline :: IOStream i o -> IO (Pipeline i o) +newPipeline :: Connection -> IO Pipeline newPipeline stream = do vStream <- newMVar stream responseQueue <- newChan @@ -110,16 +102,16 @@ newPipeline stream = do listenThread <- forkIO (listen pipe) _ <- mkWeakMVar vStream $ do killThread listenThread - closeStream stream + Connection.close stream return pipe -close :: Pipeline i o -> IO () +close :: Pipeline -> IO () -- ^ Close pipe and underlying connection close Pipeline{..} = do killThread listenThread - closeStream =<< readMVar vStream + Connection.close =<< readMVar vStream -isClosed :: Pipeline i o -> IO Bool +isClosed :: Pipeline -> IO Bool isClosed Pipeline{listenThread} = do status <- threadStatus listenThread return $ case status of @@ -129,36 +121,36 @@ isClosed Pipeline{listenThread} = do ThreadDied -> True --isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read -listen :: Pipeline i o -> IO () +listen :: Pipeline -> IO () -- ^ Listen for responses and supply them to waiting threads in order listen Pipeline{..} = do stream <- readMVar vStream forever $ do - e <- try $ readStream stream + e <- try $ readMessage stream var <- readChan responseQueue putMVar var e case e of - Left err -> closeStream stream >> ioError err -- close and stop looping + Left err -> Connection.close stream >> ioError err -- close and stop looping Right _ -> return () -psend :: Pipeline i o -> o -> IO () +psend :: Pipeline -> Message -> IO () -- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own). -- Throw IOError and close pipeline if send fails -psend p@Pipeline{..} message = withMVar vStream (flip writeStream message) `onException` close p +psend p@Pipeline{..} message = withMVar vStream (flip writeMessage message) `onException` close p -pcall :: Pipeline i o -> o -> IO (IO i) +pcall :: Pipeline -> Message -> IO (IO Response) -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- Throw IOError and closes pipeline if send fails, likewise for promised response. pcall p@Pipeline{..} message = withMVar vStream doCall `onException` close p where doCall stream = do - writeStream stream message + writeMessage stream message var <- newEmptyMVar liftIO $ writeChan responseQueue var return $ readMVar var >>= either throwIO return -- return promise -- * Pipe -type Pipe = Pipeline Response Message +type Pipe = Pipeline -- ^ Thread-safe TCP connection with pipelined requests newPipe :: Handle -> IO Pipe @@ -167,9 +159,7 @@ newPipe handle = Connection.fromHandle handle >>= newPipeWith newPipeWith :: Connection -> IO Pipe -- ^ Create pipe over connection -newPipeWith conn = newPipeline $ IOStream (writeMessage conn) - (readMessage conn) - (Connection.close conn) +newPipeWith conn = newPipeline conn send :: Pipe -> [Notice] -> IO () -- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.