From d3f54922cc07bb1e96092b07f543f2930f3607d9 Mon Sep 17 00:00:00 2001 From: Andrew Miller Date: Sun, 12 Feb 2012 17:34:07 +1300 Subject: [PATCH] Add routedHost as an alternative to primary / secondaryOk that lets the user supply their own sort function. This is useful for applications like preferentially connecting to the host or secondary that is geographically the closest. --- Database/MongoDB/Connection.hs | 13 +++++++++++-- Database/MongoDB/Internal/Util.hs | 27 +++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs index e5ddddf..bd7f330 100644 --- a/Database/MongoDB/Connection.hs +++ b/Database/MongoDB/Connection.hs @@ -12,7 +12,7 @@ module Database.MongoDB.Connection ( globalConnectTimeout, connect, connect', -- * Replica Set ReplicaSetName, openReplicaSet, openReplicaSet', - ReplicaSet, primary, secondaryOk, closeReplicaSet, replSetName + ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName ) where import Prelude hiding (lookup) @@ -29,7 +29,7 @@ import Control.Applicative ((<$>)) import Data.UString (UString, unpack) import Data.Bson as D (Document, lookup, at, (=:)) import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand) -import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle) +import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle, mergesortM) import Data.List as L (lookup, intersect, partition, (\\), delete) import Data.IORef (IORef, newIORef, readIORef) import System.Timeout (timeout) @@ -146,6 +146,15 @@ secondaryOk rs = do let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info) untilSuccess (connection rs Nothing) hosts' +routedHost :: ((Host, Bool) -> (Host, Bool) -> IOE Ordering) -> ReplicaSet -> IOE Pipe +-- ^ Return a connection to a host using a user-supplied sorting function, which sorts based on a tuple containing the host and a boolean indicating whether the host is primary. +routedHost f rs = do + info <- updateMembers rs + hosts <- lift $ shuffle (possibleHosts info) + let addIsPrimary h = (h, if Just h == statedPrimary info then True else False) + hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts + untilSuccess (connection rs Nothing) hosts' + type ReplicaInfo = (Host, Document) -- ^ Result of isMaster command on host in replica set. Returned fields are: setName, ismaster, secondary, hosts, [primary]. primary only present when ismaster = false diff --git a/Database/MongoDB/Internal/Util.hs b/Database/MongoDB/Internal/Util.hs index 7bf5146..80551f3 100644 --- a/Database/MongoDB/Internal/Util.hs +++ b/Database/MongoDB/Internal/Util.hs @@ -30,6 +30,33 @@ deriving instance Ord PortID class (MonadIO m, Applicative m, Functor m) => MonadIO' m instance (MonadIO m, Applicative m, Functor m) => MonadIO' m +-- | A monadic sort implementation derived from the non-monadic one in ghc's Prelude +mergesortM :: Monad m => (a -> a -> m Ordering) -> [a] -> m [a] +mergesortM cmp = mergesortM' cmp . map wrap + +mergesortM' :: Monad m => (a -> a -> m Ordering) -> [[a]] -> m [a] +mergesortM' _ [] = return [] +mergesortM' _ [xs] = return xs +mergesortM' cmp xss = mergesortM' cmp =<< (merge_pairsM cmp xss) + +merge_pairsM :: Monad m => (a -> a -> m Ordering) -> [[a]] -> m [[a]] +merge_pairsM _ [] = return [] +merge_pairsM _ [xs] = return [xs] +merge_pairsM cmp (xs:ys:xss) = liftM2 (:) (mergeM cmp xs ys) (merge_pairsM cmp xss) + +mergeM :: Monad m => (a -> a -> m Ordering) -> [a] -> [a] -> m [a] +mergeM _ [] ys = return ys +mergeM _ xs [] = return xs +mergeM cmp (x:xs) (y:ys) + = do + c <- x `cmp` y + case c of + GT -> liftM (y:) (mergeM cmp (x:xs) ys) + _ -> liftM (x:) (mergeM cmp xs (y:ys)) + +wrap :: a -> [a] +wrap x = [x] + shuffle :: [a] -> IO [a] -- ^ Randomly shuffle items in list shuffle list = shuffle' list (L.length list) <$> newStdGen