2010-11-01 00:38:38 +00:00
{- | A pool of TCP connections to a single server or a replica set of servers. -}
2010-06-15 03:14:40 +00:00
2011-02-22 15:31:54 +00:00
{-# LANGUAGE CPP, OverloadedStrings, ScopedTypeVariables, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes, FlexibleInstances #-}
2010-06-15 03:14:40 +00:00
module Database.MongoDB.Connection (
2010-12-27 05:23:02 +00:00
-- * Pipe
Pipe ,
2010-07-27 21:18:53 +00:00
-- * Host
Host ( .. ) , PortID ( .. ) , host , showHostPort , readHostPort , readHostPortM ,
2010-06-15 03:14:40 +00:00
-- * ReplicaSet
2010-11-01 00:38:38 +00:00
ReplicaSet ( .. ) , Name ,
2010-07-27 21:18:53 +00:00
-- * MasterOrSlaveOk
MasterOrSlaveOk ( .. ) ,
2010-11-01 00:38:38 +00:00
-- * Connection Pool
2010-12-27 05:23:02 +00:00
Service ( .. ) ,
2010-12-20 02:08:53 +00:00
connHost , replicaSet
2010-06-15 03:14:40 +00:00
) where
2011-03-14 20:24:28 +00:00
import Prelude hiding ( lookup )
2010-12-20 02:08:53 +00:00
import Database.MongoDB.Internal.Protocol as X
2010-12-27 05:23:02 +00:00
import qualified Network.Abstract as C
import Network.Abstract ( IOE , NetworkIO , ANetwork )
2011-03-14 20:24:28 +00:00
import Data.Bson ( ( =: ) , at , lookup , UString )
2010-12-20 02:08:53 +00:00
import Control.Pipeline as P
2010-06-15 03:14:40 +00:00
import Control.Applicative ( ( <$> ) )
import Control.Exception ( assert )
import Control.Monad.Error
2010-10-27 20:13:23 +00:00
import Control.Monad.MVar
2010-12-20 02:08:53 +00:00
import Network ( HostName , PortID ( .. ) )
2010-10-27 20:46:11 +00:00
import Data.Bson ( Document , look )
2010-07-27 21:18:53 +00:00
import Text.ParserCombinators.Parsec as T ( parse , many1 , letter , digit , char , eof , spaces , try , ( <|> ) )
2010-06-15 03:14:40 +00:00
import Control.Monad.Identity
2010-10-27 20:13:23 +00:00
import Control.Monad.Util ( MonadIO ' , untilSuccess )
2010-10-27 20:46:11 +00:00
import Database.MongoDB.Internal.Util ( ) -- PortID instances
2010-10-27 20:13:23 +00:00
import Var.Pool
import System.Random ( newStdGen , randomRs )
import Data.List ( delete , find , nub )
2010-11-01 00:38:38 +00:00
import System.IO.Unsafe ( unsafePerformIO )
2010-06-15 03:14:40 +00:00
2010-10-27 20:13:23 +00:00
type Name = UString
-- ^ Name that identifies a replica set (the type of the @setName@ field of 'ReplicaSet').
2010-06-15 03:14:40 +00:00
2010-07-27 21:18:53 +00:00
adminCommand :: Document -> Request
-- ^ Convert command to request: a 'Query' against the @admin.$cmd@ collection whose selector is the given command document. The query is flagged 'SlaveOK', skips nothing, uses a batch size of -1 and an empty projector.
adminCommand cmd = Query
    { qOptions = [SlaveOK]
    -- The collection name must be exact; surrounding whitespace would address a different (non-existent) collection.
    , qFullCollection = "admin.$cmd"
    , qSkip = 0
    , qBatchSize = -1
    , qSelector = cmd
    , qProjector = []
    }
commandReply :: String -> Reply -> Document
-- ^ Extract first document from reply. Calls 'error' if the reply carries the 'QueryError' flag (message is the given title followed by the reply's @$err@ field) or if the reply contains no documents at all.
commandReply title Reply{..}
    | QueryError `elem` rResponseFlags = error $ title ++ " : " ++ errDetail
    | otherwise = case rDocuments of
        doc : _ -> doc
        [] -> error ( " empty reply to: " ++ title)
  where
    -- A query-error reply normally carries one document holding "$err"; do not
    -- crash with an uninformative 'head' failure if the document list is empty.
    errDetail = case rDocuments of
        doc : _ -> at "$err" doc
        [] -> "query error reported without an error document"
2010-07-27 21:18:53 +00:00
-- * Host
data Host = Host HostName PortID deriving ( Show , Eq , Ord )
-- ^ A server address: a hostname paired with the port to connect to.
2010-06-21 15:06:20 +00:00
2010-06-15 03:14:40 +00:00
defaultPort :: PortID
-- ^ Port number 27017; this is the port 'host' pairs with a hostname when the caller supplies none.
defaultPort = PortNumber 27017
2010-07-27 21:18:53 +00:00
host :: HostName -> Host
-- ^ Build a 'Host' for the given hostname using 'defaultPort'.
host = flip Host defaultPort
2010-06-15 03:14:40 +00:00
2010-07-27 21:18:53 +00:00
showHostPort :: Host -> String
-- ^ Display host as \"host:port\". A numeric port is rendered with 'show'; a 'Service' or 'UnixSocket' port is rendered as its name.
-- TODO: Distinguish Service and UnixSocket port
showHostPort (Host hostname port) = hostname ++ ":" ++ portname
  where
    -- No whitespace around the colon: the result must match the
    -- \"hostname:port\" syntax documented above.
    portname = case port of
        Service s -> s
        PortNumber p -> show p
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
        UnixSocket s -> s
#endif
2010-06-15 03:14:40 +00:00
2010-07-27 21:18:53 +00:00
readHostPortM :: (Monad m) => String -> m Host
-- ^ Read string \"hostname:port\" as @Host hostname port@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax, or if the port is larger than 65535.
-- TODO: handle Service and UnixSocket port
readHostPortM = either (fail . show) return . parse parser " readHostPort "
  where
    hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
    parser = do
        spaces
        h <- hostname
        -- Either the input ends after the hostname (default port) ...
        T.try (spaces >> eof >> return (host h)) <|> do
            -- ... or an explicit numeric port follows a colon.
            _ <- char ':'
            -- Read as Integer so that an over-long digit string cannot overflow,
            -- then range-check before narrowing to the port type.
            port :: Integer <- read <$> many1 digit
            spaces >> eof
            if port > 65535
                then fail ("port out of range: " ++ show port)
                else return $ Host h (PortNumber $ fromIntegral port)
2010-06-15 03:14:40 +00:00
2010-07-27 21:18:53 +00:00
readHostPort :: String -> Host
-- ^ Pure variant of 'readHostPortM': parses \"hostname:port\" or \"hostname\" (default port) by running the parser in the 'Identity' monad. Error if string does not match either syntax.
readHostPort str = runIdentity (readHostPortM str)
2010-06-15 03:14:40 +00:00
2010-10-27 20:13:23 +00:00
-- * Replica Set
2010-06-15 03:14:40 +00:00
2010-10-27 20:13:23 +00:00
-- | Replica set of hosts identified by set name. At least one of the seed hosts must be an active member of the set. The seed list does not identify the set; only the set name does.
data ReplicaSet = ReplicaSet
    { setName :: Name
    , seedHosts :: [Host]
    } deriving (Show)

-- | Two replica sets are equal when their names are equal; seed hosts are ignored.
instance Eq ReplicaSet where
    a == b = setName a == setName b
2010-06-15 03:14:40 +00:00
2010-10-27 20:13:23 +00:00
-- ** Replica Info
2010-06-15 03:14:40 +00:00
2011-03-14 20:24:28 +00:00
getReplicaInfo :: ConnPool Host -> IOE ReplicaInfo
-- ^ Get replica info of the connected host by sending it the /ismaster/ admin command. Fails if a pipe to the host cannot be obtained or if the reply lacks the /hosts/ or /ismaster/ field (i.e. the host is not part of a replica set).
getReplicaInfo conn = do
    pipe <- getHostPipe conn
    promise <- X.call pipe [] (adminCommand ["ismaster" =: (1 :: Int)])
    info <- commandReply " ismaster " <$> promise
    -- The two lookups are performed only for their failure effect: they reject
    -- a reply that does not carry the fields 'primary' and 'replicas' read later.
    _ <- look "hosts" info
    _ <- look "ismaster" info
    return $ ReplicaInfo (connHost conn) info
2010-06-15 03:14:40 +00:00
2011-03-14 20:24:28 +00:00
-- | Configuration info of a host in a replica set (result of /ismaster/ command): the host that was asked, and the reply document, which contains all the hosts in the replica set plus this host's role in that set (master, slave, or arbiter).
data ReplicaInfo = ReplicaInfo
    { infoHost :: Host
    , infoDoc :: Document
    } deriving (Show)
2010-06-15 03:14:40 +00:00
2010-10-27 20:46:11 +00:00
{- isPrimary :: ReplicaInfo -> Bool
-- ^ Is the replica described by this info a master/primary (not slave or arbiter)?
isPrimary = true1 "ismaster"

isSecondary :: ReplicaInfo -> Bool
-- ^ Is the replica described by this info a slave/secondary (not master or arbiter)?
isSecondary = true1 "secondary" -}
2010-06-15 03:14:40 +00:00
2011-03-14 20:24:28 +00:00
primary :: ReplicaInfo -> Maybe Host
-- ^ Read primary from configuration info. If the queried host reports itself as master it is the primary; otherwise the reply's /primary/ field is parsed with 'readHostPort'. During failover or minor network partition there is no primary (Nothing).
primary (ReplicaInfo self info)
    | at "ismaster" info = Just self
    | otherwise = readHostPort <$> lookup "primary" info
2010-06-15 03:14:40 +00:00
2011-03-14 20:24:28 +00:00
replicas :: ReplicaInfo -> [Host]
-- ^ All replicas in set according to this replica configuration info (the reply's /hosts/ field, each entry parsed with 'readHostPort'), with primary at head, if there is one.
replicas info = case primary info of
    Nothing -> members
    -- Move the primary to the front, removing its first occurrence from the member list.
    Just m -> m : delete m members
  where
    members = map readHostPort $ at "hosts" (infoDoc info)
2010-06-15 03:14:40 +00:00
2010-07-27 21:18:53 +00:00
-- * MasterOrSlaveOk
2010-06-15 03:14:40 +00:00
2010-07-27 21:18:53 +00:00
-- | Which kind of replica a caller is willing to talk to.
data MasterOrSlaveOk
    = Master   -- ^ connect to master only
    | SlaveOk  -- ^ connect to a slave, or master if no slave available
    deriving (Show, Eq)
2010-10-27 20:46:11 +00:00
{- isMS :: MasterOrSlaveOk -> ReplicaInfo -> Bool
-- ^ Does the host (as described by its replica-info) match the master/slave type?
isMS Master i = isPrimary i
isMS SlaveOk i = isSecondary i || isPrimary i -}
2010-06-15 03:14:40 +00:00
2010-11-01 00:38:38 +00:00
-- * Connection Pool
2010-06-15 03:14:40 +00:00
2010-10-27 20:13:23 +00:00
type Pool' = Pool IOError
-- ^ 'Pool' with 'IOError' fixed as its first type argument (presumably the error type raised when a resource cannot be created — confirm against "Var.Pool").
2010-12-27 05:23:02 +00:00
-- | A Service is a single server ('Host') or a replica set of servers ('ReplicaSet')
-- | A Service is a single server ('Host') or a replica set of servers ('ReplicaSet')
class Service t where
    -- | A pool of TCP connections ('Pipe's) to a host or a replica set of hosts
    data ConnPool t

    -- | Create a connection pool to a host or a replica set of hosts. No TCP
    -- connection is attempted until a 'getPipe' request, so no IOError can be
    -- raised here. Up to N TCP connections will be established to each host.
    newConnPool :: (NetworkIO m) => Int -> t -> m (ConnPool t)

    -- | Return a TCP connection ('Pipe') to the master or a slave in the server.
    -- 'Master' must connect to the master; 'SlaveOk' may connect to a slave or
    -- master. To spread the load, 'SlaveOk' requests are distributed amongst all
    -- hosts in the server. Throw IOError if failed to connect to right type of
    -- host (Master/SlaveOk).
    getPipe :: MasterOrSlaveOk -> ConnPool t -> IOE Pipe

    -- | Kill all open pipes (TCP connections). Will cause any users of them to
    -- fail. Alternatively you can let them die on their own when they get
    -- garbage collected.
    killPipes :: ConnPool t -> IO ()
2010-11-01 00:38:38 +00:00
-- ** ConnectionPool Host
2010-10-27 20:13:23 +00:00
2010-12-27 05:23:02 +00:00
instance Service Host where
    -- A pool of TCP connections ('Pipe's) to one server, together with that server's address.
    data ConnPool Host = HostConnPool
        { connHost :: Host
        , connPool :: Pool' Pipe
        }

    -- Fetch the ambient network, then build the pool in IO via 'newHostConnPool'.
    newConnPool poolSize' host' = do
        net <- C.network
        liftIO (newHostConnPool poolSize' host' net)

    -- The master/slave preference is ignored for a single host; the pipe comes from 'getHostPipe'.
    getPipe _ pool = getHostPipe pool

    -- Kill every pipe held by the underlying pool.
    killPipes = killAll . connPool
2010-10-27 20:13:23 +00:00
2010-11-01 00:38:38 +00:00
-- | Shows the fixed pool label followed by the pool's host; the pooled pipes are not shown.
instance Show (ConnPool Host) where
    show pool = " ConnPool " ++ show (connHost pool)
2010-12-27 05:23:02 +00:00
newHostConnPool :: Int -> Host -> ANetwork -> IO (ConnPool Host)
-- ^ Create a pool of N 'Pipe's (TCP connections) to server. 'getHostPipe' will return one of those pipes, round-robin style.
newHostConnPool poolSize' host' net = do
    pool <- newPool factory poolSize'
    return (HostConnPool host' pool)
  where
    -- How the pool creates, destroys and expires its pipes.
    factory = Factory
        { newResource = tcpConnect net host'
        , killResource = P.close
        , isExpired = P.isClosed
        }
2010-10-27 20:13:23 +00:00
2010-12-20 02:08:53 +00:00
getHostPipe :: ConnPool Host -> IOE Pipe
-- ^ Return next pipe (TCP connection) in connection pool, round-robin style. Throw IOError if can't connect to host.
getHostPipe pool = aResource (connPool pool)
2010-10-27 20:13:23 +00:00
2010-12-27 05:23:02 +00:00
tcpConnect :: ANetwork -> Host -> IOE Pipe
-- ^ Create a TCP connection (Pipe) to the given host: connect over the supplied network, then wrap the connection in a new pipeline. Throw IOError if can't connect.
tcpConnect net (Host hostname port) = do
    conn <- C.connect net (C.Server hostname port)
    newPipeline conn
2010-10-27 20:13:23 +00:00
-- ** Connection ReplicaSet
2010-12-27 05:23:02 +00:00
instance Service ReplicaSet where
    -- Pool state for a replica set: the network used to open new connections,
    -- the set's name, and one host pool per currently known member.
    data ConnPool ReplicaSet = ReplicaSetConnPool
        { network :: ANetwork
        , repsetName :: Name
        , currentMembers :: MVar [ConnPool Host]  -- master at head after a refresh
        }

    -- Fetch the ambient network, then build the pool in IO via 'newSetConnPool'.
    newConnPool poolSize' repset = do
        net <- C.network
        liftIO (newSetConnPool poolSize' repset net)

    getPipe mos pool = getSetPipe mos pool

    -- Kill the pipes of every current member while holding the member list.
    killPipes pool = withMVar (currentMembers pool) (mapM_ killPipes)
-- | Shows the fixed pool label followed by the 'ReplicaSet' snapshot from 'replicaSet'.
-- NOTE(review): the member list is read with 'unsafePerformIO', so the result
-- reflects whatever members are stored at the moment the string is forced.
instance Show (ConnPool ReplicaSet) where
    show r = " ConnPool " ++ show snapshot
      where
        snapshot = unsafePerformIO (replicaSet r)
2010-10-27 20:13:23 +00:00
2010-11-01 00:38:38 +00:00
replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet
-- ^ Return the replica set's name together with the hosts of its current members as the seed list.
replicaSet pool = do
    members <- readMVar (currentMembers pool)
    return $ ReplicaSet (repsetName pool) (map connHost members)
2010-10-27 20:13:23 +00:00
2010-12-27 05:23:02 +00:00
newSetConnPool :: Int -> ReplicaSet -> ANetwork -> IO (ConnPool ReplicaSet)
-- ^ Create a connection pool to each seed host of the replica set. Asserts that the seed list is not empty.
newSetConnPool poolSize' repset net = assert (not (null seeds)) $ do
    pools <- mapM mkPool seeds
    members <- newMVar pools
    return (ReplicaSetConnPool net (setName repset) members)
  where
    seeds = seedHosts repset
    -- One host pool of the requested size per seed host.
    mkPool h = newHostConnPool poolSize' h net
2010-10-27 20:13:23 +00:00
2010-12-20 02:08:53 +00:00
getMembers :: Name -> [ConnPool Host] -> IOE [Host]
-- ^ Get members of replica set, master first. The supplied connections are queried in turn until one yields replica info.
-- TODO: Verify config for request replica set name and not some other replica set. "ismaster" reply includes "setName" in result.
getMembers _repsetName connections = do
    info <- untilSuccess getReplicaInfo connections
    return (replicas info)
2010-10-27 20:13:23 +00:00
2010-12-27 05:23:02 +00:00
refreshMembers :: ANetwork -> Name -> [ConnPool Host] -> IOE [ConnPool Host]
-- ^ Update current members with master at head. Reuse unchanged members; a host not seen before gets a fresh pool of the same size as the first existing pool. Throw IOError if there are no connections to query, or if none of them can be reached to fetch the config. Dropped connections are not closed in case they still have users; they will be closed when garbage collected.
refreshMembers net repsetName connections = case connections of
    -- Report an empty member list as an IOError instead of crashing on 'head'.
    [] -> throwError $ userError "refreshMembers: no replica set members to query"
    firstConn : _ -> do
        n <- liftIO . poolSize $ connPool firstConn
        mapM (liftIO . connection n) =<< getMembers repsetName connections
  where
    -- Reuse the existing pool for a host when there is one, else create a new pool.
    connection n host' = maybe (newHostConnPool n host' net) return mc
      where
        mc = find ((host' ==) . connHost) connections
2010-10-27 20:13:23 +00:00
2010-12-20 02:08:53 +00:00
getSetPipe :: MasterOrSlaveOk -> ConnPool ReplicaSet -> IOE Pipe
-- ^ Return a pipe to primary or a random secondary in replica set. Use primary for SlaveOk if and only if no secondaries. Note, refreshes members each time (issues an /ismaster/ call) and stores the refreshed list back into the pool. Throw IOError if the refreshed member list is empty.
getSetPipe mos ReplicaSetConnPool{..} = modifyMVar currentMembers $ \conns -> do
    connections <- refreshMembers network repsetName conns -- master at head after refresh
    pipe <- case connections of
        -- Guard the partial 'head' / '!!' uses below.
        [] -> throwError $ userError "getSetPipe: replica set reported no members"
        master : secondaries -> case mos of
            Master -> getHostPipe master
            SlaveOk -> do
                let n = length secondaries
                -- Distinct random indices into 'connections': a shuffle of 1..n
                -- (the secondaries) when n >= 1, or just [0] (the master) when
                -- there are no secondaries.
                is <- take (max 1 n) . nub . randomRs (min 1 n, n) <$> liftIO newStdGen
                untilSuccess (getHostPipe . (connections !!)) is
    return (connections, pipe)
2010-06-15 03:14:40 +00:00
{- Authors: Tony Hannan <tony@10gen.com>
2011-03-11 00:37:48 +00:00
Copyright 2011 10 gen Inc .
2010-06-15 03:14:40 +00:00
Licensed under the Apache License , Version 2.0 ( the " License " ) ; you may not use this file except in compliance with the License . You may obtain a copy of the License at : http :// www . apache . org / licenses / LICENSE - 2.0 . Unless required by applicable law or agreed to in writing , software distributed under the License is distributed on an " AS IS " BASIS , WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied . See the License for the specific language governing permissions and limitations under the License . - }