connect timeout
This commit is contained in:
parent
9d2f09a91e
commit
19da43a348
2 changed files with 45 additions and 18 deletions
|
@ -4,13 +4,15 @@
|
||||||
|
|
||||||
module Database.MongoDB.Connection (
|
module Database.MongoDB.Connection (
|
||||||
-- * Util
|
-- * Util
|
||||||
IOE, runIOE,
|
IOE, runIOE, Secs,
|
||||||
-- * Connection
|
-- * Connection
|
||||||
Pipe, close, isClosed,
|
Pipe, close, isClosed,
|
||||||
-- * Server
|
-- * Server
|
||||||
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort, readHostPortM, connect,
|
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort, readHostPortM,
|
||||||
|
globalConnectTimeout, connect, connect',
|
||||||
-- * Replica Set
|
-- * Replica Set
|
||||||
ReplicaSetName, openReplicaSet, ReplicaSet, primary, secondaryOk, closeReplicaSet
|
ReplicaSetName, openReplicaSet, openReplicaSet',
|
||||||
|
ReplicaSet, primary, secondaryOk, closeReplicaSet, replSetName
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Prelude hiding (lookup)
|
import Prelude hiding (lookup)
|
||||||
|
@ -30,6 +32,9 @@ import Data.Bson as D (Document, lookup, at, (=:))
|
||||||
import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand)
|
import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand)
|
||||||
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle)
|
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle)
|
||||||
import Data.List as L (lookup, intersect, partition, (\\), delete)
|
import Data.List as L (lookup, intersect, partition, (\\), delete)
|
||||||
|
import Data.IORef (IORef, newIORef, readIORef)
|
||||||
|
import System.Timeout (timeout)
|
||||||
|
import System.IO.Unsafe (unsafePerformIO)
|
||||||
|
|
||||||
adminCommand :: Command -> Pipe -> IOE Document
|
adminCommand :: Command -> Pipe -> IOE Document
|
||||||
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
|
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
|
||||||
|
@ -80,10 +85,23 @@ readHostPort :: String -> Host
|
||||||
-- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
|
-- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
|
||||||
readHostPort = runIdentity . readHostPortM
|
readHostPort = runIdentity . readHostPortM
|
||||||
|
|
||||||
|
type Secs = Double
|
||||||
|
|
||||||
|
globalConnectTimeout :: IORef Secs
|
||||||
|
-- ^ 'connect' (and 'openReplicaSet') fails if it can't connect within this many seconds (default is 6 seconds). Use 'connect\'' (and 'openReplicaSet\'') if you want to ignore this global and specify your own timeout. Note, this timeout only applies to initial connection establishment, not when reading/writing to the connection.
|
||||||
|
globalConnectTimeout = unsafePerformIO (newIORef 6)
|
||||||
|
{-# NOINLINE globalConnectTimeout #-}
|
||||||
|
|
||||||
connect :: Host -> IOE Pipe
|
connect :: Host -> IOE Pipe
|
||||||
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if problem connecting.
|
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within 'globalConnectTimeout'.
|
||||||
connect (Host hostname port) = do
|
connect h = lift (readIORef globalConnectTimeout) >>= flip connect' h
|
||||||
handle <- ErrorT . E.try $ connectTo hostname port
|
|
||||||
|
connect' :: Secs -> Host -> IOE Pipe
|
||||||
|
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within given number of seconds.
|
||||||
|
connect' timeoutSecs (Host hostname port) = do
|
||||||
|
handle <- ErrorT . E.try $ do
|
||||||
|
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
|
||||||
|
maybe (ioError $ userError "connect timed out") return mh
|
||||||
lift $ newPipeline $ IOStream (writeMessage handle) (readMessage handle) (hClose handle)
|
lift $ newPipeline $ IOStream (writeMessage handle) (readMessage handle) (hClose handle)
|
||||||
|
|
||||||
-- * Replica Set
|
-- * Replica Set
|
||||||
|
@ -91,22 +109,31 @@ connect (Host hostname port) = do
|
||||||
type ReplicaSetName = UString
|
type ReplicaSetName = UString
|
||||||
|
|
||||||
-- | Maintains a connection (created on demand) to each server in the named replica set
|
-- | Maintains a connection (created on demand) to each server in the named replica set
|
||||||
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)])
|
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs
|
||||||
|
|
||||||
|
replSetName :: ReplicaSet -> UString
|
||||||
|
-- ^ name of connected replica set
|
||||||
|
replSetName (ReplicaSet rsName _ _) = rsName
|
||||||
|
|
||||||
openReplicaSet :: (ReplicaSetName, [Host]) -> IOE ReplicaSet
|
openReplicaSet :: (ReplicaSetName, [Host]) -> IOE ReplicaSet
|
||||||
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail.
|
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSet\'' instead.
|
||||||
openReplicaSet (rsName, seedList) = do
|
openReplicaSet rsSeed = lift (readIORef globalConnectTimeout) >>= flip openReplicaSet' rsSeed
|
||||||
rs <- ReplicaSet rsName <$> newMVar (map (, Nothing) seedList)
|
|
||||||
|
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IOE ReplicaSet
|
||||||
|
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members.
|
||||||
|
openReplicaSet' timeoutSecs (rsName, seedList) = do
|
||||||
|
vMembers <- newMVar (map (, Nothing) seedList)
|
||||||
|
let rs = ReplicaSet rsName vMembers timeoutSecs
|
||||||
_ <- updateMembers rs
|
_ <- updateMembers rs
|
||||||
return rs
|
return rs
|
||||||
|
|
||||||
closeReplicaSet :: ReplicaSet -> IO ()
|
closeReplicaSet :: ReplicaSet -> IO ()
|
||||||
-- ^ Close all connections to replica set
|
-- ^ Close all connections to replica set
|
||||||
closeReplicaSet (ReplicaSet _ vMembers) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
|
closeReplicaSet (ReplicaSet _ vMembers _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
|
||||||
|
|
||||||
primary :: ReplicaSet -> IOE Pipe
|
primary :: ReplicaSet -> IOE Pipe
|
||||||
-- ^ Return connection to current primary of replica set. Fail if no primary available.
|
-- ^ Return connection to current primary of replica set. Fail if no primary available.
|
||||||
primary rs@(ReplicaSet rsName _) = do
|
primary rs@(ReplicaSet rsName _ _) = do
|
||||||
mHost <- statedPrimary <$> updateMembers rs
|
mHost <- statedPrimary <$> updateMembers rs
|
||||||
case mHost of
|
case mHost of
|
||||||
Just host' -> connection rs Nothing host'
|
Just host' -> connection rs Nothing host'
|
||||||
|
@ -133,7 +160,7 @@ possibleHosts (_, info) = map readHostPort $ at "hosts" info
|
||||||
|
|
||||||
updateMembers :: ReplicaSet -> IOE ReplicaInfo
|
updateMembers :: ReplicaSet -> IOE ReplicaInfo
|
||||||
-- ^ Fetch replica info from any server and update members accordingly
|
-- ^ Fetch replica info from any server and update members accordingly
|
||||||
updateMembers rs@(ReplicaSet _ vMembers) = do
|
updateMembers rs@(ReplicaSet _ vMembers _) = do
|
||||||
(host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
|
(host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
|
||||||
modifyMVar vMembers $ \members -> do
|
modifyMVar vMembers $ \members -> do
|
||||||
let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
|
let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
|
||||||
|
@ -147,7 +174,7 @@ updateMembers rs@(ReplicaSet _ vMembers) = do
|
||||||
|
|
||||||
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IOE ReplicaInfo
|
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IOE ReplicaInfo
|
||||||
-- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set.
|
-- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set.
|
||||||
fetchReplicaInfo rs@(ReplicaSet rsName _) (host', mPipe) = do
|
fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
|
||||||
pipe <- connection rs mPipe host'
|
pipe <- connection rs mPipe host'
|
||||||
info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
|
info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
|
||||||
case D.lookup "setName" info of
|
case D.lookup "setName" info of
|
||||||
|
@ -157,11 +184,11 @@ fetchReplicaInfo rs@(ReplicaSet rsName _) (host', mPipe) = do
|
||||||
|
|
||||||
connection :: ReplicaSet -> Maybe Pipe -> Host -> IOE Pipe
|
connection :: ReplicaSet -> Maybe Pipe -> Host -> IOE Pipe
|
||||||
-- ^ Return new or existing connection to member of replica set. If pipe is already known for host it is given, but we still test if it is open.
|
-- ^ Return new or existing connection to member of replica set. If pipe is already known for host it is given, but we still test if it is open.
|
||||||
connection (ReplicaSet _ vMembers) mPipe host' =
|
connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' =
|
||||||
maybe conn (\p -> lift (isClosed p) >>= \bad -> if bad then conn else return p) mPipe
|
maybe conn (\p -> lift (isClosed p) >>= \bad -> if bad then conn else return p) mPipe
|
||||||
where
|
where
|
||||||
conn = modifyMVar vMembers $ \members -> do
|
conn = modifyMVar vMembers $ \members -> do
|
||||||
let new = connect host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
|
let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
|
||||||
case L.lookup host' members of
|
case L.lookup host' members of
|
||||||
Just (Just pipe) -> lift (isClosed pipe) >>= \bad -> if bad then new else return (members, pipe)
|
Just (Just pipe) -> lift (isClosed pipe) >>= \bad -> if bad then new else return (members, pipe)
|
||||||
_ -> new
|
_ -> new
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
name: mongoDB
|
name: mongoDB
|
||||||
version: 1.0.0
|
version: 1.0.1
|
||||||
build-type: Simple
|
build-type: Simple
|
||||||
license: OtherLicense
|
license: OtherLicense
|
||||||
license-file: LICENSE
|
license-file: LICENSE
|
||||||
|
@ -58,7 +58,7 @@ install-includes:
|
||||||
include-dirs:
|
include-dirs:
|
||||||
hs-source-dirs: .
|
hs-source-dirs: .
|
||||||
other-modules:
|
other-modules:
|
||||||
ghc-prof-options:
|
ghc-prof-options: -auto-all
|
||||||
ghc-shared-options:
|
ghc-shared-options:
|
||||||
ghc-options: -Wall -O2
|
ghc-options: -Wall -O2
|
||||||
hugs-options:
|
hugs-options:
|
||||||
|
|
Loading…
Reference in a new issue