From af49f43027c693809de0bf4da1aff12ec73698c8 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Thu, 17 Nov 2016 00:15:01 -0800 Subject: [PATCH] Fix splitAtLimit --- Database/MongoDB/Query.hs | 54 ++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index fdc8067..3cb451a 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -50,6 +50,7 @@ import Prelude hiding (lookup) import Control.Exception (Exception, throwIO, throw) import Control.Monad (unless, replicateM, liftM, forM, forM_, liftM2) import Data.Int (Int32, Int64) +import Data.Either (lefts, rights) import Data.List (foldl1') import Data.Maybe (listToMaybe, catMaybes, isNothing, maybeToList) import Data.Word (Word32) @@ -447,6 +448,12 @@ insertCommandDocument opts col docs writeConcern = , "writeConcern" =: writeConcern ] +takeRightsUpToLeft :: [Either a b] -> [b] +takeRightsUpToLeft l = go l [] + where + go ((Right x):xs) !res = go xs (x:res) + go ((Left x):xs) !res = res + insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value] -- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied @@ -458,16 +465,21 @@ insert' opts col docs = do NoConfirm -> ["w" =: (0 :: Int)] Confirm params -> params let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern - chunks <- forM (splitAtLimit - (not (KeepGoing `elem` opts)) + let ordered = (not (KeepGoing `elem` opts)) + let preChunks = splitAtLimit (maxBsonObjectSize sd - docSize) -- size of auxiliary part of insert -- document should be subtracted from -- the overall size (maxWriteBatchSize sd) - docs) - (insertBlock opts col) - return $ concat chunks + docs + let chunks = + if ordered + then takeRightsUpToLeft preChunks + else rights preChunks + + chunkResults <- forM chunks (insertBlock opts col) + return $ concat chunkResults insertBlock :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value] @@ -508,20 +520,14 @@ insertBlock opts col docs = do (maybe 0 id $ lookup "ok" doc) (show err ++ show writeConcernErr) -splitAtLimit :: Bool -> Int -> Int -> [Document] -> [[Document]] -splitAtLimit ordered maxSize maxCount list = chop (go 0 0 []) list +splitAtLimit :: Int -> Int -> [Document] -> [Either Failure [Document]] +splitAtLimit maxSize maxCount list = chop (go 0 0 []) list where - go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document]) - go _ _ res [] = (reverse res, []) + go :: Int -> Int -> [Document] -> [Document] -> ((Either Failure [Document]), [Document]) + go _ _ res [] = (Right $ reverse res, []) go curSize curCount [] (x:xs) | ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) = - if (not ordered) - then - go curSize curCount [] xs -- Skip this document and insert the other documents. - else - throw $ WriteFailure 0 0 "One document is too big for the message" -- TODO add proper index in the first argument - -- TODO it shouldn't throw exceptions. otherwise no documents will be added to the list. - -- It should return UpdateResult with this document as failed. + (Left $ WriteFailure 0 0 "One document is too big for the message", xs) go curSize curCount res (x:xs) = if ( ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) -- we have ^ 2 brackets and curCount commas in @@ -529,7 +535,7 @@ splitAtLimit ordered maxSize maxCount list = chop (go 0 0 []) list -- account || ((curCount + 1) > maxCount)) then - (reverse res, x:xs) + (Right $ reverse res, x:xs) else go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs @@ -644,14 +650,17 @@ update' ordered col updateDocs = do NoConfirm -> ["w" =: (0 :: Int)] Confirm params -> params let docSize = sizeOfDocument $ updateCommandDocument col ordered [] writeConcern - let chunks = splitAtLimit - ordered + let preChunks = splitAtLimit (maxBsonObjectSize sd - docSize) -- size of auxiliary part of update -- document should be subtracted from -- the overall size (maxWriteBatchSize sd) updates + let chunks = + if ordered + then takeRightsUpToLeft preChunks + else rights preChunks let lens = map length chunks let lSums = 0 : (zipWith (+) lSums lens) blocks <- interruptibleFor ordered (zip lSums chunks) $ \b -> do @@ -850,14 +859,17 @@ delete' ordered col deleteDocs = do NoConfirm -> ["w" =: (0 :: Int)] Confirm params -> params let docSize = sizeOfDocument $ deleteCommandDocument col ordered [] writeConcern - let chunks = splitAtLimit - ordered + let preChunks = splitAtLimit (maxBsonObjectSize sd - docSize) -- size of auxiliary part of delete -- document should be subtracted from -- the overall size (maxWriteBatchSize sd) deletes + let chunks = + if ordered + then takeRightsUpToLeft preChunks + else rights preChunks forM_ chunks (deleteBlock ordered col) return DeleteResult