Rename Internal.Connection module to Transport
This commit is contained in:
parent
ccd6727ab4
commit
89ee88e67c
4 changed files with 31 additions and 31 deletions
|
@ -64,8 +64,8 @@ import qualified Data.Text.Encoding as TE
|
||||||
|
|
||||||
import Database.MongoDB.Internal.Util (bitOr, byteStringHex)
|
import Database.MongoDB.Internal.Util (bitOr, byteStringHex)
|
||||||
|
|
||||||
import Database.MongoDB.Internal.Connection (Connection)
|
import Database.MongoDB.Transport (Transport)
|
||||||
import qualified Database.MongoDB.Internal.Connection as Connection
|
import qualified Database.MongoDB.Transport as T
|
||||||
|
|
||||||
#if MIN_VERSION_base(4,6,0)
|
#if MIN_VERSION_base(4,6,0)
|
||||||
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
||||||
|
@ -84,13 +84,13 @@ mkWeakMVar = addMVarFinalizer
|
||||||
|
|
||||||
-- | Thread-safe and pipelined connection
|
-- | Thread-safe and pipelined connection
|
||||||
data Pipeline = Pipeline {
|
data Pipeline = Pipeline {
|
||||||
vStream :: MVar Connection, -- ^ Mutex on handle, so only one thread at a time can write to it
|
vStream :: MVar Transport, -- ^ Mutex on handle, so only one thread at a time can write to it
|
||||||
responseQueue :: Chan (MVar (Either IOError Response)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
|
responseQueue :: Chan (MVar (Either IOError Response)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
|
||||||
listenThread :: ThreadId
|
listenThread :: ThreadId
|
||||||
}
|
}
|
||||||
|
|
||||||
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
|
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
|
||||||
newPipeline :: Connection -> IO Pipeline
|
newPipeline :: Transport -> IO Pipeline
|
||||||
newPipeline stream = do
|
newPipeline stream = do
|
||||||
vStream <- newMVar stream
|
vStream <- newMVar stream
|
||||||
responseQueue <- newChan
|
responseQueue <- newChan
|
||||||
|
@ -99,14 +99,14 @@ newPipeline stream = do
|
||||||
listenThread <- forkIO (listen pipe)
|
listenThread <- forkIO (listen pipe)
|
||||||
_ <- mkWeakMVar vStream $ do
|
_ <- mkWeakMVar vStream $ do
|
||||||
killThread listenThread
|
killThread listenThread
|
||||||
Connection.close stream
|
T.close stream
|
||||||
return pipe
|
return pipe
|
||||||
|
|
||||||
close :: Pipeline -> IO ()
|
close :: Pipeline -> IO ()
|
||||||
-- ^ Close pipe and underlying connection
|
-- ^ Close pipe and underlying connection
|
||||||
close Pipeline{..} = do
|
close Pipeline{..} = do
|
||||||
killThread listenThread
|
killThread listenThread
|
||||||
Connection.close =<< readMVar vStream
|
T.close =<< readMVar vStream
|
||||||
|
|
||||||
isClosed :: Pipeline -> IO Bool
|
isClosed :: Pipeline -> IO Bool
|
||||||
isClosed Pipeline{listenThread} = do
|
isClosed Pipeline{listenThread} = do
|
||||||
|
@ -127,7 +127,7 @@ listen Pipeline{..} = do
|
||||||
var <- readChan responseQueue
|
var <- readChan responseQueue
|
||||||
putMVar var e
|
putMVar var e
|
||||||
case e of
|
case e of
|
||||||
Left err -> Connection.close stream >> ioError err -- close and stop looping
|
Left err -> T.close stream >> ioError err -- close and stop looping
|
||||||
Right _ -> return ()
|
Right _ -> return ()
|
||||||
|
|
||||||
psend :: Pipeline -> Message -> IO ()
|
psend :: Pipeline -> Message -> IO ()
|
||||||
|
@ -152,9 +152,9 @@ type Pipe = Pipeline
|
||||||
|
|
||||||
newPipe :: Handle -> IO Pipe
|
newPipe :: Handle -> IO Pipe
|
||||||
-- ^ Create pipe over handle
|
-- ^ Create pipe over handle
|
||||||
newPipe handle = Connection.fromHandle handle >>= newPipeWith
|
newPipe handle = T.fromHandle handle >>= newPipeWith
|
||||||
|
|
||||||
newPipeWith :: Connection -> IO Pipe
|
newPipeWith :: Transport -> IO Pipe
|
||||||
-- ^ Create pipe over connection
|
-- ^ Create pipe over connection
|
||||||
newPipeWith conn = newPipeline conn
|
newPipeWith conn = newPipeline conn
|
||||||
|
|
||||||
|
@ -178,7 +178,7 @@ type Message = ([Notice], Maybe (Request, RequestId))
|
||||||
-- ^ A write notice(s) with getLastError request, or just query request.
|
-- ^ A write notice(s) with getLastError request, or just query request.
|
||||||
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
|
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
|
||||||
|
|
||||||
writeMessage :: Connection -> Message -> IO ()
|
writeMessage :: Transport -> Message -> IO ()
|
||||||
-- ^ Write message to connection
|
-- ^ Write message to connection
|
||||||
writeMessage conn (notices, mRequest) = do
|
writeMessage conn (notices, mRequest) = do
|
||||||
noticeStrings <- forM notices $ \n -> do
|
noticeStrings <- forM notices $ \n -> do
|
||||||
|
@ -191,8 +191,8 @@ writeMessage conn (notices, mRequest) = do
|
||||||
let s = runPut $ putRequest request requestId
|
let s = runPut $ putRequest request requestId
|
||||||
return $ (lenBytes s) `L.append` s
|
return $ (lenBytes s) `L.append` s
|
||||||
|
|
||||||
Connection.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
|
T.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
|
||||||
Connection.flush conn
|
T.flush conn
|
||||||
where
|
where
|
||||||
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
|
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
|
||||||
encodeSize = runPut . putInt32 . (+ 4)
|
encodeSize = runPut . putInt32 . (+ 4)
|
||||||
|
@ -200,12 +200,12 @@ writeMessage conn (notices, mRequest) = do
|
||||||
type Response = (ResponseTo, Reply)
|
type Response = (ResponseTo, Reply)
|
||||||
-- ^ Message received from a Mongo server in response to a Request
|
-- ^ Message received from a Mongo server in response to a Request
|
||||||
|
|
||||||
readMessage :: Connection -> IO Response
|
readMessage :: Transport -> IO Response
|
||||||
-- ^ read response from a connection
|
-- ^ read response from a connection
|
||||||
readMessage conn = readResp where
|
readMessage conn = readResp where
|
||||||
readResp = do
|
readResp = do
|
||||||
len <- fromEnum . decodeSize . L.fromStrict <$> Connection.read conn 4
|
len <- fromEnum . decodeSize . L.fromStrict <$> T.read conn 4
|
||||||
runGet getReply . L.fromStrict <$> Connection.read conn len
|
runGet getReply . L.fromStrict <$> T.read conn len
|
||||||
decodeSize = subtract 4 . runGet getInt32
|
decodeSize = subtract 4 . runGet getInt32
|
||||||
|
|
||||||
type FullCollection = Text
|
type FullCollection = Text
|
||||||
|
|
|
@ -20,8 +20,8 @@ import Control.Monad (when, unless)
|
||||||
import System.IO
|
import System.IO
|
||||||
import Database.MongoDB (Pipe)
|
import Database.MongoDB (Pipe)
|
||||||
import Database.MongoDB.Internal.Protocol (newPipeWith)
|
import Database.MongoDB.Internal.Protocol (newPipeWith)
|
||||||
import Database.MongoDB.Internal.Connection (Connection(Connection))
|
import Database.MongoDB.Transport (Transport(Transport))
|
||||||
import qualified Database.MongoDB.Internal.Connection as Connection
|
import qualified Database.MongoDB.Transport as T
|
||||||
import System.IO.Error (mkIOError, eofErrorType)
|
import System.IO.Error (mkIOError, eofErrorType)
|
||||||
import Network (connectTo, HostName, PortID)
|
import Network (connectTo, HostName, PortID)
|
||||||
import qualified Network.TLS as TLS
|
import qualified Network.TLS as TLS
|
||||||
|
@ -49,11 +49,11 @@ connect host port = bracketOnError Region.open Region.close $ \r -> do
|
||||||
conn <- tlsConnection context (Region.close r)
|
conn <- tlsConnection context (Region.close r)
|
||||||
newPipeWith conn
|
newPipeWith conn
|
||||||
|
|
||||||
tlsConnection :: TLS.Context -> IO () -> IO Connection
|
tlsConnection :: TLS.Context -> IO () -> IO Transport
|
||||||
tlsConnection ctx close = do
|
tlsConnection ctx close = do
|
||||||
restRef <- newIORef mempty
|
restRef <- newIORef mempty
|
||||||
return Connection
|
return Transport
|
||||||
{ Connection.read = \count -> let
|
{ T.read = \count -> let
|
||||||
readSome = do
|
readSome = do
|
||||||
rest <- readIORef restRef
|
rest <- readIORef restRef
|
||||||
writeIORef restRef mempty
|
writeIORef restRef mempty
|
||||||
|
@ -75,10 +75,10 @@ tlsConnection ctx close = do
|
||||||
unread rest
|
unread rest
|
||||||
return (acc <> Lazy.ByteString.fromStrict res)
|
return (acc <> Lazy.ByteString.fromStrict res)
|
||||||
else go (acc <> Lazy.ByteString.fromStrict chunk) (n - len)
|
else go (acc <> Lazy.ByteString.fromStrict chunk) (n - len)
|
||||||
eof = mkIOError eofErrorType "Database.MongoDB.Internal.Connection"
|
eof = mkIOError eofErrorType "Database.MongoDB.Transport"
|
||||||
Nothing Nothing
|
Nothing Nothing
|
||||||
in Lazy.ByteString.toStrict <$> go mempty count
|
in Lazy.ByteString.toStrict <$> go mempty count
|
||||||
, Connection.write = TLS.sendData ctx . Lazy.ByteString.fromStrict
|
, T.write = TLS.sendData ctx . Lazy.ByteString.fromStrict
|
||||||
, Connection.flush = TLS.contextFlush ctx
|
, T.flush = TLS.contextFlush ctx
|
||||||
, Connection.close = close
|
, T.close = close
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,8 @@
|
||||||
-- | This module defines a connection interface. It could be a regular
|
-- | This module defines a connection interface. It could be a regular
|
||||||
-- network connection, TLS connection, a mock or anything else.
|
-- network connection, TLS connection, a mock or anything else.
|
||||||
|
|
||||||
module Database.MongoDB.Internal.Connection (
|
module Database.MongoDB.Transport (
|
||||||
Connection(..),
|
Transport(..),
|
||||||
fromHandle,
|
fromHandle,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
|
@ -12,19 +12,19 @@ import Data.ByteString (ByteString)
|
||||||
import qualified Data.ByteString as ByteString
|
import qualified Data.ByteString as ByteString
|
||||||
import System.IO
|
import System.IO
|
||||||
|
|
||||||
-- | Abstract connection interface
|
-- | Abstract transport interface
|
||||||
--
|
--
|
||||||
-- `read` should return `ByteString.null` on EOF
|
-- `read` should return `ByteString.null` on EOF
|
||||||
data Connection = Connection {
|
data Transport = Transport {
|
||||||
read :: Int -> IO ByteString,
|
read :: Int -> IO ByteString,
|
||||||
write :: ByteString -> IO (),
|
write :: ByteString -> IO (),
|
||||||
flush :: IO (),
|
flush :: IO (),
|
||||||
close :: IO ()}
|
close :: IO ()}
|
||||||
|
|
||||||
fromHandle :: Handle -> IO Connection
|
fromHandle :: Handle -> IO Transport
|
||||||
-- ^ Make connection form handle
|
-- ^ Make connection form handle
|
||||||
fromHandle handle = do
|
fromHandle handle = do
|
||||||
return Connection
|
return Transport
|
||||||
{ read = ByteString.hGet handle
|
{ read = ByteString.hGet handle
|
||||||
, write = ByteString.hPut handle
|
, write = ByteString.hPut handle
|
||||||
, flush = hFlush handle
|
, flush = hFlush handle
|
|
@ -51,11 +51,11 @@ Library
|
||||||
Exposed-modules: Database.MongoDB
|
Exposed-modules: Database.MongoDB
|
||||||
Database.MongoDB.Admin
|
Database.MongoDB.Admin
|
||||||
Database.MongoDB.Connection
|
Database.MongoDB.Connection
|
||||||
Database.MongoDB.Internal.Connection
|
|
||||||
Database.MongoDB.Internal.Tls
|
Database.MongoDB.Internal.Tls
|
||||||
Database.MongoDB.Internal.Protocol
|
Database.MongoDB.Internal.Protocol
|
||||||
Database.MongoDB.Internal.Util
|
Database.MongoDB.Internal.Util
|
||||||
Database.MongoDB.Query
|
Database.MongoDB.Query
|
||||||
|
Database.MongoDB.Transport
|
||||||
|
|
||||||
Source-repository head
|
Source-repository head
|
||||||
Type: git
|
Type: git
|
||||||
|
|
Loading…
Reference in a new issue