2011-07-05 14:37:01 +00:00
-- | Connect to a single server or a replica set of servers
2010-06-15 03:14:40 +00:00
2011-07-05 14:37:01 +00:00
{-# LANGUAGE CPP, OverloadedStrings, ScopedTypeVariables, TupleSections #-}
2010-06-15 03:14:40 +00:00
module Database.MongoDB.Connection (
2011-07-13 19:34:52 +00:00
-- * Util
2011-07-22 14:31:24 +00:00
Secs , IOE , runIOE ,
2011-07-05 14:37:01 +00:00
-- * Connection
Pipe , close , isClosed ,
2011-07-13 19:34:52 +00:00
-- * Server
2012-06-10 19:47:14 +00:00
Host ( .. ) , PortID ( .. ) , defaultPort , host , showHostPort , readHostPort ,
readHostPortM , globalConnectTimeout , connect , connect' ,
2011-07-05 14:37:01 +00:00
-- * Replica Set
2011-07-21 15:27:19 +00:00
ReplicaSetName , openReplicaSet , openReplicaSet' ,
2012-02-12 04:34:07 +00:00
ReplicaSet , primary , secondaryOk , routedHost , closeReplicaSet , replSetName
2010-06-15 03:14:40 +00:00
) where
2011-03-14 20:24:28 +00:00
import Prelude hiding ( lookup )
2012-06-10 19:47:14 +00:00
import Data.IORef ( IORef , newIORef , readIORef )
import Data.List ( intersect , partition , ( \\ ) , delete )
import Control.Applicative ( ( <$> ) )
import Control.Monad ( forM_ )
2011-07-05 14:37:01 +00:00
import Network ( HostName , PortID ( .. ) , connectTo )
2012-06-10 19:47:14 +00:00
import System.IO.Unsafe ( unsafePerformIO )
import System.Timeout ( timeout )
import Text.ParserCombinators.Parsec ( parse , many1 , letter , digit , char , eof ,
spaces , try , ( <|> ) )
import qualified Control.Exception as E
import qualified Data.List as List
2011-07-05 14:37:01 +00:00
import Control.Monad.Identity ( runIdentity )
import Control.Monad.Error ( ErrorT ( .. ) , lift , throwError )
2012-06-10 19:47:14 +00:00
import Control.Concurrent.MVar.Lifted ( MVar , newMVar , withMVar , modifyMVar ,
readMVar )
import Data.Bson ( Document , at , ( =: ) )
2012-05-08 15:13:25 +00:00
import Data.Text ( Text )
2012-06-10 19:47:14 +00:00
import qualified Data.Bson as B
2012-05-08 15:13:25 +00:00
import qualified Data.Text as T
2012-06-10 19:47:14 +00:00
import Database.MongoDB.Internal.Protocol ( Pipe , newPipe )
import Database.MongoDB.Internal.Util ( untilSuccess , liftIOE , runIOE ,
updateAssocs , shuffle , mergesortM )
import Database.MongoDB.Query ( Command , Failure ( ConnectionFailure ) , access ,
slaveOk , runCommand )
import System.IO.Pipeline ( IOE , close , isClosed )
2011-07-05 14:37:01 +00:00
adminCommand :: Command -> Pipe -> IOE Document
-- ^ Run command against the @admin@ database on the server connected to pipe.
-- The query result is converted to 'IOE' via 'liftIOE': a 'ConnectionFailure'
-- is mapped to the 'IOError' it carries, and any other 'Failure' is wrapped
-- in a 'userError' holding its 'show' text.
adminCommand cmd pipe =
    -- The database name must be exactly "admin" (no surrounding blanks).
    liftIOE failureToIOError . ErrorT $ access pipe slaveOk "admin" $ runCommand cmd
  where
    failureToIOError (ConnectionFailure e) = e
    failureToIOError e = userError $ show e
2010-07-27 21:18:53 +00:00
-- * Host
-- | Address of a server: a host name together with the port to connect to.
data Host = Host HostName PortID
    deriving (Eq, Ord, Show)
2010-06-21 15:06:20 +00:00
2010-06-15 03:14:40 +00:00
-- | Port used when none is given explicitly: the default MongoDB port, 27017.
defaultPort :: PortID
defaultPort = PortNumber 27017
2010-07-27 21:18:53 +00:00
host :: HostName -> Host
-- ^ Build a 'Host' for the given host name on 'defaultPort'.
host = flip Host defaultPort
2010-06-15 03:14:40 +00:00
2010-07-27 21:18:53 +00:00
showHostPort :: Host -> String
-- ^ Display host as \"host:port\". For a 'Service' or 'UnixSocket' port the
-- service name or socket path is shown in place of a port number.
-- TODO: Distinguish Service and UnixSocket port
showHostPort (Host hostname port) = hostname ++ ":" ++ portname
  where
    -- No blanks around the colon: 'readHostPortM' expects ':' directly after
    -- the host name, so a padded separator could not be read back.
    portname = case port of
        Service s -> s
        PortNumber p -> show p
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
        UnixSocket s -> s
#endif
2010-06-15 03:14:40 +00:00
2010-07-27 21:18:53 +00:00
readHostPortM :: (Monad m) => String -> m Host
-- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or
-- \"hostname\" as @host hostname@ (default port). Leading whitespace and
-- whitespace before the end of input are skipped. Fail (via the monad's
-- 'fail') if the string does not match either syntax or if the port is
-- larger than 65535.
-- TODO: handle Service and UnixSocket port
readHostPortM = either (fail . show) return . parse parser " readHostPort "
  where
    hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
    parser = do
        spaces
        h <- hostname
        -- Either the input ends after the host name, or a ':' and port follow.
        try (spaces >> eof >> return (host h)) <|> do
            _ <- char ':'
            -- 'many1 digit' guarantees a non-empty digit string, so 'read'
            -- cannot fail; reading as Integer avoids overflow on long inputs.
            port :: Integer <- read <$> many1 digit
            spaces >> eof
            -- Reject values that would silently wrap when narrowed to a port.
            if port > 65535
                then fail $ "port out of range: " ++ show port
                else return $ Host h (PortNumber $ fromIntegral port)
2010-06-15 03:14:40 +00:00
2010-07-27 21:18:53 +00:00
readHostPort :: String -> Host
-- ^ Pure variant of 'readHostPortM': read \"hostname:port\" as
-- @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@
-- (default port). Error if string does not match either syntax.
readHostPort str = runIdentity (readHostPortM str)
2010-06-15 03:14:40 +00:00
2011-07-21 15:27:19 +00:00
-- | A duration in seconds (fractional values allowed), used for connect timeouts.
type Secs = Double
globalConnectTimeout :: IORef Secs
-- ^ 'connect' (and 'openReplicaSet') fails if it can't connect within this many seconds (default is 6 seconds). Use 'connect\'' (and 'openReplicaSet\'') if you want to ignore this global and specify your own timeout. Note, this timeout only applies to initial connection establishment, not when reading/writing to the connection.
globalConnectTimeout = unsafePerformIO (newIORef 6)
-- NOINLINE is required for a top-level 'unsafePerformIO' reference: without
-- it the compiler may inline the definition and create more than one IORef.
{-# NOINLINE globalConnectTimeout #-}
2011-07-05 14:37:01 +00:00
connect :: Host -> IOE Pipe
-- ^ Connect to Host returning pipelined TCP connection, using the current
-- value of 'globalConnectTimeout' as the time limit. Throw IOError if
-- connection refused or no response within that limit.
connect h = do
    timeoutSecs <- lift (readIORef globalConnectTimeout)
    connect' timeoutSecs h
connect' :: Secs -> Host -> IOE Pipe
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if
-- connection refused or no response within given number of seconds.
connect' timeoutSecs (Host hostname port) = do
    -- IOErrors raised while connecting are captured into the IOE error channel.
    handle <- ErrorT . E.try $
        timeout micros (connectTo hostname port) >>= maybe timedOut return
    lift (newPipe handle)
  where
    -- 'timeout' takes microseconds.
    micros = round (timeoutSecs * 1000000)
    timedOut = ioError (userError " connect timed out ")
2011-07-05 14:37:01 +00:00
2010-10-27 20:13:23 +00:00
-- * Replica Set
2010-06-15 03:14:40 +00:00
2012-05-08 15:13:25 +00:00
-- | Name identifying a replica set.
type ReplicaSetName = Text
2011-07-05 14:37:01 +00:00
-- | Handle on a named replica set. Keeps a connection (created on demand) to
-- each server in the set.
data ReplicaSet = ReplicaSet
    ReplicaSetName
    -- ^ name of the replica set
    (MVar [(Host, Maybe Pipe)])
    -- ^ known members, each with its connection if one has been opened
    Secs
    -- ^ timeout used when connecting to a member
2012-05-08 15:13:25 +00:00
replSetName :: ReplicaSet -> Text
-- ^ Name of the replica set this handle refers to.
replSetName rs = case rs of
    ReplicaSet name _ _ -> name
2011-07-05 14:37:01 +00:00
openReplicaSet :: (ReplicaSetName, [Host]) -> IOE ReplicaSet
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is
-- seed list. At least one of them must be a live member of the named replica
-- set, otherwise fail. The value of 'globalConnectTimeout' at the time of this
-- call is the timeout used for future member connect attempts. To use your own
-- value call 'openReplicaSet\'' instead.
openReplicaSet rsSeed = do
    timeoutSecs <- lift (readIORef globalConnectTimeout)
    openReplicaSet' timeoutSecs rsSeed
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IOE ReplicaSet
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is
-- seed list. At least one of them must be a live member of the named replica
-- set, otherwise fail. Supplied seconds timeout is used for connect attempts
-- to members.
openReplicaSet' timeoutSecs (rsName, seedList) = do
    -- Every seed starts out without a connection.
    vMembers <- newMVar [(seed, Nothing) | seed <- seedList]
    let replicaSet = ReplicaSet rsName vMembers timeoutSecs
    -- Refresh the member list once up front; the result itself is not needed.
    updateMembers replicaSet >> return replicaSet
2011-07-13 19:34:52 +00:00
closeReplicaSet :: ReplicaSet -> IO ()
-- ^ Close every connection currently held for the replica set's members.
closeReplicaSet (ReplicaSet _ vMembers _) =
    withMVar vMembers $ \members ->
        forM_ members $ \(_, mPipe) ->
            -- Members without a connection are skipped.
            maybe (return ()) close mPipe
2011-07-13 19:34:52 +00:00
2011-07-05 14:37:01 +00:00
primary :: ReplicaSet -> IOE Pipe
-- ^ Return connection to current primary of replica set, after refreshing the
-- member list. Fail if no primary available.
primary rs@(ReplicaSet rsName _ _) =
    updateMembers rs >>= maybe noPrimary (connection rs Nothing) . statedPrimary
  where
    noPrimary = throwError $ userError $ " replica set " ++ T.unpack rsName ++ " has no primary "
2011-07-05 14:37:01 +00:00
secondaryOk :: ReplicaSet -> IOE Pipe
-- ^ Return connection to a random secondary, or primary if no secondaries
-- available.
secondaryOk rs = do
    info <- updateMembers rs
    shuffled <- lift $ shuffle (possibleHosts info)
    -- Move the primary (if any) to the end so it is only tried last.
    let candidates = case statedPrimary info of
            Nothing -> shuffled
            Just p -> delete p shuffled ++ [p]
    untilSuccess (connection rs Nothing) candidates
2011-07-05 14:37:01 +00:00
2012-02-12 04:34:07 +00:00
routedHost :: ((Host, Bool) -> (Host, Bool) -> IOE Ordering) -> ReplicaSet -> IOE Pipe
-- ^ Return a connection to a host using a user-supplied sorting function,
-- which sorts based on a tuple containing the host and a boolean indicating
-- whether the host is primary. Hosts are shuffled before sorting and then
-- tried in sorted order until a connection succeeds.
routedHost f rs = do
    info <- updateMembers rs
    hosts <- lift $ shuffle (possibleHosts info)
    -- Look the primary up once rather than on every comparison.
    let mPrimary = statedPrimary info
        addIsPrimary h = (h, Just h == mPrimary)
    hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
    untilSuccess (connection rs Nothing) hosts'
2011-07-05 14:37:01 +00:00
type ReplicaInfo = ( Host , Document )
-- ^ Result of the isMaster command on a host in the replica set, paired with the host it was run against. Returned fields are: setName, ismaster, secondary, hosts, [primary]. The primary field is only present when ismaster = false.
statedPrimary :: ReplicaInfo -> Maybe Host
-- ^ Primary of replica set or Nothing if there isn't one. If the queried host
-- reports @ismaster@ it is the primary itself; otherwise the optional
-- @primary@ field of its reply is parsed with 'readHostPort'.
statedPrimary (host', info)
    -- Field labels must match the reply's keys exactly (no surrounding blanks).
    | at "ismaster" info = Just host'
    | otherwise = readHostPort <$> B.lookup "primary" info
2011-07-05 14:37:01 +00:00
possibleHosts :: ReplicaInfo -> [Host]
-- ^ Non-arbiter, non-hidden members of replica set, parsed from the @hosts@
-- field of the isMaster reply.
possibleHosts (_, info) = map readHostPort $ at "hosts" info
updateMembers :: ReplicaSet -> IOE ReplicaInfo
-- ^ Fetch replica info from any server and update members accordingly:
-- members no longer listed in the reply's @hosts@ field are dropped and their
-- connections closed, and newly listed hosts are added without a connection.
updateMembers rs@(ReplicaSet _ vMembers _) = do
    -- Try the known members in turn until one of them answers.
    (host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
    modifyMVar vMembers $ \members -> do
        let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
        lift $ forM_ old $ \(_, mPipe) -> maybe (return ()) close mPipe
        return (members' ++ map (, Nothing) new, (host', info))
  where
    -- Split assocs into (those whose key is in keys, those whose key is not),
    -- and also return the keys that have no entry in assocs.
    intersection :: (Eq k) => [k] -> [(k, v)] -> (([(k, v)], [(k, v)]), [k])
    intersection keys assocs = (partition (flip elem inKeys . fst) assocs, keys \\ inKeys)
      where
        assocKeys = map fst assocs
        inKeys = intersect keys assocKeys
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IOE ReplicaInfo
-- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set.
fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
    pipe <- connection rs mPipe host'
    -- Command and field names must be exact (no surrounding blanks).
    info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
    case B.lookup "setName" info of
        -- No setName in the reply.
        Nothing -> throwError $ userError $ show host' ++ " not a member of any replica set, including " ++ T.unpack rsName ++ " : " ++ show info
        -- The host reports a different replica set name.
        Just setName | setName /= rsName -> throwError $ userError $ show host' ++ " not a member of replica set " ++ T.unpack rsName ++ " : " ++ show info
        Just _ -> return (host', info)
connection :: ReplicaSet -> Maybe Pipe -> Host -> IOE Pipe
-- ^ Return new or existing connection to member of replica set. If pipe is
-- already known for host it is given, but we still test if it is open.
connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' = case mPipe of
    Nothing -> reconnect
    Just known -> do
        dead <- lift (isClosed known)
        if dead then reconnect else return known
  where
    -- Under the members lock: reuse the recorded pipe for this host if it is
    -- still open, otherwise open a new one.
    reconnect = modifyMVar vMembers $ \members ->
        case List.lookup host' members of
            Just (Just cached) -> do
                dead <- lift (isClosed cached)
                if dead then open members else return (members, cached)
            _ -> open members
    -- Connect and record the new pipe against the host in the member list.
    open members = do
        fresh <- connect' timeoutSecs host'
        return (updateAssocs host' (Just fresh) members, fresh)
2010-06-15 03:14:40 +00:00
{- Authors: Tony Hannan <tony@10gen.com>
2011-03-11 00:37:48 +00:00
Copyright 2011 10gen Inc.
2010-06-15 03:14:40 +00:00
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}