Rework WriteResult for updateMany, deleteMany functions

Merge branch 'update-result'

PR #77

Conflicts:
	CHANGELOG.md
	Database/MongoDB/Query.hs
This commit is contained in:
Victor Denisov 2017-05-29 19:32:20 -07:00
commit dda10d461b
4 changed files with 538 additions and 270 deletions

View file

@ -7,6 +7,9 @@ This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Pac
### Changed
- Description of access function
- Lift MonadBaseControl restriction
- Update and delete results are squashed into one WriteResult type
- Functions insertMany, updateMany, deleteMany are rewritten to properly report
various errors
## [2.2.0] - 2017-04-08

View file

@ -1,6 +1,6 @@
-- | Query and update documents
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, DeriveDataTypeable, ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, DeriveDataTypeable, ScopedTypeVariables, BangPatterns #-}
module Database.MongoDB.Query (
-- * Monad
@ -22,9 +22,9 @@ module Database.MongoDB.Query (
insert, insert_, insertMany, insertMany_, insertAll, insertAll_,
-- ** Update
save, replace, repsert, upsert, Modifier, modify, updateMany, updateAll,
UpdateResult, UpdateOption(..),
WriteResult(..), UpdateOption(..), Upserted(..),
-- ** Delete
delete, deleteOne, deleteMany, deleteAll, DeleteResult, DeleteOption(..),
delete, deleteOne, deleteMany, deleteAll, DeleteOption(..),
-- * Read
-- ** Query
Query(..), QueryOption(NoCursorTimeout, TailableCursor, AwaitData, Partial),
@ -43,13 +43,15 @@ module Database.MongoDB.Query (
MRResult, mapReduce, runMR, runMR',
-- * Command
Command, runCommand, runCommand1,
eval, retrieveServerData
eval, retrieveServerData, ServerData(..)
) where
import Prelude hiding (lookup)
import Control.Exception (Exception, throwIO, throw)
import Control.Monad (unless, replicateM, liftM, forM, forM_)
import Control.Exception (Exception, throwIO)
import Control.Monad (unless, replicateM, liftM, liftM2)
import Data.Int (Int32, Int64)
import Data.Either (lefts, rights)
import Data.List (foldl1')
import Data.Maybe (listToMaybe, catMaybes, isNothing)
import Data.Word (Word32)
#if !MIN_VERSION_base(4,8,0)
@ -67,15 +69,15 @@ import Control.Concurrent.MVar.Lifted (MVar, addMVarFinalizer,
readMVar)
#endif
import Control.Applicative ((<$>))
import Control.Exception (SomeException, catch)
import Control.Monad (when)
import Control.Exception (catch)
import Control.Monad (when, void)
import Control.Monad.Error (Error(..))
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
import Control.Monad.Trans (MonadIO, liftIO)
import Data.Binary.Put (runPut)
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
(=?), (!?), Val(..))
(=?), (!?), Val(..), ObjectId, Value(..))
import Data.Bson.Binary (putDocument)
import Data.Text (Text)
import qualified Data.Text as T
@ -98,6 +100,7 @@ import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Base16 as B16
import qualified Data.ByteString.Base64 as B64
import qualified Data.ByteString.Char8 as B
import qualified Data.Either as E
import qualified Crypto.Hash.MD5 as MD5
import qualified Crypto.Hash.SHA1 as SHA1
import qualified Crypto.MAC.HMAC as HMAC
@ -122,9 +125,12 @@ data Failure =
ConnectionFailure IOError -- ^ TCP connection ('Pipeline') failed. May work if you try again on the same Mongo 'Connection' which will create a new Pipe.
| CursorNotFoundFailure CursorId -- ^ Cursor expired because it wasn't accessed for over 10 minutes, or this cursor came from a different server that the one you are currently connected to (perhaps a fail over happen between servers in a replica set)
| QueryFailure ErrorCode String -- ^ Query failed for some reason as described in the string
| WriteFailure ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string
| WriteFailure Int ErrorCode String -- ^ Error observed by getLastError after a write, error description is in string, index of failed document is the first argument
| WriteConcernFailure Int String -- ^ Write concern error. It's reported only by insert, update, delete commands. Not by wire protocol.
| DocNotFound Selection -- ^ 'fetch' found no document matching selection
| AggregateFailure String -- ^ 'aggregate' returned an error
| CompoundFailure [Failure] -- ^ When we need to aggregate several failures and report them.
| ProtocolFailure Int String -- ^ The structure of the returned documents doesn't match what we expected
deriving (Show, Eq, Typeable)
instance Exception Failure
@ -144,9 +150,32 @@ data AccessMode =
type GetLastError = Document
-- ^ Parameters for getLastError command. For example @[\"w\" =: 2]@ tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See <http://www.mongodb.org/display/DOCS/Last+Error+Commands> for more options.
data UpdateResult = UpdateResult
class Result a where
isFailed :: a -> Bool
data DeleteResult = DeleteResult
data WriteResult = WriteResult
{ failed :: Bool
, nMatched :: Int
, nModified :: Maybe Int
, nRemoved :: Int
-- ^ Mongodb server before 2.6 doesn't allow to calculate this value.
-- This field is meaningless if we can't calculate the number of modified documents.
, upserted :: [Upserted]
, writeErrors :: [Failure]
, writeConcernErrors :: [Failure]
} deriving Show
instance Result WriteResult where
isFailed = failed
instance Result (Either a b) where
isFailed (Left _) = True
isFailed _ = False
data Upserted = Upserted
{ upsertedIndex :: Int
, upsertedId :: ObjectId
} deriving Show
master :: AccessMode
-- ^ Same as 'ConfirmWrites' []
@ -368,12 +397,13 @@ data WriteMode =
| Confirm GetLastError -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. This is acomplished by sending the getLastError command, with given 'GetLastError' parameters, after every write.
deriving (Show, Eq)
write :: Notice -> Action IO ()
write :: Notice -> Action IO (Maybe Document)
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error.
write notice = asks mongoWriteMode >>= \mode -> case mode of
NoConfirm -> do
pipe <- asks mongoPipe
liftIOE ConnectionFailure $ P.send pipe [notice]
return Nothing
Confirm params -> do
let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
pipe <- asks mongoPipe
@ -381,22 +411,29 @@ write notice = asks mongoWriteMode >>= \mode -> case mode of
r <- queryRequest False q {limit = 1}
rr <- liftIO $ request pipe [notice] r
fulfill rr
case lookup "err" doc of
Nothing -> return ()
Just err -> liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "code" doc) err
return $ Just doc
-- ** Insert
insert :: (MonadIO m) => Collection -> Document -> Action m Value
-- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied
insert col doc = head `liftM` insertBlock [] col [doc]
insert col doc = do
doc' <- liftIO $ assignId doc
res <- insertBlock [] col (0, [doc'])
case res of
Left failure -> liftIO $ throwIO failure
Right r -> return $ head r
insert_ :: (MonadIO m) => Collection -> Document -> Action m ()
-- ^ Same as 'insert' except don't return _id
insert_ col doc = insert col doc >> return ()
insertMany :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied. If a document fails to be inserted (eg. due to duplicate key) then remaining docs are aborted, and LastError is set.
-- ^ Insert documents into collection and return their \"_id\" values,
-- which are created automatically if not supplied.
-- If a document fails to be inserted (eg. due to duplicate key)
-- then remaining docs are aborted, and LastError is set.
-- An exception will be throw if any error occurs.
insertMany = insert' []
insertMany_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
@ -404,7 +441,10 @@ insertMany_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
insertMany_ col docs = insertMany col docs >> return ()
insertAll :: (MonadIO m) => Collection -> [Document] -> Action m [Value]
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied. If a document fails to be inserted (eg. due to duplicate key) then remaining docs are still inserted. LastError is set if any doc fails, not just last one.
-- ^ Insert documents into collection and return their \"_id\" values,
-- which are created automatically if not supplied. If a document fails
-- to be inserted (eg. due to duplicate key) then remaining docs
-- are still inserted.
insertAll = insert' [KeepGoing]
insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
@ -419,75 +459,107 @@ insertCommandDocument opts col docs writeConcern =
, "writeConcern" =: writeConcern
]
takeRightsUpToLeft :: [Either a b] -> [b]
takeRightsUpToLeft l = E.rights $ takeWhile E.isRight l
insert' :: (MonadIO m)
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied
insert' opts col docs = do
p <- asks mongoPipe
let sd = P.serverData p
docs' <- liftIO $ mapM assignId docs
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern
chunks <- forM (splitAtLimit
(not (KeepGoing `elem` opts))
let ordered = (not (KeepGoing `elem` opts))
let preChunks = splitAtLimit
(maxBsonObjectSize sd - docSize)
-- size of auxiliary part of insert
-- document should be subtracted from
-- the overall size
(maxWriteBatchSize sd)
docs)
(insertBlock opts col)
return $ concat chunks
docs'
let chunks =
if ordered
then takeRightsUpToLeft preChunks
else rights preChunks
let lens = map length chunks
let lSums = 0 : (zipWith (+) lSums lens)
chunkResults <- interruptibleFor ordered (zip lSums chunks) $ insertBlock opts col
let lchunks = lefts preChunks
when (not $ null lchunks) $ do
liftIO $ throwIO $ head lchunks
let lresults = lefts chunkResults
when (not $ null lresults) $ liftIO $ throwIO $ head lresults
return $ concat $ rights chunkResults
insertBlock :: (MonadIO m)
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
=> [InsertOption] -> Collection -> (Int, [Document]) -> Action m (Either Failure [Value])
-- ^ This will fail if the list of documents is bigger than restrictions
insertBlock _ _ [] = return []
insertBlock opts col docs = do
insertBlock _ _ (_, []) = return $ Right []
insertBlock opts col (prevCount, docs) = do
db <- thisDatabase
docs' <- liftIO $ mapM assignId docs
p <- asks mongoPipe
let sd = P.serverData p
if (maxWireVersion sd < 2)
then do
liftDB $ write (Insert (db <.> col) opts docs')
return $ map (valueAt "_id") docs'
res <- liftDB $ write (Insert (db <.> col) opts docs)
let errorMessage = do
jRes <- res
em <- lookup "err" jRes
return $ WriteFailure prevCount (maybe 0 id $ lookup "code" jRes) em
-- In older versions of ^^ the protocol we can't really say which document failed.
-- So we just report the accumulated number of documents in the previous blocks.
case errorMessage of
Just failure -> return $ Left failure
Nothing -> return $ Right $ map (valueAt "_id") docs
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
doc <- runCommand $ insertCommandDocument opts col docs' writeConcern
doc <- runCommand $ insertCommandDocument opts col docs writeConcern
case (look "writeErrors" doc, look "writeConcernError" doc) of
(Nothing, Nothing) -> return $ map (valueAt "_id") docs'
(Just err, Nothing) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err)
(Nothing, Nothing) -> return $ Right $ map (valueAt "_id") docs
(Just (Array errs), Nothing) -> do
let writeErrors = map (anyToWriteError prevCount) $ errs
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
return $ Left $ CompoundFailure errorsWithFailureIndex
(Nothing, Just err) -> do
liftIO $ throwIO $ WriteFailure
return $ Left $ WriteFailure
prevCount
(maybe 0 id $ lookup "ok" doc)
(show err)
(Just err, Just writeConcernErr) -> do
liftIO $ throwIO $ WriteFailure
(Just (Array errs), Just writeConcernErr) -> do
let writeErrors = map (anyToWriteError prevCount) $ errs
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
return $ Left $ CompoundFailure $ (WriteFailure
prevCount
(maybe 0 id $ lookup "ok" doc)
(show err ++ show writeConcernErr)
(show writeConcernErr)) : errorsWithFailureIndex
(Just unknownValue, Nothing) -> do
return $ Left $ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
(Just unknownValue, Just writeConcernErr) -> do
return $ Left $ CompoundFailure $ [ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
, WriteFailure prevCount (maybe 0 id $ lookup "ok" doc) $ show writeConcernErr]
splitAtLimit :: Bool -> Int -> Int -> [Document] -> [[Document]]
splitAtLimit ordered maxSize maxCount list = chop (go 0 0 []) list
splitAtLimit :: Int -> Int -> [Document] -> [Either Failure [Document]]
splitAtLimit maxSize maxCount list = chop (go 0 0 []) list
where
go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document])
go _ _ res [] = (reverse res, [])
go :: Int -> Int -> [Document] -> [Document] -> ((Either Failure [Document]), [Document])
go _ _ res [] = (Right $ reverse res, [])
go curSize curCount [] (x:xs) |
((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) =
if (not ordered)
then
go curSize curCount [] xs -- Skip this document and insert the other documents.
else
throw $ WriteFailure 0 "One document is too big for the message"
(Left $ WriteFailure 0 0 "One document is too big for the message", xs)
go curSize curCount res (x:xs) =
if ( ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize)
-- we have ^ 2 brackets and curCount commas in
@ -495,7 +567,7 @@ splitAtLimit ordered maxSize maxCount list = chop (go 0 0 []) list
-- account
|| ((curCount + 1) > maxCount))
then
(reverse res, x:xs)
(Right $ reverse res, x:xs)
else
go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs
@ -549,8 +621,9 @@ update :: (MonadIO m)
=> [UpdateOption] -> Selection -> Document -> Action m ()
-- ^ Update first document in selection using updater document, unless 'MultiUpdate' option is supplied then update all documents in selection. If 'Upsert' option is supplied then treat updater as document and insert it if selection is empty.
update opts (Select sel col) up = do
_ <- update' True col [(sel, up, opts)]
return ()
db <- thisDatabase
ctx <- ask
liftIO $ runReaderT (void $ write (Update (db <.> col) opts sel up)) ctx
updateCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document
updateCommandDocument col ordered updates writeConcern =
@ -562,31 +635,36 @@ updateCommandDocument col ordered updates writeConcern =
{-| Bulk update operation. If one update fails it will not update the remaining
- documents. Current returned value is only a place holder. With mongodb server
- before 2.6 it will send update requests one by one. After 2.6 it will use
- bulk update feature in mongodb.
- before 2.6 it will send update requests one by one. In order to receive
- error messages in versions under 2.6 you need to user confirmed writes.
- Otherwise even if the errors had place the list of errors will be empty and
- the result will be success. After 2.6 it will use bulk update feature in
- mongodb.
-}
updateMany :: (MonadIO m)
=> Collection
-> [(Selector, Document, [UpdateOption])]
-> Action m UpdateResult
-> Action m WriteResult
updateMany = update' True
{-| Bulk update operation. If one update fails it will proceed with the
- remaining documents. Current returned value is only a place holder. With
- mongodb server before 2.6 it will send update requests one by one. After 2.6
- it will use bulk update feature in mongodb.
- remaining documents. With mongodb server before 2.6 it will send update
- requests one by one. In order to receive error messages in versions under
- 2.6 you need to use confirmed writes. Otherwise even if the errors had
- place the list of errors will be empty and the result will be success.
- After 2.6 it will use bulk update feature in mongodb.
-}
updateAll :: (MonadIO m)
=> Collection
-> [(Selector, Document, [UpdateOption])]
-> Action m UpdateResult
-> Action m WriteResult
updateAll = update' False
update' :: (MonadIO m)
=> Bool
-> Collection
-> [(Selector, Document, [UpdateOption])]
-> Action m UpdateResult
-> Action m WriteResult
update' ordered col updateDocs = do
p <- asks mongoPipe
let sd = P.serverData p
@ -597,65 +675,159 @@ update' ordered col updateDocs = do
updateDocs
mode <- asks mongoWriteMode
ctx <- ask
liftIO $ do
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
let docSize = sizeOfDocument $ updateCommandDocument col ordered [] writeConcern
let chunks = splitAtLimit
let docSize = sizeOfDocument $ updateCommandDocument
col
ordered
[]
writeConcern
let preChunks = splitAtLimit
(maxBsonObjectSize sd - docSize)
-- size of auxiliary part of update
-- document should be subtracted from
-- the overall size
(maxWriteBatchSize sd)
updates
forM_ chunks (updateBlock ordered col)
return UpdateResult
let chunks =
if ordered
then takeRightsUpToLeft preChunks
else rights preChunks
let lens = map length chunks
let lSums = 0 : (zipWith (+) lSums lens)
blocks <- interruptibleFor ordered (zip lSums chunks) $ \b -> do
ur <- runReaderT (updateBlock ordered col b) ctx
return ur
`catch` \(e :: Failure) -> do
return $ WriteResult True 0 Nothing 0 [] [e] []
let failedTotal = or $ map failed blocks
let updatedTotal = sum $ map nMatched blocks
let modifiedTotal =
if all isNothing $ map nModified blocks
then Nothing
else Just $ sum $ catMaybes $ map nModified blocks
let totalWriteErrors = concat $ map writeErrors blocks
let totalWriteConcernErrors = concat $ map writeConcernErrors blocks
let upsertedTotal = concat $ map upserted blocks
return $ WriteResult
failedTotal
updatedTotal
modifiedTotal
0 -- nRemoved
upsertedTotal
totalWriteErrors
totalWriteConcernErrors
`catch` \(e :: Failure) -> return $ WriteResult True 0 Nothing 0 [] [e] []
updateBlock :: (MonadIO m)
=> Bool -> Collection -> [Document] -> Action m ()
updateBlock ordered col docs = do
=> Bool -> Collection -> (Int, [Document]) -> Action m WriteResult
updateBlock ordered col (prevCount, docs) = do
p <- asks mongoPipe
let sd = P.serverData p
if (maxWireVersion sd < 2)
then do
db <- thisDatabase
ctx <- ask
errors <-
liftIO $ forM docs $ \updateDoc -> do
let doc = (at "u" updateDoc) :: Document
let sel = (at "q" updateDoc) :: Document
let upsrt = if at "upsert" updateDoc then [Upsert] else []
let multi = if at "multi" updateDoc then [MultiUpdate] else []
runReaderT (write (Update (db <.> col) (upsrt ++ multi) sel doc)) ctx
return Nothing
`catch` \(e :: SomeException) -> do
when ordered $ liftIO $ throwIO e
return $ Just e
let onlyErrors = catMaybes errors
if not $ null onlyErrors
then liftIO $ throwIO $ WriteFailure 0 (show onlyErrors)
else return ()
then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6"
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
doc <- runCommand $ updateCommandDocument col ordered docs writeConcern
case (look "writeErrors" doc, look "writeConcernError" doc) of
(Nothing, Nothing) -> return ()
(Just err, Nothing) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err)
(Nothing, Just err) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err)
(Just err, Just writeConcernErr) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err ++ show writeConcernErr)
let n = fromMaybe 0 $ doc !? "n"
let writeErrorsResults =
case look "writeErrors" doc of
Nothing -> WriteResult False 0 (Just 0) 0 [] [] []
Just (Array err) -> WriteResult True 0 (Just 0) 0 [] (map (anyToWriteError prevCount) err) []
Just unknownErr -> WriteResult
True
0
(Just 0)
0
[]
[ ProtocolFailure
prevCount
$ "Expected array of error docs, but received: "
++ (show unknownErr)]
[]
let writeConcernResults =
case look "writeConcernError" doc of
Nothing -> WriteResult False 0 (Just 0) 0 [] [] []
Just (Doc err) -> WriteResult
True
0
(Just 0)
0
[]
[]
[ WriteConcernFailure
(fromMaybe (-1) $ err !? "code")
(fromMaybe "" $ err !? "errmsg")
]
Just unknownErr -> WriteResult
True
0
(Just 0)
0
[]
[]
[ ProtocolFailure
prevCount
$ "Expected doc in writeConcernError, but received: "
++ (show unknownErr)]
let upsertedList = map docToUpserted $ fromMaybe [] (doc !? "upserted")
liftIO $ putStrLn $ show doc
let successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] []
return $ foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults]
interruptibleFor :: (Monad m, Result b) => Bool -> [a] -> (a -> m b) -> m [b]
interruptibleFor ordered = go []
where
go !res [] _ = return $ reverse res
go !res (x:xs) f = do
y <- f x
if isFailed y && ordered
then return $ reverse (y:res)
else go (y:res) xs f
mergeWriteResults :: WriteResult -> WriteResult -> WriteResult
mergeWriteResults
(WriteResult failed1 nMatched1 nModified1 nDeleted1 upserted1 writeErrors1 writeConcernErrors1)
(WriteResult failed2 nMatched2 nModified2 nDeleted2 upserted2 writeErrors2 writeConcernErrors2) =
(WriteResult
(failed1 || failed2)
(nMatched1 + nMatched2)
((liftM2 (+)) nModified1 nModified2)
(nDeleted1 + nDeleted2)
-- This function is used in foldl1' function. The first argument is the accumulator.
-- The list in the accumulator is usually longer than the subsequent value which goes in the second argument.
-- So, changing the order of list concatenation allows us to keep linear complexity of the
-- whole list accumulation process.
(upserted2 ++ upserted1)
(writeErrors2 ++ writeErrors1)
(writeConcernErrors2 ++ writeConcernErrors1)
)
docToUpserted :: Document -> Upserted
docToUpserted doc = Upserted ind uid
where
ind = at "index" doc
uid = at "_id" doc
docToWriteError :: Document -> Failure
docToWriteError doc = WriteFailure ind code msg
where
ind = at "index" doc
code = at "code" doc
msg = at "errmsg" doc
-- ** Delete
@ -672,8 +844,9 @@ deleteOne = deleteHelper [SingleRemove]
deleteHelper :: (MonadIO m)
=> [DeleteOption] -> Selection -> Action m ()
deleteHelper opts (Select sel col) = do
_ <- delete' True col [(sel, opts)]
return ()
db <- thisDatabase
ctx <- ask
liftIO $ runReaderT (void $ write (Delete (db <.> col) opts sel)) ctx
{-| Bulk delete operation. If one delete fails it will not delete the remaining
- documents. Current returned value is only a place holder. With mongodb server
@ -683,7 +856,7 @@ deleteHelper opts (Select sel col) = do
deleteMany :: (MonadIO m)
=> Collection
-> [(Selector, [DeleteOption])]
-> Action m DeleteResult
-> Action m WriteResult
deleteMany = delete' True
{-| Bulk delete operation. If one delete fails it will proceed with the
@ -694,7 +867,7 @@ deleteMany = delete' True
deleteAll :: (MonadIO m)
=> Collection
-> [(Selector, [DeleteOption])]
-> Action m DeleteResult
-> Action m WriteResult
deleteAll = delete' False
deleteCommandDocument :: Collection -> Bool -> [Document] -> Document -> Document
@ -709,7 +882,7 @@ delete' :: (MonadIO m)
=> Bool
-> Collection
-> [(Selector, [DeleteOption])]
-> Action m DeleteResult
-> Action m WriteResult
delete' ordered col deleteDocs = do
p <- asks mongoPipe
let sd = P.serverData p
@ -725,59 +898,94 @@ delete' ordered col deleteDocs = do
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
let docSize = sizeOfDocument $ deleteCommandDocument col ordered [] writeConcern
let chunks = splitAtLimit
ordered
let preChunks = splitAtLimit
(maxBsonObjectSize sd - docSize)
-- size of auxiliary part of delete
-- document should be subtracted from
-- the overall size
(maxWriteBatchSize sd)
deletes
forM_ chunks (deleteBlock ordered col)
return DeleteResult
let chunks =
if ordered
then takeRightsUpToLeft preChunks
else rights preChunks
ctx <- ask
let lens = map length chunks
let lSums = 0 : (zipWith (+) lSums lens)
blockResult <- liftIO $ interruptibleFor ordered (zip lSums chunks) $ \b -> do
dr <- runReaderT (deleteBlock ordered col b) ctx
return dr
`catch` \(e :: Failure) -> do
return $ WriteResult True 0 Nothing 0 [] [e] []
return $ foldl1' mergeWriteResults blockResult
addFailureIndex :: Int -> Failure -> Failure
addFailureIndex i (WriteFailure ind code s) = WriteFailure (ind + i) code s
addFailureIndex _ f = f
deleteBlock :: (MonadIO m)
=> Bool -> Collection -> [Document] -> Action m ()
deleteBlock ordered col docs = do
=> Bool -> Collection -> (Int, [Document]) -> Action m WriteResult
deleteBlock ordered col (prevCount, docs) = do
p <- asks mongoPipe
let sd = P.serverData p
if (maxWireVersion sd < 2)
then do
db <- thisDatabase
ctx <- ask
errors <-
liftIO $ forM docs $ \deleteDoc -> do
let sel = (at "q" deleteDoc) :: Document
let opts = if at "limit" deleteDoc == (1 :: Int) then [SingleRemove] else []
runReaderT (write (Delete (db <.> col) opts sel)) ctx
return Nothing
`catch` \(e :: SomeException) -> do
when ordered $ liftIO $ throwIO e
return $ Just e
let onlyErrors = catMaybes errors
if not $ null onlyErrors
then liftIO $ throwIO $ WriteFailure 0 (show onlyErrors)
else return ()
then liftIO $ ioError $ userError "deleteMany doesn't support mongodb older than 2.6"
else do
mode <- asks mongoWriteMode
let writeConcern = case mode of
NoConfirm -> ["w" =: (0 :: Int)]
Confirm params -> params
doc <- runCommand $ deleteCommandDocument col ordered docs writeConcern
case (look "writeErrors" doc, look "writeConcernError" doc) of
(Nothing, Nothing) -> return ()
(Just err, Nothing) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err)
(Nothing, Just err) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err)
(Just err, Just writeConcernErr) -> do
liftIO $ throwIO $ WriteFailure
(maybe 0 id $ lookup "ok" doc)
(show err ++ show writeConcernErr)
let n = fromMaybe 0 $ doc !? "n"
liftIO $ putStrLn $ "result of delete block: " ++ (show n)
let successResults = WriteResult False 0 Nothing n [] [] []
let writeErrorsResults =
case look "writeErrors" doc of
Nothing -> WriteResult False 0 Nothing 0 [] [] []
Just (Array err) -> WriteResult True 0 Nothing 0 [] (map (anyToWriteError prevCount) err) []
Just unknownErr -> WriteResult
True
0
Nothing
0
[]
[ ProtocolFailure
prevCount
$ "Expected array of error docs, but received: "
++ (show unknownErr)]
[]
let writeConcernResults =
case look "writeConcernError" doc of
Nothing -> WriteResult False 0 Nothing 0 [] [] []
Just (Doc err) -> WriteResult
True
0
Nothing
0
[]
[]
[ WriteConcernFailure
(fromMaybe (-1) $ err !? "code")
(fromMaybe "" $ err !? "errmsg")
]
Just unknownErr -> WriteResult
True
0
Nothing
0
[]
[]
[ ProtocolFailure
prevCount
$ "Expected doc in writeConcernError, but received: "
++ (show unknownErr)]
return $ foldl1' mergeWriteResults [successResults, writeErrorsResults, writeConcernResults]
anyToWriteError :: Int -> Value -> Failure
anyToWriteError _ (Doc d) = docToWriteError d
anyToWriteError ind _ = ProtocolFailure ind "Unknown bson value"
-- * Read

