Merge pull request #59 from VictorDenisov/update_command
Update command
This commit is contained in:
commit
f28a76e953
5 changed files with 240 additions and 18 deletions
|
@ -7,10 +7,13 @@ This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Pac
|
|||
### Added
|
||||
- TLS implementation. So far it is an experimental feature.
|
||||
- Insert using command syntax with mongo server >= 2.6
|
||||
- UpdateMany and UpdateAll commands. They use bulk operations from mongo
|
||||
version 2.6 and above. With versions below 2.6 it sends many updates.
|
||||
|
||||
### Changed
|
||||
- All messages will be strictly evaluated before sending them to mongodb server.
|
||||
No more closed handles because of bad arguments.
|
||||
- Update command is reimplemented in terms of UpdateMany.
|
||||
|
||||
### Removed
|
||||
- System.IO.Pipeline module
|
||||
|
|
|
@ -196,7 +196,8 @@ allUsers :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Document]
|
|||
allUsers = map (exclude ["_id"]) <$> (rest =<< find
|
||||
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
|
||||
|
||||
addUser :: (MonadIO m) => Bool -> Username -> Password -> Action m ()
|
||||
addUser :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Bool -> Username -> Password -> Action m ()
|
||||
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
|
||||
addUser readOnly user pass = do
|
||||
mu <- findOne (select ["user" =: user] "system.users")
|
||||
|
|
|
@ -42,12 +42,13 @@ module Database.MongoDB.Query (
|
|||
MRResult, mapReduce, runMR, runMR',
|
||||
-- * Command
|
||||
Command, runCommand, runCommand1,
|
||||
eval, retrieveServerData
|
||||
eval, retrieveServerData, updateMany, updateAll, UpdateResult,
|
||||
UpdateOption(..)
|
||||
) where
|
||||
|
||||
import Prelude hiding (lookup)
|
||||
import Control.Exception (Exception, throwIO, throw)
|
||||
import Control.Monad (unless, replicateM, liftM, forM)
|
||||
import Control.Monad (unless, replicateM, liftM, forM, forM_, void)
|
||||
import Data.Int (Int32, Int64)
|
||||
import Data.Maybe (listToMaybe, catMaybes, isNothing)
|
||||
import Data.Word (Word32)
|
||||
|
@ -64,6 +65,8 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
|
|||
readMVar, modifyMVar)
|
||||
#endif
|
||||
import Control.Applicative ((<$>))
|
||||
import Control.Exception (SomeException)
|
||||
import Control.Exception.Lifted (catch)
|
||||
import Control.Monad (when)
|
||||
import Control.Monad.Base (MonadBase)
|
||||
import Control.Monad.Error (Error(..))
|
||||
|
@ -145,6 +148,8 @@ data AccessMode =
|
|||
type GetLastError = Document
|
||||
-- ^ Parameters for getLastError command. For example @[\"w\" =: 2]@ tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See <http://www.mongodb.org/display/DOCS/Last+Error+Commands> for more options.
|
||||
|
||||
data UpdateResult = UpdateResult
|
||||
|
||||
master :: AccessMode
|
||||
-- ^ Same as 'ConfirmWrites' []
|
||||
master = ConfirmWrites []
|
||||
|
@ -428,7 +433,7 @@ insert' opts col docs = do
|
|||
let sd = P.serverData p
|
||||
let docSize = sizeOfDocument $ insertCommandDocument opts col []
|
||||
chunks <- forM (splitAtLimit
|
||||
opts
|
||||
(not (KeepGoing `elem` opts))
|
||||
(maxBsonObjectSize sd - docSize)
|
||||
-- ^ size of auxiliary part of insert
|
||||
-- document should be subtracted from
|
||||
|
@ -469,14 +474,14 @@ insertBlock opts col docs = do
|
|||
(maybe 0 id $ lookup "ok" doc)
|
||||
(show err ++ show writeConcernErr)
|
||||
|
||||
splitAtLimit :: [InsertOption] -> Int -> Int -> [Document] -> [[Document]]
|
||||
splitAtLimit opts maxSize maxCount list = chop (go 0 0 []) list
|
||||
splitAtLimit :: Bool -> Int -> Int -> [Document] -> [[Document]]
|
||||
splitAtLimit ordered maxSize maxCount list = chop (go 0 0 []) list
|
||||
where
|
||||
go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document])
|
||||
go _ _ res [] = (reverse res, [])
|
||||
go curSize curCount [] (x:xs) |
|
||||
((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) =
|
||||
if (KeepGoing `elem` opts)
|
||||
if (not ordered)
|
||||
then
|
||||
go curSize curCount [] xs -- Skip this document and insert the other documents.
|
||||
else
|
||||
|
@ -507,37 +512,145 @@ assignId doc = if any (("_id" ==) . label) doc
|
|||
|
||||
-- ** Update
|
||||
|
||||
save :: (MonadIO m) => Collection -> Document -> Action m ()
|
||||
save :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Collection -> Document -> Action m ()
|
||||
-- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or upsert it if its not new (has \"_id\" field)
|
||||
save col doc = case look "_id" doc of
|
||||
Nothing -> insert_ col doc
|
||||
Just i -> upsert (Select ["_id" := i] col) doc
|
||||
|
||||
replace :: (MonadIO m) => Selection -> Document -> Action m ()
|
||||
replace :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Selection -> Document -> Action m ()
|
||||
-- ^ Replace first document in selection with given document
|
||||
replace = update []
|
||||
|
||||
repsert :: (MonadIO m) => Selection -> Document -> Action m ()
|
||||
repsert :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Selection -> Document -> Action m ()
|
||||
-- ^ Replace first document in selection with given document, or insert document if selection is empty
|
||||
repsert = update [Upsert]
|
||||
{-# DEPRECATED repsert "use upsert instead" #-}
|
||||
|
||||
upsert :: (MonadIO m) => Selection -> Document -> Action m ()
|
||||
upsert :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Selection -> Document -> Action m ()
|
||||
-- ^ Update first document in selection with given document, or insert document if selection is empty
|
||||
upsert = update [Upsert]
|
||||
|
||||
type Modifier = Document
|
||||
-- ^ Update operations on fields in a document. See <http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations>
|
||||
|
||||
modify :: (MonadIO m) => Selection -> Modifier -> Action m ()
|
||||
modify :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Selection -> Modifier -> Action m ()
|
||||
-- ^ Update all documents in selection using given modifier
|
||||
modify = update [MultiUpdate]
|
||||
|
||||
update :: (MonadIO m) => [UpdateOption] -> Selection -> Document -> Action m ()
|
||||
update :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> [UpdateOption] -> Selection -> Document -> Action m ()
|
||||
-- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty.
|
||||
update opts (Select sel col) up = do
|
||||
update opts (Select sel col) up = void $ update' True col [(sel, up, opts)]
|
||||
|
||||
updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document
|
||||
updateCommandDocument col ordered updates writeConcern =
|
||||
[ "update" =: col
|
||||
, "ordered" =: ordered
|
||||
, "updates" =: updates
|
||||
, "writeConcern" =: writeConcern
|
||||
]
|
||||
|
||||
{-| Bulk update operation. If one insert fails it will not insert the remaining
|
||||
- documents. Current returned value is only a place holder. With mongodb server
|
||||
- before 2.6 it will send update requests one by one. After 2.6 it will use
|
||||
- bulk update feature in mongodb.
|
||||
-}
|
||||
updateMany :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Collection
|
||||
-> [(Selector, Document, [UpdateOption])]
|
||||
-> Action m UpdateResult
|
||||
updateMany = update' True
|
||||
|
||||
{-| Bulk update operation. If one insert fails it will proceed with the
|
||||
- remaining documents. Current returned value is only a place holder. With
|
||||
- mongodb server before 2.6 it will send update requests one by one. After 2.6
|
||||
- it will use bulk update feature in mongodb.
|
||||
-}
|
||||
updateAll :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Collection
|
||||
-> [(Selector, Document, [UpdateOption])]
|
||||
-> Action m UpdateResult
|
||||
updateAll = update' False
|
||||
|
||||
update' :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Bool
|
||||
-> Collection
|
||||
-> [(Selector, Document, [UpdateOption])]
|
||||
-> Action m UpdateResult
|
||||
update' ordered col updateDocs = do
|
||||
p <- asks mongoPipe
|
||||
let sd = P.serverData p
|
||||
let updates = map (\(s, d, os) -> [ "q" =: s
|
||||
, "u" =: d
|
||||
, "upsert" =: (Upsert `elem` os)
|
||||
, "multi" =: (MultiUpdate `elem` os)])
|
||||
updateDocs
|
||||
|
||||
mode <- asks mongoWriteMode
|
||||
let writeConcern = case mode of
|
||||
NoConfirm -> ["w" =: (0 :: Int)]
|
||||
Confirm params -> params
|
||||
let docSize = sizeOfDocument $ updateCommandDocument col ordered [] writeConcern
|
||||
let chunks = splitAtLimit
|
||||
ordered
|
||||
(maxBsonObjectSize sd - docSize)
|
||||
-- ^ size of auxiliary part of insert
|
||||
-- document should be subtracted from
|
||||
-- the overall size
|
||||
(maxWriteBatchSize sd)
|
||||
updates
|
||||
forM_ chunks (updateBlock ordered col)
|
||||
return UpdateResult
|
||||
|
||||
updateBlock :: (MonadIO m, MonadBaseControl IO m)
|
||||
=> Bool -> Collection -> [Document] -> Action m ()
|
||||
updateBlock ordered col docs = do
|
||||
p <- asks mongoPipe
|
||||
let sd = P.serverData p
|
||||
if (maxWireVersion sd < 2)
|
||||
then do
|
||||
db <- thisDatabase
|
||||
write (Update (db <.> col) opts sel up)
|
||||
errors <-
|
||||
forM docs $ \updateDoc -> do
|
||||
let doc = (at "u" updateDoc) :: Document
|
||||
let sel = (at "q" updateDoc) :: Document
|
||||
let upsrt = if at "upsert" updateDoc then [Upsert] else []
|
||||
let multi = if at "multi" updateDoc then [MultiUpdate] else []
|
||||
write (Update (db <.> col) (upsrt ++ multi) sel doc)
|
||||
return Nothing
|
||||
`catch` \(e :: SomeException) -> do
|
||||
when ordered $ liftIO $ throwIO e
|
||||
return $ Just e
|
||||
let onlyErrors = catMaybes errors
|
||||
if not $ null onlyErrors
|
||||
then liftIO $ throwIO $ WriteFailure 0 (show onlyErrors)
|
||||
else return ()
|
||||
else do
|
||||
mode <- asks mongoWriteMode
|
||||
let writeConcern = case mode of
|
||||
NoConfirm -> ["w" =: (0 :: Int)]
|
||||
Confirm params -> params
|
||||
doc <- runCommand $ updateCommandDocument col ordered docs writeConcern
|
||||
case (look "writeErrors" doc, look "writeConcernError" doc) of
|
||||
(Nothing, Nothing) -> return ()
|
||||
(Just err, Nothing) -> do
|
||||
liftIO $ throwIO $ WriteFailure
|
||||
(maybe 0 id $ lookup "ok" doc)
|
||||
(show err)
|
||||
(Nothing, Just err) -> do
|
||||
liftIO $ throwIO $ WriteFailure
|
||||
(maybe 0 id $ lookup "ok" doc)
|
||||
(show err)
|
||||
(Just err, Just writeConcernErr) -> do
|
||||
liftIO $ throwIO $ WriteFailure
|
||||
(maybe 0 id $ lookup "ok" doc)
|
||||
(show err ++ show writeConcernErr)
|
||||
|
||||
-- ** Delete
|
||||
|
||||
|
|
|
@ -50,11 +50,11 @@ Library
|
|||
Exposed-modules: Database.MongoDB
|
||||
Database.MongoDB.Admin
|
||||
Database.MongoDB.Connection
|
||||
Database.MongoDB.Internal.Protocol
|
||||
Database.MongoDB.Internal.Util
|
||||
Database.MongoDB.Query
|
||||
Database.MongoDB.Transport
|
||||
Database.MongoDB.Transport.Tls
|
||||
Other-modules: Database.MongoDB.Internal.Protocol
|
||||
Database.MongoDB.Internal.Util
|
||||
|
||||
Source-repository head
|
||||
Type: git
|
||||
|
|
|
@ -5,6 +5,7 @@ module QuerySpec (spec) where
|
|||
import Data.String (IsString(..))
|
||||
import TestImport
|
||||
import Control.Exception
|
||||
import Control.Monad (forM_)
|
||||
import qualified Data.List as L
|
||||
|
||||
import qualified Data.Text as T
|
||||
|
@ -186,6 +187,110 @@ spec = around withCleanDatabase $ do
|
|||
|
||||
liftIO $ (length returnedDocs) `shouldBe` 6001
|
||||
|
||||
describe "updateMany" $ do
|
||||
it "updates value" $ do
|
||||
_id <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
|
||||
result <- db $ rest =<< find (select [] "team")
|
||||
result `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "American"]]
|
||||
_ <- db $ updateMany "team" [([ "_id" =: _id]
|
||||
, ["$set" =: ["league" =: "European"]]
|
||||
, [])]
|
||||
updatedResult <- db $ rest =<< find (select [] "team")
|
||||
updatedResult `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "European"]]
|
||||
it "upserts value" $ do
|
||||
c <- db $ count (select [] "team")
|
||||
c `shouldBe` 0
|
||||
_ <- db $ updateMany "team" [( []
|
||||
, ["name" =: "Giants", "league" =: "MLB"]
|
||||
, [Upsert]
|
||||
)]
|
||||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
map L.sort updatedResult `shouldBe` [["league" =: "MLB", "name" =: "Giants"]]
|
||||
it "updates all documents with Multi enabled" $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
|
||||
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
|
||||
, ["$set" =: ["league" =: "MLB"]]
|
||||
, [MultiUpdate]
|
||||
)]
|
||||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"]
|
||||
, ["league" =: "MLB", "name" =: "Yankees"]
|
||||
]
|
||||
it "updates one document when there is no Multi option" $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
|
||||
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
|
||||
, ["$set" =: ["league" =: "MLB"]]
|
||||
, []
|
||||
)]
|
||||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"]
|
||||
, ["league" =: "MiLB", "name" =: "Yankees"]
|
||||
]
|
||||
it "can process different updates" $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
|
||||
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB"]
|
||||
_ <- db $ updateMany "team" [ ( ["name" =: "Yankees"]
|
||||
, ["$set" =: ["league" =: "MiLB"]]
|
||||
, []
|
||||
)
|
||||
, ( ["name" =: "Giants"]
|
||||
, ["$set" =: ["league" =: "MLB"]]
|
||||
, []
|
||||
)
|
||||
]
|
||||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB" , "name" =: "Giants"]
|
||||
, ["league" =: "MiLB", "name" =: "Yankees"]
|
||||
]
|
||||
it "can process different updates" $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
|
||||
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
|
||||
(db $ updateMany "team" [ ( ["name" =: "Yankees"]
|
||||
, ["$inc" =: ["score" =: (1 :: Int)]]
|
||||
, []
|
||||
)
|
||||
, ( ["name" =: "Giants"]
|
||||
, ["$inc" =: ["score" =: (2 :: Int)]]
|
||||
, []
|
||||
)
|
||||
]) `shouldThrow` anyException
|
||||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
|
||||
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (1 :: Int)]
|
||||
]
|
||||
it "can handle big updates" $ do
|
||||
let docs = (flip map) [0..200000] $ \i ->
|
||||
["name" =: (T.pack $ "name " ++ (show i))]
|
||||
ids <- db $ insertAll "bigCollection" docs
|
||||
let updateDocs = (flip map) ids (\i -> ( [ "_id" =: i]
|
||||
, ["$set" =: ["name" =: ("name " ++ (show i))]]
|
||||
, []
|
||||
))
|
||||
_ <- db $ updateMany "team" updateDocs
|
||||
updatedResult <- db $ rest =<< find (select [] "team")
|
||||
forM_ updatedResult $ \r -> let (i :: ObjectId) = "_id" `at` r
|
||||
in (("name" `at` r) :: String) `shouldBe` ("name" ++ (show i))
|
||||
|
||||
describe "updateAll" $ do
|
||||
it "can process different updates" $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
|
||||
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
|
||||
(db $ updateAll "team" [ ( ["name" =: "Yankees"]
|
||||
, ["$inc" =: ["score" =: (1 :: Int)]]
|
||||
, []
|
||||
)
|
||||
, ( ["name" =: "Giants"]
|
||||
, ["$inc" =: ["score" =: (2 :: Int)]]
|
||||
, []
|
||||
)
|
||||
]) `shouldThrow` anyException
|
||||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
|
||||
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (3 :: Int)]
|
||||
]
|
||||
|
||||
describe "allCollections" $ do
|
||||
it "returns all collections in a database" $ do
|
||||
_ <- db $ insert "team1" ["name" =: "Yankees", "league" =: "American"]
|
||||
|
|
Loading…
Reference in a new issue