diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 0492794..06b54f6 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -64,8 +64,8 @@ import qualified Data.Text.Encoding as TE import Database.MongoDB.Internal.Util (bitOr, byteStringHex) -import Database.MongoDB.Internal.Connection (Connection) -import qualified Database.MongoDB.Internal.Connection as Connection +import Database.MongoDB.Transport (Transport) +import qualified Database.MongoDB.Transport as T #if MIN_VERSION_base(4,6,0) import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, @@ -84,13 +84,13 @@ mkWeakMVar = addMVarFinalizer -- | Thread-safe and pipelined connection data Pipeline = Pipeline { - vStream :: MVar Connection, -- ^ Mutex on handle, so only one thread at a time can write to it + vStream :: MVar Transport, -- ^ Mutex on handle, so only one thread at a time can write to it responseQueue :: Chan (MVar (Either IOError Response)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response. listenThread :: ThreadId } -- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle. -newPipeline :: Connection -> IO Pipeline +newPipeline :: Transport -> IO Pipeline newPipeline stream = do vStream <- newMVar stream responseQueue <- newChan @@ -99,14 +99,14 @@ newPipeline stream = do listenThread <- forkIO (listen pipe) _ <- mkWeakMVar vStream $ do killThread listenThread - Connection.close stream + T.close stream return pipe close :: Pipeline -> IO () -- ^ Close pipe and underlying connection close Pipeline{..} = do killThread listenThread - Connection.close =<< readMVar vStream + T.close =<< readMVar vStream isClosed :: Pipeline -> IO Bool isClosed Pipeline{listenThread} = do @@ -127,7 +127,7 @@ listen Pipeline{..} = do var <- readChan responseQueue putMVar var e case e of - Left err -> Connection.close stream >> ioError err -- close and stop looping + Left err -> T.close stream >> ioError err -- close and stop looping Right _ -> return () psend :: Pipeline -> Message -> IO () @@ -152,9 +152,9 @@ type Pipe = Pipeline newPipe :: Handle -> IO Pipe -- ^ Create pipe over handle -newPipe handle = Connection.fromHandle handle >>= newPipeWith +newPipe handle = T.fromHandle handle >>= newPipeWith -newPipeWith :: Connection -> IO Pipe +newPipeWith :: Transport -> IO Pipe -- ^ Create pipe over connection newPipeWith conn = newPipeline conn @@ -178,7 +178,7 @@ type Message = ([Notice], Maybe (Request, RequestId)) -- ^ A write notice(s) with getLastError request, or just query request. -- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness. -writeMessage :: Connection -> Message -> IO () +writeMessage :: Transport -> Message -> IO () -- ^ Write message to connection writeMessage conn (notices, mRequest) = do noticeStrings <- forM notices $ \n -> do @@ -191,8 +191,8 @@ writeMessage conn (notices, mRequest) = do let s = runPut $ putRequest request requestId return $ (lenBytes s) `L.append` s - Connection.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString) - Connection.flush conn + T.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString) + T.flush conn where lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes encodeSize = runPut . putInt32 . (+ 4) @@ -200,12 +200,12 @@ writeMessage conn (notices, mRequest) = do type Response = (ResponseTo, Reply) -- ^ Message received from a Mongo server in response to a Request -readMessage :: Connection -> IO Response +readMessage :: Transport -> IO Response -- ^ read response from a connection readMessage conn = readResp where readResp = do - len <- fromEnum . decodeSize . L.fromStrict <$> Connection.read conn 4 - runGet getReply . L.fromStrict <$> Connection.read conn len + len <- fromEnum . decodeSize . L.fromStrict <$> T.read conn 4 + runGet getReply . L.fromStrict <$> T.read conn len decodeSize = subtract 4 . runGet getInt32 type FullCollection = Text diff --git a/Database/MongoDB/Internal/Tls.hs b/Database/MongoDB/Internal/Tls.hs index f78ed65..2f51540 100644 --- a/Database/MongoDB/Internal/Tls.hs +++ b/Database/MongoDB/Internal/Tls.hs @@ -20,8 +20,8 @@ import Control.Monad (when, unless) import System.IO import Database.MongoDB (Pipe) import Database.MongoDB.Internal.Protocol (newPipeWith) -import Database.MongoDB.Internal.Connection (Connection(Connection)) -import qualified Database.MongoDB.Internal.Connection as Connection +import Database.MongoDB.Transport (Transport(Transport)) +import qualified Database.MongoDB.Transport as T import System.IO.Error (mkIOError, eofErrorType) import Network (connectTo, HostName, PortID) import qualified Network.TLS as TLS @@ -49,11 +49,11 @@ connect host port = bracketOnError Region.open Region.close $ \r -> do conn <- tlsConnection context (Region.close r) newPipeWith conn -tlsConnection :: TLS.Context -> IO () -> IO Connection +tlsConnection :: TLS.Context -> IO () -> IO Transport tlsConnection ctx close = do restRef <- newIORef mempty - return Connection - { Connection.read = \count -> let + return Transport + { T.read = \count -> let readSome = do rest <- readIORef restRef writeIORef restRef mempty @@ -75,10 +75,10 @@ tlsConnection ctx close = do unread rest return (acc <> Lazy.ByteString.fromStrict res) else go (acc <> Lazy.ByteString.fromStrict chunk) (n - len) - eof = mkIOError eofErrorType "Database.MongoDB.Internal.Connection" + eof = mkIOError eofErrorType "Database.MongoDB.Transport" Nothing Nothing in Lazy.ByteString.toStrict <$> go mempty count - , Connection.write = TLS.sendData ctx . Lazy.ByteString.fromStrict - , Connection.flush = TLS.contextFlush ctx - , Connection.close = close + , T.write = TLS.sendData ctx . Lazy.ByteString.fromStrict + , T.flush = TLS.contextFlush ctx + , T.close = close } diff --git a/Database/MongoDB/Internal/Connection.hs b/Database/MongoDB/Transport.hs similarity index 77% rename from Database/MongoDB/Internal/Connection.hs rename to Database/MongoDB/Transport.hs index 8679c5e..dffdd35 100644 --- a/Database/MongoDB/Internal/Connection.hs +++ b/Database/MongoDB/Transport.hs @@ -2,8 +2,8 @@ -- | This module defines a connection interface. It could be a regular -- network connection, TLS connection, a mock or anything else. -module Database.MongoDB.Internal.Connection ( - Connection(..), +module Database.MongoDB.Transport ( + Transport(..), fromHandle, ) where @@ -12,19 +12,19 @@ import Data.ByteString (ByteString) import qualified Data.ByteString as ByteString import System.IO --- | Abstract connection interface +-- | Abstract transport interface -- -- `read` should return `ByteString.null` on EOF -data Connection = Connection { +data Transport = Transport { read :: Int -> IO ByteString, write :: ByteString -> IO (), flush :: IO (), close :: IO ()} -fromHandle :: Handle -> IO Connection +fromHandle :: Handle -> IO Transport -- ^ Make connection form handle fromHandle handle = do - return Connection + return Transport { read = ByteString.hGet handle , write = ByteString.hPut handle , flush = hFlush handle diff --git a/mongoDB.cabal b/mongoDB.cabal index a0f4989..3df9215 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -51,11 +51,11 @@ Library Exposed-modules: Database.MongoDB Database.MongoDB.Admin Database.MongoDB.Connection - Database.MongoDB.Internal.Connection Database.MongoDB.Internal.Tls Database.MongoDB.Internal.Protocol Database.MongoDB.Internal.Util Database.MongoDB.Query + Database.MongoDB.Transport Source-repository head Type: git