diff --git a/Database/MongoDB/Admin.hs b/Database/MongoDB/Admin.hs index cc16013..2aa3a87 100644 --- a/Database/MongoDB/Admin.hs +++ b/Database/MongoDB/Admin.hs @@ -5,9 +5,11 @@ module Database.MongoDB.Admin ( -- * Admin -- ** Collection - CollectionOption(..), createCollection, renameCollection, dropCollection, validateCollection, + CollectionOption(..), createCollection, renameCollection, dropCollection, + validateCollection, -- ** Index - Index(..), IndexName, index, ensureIndex, createIndex, dropIndex, getIndexes, dropIndexes, + Index(..), IndexName, index, ensureIndex, createIndex, dropIndex, + getIndexes, dropIndexes, -- ** User allUsers, addUser, removeUser, -- ** Database @@ -27,20 +29,29 @@ module Database.MongoDB.Admin ( import Prelude hiding (lookup) import Control.Applicative ((<$>)) -import Database.MongoDB.Internal.Protocol (pwHash, pwKey) -import Database.MongoDB.Connection (Host, showHostPort) -import Database.MongoDB.Query -import Data.Bson -import Control.Monad.Reader -import Data.Text (Text) -import qualified Data.Text as T -import qualified Data.HashTable as H -import Data.IORef -import qualified Data.Set as S -import System.IO.Unsafe (unsafePerformIO) import Control.Concurrent (forkIO, threadDelay) -import Database.MongoDB.Internal.Util (MonadIO', (<.>), true1) +import Control.Monad (forever, unless) +import Data.IORef (IORef, newIORef, readIORef, writeIORef) +import Data.Set (Set) +import System.IO.Unsafe (unsafePerformIO) + +import qualified Data.HashTable as H +import qualified Data.Set as Set + +import Control.Monad.Trans (MonadIO, liftIO) import Control.Monad.Trans.Control (MonadBaseControl) +import Data.Bson (Document, Field(..), at, (=:), (=?), exclude, merge) +import Data.Text (Text) + +import qualified Data.Text as T + +import Database.MongoDB.Connection (Host, showHostPort) +import Database.MongoDB.Internal.Protocol (pwHash, pwKey) +import Database.MongoDB.Internal.Util (MonadIO', (<.>), true1) +import Database.MongoDB.Query (Action, Database, Collection, Username, Password, + Order, Query(..), accessMode, master, runCommand, + useDb, thisDatabase, rest, select, find, findOne, + insert_, save, delete) -- * Admin @@ -109,9 +120,9 @@ ensureIndex :: (MonadIO' m) => Index -> Action m () ensureIndex idx = let k = (iColl idx, iName idx) in do icache <- fetchIndexCache set <- liftIO (readIORef icache) - unless (S.member k set) $ do + unless (Set.member k set) $ do accessMode master (createIndex idx) - liftIO $ writeIORef icache (S.insert k set) + liftIO $ writeIORef icache (Set.insert k set) createIndex :: (MonadIO' m) => Index -> Action m () -- ^ Create index on the server. This call goes to the server every time. @@ -140,7 +151,7 @@ dropIndexes coll = do type DbIndexCache = H.HashTable Database IndexCache -- ^ Cache the indexes we create so repeatedly calling ensureIndex only hits database the first time. Clear cache every once in a while so if someone else deletes index we will recreate it on ensureIndex. -type IndexCache = IORef (S.Set (Collection, IndexName)) +type IndexCache = IORef (Set (Collection, IndexName)) dbIndexCache :: DbIndexCache -- ^ initialize cache and fork thread that clears it every 15 minutes @@ -164,7 +175,7 @@ fetchIndexCache = do maybe (newIdxCache db) return mc where newIdxCache db = do - idx <- newIORef S.empty + idx <- newIORef Set.empty H.insert dbIndexCache db idx return idx @@ -172,7 +183,7 @@ resetIndexCache :: (MonadIO m) => Action m () -- ^ reset index cache for current database resetIndexCache = do icache <- fetchIndexCache - liftIO (writeIORef icache S.empty) + liftIO (writeIORef icache Set.empty) -- ** User diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs index bb761a8..051f9e4 100644 --- a/Database/MongoDB/Connection.hs +++ b/Database/MongoDB/Connection.hs @@ -8,33 +8,43 @@ module Database.MongoDB.Connection ( -- * Connection Pipe, close, isClosed, -- * Server - Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort, readHostPortM, - globalConnectTimeout, connect, connect', + Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort, + readHostPortM, globalConnectTimeout, connect, connect', -- * Replica Set ReplicaSetName, openReplicaSet, openReplicaSet', ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName ) where import Prelude hiding (lookup) -import Database.MongoDB.Internal.Protocol (Pipe, newPipe) -import System.IO.Pipeline (IOE, close, isClosed) -import Control.Exception as E (try) +import Data.IORef (IORef, newIORef, readIORef) +import Data.List (intersect, partition, (\\), delete) +import Control.Applicative ((<$>)) +import Control.Monad (forM_) import Network (HostName, PortID(..), connectTo) -import Text.ParserCombinators.Parsec as T (parse, many1, letter, digit, char, eof, spaces, try, (<|>)) +import System.IO.Unsafe (unsafePerformIO) +import System.Timeout (timeout) +import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, eof, + spaces, try, (<|>)) +import qualified Control.Exception as E +import qualified Data.List as List + + import Control.Monad.Identity (runIdentity) import Control.Monad.Error (ErrorT(..), lift, throwError) -import Control.Concurrent.MVar.Lifted -import Control.Monad (forM_) -import Control.Applicative ((<$>)) +import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar, + readMVar) +import Data.Bson (Document, at, (=:)) import Data.Text (Text) + +import qualified Data.Bson as B import qualified Data.Text as T -import Data.Bson as D (Document, lookup, at, (=:)) -import Database.MongoDB.Query (access, slaveOk, Failure(ConnectionFailure), Command, runCommand) -import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, updateAssocs, shuffle, mergesortM) -import Data.List as L (lookup, intersect, partition, (\\), delete) -import Data.IORef (IORef, newIORef, readIORef) -import System.Timeout (timeout) -import System.IO.Unsafe (unsafePerformIO) + +import Database.MongoDB.Internal.Protocol (Pipe, newPipe) +import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, runIOE, + updateAssocs, shuffle, mergesortM) +import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access, + slaveOk, runCommand) +import System.IO.Pipeline (IOE, close, isClosed) adminCommand :: Command -> Pipe -> IOE Document -- ^ Run command against admin database on server connected to pipe. Fail if connection fails. @@ -75,7 +85,7 @@ readHostPortM = either (fail . show) return . parse parser "readHostPort" where parser = do spaces h <- hostname - T.try (spaces >> eof >> return (host h)) <|> do + try (spaces >> eof >> return (host h)) <|> do _ <- char ':' port :: Int <- read <$> many1 digit spaces >> eof @@ -161,7 +171,7 @@ type ReplicaInfo = (Host, Document) statedPrimary :: ReplicaInfo -> Maybe Host -- ^ Primary of replica set or Nothing if there isn't one -statedPrimary (host', info) = if (at "ismaster" info) then Just host' else readHostPort <$> D.lookup "primary" info +statedPrimary (host', info) = if (at "ismaster" info) then Just host' else readHostPort <$> B.lookup "primary" info possibleHosts :: ReplicaInfo -> [Host] -- ^ Non-arbiter, non-hidden members of replica set @@ -186,7 +196,7 @@ fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IOE ReplicaInfo fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do pipe <- connection rs mPipe host' info <- adminCommand ["isMaster" =: (1 :: Int)] pipe - case D.lookup "setName" info of + case B.lookup "setName" info of Nothing -> throwError $ userError $ show host' ++ " not a member of any replica set, including " ++ T.unpack rsName ++ ": " ++ show info Just setName | setName /= rsName -> throwError $ userError $ show host' ++ " not a member of replica set " ++ T.unpack rsName ++ ": " ++ show info Just _ -> return (host', info) @@ -198,7 +208,7 @@ connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' = where conn = modifyMVar vMembers $ \members -> do let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe) - case L.lookup host' members of + case List.lookup host' members of Just (Just pipe) -> lift (isClosed pipe) >>= \bad -> if bad then new else return (members, pipe) _ -> new diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs index 67c493c..6af173c 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -1,8 +1,12 @@ -{-| Low-level messaging between this client and the MongoDB server, see Mongo Wire Protocol (). +-- | Low-level messaging between this client and the MongoDB server, see Mongo +-- Wire Protocol (). +-- +-- This module is not intended for direct use. Use the high-level interface at +-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -} - -{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts, TupleSections, TypeSynonymInstances, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-} +{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-} +{-# LANGUAGE FlexibleContexts, TupleSections, TypeSynonymInstances #-} +{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-} module Database.MongoDB.Internal.Protocol ( FullCollection, @@ -18,29 +22,35 @@ module Database.MongoDB.Internal.Protocol ( Username, Password, Nonce, pwHash, pwKey ) where -import Prelude as X import Control.Applicative ((<$>)) import Control.Arrow ((***)) -import Data.ByteString.Lazy as B (length, hPut) -import System.IO.Pipeline (IOE, Pipeline, newPipeline, IOStream(..)) -import qualified System.IO.Pipeline as P (send, call) -import System.IO (Handle, hClose) -import Data.Bson (Document) -import Data.Bson.Binary -import Data.Binary.Put -import Data.Binary.Get -import Data.Int -import Data.Bits -import Data.IORef +import Control.Exception (try) +import Control.Monad (forM_, replicateM, unless) +import Data.Binary.Get (Get, runGet) +import Data.Binary.Put (Put, runPut) +import Data.Bits (bit, testBit) +import Data.Int (Int32, Int64) +import Data.IORef (IORef, newIORef, atomicModifyIORef) +import System.IO (Handle, hClose, hFlush) import System.IO.Unsafe (unsafePerformIO) + +import qualified Data.ByteString.Lazy as L + +import Control.Monad.Error (ErrorT(..)) +import Control.Monad.Trans (MonadIO, liftIO) +import Data.Bson (Document) +import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64, + putInt64, putCString) import Data.Text (Text) -import qualified Crypto.Hash.MD5 as MD5 (hash) + +import qualified Crypto.Hash.MD5 as MD5 import qualified Data.Text as T import qualified Data.Text.Encoding as TE -import Control.Exception as E (try) -import Control.Monad.Error -import System.IO (hFlush) + import Database.MongoDB.Internal.Util (whenJust, hGetN, bitOr, byteStringHex) +import System.IO.Pipeline (IOE, Pipeline, newPipeline, IOStream(..)) + +import qualified System.IO.Pipeline as P -- * Pipe @@ -73,17 +83,17 @@ type Message = ([Notice], Maybe (Request, RequestId)) writeMessage :: Handle -> Message -> IOE () -- ^ Write message to socket -writeMessage handle (notices, mRequest) = ErrorT . E.try $ do +writeMessage handle (notices, mRequest) = ErrorT . try $ do forM_ notices $ \n -> writeReq . (Left n,) =<< genRequestId whenJust mRequest $ writeReq . (Right *** id) hFlush handle where writeReq (e, requestId) = do - hPut handle lenBytes - hPut handle bytes + L.hPut handle lenBytes + L.hPut handle bytes where bytes = runPut $ (either putNotice putRequest e) requestId - lenBytes = encodeSize . toEnum . fromEnum $ B.length bytes + lenBytes = encodeSize . toEnum . fromEnum $ L.length bytes encodeSize = runPut . putInt32 . (+ 4) type Response = (ResponseTo, Reply) @@ -91,7 +101,7 @@ type Response = (ResponseTo, Reply) readMessage :: Handle -> IOE Response -- ^ read response from socket -readMessage handle = ErrorT $ E.try readResp where +readMessage handle = ErrorT $ try readResp where readResp = do len <- fromEnum . decodeSize <$> hGetN handle 4 runGet getReply <$> hGetN handle len @@ -196,7 +206,7 @@ putNotice notice requestId = do putDocument dSelector KillCursors{..} -> do putInt32 0 - putInt32 $ toEnum (X.length kCursorIds) + putInt32 $ toEnum (length kCursorIds) mapM_ putInt64 kCursorIds iBit :: InsertOption -> Int32 diff --git a/Database/MongoDB/Internal/Util.hs b/Database/MongoDB/Internal/Util.hs index c8e6eea..4084d5c 100644 --- a/Database/MongoDB/Internal/Util.hs +++ b/Database/MongoDB/Internal/Util.hs @@ -7,23 +7,27 @@ module Database.MongoDB.Internal.Util where import Control.Applicative (Applicative(..), (<$>)) -import Network (PortID(..)) +import Control.Arrow (left) +import Control.Exception (assert) +import Control.Monad (liftM, liftM2) import Data.Bits (Bits, (.|.)) -import Data.Bson -import Data.ByteString.Lazy as S (ByteString, length, append, hGet) +import Data.Word (Word8) +import Network (PortID(..)) +import Numeric (showHex) import System.IO (Handle) import System.IO.Error (mkIOError, eofErrorType) -import Control.Exception (assert) -import Control.Monad.Error -import Control.Arrow (left) -import Data.Text (Text) -import qualified Data.ByteString as BS (ByteString, unpack) -import qualified Data.Text as T -import Data.Word (Word8) -import Numeric (showHex) -import System.Random.Shuffle (shuffle') import System.Random (newStdGen) -import Data.List as L (length) +import System.Random.Shuffle (shuffle') + +import qualified Data.ByteString.Lazy as L +import qualified Data.ByteString as S + +import Control.Monad.Error (MonadError(..), ErrorT(..), Error(..)) +import Control.Monad.Trans (MonadIO, liftIO) +import Data.Bson +import Data.Text (Text) + +import qualified Data.Text as T deriving instance Show PortID deriving instance Eq PortID @@ -62,7 +66,7 @@ wrap x = [x] shuffle :: [a] -> IO [a] -- ^ Randomly shuffle items in list -shuffle list = shuffle' list (L.length list) <$> newStdGen +shuffle list = shuffle' list (length list) <$> newStdGen loop :: (Functor m, Monad m) => m (Maybe a) -> m [a] -- ^ Repeatedy execute action, collecting results, until it returns Nothing @@ -110,18 +114,18 @@ true1 k doc = case valueAt k doc of Int64 n -> n == 1 _ -> error $ "expected " ++ show k ++ " to be Num or Bool in " ++ show doc -hGetN :: Handle -> Int -> IO ByteString +hGetN :: Handle -> Int -> IO L.ByteString -- ^ Read N bytes from hande, blocking until all N bytes are read. If EOF is reached before N bytes then raise EOF exception. hGetN h n = assert (n >= 0) $ do - bytes <- hGet h n - let x = fromEnum $ S.length bytes + bytes <- L.hGet h n + let x = fromEnum $ L.length bytes if x >= n then return bytes else if x == 0 then ioError (mkIOError eofErrorType "hGetN" (Just h) Nothing) - else S.append bytes <$> hGetN h (n - x) + else L.append bytes <$> hGetN h (n - x) -byteStringHex :: BS.ByteString -> String +byteStringHex :: S.ByteString -> String -- ^ Hexadecimal string representation of a byte string. Each byte yields two hexadecimal characters. -byteStringHex = concatMap byteHex . BS.unpack +byteStringHex = concatMap byteHex . S.unpack byteHex :: Word8 -> String -- ^ Two char hexadecimal representation of byte diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 3a933a8..5c942d1 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -25,38 +25,57 @@ module Database.MongoDB.Query ( delete, deleteOne, -- * Read -- ** Query - Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial), Projector, Limit, Order, BatchSize, + Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial), + Projector, Limit, Order, BatchSize, explain, find, findOne, fetch, count, distinct, -- *** Cursor Cursor, nextBatch, next, nextN, rest, closeCursor, isCursorClosed, -- ** Group Group(..), GroupKey(..), group, -- ** MapReduce - MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..), MRResult, mapReduce, runMR, runMR', + MapReduce(..), MapFun, ReduceFun, FinalizeFun, MROut(..), MRMerge(..), + MRResult, mapReduce, runMR, runMR', -- * Command Command, runCommand, runCommand1, eval, ) where -import Prelude as X hiding (lookup) +import Prelude hiding (lookup) +import Control.Applicative (Applicative, (<$>)) +import Control.Monad (unless, replicateM, liftM) +import Data.Int (Int32) +import Data.Maybe (listToMaybe, catMaybes) +import Data.Word (Word32) + +import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer, + readMVar, modifyMVar) +import Control.Monad.Base (MonadBase(liftBase)) +import Control.Monad.Error (ErrorT, Error(..), MonadError, runErrorT, + throwError) +import Control.Monad.Reader (ReaderT, runReaderT, ask, asks, local) +import Control.Monad.RWS (RWST) +import Control.Monad.State (StateT) +import Control.Monad.Trans (MonadIO, MonadTrans, lift, liftIO) +import Control.Monad.Trans.Control (ComposeSt, MonadBaseControl(..), + MonadTransControl(..), StM, StT, + defaultLiftBaseWith, defaultRestoreM) +import Control.Monad.Writer (WriterT, Monoid) +import Data.Bson (Document, Field(..), Label, Value(String,Doc), Javascript, + at, valueAt, lookup, look, genObjectId, (=:), (=?)) import Data.Text (Text) import qualified Data.Text as T -import Data.Bson (Document, at, valueAt, lookup, look, Field(..), (=:), (=?), Label, Value(String,Doc), Javascript, genObjectId) -import Database.MongoDB.Internal.Protocol (Pipe, Notice(..), Request(GetMore, qOptions, qFullCollection, qSkip, qBatchSize, qSelector, qProjector), Reply(..), QueryOption(..), ResponseFlag(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId, FullCollection, Username, Password, pwKey) -import qualified Database.MongoDB.Internal.Protocol as P (send, call, Request(Query)) + +import Database.MongoDB.Internal.Protocol (Reply(..), QueryOption(..), + ResponseFlag(..), InsertOption(..), + UpdateOption(..), DeleteOption(..), + CursorId, FullCollection, Username, + Password, Pipe, Notice(..), + Request(GetMore, qOptions, qSkip, + qFullCollection, qBatchSize, + qSelector, qProjector), + pwKey) import Database.MongoDB.Internal.Util (MonadIO', loop, liftIOE, true1, (<.>)) -import Control.Concurrent.MVar.Lifted -import Control.Monad.Error -import Control.Monad.Reader -import Control.Monad.State (StateT) -import Control.Monad.Writer (WriterT, Monoid) -import Control.Monad.RWS (RWST) -import Control.Monad.Base (MonadBase(liftBase)) -import Control.Monad.Trans.Control (ComposeSt, MonadBaseControl(..), MonadTransControl(..), StM, StT, defaultLiftBaseWith, defaultRestoreM) -import Control.Applicative (Applicative, (<$>)) -import Data.Maybe (listToMaybe, catMaybes) -import Data.Int (Int32) -import Data.Word (Word32) +import qualified Database.MongoDB.Internal.Protocol as P -- * Monad @@ -298,7 +317,7 @@ insert' opts col docs = do assignId :: Document -> IO Document -- ^ Assign a unique value to _id field if missing -assignId doc = if X.any (("_id" ==) . label) doc +assignId doc = if any (("_id" ==) . label) doc then return doc else (\oid -> ("_id" =: oid) : doc) <$> genObjectId diff --git a/System/IO/Pipeline.hs b/System/IO/Pipeline.hs index 04243c7..e88c40c 100644 --- a/System/IO/Pipeline.hs +++ b/System/IO/Pipeline.hs @@ -13,11 +13,15 @@ module System.IO.Pipeline ( ) where import Prelude hiding (length) -import GHC.Conc (ThreadStatus(..), threadStatus) import Control.Concurrent (ThreadId, forkIO, killThread) -import Control.Concurrent.Chan -import Control.Concurrent.MVar.Lifted -import Control.Monad.Error +import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) +import Control.Monad (forever) +import GHC.Conc (ThreadStatus(..), threadStatus) + +import Control.Monad.Trans (liftIO) +import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, + putMVar, readMVar, addMVarFinalizer) +import Control.Monad.Error (ErrorT(ErrorT), runErrorT) onException :: (Monad m) => ErrorT e m a -> m () -> ErrorT e m a -- ^ If first action throws an exception then run second action then re-throw diff --git a/System/IO/Pool.hs b/System/IO/Pool.hs index 0ec9f6c..c97367b 100644 --- a/System/IO/Pool.hs +++ b/System/IO/Pool.hs @@ -5,12 +5,15 @@ module System.IO.Pool where import Control.Applicative ((<$>)) -import Control.Concurrent.MVar.Lifted -import Data.Array.IO -import Data.Maybe (catMaybes) -import Control.Monad.Error -import System.Random (randomRIO) import Control.Exception (assert) +import Data.Array.IO (IOArray, readArray, writeArray, newArray, newListArray, + getElems, getBounds, rangeSize, range) +import Data.Maybe (catMaybes) +import System.Random (randomRIO) + +import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar_) +import Control.Monad.Error (ErrorT, Error) +import Control.Monad.Trans (liftIO) -- | Creator, destroyer, and checker of resources of type r. Creator may throw error or type e. data Factory e r = Factory {