Implement bulk update operation

This commit is contained in:
Victor Denisov 2016-06-08 00:09:33 -07:00
parent c31744d65f
commit 97400c074d
2 changed files with 129 additions and 16 deletions

View file

@ -196,7 +196,8 @@ allUsers :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Document]
allUsers = map (exclude ["_id"]) <$> (rest =<< find allUsers = map (exclude ["_id"]) <$> (rest =<< find
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]}) (select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
addUser :: (MonadIO m) => Bool -> Username -> Password -> Action m () addUser :: (MonadBaseControl IO m, MonadIO m)
=> Bool -> Username -> Password -> Action m ()
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False -- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
addUser readOnly user pass = do addUser readOnly user pass = do
mu <- findOne (select ["user" =: user] "system.users") mu <- findOne (select ["user" =: user] "system.users")

View file

@ -42,12 +42,12 @@ module Database.MongoDB.Query (
MRResult, mapReduce, runMR, runMR', MRResult, mapReduce, runMR, runMR',
-- * Command -- * Command
Command, runCommand, runCommand1, Command, runCommand, runCommand1,
eval, retrieveServerData eval, retrieveServerData, updateMany, updateAll, UpdateResult
) where ) where
import Prelude hiding (lookup) import Prelude hiding (lookup)
import Control.Exception (Exception, throwIO, throw) import Control.Exception (Exception, throwIO, throw)
import Control.Monad (unless, replicateM, liftM, forM) import Control.Monad (unless, replicateM, liftM, forM, forM_, void)
import Data.Int (Int32, Int64) import Data.Int (Int32, Int64)
import Data.Maybe (listToMaybe, catMaybes, isNothing) import Data.Maybe (listToMaybe, catMaybes, isNothing)
import Data.Word (Word32) import Data.Word (Word32)
@ -64,6 +64,8 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
readMVar, modifyMVar) readMVar, modifyMVar)
#endif #endif
import Control.Applicative ((<$>)) import Control.Applicative ((<$>))
import Control.Exception (SomeException)
import Control.Exception.Lifted (catch)
import Control.Monad (when) import Control.Monad (when)
import Control.Monad.Base (MonadBase) import Control.Monad.Base (MonadBase)
import Control.Monad.Error (Error(..)) import Control.Monad.Error (Error(..))
@ -145,6 +147,8 @@ data AccessMode =
type GetLastError = Document type GetLastError = Document
-- ^ Parameters for getLastError command. For example @[\"w\" =: 2]@ tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See <http://www.mongodb.org/display/DOCS/Last+Error+Commands> for more options. -- ^ Parameters for getLastError command. For example @[\"w\" =: 2]@ tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See <http://www.mongodb.org/display/DOCS/Last+Error+Commands> for more options.
data UpdateResult = UpdateResult
master :: AccessMode master :: AccessMode
-- ^ Same as 'ConfirmWrites' [] -- ^ Same as 'ConfirmWrites' []
master = ConfirmWrites [] master = ConfirmWrites []
@ -428,7 +432,7 @@ insert' opts col docs = do
let sd = P.serverData p let sd = P.serverData p
let docSize = sizeOfDocument $ insertCommandDocument opts col [] let docSize = sizeOfDocument $ insertCommandDocument opts col []
chunks <- forM (splitAtLimit chunks <- forM (splitAtLimit
opts (not (KeepGoing `elem` opts))
(maxBsonObjectSize sd - docSize) (maxBsonObjectSize sd - docSize)
-- ^ size of auxiliary part of insert -- ^ size of auxiliary part of insert
-- document should be subtracted from -- document should be subtracted from
@ -469,14 +473,14 @@ insertBlock opts col docs = do
(maybe 0 id $ lookup "ok" doc) (maybe 0 id $ lookup "ok" doc)
(show err ++ show writeConcernErr) (show err ++ show writeConcernErr)
splitAtLimit :: [InsertOption] -> Int -> Int -> [Document] -> [[Document]] splitAtLimit :: Bool -> Int -> Int -> [Document] -> [[Document]]
splitAtLimit opts maxSize maxCount list = chop (go 0 0 []) list splitAtLimit ordered maxSize maxCount list = chop (go 0 0 []) list
where where
go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document]) go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document])
go _ _ res [] = (reverse res, []) go _ _ res [] = (reverse res, [])
go curSize curCount [] (x:xs) | go curSize curCount [] (x:xs) |
((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) = ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) =
if (KeepGoing `elem` opts) if (not ordered)
then then
go curSize curCount [] xs -- Skip this document and insert the other documents. go curSize curCount [] xs -- Skip this document and insert the other documents.
else else
@ -507,37 +511,145 @@ assignId doc = if any (("_id" ==) . label) doc
-- ** Update -- ** Update
save :: (MonadIO m) => Collection -> Document -> Action m () save :: (MonadBaseControl IO m, MonadIO m)
=> Collection -> Document -> Action m ()
-- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or upsert it if its not new (has \"_id\" field) -- ^ Save document to collection, meaning insert it if its new (has no \"_id\" field) or upsert it if its not new (has \"_id\" field)
save col doc = case look "_id" doc of save col doc = case look "_id" doc of
Nothing -> insert_ col doc Nothing -> insert_ col doc
Just i -> upsert (Select ["_id" := i] col) doc Just i -> upsert (Select ["_id" := i] col) doc
replace :: (MonadIO m) => Selection -> Document -> Action m () replace :: (MonadBaseControl IO m, MonadIO m)
=> Selection -> Document -> Action m ()
-- ^ Replace first document in selection with given document -- ^ Replace first document in selection with given document
replace = update [] replace = update []
repsert :: (MonadIO m) => Selection -> Document -> Action m () repsert :: (MonadBaseControl IO m, MonadIO m)
=> Selection -> Document -> Action m ()
-- ^ Replace first document in selection with given document, or insert document if selection is empty -- ^ Replace first document in selection with given document, or insert document if selection is empty
repsert = update [Upsert] repsert = update [Upsert]
{-# DEPRECATED repsert "use upsert instead" #-} {-# DEPRECATED repsert "use upsert instead" #-}
upsert :: (MonadIO m) => Selection -> Document -> Action m () upsert :: (MonadBaseControl IO m, MonadIO m)
=> Selection -> Document -> Action m ()
-- ^ Update first document in selection with given document, or insert document if selection is empty -- ^ Update first document in selection with given document, or insert document if selection is empty
upsert = update [Upsert] upsert = update [Upsert]
type Modifier = Document type Modifier = Document
-- ^ Update operations on fields in a document. See <http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations> -- ^ Update operations on fields in a document. See <http://www.mongodb.org/display/DOCS/Updating#Updating-ModifierOperations>
modify :: (MonadIO m) => Selection -> Modifier -> Action m () modify :: (MonadBaseControl IO m, MonadIO m)
=> Selection -> Modifier -> Action m ()
-- ^ Update all documents in selection using given modifier -- ^ Update all documents in selection using given modifier
modify = update [MultiUpdate] modify = update [MultiUpdate]
update :: (MonadIO m) => [UpdateOption] -> Selection -> Document -> Action m () update :: (MonadBaseControl IO m, MonadIO m)
=> [UpdateOption] -> Selection -> Document -> Action m ()
-- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty. -- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty.
update opts (Select sel col) up = do update opts (Select sel col) up = void $ update' True col [(sel, up, opts)]
updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document
updateCommandDocument col ordered updates writeConcern =
[ "update" =: col
, "ordered" =: ordered
, "updates" =: updates
, "writeConcern" =: writeConcern
]
{-| Bulk update operation. If one insert fails it will not insert the remaining
- documents. Current returned value is only a place holder. With mongodb server
- before 2.6 it will send update requests one by one. After 2.6 it will use
- bulk update feature in mongodb.
-}
updateMany :: (MonadBaseControl IO m, MonadIO m)
=> Collection
-> [(Selector, Document, [UpdateOption])]
-> Action m UpdateResult
updateMany = update' True
{-| Bulk update operation. If one insert fails it will proceed with the
- remaining documents. Current returned value is only a place holder. With
- mongodb server before 2.6 it will send update requests one by one. After 2.6
- it will use bulk update feature in mongodb.
-}
updateAll :: (MonadBaseControl IO m, MonadIO m)
=> Collection
-> [(Selector, Document, [UpdateOption])]
-> Action m UpdateResult
updateAll = update' False
update' :: (MonadBaseControl IO m, MonadIO m)
=> Bool
-> Collection
-> [(Selector, Document, [UpdateOption])]
-> Action m UpdateResult
update' ordered col updateDocs = do
p <- asks mongoPipe
let sd = P.serverData p
let updates = map (\(s, d, os) -> [ "q" =: s
, "u" =: d
, "upsert" =: (Upsert `elem` os)
, "multi" =: (MultiUpdate `elem` os)])
updateDocs
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
let docSize = sizeOfDocument $ updateCommandDocument col ordered [] writeConcern
let chunks = splitAtLimit
ordered
(maxBsonObjectSize sd - docSize)
-- ^ size of auxiliary part of insert
-- document should be subtracted from
-- the overall size
(maxWriteBatchSize sd)
updates
forM_ chunks (updateBlock ordered col)
return UpdateResult
updateBlock :: (MonadIO m, MonadBaseControl IO m)
=> Bool -> Collection -> [Document] -> Action m ()
updateBlock ordered col docs = do
p <- asks mongoPipe
let sd = P.serverData p
if (maxWireVersion sd < 2)
then do
db <- thisDatabase db <- thisDatabase
write (Update (db <.> col) opts sel up) errors <-
forM docs $ \updateDoc -> do
let doc = (at "u" updateDoc) :: Document
let sel = (at "q" updateDoc) :: Document
let upsrt = if at "upsert" updateDoc then [Upsert] else []
let multi = if at "multi" updateDoc then [MultiUpdate] else []
write (Update (db <.> col) (upsrt ++ multi) sel doc)
return Nothing
`catch` \(e :: SomeException) -> do
when ordered $ liftIO $ throwIO e
return $ Just e
let onlyErrors = catMaybes errors
if not $ null onlyErrors
then liftIO $ throwIO $ WriteFailure 0 (show onlyErrors)
else return ()
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
doc <- runCommand $ updateCommandDocument col ordered docs writeConcern
case (look "writeErrors" doc, look "writeConcernError" doc) of
(Nothing, Nothing) -> return ()
(Just err, Nothing) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err)
(Nothing, Just err) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err)
(Just err, Just writeConcernErr) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err ++ show writeConcernErr)
-- ** Delete -- ** Delete