From 97400c074d6a334788303a85616faa3ecaa26c00 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Wed, 8 Jun 2016 00:09:33 -0700 Subject: [PATCH 1/4] Implement bulk update operation --- Database/MongoDB/Admin.hs | 3 +- Database/MongoDB/Query.hs | 142 ++++++++++++++++++++++++++++++++++---- 2 files changed, 129 insertions(+), 16 deletions(-) diff --git a/Database/MongoDB/Admin.hs b/Database/MongoDB/Admin.hs index 09fbcb0..26df47c 100644 --- a/Database/MongoDB/Admin.hs +++ b/Database/MongoDB/Admin.hs @@ -196,7 +196,8 @@ allUsers :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Document] allUsers = map (exclude ["_id"]) <$> (rest =<< find (select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]}) -addUser :: (MonadIO m) => Bool -> Username -> Password -> Action m () +addUser :: (MonadBaseControl IO m, MonadIO m) + => Bool -> Username -> Password -> Action m () -- ^ Add user with password with read-only access if bool is True or read-write access if bool is False addUser readOnly user pass = do mu <- findOne (select ["user" =: user] "system.users") diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 5eb895f..e9dd1e5 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -42,12 +42,12 @@ module Database.MongoDB.Query ( MRResult, mapReduce, runMR, runMR', -- * Command Command, runCommand, runCommand1, - eval, retrieveServerData + eval, retrieveServerData, updateMany, updateAll, UpdateResult ) where import Prelude hiding (lookup) import Control.Exception (Exception, throwIO, throw) -import Control.Monad (unless, replicateM, liftM, forM) +import Control.Monad (unless, replicateM, liftM, forM, forM_, void) import Data.Int (Int32, Int64) import Data.Maybe (listToMaybe, catMaybes, isNothing) import Data.Word (Word32) @@ -64,6 +64,8 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer, readMVar, modifyMVar) #endif import Control.Applicative 
((<$>)) +import Control.Exception (SomeException) +import Control.Exception.Lifted (catch) import Control.Monad (when) import Control.Monad.Base (MonadBase) import Control.Monad.Error (Error(..)) @@ -145,6 +147,8 @@ data AccessMode = type GetLastError = Document -- ^ Parameters for getLastError command. For example @[\"w\" =: 2]@ tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See for more options. +data UpdateResult = UpdateResult + master :: AccessMode -- ^ Same as 'ConfirmWrites' [] master = ConfirmWrites [] @@ -428,7 +432,7 @@ insert' opts col docs = do let sd = P.serverData p let docSize = sizeOfDocument $ insertCommandDocument opts col [] chunks <- forM (splitAtLimit - opts + (not (KeepGoing `elem` opts)) (maxBsonObjectSize sd - docSize) -- ^ size of auxiliary part of insert -- document should be subtracted from @@ -469,14 +473,14 @@ insertBlock opts col docs = do (maybe 0 id $ lookup "ok" doc) (show err ++ show writeConcernErr) -splitAtLimit :: [InsertOption] -> Int -> Int -> [Document] -> [[Document]] -splitAtLimit opts maxSize maxCount list = chop (go 0 0 []) list +splitAtLimit :: Bool -> Int -> Int -> [Document] -> [[Document]] +splitAtLimit ordered maxSize maxCount list = chop (go 0 0 []) list where go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document]) go _ _ res [] = (reverse res, []) go curSize curCount [] (x:xs) | ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) = - if (KeepGoing `elem` opts) + if (not ordered) then go curSize curCount [] xs -- Skip this document and insert the other documents. else @@ -507,37 +511,145 @@ assignId doc = if any (("_id" ==) . 
label) doc -- ** Update -save :: (MonadIO m) => Collection -> Document -> Action m () +save :: (MonadBaseControl IO m, MonadIO m) + => Collection -> Document -> Action m () -- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or upsert it if its not new (has \"_id\" field) save col doc = case look "_id" doc of Nothing -> insert_ col doc Just i -> upsert (Select ["_id" := i] col) doc -replace :: (MonadIO m) => Selection -> Document -> Action m () +replace :: (MonadBaseControl IO m, MonadIO m) + => Selection -> Document -> Action m () -- ^ Replace first document in selection with given document replace = update [] -repsert :: (MonadIO m) => Selection -> Document -> Action m () +repsert :: (MonadBaseControl IO m, MonadIO m) + => Selection -> Document -> Action m () -- ^ Replace first document in selection with given document, or insert document if selection is empty repsert = update [Upsert] {-# DEPRECATED repsert "use upsert instead" #-} -upsert :: (MonadIO m) => Selection -> Document -> Action m () +upsert :: (MonadBaseControl IO m, MonadIO m) + => Selection -> Document -> Action m () -- ^ Update first document in selection with given document, or insert document if selection is empty upsert = update [Upsert] type Modifier = Document -- ^ Update operations on fields in a document. See -modify :: (MonadIO m) => Selection -> Modifier -> Action m () +modify :: (MonadBaseControl IO m, MonadIO m) + => Selection -> Modifier -> Action m () -- ^ Update all documents in selection using given modifier modify = update [MultiUpdate] -update :: (MonadIO m) => [UpdateOption] -> Selection -> Document -> Action m () +update :: (MonadBaseControl IO m, MonadIO m) + => [UpdateOption] -> Selection -> Document -> Action m () -- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. 
If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty. -update opts (Select sel col) up = do - db <- thisDatabase - write (Update (db <.> col) opts sel up) +update opts (Select sel col) up = void $ update' True col [(sel, up, opts)] + +updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document +updateCommandDocument col ordered updates writeConcern = + [ "update" =: col + , "ordered" =: ordered + , "updates" =: updates + , "writeConcern" =: writeConcern + ] + +{-| Bulk update operation. If one update fails it will not apply the remaining + - updates. The returned value is currently only a placeholder. With mongodb server + - before 2.6 it will send update requests one by one. From 2.6 on it will use the + - bulk update feature in mongodb. + -} +updateMany :: (MonadBaseControl IO m, MonadIO m) + => Collection + -> [(Selector, Document, [UpdateOption])] + -> Action m UpdateResult +updateMany = update' True + +{-| Bulk update operation. If one update fails it will proceed with the + - remaining updates. The returned value is currently only a placeholder. With + - mongodb server before 2.6 it will send update requests one by one. From 2.6 + - on it will use the bulk update feature in mongodb. 
+ -} +updateAll :: (MonadBaseControl IO m, MonadIO m) + => Collection + -> [(Selector, Document, [UpdateOption])] + -> Action m UpdateResult +updateAll = update' False + +update' :: (MonadBaseControl IO m, MonadIO m) + => Bool + -> Collection + -> [(Selector, Document, [UpdateOption])] + -> Action m UpdateResult +update' ordered col updateDocs = do + p <- asks mongoPipe + let sd = P.serverData p + let updates = map (\(s, d, os) -> [ "q" =: s + , "u" =: d + , "upsert" =: (Upsert `elem` os) + , "multi" =: (MultiUpdate `elem` os)]) + updateDocs + + mode <- asks mongoWriteMode + let writeConcern = case mode of + NoConfirm -> ["w" =: (0 :: Int)] + Confirm params -> params + let docSize = sizeOfDocument $ updateCommandDocument col ordered [] writeConcern + let chunks = splitAtLimit + ordered + (maxBsonObjectSize sd - docSize) + -- ^ size of auxiliary part of insert + -- document should be subtracted from + -- the overall size + (maxWriteBatchSize sd) + updates + forM_ chunks (updateBlock ordered col) + return UpdateResult + +updateBlock :: (MonadIO m, MonadBaseControl IO m) + => Bool -> Collection -> [Document] -> Action m () +updateBlock ordered col docs = do + p <- asks mongoPipe + let sd = P.serverData p + if (maxWireVersion sd < 2) + then do + db <- thisDatabase + errors <- + forM docs $ \updateDoc -> do + let doc = (at "u" updateDoc) :: Document + let sel = (at "q" updateDoc) :: Document + let upsrt = if at "upsert" updateDoc then [Upsert] else [] + let multi = if at "multi" updateDoc then [MultiUpdate] else [] + write (Update (db <.> col) (upsrt ++ multi) sel doc) + return Nothing + `catch` \(e :: SomeException) -> do + when ordered $ liftIO $ throwIO e + return $ Just e + let onlyErrors = catMaybes errors + if not $ null onlyErrors + then liftIO $ throwIO $ WriteFailure 0 (show onlyErrors) + else return () + else do + mode <- asks mongoWriteMode + let writeConcern = case mode of + NoConfirm -> ["w" =: (0 :: Int)] + Confirm params -> params + doc <- runCommand $ 
updateCommandDocument col ordered docs writeConcern + case (look "writeErrors" doc, look "writeConcernError" doc) of + (Nothing, Nothing) -> return () + (Just err, Nothing) -> do + liftIO $ throwIO $ WriteFailure + (maybe 0 id $ lookup "ok" doc) + (show err) + (Nothing, Just err) -> do + liftIO $ throwIO $ WriteFailure + (maybe 0 id $ lookup "ok" doc) + (show err) + (Just err, Just writeConcernErr) -> do + liftIO $ throwIO $ WriteFailure + (maybe 0 id $ lookup "ok" doc) + (show err ++ show writeConcernErr) -- ** Delete From d4b1c3a8ff41f13c9a6ba6b86493b076e0bb221f Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Thu, 9 Jun 2016 00:28:54 -0700 Subject: [PATCH 2/4] Add tests for update operations --- test/QuerySpec.hs | 106 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/test/QuerySpec.hs b/test/QuerySpec.hs index 57ee7e7..8fa3fb8 100644 --- a/test/QuerySpec.hs +++ b/test/QuerySpec.hs @@ -5,6 +5,8 @@ module QuerySpec (spec) where import Data.String (IsString(..)) import TestImport import Control.Exception +import Control.Monad (forM_) +import Database.MongoDB.Internal.Protocol (UpdateOption(..)) import qualified Data.List as L import qualified Data.Text as T @@ -186,6 +188,110 @@ spec = around withCleanDatabase $ do liftIO $ (length returnedDocs) `shouldBe` 6001 + describe "updateMany" $ do + it "updates value" $ do + _id <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"] + result <- db $ rest =<< find (select [] "team") + result `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "American"]] + _ <- db $ updateMany "team" [([ "_id" =: _id] + , ["$set" =: ["league" =: "European"]] + , [])] + updatedResult <- db $ rest =<< find (select [] "team") + updatedResult `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "European"]] + it "upserts value" $ do + c <- db $ count (select [] "team") + c `shouldBe` 0 + _ <- db $ updateMany "team" [( [] + , ["name" =: "Giants", "league" =: "MLB"] + , 
[Upsert] + )] + updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) + map L.sort updatedResult `shouldBe` [["league" =: "MLB", "name" =: "Giants"]] + it "updates all documents with Multi enabled" $ do + _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"] + _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"] + _ <- db $ updateMany "team" [( ["name" =: "Yankees"] + , ["$set" =: ["league" =: "MLB"]] + , [MultiUpdate] + )] + updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) + (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"] + , ["league" =: "MLB", "name" =: "Yankees"] + ] + it "updates one document when there is no Multi option" $ do + _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"] + _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"] + _ <- db $ updateMany "team" [( ["name" =: "Yankees"] + , ["$set" =: ["league" =: "MLB"]] + , [] + )] + updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) + (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"] + , ["league" =: "MiLB", "name" =: "Yankees"] + ] + it "can process different updates" $ do + _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"] + _ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB"] + _ <- db $ updateMany "team" [ ( ["name" =: "Yankees"] + , ["$set" =: ["league" =: "MiLB"]] + , [] + ) + , ( ["name" =: "Giants"] + , ["$set" =: ["league" =: "MLB"]] + , [] + ) + ] + updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) + (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB" , "name" =: "Giants"] + , ["league" =: "MiLB", "name" =: "Yankees"] + ] + it "can process different updates" $ do + _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: 
(Nothing :: Maybe Int)] + _ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)] + (db $ updateMany "team" [ ( ["name" =: "Yankees"] + , ["$inc" =: ["score" =: (1 :: Int)]] + , [] + ) + , ( ["name" =: "Giants"] + , ["$inc" =: ["score" =: (2 :: Int)]] + , [] + ) + ]) `shouldThrow` anyException + updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) + (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)] + , ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (1 :: Int)] + ] + it "can handle big updates" $ do + let docs = (flip map) [0..200000] $ \i -> + ["name" =: (T.pack $ "name " ++ (show i))] + ids <- db $ insertAll "bigCollection" docs + let updateDocs = (flip map) ids (\i -> ( [ "_id" =: i] + , ["$set" =: ["name" =: ("name " ++ (show i))]] + , [] + )) + _ <- db $ updateMany "team" updateDocs + updatedResult <- db $ rest =<< find (select [] "team") + forM_ updatedResult $ \r -> let (i :: ObjectId) = "_id" `at` r + in (("name" `at` r) :: String) `shouldBe` ("name" ++ (show i)) + + describe "updateAll" $ do + it "can process different updates" $ do + _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)] + _ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)] + (db $ updateAll "team" [ ( ["name" =: "Yankees"] + , ["$inc" =: ["score" =: (1 :: Int)]] + , [] + ) + , ( ["name" =: "Giants"] + , ["$inc" =: ["score" =: (2 :: Int)]] + , [] + ) + ]) `shouldThrow` anyException + updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) + (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)] + , ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (3 :: Int)] + ] + describe "allCollections" $ do it "returns all collections in a database" 
$ do _ <- db $ insert "team1" ["name" =: "Yankees", "league" =: "American"] From 2a450ff538b4766cb46cd8fc919f3f64bce7f66b Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Sun, 12 Jun 2016 15:08:59 -0700 Subject: [PATCH 3/4] Add changelog entry --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b4a8e3..dbfba8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,13 @@ This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Pac ### Added - TLS implementation. So far it is an experimental feature. - Insert using command syntax with mongo server >= 2.6 +- updateMany and updateAll functions. They use bulk operations from mongo + version 2.6 and above. With versions below 2.6 they send the updates one by one. ### Changed - All messages will be strictly evaluated before sending them to mongodb server. No more closed handles because of bad arguments. +- The update function is reimplemented in terms of updateMany. ### Removed - System.IO.Pipeline module From 8091afe4a07c90a94cfe8445b8d66f9163967586 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Thu, 16 Jun 2016 11:27:03 -0700 Subject: [PATCH 4/4] Hide Internal module --- Database/MongoDB/Query.hs | 3 ++- mongoDB.cabal | 4 ++-- test/QuerySpec.hs | 1 - 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index e9dd1e5..fb24f32 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -42,7 +42,8 @@ module Database.MongoDB.Query ( MRResult, mapReduce, runMR, runMR', -- * Command Command, runCommand, runCommand1, - eval, retrieveServerData, updateMany, updateAll, UpdateResult + eval, retrieveServerData, updateMany, updateAll, UpdateResult, + UpdateOption(..) 
) where import Prelude hiding (lookup) diff --git a/mongoDB.cabal b/mongoDB.cabal index eb15c53..3b41bb3 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -50,11 +50,11 @@ Library Exposed-modules: Database.MongoDB Database.MongoDB.Admin Database.MongoDB.Connection - Database.MongoDB.Internal.Protocol - Database.MongoDB.Internal.Util Database.MongoDB.Query Database.MongoDB.Transport Database.MongoDB.Transport.Tls + Other-modules: Database.MongoDB.Internal.Protocol + Database.MongoDB.Internal.Util Source-repository head Type: git diff --git a/test/QuerySpec.hs b/test/QuerySpec.hs index 8fa3fb8..9591d45 100644 --- a/test/QuerySpec.hs +++ b/test/QuerySpec.hs @@ -6,7 +6,6 @@ import Data.String (IsString(..)) import TestImport import Control.Exception import Control.Monad (forM_) -import Database.MongoDB.Internal.Protocol (UpdateOption(..)) import qualified Data.List as L import qualified Data.Text as T