0
Setup.lhs Executable file → Normal file
View file

View file

@ -5,7 +5,7 @@ module QuerySpec (spec) where
import Data.String (IsString(..))
import TestImport
import Control.Exception
import Control.Monad (forM_)
import Control.Monad (forM_, when)
import System.Environment (getEnv)
import System.IO.Error (catchIOError)
import qualified Data.List as L
@ -23,6 +23,11 @@ db action = do
close pipe
return result
getWireVersion :: IO Int
getWireVersion = db $ do
sd <- retrieveServerData
return $ maxWireVersion sd
withCleanDatabase :: ActionWith () -> IO ()
withCleanDatabase action = dropDB >> action () >> dropDB >> return ()
where
@ -171,7 +176,7 @@ spec = around withCleanDatabase $ do
liftIO $ (length returnedDocs) `shouldBe` 1000
it "skips one too big document" $ do
db $ insertAll_ "hugeDocCollection" [hugeDocument]
(db $ insertAll_ "hugeDocCollection" [hugeDocument]) `shouldThrow` anyException
db $ do
cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000}
returnedDocs <- rest cur
@ -192,6 +197,8 @@ spec = around withCleanDatabase $ do
describe "updateMany" $ do
it "updates value" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_id <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
result <- db $ rest =<< find (select [] "team")
result `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "American"]]
@ -201,6 +208,8 @@ spec = around withCleanDatabase $ do
updatedResult <- db $ rest =<< find (select [] "team")
updatedResult `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "European"]]
it "upserts value" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
c <- db $ count (select [] "team")
c `shouldBe` 0
_ <- db $ updateMany "team" [( []
@ -210,6 +219,8 @@ spec = around withCleanDatabase $ do
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
map L.sort updatedResult `shouldBe` [["league" =: "MLB", "name" =: "Giants"]]
it "updates all documents with Multi enabled" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
@ -221,6 +232,8 @@ spec = around withCleanDatabase $ do
, ["league" =: "MLB", "name" =: "Yankees"]
]
it "updates one document when there is no Multi option" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
@ -232,6 +245,8 @@ spec = around withCleanDatabase $ do
, ["league" =: "MiLB", "name" =: "Yankees"]
]
it "can process different updates" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB"]
_ <- db $ updateMany "team" [ ( ["name" =: "Yankees"]
@ -248,9 +263,11 @@ spec = around withCleanDatabase $ do
, ["league" =: "MiLB", "name" =: "Yankees"]
]
it "can process different updates" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
(db $ updateMany "team" [ ( ["name" =: "Yankees"]
updateResult <- (db $ updateMany "team" [ ( ["name" =: "Yankees"]
, ["$inc" =: ["score" =: (1 :: Int)]]
, []
)
@ -258,12 +275,15 @@ spec = around withCleanDatabase $ do
, ["$inc" =: ["score" =: (2 :: Int)]]
, []
)
]) `shouldThrow` anyException
])
failed updateResult `shouldBe` True
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (1 :: Int)]
]
it "can handle big updates" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
let docs = (flip map) [0..20000] $ \i ->
["name" =: (T.pack $ "name " ++ (show i))]
ids <- db $ insertAll "bigCollection" docs
@ -278,9 +298,11 @@ spec = around withCleanDatabase $ do
describe "updateAll" $ do
it "can process different updates" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
(db $ updateAll "team" [ ( ["name" =: "Yankees"]
updateResult <- (db $ updateAll "team" [ ( ["name" =: "Yankees"]
, ["$inc" =: ["score" =: (1 :: Int)]]
, []
)
@ -288,11 +310,33 @@ spec = around withCleanDatabase $ do
, ["$inc" =: ["score" =: (2 :: Int)]]
, []
)
]) `shouldThrow` anyException
])
failed updateResult `shouldBe` True
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (3 :: Int)]
]
it "returns correct number of matched and modified" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myField" =: "newValue"]], [MultiUpdate])]
nMatched res `shouldBe` 2
nModified res `shouldBe` (Just 2)
it "returns correct number of upserted" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myfield" =: "newValue"]], [Upsert])]
(length $ upserted res) `shouldBe` 1
it "updates only one doc without multi update" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myField" =: "newValue"]], [])]
nMatched res `shouldBe` 1
nModified res `shouldBe` (Just 1)
describe "delete" $ do
it "actually deletes something" $ do
@ -334,6 +378,8 @@ spec = around withCleanDatabase $ do
describe "deleteMany" $ do
it "actually deletes something" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: ("Giants" :: String)]
_ <- db $ insert "team" ["name" =: ("Yankees" :: String)]
_ <- db $ deleteMany "team" [ (["name" =: ("Giants" :: String)], [])
@ -344,6 +390,8 @@ spec = around withCleanDatabase $ do
describe "deleteAll" $ do
it "actually deletes something" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" [ "name" =: ("Giants" :: String)
, "score" =: (Nothing :: Maybe Int)
]
@ -356,12 +404,21 @@ spec = around withCleanDatabase $ do
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
it "can handle big deletes" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
let docs = (flip map) [0..20000] $ \i ->
["name" =: (T.pack $ "name " ++ (show i))]
_ <- db $ insertAll "bigCollection" docs
_ <- db $ deleteAll "bigCollection" $ map (\d -> (d, [])) docs
updatedResult <- db $ rest =<< find ((select [] "bigCollection") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
it "returns correct result" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "testCollection" [ "myField" =: "myValue" ]
_ <- db $ insert "testCollection" [ "myField" =: "myValue" ]
res <- db $ deleteAll "testCollection" [ (["myField" =: "myValue"], []) ]
nRemoved res `shouldBe` 2
describe "allCollections" $ do
it "returns all collections in a database" $ do