Fedor Gogolev 2012-06-10 23:47:14 +04:00
parent 17f528e835
commit dd6a3010f6
7 changed files with 174 additions and 113 deletions

@ -5,9 +5,11 @@
module Database.MongoDB.Admin (
-- * Admin
-- ** Collection
CollectionOption(..), createCollection, renameCollection, dropCollection, validateCollection,
CollectionOption(..), createCollection, renameCollection, dropCollection,
-- ** Index
Index(..), IndexName, index, ensureIndex, createIndex, dropIndex, getIndexes, dropIndexes,
Index(..), IndexName, index, ensureIndex, createIndex, dropIndex,
getIndexes, dropIndexes,
-- ** User
allUsers, addUser, removeUser,
-- ** Database
@ -27,20 +29,29 @@ module Database.MongoDB.Admin (
import Prelude hiding (lookup)
import Control.Applicative ((<$>))
import Database.MongoDB.Internal.Protocol (pwHash, pwKey)
import Database.MongoDB.Connection (Host, showHostPort)
import Database.MongoDB.Query
import Data.Bson
import Control.Monad.Reader
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.HashTable as H
import Data.IORef
import qualified Data.Set as S
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent (forkIO, threadDelay)
import Database.MongoDB.Internal.Util (MonadIO', (<.>), true1)
import Control.Monad (forever, unless)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Set (Set)
import System.IO.Unsafe (unsafePerformIO)
import qualified Data.HashTable as H
import qualified Data.Set as Set
import Control.Monad.Trans (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Bson (Document, Field(..), at, (=:), (=?), exclude, merge)
import Data.Text (Text)
import qualified Data.Text as T
import Database.MongoDB.Connection (Host, showHostPort)
import Database.MongoDB.Internal.Protocol (pwHash, pwKey)
import Database.MongoDB.Internal.Util (MonadIO', (<.>), true1)
import Database.MongoDB.Query (Action, Database, Collection, Username, Password,
Order, Query(..), accessMode, master, runCommand,
useDb, thisDatabase, rest, select, find, findOne,
insert_, save, delete)
-- * Admin
@ -109,9 +120,9 @@ ensureIndex :: (MonadIO' m) => Index -> Action m ()
ensureIndex idx = let k = (iColl idx, iName idx) in do
icache <- fetchIndexCache
set <- liftIO (readIORef icache)
unless (S.member k set) $ do
unless (Set.member k set) $ do
accessMode master (createIndex idx)
liftIO $ writeIORef icache (S.insert k set)
liftIO $ writeIORef icache (Set.insert k set)
createIndex :: (MonadIO' m) => Index -> Action m ()
-- ^ Create index on the server. This call goes to the server every time.
@ -140,7 +151,7 @@ dropIndexes coll = do
type DbIndexCache = H.HashTable Database IndexCache
-- ^ Cache the indexes we create so repeatedly calling ensureIndex only hits database the first time. Clear cache every once in a while so if someone else deletes index we will recreate it on ensureIndex.
type IndexCache = IORef (S.Set (Collection, IndexName))
type IndexCache = IORef (Set (Collection, IndexName))
dbIndexCache :: DbIndexCache
-- ^ initialize cache and fork thread that clears it every 15 minutes
@ -164,7 +175,7 @@ fetchIndexCache = do
maybe (newIdxCache db) return mc
newIdxCache db = do
idx <- newIORef S.empty
idx <- newIORef Set.empty
H.insert dbIndexCache db idx
return idx
@ -172,7 +183,7 @@ resetIndexCache :: (MonadIO m) => Action m ()
-- ^ reset index cache for current database
resetIndexCache = do
icache <- fetchIndexCache
liftIO (writeIORef icache S.empty)
liftIO (writeIORef icache Set.empty)
-- ** User

@ -8,33 +8,43 @@ module Database.MongoDB.Connection (
-- * Connection
Pipe, close, isClosed,
-- * Server
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort, readHostPortM,
globalConnectTimeout, connect, connect',
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
readHostPortM, globalConnectTimeout, connect, connect',
-- * Replica Set
ReplicaSetName, openReplicaSet, openReplicaSet',
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
) where
import Prelude hiding (lookup)
import Database.MongoDB.Internal.Protocol (Pipe, newPipe)
import System.IO.Pipeline (IOE, close, isClosed)
import Control.Exception as E (try)
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersect, partition, (\\), delete)
import Control.Applicative ((<$>))
import Control.Monad (forM_)
import Network (HostName, PortID(..), connectTo)
import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>))
import System.IO.Unsafe (unsafePerformIO)
import System.Timeout (timeout)
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, eof,
spaces, try, (<|>))
import qualified Control.Exception as E
import qualified Data.List as List
import Control.Monad.Identity (runIdentity)
import Control.Monad.Error (ErrorT(..), lift, throwError)
import Control.Concurrent.MVar.Lifted
import Control.Monad (forM_)
import Control.Applicative ((<$>))
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
import Data.Bson (Document, at, (=:))
import Data.Text (Text)
import qualified Data.Bson as B
import qualified Data.Text as T
import Data.Bson as D (Document, lookup, at, (=:))
import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand)
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle, mergesortM)
import Data.List as L (lookup, intersect, partition, (\\), delete)
import Data.IORef (IORef, newIORef, readIORef)
import System.Timeout (timeout)
import System.IO.Unsafe (unsafePerformIO)
import Database.MongoDB.Internal.Protocol (Pipe, newPipe)
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE,
updateAssocs, shuffle, mergesortM)
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
slaveOk, runCommand)
import System.IO.Pipeline (IOE, close, isClosed)
adminCommand :: Command -> Pipe -> IOE Document
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
@ -75,7 +85,7 @@ readHostPortM = either (fail . show) return . parse parser "readHostPort" where
parser = do
h <- hostname
T.try (spaces >> eof >> return (host h)) <|> do
try (spaces >> eof >> return (host h)) <|> do
_ <- char ':'
port :: Int <- read <$> many1 digit
spaces >> eof
@ -161,7 +171,7 @@ type ReplicaInfo = (Host, Document)
statedPrimary :: ReplicaInfo -> Maybe Host
-- ^ Primary of replica set or Nothing if there isn't one
statedPrimary (host', info) = if (at "ismaster" info) then Just host' else readHostPort <$> D.lookup "primary" info
statedPrimary (host', info) = if (at "ismaster" info) then Just host' else readHostPort <$> B.lookup "primary" info
possibleHosts :: ReplicaInfo -> [Host]
-- ^ Non-arbiter, non-hidden members of replica set
@ -186,7 +196,7 @@ fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IOE ReplicaInfo
fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
pipe <- connection rs mPipe host'
info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
case D.lookup "setName" info of
case B.lookup "setName" info of
Nothing -> throwError $ userError $ show host' ++ " not a member of any replica set, including " ++ T.unpack rsName ++ ": " ++ show info
Just setName | setName /= rsName -> throwError $ userError $ show host' ++ " not a member of replica set " ++ T.unpack rsName ++ ": " ++ show info
Just _ -> return (host', info)
@ -198,7 +208,7 @@ connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' =
conn = modifyMVar vMembers $ \members -> do
let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
case L.lookup host' members of
case List.lookup host' members of
Just (Just pipe) -> lift (isClosed pipe) >>= \bad -> if bad then new else return (members, pipe)
_ -> new

@ -1,8 +1,12 @@
{-| Low-level messaging between this client and the MongoDB server, see Mongo Wire Protocol (<>).
-- | Low-level messaging between this client and the MongoDB server, see Mongo
-- Wire Protocol (<>).
-- This module is not intended for direct use. Use the high-level interface at
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.
This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -}
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts, TupleSections, TypeSynonymInstances, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts, TupleSections, TypeSynonymInstances #-}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
module Database.MongoDB.Internal.Protocol (
@ -18,29 +22,35 @@ module Database.MongoDB.Internal.Protocol (
Username, Password, Nonce, pwHash, pwKey
) where
import Prelude as X
import Control.Applicative ((<$>))
import Control.Arrow ((***))
import Data.ByteString.Lazy as B (length, hPut)
import System.IO.Pipeline (IOE, Pipeline, newPipeline, IOStream(..))
import qualified System.IO.Pipeline as P (send, call)
import System.IO (Handle, hClose)
import Data.Bson (Document)
import Data.Bson.Binary
import Data.Binary.Put
import Data.Binary.Get
import Data.Int
import Data.Bits
import Data.IORef
import Control.Exception (try)
import Control.Monad (forM_, replicateM, unless)
import Data.Binary.Get (Get, runGet)
import Data.Binary.Put (Put, runPut)
import Data.Bits (bit, testBit)
import Data.Int (Int32, Int64)
import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO (Handle, hClose, hFlush)
import System.IO.Unsafe (unsafePerformIO)
import qualified Data.ByteString.Lazy as L
import Control.Monad.Error (ErrorT(..))
import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson (Document)
import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64,
putInt64, putCString)
import Data.Text (Text)
import qualified Crypto.Hash.MD5 as MD5 (hash)
import qualified Crypto.Hash.MD5 as MD5
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import Control.Exception as E (try)
import Control.Monad.Error
import System.IO (hFlush)
import Database.MongoDB.Internal.Util (whenJust, hGetN, bitOr, byteStringHex)
import System.IO.Pipeline (IOE, Pipeline, newPipeline, IOStream(..))
import qualified System.IO.Pipeline as P
-- * Pipe
@ -73,17 +83,17 @@ type Message = ([Notice], Maybe (Request, RequestId))
writeMessage :: Handle -> Message -> IOE ()
-- ^ Write message to socket
writeMessage handle (notices, mRequest) = ErrorT . E.try $ do
writeMessage handle (notices, mRequest) = ErrorT . try $ do
forM_ notices $ \n -> writeReq . (Left n,) =<< genRequestId
whenJust mRequest $ writeReq . (Right *** id)
hFlush handle
writeReq (e, requestId) = do
hPut handle lenBytes
hPut handle bytes
L.hPut handle lenBytes
L.hPut handle bytes
bytes = runPut $ (either putNotice putRequest e) requestId
lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes
lenBytes = encodeSize . toEnum . fromEnum $ L.length bytes
encodeSize = runPut . putInt32 . (+ 4)
type Response = (ResponseTo, Reply)
@ -91,7 +101,7 @@ type Response = (ResponseTo, Reply)
readMessage :: Handle -> IOE Response
-- ^ read response from socket
readMessage handle = ErrorT $ E.try readResp where
readMessage handle = ErrorT $ try readResp where
readResp = do
len <- fromEnum . decodeSize <$> hGetN handle 4
runGet getReply <$> hGetN handle len
@ -196,7 +206,7 @@ putNotice notice requestId = do
putDocument dSelector
KillCursors{..} -> do
putInt32 0
putInt32 $ toEnum (X.length kCursorIds)
putInt32 $ toEnum (length kCursorIds)
mapM_ putInt64 kCursorIds
iBit :: InsertOption -> Int32

@ -7,23 +7,27 @@
module Database.MongoDB.Internal.Util where
import Control.Applicative (Applicative(..), (<$>))
import Network (PortID(..))
import Control.Arrow (left)
import Control.Exception (assert)
import Control.Monad (liftM, liftM2)
import Data.Bits (Bits, (.|.))
import Data.Bson
import Data.ByteString.Lazy as S (ByteString, length, append, hGet)
import Data.Word (Word8)
import Network (PortID(..))
import Numeric (showHex)
import System.IO (Handle)
import System.IO.Error (mkIOError, eofErrorType)
import Control.Exception (assert)
import Control.Monad.Error
import Control.Arrow (left)
import Data.Text (Text)
import qualified Data.ByteString as BS (ByteString, unpack)
import qualified Data.Text as T
import Data.Word (Word8)
import Numeric (showHex)
import System.Random.Shuffle (shuffle')
import System.Random (newStdGen)
import Data.List as L (length)
import System.Random.Shuffle (shuffle')
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as S
import Control.Monad.Error (MonadError(..), ErrorT(..), Error(..))
import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson
import Data.Text (Text)
import qualified Data.Text as T
deriving instance Show PortID
deriving instance Eq PortID
@ -62,7 +66,7 @@ wrap x = [x]
shuffle :: [a] -> IO [a]
-- ^ Randomly shuffle items in list
shuffle list = shuffle' list (L.length list) <$> newStdGen
shuffle list = shuffle' list (length list) <$> newStdGen
loop :: (Functor m, Monad m) => m (Maybe a) -> m [a]
-- ^ Repeatedy execute action, collecting results, until it returns Nothing
@ -110,18 +114,18 @@ true1 k doc = case valueAt k doc of
Int64 n -> n == 1
_ -> error $ "expected " ++ show k ++ " to be Num or Bool in " ++ show doc
hGetN :: Handle -> Int -> IO ByteString
hGetN :: Handle -> Int -> IO L.ByteString
-- ^ Read N bytes from hande, blocking until all N bytes are read. If EOF is reached before N bytes then raise EOF exception.
hGetN h n = assert (n >= 0) $ do
bytes <- hGet h n
let x = fromEnum $ S.length bytes
bytes <- L.hGet h n
let x = fromEnum $ L.length bytes
if x >= n then return bytes
else if x == 0 then ioError (mkIOError eofErrorType "hGetN" (Just h) Nothing)
else S.append bytes <$> hGetN h (n - x)
else L.append bytes <$> hGetN h (n - x)
byteStringHex :: BS.ByteString -> String
byteStringHex :: S.ByteString -> String
-- ^ Hexadecimal string representation of a byte string. Each byte yields two hexadecimal characters.
byteStringHex = concatMap byteHex . BS.unpack
byteStringHex = concatMap byteHex . S.unpack
byteHex :: Word8 -> String
-- ^ Two char hexadecimal representation of byte

@ -25,38 +25,57 @@ module Database.MongoDB.Query (
delete, deleteOne,
-- * Read
-- ** Query
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial), Projector, Limit, Order, BatchSize,
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial),
Projector, Limit, Order, BatchSize,
explain, find, findOne, fetch, count, distinct,
-- *** Cursor
Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed,
-- ** Group
Group(..), GroupKey(..), group,
-- ** MapReduce
MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..), MRResult, mapReduce, runMR, runMR',
MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..),
MRResult, mapReduce, runMR, runMR',
-- * Command
Command, runCommand, runCommand1,
) where
import Prelude as X hiding (lookup)
import Prelude hiding (lookup)
import Control.Applicative (Applicative, (<$>))
import Control.Monad (unless, replicateM, liftM)
import Data.Int (Int32)
import Data.Maybe (listToMaybe, catMaybes)
import Data.Word (Word32)
import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
readMVar, modifyMVar)
import Control.Monad.Base (MonadBase(liftBase))
import Control.Monad.Error (ErrorT, Error(..), MonadError, runErrorT,
import Control.Monad.Reader (ReaderT, runReaderT, ask, asks, local)
import Control.Monad.RWS (RWST)
import Control.Monad.State (StateT)
import Control.Monad.Trans (MonadIO, MonadTrans, lift, liftIO)
import Control.Monad.Trans.Control (ComposeSt, MonadBaseControl(..),
MonadTransControl(..), StM, StT,
defaultLiftBaseWith, defaultRestoreM)
import Control.Monad.Writer (WriterT, Monoid)
import Data.Bson (Document, Field(..), Label, Value(String,Doc), Javascript,
at, valueAt, lookup, look, genObjectId, (=:), (=?))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Bson (Document, at, valueAt, lookup, look, Field(..), (=:), (=?), Label, Value(String,Doc), Javascript, genObjectId)
import Database.MongoDB.Internal.Protocol (Pipe, Notice(..), Request(GetMore, qOptions, qFullCollection, qSkip, qBatchSize, qSelector, qProjector), Reply(..), QueryOption(..), ResponseFlag(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId, FullCollection, Username, Password, pwKey)
import qualified Database.MongoDB.Internal.Protocol as P (send, call, Request(Query))
import Database.MongoDB.Internal.Protocol (Reply(..), QueryOption(..),
ResponseFlag(..), InsertOption(..),
UpdateOption(..), DeleteOption(..),
CursorId, FullCollection, Username,
Password, Pipe, Notice(..),
Request(GetMore, qOptions, qSkip,
qFullCollection, qBatchSize,
qSelector, qProjector),
import Database.MongoDB.Internal.Util (MonadIO', loop, liftIOE, true1, (<.>))
import Control.Concurrent.MVar.Lifted
import Control.Monad.Error
import Control.Monad.Reader
import Control.Monad.State (StateT)
import Control.Monad.Writer (WriterT, Monoid)
import Control.Monad.RWS (RWST)
import Control.Monad.Base (MonadBase(liftBase))
import Control.Monad.Trans.Control (ComposeSt, MonadBaseControl(..), MonadTransControl(..), StM, StT, defaultLiftBaseWith, defaultRestoreM)
import Control.Applicative (Applicative, (<$>))
import Data.Maybe (listToMaybe, catMaybes)
import Data.Int (Int32)
import Data.Word (Word32)
import qualified Database.MongoDB.Internal.Protocol as P
-- * Monad
@ -298,7 +317,7 @@ insert' opts col docs = do
assignId :: Document -> IO Document
-- ^ Assign a unique value to _id field if missing
assignId doc = if X.any (("_id" ==) . label) doc
assignId doc = if any (("_id" ==) . label) doc
then return doc
else (\oid -> ("_id" =: oid) : doc) <$> genObjectId

@ -13,11 +13,15 @@ module System.IO.Pipeline (
) where
import Prelude hiding (length)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Concurrent.Chan
import Control.Concurrent.MVar.Lifted
import Control.Monad.Error
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forever)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad.Trans (liftIO)
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
putMVar, readMVar, addMVarFinalizer)
import Control.Monad.Error (ErrorT(ErrorT), runErrorT)
onException :: (Monad m) => ErrorT e m a -> m () -> ErrorT e m a
-- ^ If first action throws an exception then run second action then re-throw

@ -5,12 +5,15 @@
module System.IO.Pool where
import Control.Applicative ((<$>))
import Control.Concurrent.MVar.Lifted
import Data.Array.IO
import Data.Maybe (catMaybes)
import Control.Monad.Error
import System.Random (randomRIO)
import Control.Exception (assert)
import Data.Array.IO (IOArray, readArray, writeArray, newArray, newListArray,
getElems, getBounds, rangeSize, range)
import Data.Maybe (catMaybes)
import System.Random (randomRIO)
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar_)
import Control.Monad.Error (ErrorT, Error)
import Control.Monad.Trans (liftIO)
-- | Creator, destroyer, and checker of resources of type r. Creator may throw error or type e.
data Factory e r = Factory {