Remove IOStream from Internal.Protocol

This commit is contained in:
Victor Denisov 2016-04-10 17:55:58 -07:00
parent c011b1a23c
commit 73dfdb0b7f
2 changed files with 17 additions and 34 deletions

View file

@ -4,20 +4,13 @@
module Database.MongoDB.Internal.Connection ( module Database.MongoDB.Internal.Connection (
Connection(..), Connection(..),
readExactly,
fromHandle, fromHandle,
) where ) where
import Prelude hiding (read) import Prelude hiding (read)
import Data.Monoid
import Data.IORef
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString import qualified Data.ByteString as ByteString
import qualified Data.ByteString.Lazy as Lazy (ByteString)
import qualified Data.ByteString.Lazy as Lazy.ByteString
import Control.Monad
import System.IO import System.IO
import System.IO.Error (mkIOError, eofErrorType)
-- | Abstract connection interface -- | Abstract connection interface
-- --

View file

@ -83,25 +83,17 @@ mkWeakMVar :: MVar a -> IO () -> IO ()
mkWeakMVar = addMVarFinalizer mkWeakMVar = addMVarFinalizer
#endif #endif
-- * IOStream
-- | An IO sink and source where value of type @o@ are sent and values of type @i@ are received.
data IOStream i o = IOStream {
writeStream :: o -> IO (),
readStream :: IO i,
closeStream :: IO () }
-- * Pipeline -- * Pipeline
-- | Thread-safe and pipelined connection -- | Thread-safe and pipelined connection
data Pipeline i o = Pipeline { data Pipeline = Pipeline {
vStream :: MVar (IOStream i o), -- ^ Mutex on handle, so only one thread at a time can write to it vStream :: MVar Connection, -- ^ Mutex on handle, so only one thread at a time can write to it
responseQueue :: Chan (MVar (Either IOError i)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. responseQueue :: Chan (MVar (Either IOError Response)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
listenThread :: ThreadId listenThread :: ThreadId
} }
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle. -- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: IOStream i o -> IO (Pipeline i o) newPipeline :: Connection -> IO Pipeline
newPipeline stream = do newPipeline stream = do
vStream <- newMVar stream vStream <- newMVar stream
responseQueue <- newChan responseQueue <- newChan
@ -110,16 +102,16 @@ newPipeline stream = do
listenThread <- forkIO (listen pipe) listenThread <- forkIO (listen pipe)
_ <- mkWeakMVar vStream $ do _ <- mkWeakMVar vStream $ do
killThread listenThread killThread listenThread
closeStream stream Connection.close stream
return pipe return pipe
close :: Pipeline i o -> IO () close :: Pipeline -> IO ()
-- ^ Close pipe and underlying connection -- ^ Close pipe and underlying connection
close Pipeline{..} = do close Pipeline{..} = do
killThread listenThread killThread listenThread
closeStream =<< readMVar vStream Connection.close =<< readMVar vStream
isClosed :: Pipeline i o -> IO Bool isClosed :: Pipeline -> IO Bool
isClosed Pipeline{listenThread} = do isClosed Pipeline{listenThread} = do
status <- threadStatus listenThread status <- threadStatus listenThread
return $ case status of return $ case status of
@ -129,36 +121,36 @@ isClosed Pipeline{listenThread} = do
ThreadDied -> True ThreadDied -> True
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read --isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
listen :: Pipeline i o -> IO () listen :: Pipeline -> IO ()
-- ^ Listen for responses and supply them to waiting threads in order -- ^ Listen for responses and supply them to waiting threads in order
listen Pipeline{..} = do listen Pipeline{..} = do
stream <- readMVar vStream stream <- readMVar vStream
forever $ do forever $ do
e <- try $ readStream stream e <- try $ readMessage stream
var <- readChan responseQueue var <- readChan responseQueue
putMVar var e putMVar var e
case e of case e of
Left err -> closeStream stream >> ioError err -- close and stop looping Left err -> Connection.close stream >> ioError err -- close and stop looping
Right _ -> return () Right _ -> return ()
psend :: Pipeline i o -> o -> IO () psend :: Pipeline -> Message -> IO ()
-- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own). -- ^ Send message to destination; the destination must not response (otherwise future 'call's will get these responses instead of their own).
-- Throw IOError and close pipeline if send fails -- Throw IOError and close pipeline if send fails
psend p@Pipeline{..} message = withMVar vStream (flip writeStream message) `onException` close p psend p@Pipeline{..} message = withMVar vStream (flip writeMessage message) `onException` close p
pcall :: Pipeline i o -> o -> IO (IO i) pcall :: Pipeline -> Message -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response. -- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcall p@Pipeline{..} message = withMVar vStream doCall `onException` close p where pcall p@Pipeline{..} message = withMVar vStream doCall `onException` close p where
doCall stream = do doCall stream = do
writeStream stream message writeMessage stream message
var <- newEmptyMVar var <- newEmptyMVar
liftIO $ writeChan responseQueue var liftIO $ writeChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise return $ readMVar var >>= either throwIO return -- return promise
-- * Pipe -- * Pipe
type Pipe = Pipeline Response Message type Pipe = Pipeline
-- ^ Thread-safe TCP connection with pipelined requests -- ^ Thread-safe TCP connection with pipelined requests
newPipe :: Handle -> IO Pipe newPipe :: Handle -> IO Pipe
@ -167,9 +159,7 @@ newPipe handle = Connection.fromHandle handle >>= newPipeWith
newPipeWith :: Connection -> IO Pipe newPipeWith :: Connection -> IO Pipe
-- ^ Create pipe over connection -- ^ Create pipe over connection
newPipeWith conn = newPipeline $ IOStream (writeMessage conn) newPipeWith conn = newPipeline conn
(readMessage conn)
(Connection.close conn)
send :: Pipe -> [Notice] -> IO () send :: Pipe -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. -- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.