Add routedHost as an alternative to primary / secondaryOk that lets the user supply their own sort function. This is useful for applications like preferentially connecting to the host or secondary that is geographically the closest.
This commit is contained in:
parent
5e015dc769
commit
d3f54922cc
2 changed files with 38 additions and 2 deletions
|
@ -12,7 +12,7 @@ module Database.MongoDB.Connection (
|
|||
globalConnectTimeout, connect, connect',
|
||||
-- * Replica Set
|
||||
ReplicaSetName, openReplicaSet, openReplicaSet',
|
||||
ReplicaSet, primary, secondaryOk, closeReplicaSet, replSetName
|
||||
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
|
||||
) where
|
||||
|
||||
import Prelude hiding (lookup)
|
||||
|
@ -29,7 +29,7 @@ import Control.Applicative ((<$>))
|
|||
import Data.UString (UString, unpack)
|
||||
import Data.Bson as D (Document, lookup, at, (=:))
|
||||
import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand)
|
||||
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle)
|
||||
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle, mergesortM)
|
||||
import Data.List as L (lookup, intersect, partition, (\\), delete)
|
||||
import Data.IORef (IORef, newIORef, readIORef)
|
||||
import System.Timeout (timeout)
|
||||
|
@ -146,6 +146,15 @@ secondaryOk rs = do
|
|||
let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info)
|
||||
untilSuccess (connection rs Nothing) hosts'
|
||||
|
||||
routedHost :: ((Host, Bool) -> (Host, Bool) -> IOE Ordering) -> ReplicaSet -> IOE Pipe
|
||||
-- ^ Return a connection to a host using a user-supplied sorting function, which sorts based on a tuple containing the host and a boolean indicating whether the host is primary.
|
||||
routedHost f rs = do
|
||||
info <- updateMembers rs
|
||||
hosts <- lift $ shuffle (possibleHosts info)
|
||||
let addIsPrimary h = (h, if Just h == statedPrimary info then True else False)
|
||||
hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
|
||||
untilSuccess (connection rs Nothing) hosts'
|
||||
|
||||
type ReplicaInfo = (Host, Document)
|
||||
-- ^ Result of isMaster command on host in replica set. Returned fields are: setName, ismaster, secondary, hosts, [primary]. primary only present when ismaster = false
|
||||
|
||||
|
|
|
@ -30,6 +30,33 @@ deriving instance Ord PortID
|
|||
class (MonadIO m, Applicative m, Functor m) => MonadIO' m
|
||||
instance (MonadIO m, Applicative m, Functor m) => MonadIO' m
|
||||
|
||||
-- | A monadic sort implementation derived from the non-monadic one in ghc's Prelude
|
||||
mergesortM :: Monad m => (a -> a -> m Ordering) -> [a] -> m [a]
|
||||
mergesortM cmp = mergesortM' cmp . map wrap
|
||||
|
||||
mergesortM' :: Monad m => (a -> a -> m Ordering) -> [[a]] -> m [a]
|
||||
mergesortM' _ [] = return []
|
||||
mergesortM' _ [xs] = return xs
|
||||
mergesortM' cmp xss = mergesortM' cmp =<< (merge_pairsM cmp xss)
|
||||
|
||||
merge_pairsM :: Monad m => (a -> a -> m Ordering) -> [[a]] -> m [[a]]
|
||||
merge_pairsM _ [] = return []
|
||||
merge_pairsM _ [xs] = return [xs]
|
||||
merge_pairsM cmp (xs:ys:xss) = liftM2 (:) (mergeM cmp xs ys) (merge_pairsM cmp xss)
|
||||
|
||||
mergeM :: Monad m => (a -> a -> m Ordering) -> [a] -> [a] -> m [a]
|
||||
mergeM _ [] ys = return ys
|
||||
mergeM _ xs [] = return xs
|
||||
mergeM cmp (x:xs) (y:ys)
|
||||
= do
|
||||
c <- x `cmp` y
|
||||
case c of
|
||||
GT -> liftM (y:) (mergeM cmp (x:xs) ys)
|
||||
_ -> liftM (x:) (mergeM cmp xs (y:ys))
|
||||
|
||||
wrap :: a -> [a]
|
||||
wrap x = [x]
|
||||
|
||||
shuffle :: [a] -> IO [a]
|
||||
-- ^ Randomly shuffle items in list
|
||||
shuffle list = shuffle' list (L.length list) <$> newStdGen
|
||||
|
|
Loading…
Reference in a new issue