diff --git a/Database/MongoDB.hs b/Database/MongoDB.hs index eb02093..99235ea 100644 --- a/Database/MongoDB.hs +++ b/Database/MongoDB.hs @@ -1,1039 +1,13 @@ -{- - -Copyright (C) 2010 Scott R Parish - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - --} - --- | A driver for MongoDB --- --- This module lets you connect to MongoDB, do inserts, queries, --- updates, etc. Also has many convience functions inspired by HDBC --- such as more easily converting between the BsonValue types and --- native Haskell types. 
--- --- * Tutorial for this driver: --- --- --- * Map/Reduce example for this driver: --- --- --- * MongoDB: --- --- - -module Database.MongoDB - ( - -- * Connection - Connection, ConnectOpt(..), - connect, connectOnPort, conClose, disconnect, dropDatabase, - connectCluster, connectClusterOnPort, - serverInfo, serverShutdown, - databasesInfo, databaseNames, - -- * Database - Database, MongoDBCollectionInvalid, Password, Username, - ColCreateOpt(..), - collectionNames, createCollection, dropCollection, - renameCollection, runCommand, validateCollection, - auth, addUser, login, logout, - -- * Collection - Collection, FieldSelector, FullCollection, - NumToSkip, NumToReturn, Selector, - QueryOpt(..), - UpdateFlag(..), - count, countMatching, delete, insert, insertMany, query, remove, update, - save, - -- * Convenience collection operations - find, findOne, quickFind, quickFind', - -- * Query Helpers - whereClause, - -- * Cursor - Cursor, - allDocs, allDocs', finish, nextDoc, - -- * Index - Key, Unique, - Direction(..), - createIndex, dropIndex, dropIndexes, indexInformation, - -- * Map-Reduce - MapReduceOpt(..), - mapReduce, mapReduceWScopes, - runMapReduce, runMapReduceWScopes, - mapReduceResults, - ) -where -import Control.Exception -import Control.Monad -import Data.Binary() -import Data.Binary.Get -import Data.Binary.Put -import Data.Bits -import Data.ByteString.Char8 (pack) -import Data.ByteString.Internal (c2w) -import qualified Data.ByteString.Lazy as L -import qualified Data.ByteString.Lazy.UTF8 as L8 -import Data.Digest.OpenSSL.MD5 -import Data.Int -import Data.IORef -import qualified Data.List as List -import Data.Maybe -import Data.Typeable -import Database.MongoDB.BSON as BSON -import Database.MongoDB.Util -import qualified Network -import Network.Socket hiding (connect, send, sendTo, recv, recvFrom) -import Prelude hiding (getContents) -import System.IO -import System.IO.Unsafe -import System.Random - --- | A list of handles to database connections -data 
Connection = Connection { - cHandle :: IORef Handle, - cRand :: IORef [Int], - cOidGen :: ObjectIdGen - } - -data ConnectOpt - = SlaveOK -- ^ It's fine to connect to the slave - deriving (Show, Eq) - --- | Establish a connection to a MongoDB server -connect :: HostName -> [ConnectOpt] -> IO Connection -connect = flip connectOnPort (Network.PortNumber 27017) - --- | Establish connections to a list of MongoDB servers -connectCluster :: [HostName] -> [ConnectOpt] -> IO Connection -connectCluster xs = - connectClusterOnPort (fmap (flip (,) $ Network.PortNumber 27017) xs) - --- | Establish connections to a list of MongoDB servers specifying each port. -connectClusterOnPort :: [(HostName, Network.PortID)] -> [ConnectOpt] - -> IO Connection -connectClusterOnPort [] _ = throwOpFailure "No hostnames in list" -connectClusterOnPort servers opts = newConnection servers opts - --- | Establish a connection to a MongoDB server on a non-standard port -connectOnPort :: HostName -> Network.PortID -> [ConnectOpt] -> IO Connection -connectOnPort host port = newConnection [(host, port)] - -newConnection :: [(HostName, Network.PortID)] -> [ConnectOpt] -> IO Connection -newConnection servers opts = do - r <- newStdGen - let ns = randomRs (fromIntegral (minBound :: Int32), - fromIntegral (maxBound :: Int32)) r - nsRef <- newIORef ns - hRef <- openHandle (head servers) >>= newIORef - oidGen <- mkObjectIdGen - let c = Connection hRef nsRef oidGen - res <- isMaster c - if fromBson (fromLookup $ List.lookup (s2L "ismaster") res) == (1::Int) || - isJust (List.elemIndex SlaveOK opts) - then return c - else case List.lookup (s2L "remote") res of - Nothing -> throwConFailure "Couldn't find master to connect to" - Just server -> do - hRef' <- openHandle (splitHostPort $ fromBson server) >>= newIORef - return $ c {cHandle = hRef'} - -openHandle :: (HostName, Network.PortID) -> IO Handle -openHandle (host, port) = do - h <- Network.connectTo host port - hSetBuffering h NoBuffering - return h - 
-getHandle :: Connection -> IO Handle -getHandle c = readIORef $ cHandle c - -cPut :: Connection -> L.ByteString -> IO () -cPut c msg = getHandle c >>= flip L.hPut msg - --- | Close database connection -conClose :: Connection -> IO () -conClose c = readIORef (cHandle c) >>= hClose - --- | Information about the databases on the server. -databasesInfo :: Connection -> IO BsonDoc -databasesInfo c = - runCommand c (s2L "admin") $ toBsonDoc [("listDatabases", BsonInt32 1)] - --- | Return a list of database names on the server. -databaseNames :: Connection -> IO [Database] -databaseNames c = do - info <- databasesInfo c - let (BsonArray dbs) = fromLookup $ List.lookup (s2L "databases") info - names = mapMaybe (List.lookup (s2L "name") . fromBson) dbs - return $ List.map fromBson (names::[BsonValue]) - --- | Alias for 'conClose' -disconnect :: Connection -> IO () -disconnect = conClose - --- | Drop a database. -dropDatabase :: Connection -> Database -> IO () -dropDatabase c db = do - _ <- runCommand c db $ toBsonDoc [("dropDatabase", BsonInt32 1)] - return () - -isMaster :: Connection -> IO BsonDoc -isMaster c = runCommand c (s2L "admin") $ toBsonDoc [("ismaster", BsonInt32 1)] - --- | Get information about the MongoDB server we're connected to. -serverInfo :: Connection -> IO BsonDoc -serverInfo c = - runCommand c (s2L "admin") $ toBsonDoc [("buildinfo", BsonInt32 1)] - --- | Shut down the MongoDB server. --- --- Force a clean exit, flushing and closing all data files. --- Note that it will wait until all ongoing operations are complete. -serverShutdown :: Connection -> IO BsonDoc -serverShutdown c = - runCommand c (s2L "admin") $ toBsonDoc [("shutdown", BsonInt32 1)] - --- | Return a list of collections in /Database/. -collectionNames :: Connection -> Database -> IO [FullCollection] -collectionNames c db = do - docs <- quickFind' c (L.append db $ s2L ".system.namespaces") empty - let names = flip List.map docs $ - fromBson . fromLookup . 
List.lookup (s2L "name") - return $ List.filter (L.notElem $ c2w '$') names - -data ColCreateOpt = CCOSize Int64 -- ^ Desired initial size for the - -- collection (in bytes). must be - -- less than or equal to - -- 10000000000. For capped - -- collections this size is the max - -- size of the collection. - | CCOCapped Bool -- ^ If 'True', this is a capped collection. - | CCOMax Int64 -- ^ Maximum number of objects if capped. - deriving (Show, Eq) - -colCreateOptToBson :: ColCreateOpt -> (String, BsonValue) -colCreateOptToBson (CCOSize sz) = ("size", toBson sz) -colCreateOptToBson (CCOCapped b) = ("capped", toBson b) -colCreateOptToBson (CCOMax m) = ("max", toBson m) - --- | Create a new collection in this database. --- --- Normally collection creation is automatic. This function should --- only be needed if you want to specify 'ColCreateOpt's on creation. --- 'MongoDBCollectionInvalid' is thrown if the collection already --- exists. -createCollection :: Connection -> FullCollection -> [ColCreateOpt] -> IO () -createCollection c col opts = do - (db, col') <- validateCollectionName col - dbcols <- collectionNames c db - when (col `List.elem` dbcols) $ - throwColInvalid $ "Collection already exists: " ++ show col - let cmd = ("create", toBson col') : List.map colCreateOptToBson opts - _ <- runCommand c db $ toBsonDoc cmd - return () - --- | Drop a collection. -dropCollection :: Connection -> FullCollection -> IO () -dropCollection c col = do - let (db, col') = splitFullCol col - _ <- runCommand c db $ toBsonDoc [("drop", toBson col')] - return () - --- | Rename a collection--first /FullCollection/ argument is the --- existing name, the second is the new name. At the moment this command --- can also be used to move a collection between databases. 
-renameCollection :: Connection -> FullCollection -> FullCollection -> IO () -renameCollection c col newName = do - _ <- validateCollectionName col - _ <- runCommand c (s2L "admin") $ toBsonDoc [("renameCollection", toBson col), - ("to", toBson newName)] - return () - --- | Return a string of validation info about the collection. --- --- Example output (note this probably can/will change with different --- versions of the server): --- --- > validate --- > details: 0x7fe5cc2c1da4 ofs:e7da4 --- > firstExtent:0:24100 ns:test.foo.bar --- > lastExtent:0:24100 ns:test.foo.bar --- > # extents:1 --- > datasize?:180 nrecords?:5 lastExtentSize:1024 --- > padding:1 --- > first extent: --- > loc:0:24100 xnext:null xprev:null --- > nsdiag:test.foo.bar --- > size:1024 firstRecord:0:241e4 lastRecord:0:24280 --- > 5 objects found, nobj:5 --- > 260 bytes data w/headers --- > 180 bytes data wout/headers --- > deletedList: 0100100000000000000 --- > deleted: n: 4 size: 588 --- > nIndexes:1 --- > test.foo.bar.$_id_ keys:5 -validateCollection :: Connection -> FullCollection -> IO String -validateCollection c col = do - let (db, col') = splitFullCol col - res <- runCommand c db $ toBsonDoc [("validate", toBson col')] - return $ fromBson $ fromLookup $ List.lookup (s2L "result") res - -splitFullCol :: FullCollection -> (Database, Collection) -splitFullCol col = (L.takeWhile (c2w '.' /=) col, - L.tail $ L.dropWhile (c2w '.' /=) col) - -splitHostPort :: String -> (HostName, Network.PortID) -splitHostPort hp = (host, port) - where host = List.takeWhile (':' /=) hp - port = case List.dropWhile (':' /=) hp of - "" -> Network.PortNumber 27017 - pstr -> Network.Service $ List.tail pstr - --- | Run a database command. Usually this is unneeded as driver wraps --- all of the commands for you (eg 'createCollection', --- 'dropCollection', etc). 
-runCommand :: Connection -> Database -> BsonDoc -> IO BsonDoc -runCommand c db cmd = do - mres <- findOne c (L.append db $ s2L ".$cmd") cmd - let res = fromLookup mres - when (BsonDouble 1.0 /= fromLookup (List.lookup (s2L "ok") res)) $ - throwOpFailure $ "command \"" ++ show cmd ++ "\" failed: " ++ - fromBson (fromLookup $ List.lookup (s2L "errmsg") res) - return res - --- | An Iterator over the results of a query. Use 'nextDoc' to get each --- successive result document, or 'allDocs' or 'allDocs'' to get lazy or --- strict lists of results. -data Cursor = Cursor { - curCon :: Connection, - curID :: IORef Int64, - curNumToRet :: Int32, - curCol :: FullCollection, - curDocBytes :: IORef L.ByteString, - curClosed :: IORef Bool - } - -data Opcode - = OPReply -- 1 Reply to a client request. responseTo is set - | OPMsg -- 1000 generic msg command followed by a string - | OPUpdate -- 2001 update document - | OPInsert -- 2002 insert new document - | OPGetByOid -- 2003 is this used? - | OPQuery -- 2004 query a collection - | OPGetMore -- 2005 Get more data from a query. See Cursors - | OPDelete -- 2006 Delete documents - | OPKillCursors -- 2007 Tell database client is done with a cursor - deriving (Show, Eq) - -data MongoDBInternalError = MongoDBInternalError String - deriving (Eq, Show, Read) - -mongoDBInternalError :: TyCon -mongoDBInternalError = mkTyCon "Database.MongoDB.MongoDBInternalError" - -instance Typeable MongoDBInternalError where - typeOf _ = mkTyConApp mongoDBInternalError [] - -instance Exception MongoDBInternalError - -data MongoDBCollectionInvalid = MongoDBCollectionInvalid String - deriving (Eq, Show, Read) - -mongoDBCollectionInvalid :: TyCon -mongoDBCollectionInvalid = mkTyCon "Database.MongoDB.MongoDBcollectionInvalid" - -instance Typeable MongoDBCollectionInvalid where - typeOf _ = mkTyConApp mongoDBCollectionInvalid [] - -instance Exception MongoDBCollectionInvalid - -throwColInvalid :: String -> a -throwColInvalid = throw . 
MongoDBCollectionInvalid - -data MongoDBOperationFailure = MongoDBOperationFailure String - deriving (Eq, Show, Read) - -mongoDBOperationFailure :: TyCon -mongoDBOperationFailure = mkTyCon "Database.MongoDB.MongoDBoperationFailure" - -instance Typeable MongoDBOperationFailure where - typeOf _ = mkTyConApp mongoDBOperationFailure [] - -instance Exception MongoDBOperationFailure - -throwOpFailure :: String -> a -throwOpFailure = throw . MongoDBOperationFailure - -data MongoDBConnectionFailure = MongoDBConnectionFailure String - deriving (Eq, Show, Read) - -mongoDBConnectionFailure :: TyCon -mongoDBConnectionFailure = mkTyCon "Database.MongoDB.MongoDBconnectionFailure" - -instance Typeable MongoDBConnectionFailure where - typeOf _ = mkTyConApp mongoDBConnectionFailure [] - -instance Exception MongoDBConnectionFailure - -throwConFailure :: String -> a -throwConFailure = throw . MongoDBConnectionFailure - -fromOpcode :: Opcode -> Int32 -fromOpcode OPReply = 1 -fromOpcode OPMsg = 1000 -fromOpcode OPUpdate = 2001 -fromOpcode OPInsert = 2002 -fromOpcode OPGetByOid = 2003 -fromOpcode OPQuery = 2004 -fromOpcode OPGetMore = 2005 -fromOpcode OPDelete = 2006 -fromOpcode OPKillCursors = 2007 - -toOpcode :: Int32 -> Opcode -toOpcode 1 = OPReply -toOpcode 1000 = OPMsg -toOpcode 2001 = OPUpdate -toOpcode 2002 = OPInsert -toOpcode 2003 = OPGetByOid -toOpcode 2004 = OPQuery -toOpcode 2005 = OPGetMore -toOpcode 2006 = OPDelete -toOpcode 2007 = OPKillCursors -toOpcode n = throw $ MongoDBInternalError $ "Got unexpected Opcode: " ++ show n - --- | The name of a database. -type Database = L8.ByteString - --- | The full collection name. The full collection name is the --- concatenation of the database name with the collection name, using --- a @.@ for the concatenation. For example, for the database @foo@ --- and the collection @bar@, the full collection name is @foo.bar@. -type FullCollection = L8.ByteString - --- | The same as 'FullCollection' but without the 'Database' prefix. 
-type Collection = L8.ByteString - --- | A 'BsonDoc' representing restrictions for a query much like the --- /where/ part of an SQL query. -type Selector = BsonDoc - --- | A list of field names that limits the fields in the returned --- documents. The list can contains zero or more elements, each of --- which is the name of a field that should be returned. An empty list --- means that no limiting is done and all fields are returned. -type FieldSelector = [L8.ByteString] - -type RequestID = Int32 - --- | Sets the number of documents to omit - starting from the first --- document in the resulting dataset - when returning the result of --- the query. -type NumToSkip = Int32 - --- | This controls how many documents are returned at a time. The --- cursor works by requesting /NumToReturn/ documents, which are then --- immediately all transfered over the network; these are held locally --- until the those /NumToReturn/ are all consumed and then the network --- will be hit again for the next /NumToReturn/ documents. --- --- If the value @0@ is given, the database will choose the number of --- documents to return. --- --- Otherwise choosing a good value is very dependant on the document size --- and the way the cursor is being used. -type NumToReturn = Int32 - -type Username = String -type Password = String - -type JSCode = L8.ByteString - --- | Options that control the behavior of a 'query' operation. -data QueryOpt = QOTailableCursor - | QOSlaveOK - | QOOpLogReplay - | QONoCursorTimeout - deriving (Show) - -fromQueryOpts :: [QueryOpt] -> Int32 -fromQueryOpts opts = List.foldl (.|.) 0 $ fmap toVal opts - where toVal QOTailableCursor = 2 - toVal QOSlaveOK = 4 - toVal QOOpLogReplay = 8 - toVal QONoCursorTimeout = 16 - --- | Options that effect the behavior of a 'update' operation. -data UpdateFlag = UFUpsert - | UFMultiupdate - deriving (Show, Enum) - -fromUpdateFlags :: [UpdateFlag] -> Int32 -fromUpdateFlags flags = List.foldl (.|.) 0 $ - flip fmap flags $ (1 `shiftL`) . 
fromEnum - --- | Return the number of documents in /FullCollection/. -count :: Connection -> FullCollection -> IO Integer -count c col = countMatching c col empty - --- | Return the number of documents in /FullCollection/ matching /Selector/ -countMatching :: Connection -> FullCollection -> Selector -> IO Integer -countMatching c col sel = do - let (db, col') = splitFullCol col - res <- runCommand c db $ toBsonDoc [("count", toBson col'), - ("query", toBson sel)] - let cnt = (fromBson $ fromLookup $ List.lookup (s2L "n") res :: Double) - return $ truncate cnt - --- | Delete documents matching /Selector/ from the given /FullCollection/. -delete :: Connection -> FullCollection -> Selector -> IO RequestID -delete c col sel = do - let body = runPut $ do - putI32 0 - putCol col - putI32 0 - putBsonDoc sel - (reqID, msg) <- packMsg c OPDelete body - cPut c msg - return reqID - --- | An alias for 'delete'. -remove :: Connection -> FullCollection -> Selector -> IO RequestID -remove = delete - -moveOidToFrontOrGen :: Connection -> BsonDoc -> IO BsonDoc -moveOidToFrontOrGen c doc = - case List.lookup (s2L "_id") doc of - Nothing -> do - oid <- genObjectId $ cOidGen c - return $ (s2L "_id", oid) : doc - Just oid -> do - let keyEq = (\(k1, _) (k2, _) -> k1 == k2) - delByKey = \k -> List.deleteBy keyEq (k, undefined) - return $ (s2L "_id", oid) : delByKey (s2L "_id") doc - --- | Insert a single document into /FullCollection/ returning the /_id/ field. -insert :: Connection -> FullCollection -> BsonDoc -> IO BsonValue -insert c col doc = do - doc' <- moveOidToFrontOrGen c doc - let body = runPut $ do - putI32 0 - putCol col - putBsonDoc doc' - (_reqID, msg) <- packMsg c OPInsert body - cPut c msg - return $ snd $ head doc' - --- | Insert a list of documents into /FullCollection/ returing the --- /_id/ field for each one in the same order as they were given. 
-insertMany :: Connection -> FullCollection -> [BsonDoc] -> IO [BsonValue] -insertMany c col docs = do - docs' <- mapM (moveOidToFrontOrGen c) docs - let body = runPut $ do - putI32 0 - putCol col - forM_ docs' putBsonDoc - (_, msg) <- packMsg c OPInsert body - cPut c msg - return $ List.map (snd . head) docs' - --- | Open a cursor to find documents. If you need full functionality, --- see 'query' -find :: Connection -> FullCollection -> Selector -> IO Cursor -find c col sel = query c col [] 0 0 sel [] - --- | Query, but only return the first result, if any. -findOne :: Connection -> FullCollection -> Selector -> IO (Maybe BsonDoc) -findOne c col sel = query c col [] 0 (-1) sel [] >>= nextDoc - --- | Perform a query and return the result as a lazy list. Be sure to --- understand the comments about using the lazy list given for --- 'allDocs'. -quickFind :: Connection -> FullCollection -> Selector -> IO [BsonDoc] -quickFind c col sel = find c col sel >>= allDocs - --- | Perform a query and return the result as a strict list. -quickFind' :: Connection -> FullCollection -> Selector -> IO [BsonDoc] -quickFind' c col sel = find c col sel >>= allDocs' - --- | Open a cursor to find documents in /FullCollection/ that match --- /Selector/. See the documentation for each argument's type for --- information about how it effects the query. 
-query :: Connection -> FullCollection -> [QueryOpt] -> - NumToSkip -> NumToReturn -> Selector -> FieldSelector -> IO Cursor -query c col opts nskip ret sel fsel = do - h <- getHandle c - - let body = runPut $ do - putI32 $ fromQueryOpts opts - putCol col - putI32 nskip - putI32 ret - putBsonDoc sel - case fsel of - [] -> putNothing - _ -> putBsonDoc $ toBsonDoc $ List.zip fsel $ - repeat $ BsonInt32 1 - (reqID, msg) <- packMsg c OPQuery body - L.hPut h msg - - hdr <- getHeader h - assert (OPReply == hOp hdr) $ return () - assert (hRespTo hdr == reqID) $ return () - reply <- getReply h - assert (rRespFlags reply == 0) $ return () - docBytes <- L.hGet h (fromIntegral $ hMsgLen hdr - 16 - 20) >>= newIORef - closed <- newIORef False - cid <- newIORef $ rCursorID reply - return Cursor { - curCon = c, - curID = cid, - curNumToRet = ret, - curCol = col, - curDocBytes = docBytes, - curClosed = closed - } - --- | Update documents with /BsonDoc/ in /FullCollection/ that match /Selector/. -update :: Connection -> FullCollection -> - [UpdateFlag] -> Selector -> BsonDoc -> IO RequestID -update c col flags sel obj = do - let body = runPut $ do - putI32 0 - putCol col - putI32 $ fromUpdateFlags flags - putBsonDoc sel - putBsonDoc obj - (reqID, msg) <- packMsg c OPUpdate body - cPut c msg - return reqID - --- | log into the mongodb /Database/ attached to the /Connection/ -login :: Connection -> Database -> Username -> Password -> IO BsonDoc -login c db user pass = do - doc <- runCommand c db (toBsonDoc [("getnonce", toBson (1 :: Int))]) - let nonce = fromBson $ fromLookup $ List.lookup (s2L "nonce") doc :: String - digest = md5sum $ pack $ nonce ++ user ++ - md5sum (pack (user ++ ":mongo:" ++ pass)) - request = toBsonDoc [("authenticate", toBson (1 :: Int)), - ("user", toBson user), - ("nonce", toBson nonce), - ("key", toBson digest)] - in runCommand c db request - -auth :: Connection -> Database -> Username -> Password -> IO BsonDoc -auth = login - -logout :: Connection -> 
Database -> IO () -logout c db = - runCommand c db (toBsonDoc [(s2L "logout", BsonInt32 1)]) >> return () - --- | create a new user in the current /Database/ -addUser :: Connection -> Database -> Username -> Password -> IO BsonDoc -addUser c db user pass = do - let userDoc = toBsonDoc [(s2L "user", toBson user)] - fdb = L.append db (s2L ".system.users") - doc <- findOne c fdb userDoc - let pwd = md5sum $ pack (user ++ ":mongo:" ++ pass) - doc' = (s2L "pwd", toBson pwd) : - List.deleteBy (\(k1,_) (k2,_) -> (k1 == k2)) - (s2L user, undefined) - (fromMaybe userDoc doc) - _ <- save c fdb doc' - return doc' - -data MapReduceOpt - = MROptQuery BsonDoc -- ^ query filter object - - -- | MRSort ???? TODO - - | MROptLimit Int64 -- ^ number of objects to return from - -- collection - - | MROptOut L8.ByteString -- ^ output-collection name - - | MROptKeepTemp -- ^ If set the generated collection is - -- not treated as temporary, as it will - -- be by defualt. When /MROptOut/ is - -- specified, the collection is - -- automatically made permanent. - - | MROptFinalize JSCode -- ^ function to apply to all the - -- results when finished - - | MROptScope BsonDoc -- ^ can pass in variables that can be - -- access from map/reduce/finalize - - | MROptVerbose -- ^ provide statistics on job execution - -- time - -mrOptToTuple :: MapReduceOpt -> (String, BsonValue) -mrOptToTuple (MROptQuery q) = ("query", BsonDoc q) -mrOptToTuple (MROptLimit l) = ("limit", BsonInt64 l) -mrOptToTuple (MROptOut c) = ("out", BsonString c) -mrOptToTuple MROptKeepTemp = ("keeptemp", BsonBool True) -mrOptToTuple (MROptFinalize f) = ("finalize", BsonJSCode f) -mrOptToTuple (MROptScope s) = ("scope", BsonDoc s) -mrOptToTuple MROptVerbose = ("verbose", BsonBool True) - --- | Issue a map/reduce command and return the results metadata. If --- all you care about is the actual map/reduce results you might want --- to use the 'mapReduce' command instead. 
--- --- The results meta-document will look something like this: --- --- > {"result": "tmp.mr.mapreduce_1268095152_14", --- > "timeMillis": 67, --- > "counts": {"input": 4, --- > "emit": 6, --- > "output": 3}, --- > "ok": 1.0} --- --- The /results/ field is the collection name within the same Database --- that contain the results of the map/reduce. -runMapReduce :: Connection -> FullCollection - -> JSCode -- ^ mapping javascript function - -> JSCode -- ^ reducing javascript function - -> [MapReduceOpt] - -> IO BsonDoc -runMapReduce c fc m r opts = do - let (db, col) = splitFullCol fc - doc = [("mapreduce", toBson col), - ("map", BsonJSCode m), - ("reduce", BsonJSCode r)] ++ List.map mrOptToTuple opts - runCommand c db $ toBsonDoc doc - --- | Issue a map/reduce command with associated scopes and return the --- results metadata. If all you care about is the actual map/reduce --- results you might want to use the 'mapReduce' command instead. --- --- See 'runMapReduce' for more information about the form of the --- result metadata. -runMapReduceWScopes :: Connection -> FullCollection - -> JSCode -- ^ mapping javascript function - -> BsonDoc -- ^ Scope for mapping function - -> JSCode -- ^ reducing javascript function - -> BsonDoc -- ^ Scope for reducing function - -> [MapReduceOpt] - -> IO BsonDoc -runMapReduceWScopes c fc m ms r rs opts = do - let (db, col) = splitFullCol fc - doc = [("mapreduce", toBson col), - ("map", BsonJSCodeWScope m ms), - ("reduce", BsonJSCodeWScope r rs)] ++ List.map mrOptToTuple opts - runCommand c db $ toBsonDoc doc - --- | Given a result metadata from a 'mapReduce' command (or --- 'mapReduceWScope'), issue the 'find' command that will produce the --- actual map/reduce results. 
-mapReduceResults :: Connection -> Database -> BsonDoc -> IO Cursor -mapReduceResults c db r = do - let col = case List.lookup (s2L "result") r of - Just bCol -> fromBson bCol - Nothing -> throwOpFailure "No 'result' in mapReduce response" - fc = L.append (L.append db $ s2L ".") col - find c fc [] - --- | Run map/reduce and produce a cursor on the results. -mapReduce :: Connection -> FullCollection - -> JSCode -- ^ mapping javascript function - -> JSCode -- ^ reducing javascript function - -> [MapReduceOpt] - -> IO Cursor -mapReduce c fc m r opts = - runMapReduce c fc m r opts >>= mapReduceResults c (fst $ splitFullCol fc) - --- | Run map/reduce with associated scopes and produce a cursor on the --- results. -mapReduceWScopes :: Connection -> FullCollection - -> JSCode -- ^ mapping javascript function - -> BsonDoc -- ^ Scope for mapping function - -> JSCode -- ^ reducing javascript function - -> BsonDoc -- ^ Scope for mapping function - -> [MapReduceOpt] - -> IO Cursor -mapReduceWScopes c fc m ms r rs opts = - runMapReduceWScopes c fc m ms r rs opts >>= - mapReduceResults c (fst $ splitFullCol fc) - --- | Conveniently stores the /BsonDoc/ to the /FullCollection/ --- if there is an _id present in the /BsonDoc/ then it already has --- a place in the DB, so we update it using the _id, otherwise --- we insert it -save :: Connection -> FullCollection -> BsonDoc -> IO BsonValue -save c fc doc = - case List.lookup (s2L "_id") doc of - Nothing -> insert c fc doc - Just oid -> update c fc [UFUpsert] (toBsonDoc [("_id", oid)]) doc >> - return oid - --- | Use this in the place of the query portion of a select type query --- This uses javascript and a scope supplied by a /BsonDoc/ to evaluate --- documents in the database for retrieval. 
--- --- Example: --- --- > findOne conn mycoll $ whereClause "this.name == (name1 + name2)" --- > Just (toBsonDoc [("name1", toBson "mar"), ("name2", toBson "tha")]) -whereClause :: String -> Maybe BsonDoc -> BsonDoc -whereClause qry Nothing = toBsonDoc [("$where", BsonJSCode (s2L qry))] -whereClause qry (Just scope) = - toBsonDoc [("$where", BsonJSCodeWScope (s2L qry) scope)] - -data Hdr = Hdr { - hMsgLen :: Int32, - -- hReqID :: Int32, - hRespTo :: Int32, - hOp :: Opcode - } deriving (Show) - -data Reply = Reply { - rRespFlags :: Int32, - rCursorID :: Int64 - -- rStartFrom :: Int32, - -- rNumReturned :: Int32 - } deriving (Show) - -getHeader :: Handle -> IO Hdr -getHeader h = do - hdrBytes <- L.hGet h 16 - return $ flip runGet hdrBytes $ do - msgLen <- getI32 - skip 4 -- reqID <- getI32 - respTo <- getI32 - op <- getI32 - return $ Hdr msgLen respTo $ toOpcode op - -getReply :: Handle -> IO Reply -getReply h = do - replyBytes <- L.hGet h 20 - return $ flip runGet replyBytes $ do - respFlags <- getI32 - cursorID <- getI64 - skip 4 -- startFrom <- getI32 - skip 4 -- numReturned <- getI32 - return $ Reply respFlags cursorID - - --- | Return one document or Nothing if there are no more. --- Automatically closes the cursor when last document is read -nextDoc :: Cursor -> IO (Maybe BsonDoc) -nextDoc cur = do - closed <- readIORef $ curClosed cur - if closed - then return Nothing - else do - docBytes <- readIORef $ curDocBytes cur - cid <- readIORef $ curID cur - case L.length docBytes of - 0 -> if cid == 0 - then writeIORef (curClosed cur) True >> return Nothing - else getMore cur - _ -> do - let (doc, docBytes') = getFirstDoc docBytes - writeIORef (curDocBytes cur) docBytes' - return $ Just doc - --- | Return a lazy list of all (of the rest) of the documents in the --- cursor. This works much like hGetContents--it will lazily read the --- cursor data out of the database as the list is used. The cursor is --- automatically closed when the list has been fully read. 
--- --- If you manually finish the cursor before consuming off this list --- you won't get all the original documents in the cursor. --- --- If you don't consume to the end of the list, you must manually --- close the cursor or you will leak the cursor, which may also leak --- on the database side. -allDocs :: Cursor -> IO [BsonDoc] -allDocs cur = unsafeInterleaveIO $ do - doc <- nextDoc cur - case doc of - Nothing -> return [] - Just d -> liftM (d :) (allDocs cur) - --- | Returns a strict list of all (of the rest) of the documents in --- the cursor. This means that all of the documents will immediately --- be read out of the database and loaded into memory. -allDocs' :: Cursor -> IO [BsonDoc] -allDocs' cur = do - doc <- nextDoc cur - case doc of - Nothing -> return [] - Just d -> liftM (d :) (allDocs' cur) - -getFirstDoc :: L.ByteString -> (BsonDoc, L.ByteString) -getFirstDoc docBytes = flip runGet docBytes $ do - doc <- getBsonDoc - docBytes' <- getRemainingLazyByteString - return (doc, docBytes') - -getMore :: Cursor -> IO (Maybe BsonDoc) -getMore cur = do - h <- getHandle $ curCon cur - - cid <- readIORef $ curID cur - let body = runPut $ do - putI32 0 - putCol $ curCol cur - putI32 $ curNumToRet cur - putI64 cid - (reqID, msg) <- packMsg (curCon cur) OPGetMore body - L.hPut h msg - - hdr <- getHeader h - assert (OPReply == hOp hdr) $ return () - assert (hRespTo hdr == reqID) $ return () - reply <- getReply h - assert (rRespFlags reply == 0) $ return () - case rCursorID reply of - 0 -> writeIORef (curID cur) 0 - ncid -> assert (ncid == cid) $ return () - docBytes <- (L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20) - case L.length docBytes of - 0 -> writeIORef (curClosed cur) True >> return Nothing - _ -> do - let (doc, docBytes') = getFirstDoc docBytes - writeIORef (curDocBytes cur) docBytes' - return $ Just doc - --- | Manually close a cursor -- usually not needed if you use --- 'allDocs', 'allDocs'', or 'nextDoc'. 
-finish :: Cursor -> IO () -finish cur = do - h <- getHandle $ curCon cur - cid <- readIORef $ curID cur - unless (cid == 0) $ do - let body = runPut $ do - putI32 0 - putI32 1 - putI64 cid - (_reqID, msg) <- packMsg (curCon cur) OPKillCursors body - L.hPut h msg - writeIORef (curClosed cur) True - return () - --- | The field key to index on. -type Key = L8.ByteString - --- | Direction to index. -data Direction = Ascending - | Descending - deriving (Show, Eq) - -fromDirection :: Direction -> Int -fromDirection Ascending = 1 -fromDirection Descending = - 1 - --- | Should this index guarantee uniqueness? -type Unique = Bool - --- | Create a new index on /FullCollection/ on the list of /Key/ / --- /Direction/ pairs. -createIndex :: Connection -> FullCollection -> - [(Key, Direction)] -> Unique -> IO L8.ByteString -createIndex c col keys uniq = do - let (db, _col') = splitFullCol col - name = indexName keys - keysDoc = flip fmap keys $ - \(k, d) -> (k, toBson $ fromDirection d :: BsonValue) - _ <- insert c (L.append db $ s2L ".system.indexes") $ - toBsonDoc [("name", toBson name), - ("ns", toBson col), - ("key", toBson keysDoc), - ("unique", toBson uniq)] - return name - --- | Drop the specified index on the given /FullCollection/. -dropIndex :: Connection -> FullCollection -> [(Key, Direction)] -> IO () -dropIndex c col keys = do - let (db, col') = splitFullCol col - name = indexName keys - _ <- runCommand c db $ toBsonDoc [("deleteIndexes", toBson col'), - ("index", toBson name)] - return () - --- | Drop all indexes on /FullCollection/. -dropIndexes :: Connection -> FullCollection -> IO () -dropIndexes c col = do - let (db, col') = splitFullCol col - _ <- runCommand c db $ toBsonDoc [("deleteIndexes", toBson col'), - ("index", toBson "*")] - return () - --- | Return a BsonDoc describing the existing indexes on /FullCollection/. 
--- --- With the current server versions (1.2) this will return documents --- such as: --- --- > {"key": {"lastname": -1, "firstname": 1}, --- > "name": "lastname_-1_firstname_1", --- > "ns": "mydb.people", --- > "unique": true} --- --- Which is a single key that indexes on @lastname@ (descending) and --- then @firstname@ (ascending) on the collection @people@ of the --- database @mydb@ with a uniqueness requirement. -indexInformation :: Connection -> FullCollection -> IO [BsonDoc] -indexInformation c col = do - let (db, _col') = splitFullCol col - quickFind' c (L.append db $ s2L ".system.indexes") $ - toBsonDoc [("ns", toBson col)] - -indexName :: [(Key, Direction)] -> L8.ByteString -indexName = L.intercalate (s2L "_") . List.map partName - where partName (k, Ascending) = L.append k $ s2L "_1" - partName (k, Descending) = L.append k $ s2L "_-1" - -putCol :: Collection -> Put -putCol col = putLazyByteString col >> putNull - -packMsg :: Connection -> Opcode -> L.ByteString -> IO (RequestID, L.ByteString) -packMsg c op body = do - reqID <- randNum c - let msg = runPut $ do - putI32 $ fromIntegral $ L.length body + 16 - putI32 reqID - putI32 0 - putI32 $ fromOpcode op - putLazyByteString body - return (reqID, msg) - -randNum :: Connection -> IO Int32 -randNum Connection { cRand = nsRef } = atomicModifyIORef nsRef $ \ns -> - (List.tail ns, - fromIntegral $ List.head ns) - -s2L :: String -> L8.ByteString -s2L = L8.fromString - -validateCollectionName :: FullCollection -> IO (Database, Collection) -validateCollectionName col = do - let (db, col') = splitFullCol col - when (s2L ".." `List.elem` L.group col) $ - throwColInvalid $ "Collection can't contain \"..\": " ++ show col - when (c2w '$' `L.elem` col && - not (s2L "oplog.$mail" `L.isPrefixOf` col' || - s2L "$cmd" `L.isPrefixOf` col')) $ - throwColInvalid $ "Collection can't contain '$': " ++ show col - when (L.head col == c2w '.' 
|| L.last col == c2w '.') $ - throwColInvalid $ "Collection can't start or end with '.': " ++ show col - return (db, col') - -fromLookup :: Maybe a -> a -fromLookup (Just m) = m -fromLookup Nothing = throwColInvalid "cannot find key" +-- | Client interface to MongoDB server(s) + +module Database.MongoDB ( + module Data.Bson, + module Database.MongoDB.Connection, + module Database.MongoDB.Query, + module Database.MongoDB.Admin +) where + +import Data.Bson +import Database.MongoDB.Connection +import Database.MongoDB.Query +import Database.MongoDB.Admin diff --git a/Database/MongoDB/Admin.hs b/Database/MongoDB/Admin.hs new file mode 100644 index 0000000..40beeac --- /dev/null +++ b/Database/MongoDB/Admin.hs @@ -0,0 +1,283 @@ +-- | Database administrative functions + +{-# LANGUAGE OverloadedStrings, RecordWildCards #-} + +module Database.MongoDB.Admin ( + -- * Admin + -- ** Collection + CollectionOption(..), createCollection, renameCollection, dropCollection, validateCollection, + -- ** Index + Index(..), IndexName, index, ensureIndex, createIndex, dropIndex, getIndexes, dropIndexes, + -- ** User + allUsers, addUser, removeUser, + -- ** Database + cloneDatabase, copyDatabase, dropDatabase, repairDatabase, + -- ** Server + serverBuildInfo, serverVersion, + -- * Diagnostics + -- ** Collection + collectionStats, dataSize, storageSize, totalIndexSize, totalSize, + -- ** Profiling + ProfilingLevel, getProfilingLevel, MilliSec, setProfilingLevel, + -- ** Database + dbStats, OpNum, currentOp, killOp, + -- ** Server + serverStatus +) where + +import Prelude hiding (lookup) +import Control.Applicative ((<$>)) +import Database.MongoDB.Internal.Protocol (pwHash, pwKey) +import Database.MongoDB.Connection (Server, showHostPort, Conn) +import Database.MongoDB.Query +import Data.Bson +import Data.UString (pack, unpack, append, intercalate) +import Control.Monad.Reader +import qualified Data.HashTable as T +import Data.IORef +import qualified Data.Set as S +import System.IO.Unsafe 
(unsafePerformIO) +import Control.Concurrent (forkIO, threadDelay) +import Database.MongoDB.Util ((<.>), true1) + +-- * Admin + +-- ** Collection + +data CollectionOption = Capped | MaxByteSize Int | MaxItems Int deriving (Show, Eq) + +coptElem :: CollectionOption -> Field +coptElem Capped = "capped" =: True +coptElem (MaxByteSize n) = "size" =: n +coptElem (MaxItems n) = "max" =: n + +createCollection :: (Conn m) => [CollectionOption] -> Collection -> Db m Document +-- ^ Create collection with given options. You only need to call this to set options, otherwise a collection is created automatically on first use with no options. +createCollection opts col = runCommand $ ["create" =: col] ++ map coptElem opts + +renameCollection :: (Conn m) => Collection -> Collection -> Db m Document +-- ^ Rename first collection to second collection +renameCollection from to = ReaderT $ \db -> useDb "admin" $ + runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True] + +dropCollection :: (Conn m) => Collection -> Db m Bool +-- ^ Delete the given collection! Return True if collection existed (and was deleted); return False if collection did not exist (and no action). 
+dropCollection coll = do
+    resetIndexCache
+    r <- runCommand ["drop" =: coll]
+    if true1 "ok" r then return True else do
+        if at "errmsg" r == ("ns not found" :: UString) then return False else
+            fail $ "dropCollection failed: " ++ show r
+
+validateCollection :: (Conn m) => Collection -> Db m Document
+-- ^ This operation takes a while
+validateCollection coll = runCommand ["validate" =: coll]
+
+-- ** Index
+
+type IndexName = UString
+
+data Index = Index {
+    iColl :: Collection,
+    iKey :: Order,
+    iName :: IndexName,
+    iUnique :: Bool,
+    iDropDups :: Bool
+    } deriving (Show, Eq)
+
+idxDocument :: Index -> Database -> Document
+idxDocument Index{..} db = [
+    "ns" =: db <.> iColl,
+    "key" =: iKey,
+    "name" =: iName,
+    "unique" =: iUnique,
+    "dropDups" =: iDropDups ]
+
+index :: Collection -> Order -> Index
+-- ^ Spec of index of ordered keys on collection. Name is generated from keys. Unique and dropDups are False.
+index coll keys = Index coll keys (genName keys) False False
+
+genName :: Order -> IndexName
+genName keys = intercalate "_" (map f keys) where
+    f (k := v) = k `append` "_" `append` pack (show v)
+
+ensureIndex :: (Conn m) => Index -> Db m ()
+-- ^ Create index if we did not already create one. May be called repeatedly with practically no performance hit, because we remember if we already called this for the same index (although this memory gets wiped out every 15 minutes, in case another client drops the index and we want to create it again). Fails with a \"createIndex failed\" message if 'getLastError' reports an error after the insert.
+ensureIndex idx = let k = (iColl idx, iName idx) in do
+    icache <- fetchIndexCache
+    set <- liftIO (readIORef icache)
+    unless (S.member k set) . runDbOp $ do
+        createIndex idx
+        me <- getLastError
+        case me of
+            -- Insert into the cache's *current* contents instead of writing back the
+            -- snapshot read before the server round-trip; otherwise a concurrent
+            -- resetIndexCache (or another ensureIndex) would be silently undone.
+            Nothing -> liftIO $ atomicModifyIORef icache (\cached -> (S.insert k cached, ()))
+            Just (c, e) -> fail $ "createIndex failed: (" ++ show c ++ ") " ++ e
+
+createIndex :: (Conn m) => Index -> Db m ()
+-- ^ Create index on the server. This call goes to the server every time.
+createIndex idx = insert_ "system.indexes" . 
idxDocument idx =<< thisDatabase + +dropIndex :: (Conn m) => Collection -> IndexName -> Db m Document +-- ^ Remove the index +dropIndex coll idxName = do + resetIndexCache + runCommand ["deleteIndexes" =: coll, "index" =: idxName] + +getIndexes :: (Conn m) => Collection -> Db m [Document] +-- ^ Get all indexes on this collection +getIndexes coll = do + db <- thisDatabase + rest =<< find (query ["ns" =: db <.> coll] "system.indexes") + +dropIndexes :: (Conn m) => Collection -> Db m Document +-- ^ Drop all indexes on this collection +dropIndexes coll = do + resetIndexCache + runCommand ["deleteIndexes" =: coll, "index" =: ("*" :: UString)] + +-- *** Index cache + +type DbIndexCache = T.HashTable Database IndexCache +-- ^ Cache the indexes we create so repeatedly calling ensureIndex only hits database the first time. Clear cache every once in a while so if someone else deletes index we will recreate it on ensureIndex. + +type IndexCache = IORef (S.Set (Collection, IndexName)) + +dbIndexCache :: DbIndexCache +-- ^ initialize cache and fork thread that clears it every 15 minutes +dbIndexCache = unsafePerformIO $ do + table <- T.new (==) (T.hashString . unpack) + _ <- forkIO . 
forever $ threadDelay 900000000 >> clearDbIndexCache + return table +{-# NOINLINE dbIndexCache #-} + +clearDbIndexCache :: IO () +clearDbIndexCache = do + keys <- map fst <$> T.toList dbIndexCache + mapM_ (T.delete dbIndexCache) keys + +fetchIndexCache :: (Conn m) => Db m IndexCache +-- ^ Get index cache for current database +fetchIndexCache = ReaderT $ \db -> liftIO $ do + mc <- T.lookup dbIndexCache db + maybe (newIdxCache db) return mc + where + newIdxCache db = do + idx <- newIORef S.empty + T.insert dbIndexCache db idx + return idx + +resetIndexCache :: (Conn m) => Db m () +-- ^ reset index cache for current database +resetIndexCache = do + icache <- fetchIndexCache + liftIO (writeIORef icache S.empty) + +-- ** User + +allUsers :: (Conn m) => Db m [Document] +-- ^ Fetch all users of this database +allUsers = map (exclude ["_id"]) <$> (rest =<< find + (query [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]}) + +addUser :: (Conn m) => Bool -> Username -> Password -> Db m () +-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False +addUser readOnly user pass = do + mu <- findOne (query ["user" =: user] "system.users") + let u = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu) + save "system.users" u + +removeUser :: (Conn m) => Username -> Db m () +removeUser user = delete (Select ["user" =: user] "system.users") + +-- ** Database + +cloneDatabase :: (Conn m) => Database -> Server -> m Document +-- ^ Copy database from given server to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case). 
+cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromHost] + +copyDatabase :: (Conn m) => Database -> Server -> Maybe (Username, Password) -> Database -> m Document +-- ^ Copy database from given server to the server I am connected to. If username & password is supplied use them to read from given server. +copyDatabase fromDb fromHost mup toDb = do + let c = ["copydb" =: (1 :: Int), "fromhost" =: showHostPort fromHost, "fromdb" =: fromDb, "todb" =: toDb] + useDb "admin" $ case mup of + Nothing -> runCommand c + Just (u, p) -> do + n <- at "nonce" <$> runCommand ["copydbgetnonce" =: (1 :: Int), "fromhost" =: showHostPort fromHost] + runCommand $ c ++ ["username" =: u, "nonce" =: n, "key" =: pwKey n u p] + +dropDatabase :: (Conn m) => Database -> m Document +-- ^ Delete the given database! +dropDatabase db = useDb db $ runCommand ["dropDatabase" =: (1 :: Int)] + +repairDatabase :: (Conn m) => Database -> m Document +-- ^ Attempt to fix any corrupt records. This operation takes a while. 
+repairDatabase db = useDb db $ runCommand ["repairDatabase" =: (1 :: Int)] + +-- ** Server + +serverBuildInfo :: (Conn m) => m Document +serverBuildInfo = useDb "admin" $ runCommand ["buildinfo" =: (1 :: Int)] + +serverVersion :: (Conn m) => m UString +serverVersion = at "version" <$> serverBuildInfo + +-- * Diagnostics + +-- ** Collection + +collectionStats :: (Conn m) => Collection -> Db m Document +collectionStats coll = runCommand ["collstats" =: coll] + +dataSize :: (Conn m) => Collection -> Db m Int +dataSize c = at "size" <$> collectionStats c + +storageSize :: (Conn m) => Collection -> Db m Int +storageSize c = at "storageSize" <$> collectionStats c + +totalIndexSize :: (Conn m) => Collection -> Db m Int +totalIndexSize c = at "totalIndexSize" <$> collectionStats c + +totalSize :: (Conn m) => Collection -> Db m Int +totalSize coll = do + x <- storageSize coll + xs <- mapM isize =<< getIndexes coll + return (foldl (+) x xs) + where + isize idx = at "storageSize" <$> collectionStats (coll `append` ".$" `append` at "name" idx) + +-- ** Profiling + +data ProfilingLevel = Off | Slow | All deriving (Show, Enum, Eq) + +getProfilingLevel :: (Conn m) => Db m ProfilingLevel +getProfilingLevel = toEnum . at "was" <$> runCommand ["profile" =: (-1 :: Int)] + +type MilliSec = Int + +setProfilingLevel :: (Conn m) => ProfilingLevel -> Maybe MilliSec -> Db m () +setProfilingLevel p mSlowMs = + runCommand (["profile" =: fromEnum p] ++ ("slowms" =? 
mSlowMs)) >> return () + +-- ** Database + +dbStats :: (Conn m) => Db m Document +dbStats = runCommand ["dbstats" =: (1 :: Int)] + +currentOp :: (Conn m) => Db m (Maybe Document) +-- ^ See currently running operation on the database, if any +currentOp = findOne (query [] "$cmd.sys.inprog") + +type OpNum = Int + +killOp :: (Conn m) => OpNum -> Db m (Maybe Document) +killOp op = findOne (query ["op" =: op] "$cmd.sys.killop") + +-- ** Server + +serverStatus :: (Conn m) => m Document +serverStatus = useDb "admin" $ runCommand ["serverStatus" =: (1 :: Int)] + + +{- Authors: Tony Hannan + Copyright 2010 10gen Inc. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -} diff --git a/Database/MongoDB/Connection.hs b/Database/MongoDB/Connection.hs new file mode 100644 index 0000000..82ac14b --- /dev/null +++ b/Database/MongoDB/Connection.hs @@ -0,0 +1,174 @@ +{- | A replica set is a set of servers that mirror each other (a non-replicated server can act like a replica set of one). One server in a replica set is the master and the rest are slaves. When the master goes down, one of the slaves becomes master. The ReplicaSet object in this client maintains a list of servers that it currently knows are in the set. It refreshes this list every time it establishes a new connection with one of the servers in the set. Each server in the set knows who the other members in the set are, and who is master. The user asks the ReplicaSet object for a new master or slave connection. 
When a connection fails, the user must ask the ReplicaSet for a new connection (which most likely will connect to another server since the previous one failed). When you lose a connection you lose all session state that was stored with that connection on the server, which includes open cursors and temporary map-reduce output collections. Attempting to read from a lost cursor (on a new connection) will only return the remaining documents in the last batch returned to this client. It will not fetch the remaining documents from the server. Likewise, attempting to read a lost map-reduce output will return an empty set of documents. Notice, in both cases, no error is raised, just empty results. -} + +{-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-} + +module Database.MongoDB.Connection ( + -- * Server + I.Server(..), PortID(..), server, showHostPort, readHostPort, readHostPortF, + -- * ReplicaSet + ReplicaSet, replicaSet, replicaServers, + MasterOrSlave(..), FailedToConnect, newConnection, + -- * Connection + I.Connection, I.connServer, I.showHandle, + connect, I.closeConnection, I.isClosed, + -- * Connected monad + I.Conn(..), I.Failure(..), + -- ** Task + I.Task, I.runTask, + -- ** Op + I.Op +) where + +import Database.MongoDB.Internal.Connection as I +import Database.MongoDB.Query (useDb, runCommand1) +import Control.Applicative ((<$>)) +import Control.Arrow ((+++), left) +import Control.Exception (assert) +import System.IO.Error as E (try) +import Control.Monad.Error +import Data.IORef +import Network (HostName, PortID(..), connectTo) +import Data.Bson (Document, look, typed) +import Text.ParserCombinators.Parsec as P (parse, many1, letter, digit, char, eof, spaces, try, (<|>)) +import Control.Monad.Identity +import Database.MongoDB.Util (true1) -- PortID instances + +-- * Server + +defaultPort :: PortID +defaultPort = PortNumber 27017 + +server :: HostName -> Server +-- ^ Server on default MongoDB port +server host = Server host defaultPort + 
+showHostPort :: Server -> String
+-- ^ Display server as \"host:port\"
+showHostPort (Server host port) = host ++ ":" ++ (case port of
+    Service s -> s
+    PortNumber p -> show p
+    UnixSocket s -> s)
+
+readHostPortF :: (Monad m) => String -> m Server
+-- ^ Read string \"host:port\" as 'Server host port' or \"host\" as 'server host' (default port). Fail if string does not match either syntax, or if the port is greater than 65535.
+readHostPortF = either (fail . show) return . parse parser "readHostPort" where
+    hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
+    parser = do
+        spaces
+        host <- hostname
+        P.try (spaces >> eof >> return (server host)) <|> do
+            _ <- char ':'
+            -- Read as Integer so an arbitrarily long digit string cannot overflow,
+            -- then range-check before narrowing: PortNumber is 16 bits wide and
+            -- fromIntegral would otherwise silently wrap e.g. 70000 to 4464.
+            port :: Integer <- read <$> many1 digit
+            spaces >> eof
+            if port > 65535
+                then fail ("port out of range (0-65535): " ++ show port)
+                else return $ Server host (PortNumber $ fromIntegral port)
+
+readHostPort :: String -> Server
+-- ^ Read string \"host:port\" as 'Server host port' or \"host\" as 'server host' (default port). Error if string does not match either syntax.
+readHostPort = runIdentity . readHostPortF
+
+-- * Replica Set
+
+newtype ReplicaSet = ReplicaSet (IORef [Server])
+-- ^ Reference to a replica set of servers. Ok if really not a replica set and just a stand-alone server, in which case it acts like a replica set of one.
+
+replicaSet :: [Server] -> IO ReplicaSet
+-- ^ Create a reference to a replica set with servers as the initial seed list (a subset of the servers in the replica set)
+replicaSet s = assert (not $ null s) (ReplicaSet <$> newIORef s)
+
+replicaServers :: ReplicaSet -> IO [Server]
+-- ^ Return current list of known servers in replica set. This list is updated on every 'newConnection'.
+replicaServers (ReplicaSet ref) = readIORef ref
+
+-- * Replica Info
+
+data ReplicaInfo = ReplicaInfo Server Document deriving (Show, Eq)
+-- ^ Configuration info of a server in a replica set. 
Contains all the servers in the replica set plus its role in that set (master, slave, or arbiter) + +isMaster :: ReplicaInfo -> Bool +-- ^ Is the replica server described by this info a master/primary (not slave or arbiter)? +isMaster (ReplicaInfo _ i) = true1 "ismaster" i + +isSlave :: ReplicaInfo -> Bool +-- ^ Is the replica server described by this info a slave/secondary (not master or arbiter) +isSlave = not . isMaster -- TODO: distinguish between slave and arbiter + +allReplicas :: ReplicaInfo -> [Server] +-- ^ All replicas in set according to this replica configuration info. +-- If server is stand-alone then it won't have \"hosts\" in its configuration, in which case we return the server by itself. +allReplicas (ReplicaInfo s i) = maybe [s] (map readHostPort . typed) (look "hosts" i) + +sortedReplicas :: ReplicaInfo -> IO [Server] +-- ^ All replicas in set sorted by distance from this client. TODO +sortedReplicas = return . allReplicas + +getReplicaInfo' :: Connection -> IO (Either IOError ReplicaInfo) +-- ^ Get replica info of the connected server. 
Return Left IOError if connection fails +getReplicaInfo' conn = left err <$> runTask getReplicaInfo conn where + err (ConnectionFailure e) = e + err (ServerFailure s) = userError s + +getReplicaInfo :: (Conn m) => m ReplicaInfo +-- ^ Get replica info of the connected server +getReplicaInfo = do + c <- getConnection + ReplicaInfo (connServer c) <$> useDb "admin" (runCommand1 "ismaster") + +-- * MasterOrSlave + +data MasterOrSlave = + Master -- ^ connect to master only + | SlaveOk -- ^ connect to a slave, or master if no slave available + deriving (Show, Eq) + +isMS :: MasterOrSlave -> ReplicaInfo -> Bool +-- ^ Does the server (as described by its info) match the master/slave type +isMS Master i = isMaster i +isMS SlaveOk i = isSlave i || isMaster i + +-- * Connection + +type FailedToConnect = [(Server, IOError)] +-- ^ All servers tried in replica set along with reason why each failed to connect + +newConnection :: MasterOrSlave -> ReplicaSet -> IO (Either FailedToConnect Connection) +-- ^ Create a connection to a master or slave in the replica set. Don't forget to close connection when you are done using it even if Failure exception is raised when using it. newConnection returns Left if failed to connect to any server in replica set. +-- TODO: prefer slave over master when SlaveOk and both are available. 
+newConnection mos (ReplicaSet vServers) = do
+    servers <- readIORef vServers
+    e <- connectFirst mos servers
+    case e of
+        Right (conn, info) -> do
+            writeIORef vServers =<< sortedReplicas info
+            return (Right conn)
+        Left (fs, is) -> if null is
+            then return (Left fs)
+            else do
+                replicas <- sortedReplicas (head is)
+                writeIORef vServers replicas
+                (fst +++ fst) <$> connectFirst mos replicas
+
+connectFirst :: MasterOrSlave -> [Server] -> IO (Either ([(Server, IOError)], [ReplicaInfo]) (Connection, ReplicaInfo))
+-- ^ Connect to first server that succeeds and is master/slave, otherwise return list of failed connections plus info of connections that succeeded but were not master/slave. Every connection that is opened but not returned is closed before the next server is tried.
+connectFirst mos = connectFirst' ([], []) where
+    connectFirst' (fs, is) [] = return $ Left (fs, is)
+    connectFirst' (fs, is) (s : ss) = do
+        ec <- connect s
+        case ec of
+            -- could not even open the socket: nothing to release
+            Left f -> connectFirst' ((s, f) : fs, is) ss
+            Right c -> do
+                ei <- getReplicaInfo' c
+                case ei of
+                    Left f -> do
+                        -- socket is open but the ismaster handshake failed; close it
+                        -- here, since the caller never sees this connection
+                        closeConnection c
+                        connectFirst' ((s, f) : fs, is) ss
+                    Right i -> if isMS mos i
+                        then return $ Right (c, i)
+                        else do
+                            closeConnection c
+                            connectFirst' ((s, userError $ "not a " ++ show mos) : fs, i : is) ss
+
+connect :: Server -> IO (Either IOError Connection)
+-- ^ Create a connection to the given server (as opposed to connecting to some server in a replica set via 'newConnection'). Return Left IOError if failed to connect.
+connect s@(Server host port) = E.try (mkConnection s =<< connectTo host port)
+
+
+{- Authors: Tony Hannan
+   Copyright 2010 10gen Inc.
+   Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License. -} diff --git a/Database/MongoDB/Internal/Connection.hs b/Database/MongoDB/Internal/Connection.hs new file mode 100644 index 0000000..0d73b40 --- /dev/null +++ b/Database/MongoDB/Internal/Connection.hs @@ -0,0 +1,148 @@ +{-| Low-level connection to a server. + +This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Connection" instead. -} + +{-# LANGUAGE GeneralizedNewtypeDeriving, TupleSections, TypeSynonymInstances, OverlappingInstances #-} + +module Database.MongoDB.Internal.Connection ( + -- * Server + Server(..), + -- * Connection + Connection, connServer, showHandle, mkConnection, withConn, closeConnection, isClosed, + -- * Connected monad + Conn(..), Failure(..), + -- ** Task + Task, runTask, + -- ** Op + Op, sendBytes, flushBytes, receiveBytes, + exposeIO, hideIO +) where + +import Control.Applicative (Applicative(..), (<$>)) +import Control.Arrow (left) +import System.IO.Error (try) +import Control.Concurrent.MVar +import Control.Monad.Reader +import Control.Monad.Error +import Network (HostName, PortID(..)) +import System.IO (Handle, hFlush, hClose, hIsClosed) +import Data.ByteString.Lazy as B (ByteString, hPut) +import System.Timeout +import Database.MongoDB.Util (Secs, ignore, hGetN) -- Reader/Error Applicative instances + +-- * Server + +data Server = Server HostName PortID deriving (Show, Eq, Ord) + +-- * Connection + +data Connection = Connection Server (MVar Handle) deriving (Eq) +-- ^ A connection to a server. This connection holds session state on the server like open cursors and temporary map-reduce tables that disappear when the connection is closed or fails. 
+ +connServer :: Connection -> Server +-- ^ Server this connection is connected to +connServer (Connection serv _) = serv + +showHandle :: Secs -> Connection -> IO String +-- ^ Show handle if not locked for more than given seconds +showHandle secs (Connection _ vHand) = + maybe "handle currently locked" show <$> timeout (round (secs * 1000000)) (readMVar vHand) + +instance Show Connection where + showsPrec d c = showParen (d > 10) $ showString "a connection to " . showsPrec 11 (connServer c) + +mkConnection :: Server -> Handle -> IO Connection +-- ^ Wrap handle in a MVar to control access +mkConnection s h = Connection s <$> newMVar h + +withConn :: Connection -> (Handle -> IO a) -> IO a +-- Execute IO action with exclusive access to TCP connection +withConn (Connection _ vHand) = withMVar vHand + +closeConnection :: Connection -> IO () +-- ^ Close connection. Attempting to read or write to a closed connection will raise 'Failure' exception. +closeConnection (Connection _ vHand) = withMVar vHand $ \h -> catch (hClose h) ignore + +isClosed :: Connection -> IO Bool +-- ^ Is connection closed? +isClosed (Connection _ vHand) = withMVar vHand hIsClosed + +-- * Task + +-- | Connection or Server failure like network problem or disk full +data Failure = + ConnectionFailure IOError + -- ^ Error during sending or receiving bytes over a 'Connection'. The connection is not automatically closed when this error happens; the user must close it. Any other IOErrors raised during a Task or Op are not caught. The user is responsible for these other types of errors not related to sending/receiving bytes over the connection. + | ServerFailure String + -- ^ Failure on server, like disk full, which is usually observed using getLastError. Calling 'fail' inside a Task or Op raises this failure. Do not call 'fail' unless it is a temporary server failure, like disk full. 
For example, receiving unexpected data from the server is not a server failure, rather it is a programming error (you should call 'error' in this case) because the client and server are incompatible and requires a programming change. + deriving (Show, Eq) + +instance Error Failure where strMsg = ServerFailure + +type Task m = ErrorT Failure (ReaderT Connection m) +-- ^ Action with shared access to connection (the connection can be supplied to multiple concurrent tasks). m must be a MonadIO. + +runTask :: Task m a -> Connection -> m (Either Failure a) +-- ^ Run task with shared access to connection. Return Left if connection fails anytime during its execution, in which case the task was partially executed. +runTask = runReaderT . runErrorT + +-- * Op + +newtype Op a = Op (ErrorT Failure (ReaderT (Connection, Handle) IO) a) + deriving (Functor, Applicative, Monad, MonadIO, MonadError Failure) +-- ^ Action with exclusive access to connection (other ops must wait) + +runOp' :: (MonadIO m) => Op a -> Task m a +-- ^ Run operation with exclusive access to connection. Fail if connection fails anytime during its execution, in which case the operation was partially executed. +runOp' (Op act) = ErrorT . ReaderT $ \conn -> + liftIO . withConn conn $ runReaderT (runErrorT act) . (conn,) + +sendBytes :: ByteString -> Op () +-- ^ Put bytes on socket +sendBytes bytes = Op . ErrorT . ReaderT $ \(_, h) -> left ConnectionFailure <$> try (hPut h bytes) + +flushBytes :: Op () +-- ^ Flush socket +flushBytes = Op . ErrorT . ReaderT $ \(_, h) -> left ConnectionFailure <$> try (hFlush h) + +receiveBytes :: Int -> Op ByteString +-- ^ Get N bytes from socket, blocking until all N bytes are received +receiveBytes n = Op . ErrorT . ReaderT $ \(_, h) -> left ConnectionFailure <$> try (hGetN h n) + +exposeIO :: ((Connection, Handle) -> IO (Either Failure a)) -> Op a +-- ^ Expose connection to underlying IO +exposeIO = Op . ErrorT . 
ReaderT + +hideIO :: Op a -> (Connection, Handle) -> IO (Either Failure a) +-- ^ Run op from IO +hideIO (Op act) = runReaderT (runErrorT act) + +-- * Connected monad + +-- | A monad with shared or exclusive access to a connection, ie. 'Task' or 'Op' +class (Functor m, Applicative m, MonadIO m) => Conn m where + runOp :: Op a -> m a + -- ^ Run op with exclusive access to connection. If @m@ is already exclusive then simply run op. + getConnection :: m Connection + -- ^ Return connection that this monad has access to + +instance (MonadIO m) => Conn (Task m) where + runOp = runOp' + getConnection = ask + +instance Conn Op where + runOp = id + getConnection = Op (asks fst) + +instance (Conn m) => Conn (ReaderT r m) where + runOp = lift . runOp + getConnection = lift getConnection + +instance (Conn m, Error e) => Conn (ErrorT e m) where + runOp = lift . runOp + getConnection = lift getConnection + + +{- Authors: Tony Hannan + Copyright 2010 10gen Inc. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -} diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs new file mode 100644 index 0000000..d2955a8 --- /dev/null +++ b/Database/MongoDB/Internal/Protocol.hs @@ -0,0 +1,296 @@ +{-| Low-level messaging between this client and the MongoDB server. See Mongo Wire Protocol (). + +This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" instead. 
-} + +{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-} + +module Database.MongoDB.Internal.Protocol ( + -- * FullCollection + FullCollection, + -- * Write + Insert(..), insert, + Update(..), UpdateOption(..), update, + Delete(..), DeleteOption(..), delete, + -- * Read + Query(..), QueryOption(..), query, + GetMore(..), getMore, + -- ** Reply + Reply(..), + -- ** Cursor + CursorId, killCursors, + -- * Authentication + Username, Password, Nonce, pwHash, pwKey +) where + +import Prelude as P +import Database.MongoDB.Internal.Connection (Op, sendBytes, flushBytes, receiveBytes) +import Data.Bson +import Data.Bson.Binary +import Data.UString as U (pack, append, toByteString) +import Data.ByteString.Lazy as B (length, append) +import Data.Binary.Put +import Data.Binary.Get +import Data.Int +import Data.Bits +import Control.Monad.Reader +import Control.Applicative ((<$>)) +import Data.IORef +import System.IO.Unsafe (unsafePerformIO) +import Data.Digest.OpenSSL.MD5 (md5sum) +import Database.MongoDB.Util (bitOr, (<.>)) + +-- * Authentication + +type Username = UString +type Password = UString +type Nonce = UString + +pwHash :: Username -> Password -> UString +pwHash u p = pack . md5sum . toByteString $ u `U.append` ":mongo:" `U.append` p + +pwKey :: Nonce -> Username -> Password -> UString +pwKey n u p = pack . md5sum . toByteString . U.append n . U.append u $ pwHash u p + +-- * FullCollection + +type FullCollection = UString +-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\" + +-- * Request / response + +insert :: Insert -> Op () +-- ^ Insert documents into collection +insert = send_ . putInsert + +update :: Update -> Op () +-- ^ Update documents in collection matching selector using updater +update = send_ . putUpdate + +delete :: Delete -> Op () +-- ^ Delete documents in collection matching selector +delete = send_ . 
putDelete + +killCursors :: [CursorId] -> Op () +-- ^ Close cursors on server because we will not be getting anymore documents from them +killCursors = send_ . putKillCursors . KillCursors + +query :: Query -> Op Reply +-- ^ Return first batch of documents in collection matching selector and a cursor-id for getting remaining documents (see 'getMore') +query q = do + requestId <- send (putQuery q) + (reply, responseTo) <- receive getReply + unless (responseTo == requestId) $ fail "expected response id to match query request id" + return reply + +getMore :: GetMore -> Op Reply +-- ^ Get next batch of documents from cursor +getMore g = do + requestId <- send (putGetMore g) + (reply, responseTo) <- receive getReply + unless (responseTo == requestId) $ fail "expected response id to match get-more request id" + return reply + +-- ** Send / receive + +type RequestId = Int32 +-- ^ A fresh request id is generated for every message + +genRequestId :: IO RequestId +-- ^ Generate fresh request id +genRequestId = atomicModifyIORef counter $ \n -> (n + 1, n) where + counter :: IORef RequestId + counter = unsafePerformIO (newIORef 0) + {-# NOINLINE counter #-} + +type ResponseTo = RequestId + +send_ :: (RequestId -> Put) -> Op () +send_ x = send x >> return () + +send :: (RequestId -> Put) -> Op RequestId +send rput = do + requestId <- liftIO genRequestId + let bytes = runPut (rput requestId) + let lengthBytes = runPut . putInt32 $ (toEnum . fromEnum) (B.length bytes + 4) + sendBytes (B.append lengthBytes bytes) + flushBytes + return requestId + +receive :: Get a -> Op a +receive getMess = do + messageLength <- fromIntegral . 
runGet getInt32 <$> receiveBytes 4 + runGet getMess <$> receiveBytes (messageLength - 4) + +-- * Messages + +data Insert = Insert { + iFullCollection :: FullCollection, + iDocuments :: [Document] + } deriving (Show, Eq) + +data Update = Update { + uFullCollection :: FullCollection, + uOptions :: [UpdateOption], + uSelector :: Document, + uUpdater :: Document + } deriving (Show, Eq) + +data UpdateOption = + Upsert -- ^ If set, the database will insert the supplied object into the collection if no matching document is found + | MultiUpdate -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc + deriving (Show, Eq) + +data Delete = Delete { + dFullCollection :: FullCollection, + dOptions :: [DeleteOption], + dSelector :: Document + } deriving (Show, Eq) + +data DeleteOption = SingleRemove -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed + deriving (Show, Eq) + +data Query = Query { + qOptions :: [QueryOption], + qFullCollection :: FullCollection, + qSkip :: Int32, -- ^ Number of initial matching documents to skip + qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size. 
+ qSelector :: Document, -- ^ \[\] = return all documents in collection + qProjector :: Document -- ^ \[\] = return whole document + } deriving (Show, Eq) + +data QueryOption = + TailableCursor | + SlaveOK | + NoCursorTimeout + deriving (Show, Eq) + +data GetMore = GetMore { + gFullCollection :: FullCollection, + gBatchSize :: Int32, + gCursorId :: CursorId + } deriving (Show, Eq) + +newtype KillCursors = KillCursors { + kCursorIds :: [CursorId] + } deriving (Show, Eq) + +data Reply = Reply { + rResponseFlag :: Int32, -- ^ 0 = success, non-zero = failure + rCursorId :: CursorId, -- ^ 0 = cursor finished + rStartingFrom :: Int32, + rDocuments :: [Document] + } deriving (Show, Eq) + +type CursorId = Int64 + +-- ** Messages binary format + +type Opcode = Int32 +-- ^ Code for each message type +replyOpcode, updateOpcode, insertOpcode, queryOpcode, getMoreOpcode, deleteOpcode, killCursorsOpcode :: Opcode +replyOpcode = 1 +updateOpcode = 2001 +insertOpcode = 2002 +queryOpcode = 2004 +getMoreOpcode = 2005 +deleteOpcode = 2006 +killCursorsOpcode = 2007 + +putUpdate :: Update -> RequestId -> Put +putUpdate Update{..} = putMessage updateOpcode $ do + putInt32 0 + putCString uFullCollection + putInt32 (uBits uOptions) + putDocument uSelector + putDocument uUpdater + +uBit :: UpdateOption -> Int32 +uBit Upsert = bit 0 +uBit MultiUpdate = bit 1 + +uBits :: [UpdateOption] -> Int32 +uBits = bitOr . map uBit + +putInsert :: Insert -> RequestId -> Put +putInsert Insert{..} = putMessage insertOpcode $ do + putInt32 0 + putCString iFullCollection + mapM_ putDocument iDocuments + +putDelete :: Delete -> RequestId -> Put +putDelete Delete{..} = putMessage deleteOpcode $ do + putInt32 0 + putCString dFullCollection + putInt32 (dBits dOptions) + putDocument dSelector + +dBit :: DeleteOption -> Int32 +dBit SingleRemove = bit 0 + +dBits :: [DeleteOption] -> Int32 +dBits = bitOr . 
map dBit + +putQuery :: Query -> RequestId -> Put +putQuery Query{..} = putMessage queryOpcode $ do + putInt32 (qBits qOptions) + putCString qFullCollection + putInt32 qSkip + putInt32 qBatchSize + putDocument qSelector + unless (null qProjector) (putDocument qProjector) + +qBit :: QueryOption -> Int32 +qBit TailableCursor = bit 1 +qBit SlaveOK = bit 2 +qBit NoCursorTimeout = bit 4 + +qBits :: [QueryOption] -> Int32 +qBits = bitOr . map qBit + +putGetMore :: GetMore -> RequestId -> Put +putGetMore GetMore{..} = putMessage getMoreOpcode $ do + putInt32 0 + putCString gFullCollection + putInt32 gBatchSize + putInt64 gCursorId + +putKillCursors :: KillCursors -> RequestId -> Put +putKillCursors KillCursors{..} = putMessage killCursorsOpcode $ do + putInt32 0 + putInt32 $ toEnum (P.length kCursorIds) + mapM_ putInt64 kCursorIds + +getReply :: Get (Reply, ResponseTo) +getReply = getMessage replyOpcode $ do + rResponseFlag <- getInt32 + rCursorId <- getInt64 + rStartingFrom <- getInt32 + numDocs <- getInt32 + rDocuments <- replicateM (fromIntegral numDocs) getDocument + return $ Reply {..} + +-- *** Message header + +putMessage :: Opcode -> Put -> RequestId -> Put +-- ^ Note, does not write message length (first int32), assumes caller will write it +putMessage opcode messageBodyPut requestId = do + putInt32 requestId + putInt32 0 + putInt32 opcode + messageBodyPut + +getMessage :: Opcode -> Get a -> Get (a, ResponseTo) +-- ^ Note, does not read message length (first int32), assumes it was already read +getMessage expectedOpcode getMessageBody = do + _requestId <- getInt32 + responseTo <- getInt32 + opcode <- getInt32 + unless (opcode == expectedOpcode) $ + fail $ "expected opcode " ++ show expectedOpcode ++ " but got " ++ show opcode + body <- getMessageBody + return (body, responseTo) + + +{- Authors: Tony Hannan + Copyright 2010 10gen Inc. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -} diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs new file mode 100644 index 0000000..ad8d0c0 --- /dev/null +++ b/Database/MongoDB/Query.hs @@ -0,0 +1,444 @@ +-- | Query and update documents residing on a MongoDB server(s) + +{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections #-} + +module Database.MongoDB.Query ( + -- * Database + Database, allDatabases, Db, useDb, thisDatabase, runDbOp, + -- ** Authentication + P.Username, P.Password, auth, + -- * Collection + Collection, allCollections, + -- ** Selection + Selection(..), select, Selector, whereJS, + -- * Write + -- ** Insert + insert, insert_, insertMany, insertMany_, + -- ** Update + save, replace, repsert, Modifier, modify, + -- ** Delete + delete, deleteOne, + -- * Read + -- ** Query + Query(..), P.QueryOption(..), Projector, Limit, Order, BatchSize, query, + explain, find, findOne, count, distinct, + -- *** Cursor + Cursor, next, nextN, rest, closeCursor, + -- ** Group + Group(..), GroupKey(..), group, + -- ** MapReduce + MapReduce(..), MapFun, ReduceFun, FinalizeFun, mapReduce, runMR, runMR', + -- * Command + Command, runCommand, runCommand1, + eval, + ErrorCode, getLastError, resetLastError +) where + +import Prelude as X hiding (lookup) +import Control.Applicative ((<$>)) +import Database.MongoDB.Internal.Connection +import qualified Database.MongoDB.Internal.Protocol as P +import Database.MongoDB.Internal.Protocol hiding (insert, update, delete, query, Query(Query)) +import Data.Bson +import Data.Word +import Data.Int +import Control.Monad.Reader +import 
Control.Concurrent.MVar +import Data.Maybe (listToMaybe, catMaybes) +import Data.UString as U (dropWhile, any, tail) +import Database.MongoDB.Util (loop, (<.>), true1) + +-- * Database + +type Database = UString +-- ^ Database name + +allDatabases :: (Conn m) => m [Database] +-- ^ List all databases residing on server +allDatabases = map (at "name") . at "databases" <$> useDb "admin" (runCommand1 "listDatabases") + +type Db m = ReaderT Database m + +useDb :: Database -> Db m a -> m a +-- ^ Run Db action against given database +useDb = flip runReaderT + +thisDatabase :: (Monad m) => Db m Database +-- ^ Current database in use +thisDatabase = ask + +runDbOp :: (Conn m) => Db Op a -> Db m a +-- ^ Run db operation with exclusive access to the connection +runDbOp dbOp = ReaderT (runOp . flip useDb dbOp) + +-- * Authentication + +auth :: (Conn m) => Username -> Password -> Db m Bool +-- ^ Authenticate with the database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new connection. +auth u p = do + n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)] + true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: u, "nonce" =: n, "key" =: pwKey n u p] + +-- * Collection + +type Collection = UString +-- ^ Collection name (not prefixed with database) + +allCollections :: (Conn m) => Db m [Collection] +-- ^ List all collections in this database +allCollections = do + db <- thisDatabase + docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]} + return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs + where + dropDbPrefix = U.tail . 
U.dropWhile (/= '.') + isSpecial db col = U.any (== '$') col && db <.> col /= "local.oplog.$main" + +-- * Selection + +data Selection = Select {selector :: Selector, coll :: Collection} deriving (Show, Eq) +-- ^ Selects documents in collection that match selector + +select :: Selector -> Collection -> Selection +-- ^ Synonym for 'Select' +select = Select + +type Selector = Document +-- ^ Filter for a query, analogous to the where clause in SQL. @[]@ matches all documents in collection. @[x =: a, y =: b]@ is analogous to @where x = a and y = b@ in SQL. See for full selector syntax. + +whereJS :: Selector -> Javascript -> Selector +-- ^ Add Javascript predicate to selector, in which case a document must match both selector and predicate +whereJS sel js = ("$where" =: js) : sel + +-- * Write + +-- ** Insert + +insert :: (Conn m) => Collection -> Document -> Db m Value +-- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied +insert col doc = head <$> insertMany col [doc] + +insert_ :: (Conn m) => Collection -> Document -> Db m () +-- ^ Same as 'insert' except don't return _id +insert_ col doc = insert col doc >> return () + +insertMany :: (Conn m) => Collection -> [Document] -> Db m [Value] +-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied +insertMany col docs = ReaderT $ \db -> do + docs' <- liftIO $ mapM assignId docs + runOp $ P.insert (Insert (db <.> col) docs') + mapM (look "_id") docs' + +insertMany_ :: (Conn m) => Collection -> [Document] -> Db m () +-- ^ Same as 'insertMany' except don't return _ids +insertMany_ col docs = insertMany col docs >> return () + +assignId :: Document -> IO Document +-- ^ Assign a unique value to _id field if missing +assignId doc = if X.any (("_id" ==) . 
label) doc + then return doc + else (\oid -> ("_id" =: oid) : doc) <$> genObjectId + +-- ** Update + +save :: (Conn m) => Collection -> Document -> Db m () +-- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or update it if its not new (has \"_id\" field) +save col doc = case look "_id" doc of + Nothing -> insert_ col doc + Just i -> repsert (Select ["_id" := i] col) doc + +replace :: (Conn m) => Selection -> Document -> Db m () +-- ^ Replace first document in selection with given document +replace = update [] + +repsert :: (Conn m) => Selection -> Document -> Db m () +-- ^ Replace first document in selection with given document, or insert document if selection is empty +repsert = update [Upsert] + +type Modifier = Document +-- ^ Update operations on fields in a document. See + +modify :: (Conn m) => Selection -> Modifier -> Db m () +-- ^ Update all documents in selection using given modifier +modify = update [MultiUpdate] + +update :: (Conn m) => [UpdateOption] -> Selection -> Document -> Db m () +-- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty. 
+update opts (Select sel col) up = ReaderT $ \db -> runOp $ P.update (Update (db <.> col) opts sel up) + +-- ** Delete + +delete :: (Conn m) => Selection -> Db m () +-- ^ Delete all documents in selection +delete (Select sel col) = ReaderT $ \db -> runOp $ P.delete (Delete (db <.> col) [] sel) + +deleteOne :: (Conn m) => Selection -> Db m () +-- ^ Delete first document in selection +deleteOne (Select sel col) = ReaderT $ \db -> runOp $ P.delete (Delete (db <.> col) [SingleRemove] sel) + +-- * Read + +-- ** Query + +data Query = Query { + options :: [QueryOption], + selection :: Selection, + project :: Projector, -- ^ \[\] = all fields + skip :: Word32, -- ^ Number of initial matching documents to skip + limit :: Limit, -- ^ Maximum number of documents to return, 0 = no limit + sort :: Order, -- ^ Sort results by this order, [] = no sort + snapshot :: Bool, -- ^ If true assures no duplicates are returned, or objects missed, which were present at both the start and end of the query's execution (even if the object were updated). If an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode. Note that short query responses (less than 1MB) are always effectively snapshotted. + batchSize :: BatchSize, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. + hint :: Order -- ^ Force MongoDB to use this index, [] = no hint + } deriving (Show, Eq) + +type Projector = Document +-- ^ Fields to return, analogous to the select clause in SQL. @[]@ means return whole document (analogous to * in SQL). @[x =: 1, y =: 1]@ means return only @x@ and @y@ fields of each document. @[x =: 0]@ means return all fields except @x@. + +type Limit = Word32 +-- ^ Maximum number of documents to return, i.e. cursor will close after iterating over this number of documents. 0 means no limit. + +type Order = Document +-- ^ Fields to sort by. Each one is associated with 1 or -1. Eg. 
@[x =: 1, y =: (-1)]@ means sort by @x@ ascending then @y@ descending + +type BatchSize = Word32 +-- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. + +query :: Selector -> Collection -> Query +-- ^ Selects documents in collection that match selector. It uses no query options, projects all fields, does not skip any documents, does not limit result size, uses default batch size, does not sort, does not hint, and does not snapshot. +query sel col = Query [] (Select sel col) [] 0 0 [] False 0 [] + +batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit) +-- ^ Given batchSize and limit return P.qBatchSize and remaining limit +batchSizeRemainingLimit batchSize limit = if limit == 0 + then (fromIntegral batchSize, 0) -- no limit + else if 0 < batchSize && batchSize < limit + then (fromIntegral batchSize, limit - batchSize) + else (- fromIntegral limit, 1) + +protoQuery :: Database -> Query -> (P.Query, Limit) +protoQuery = protoQuery' False + +protoQuery' :: Bool -> Database -> Query -> (P.Query, Limit) +-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. 
+protoQuery' isExplain db Query{..} = (P.Query{..}, remainingLimit) where + qOptions = options + qFullCollection = db <.> coll selection + qSkip = fromIntegral skip + (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit + qProjector = project + mOrder = if null sort then Nothing else Just ("$orderby" =: sort) + mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing + mHint = if null hint then Nothing else Just ("$hint" =: hint) + mExplain = if isExplain then Just ("$explain" =: True) else Nothing + special = catMaybes [mOrder, mSnapshot, mHint, mExplain] + qSelector = if null special then s else ("$query" =: s) : special where s = selector selection + +find :: (Conn m) => Query -> Db m Cursor +-- ^ Fetch documents satisfying query +find q@Query{selection, batchSize} = ReaderT $ \db -> do + let (q', remainingLimit) = protoQuery db q + cs <- fromReply remainingLimit =<< runOp (P.query q') + newCursor db (coll selection) batchSize cs + +findOne :: (Conn m) => Query -> Db m (Maybe Document) +-- ^ Fetch first document satisfying query or Nothing if none satisfy it +findOne q = ReaderT $ \db -> do + let (q', x) = protoQuery db q {limit = 1} + CS _ _ docs <- fromReply x =<< runOp (P.query q') + return (listToMaybe docs) + +explain :: (Conn m) => Query -> Db m Document +-- ^ Return performance stats of query execution +explain q = ReaderT $ \db -> do -- same as findOne but with explain set to true + let (q', x) = protoQuery' True db q {limit = 1} + CS _ _ docs <- fromReply x =<< runOp (P.query q') + when (null docs) . fail $ "no explain: " ++ show q' + return (head docs) + +count :: (Conn m) => Query -> Db m Int +-- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present) +count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand + (["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)] + ++ ("limit" =? 
if limit == 0 then Nothing else Just (fromIntegral limit :: Int32))) + +distinct :: (Conn m) => Label -> Selection -> Db m [Value] +-- ^ Fetch distinct values of field in selected documents +distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel] + +-- *** Cursor + +data Cursor = Cursor FullCollection BatchSize (MVar CursorState) +-- ^ Iterator over results of a query. Use 'next' to iterate. Cursor remains open during current connection and is closed when connection is closed, cursor is closed, or cursor is garbage collected. + +data CursorState = CS Limit CursorId [Document] +-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch. + +fromReply :: (Monad m) => Limit -> Reply -> m CursorState +fromReply limit Reply{..} = if rResponseFlag == 0 + then return (CS limit rCursorId rDocuments) + else fail $ "Query failure " ++ show rResponseFlag ++ " " ++ show rDocuments + +newCursor :: (Conn m) => Database -> Collection -> BatchSize -> CursorState -> m Cursor +-- ^ Cursor is closed when garbage collected, explicitly closed, or CIO action ends (connection closed) +newCursor db col batch cs = do + conn <- getConnection + var <- liftIO (newMVar cs) + liftIO . addMVarFinalizer var $ do + -- kill cursor on server when garbage collected on client, if connection not already closed + CS _ cid _ <- readMVar var + unless (cid == 0) $ do + done <- isClosed conn + unless done $ runTask (runOp $ P.killCursors [cid]) conn >> return () + return (Cursor (db <.> col) batch var) + +next :: (Conn m) => Cursor -> m (Maybe Document) +-- ^ Return next document in query result, or Nothing if finished +next (Cursor fcol batch var) = runOp . 
exposeIO $ \h -> modifyMVar var $ \cs -> + -- Get lock on connection (runOp) first then get lock on cursor, otherwise you could get in deadlock if already inside an Op (connection locked), but another Task gets lock on cursor first and then tries runOp (deadlock). + either ((cs,) . Left) (fmap Right) <$> hideIO (nextState cs) h + where + nextState :: CursorState -> Op (CursorState, Maybe Document) + nextState (CS limit cid docs) = case docs of + doc : docs' -> return (CS limit cid docs', Just doc) + [] -> if cid == 0 + then return (CS 0 0 [], Nothing) -- finished + else let -- fetch next batch from server + (batchSize, remLimit) = batchSizeRemainingLimit batch limit + getNextBatch = fromReply remLimit =<< P.getMore (GetMore fcol batchSize cid) + in nextState =<< getNextBatch + +nextN :: (Conn m) => Int -> Cursor -> m [Document] +-- ^ Return next N documents or less if end is reached +nextN n c = catMaybes <$> replicateM n (next c) + +rest :: (Conn m) => Cursor -> m [Document] +-- ^ Return remaining documents in query result +rest c = loop (next c) + +closeCursor :: (Conn m) => Cursor -> m () +-- ^ Close cursor without reading rest of results. Cursor closes automatically when you read all results. +closeCursor (Cursor _ _ var) = runOp . exposeIO $ \h -> + modifyMVar var $ \cs@(CS _ cid _) -> if cid == 0 + then return (CS 0 0 [], Right ()) + else either ((cs,) . Left) ((CS 0 0 [],) . Right) <$> hideIO (P.killCursors [cid]) h + +-- ** Group + +data Group = Group { + gColl :: Collection, + gKey :: GroupKey, -- ^ Fields to group by + gReduce :: Javascript, -- ^ The reduce function aggregates (reduces) the objects iterated. Typical operations of a reduce function include summing and counting. reduce takes two arguments: the current document being iterated over and the aggregation value. + gInitial :: Document, -- ^ Initial aggregation value supplied to reduce + gCond :: Selector, -- ^ Condition that must be true for a row to be considered. [] means always true. 
+ gFinalize :: Maybe Javascript -- ^ An optional function to be run on each item in the result set just before the item is returned. Can either modify the item (e.g., add an average field given a count and a total) or return a replacement object (returning a new object with just _id and average fields). + } deriving (Show, Eq) + +data GroupKey = Key [Label] | KeyF Javascript deriving (Show, Eq) +-- ^ Fields to group by, or function returning a "key object" to be used as the grouping key. Use this instead of key to specify a key that is not an existing member of the object (or, to access embedded members). + +groupDocument :: Group -> Document +-- ^ Translate Group data into expected document form +groupDocument Group{..} = + ("finalize" =? gFinalize) ++ [ + "ns" =: gColl, + case gKey of Key k -> "key" =: map (=: True) k; KeyF f -> "$keyf" =: f, + "$reduce" =: gReduce, + "initial" =: gInitial, + "cond" =: gCond ] + +group :: (Conn m) => Group -> Db m [Document] +-- ^ Execute group query and return resulting aggregate value for each distinct key +group g = at "retval" <$> runCommand ["group" =: groupDocument g] + +-- ** MapReduce + +-- | Maps every document in collection to a (key, value) pair, then for each unique key reduces all its associated values to a result. Therefore, the final output is a list of (key, result) pairs, where every key is unique. This is the basic description. There are additional nuances that may be used. See for details. +data MapReduce = MapReduce { + rColl :: Collection, + rMap :: MapFun, + rReduce :: ReduceFun, + rSelect :: Selector, -- ^ Default is [] + rSort :: Order, -- ^ Default is [] meaning no sort + rLimit :: Limit, -- ^ Default is 0 meaning no limit + rOut :: Maybe Collection, -- ^ Output to permanent collection. Default is Nothing. + rKeepTemp :: Bool, -- ^ If True, the generated collection is made permanent. If False, the generated collection persists for the life of the current connection only. Default is False. 
When out is specified, the collection is automatically made permanent. + rFinalize :: Maybe FinalizeFun, -- ^ Function to apply to all the results when finished. Default is Nothing. + rScope :: Document, -- ^ Variables (environment) that can be accessed from map/reduce/finalize. Default is []. + rVerbose :: Bool -- ^ Provide statistics on job execution time. Default is False. + } deriving (Show, Eq) + +type MapFun = Javascript +-- ^ @() -> void@. The map function references the variable this to inspect the current object under consideration. A map function must call @emit(key,value)@ at least once, but may be invoked any number of times, as may be appropriate. + +type ReduceFun = Javascript +-- ^ @(key, value_array) -> value@. The reduce function receives a key and an array of values. To use, reduce the received values, and return a result. The MapReduce engine may invoke reduce functions iteratively; thus, these functions must be idempotent. That is, the following must hold for your reduce function: @for all k, vals : reduce(k, [reduce(k,vals)]) == reduce(k,vals)@. If you need to perform an operation only once, use a finalize function. The output of emit (the 2nd param) and reduce should be the same format to make iterative reduce possible. + +type FinalizeFun = Javascript +-- ^ @(key, value) -> final_value@. A finalize function may be run after reduction. Such a function is optional and is not necessary for many map/reduce cases. The finalize function takes a key and a value, and returns a finalized value. + +mrDocument :: MapReduce -> Document +-- ^ Translate MapReduce data into expected document form +mrDocument MapReduce{..} = + ("mapreduce" =: rColl) : + ("out" =? rOut) ++ + ("finalize" =? 
rFinalize) ++ [ + "map" =: rMap, + "reduce" =: rReduce, + "query" =: rSelect, + "sort" =: rSort, + "limit" =: (fromIntegral rLimit :: Int), + "keeptemp" =: rKeepTemp, + "scope" =: rScope, + "verbose" =: rVerbose ] + +mapReduce :: Collection -> MapFun -> ReduceFun -> MapReduce +-- ^ MapReduce on collection with given map and reduce functions. Remaining attributes are set to their defaults, which are stated in their comments. +mapReduce col map' red = MapReduce col map' red [] [] 0 Nothing False Nothing [] False + +runMR :: (Conn m) => MapReduce -> Db m Cursor +-- ^ Run MapReduce and return cursor of results. Error if map/reduce fails (because of bad Javascript) +-- TODO: Delete temp result collection when cursor closes. Until then, it will be deleted by the server when connection closes. +runMR mr = find . query [] =<< (at "result" <$> runMR' mr) + +runMR' :: (Conn m) => MapReduce -> Db m Document +-- ^ Run MapReduce and return a result document containing a "result" field holding the output Collection and additional statistic fields. Error if the map/reduce failed (because of bad Javascript). +runMR' mr = do + doc <- runCommand (mrDocument mr) + return $ if true1 "ok" doc then doc else error $ at "errmsg" doc ++ " in:\n" ++ show mr + +-- * Command + +type Command = Document +-- ^ A command is a special query or action against the database. See for details. 
+ +runCommand :: (Conn m) => Command -> Db m Document +-- ^ Run command against the database and return its result +runCommand c = maybe err return =<< findOne (query c "$cmd") where + err = fail $ "Nothing returned for command: " ++ show c + +runCommand1 :: (Conn m) => UString -> Db m Document +-- ^ @runCommand1 "foo" = runCommand ["foo" =: 1]@ +runCommand1 c = runCommand [c =: (1 :: Int)] + +eval :: (Conn m) => Javascript -> Db m Document +-- ^ Run code on server +eval code = at "retval" <$> runCommand ["$eval" =: code] + +type ErrorCode = Int +-- ^ Error code from getLastError + +getLastError :: Db Op (Maybe (ErrorCode, String)) +-- ^ Fetch what the last error was, Nothing means no error. Especially useful after a write since it is asynchronous (ie. nothing is returned after a write, so we don't know if it succeeded or not). To ensure no interleaving db operation executes between the write we want to check and getLastError, this can only be executed inside a 'runDbOp' which gets exclusive access to the connection. +getLastError = do + r <- runCommand1 "getlasterror" + return $ (at "code" r,) <$> lookup "err" r + +resetLastError :: Db Op () +-- ^ Clear last error +resetLastError = runCommand1 "reseterror" >> return () + + +{- Authors: Tony Hannan + Copyright 2010 10gen Inc. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
-} diff --git a/Database/MongoDB/Util.hs b/Database/MongoDB/Util.hs index 4de9f5c..b868dfc 100644 --- a/Database/MongoDB/Util.hs +++ b/Database/MongoDB/Util.hs @@ -1,83 +1,65 @@ -{- +-- | Miscellaneous general functions -Copyright (C) 2010 Scott R Parish +{-# LANGUAGE StandaloneDeriving #-} -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: +module Database.MongoDB.Util where -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
- --} - -module Database.MongoDB.Util - ( - putI8, putI16, putI32, putI64, putNothing, putNull, putS, - getI8, getI32, getI64, getC, getS, getNull, putStrSz, - ) -where +import Prelude hiding (length) +import Network (PortID(..)) +import Control.Applicative (Applicative(..), (<$>)) import Control.Exception (assert) -import Control.Monad -import Data.Binary -import Data.Binary.Get -import Data.Binary.Put -import Data.ByteString.Char8 -import qualified Data.ByteString.Lazy as L -import qualified Data.ByteString.Lazy.UTF8 as L8 -import Data.Char (chr) -import Data.Int +import Control.Monad.Reader +import Control.Monad.Error +import Data.UString as U (UString, cons, append) +import Data.Bits (Bits, (.|.)) +import Data.Bson +import System.IO (Handle) +import Data.ByteString.Lazy as B (ByteString, length, append, hGet) -getC :: Get Char -getC = liftM chr getI8 +deriving instance Show PortID +deriving instance Eq PortID +deriving instance Ord PortID -getI8 :: (Integral a) => Get a -getI8 = liftM fromIntegral getWord8 +instance (Monad m) => Applicative (ReaderT r m) where + pure = return + (<*>) = ap -getI32 :: Get Int32 -getI32 = liftM fromIntegral getWord32le +instance (Monad m, Error e) => Applicative (ErrorT e m) where + pure = return + (<*>) = ap -getI64 :: Get Int64 -getI64 = liftM fromIntegral getWord64le +ignore :: (Monad m) => a -> m () +ignore _ = return () -getS :: Get (Integer, L8.ByteString) -getS = getLazyByteStringNul >>= \s -> return (fromIntegral $ L.length s + 1, s) +type Secs = Float -getNull :: Get () -getNull = do {c <- getC; assert (c == '\0') $ return ()} +bitOr :: (Bits a) => [a] -> a +-- ^ bit-or all numbers together +bitOr = foldl (.|.) 0 -putI8 :: Int8 -> Put -putI8 = putWord8 . fromIntegral +(<.>) :: UString -> UString -> UString +-- ^ Concat first and second together with period in between. Eg. @\"hello\" \<.\> \"world\" = \"hello.world\"@ +a <.> b = U.append a (cons '.' b) -putI16 :: Int16 -> Put -putI16 = putWord16le . 
fromIntegral +loop :: (Functor m, Monad m) => m (Maybe a) -> m [a] +-- ^ Repeatedy execute action, collecting results, until it returns Nothing +loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act) -putI32 :: Int32 -> Put -putI32 = putWord32le . fromIntegral +true1 :: Label -> Document -> Bool +-- ^ Is field's value a 1 or True (MongoDB use both Int and Bools for truth values). Error if field not in document or field not a Num or Bool. +true1 k doc = case valueAt k doc of + Bool b -> b + Float n -> n == 1 + Int32 n -> n == 1 + Int64 n -> n == 1 + _ -> error $ "expected " ++ show k ++ " to be Num or Bool in " ++ show doc -putI64 :: Int64 -> Put -putI64 = putWord64le . fromIntegral - -putNothing :: Put -putNothing = putByteString $ pack "" - -putNull :: Put -putNull = putI8 0 - -putS :: L8.ByteString -> Put -putS s = putLazyByteString s >> putNull - -putStrSz :: L8.ByteString -> Put -putStrSz s = putI32 (fromIntegral $ 1 + L.length s) >> putS s +hGetN :: Handle -> Int -> IO ByteString +-- ^ Read N bytes from hande, blocking until all N bytes are read. +-- Unlike hGet which only blocks if no bytes are available, otherwise it returns the X bytes immediately available where X <= N. +hGetN h n = assert (n >= 0) $ do + bytes <- hGet h n + let x = fromIntegral (length bytes) + if x >= n then return bytes else do + remainingBytes <- hGetN h (n - x) + return (B.append bytes remainingBytes) diff --git a/README.md b/README.md index bf22ca0..5a7c443 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A mongoDB driver for Haskell. This driver lets you connect to MongoDB, do inserts, queries, updates, etc. Also has many convience functions inspired by HDBC such as more -easily converting between the BsonValue types and native Haskell +easily converting between the Bson.Value types and native Haskell types. 
Links diff --git a/TODO b/TODO index fb9208d..0231b56 100644 --- a/TODO +++ b/TODO @@ -15,74 +15,52 @@ BSON MongoDB ------- + support full level 0 - - handle query errors - - hint, explain, $where - - database profiling: set/get profiling level, get profiling info - - pair mode connection - - handle defunct servers - - connection fail over + - hint - operations on database objects - * getName - * getCollection - * add_son_manipulators + * add_son_manipulators? * dereference (dbref) - * error - * eval - * last_status - * reset_error_history - database admin - * getAdmin - * getProfilingLevel - * setProfilingLevel * getProfilingInfo - - collection - * modify - * replace - * repsert - - index operations - * ensureIndex / index existance caching - misc operations * explain - * getOptions - * getName - * close - * group - * distinct + * getCollectionOptions - cursor object * hasMore - orderby (sort) +- Query attribute: timeout +- CreateIndex attributes: background, min, max +- CreateIndex Order [Asc, Dec, Geo2d] +- FindAndModify +- getIndexInfo +- logout +- collectionsInfo +- databasesInfo +- getLastError options +- Update If Current (http://www.mongodb.org/display/DOCS/Atomic+Operations) +- block write until written on N replicas +- lazyRest on cursor, although lazy I/O is problematic and we may not want to support it. optional: - automatic reconnection - buffer pooling - - advanced connection management (master-server, replica pair) - - Tailable cursor support -+ support safe operations -+ auto-reconnection -+ auto-destoy connection (how?/when?) + - connection pooling. Although may not be desired because each connection maintains separate session state (open cursors and temp map/reduce collections) and switching between connections automatically would change session state without the user knowing. 
++ support safe operations, although operation with exclusive connection access is available which can be used to getLastError and check that the previous write was safe (successful). ++ auto-destroy connection (how?/when?). Although, GHC will automatically close connection (Handle) when garbage collected. + don't read into cursor until needed, but have cursor send getMore before it is actually out of docs (so network is finished by the time we're ready to consume more) -+ support a LIMITed quickFind Misc ---- + learn more about haskelldb, anything we can learn from there + go through pymongo api and figure out what parts to adopt (also look at other languages?) - - database_names() - - collection_names() -+ support for aggricated commands like listing collections + kill prefix on data types "eg QO_*"? + javascript + tailable cursor support - only close cursor when cursorID is 0 - have to create loop that sleeps and retries - lazy list support -+ error handling? -+ concurrency (share connection?) -+ is there a garbage collector hook that will let us free cursors and connections? Tests? Documentation @@ -90,16 +68,17 @@ Documentation GridFS - -pretty printer deep "lookup" function (other deep Map functions?) -how to make bytestrings less painful -custom Show/Read instance that looks more like json +Read instance for Documents that can read its Show representation make sure NULLs aren't in created table names update tutorial to match new python one + custom types (see python examples) -+ support array conversions again -+ better type conversion errors -+ make BSON an instance of Binary (eg get/put) \ No newline at end of file ++ make BSON an instance of Binary (eg get/put) + +Questions: +- In Mongo shell, db.foo.totalSize fetches storageSize of each index but does not use it + +Notes: +- Remember that in the new version of MongoDB (>= 1.6), "ok" field can be a number (0 or 1) or boolean (False or True). 
Use 'true1' function defined in Database.MongoDB.Util diff --git a/V0.5.0-Redesign.md b/V0.5.0-Redesign.md new file mode 100644 index 0000000..77d540c --- /dev/null +++ b/V0.5.0-Redesign.md @@ -0,0 +1,30 @@ +Hi Scott, + +Thanks for writing the Haskell driver for MongoDB! It functions well but I basically rewrote it in an attempt to factor it nicely and support additional functionality like multiple threads using the same connection. I hope you like it! You can find it on my fork of your repository at http://github.com/TonyGen/mongoDB. + +First, I separated out BSON into its own package, since it can be used as an interchange format independent of MongoDB. You can find this new package on Github at http://github.com/TonyGen/bson-haskell and on Hackage at http://hackage.haskell.org/package/bson. I also made the BSON easier to write and view. For example, you can write: ["a" =: 1, "b" =: "hello", "c" =: [1,2,3]], and it shows as: [a: 1, b: "hello", c: [1,2,3]]. + +Second, for modularity, I separated MongoDB into 5 modules: MongoDB-Internal-Connection, MongoDB-Internal-Protocol, MongoDB-Connection, MongoDB-Query, and MongoDB-Admin. + +MongoDB-Internal-Connection defines a connection with multi-threaded support via two monads, one with shared access to a connection (Task), and one with exclusive access to a connection (Op). This module also exposes low-level writing and reading bytes inside the Op monad for MongoDB-Internal-Protocol to use. This module is not intended for the application-programmer use, and maybe should be a hidden module inside cabal, but for now it is not. + +MongoDB-Internal-Protocol defines the MongoDB Wire Protocol (http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol). It defines the messages that the client and server send back and forth to each other. Again, this module is not intended for the application-programmer use, and maybe should be a hidden module inside cabal, but for now it is not. 
+ +MongoDB-Connection re-exports Connection, and Task and Op monads from MongoDB-Internal-Connection but without the low-level read and write bytes functions. It also adds support for replica-sets, which will replace replica-pairs in the next release of MongoDB coming out soon. I had to make two connection modules (MongoDB-Internal-Connection and MongoDB-Connection) because connecting to a replica set requires querying its config info, which requires us to import MongoDB-Query, which recursively imports MongoDB-Internal-Protocol then MongoDB-Internal-Connection. I could have used mutually dependent modules (via .hs-boot) but felt that violated the layered approach I was going for. + +MongoDB-Query defines all the normal query and update operations like find, findOne, count, insert, modify, delete, group, mapReduce, allDatabases, allCollections, runCommand, etc. + +MongoDB-Admin defines all the administration operations like validateCollection, ensureIndex, dropIndex, addUser, copyDatabase, dbStats, setProfilingLevel, etc. + +Finally, the top-level MongoDB module simply re-exports MongoDB-Connection, MongoDB-Query, and MongoDB-Admin, along with Data.Bson from the bson package. + +I updated your TODO list, removing items I completed, added items that were missing, and added back items I removed from the code like lazy list from a cursor (I am skeptical of lazy I/O, but we could add it back). + +I also updated your two tutorials to reflect this new API. + +I hope you like these changes! Let me know your feedback, and I hope we can both maintain it in the future. + +Cheers, +Tony Hannan +10gen Inc. +Creators of MongoDB diff --git a/map-reduce-example.md b/map-reduce-example.md index 7f55b29..0e74c55 100644 --- a/map-reduce-example.md +++ b/map-reduce-example.md @@ -17,21 +17,17 @@ map/reduce queries on: GHCi, version 6.12.1: http://www.haskell.org/ghc/ :? for help ... 
Prelude> :set prompt "> " + > :set -XOverloadedStrings > import Database.MongoDB - > import Database.MongoDB.BSON - > import Data.ByteString.Lazy.UTF8 - > c <- connect "localhost" [] - > let col = (fromString "test.mr1") + > Right conn <- connect (server "localhost") + > let run task = runTask task conn + > let runDb db dbTask = run $ useDb db dbTask > :{ - insertMany c col [ - (toBsonDoc [("x", BsonInt32 1), - ("tags", toBson ["dog", "cat"])]), - (toBsonDoc [("x", BsonInt32 2), - ("tags", toBson ["cat"])]), - (toBsonDoc [("x", BsonInt32 3), - ("tags", toBson ["mouse", "cat", "doc"])]), - (toBsonDoc [("x", BsonInt32 4), - ("tags", BsonArray [])]) + runDb "test" $ insertMany "mr1" [ + ["x" =: 1, "tags" =: ["dog", "cat"]], + ["x" =: 2, "tags" =: ["cat"]], + ["x" =: 3, "tags" =: ["mouse", "cat", "dog"]], + ["x" =: 4, "tags" =: ([] :: [String])] ] :} @@ -47,7 +43,7 @@ Our map function just emits a single (key, 1) pair for each tag in the array: > :{ - let mapFn = " + let mapFn = Javascript [] " function() {\n this.tags.forEach(function(z) {\n emit(z, 1);\n @@ -59,7 +55,7 @@ The reduce function sums over all of the emitted values for a given key: > :{ - let reduceFn = " + let reduceFn = Javascript [] " function (key, values) {\n var total = 0;\n for (var i = 0; i < values.length; i++) {\n @@ -74,40 +70,16 @@ be called iteratively on the results of other reduce steps. 
Finally, we call map_reduce() and iterate over the result collection: - > mapReduce c col (fromString mapFn) (fromString reduceFn) [] >>= allDocs - [[(Chunk "_id" Empty,BsonString (Chunk "cat" Empty)), - (Chunk "value" Empty,BsonDouble 6.0)], - [(Chunk "_id" Empty,BsonString (Chunk "doc" Empty)), - (Chunk "value" Empty,BsonDouble 1.0)], - [(Chunk "_id" Empty,BsonString (Chunk "dog" Empty)), - (Chunk "value" Empty,BsonDouble 3.0)], - [(Chunk "_id" Empty,BsonString (Chunk "mouse" Empty)), - (Chunk "value" Empty,BsonDouble 2.0)]] + > runDb "test" $ runMR (mapReduce "mr1" mapFn reduceFn) >>= rest + Right [[ _id: "cat", value: 3.0],[ _id: "dog", value: 2.0],[ _id: "mouse", value: 1.0]] Advanced Map/Reduce ------------------- -MongoDB returns additional information in the map/reduce results. To -obtain them, use *runMapReduce*: +MongoDB returns additional statistics in the map/reduce results. To +obtain them, use *runMR'* instead: - > res <- runMapReduce c col (fromString mapFn) (fromString reduceFn) [] - > res - [(Chunk "result" Empty, BsonString (Chunk "tmp.mr.mapreduce_1268105512_18" Empty)), - (Chunk "timeMillis" Empty, BsonInt32 90), - (Chunk "counts" Empty, - BsonDoc [(Chunk "input" Empty,BsonInt64 8), - (Chunk "emit" Empty,BsonInt64 12), - (Chunk "output" Empty,BsonInt64 4)]), - (Chunk "ok" Empty,BsonDouble 1.0)] + > runDb "test" $ runMR' (mapReduce "mr1" mapFn reduceFn) + Right [ result: "tmp.mr.mapreduce_1276482643_7", timeMillis: 379, counts: [ input: 4, emit: 6, output: 3], ok: 1.0] -You can then obtain the results using *mapReduceResults*: - - > mapReduceResults c (fromString "test") res >>= allDocs - [[(Chunk "_id" Empty,BsonString (Chunk "cat" Empty)), - (Chunk "value" Empty,BsonDouble 6.0)], - [(Chunk "_id" Empty,BsonString (Chunk "doc" Empty)), - (Chunk "value" Empty,BsonDouble 1.0)], - [(Chunk "_id" Empty,BsonString (Chunk "dog" Empty)), - (Chunk "value" Empty,BsonDouble 3.0)], - [(Chunk "_id" Empty,BsonString (Chunk "mouse" Empty)), - (Chunk "value" 
Empty,BsonDouble 2.0)]] +You can then obtain the results from here by querying the result collection yourself. *runMR* (above) does this for you but discards the statistics. diff --git a/mongoDB.cabal b/mongoDB.cabal index 728dab9..453f305 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -1,33 +1,34 @@ Name: mongoDB -Version: 0.4.2 +Version: 0.5.0 License: MIT -Maintainer: Scott Parish -Author: Scott Parish -Copyright: Copyright (c) 2010-2010 Scott Parish +Maintainer: Scott Parish , Tony Hannan +Author: Scott Parish , Tony Hannan +Copyright: Copyright (c) 2010-2010 Scott Parish & 10gen Inc. homepage: http://github.com/srp/mongoDB Category: Database Synopsis: A driver for MongoDB Description: This module lets you connect to MongoDB, do inserts, queries, updates, etc. Also has many convience functions inspired by HDBC such as more easily converting between - the BsonValue types and native Haskell types. + the Bson.Value types and native Haskell types. Stability: alpha -Build-Depends: base < 5, - binary, - bytestring, - containers, - data-binary-ieee754, - network, - random, - time, - unix, - utf8-string, - nano-md5 +Build-Depends: base < 5, + containers, + mtl, + binary, + bytestring, + network, + nano-md5, + parsec, + bson Build-Type: Simple -Exposed-modules: Database.MongoDB, - Database.MongoDB.BSON -Other-modules: Database.MongoDB.Util +Exposed-modules: + Database.MongoDB.Util, + Database.MongoDB.Internal.Connection, + Database.MongoDB.Internal.Protocol, + Database.MongoDB.Connection, + Database.MongoDB.Query, + Database.MongoDB.Admin, + Database.MongoDB ghc-options: -Wall -O2 -extensions: FlexibleContexts, FlexibleInstances, MultiParamTypeClasses, - TypeSynonymInstances cabal-version: >= 1.4 diff --git a/old/Database/MongoDB.hs b/old/Database/MongoDB.hs new file mode 100644 index 0000000..eb02093 --- /dev/null +++ b/old/Database/MongoDB.hs @@ -0,0 +1,1039 @@ +{- + +Copyright (C) 2010 Scott R Parish + +Permission is hereby granted, free of charge, to any person 
obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +-} + +-- | A driver for MongoDB +-- +-- This module lets you connect to MongoDB, do inserts, queries, +-- updates, etc. Also has many convience functions inspired by HDBC +-- such as more easily converting between the BsonValue types and +-- native Haskell types. 
+-- +-- * Tutorial for this driver: +-- +-- +-- * Map/Reduce example for this driver: +-- +-- +-- * MongoDB: +-- +-- + +module Database.MongoDB + ( + -- * Connection + Connection, ConnectOpt(..), + connect, connectOnPort, conClose, disconnect, dropDatabase, + connectCluster, connectClusterOnPort, + serverInfo, serverShutdown, + databasesInfo, databaseNames, + -- * Database + Database, MongoDBCollectionInvalid, Password, Username, + ColCreateOpt(..), + collectionNames, createCollection, dropCollection, + renameCollection, runCommand, validateCollection, + auth, addUser, login, logout, + -- * Collection + Collection, FieldSelector, FullCollection, + NumToSkip, NumToReturn, Selector, + QueryOpt(..), + UpdateFlag(..), + count, countMatching, delete, insert, insertMany, query, remove, update, + save, + -- * Convenience collection operations + find, findOne, quickFind, quickFind', + -- * Query Helpers + whereClause, + -- * Cursor + Cursor, + allDocs, allDocs', finish, nextDoc, + -- * Index + Key, Unique, + Direction(..), + createIndex, dropIndex, dropIndexes, indexInformation, + -- * Map-Reduce + MapReduceOpt(..), + mapReduce, mapReduceWScopes, + runMapReduce, runMapReduceWScopes, + mapReduceResults, + ) +where +import Control.Exception +import Control.Monad +import Data.Binary() +import Data.Binary.Get +import Data.Binary.Put +import Data.Bits +import Data.ByteString.Char8 (pack) +import Data.ByteString.Internal (c2w) +import qualified Data.ByteString.Lazy as L +import qualified Data.ByteString.Lazy.UTF8 as L8 +import Data.Digest.OpenSSL.MD5 +import Data.Int +import Data.IORef +import qualified Data.List as List +import Data.Maybe +import Data.Typeable +import Database.MongoDB.BSON as BSON +import Database.MongoDB.Util +import qualified Network +import Network.Socket hiding (connect, send, sendTo, recv, recvFrom) +import Prelude hiding (getContents) +import System.IO +import System.IO.Unsafe +import System.Random + +-- | A list of handles to database connections +data 
Connection = Connection { + cHandle :: IORef Handle, + cRand :: IORef [Int], + cOidGen :: ObjectIdGen + } + +data ConnectOpt + = SlaveOK -- ^ It's fine to connect to the slave + deriving (Show, Eq) + +-- | Establish a connection to a MongoDB server +connect :: HostName -> [ConnectOpt] -> IO Connection +connect = flip connectOnPort (Network.PortNumber 27017) + +-- | Establish connections to a list of MongoDB servers +connectCluster :: [HostName] -> [ConnectOpt] -> IO Connection +connectCluster xs = + connectClusterOnPort (fmap (flip (,) $ Network.PortNumber 27017) xs) + +-- | Establish connections to a list of MongoDB servers specifying each port. +connectClusterOnPort :: [(HostName, Network.PortID)] -> [ConnectOpt] + -> IO Connection +connectClusterOnPort [] _ = throwOpFailure "No hostnames in list" +connectClusterOnPort servers opts = newConnection servers opts + +-- | Establish a connection to a MongoDB server on a non-standard port +connectOnPort :: HostName -> Network.PortID -> [ConnectOpt] -> IO Connection +connectOnPort host port = newConnection [(host, port)] + +newConnection :: [(HostName, Network.PortID)] -> [ConnectOpt] -> IO Connection +newConnection servers opts = do + r <- newStdGen + let ns = randomRs (fromIntegral (minBound :: Int32), + fromIntegral (maxBound :: Int32)) r + nsRef <- newIORef ns + hRef <- openHandle (head servers) >>= newIORef + oidGen <- mkObjectIdGen + let c = Connection hRef nsRef oidGen + res <- isMaster c + if fromBson (fromLookup $ List.lookup (s2L "ismaster") res) == (1::Int) || + isJust (List.elemIndex SlaveOK opts) + then return c + else case List.lookup (s2L "remote") res of + Nothing -> throwConFailure "Couldn't find master to connect to" + Just server -> do + hRef' <- openHandle (splitHostPort $ fromBson server) >>= newIORef + return $ c {cHandle = hRef'} + +openHandle :: (HostName, Network.PortID) -> IO Handle +openHandle (host, port) = do + h <- Network.connectTo host port + hSetBuffering h NoBuffering + return h + 
+getHandle :: Connection -> IO Handle +getHandle c = readIORef $ cHandle c + +cPut :: Connection -> L.ByteString -> IO () +cPut c msg = getHandle c >>= flip L.hPut msg + +-- | Close database connection +conClose :: Connection -> IO () +conClose c = readIORef (cHandle c) >>= hClose + +-- | Information about the databases on the server. +databasesInfo :: Connection -> IO BsonDoc +databasesInfo c = + runCommand c (s2L "admin") $ toBsonDoc [("listDatabases", BsonInt32 1)] + +-- | Return a list of database names on the server. +databaseNames :: Connection -> IO [Database] +databaseNames c = do + info <- databasesInfo c + let (BsonArray dbs) = fromLookup $ List.lookup (s2L "databases") info + names = mapMaybe (List.lookup (s2L "name") . fromBson) dbs + return $ List.map fromBson (names::[BsonValue]) + +-- | Alias for 'conClose' +disconnect :: Connection -> IO () +disconnect = conClose + +-- | Drop a database. +dropDatabase :: Connection -> Database -> IO () +dropDatabase c db = do + _ <- runCommand c db $ toBsonDoc [("dropDatabase", BsonInt32 1)] + return () + +isMaster :: Connection -> IO BsonDoc +isMaster c = runCommand c (s2L "admin") $ toBsonDoc [("ismaster", BsonInt32 1)] + +-- | Get information about the MongoDB server we're connected to. +serverInfo :: Connection -> IO BsonDoc +serverInfo c = + runCommand c (s2L "admin") $ toBsonDoc [("buildinfo", BsonInt32 1)] + +-- | Shut down the MongoDB server. +-- +-- Force a clean exit, flushing and closing all data files. +-- Note that it will wait until all ongoing operations are complete. +serverShutdown :: Connection -> IO BsonDoc +serverShutdown c = + runCommand c (s2L "admin") $ toBsonDoc [("shutdown", BsonInt32 1)] + +-- | Return a list of collections in /Database/. +collectionNames :: Connection -> Database -> IO [FullCollection] +collectionNames c db = do + docs <- quickFind' c (L.append db $ s2L ".system.namespaces") empty + let names = flip List.map docs $ + fromBson . fromLookup . 
List.lookup (s2L "name") + return $ List.filter (L.notElem $ c2w '$') names + +data ColCreateOpt = CCOSize Int64 -- ^ Desired initial size for the + -- collection (in bytes). must be + -- less than or equal to + -- 10000000000. For capped + -- collections this size is the max + -- size of the collection. + | CCOCapped Bool -- ^ If 'True', this is a capped collection. + | CCOMax Int64 -- ^ Maximum number of objects if capped. + deriving (Show, Eq) + +colCreateOptToBson :: ColCreateOpt -> (String, BsonValue) +colCreateOptToBson (CCOSize sz) = ("size", toBson sz) +colCreateOptToBson (CCOCapped b) = ("capped", toBson b) +colCreateOptToBson (CCOMax m) = ("max", toBson m) + +-- | Create a new collection in this database. +-- +-- Normally collection creation is automatic. This function should +-- only be needed if you want to specify 'ColCreateOpt's on creation. +-- 'MongoDBCollectionInvalid' is thrown if the collection already +-- exists. +createCollection :: Connection -> FullCollection -> [ColCreateOpt] -> IO () +createCollection c col opts = do + (db, col') <- validateCollectionName col + dbcols <- collectionNames c db + when (col `List.elem` dbcols) $ + throwColInvalid $ "Collection already exists: " ++ show col + let cmd = ("create", toBson col') : List.map colCreateOptToBson opts + _ <- runCommand c db $ toBsonDoc cmd + return () + +-- | Drop a collection. +dropCollection :: Connection -> FullCollection -> IO () +dropCollection c col = do + let (db, col') = splitFullCol col + _ <- runCommand c db $ toBsonDoc [("drop", toBson col')] + return () + +-- | Rename a collection--first /FullCollection/ argument is the +-- existing name, the second is the new name. At the moment this command +-- can also be used to move a collection between databases. 
+renameCollection :: Connection -> FullCollection -> FullCollection -> IO () +renameCollection c col newName = do + _ <- validateCollectionName col + _ <- runCommand c (s2L "admin") $ toBsonDoc [("renameCollection", toBson col), + ("to", toBson newName)] + return () + +-- | Return a string of validation info about the collection. +-- +-- Example output (note this probably can/will change with different +-- versions of the server): +-- +-- > validate +-- > details: 0x7fe5cc2c1da4 ofs:e7da4 +-- > firstExtent:0:24100 ns:test.foo.bar +-- > lastExtent:0:24100 ns:test.foo.bar +-- > # extents:1 +-- > datasize?:180 nrecords?:5 lastExtentSize:1024 +-- > padding:1 +-- > first extent: +-- > loc:0:24100 xnext:null xprev:null +-- > nsdiag:test.foo.bar +-- > size:1024 firstRecord:0:241e4 lastRecord:0:24280 +-- > 5 objects found, nobj:5 +-- > 260 bytes data w/headers +-- > 180 bytes data wout/headers +-- > deletedList: 0100100000000000000 +-- > deleted: n: 4 size: 588 +-- > nIndexes:1 +-- > test.foo.bar.$_id_ keys:5 +validateCollection :: Connection -> FullCollection -> IO String +validateCollection c col = do + let (db, col') = splitFullCol col + res <- runCommand c db $ toBsonDoc [("validate", toBson col')] + return $ fromBson $ fromLookup $ List.lookup (s2L "result") res + +splitFullCol :: FullCollection -> (Database, Collection) +splitFullCol col = (L.takeWhile (c2w '.' /=) col, + L.tail $ L.dropWhile (c2w '.' /=) col) + +splitHostPort :: String -> (HostName, Network.PortID) +splitHostPort hp = (host, port) + where host = List.takeWhile (':' /=) hp + port = case List.dropWhile (':' /=) hp of + "" -> Network.PortNumber 27017 + pstr -> Network.Service $ List.tail pstr + +-- | Run a database command. Usually this is unneeded as driver wraps +-- all of the commands for you (eg 'createCollection', +-- 'dropCollection', etc). 
+runCommand :: Connection -> Database -> BsonDoc -> IO BsonDoc +runCommand c db cmd = do + mres <- findOne c (L.append db $ s2L ".$cmd") cmd + let res = fromLookup mres + when (BsonDouble 1.0 /= fromLookup (List.lookup (s2L "ok") res)) $ + throwOpFailure $ "command \"" ++ show cmd ++ "\" failed: " ++ + fromBson (fromLookup $ List.lookup (s2L "errmsg") res) + return res + +-- | An Iterator over the results of a query. Use 'nextDoc' to get each +-- successive result document, or 'allDocs' or 'allDocs'' to get lazy or +-- strict lists of results. +data Cursor = Cursor { + curCon :: Connection, + curID :: IORef Int64, + curNumToRet :: Int32, + curCol :: FullCollection, + curDocBytes :: IORef L.ByteString, + curClosed :: IORef Bool + } + +data Opcode + = OPReply -- 1 Reply to a client request. responseTo is set + | OPMsg -- 1000 generic msg command followed by a string + | OPUpdate -- 2001 update document + | OPInsert -- 2002 insert new document + | OPGetByOid -- 2003 is this used? + | OPQuery -- 2004 query a collection + | OPGetMore -- 2005 Get more data from a query. See Cursors + | OPDelete -- 2006 Delete documents + | OPKillCursors -- 2007 Tell database client is done with a cursor + deriving (Show, Eq) + +data MongoDBInternalError = MongoDBInternalError String + deriving (Eq, Show, Read) + +mongoDBInternalError :: TyCon +mongoDBInternalError = mkTyCon "Database.MongoDB.MongoDBInternalError" + +instance Typeable MongoDBInternalError where + typeOf _ = mkTyConApp mongoDBInternalError [] + +instance Exception MongoDBInternalError + +data MongoDBCollectionInvalid = MongoDBCollectionInvalid String + deriving (Eq, Show, Read) + +mongoDBCollectionInvalid :: TyCon +mongoDBCollectionInvalid = mkTyCon "Database.MongoDB.MongoDBcollectionInvalid" + +instance Typeable MongoDBCollectionInvalid where + typeOf _ = mkTyConApp mongoDBCollectionInvalid [] + +instance Exception MongoDBCollectionInvalid + +throwColInvalid :: String -> a +throwColInvalid = throw . 
MongoDBCollectionInvalid + +data MongoDBOperationFailure = MongoDBOperationFailure String + deriving (Eq, Show, Read) + +mongoDBOperationFailure :: TyCon +mongoDBOperationFailure = mkTyCon "Database.MongoDB.MongoDBoperationFailure" + +instance Typeable MongoDBOperationFailure where + typeOf _ = mkTyConApp mongoDBOperationFailure [] + +instance Exception MongoDBOperationFailure + +throwOpFailure :: String -> a +throwOpFailure = throw . MongoDBOperationFailure + +data MongoDBConnectionFailure = MongoDBConnectionFailure String + deriving (Eq, Show, Read) + +mongoDBConnectionFailure :: TyCon +mongoDBConnectionFailure = mkTyCon "Database.MongoDB.MongoDBconnectionFailure" + +instance Typeable MongoDBConnectionFailure where + typeOf _ = mkTyConApp mongoDBConnectionFailure [] + +instance Exception MongoDBConnectionFailure + +throwConFailure :: String -> a +throwConFailure = throw . MongoDBConnectionFailure + +fromOpcode :: Opcode -> Int32 +fromOpcode OPReply = 1 +fromOpcode OPMsg = 1000 +fromOpcode OPUpdate = 2001 +fromOpcode OPInsert = 2002 +fromOpcode OPGetByOid = 2003 +fromOpcode OPQuery = 2004 +fromOpcode OPGetMore = 2005 +fromOpcode OPDelete = 2006 +fromOpcode OPKillCursors = 2007 + +toOpcode :: Int32 -> Opcode +toOpcode 1 = OPReply +toOpcode 1000 = OPMsg +toOpcode 2001 = OPUpdate +toOpcode 2002 = OPInsert +toOpcode 2003 = OPGetByOid +toOpcode 2004 = OPQuery +toOpcode 2005 = OPGetMore +toOpcode 2006 = OPDelete +toOpcode 2007 = OPKillCursors +toOpcode n = throw $ MongoDBInternalError $ "Got unexpected Opcode: " ++ show n + +-- | The name of a database. +type Database = L8.ByteString + +-- | The full collection name. The full collection name is the +-- concatenation of the database name with the collection name, using +-- a @.@ for the concatenation. For example, for the database @foo@ +-- and the collection @bar@, the full collection name is @foo.bar@. +type FullCollection = L8.ByteString + +-- | The same as 'FullCollection' but without the 'Database' prefix. 
+type Collection = L8.ByteString + +-- | A 'BsonDoc' representing restrictions for a query much like the +-- /where/ part of an SQL query. +type Selector = BsonDoc + +-- | A list of field names that limits the fields in the returned +-- documents. The list can contains zero or more elements, each of +-- which is the name of a field that should be returned. An empty list +-- means that no limiting is done and all fields are returned. +type FieldSelector = [L8.ByteString] + +type RequestID = Int32 + +-- | Sets the number of documents to omit - starting from the first +-- document in the resulting dataset - when returning the result of +-- the query. +type NumToSkip = Int32 + +-- | This controls how many documents are returned at a time. The +-- cursor works by requesting /NumToReturn/ documents, which are then +-- immediately all transfered over the network; these are held locally +-- until the those /NumToReturn/ are all consumed and then the network +-- will be hit again for the next /NumToReturn/ documents. +-- +-- If the value @0@ is given, the database will choose the number of +-- documents to return. +-- +-- Otherwise choosing a good value is very dependant on the document size +-- and the way the cursor is being used. +type NumToReturn = Int32 + +type Username = String +type Password = String + +type JSCode = L8.ByteString + +-- | Options that control the behavior of a 'query' operation. +data QueryOpt = QOTailableCursor + | QOSlaveOK + | QOOpLogReplay + | QONoCursorTimeout + deriving (Show) + +fromQueryOpts :: [QueryOpt] -> Int32 +fromQueryOpts opts = List.foldl (.|.) 0 $ fmap toVal opts + where toVal QOTailableCursor = 2 + toVal QOSlaveOK = 4 + toVal QOOpLogReplay = 8 + toVal QONoCursorTimeout = 16 + +-- | Options that effect the behavior of a 'update' operation. +data UpdateFlag = UFUpsert + | UFMultiupdate + deriving (Show, Enum) + +fromUpdateFlags :: [UpdateFlag] -> Int32 +fromUpdateFlags flags = List.foldl (.|.) 0 $ + flip fmap flags $ (1 `shiftL`) . 
fromEnum + +-- | Return the number of documents in /FullCollection/. +count :: Connection -> FullCollection -> IO Integer +count c col = countMatching c col empty + +-- | Return the number of documents in /FullCollection/ matching /Selector/ +countMatching :: Connection -> FullCollection -> Selector -> IO Integer +countMatching c col sel = do + let (db, col') = splitFullCol col + res <- runCommand c db $ toBsonDoc [("count", toBson col'), + ("query", toBson sel)] + let cnt = (fromBson $ fromLookup $ List.lookup (s2L "n") res :: Double) + return $ truncate cnt + +-- | Delete documents matching /Selector/ from the given /FullCollection/. +delete :: Connection -> FullCollection -> Selector -> IO RequestID +delete c col sel = do + let body = runPut $ do + putI32 0 + putCol col + putI32 0 + putBsonDoc sel + (reqID, msg) <- packMsg c OPDelete body + cPut c msg + return reqID + +-- | An alias for 'delete'. +remove :: Connection -> FullCollection -> Selector -> IO RequestID +remove = delete + +moveOidToFrontOrGen :: Connection -> BsonDoc -> IO BsonDoc +moveOidToFrontOrGen c doc = + case List.lookup (s2L "_id") doc of + Nothing -> do + oid <- genObjectId $ cOidGen c + return $ (s2L "_id", oid) : doc + Just oid -> do + let keyEq = (\(k1, _) (k2, _) -> k1 == k2) + delByKey = \k -> List.deleteBy keyEq (k, undefined) + return $ (s2L "_id", oid) : delByKey (s2L "_id") doc + +-- | Insert a single document into /FullCollection/ returning the /_id/ field. +insert :: Connection -> FullCollection -> BsonDoc -> IO BsonValue +insert c col doc = do + doc' <- moveOidToFrontOrGen c doc + let body = runPut $ do + putI32 0 + putCol col + putBsonDoc doc' + (_reqID, msg) <- packMsg c OPInsert body + cPut c msg + return $ snd $ head doc' + +-- | Insert a list of documents into /FullCollection/ returing the +-- /_id/ field for each one in the same order as they were given. 
+insertMany :: Connection -> FullCollection -> [BsonDoc] -> IO [BsonValue] +insertMany c col docs = do + docs' <- mapM (moveOidToFrontOrGen c) docs + let body = runPut $ do + putI32 0 + putCol col + forM_ docs' putBsonDoc + (_, msg) <- packMsg c OPInsert body + cPut c msg + return $ List.map (snd . head) docs' + +-- | Open a cursor to find documents. If you need full functionality, +-- see 'query' +find :: Connection -> FullCollection -> Selector -> IO Cursor +find c col sel = query c col [] 0 0 sel [] + +-- | Query, but only return the first result, if any. +findOne :: Connection -> FullCollection -> Selector -> IO (Maybe BsonDoc) +findOne c col sel = query c col [] 0 (-1) sel [] >>= nextDoc + +-- | Perform a query and return the result as a lazy list. Be sure to +-- understand the comments about using the lazy list given for +-- 'allDocs'. +quickFind :: Connection -> FullCollection -> Selector -> IO [BsonDoc] +quickFind c col sel = find c col sel >>= allDocs + +-- | Perform a query and return the result as a strict list. +quickFind' :: Connection -> FullCollection -> Selector -> IO [BsonDoc] +quickFind' c col sel = find c col sel >>= allDocs' + +-- | Open a cursor to find documents in /FullCollection/ that match +-- /Selector/. See the documentation for each argument's type for +-- information about how it effects the query. 
+query :: Connection -> FullCollection -> [QueryOpt] -> + NumToSkip -> NumToReturn -> Selector -> FieldSelector -> IO Cursor +query c col opts nskip ret sel fsel = do + h <- getHandle c + + let body = runPut $ do + putI32 $ fromQueryOpts opts + putCol col + putI32 nskip + putI32 ret + putBsonDoc sel + case fsel of + [] -> putNothing + _ -> putBsonDoc $ toBsonDoc $ List.zip fsel $ + repeat $ BsonInt32 1 + (reqID, msg) <- packMsg c OPQuery body + L.hPut h msg + + hdr <- getHeader h + assert (OPReply == hOp hdr) $ return () + assert (hRespTo hdr == reqID) $ return () + reply <- getReply h + assert (rRespFlags reply == 0) $ return () + docBytes <- L.hGet h (fromIntegral $ hMsgLen hdr - 16 - 20) >>= newIORef + closed <- newIORef False + cid <- newIORef $ rCursorID reply + return Cursor { + curCon = c, + curID = cid, + curNumToRet = ret, + curCol = col, + curDocBytes = docBytes, + curClosed = closed + } + +-- | Update documents with /BsonDoc/ in /FullCollection/ that match /Selector/. +update :: Connection -> FullCollection -> + [UpdateFlag] -> Selector -> BsonDoc -> IO RequestID +update c col flags sel obj = do + let body = runPut $ do + putI32 0 + putCol col + putI32 $ fromUpdateFlags flags + putBsonDoc sel + putBsonDoc obj + (reqID, msg) <- packMsg c OPUpdate body + cPut c msg + return reqID + +-- | log into the mongodb /Database/ attached to the /Connection/ +login :: Connection -> Database -> Username -> Password -> IO BsonDoc +login c db user pass = do + doc <- runCommand c db (toBsonDoc [("getnonce", toBson (1 :: Int))]) + let nonce = fromBson $ fromLookup $ List.lookup (s2L "nonce") doc :: String + digest = md5sum $ pack $ nonce ++ user ++ + md5sum (pack (user ++ ":mongo:" ++ pass)) + request = toBsonDoc [("authenticate", toBson (1 :: Int)), + ("user", toBson user), + ("nonce", toBson nonce), + ("key", toBson digest)] + in runCommand c db request + +auth :: Connection -> Database -> Username -> Password -> IO BsonDoc +auth = login + +logout :: Connection -> 
Database -> IO () +logout c db = + runCommand c db (toBsonDoc [(s2L "logout", BsonInt32 1)]) >> return () + +-- | Create a user in /Database/, or reset the password of an existing one: the stored user document keeps its other fields and has its old "pwd" field replaced. Returns the document that was saved. +addUser :: Connection -> Database -> Username -> Password -> IO BsonDoc +addUser c db user pass = do + let userDoc = toBsonDoc [(s2L "user", toBson user)] + fdb = L.append db (s2L ".system.users") + doc <- findOne c fdb userDoc + let pwd = md5sum $ pack (user ++ ":mongo:" ++ pass) + doc' = (s2L "pwd", toBson pwd) : + List.deleteBy (\(k1,_) (k2,_) -> (k1 == k2)) + (s2L "pwd", undefined) + (fromMaybe userDoc doc) + _ <- save c fdb doc' + return doc' + +data MapReduceOpt + = MROptQuery BsonDoc -- ^ query filter object + + -- | MRSort ???? TODO + + | MROptLimit Int64 -- ^ number of objects to return from + -- collection + + | MROptOut L8.ByteString -- ^ output-collection name + + | MROptKeepTemp -- ^ If set the generated collection is + -- not treated as temporary, as it will + -- be by default. When /MROptOut/ is + -- specified, the collection is + -- automatically made permanent. + + | MROptFinalize JSCode -- ^ function to apply to all the + -- results when finished + + | MROptScope BsonDoc -- ^ can pass in variables that can be + -- accessed from map/reduce/finalize + + | MROptVerbose -- ^ provide statistics on job execution + -- time + +mrOptToTuple :: MapReduceOpt -> (String, BsonValue) +mrOptToTuple (MROptQuery q) = ("query", BsonDoc q) +mrOptToTuple (MROptLimit l) = ("limit", BsonInt64 l) +mrOptToTuple (MROptOut c) = ("out", BsonString c) +mrOptToTuple MROptKeepTemp = ("keeptemp", BsonBool True) +mrOptToTuple (MROptFinalize f) = ("finalize", BsonJSCode f) +mrOptToTuple (MROptScope s) = ("scope", BsonDoc s) +mrOptToTuple MROptVerbose = ("verbose", BsonBool True) + +-- | Issue a map/reduce command and return the results metadata. If +-- all you care about is the actual map/reduce results you might want +-- to use the 'mapReduce' command instead.
+-- +-- The results meta-document will look something like this: +-- +-- > {"result": "tmp.mr.mapreduce_1268095152_14", +-- > "timeMillis": 67, +-- > "counts": {"input": 4, +-- > "emit": 6, +-- > "output": 3}, +-- > "ok": 1.0} +-- +-- The /results/ field is the collection name within the same Database +-- that contain the results of the map/reduce. +runMapReduce :: Connection -> FullCollection + -> JSCode -- ^ mapping javascript function + -> JSCode -- ^ reducing javascript function + -> [MapReduceOpt] + -> IO BsonDoc +runMapReduce c fc m r opts = do + let (db, col) = splitFullCol fc + doc = [("mapreduce", toBson col), + ("map", BsonJSCode m), + ("reduce", BsonJSCode r)] ++ List.map mrOptToTuple opts + runCommand c db $ toBsonDoc doc + +-- | Issue a map/reduce command with associated scopes and return the +-- results metadata. If all you care about is the actual map/reduce +-- results you might want to use the 'mapReduce' command instead. +-- +-- See 'runMapReduce' for more information about the form of the +-- result metadata. +runMapReduceWScopes :: Connection -> FullCollection + -> JSCode -- ^ mapping javascript function + -> BsonDoc -- ^ Scope for mapping function + -> JSCode -- ^ reducing javascript function + -> BsonDoc -- ^ Scope for reducing function + -> [MapReduceOpt] + -> IO BsonDoc +runMapReduceWScopes c fc m ms r rs opts = do + let (db, col) = splitFullCol fc + doc = [("mapreduce", toBson col), + ("map", BsonJSCodeWScope m ms), + ("reduce", BsonJSCodeWScope r rs)] ++ List.map mrOptToTuple opts + runCommand c db $ toBsonDoc doc + +-- | Given a result metadata from a 'mapReduce' command (or +-- 'mapReduceWScope'), issue the 'find' command that will produce the +-- actual map/reduce results. 
+mapReduceResults :: Connection -> Database -> BsonDoc -> IO Cursor +mapReduceResults c db r = do + let col = case List.lookup (s2L "result") r of + Just bCol -> fromBson bCol + Nothing -> throwOpFailure "No 'result' in mapReduce response" + fc = L.append (L.append db $ s2L ".") col + find c fc [] + +-- | Run map/reduce and produce a cursor on the results. +mapReduce :: Connection -> FullCollection + -> JSCode -- ^ mapping javascript function + -> JSCode -- ^ reducing javascript function + -> [MapReduceOpt] + -> IO Cursor +mapReduce c fc m r opts = + runMapReduce c fc m r opts >>= mapReduceResults c (fst $ splitFullCol fc) + +-- | Run map/reduce with associated scopes and produce a cursor on the +-- results. +mapReduceWScopes :: Connection -> FullCollection + -> JSCode -- ^ mapping javascript function + -> BsonDoc -- ^ Scope for mapping function + -> JSCode -- ^ reducing javascript function + -> BsonDoc -- ^ Scope for reducing function + -> [MapReduceOpt] + -> IO Cursor +mapReduceWScopes c fc m ms r rs opts = + runMapReduceWScopes c fc m ms r rs opts >>= + mapReduceResults c (fst $ splitFullCol fc) + +-- | Conveniently stores the /BsonDoc/ to the /FullCollection/ +-- if there is an _id present in the /BsonDoc/ then it already has +-- a place in the DB, so we update it using the _id, otherwise +-- we insert it +save :: Connection -> FullCollection -> BsonDoc -> IO BsonValue +save c fc doc = + case List.lookup (s2L "_id") doc of + Nothing -> insert c fc doc + Just oid -> update c fc [UFUpsert] (toBsonDoc [("_id", oid)]) doc >> + return oid + +-- | Use this in the place of the query portion of a select type query +-- This uses javascript and a scope supplied by a /BsonDoc/ to evaluate +-- documents in the database for retrieval.
+-- +-- Example: +-- +-- > findOne conn mycoll $ whereClause "this.name == (name1 + name2)" +-- > Just (toBsonDoc [("name1", toBson "mar"), ("name2", toBson "tha")]) +whereClause :: String -> Maybe BsonDoc -> BsonDoc +whereClause qry Nothing = toBsonDoc [("$where", BsonJSCode (s2L qry))] +whereClause qry (Just scope) = + toBsonDoc [("$where", BsonJSCodeWScope (s2L qry) scope)] + +data Hdr = Hdr { + hMsgLen :: Int32, + -- hReqID :: Int32, + hRespTo :: Int32, + hOp :: Opcode + } deriving (Show) + +data Reply = Reply { + rRespFlags :: Int32, + rCursorID :: Int64 + -- rStartFrom :: Int32, + -- rNumReturned :: Int32 + } deriving (Show) + +getHeader :: Handle -> IO Hdr +getHeader h = do + hdrBytes <- L.hGet h 16 + return $ flip runGet hdrBytes $ do + msgLen <- getI32 + skip 4 -- reqID <- getI32 + respTo <- getI32 + op <- getI32 + return $ Hdr msgLen respTo $ toOpcode op + +getReply :: Handle -> IO Reply +getReply h = do + replyBytes <- L.hGet h 20 + return $ flip runGet replyBytes $ do + respFlags <- getI32 + cursorID <- getI64 + skip 4 -- startFrom <- getI32 + skip 4 -- numReturned <- getI32 + return $ Reply respFlags cursorID + + +-- | Return one document or Nothing if there are no more. +-- Automatically closes the cursor when last document is read +nextDoc :: Cursor -> IO (Maybe BsonDoc) +nextDoc cur = do + closed <- readIORef $ curClosed cur + if closed + then return Nothing + else do + docBytes <- readIORef $ curDocBytes cur + cid <- readIORef $ curID cur + case L.length docBytes of + 0 -> if cid == 0 + then writeIORef (curClosed cur) True >> return Nothing + else getMore cur + _ -> do + let (doc, docBytes') = getFirstDoc docBytes + writeIORef (curDocBytes cur) docBytes' + return $ Just doc + +-- | Return a lazy list of all (of the rest) of the documents in the +-- cursor. This works much like hGetContents--it will lazily read the +-- cursor data out of the database as the list is used. The cursor is +-- automatically closed when the list has been fully read. 
+-- +-- If you manually finish the cursor before consuming off this list +-- you won't get all the original documents in the cursor. +-- +-- If you don't consume to the end of the list, you must manually +-- close the cursor or you will leak the cursor, which may also leak +-- on the database side. +allDocs :: Cursor -> IO [BsonDoc] +allDocs cur = unsafeInterleaveIO $ do + doc <- nextDoc cur + case doc of + Nothing -> return [] + Just d -> liftM (d :) (allDocs cur) + +-- | Returns a strict list of all (of the rest) of the documents in +-- the cursor. This means that all of the documents will immediately +-- be read out of the database and loaded into memory. +allDocs' :: Cursor -> IO [BsonDoc] +allDocs' cur = do + doc <- nextDoc cur + case doc of + Nothing -> return [] + Just d -> liftM (d :) (allDocs' cur) + +getFirstDoc :: L.ByteString -> (BsonDoc, L.ByteString) +getFirstDoc docBytes = flip runGet docBytes $ do + doc <- getBsonDoc + docBytes' <- getRemainingLazyByteString + return (doc, docBytes') + +getMore :: Cursor -> IO (Maybe BsonDoc) +getMore cur = do + h <- getHandle $ curCon cur + + cid <- readIORef $ curID cur + let body = runPut $ do + putI32 0 + putCol $ curCol cur + putI32 $ curNumToRet cur + putI64 cid + (reqID, msg) <- packMsg (curCon cur) OPGetMore body + L.hPut h msg + + hdr <- getHeader h + assert (OPReply == hOp hdr) $ return () + assert (hRespTo hdr == reqID) $ return () + reply <- getReply h + assert (rRespFlags reply == 0) $ return () + case rCursorID reply of + 0 -> writeIORef (curID cur) 0 + ncid -> assert (ncid == cid) $ return () + docBytes <- (L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20) + case L.length docBytes of + 0 -> writeIORef (curClosed cur) True >> return Nothing + _ -> do + let (doc, docBytes') = getFirstDoc docBytes + writeIORef (curDocBytes cur) docBytes' + return $ Just doc + +-- | Manually close a cursor -- usually not needed if you use +-- 'allDocs', 'allDocs'', or 'nextDoc'. 
+finish :: Cursor -> IO () +finish cur = do + h <- getHandle $ curCon cur + cid <- readIORef $ curID cur + unless (cid == 0) $ do + let body = runPut $ do + putI32 0 + putI32 1 + putI64 cid + (_reqID, msg) <- packMsg (curCon cur) OPKillCursors body + L.hPut h msg + writeIORef (curClosed cur) True + return () + +-- | The field key to index on. +type Key = L8.ByteString + +-- | Direction to index. +data Direction = Ascending + | Descending + deriving (Show, Eq) + +fromDirection :: Direction -> Int +fromDirection Ascending = 1 +fromDirection Descending = - 1 + +-- | Should this index guarantee uniqueness? +type Unique = Bool + +-- | Create a new index on /FullCollection/ on the list of /Key/ / +-- /Direction/ pairs. +createIndex :: Connection -> FullCollection -> + [(Key, Direction)] -> Unique -> IO L8.ByteString +createIndex c col keys uniq = do + let (db, _col') = splitFullCol col + name = indexName keys + keysDoc = flip fmap keys $ + \(k, d) -> (k, toBson $ fromDirection d :: BsonValue) + _ <- insert c (L.append db $ s2L ".system.indexes") $ + toBsonDoc [("name", toBson name), + ("ns", toBson col), + ("key", toBson keysDoc), + ("unique", toBson uniq)] + return name + +-- | Drop the specified index on the given /FullCollection/. +dropIndex :: Connection -> FullCollection -> [(Key, Direction)] -> IO () +dropIndex c col keys = do + let (db, col') = splitFullCol col + name = indexName keys + _ <- runCommand c db $ toBsonDoc [("deleteIndexes", toBson col'), + ("index", toBson name)] + return () + +-- | Drop all indexes on /FullCollection/. +dropIndexes :: Connection -> FullCollection -> IO () +dropIndexes c col = do + let (db, col') = splitFullCol col + _ <- runCommand c db $ toBsonDoc [("deleteIndexes", toBson col'), + ("index", toBson "*")] + return () + +-- | Return a BsonDoc describing the existing indexes on /FullCollection/. 
+-- +-- With the current server versions (1.2) this will return documents +-- such as: +-- +-- > {"key": {"lastname": -1, "firstname": 1}, +-- > "name": "lastname_-1_firstname_1", +-- > "ns": "mydb.people", +-- > "unique": true} +-- +-- Which is a single key that indexes on @lastname@ (descending) and +-- then @firstname@ (ascending) on the collection @people@ of the +-- database @mydb@ with a uniqueness requirement. +indexInformation :: Connection -> FullCollection -> IO [BsonDoc] +indexInformation c col = do + let (db, _col') = splitFullCol col + quickFind' c (L.append db $ s2L ".system.indexes") $ + toBsonDoc [("ns", toBson col)] + +indexName :: [(Key, Direction)] -> L8.ByteString +indexName = L.intercalate (s2L "_") . List.map partName + where partName (k, Ascending) = L.append k $ s2L "_1" + partName (k, Descending) = L.append k $ s2L "_-1" + +putCol :: Collection -> Put +putCol col = putLazyByteString col >> putNull + +packMsg :: Connection -> Opcode -> L.ByteString -> IO (RequestID, L.ByteString) +packMsg c op body = do + reqID <- randNum c + let msg = runPut $ do + putI32 $ fromIntegral $ L.length body + 16 + putI32 reqID + putI32 0 + putI32 $ fromOpcode op + putLazyByteString body + return (reqID, msg) + +randNum :: Connection -> IO Int32 +randNum Connection { cRand = nsRef } = atomicModifyIORef nsRef $ \ns -> + (List.tail ns, + fromIntegral $ List.head ns) + +s2L :: String -> L8.ByteString +s2L = L8.fromString + +validateCollectionName :: FullCollection -> IO (Database, Collection) +validateCollectionName col = do + let (db, col') = splitFullCol col + when (s2L ".." `List.elem` L.group col) $ + throwColInvalid $ "Collection can't contain \"..\": " ++ show col + when (c2w '$' `L.elem` col && + not (s2L "oplog.$main" `L.isPrefixOf` col' || + s2L "$cmd" `L.isPrefixOf` col')) $ + throwColInvalid $ "Collection can't contain '$': " ++ show col + when (L.head col == c2w '.'
|| L.last col == c2w '.') $ + throwColInvalid $ "Collection can't start or end with '.': " ++ show col + return (db, col') + +fromLookup :: Maybe a -> a +fromLookup (Just m) = m +fromLookup Nothing = throwColInvalid "cannot find key" diff --git a/Database/MongoDB/BSON.hs b/old/Database/MongoDB/BSON.hs similarity index 100% rename from Database/MongoDB/BSON.hs rename to old/Database/MongoDB/BSON.hs diff --git a/old/Database/MongoDB/Util.hs b/old/Database/MongoDB/Util.hs new file mode 100644 index 0000000..4de9f5c --- /dev/null +++ b/old/Database/MongoDB/Util.hs @@ -0,0 +1,83 @@ +{- + +Copyright (C) 2010 Scott R Parish + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +-} + +module Database.MongoDB.Util + ( + putI8, putI16, putI32, putI64, putNothing, putNull, putS, + getI8, getI32, getI64, getC, getS, getNull, putStrSz, + ) +where +import Control.Exception (assert) +import Control.Monad +import Data.Binary +import Data.Binary.Get +import Data.Binary.Put +import Data.ByteString.Char8 +import qualified Data.ByteString.Lazy as L +import qualified Data.ByteString.Lazy.UTF8 as L8 +import Data.Char (chr) +import Data.Int + +getC :: Get Char +getC = liftM chr getI8 + +getI8 :: (Integral a) => Get a +getI8 = liftM fromIntegral getWord8 + +getI32 :: Get Int32 +getI32 = liftM fromIntegral getWord32le + +getI64 :: Get Int64 +getI64 = liftM fromIntegral getWord64le + +getS :: Get (Integer, L8.ByteString) +getS = getLazyByteStringNul >>= \s -> return (fromIntegral $ L.length s + 1, s) + +getNull :: Get () +getNull = do {c <- getC; assert (c == '\0') $ return ()} + +putI8 :: Int8 -> Put +putI8 = putWord8 . fromIntegral + +putI16 :: Int16 -> Put +putI16 = putWord16le . fromIntegral + +putI32 :: Int32 -> Put +putI32 = putWord32le . fromIntegral + +putI64 :: Int64 -> Put +putI64 = putWord64le . fromIntegral + +putNothing :: Put +putNothing = putByteString $ pack "" + +putNull :: Put +putNull = putI8 0 + +putS :: L8.ByteString -> Put +putS s = putLazyByteString s >> putNull + +putStrSz :: L8.ByteString -> Put +putStrSz s = putI32 (fromIntegral $ 1 + L.length s) >> putS s diff --git a/tutorial.md b/tutorial.md index cc01f94..d7ced80 100644 --- a/tutorial.md +++ b/tutorial.md @@ -41,94 +41,89 @@ Start up a haskell repl: $ ghci -Now We'll need to bring in the MongoDB/BSON bindings: +Now we'll need to bring in the MongoDB/BSON bindings and set +OverloadedStrings so literal strings are converted to UTF-8 automatically. 
> import Database.MongoDB - > import Database.MongoDB.BSON + > :set -XOverloadedStrings Making A Connection ------------------- Open up a connection to your DB instance, using the standard port: - > con <- connect "127.0.0.1" [] + > Right con <- connect $ server "127.0.0.1" or for a non-standard port - > import Network - > con <- connectOnPort "127.0.0.1" (Network.PortNumber 666) [] + > Right con <- connect $ server "127.0.0.1" (PortNumber 666) -By default mongoDB will try to find the master and connect to it and -will throw an exception if a master can not be found to connect -to. You can force mongoDB to connect to the slave by adding SlaveOK as -a connection option, eg: +*connect* returns Left IOError if connection fails. We are assuming above +it won't fail. If it does you will get a pattern match error. - > con <- connect "127.0.0.1" [SlaveOK] +Task and Db monad +------------------- -Databases, Collections and FullCollections ------------------------------------------- +The current connection is held in a Reader monad called *Task*, and the +current database is held in a Reader monad on top of that. To run a task, +supply it and a connection to *runTask*. Within a task, to access a database, +wrap your operations in a *useDb*. -As many database servers, MongoDB has databases--separate namespaces -under which collections reside. Most of the APIs for this driver -request the *FullCollection* which is simply the *Database* and the -*Collection* concatenated with a period. +But since we are working in ghci, which requires us to start from the +IO monad every time, we'll define a convenient 'run' function that takes a +db-action and executes it against our "test" database on the server we +just connected to: -For instance 'myweb_prod.users' is the the *FullCollection* name for -the *Collection 'users' in the database 'myweb_prod'. + > let run act = runTask (useDb "test" act) con + +*run* (*runTask*) will return either Left Failure or Right result.
Failure +means the connection failed (eg. network problem) or the server failed +(eg. disk full). + +Databases and Collections +----------------------------- + +A MongoDB server can store multiple databases--separate namespaces +under which collections reside. + +You can obtain the list of databases available on a connection: + + > runTask allDatabases con + +You can also use the *run* function we just created: + + > run allDatabases + +The "test" database is ignored in this case because *allDatabases* +is not a query on a specific database but on the server as a whole. Databases and collections do not need to be created, just start using them and MongoDB will automatically create them for you. -In the below examples we'll be using the following *FullCollection*: +In the below examples we'll be using the database "test" (captured in *run* +above) and the collection "posts": - > import Data.ByteString.Lazy.UTF8 - > let postsCol = (fromString "test.posts") +You can obtain a list of collections available in the "test" database: -You can obtain a list of databases available on a connection: - - > dbs <- databaseNames con - -You can obtain a list of collections available on a database: - - > cols <- collectionNames con (fromString "test") - > map toString cols - ["test.system.indexes"] + > run allCollections Documents --------- Data in MongoDB is represented (and stored) using JSON-style -documents. In mongoDB we use the *BsonDoc* type to represent these -documents. At the moment a *BsonDoc* is simply a tuple list of the -type '[(ByteString, BsonValue)]'. Here's a BsonDoc which could represent -a blog post: +documents. In mongoDB we use the BSON *Document* type to represent +these documents. A document is simply a list of *Field*s, where each field is +a named value. A value is a basic type like Bool, Int, Float, String, Time; +a special BSON value like Binary, Javascript, ObjectId; an (embedded) +Document; or a list of values.
Here's an example document which could +represent a blog post: - > import Data.Time.Clock.POSIX - > now <- getPOSIXTime + > import Data.Time + > now <- getCurrentTime > :{ - let post = [(fromString "author", BsonString $ fromString "Mike"), - (fromString "text", - BsonString $ fromString "My first blog post!"), - (fromString "tags", - BsonArray [BsonString $ fromString "mongodb", - BsonString $ fromString "python", - BsonString $ fromString "pymongo"]), - (fromString "date", BsonDate now)] - :} - -With all the type wrappers and string conversion, it's hard to see -what's actually going on. Fortunately the BSON library provides -conversion functions *toBson* and *fromBson* for converting native -between the wrapped BSON types and many native Haskell types. The -functions *toBsonDoc* and *fromBsonDoc* help convert from tuple lists -with plain *String* keys, or *Data.Map*. - -Here's the same BSON data structure using these conversion functions: - - > :{ - let post = toBsonDoc [("author", toBson "Mike"), - ("text", toBson "My first blog post!"), - ("tags", toBson ["mongoDB", "Haskell"]), - ("date", BsonDate now)] + let post = ["author" =: "Mike", + "text" =: "My first blog post!", + "tags" =: ["mongoDB", "Haskell"], + "date" =: now] :} Inserting a Document @@ -136,11 +131,11 @@ Inserting a Document To insert a document into a collection we can use the *insert* function: - > insert con postsCol post - BsonObjectId 23400392795601893065744187392 + > run $ insert "posts" post + Right (Oid 4c16d355 c80c560858000000) -When a document is inserted a special key, *_id*, is automatically -added if the document doesn't already contain an *_id* key. The value +When a document is inserted a special field, *_id*, is automatically +added if the document doesn't already contain that field. The value of *_id* must be unique across the collection. *insert* returns the value of *_id* for the inserted document. 
For more information, see the [documentation on _id](http://www.mongodb.org/display/DOCS/Object+IDs). @@ -149,9 +144,7 @@ After inserting the first document, the posts collection has actually been created on the server. We can verify this by listing all of the collections in our database: - > cols <- collectionNames con (fromString "test") - > map toString cols - [u'postsCol', u'system.indexes'] + > run allCollections * Note The system.indexes collection is a special internal collection that was created automatically. @@ -166,11 +159,10 @@ only one matching document, or are only interested in the first match. Here we use *findOne* to get the first document from the posts collection: - > findOne con postsCol [] - Just [(Chunk "_id" Empty,BsonObjectId (Chunk "K\151\153S9\CAN\138e\203X\182'" Empty)),(Chunk "author" Empty,BsonString (Chunk "Mike" Empty)),(Chunk "text" Empty,BsonString (Chunk "My first blog post!" Empty)),(Chunk "tags" Empty,BsonArray [BsonString (Chunk "mongoDB" Empty),BsonString (Chunk "Haskell" Empty)]),(Chunk "date" Empty,BsonDate 1268226361.753s)] + > run $ findOne (query [] "posts") + Right (Just [ _id: Oid 4c16d355 c80c560858000000, author: "Mike", text: "My first blog post!", tags: ["mongoDB","Haskell"], date: 2010-06-15 01:09:28.364 UTC]) -The result is a dictionary matching the one that we inserted -previously. +The result is a document matching the one that we inserted previously. * Note: The returned document contains an *_id*, which was automatically added on insert. @@ -179,41 +171,42 @@ added on insert. resulting document must match. To limit our results to a document with author "Mike" we do: - > findOne con postsCol $ toBsonDoc [("author", toBson "Mike")] - Just [(Chunk "_id" Empty,BsonObjectId (Chunk "K\151\153S9\CAN\138e\203X\182'" Empty)),(Chunk "author" Empty,BsonString (Chunk "Mike" Empty)),(Chunk "text" Empty,BsonString (Chunk "My first blog post!" 
Empty)),(Chunk "tags" Empty,BsonArray [BsonString (Chunk "mongoDB" Empty),BsonString (Chunk "Haskell" Empty)]),(Chunk "date" Empty,BsonDate 1268226361.753s)] + > run $ findOne (query ["author" =: "Mike"] "posts") + Right (Just [ _id: Oid 4c16d355 c80c560858000000, author: "Mike", text: "My first blog post!", tags: ["mongoDB","Haskell"], date: 2010-06-15 01:09:28.364 UTC]) If we try with a different author, like "Eliot", we'll get no result: - > findOne con postsCol $ toBsonDoc [("author", toBson "Eliot")] - Nothing + > run $ findOne (query ["author" =: "Eliot"] "posts") + Right Nothing Bulk Inserts ------------ In order to make querying a little more interesting, let's insert a few more documents. In addition to inserting a single document, we can -also perform bulk insert operations, by using the *insertMany* api -which accepts a list of documents to be inserted. This will insert -each document in the iterable, sending only a single command to the -server: +also perform bulk insert operations, by using the *insertMany* function +which accepts a list of documents to be inserted. 
It sends only a single +command to the server: - > now <- getPOSIXTime + > now <- getCurrentTime > :{ - let new_postsCol = [toBsonDoc [("author", toBson "Mike"), - ("text", toBson "Another post!"), - ("tags", toBson ["bulk", "insert"]), - ("date", toBson now)], - toBsonDoc [("author", toBson "Eliot"), - ("title", toBson "MongoDB is fun"), - ("text", toBson "and pretty easy too!"), - ("date", toBson now)]] + let post1 = ["author" =: "Mike", + "text" =: "Another post!", + "tags" =: ["bulk", "insert"], + "date" =: now] :} - > insertMany con postsCol new_posts - [BsonObjectId 23400393883959793414607732737,BsonObjectId 23400398126710930368559579137] + > :{ + let post2 = ["author" =: "Eliot", + "title" =: "MongoDB is fun", + "text" =: "and pretty easy too!", + "date" =: now] + :} + > run $ insertMany "posts" [post1, post2] + Right [Oid 4c16d67e c80c560858000001,Oid 4c16d67e c80c560858000002] -* Note that *new_posts !! 1* has a different shape than the other -posts - there is no "tags" field and we've added a new field, -"title". This is what we mean when we say that MongoDB is schema-free. +* Note that post2 has a different shape than the other posts - there +is no "tags" field and we've added a new field, "title". This is what we +mean when we say that MongoDB is schema-free. Querying for More Than One Document ------------------------------------ @@ -221,43 +214,37 @@ Querying for More Than One Document To get more than a single document as the result of a query we use the *find* method. *find* returns a cursor instance, which allows us to iterate over all matching documents.
There are several ways in which -we can iterate: we can call *nextDoc* to get documents one at a time -or we can get a lazy list of all the results by applying the cursor -to *allDocs*: +we can iterate: we can call *next* to get documents one at a time +or we can get all the results by applying the cursor to *rest*: - > cursor <- find con postsCol $ toBsonDoc [("author", toBson "Mike")] - > allDocs cursor + > Right cursor <- run $ find (query ["author" =: "Mike"] "posts") + > run $ rest cursor Of course you can use bind (*>>=*) to combine these into one line: - > docs <- find con postsCol (toBsonDoc [("author", toBson "Mike")]) >>= allDocs + > run $ find (query ["author" =: "Mike"] "posts") >>= rest -* Note: *nextDoc* automatically closes the cursor when the last -document has been read out of it. Similarly, *allDocs* automatically -closes the cursor when you've consumed to the end of the resulting -list. +* Note: *next* automatically closes the cursor when the last +document has been read out of it. Similarly, *rest* automatically +closes the cursor after returning all the results. Counting -------- We can count how many documents are in an entire collection: - > num <- count con postsCol + > run $ count (query [] "posts") -Or we can query for how many documents match a query: +Or count how many documents match a query: - > num <- countMatching con postsCol (toBsonDoc [("author", toBson "Mike")]) + > run $ count (query ["author" =: "Mike"] "posts") Range Queries ------------- -No non native sorting yet. +To do Indexing -------- -WIP - coming soon. - -Something like... - - > index <- createIndex con testcol [("author", Ascending)] True +To do