Merge pull request #16 from A1kmm/master

Add routedHost as an alternative to primary / secondaryOk that lets the user supply their own sort function
This commit is contained in:
Tony Hannan 2012-02-20 08:10:33 -08:00
commit a618ebbf95
2 changed files with 38 additions and 2 deletions

View file

@ -12,7 +12,7 @@ module Database.MongoDB.Connection (
globalConnectTimeout, connect, connect', globalConnectTimeout, connect, connect',
-- * Replica Set -- * Replica Set
ReplicaSetName, openReplicaSet, openReplicaSet', ReplicaSetName, openReplicaSet, openReplicaSet',
ReplicaSet, primary, secondaryOk, closeReplicaSet, replSetName ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
) where ) where
import Prelude hiding (lookup) import Prelude hiding (lookup)
@ -29,7 +29,7 @@ import Control.Applicative ((<$>))
import Data.UString (UString, unpack) import Data.UString (UString, unpack)
import Data.Bson as D (Document, lookup, at, (=:)) import Data.Bson as D (Document, lookup, at, (=:))
import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand) import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand)
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle) import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle, mergesortM)
import Data.List as L (lookup, intersect, partition, (\\), delete) import Data.List as L (lookup, intersect, partition, (\\), delete)
import Data.IORef (IORef, newIORef, readIORef) import Data.IORef (IORef, newIORef, readIORef)
import System.Timeout (timeout) import System.Timeout (timeout)
@ -146,6 +146,15 @@ secondaryOk rs = do
let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info) let hosts' = maybe hosts (\p -> delete p hosts ++ [p]) (statedPrimary info)
untilSuccess (connection rs Nothing) hosts' untilSuccess (connection rs Nothing) hosts'
routedHost :: ((Host, Bool) -> (Host, Bool) -> IOE Ordering) -> ReplicaSet -> IOE Pipe
-- ^ Return a connection to a host using a user-supplied sorting function, which sorts based on a tuple containing the host and a boolean indicating whether the host is primary.
routedHost f rs = do
info <- updateMembers rs
hosts <- lift $ shuffle (possibleHosts info)
let addIsPrimary h = (h, if Just h == statedPrimary info then True else False)
hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
untilSuccess (connection rs Nothing) hosts'
type ReplicaInfo = (Host, Document) type ReplicaInfo = (Host, Document)
-- ^ Result of isMaster command on host in replica set. Returned fields are: setName, ismaster, secondary, hosts, [primary]. primary only present when ismaster = false -- ^ Result of isMaster command on host in replica set. Returned fields are: setName, ismaster, secondary, hosts, [primary]. primary only present when ismaster = false

View file

@ -30,6 +30,33 @@ deriving instance Ord PortID
class (MonadIO m, Applicative m, Functor m) => MonadIO' m class (MonadIO m, Applicative m, Functor m) => MonadIO' m
instance (MonadIO m, Applicative m, Functor m) => MonadIO' m instance (MonadIO m, Applicative m, Functor m) => MonadIO' m
-- | A monadic sort implementation derived from the non-monadic one in ghc's Prelude
mergesortM :: Monad m => (a -> a -> m Ordering) -> [a] -> m [a]
mergesortM cmp = mergesortM' cmp . map wrap
mergesortM' :: Monad m => (a -> a -> m Ordering) -> [[a]] -> m [a]
mergesortM' _ [] = return []
mergesortM' _ [xs] = return xs
mergesortM' cmp xss = mergesortM' cmp =<< (merge_pairsM cmp xss)
merge_pairsM :: Monad m => (a -> a -> m Ordering) -> [[a]] -> m [[a]]
merge_pairsM _ [] = return []
merge_pairsM _ [xs] = return [xs]
merge_pairsM cmp (xs:ys:xss) = liftM2 (:) (mergeM cmp xs ys) (merge_pairsM cmp xss)
mergeM :: Monad m => (a -> a -> m Ordering) -> [a] -> [a] -> m [a]
mergeM _ [] ys = return ys
mergeM _ xs [] = return xs
mergeM cmp (x:xs) (y:ys)
= do
c <- x `cmp` y
case c of
GT -> liftM (y:) (mergeM cmp (x:xs) ys)
_ -> liftM (x:) (mergeM cmp xs (y:ys))
wrap :: a -> [a]
wrap x = [x]
shuffle :: [a] -> IO [a] shuffle :: [a] -> IO [a]
-- ^ Randomly shuffle items in list -- ^ Randomly shuffle items in list
shuffle list = shuffle' list (L.length list) <$> newStdGen shuffle list = shuffle' list (L.length list) <$> newStdGen