Single param Network instance. NetworkIO instance of IO that yields Internet so user does not have to supply it

This commit is contained in:
Tony Hannan 2010-12-27 00:23:02 -05:00
parent 111d9a2f72
commit 73012bb430
10 changed files with 118 additions and 110 deletions

View file

@ -6,20 +6,20 @@ Simple example below. Use with language extension /OvererloadedStrings/.
> {-# LANGUAGE OverloadedStrings #-}
>
> import Database.MongoDB
> import Data.UString (u)
> import Data.CompactString () -- Show and IsString instances of UString
> import Control.Monad.Trans (liftIO)
>
> main = do
> pool <- newConnPool Internet 1 (host "127.0.0.1")
> pool <- newConnPool 1 (host "127.0.0.1")
> e <- access safe Master pool run
> print e
>
> run = use (Database "baseball") $ do
> clearTeams
> insertTeams
> print' "All Teams" =<< allTeams
> print' "National League Teams" =<< nationalLeagueTeams
> print' "New York Teams" =<< newYorkTeams
> printDocs "All Teams" =<< allTeams
> printDocs "National League Teams" =<< nationalLeagueTeams
> printDocs "New York Teams" =<< newYorkTeams
>
> clearTeams = delete (select [] "team")
>
@ -35,7 +35,8 @@ Simple example below. Use with language extension /OvererloadedStrings/.
>
> newYorkTeams = rest =<< find (select ["home.state" =: u"NY"] "team") {project = ["name" =: (1 :: Int), "league" =: (1 :: Int)]}
>
> print' title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs
> printDocs title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs
>
-}
module Database.MongoDB (

View file

@ -31,7 +31,7 @@ import Database.MongoDB.Internal.Protocol (pwHash, pwKey)
import Database.MongoDB.Connection (Host, showHostPort)
import Database.MongoDB.Query
import Data.Bson
import Data.UString (pack, unpack, append, intercalate)
import Data.UString (pack, append, intercalate)
import Control.Monad.Reader
import qualified Data.HashTable as T
import Data.IORef
@ -183,8 +183,8 @@ addUser :: (DbAccess m) => Bool -> Username -> Password -> m ()
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
addUser readOnly user pass = do
mu <- findOne (select ["user" =: user] "system.users")
let u = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
save "system.users" u
let usr = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
save "system.users" usr
removeUser :: (DbAccess m) => Username -> m ()
removeUser user = delete (select ["user" =: user] "system.users")
@ -205,9 +205,9 @@ copyDatabase (Database fromDb) fromHost mup (Database toDb) = do
let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb]
use admin $ case mup of
Nothing -> runCommand c
Just (u, p) -> do
Just (usr, pss) -> do
n <- at "nonce" <$> runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost]
runCommand $ c ++ ["username" =: u, "nonce" =: n, "key" =: pwKey n u p]
runCommand $ c ++ ["username" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
dropDatabase :: (Access m) => Database -> m Document
-- ^ Delete the given database!

View file

@ -3,8 +3,8 @@
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, RecordWildCards, NamedFieldPuns, MultiParamTypeClasses, FlexibleContexts, TypeFamilies, DoRec, RankNTypes, FlexibleInstances #-}
module Database.MongoDB.Connection (
-- * Network
Network', ANetwork', Internet(..),
-- * Pipe
Pipe,
-- * Host
Host(..), PortID(..), host, showHostPort, readHostPort, readHostPortM,
-- * ReplicaSet
@ -12,19 +12,19 @@ module Database.MongoDB.Connection (
-- * MasterOrSlaveOk
MasterOrSlaveOk(..),
-- * Connection Pool
Server(..), newConnPool',
Service(..),
connHost, replicaSet
) where
import Database.MongoDB.Internal.Protocol as X
import Network.Abstract (IOE, connect, ANetwork(..))
import qualified Network.Abstract as C
import Network.Abstract (IOE, NetworkIO, ANetwork)
import Data.Bson ((=:), at, UString)
import Control.Pipeline as P
import Control.Applicative ((<$>))
import Control.Exception (assert)
import Control.Monad.Error
import Control.Monad.MVar
import Control.Monad.Context
import Network (HostName, PortID(..))
import Data.Bson (Document, look)
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
@ -148,26 +148,23 @@ isMS SlaveOk i = isSecondary i || isPrimary i -}
type Pool' = Pool IOError
-- | A Server is a single server ('Host') or a replica set of servers ('ReplicaSet')
class Server t where
-- | A Service is a single server ('Host') or a replica set of servers ('ReplicaSet')
class Service t where
data ConnPool t
-- ^ A pool of TCP connections ('Pipe's) to a host or a replica set of hosts
newConnPool :: (Network' n, MonadIO' m) => n -> Int -> t -> m (ConnPool t)
newConnPool :: (NetworkIO m) => Int -> t -> m (ConnPool t)
-- ^ Create a ConnectionPool to a host or a replica set of hosts. Actual TCP connection is not attempted until 'getPipe' request, so no IOError can be raised here. Up to N TCP connections will be established to each host.
getPipe :: MasterOrSlaveOk -> ConnPool t -> IOE Pipe
-- ^ Return a TCP connection (Pipe) to the master or a slave in the server. Master must connect to the master, SlaveOk may connect to a slave or master. To spread the load, SlaveOk requests are distributed amongst all hosts in the server. Throw IOError if failed to connect to right type of host (Master/SlaveOk).
killPipes :: ConnPool t -> IO ()
-- ^ Kill all open pipes (TCP Connections). Will cause any users of them to fail. Alternatively you can let them die on their own when they get garbage collected.
newConnPool' :: (Server t, MonadIO' m, Context ANetwork' m) => Int -> t -> m (ConnPool t)
newConnPool' poolSize' host' = context >>= \(ANetwork net :: ANetwork') -> newConnPool net poolSize' host'
-- ** ConnectionPool Host
instance Server Host where
instance Service Host where
data ConnPool Host = HostConnPool {connHost :: Host, connPool :: Pool' Pipe}
-- ^ A pool of TCP connections ('Pipe's) to a server, handed out in round-robin style.
newConnPool net poolSize' host' = liftIO $ newHostConnPool (ANetwork net) poolSize' host'
newConnPool poolSize' host' = liftIO . newHostConnPool poolSize' host' =<< C.network
-- ^ Create a connection pool to server (host or replica set)
getPipe _ = getHostPipe
-- ^ Return a TCP connection (Pipe). If SlaveOk, connect to a slave if available. Round-robin if multiple slaves are available. Throw IOError if failed to connect.
@ -176,9 +173,9 @@ instance Server Host where
instance Show (ConnPool Host) where
show HostConnPool{connHost} = "ConnPool " ++ show connHost
newHostConnPool :: ANetwork' -> Int -> Host -> IO (ConnPool Host)
newHostConnPool :: Int -> Host -> ANetwork -> IO (ConnPool Host)
-- ^ Create a pool of N 'Pipe's (TCP connections) to server. 'getHostPipe' will return one of those pipes, round-robin style.
newHostConnPool net poolSize' host' = HostConnPool host' <$> newPool Factory{..} poolSize' where
newHostConnPool poolSize' host' net = HostConnPool host' <$> newPool Factory{..} poolSize' where
newResource = tcpConnect net host'
killResource = P.close
isExpired = P.isClosed
@ -187,18 +184,18 @@ getHostPipe :: ConnPool Host -> IOE Pipe
-- ^ Return next pipe (TCP connection) in connection pool, round-robin style. Throw IOError if can't connect to host.
getHostPipe (HostConnPool _ pool) = aResource pool
tcpConnect :: ANetwork' -> Host -> IOE Pipe
tcpConnect :: ANetwork -> Host -> IOE Pipe
-- ^ Create a TCP connection (Pipe) to the given host. Throw IOError if can't connect.
tcpConnect net (Host hostname port) = newPipeline =<< connect net (hostname, port)
tcpConnect net (Host hostname port) = newPipeline =<< C.connect net (C.Server hostname port)
-- ** Connection ReplicaSet
instance Server ReplicaSet where
instance Service ReplicaSet where
data ConnPool ReplicaSet = ReplicaSetConnPool {
network :: ANetwork',
network :: ANetwork,
repsetName :: Name,
currentMembers :: MVar [ConnPool Host] } -- master at head after a refresh
newConnPool net poolSize' repset = liftIO $ newSetConnPool (ANetwork net) poolSize' repset
newConnPool poolSize' repset = liftIO . newSetConnPool poolSize' repset =<< C.network
getPipe = getSetPipe
killPipes ReplicaSetConnPool{..} = withMVar currentMembers (mapM_ killPipes)
@ -209,10 +206,10 @@ replicaSet :: (MonadIO' m) => ConnPool ReplicaSet -> m ReplicaSet
-- ^ Return replicas set name with current members as seed list
replicaSet ReplicaSetConnPool{..} = ReplicaSet repsetName . map connHost <$> readMVar currentMembers
newSetConnPool :: ANetwork' -> Int -> ReplicaSet -> IO (ConnPool ReplicaSet)
newSetConnPool :: Int -> ReplicaSet -> ANetwork -> IO (ConnPool ReplicaSet)
-- ^ Create a connection pool to each member of the replica set.
newSetConnPool net poolSize' repset = assert (not . null $ seedHosts repset) $ do
currentMembers <- newMVar =<< mapM (newHostConnPool net poolSize') (seedHosts repset)
newSetConnPool poolSize' repset net = assert (not . null $ seedHosts repset) $ do
currentMembers <- newMVar =<< mapM (\h -> newHostConnPool poolSize' h net) (seedHosts repset)
return $ ReplicaSetConnPool net (setName repset) currentMembers
getMembers :: Name -> [ConnPool Host] -> IOE [Host]
@ -220,13 +217,13 @@ getMembers :: Name -> [ConnPool Host] -> IOE [Host]
-- TODO: Verify config for request replica set name and not some other replica set. ismaster config should include replica set name in result but currently does not.
getMembers _repsetName connections = hosts <$> untilSuccess (getReplicaInfo <=< getHostPipe) connections
refreshMembers :: ANetwork' -> Name -> [ConnPool Host] -> IOE [ConnPool Host]
refreshMembers :: ANetwork -> Name -> [ConnPool Host] -> IOE [ConnPool Host]
-- ^ Update current members with master at head. Reuse unchanged members. Throw IOError if can't connect to any and fetch config. Dropped connections are not closed in case they still have users; they will be closed when garbage collected.
refreshMembers net repsetName connections = do
n <- liftIO . poolSize . connPool $ head connections
mapM (liftIO . connection n) =<< getMembers repsetName connections
where
connection n host' = maybe (newHostConnPool net n host') return mc where
connection n host' = maybe (newHostConnPool n host' net) return mc where
mc = find ((host' ==) . connHost) connections

View file

@ -5,8 +5,6 @@ This module is not intended for direct use. Use the high-level interface at "Dat
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts, TupleSections, TypeSynonymInstances, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
module Database.MongoDB.Internal.Protocol (
-- * Network
Network', ANetwork', Internet(..),
-- * Pipe
Pipe, send, call,
-- * Message
@ -24,7 +22,6 @@ module Database.MongoDB.Internal.Protocol (
import Prelude as X
import Control.Applicative ((<$>))
import Control.Arrow ((***))
import System.IO (Handle)
import Data.ByteString.Lazy as B (length, hPut)
import qualified Control.Pipeline as P
import Data.Bson (Document, UString)
@ -40,58 +37,13 @@ import Data.UString as U (pack, append, toByteString)
import System.IO.Error as E (try)
import Control.Monad.Error
import Control.Monad.Util (whenJust)
import Network.Abstract (IOE, ANetwork, Network(..), Connection(Connection))
import Network (connectTo)
import System.IO (hFlush, hClose)
import Network.Abstract hiding (send)
import System.IO (hFlush)
import Database.MongoDB.Internal.Util (hGetN, bitOr)
-- * Network
-- Network -> Server -> (Sink, Source)
-- (Sink, Source) -> Pipeline
type Message = ([Notice], Maybe (Request, RequestId))
-- ^ Write notice(s), write notice(s) with getLastError request, or just query request
-- Note, that requestId will be out of order because request ids will be generated for notices, after the request id supplied was generated. This is ok because the mongo server does not care about order they are just used as unique identifiers.
type Response = (ResponseTo, Reply)
class (Network n Message Response) => Network' n
instance (Network n Message Response) => Network' n
type ANetwork' = ANetwork Message Response
data Internet = Internet
-- ^ Normal Network instance, i.e. no logging or replay
-- | Connect to server. Write messages and receive replies; not thread-safe!
instance Network Internet Message Response where
connect _ (hostname, portid) = ErrorT . E.try $ do
handle <- connectTo hostname portid
return $ Connection (sink handle) (source handle) (hClose handle)
where
sink h (notices, mRequest) = ErrorT . E.try $ do
forM_ notices $ \n -> writeReq h . (Left n,) =<< genRequestId
whenJust mRequest $ writeReq h . (Right *** id)
hFlush h
source h = ErrorT . E.try $ readResp h
writeReq :: Handle -> (Either Notice Request, RequestId) -> IO ()
writeReq handle (e, requestId) = do
hPut handle lenBytes
hPut handle bytes
where
bytes = runPut $ (either putNotice putRequest e) requestId
lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes
encodeSize = runPut . putInt32 . (+ 4)
readResp :: Handle -> IO (ResponseTo, Reply)
readResp handle = do
len <- fromEnum . decodeSize <$> hGetN handle 4
runGet getReply <$> hGetN handle len
where
decodeSize = subtract 4 . runGet getInt32
-- * Pipe
type Pipe = P.Pipeline Message Response
@ -111,7 +63,35 @@ call pipe notices request = do
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
-- * Messages
-- * Message
type Message = ([Notice], Maybe (Request, RequestId))
-- ^ A write notice(s), write notice(s) with getLastError request, or just query request.
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
instance WriteMessage Message where
writeMessage handle (notices, mRequest) = ErrorT . E.try $ do
forM_ notices $ \n -> writeReq . (Left n,) =<< genRequestId
whenJust mRequest $ writeReq . (Right *** id)
hFlush handle
where
writeReq (e, requestId) = do
hPut handle lenBytes
hPut handle bytes
where
bytes = runPut $ (either putNotice putRequest e) requestId
lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes
encodeSize = runPut . putInt32 . (+ 4)
type Response = (ResponseTo, Reply)
-- ^ Message received from a Mongo server in response to a Request
instance ReadMessage Response where
readMessage handle = ErrorT . E.try $ readResp where
readResp = do
len <- fromEnum . decodeSize <$> hGetN handle 4
runGet getReply <$> hGetN handle len
decodeSize = subtract 4 . runGet getInt32
type FullCollection = UString
-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"

View file

@ -49,12 +49,12 @@ import Control.Monad.Throw
import Control.Monad.MVar
import qualified Database.MongoDB.Internal.Protocol as P
import Database.MongoDB.Internal.Protocol hiding (Query, QueryOption(..), send, call)
import Database.MongoDB.Connection (MasterOrSlaveOk(..), Server(..))
import Database.MongoDB.Connection (MasterOrSlaveOk(..), Service(..))
import Data.Bson
import Data.Word
import Data.Int
import Data.Maybe (listToMaybe, catMaybes)
import Data.UString as U (dropWhile, any, tail, unpack)
import Data.UString as U (dropWhile, any, tail)
import Control.Monad.Util (MonadIO', loop)
import Database.MongoDB.Internal.Util ((<.>), true1)
@ -63,7 +63,7 @@ mapErrorIO f = throwLeft' f . liftIO . runErrorT
-- * Mongo Monad
access :: (Server s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> ConnPool s -> Action m a -> m (Either Failure a)
access :: (Service s, MonadIO m) => WriteMode -> MasterOrSlaveOk -> ConnPool s -> Action m a -> m (Either Failure a)
-- ^ Run action under given write and read mode against the server or replicaSet behind given connection pool. Return Left Failure if there is a connection failure or read/write error.
access w mos pool act = do
ePipe <- liftIO . runErrorT $ getPipe mos pool
@ -124,9 +124,9 @@ thisDatabase = context
auth :: (DbAccess m) => Username -> Password -> m Bool
-- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe.
auth u p = do
auth usr pss = do
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: u, "nonce" =: n, "key" =: pwKey n u p]
true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
-- * Collection

View file

@ -1,20 +1,33 @@
-- | Generalize a network connection to a sink and source
{-# LANGUAGE MultiParamTypeClasses, ExistentialQuantification, FlexibleContexts, FlexibleInstances, UndecidableInstances #-}
{-# LANGUAGE MultiParamTypeClasses, ExistentialQuantification, FlexibleContexts, FlexibleInstances, OverlappingInstances, UndecidableInstances #-}
module Network.Abstract where
import Network (HostName, PortID)
import System.IO (Handle, hClose)
import Network (HostName, PortID, connectTo)
import Control.Monad.Error
import System.IO.Error (try)
import Control.Monad.Context
import Control.Monad.Util (MonadIO')
type IOE = ErrorT IOError IO
-- ^ Be explicit about exception that may be raised.
type Server = (HostName, PortID)
data Server i o = Server HostName PortID
-- ^ A server receives messages of type i and returns messages of type o.
-- | Serialize message over handle
class WriteMessage i where
writeMessage :: Handle -> i -> IOE ()
-- | Deserialize message from handle
class ReadMessage o where
readMessage :: Handle -> IOE o
-- | A network controls connections to other hosts. It may want to overide to log messages or play them back.
-- A server in the network accepts messages of type i and generates messages of type o.
class Network n i o where
connect :: n -> Server -> IOE (Connection i o)
class Network n where
connect :: (WriteMessage i, ReadMessage o) => n -> Server i o -> IOE (Connection i o)
-- ^ Connect to Server returning the send sink and receive source, throw IOError if can't connect.
data Connection i o = Connection {
@ -22,11 +35,28 @@ data Connection i o = Connection {
receive :: IOE o,
close :: IO () }
data ANetwork i o = forall n. (Network n i o) => ANetwork n
data ANetwork = forall n. (Network n) => ANetwork n
instance Network (ANetwork i o) i o where
instance Network (ANetwork) where
connect (ANetwork n) = connect n
data Internet = Internet
-- ^ Normal Network instance, i.e. no logging or replay
-- | Connect to server. Write messages and receive replies. Not thread-safe, must be wrapped in Pipeline or something.
instance Network Internet where
connect _ (Server hostname portid) = ErrorT . try $ do
handle <- connectTo hostname portid
return $ Connection (writeMessage handle) (readMessage handle) (hClose handle)
class (MonadIO' m) => NetworkIO m where
network :: m ANetwork
instance (Context ANetwork m, MonadIO' m) => NetworkIO m where
network = context
instance NetworkIO IO where
network = return (ANetwork Internet)
{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10gen Inc.

1
TODO
View file

@ -10,6 +10,7 @@ Bson
MongoDB
-------
+ Support MapReduce 1.8 version
+ When one connection in a pool fails, close all other since they will likely fail too
+ on insert/update: reject keys that start with "$" or "."
+ dereference dbref

View file

@ -19,7 +19,8 @@ map/reduce queries on:
Prelude> :set prompt "> "
> :set -XOverloadedStrings
> import Database.MongoDB
> conn <- newConnPool Internet 1 (host "localhost")
> import Data.CompactString ()
> conn <- newConnPool 1 (host "localhost")
> let run act = access safe Master conn $ use (Database "test") act
> :{
run $ insertMany "mr1" [

View file

@ -1,5 +1,5 @@
name: mongoDB
version: 0.9
version: 0.9.1
build-type: Simple
license: OtherLicense
license-file: LICENSE
@ -16,7 +16,8 @@ build-depends:
nano-md5 -any,
network -any,
parsec -any,
random -any
random -any,
compact-string-fix -any
stability: alpha
homepage: http://github.com/TonyGen/mongoDB-haskell
package-url:

View file

@ -41,26 +41,23 @@ Import the MongoDB driver library, and set
OverloadedStrings so literal strings are converted to UTF-8 automatically.
> import Database.MongoDB
> import Data.CompactString ()
> :set -XOverloadedStrings
Making A Connection
-------------------
Create a connection pool for your mongo server, using the standard port (27017):
> pool <- newConnPool Internet 1 $ host "127.0.0.1"
> pool <- newConnPool 1 $ host "127.0.0.1"
or for a non-standard port
> pool <- newConnPool Internet 1 $ Host "127.0.0.1" (PortNumber 30000)
> pool <- newConnPool 1 $ Host "127.0.0.1" (PortNumber 30000)
*newConnPool* takes the *network*, the connection pool size, and the host to connect to. It returns
*newConnPool* takes the connection pool size, and the host to connect to. It returns
a *ConnPool*, which is a potential pool of TCP connections. They are not created until first
access, so it is not possible to get a connection error here.
The network parameter allows you to override normal communications to, for example, log
or replay messages sent and received from servers. *Internet* is the normal communication mode
with no logging/replay.
Note, plain IO code in this driver never raises an exception unless it invokes third party IO
code that does. Driver code that may throw an exception says so in its Monad type,
for example, *ErrorT IOError IO a*.