Fix splitAtLimit
This commit is contained in:
parent
e586fd51cc
commit
af49f43027
1 changed files with 33 additions and 21 deletions
|
@ -50,6 +50,7 @@ import Prelude hiding (lookup)
|
||||||
import Control.Exception (Exception, throwIO, throw)
|
import Control.Exception (Exception, throwIO, throw)
|
||||||
import Control.Monad (unless, replicateM, liftM, forM, forM_, liftM2)
|
import Control.Monad (unless, replicateM, liftM, forM, forM_, liftM2)
|
||||||
import Data.Int (Int32, Int64)
|
import Data.Int (Int32, Int64)
|
||||||
|
import Data.Either (lefts, rights)
|
||||||
import Data.List (foldl1')
|
import Data.List (foldl1')
|
||||||
import Data.Maybe (listToMaybe, catMaybes, isNothing, maybeToList)
|
import Data.Maybe (listToMaybe, catMaybes, isNothing, maybeToList)
|
||||||
import Data.Word (Word32)
|
import Data.Word (Word32)
|
||||||
|
@ -447,6 +448,12 @@ insertCommandDocument opts col docs writeConcern =
|
||||||
, "writeConcern" =: writeConcern
|
, "writeConcern" =: writeConcern
|
||||||
]
|
]
|
||||||
|
|
||||||
|
takeRightsUpToLeft :: [Either a b] -> [b]
|
||||||
|
takeRightsUpToLeft l = go l []
|
||||||
|
where
|
||||||
|
go ((Right x):xs) !res = go xs (x:res)
|
||||||
|
go ((Left x):xs) !res = res
|
||||||
|
|
||||||
insert' :: (MonadIO m)
|
insert' :: (MonadIO m)
|
||||||
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
|
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
|
||||||
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied
|
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied
|
||||||
|
@ -458,16 +465,21 @@ insert' opts col docs = do
|
||||||
NoConfirm -> ["w" =: (0 :: Int)]
|
NoConfirm -> ["w" =: (0 :: Int)]
|
||||||
Confirm params -> params
|
Confirm params -> params
|
||||||
let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern
|
let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern
|
||||||
chunks <- forM (splitAtLimit
|
let ordered = (not (KeepGoing `elem` opts))
|
||||||
(not (KeepGoing `elem` opts))
|
let preChunks = splitAtLimit
|
||||||
(maxBsonObjectSize sd - docSize)
|
(maxBsonObjectSize sd - docSize)
|
||||||
-- size of auxiliary part of insert
|
-- size of auxiliary part of insert
|
||||||
-- document should be subtracted from
|
-- document should be subtracted from
|
||||||
-- the overall size
|
-- the overall size
|
||||||
(maxWriteBatchSize sd)
|
(maxWriteBatchSize sd)
|
||||||
docs)
|
docs
|
||||||
(insertBlock opts col)
|
let chunks =
|
||||||
return $ concat chunks
|
if ordered
|
||||||
|
then takeRightsUpToLeft preChunks
|
||||||
|
else rights preChunks
|
||||||
|
|
||||||
|
chunkResults <- forM chunks (insertBlock opts col)
|
||||||
|
return $ concat chunkResults
|
||||||
|
|
||||||
insertBlock :: (MonadIO m)
|
insertBlock :: (MonadIO m)
|
||||||
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
|
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
|
||||||
|
@ -508,20 +520,14 @@ insertBlock opts col docs = do
|
||||||
(maybe 0 id $ lookup "ok" doc)
|
(maybe 0 id $ lookup "ok" doc)
|
||||||
(show err ++ show writeConcernErr)
|
(show err ++ show writeConcernErr)
|
||||||
|
|
||||||
splitAtLimit :: Bool -> Int -> Int -> [Document] -> [[Document]]
|
splitAtLimit :: Int -> Int -> [Document] -> [Either Failure [Document]]
|
||||||
splitAtLimit ordered maxSize maxCount list = chop (go 0 0 []) list
|
splitAtLimit maxSize maxCount list = chop (go 0 0 []) list
|
||||||
where
|
where
|
||||||
go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document])
|
go :: Int -> Int -> [Document] -> [Document] -> ((Either Failure [Document]), [Document])
|
||||||
go _ _ res [] = (reverse res, [])
|
go _ _ res [] = (Right $ reverse res, [])
|
||||||
go curSize curCount [] (x:xs) |
|
go curSize curCount [] (x:xs) |
|
||||||
((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) =
|
((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) =
|
||||||
if (not ordered)
|
(Left $ WriteFailure 0 0 "One document is too big for the message", xs)
|
||||||
then
|
|
||||||
go curSize curCount [] xs -- Skip this document and insert the other documents.
|
|
||||||
else
|
|
||||||
throw $ WriteFailure 0 0 "One document is too big for the message" -- TODO add proper index in the first argument
|
|
||||||
-- TODO it shouldn't throw exceptions. otherwise no documents will be added to the list.
|
|
||||||
-- It should return UpdateResult with this document as failed.
|
|
||||||
go curSize curCount res (x:xs) =
|
go curSize curCount res (x:xs) =
|
||||||
if ( ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize)
|
if ( ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize)
|
||||||
-- we have ^ 2 brackets and curCount commas in
|
-- we have ^ 2 brackets and curCount commas in
|
||||||
|
@ -529,7 +535,7 @@ splitAtLimit ordered maxSize maxCount list = chop (go 0 0 []) list
|
||||||
-- account
|
-- account
|
||||||
|| ((curCount + 1) > maxCount))
|
|| ((curCount + 1) > maxCount))
|
||||||
then
|
then
|
||||||
(reverse res, x:xs)
|
(Right $ reverse res, x:xs)
|
||||||
else
|
else
|
||||||
go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs
|
go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs
|
||||||
|
|
||||||
|
@ -644,14 +650,17 @@ update' ordered col updateDocs = do
|
||||||
NoConfirm -> ["w" =: (0 :: Int)]
|
NoConfirm -> ["w" =: (0 :: Int)]
|
||||||
Confirm params -> params
|
Confirm params -> params
|
||||||
let docSize = sizeOfDocument $ updateCommandDocument col ordered [] writeConcern
|
let docSize = sizeOfDocument $ updateCommandDocument col ordered [] writeConcern
|
||||||
let chunks = splitAtLimit
|
let preChunks = splitAtLimit
|
||||||
ordered
|
|
||||||
(maxBsonObjectSize sd - docSize)
|
(maxBsonObjectSize sd - docSize)
|
||||||
-- size of auxiliary part of update
|
-- size of auxiliary part of update
|
||||||
-- document should be subtracted from
|
-- document should be subtracted from
|
||||||
-- the overall size
|
-- the overall size
|
||||||
(maxWriteBatchSize sd)
|
(maxWriteBatchSize sd)
|
||||||
updates
|
updates
|
||||||
|
let chunks =
|
||||||
|
if ordered
|
||||||
|
then takeRightsUpToLeft preChunks
|
||||||
|
else rights preChunks
|
||||||
let lens = map length chunks
|
let lens = map length chunks
|
||||||
let lSums = 0 : (zipWith (+) lSums lens)
|
let lSums = 0 : (zipWith (+) lSums lens)
|
||||||
blocks <- interruptibleFor ordered (zip lSums chunks) $ \b -> do
|
blocks <- interruptibleFor ordered (zip lSums chunks) $ \b -> do
|
||||||
|
@ -850,14 +859,17 @@ delete' ordered col deleteDocs = do
|
||||||
NoConfirm -> ["w" =: (0 :: Int)]
|
NoConfirm -> ["w" =: (0 :: Int)]
|
||||||
Confirm params -> params
|
Confirm params -> params
|
||||||
let docSize = sizeOfDocument $ deleteCommandDocument col ordered [] writeConcern
|
let docSize = sizeOfDocument $ deleteCommandDocument col ordered [] writeConcern
|
||||||
let chunks = splitAtLimit
|
let preChunks = splitAtLimit
|
||||||
ordered
|
|
||||||
(maxBsonObjectSize sd - docSize)
|
(maxBsonObjectSize sd - docSize)
|
||||||
-- size of auxiliary part of delete
|
-- size of auxiliary part of delete
|
||||||
-- document should be subtracted from
|
-- document should be subtracted from
|
||||||
-- the overall size
|
-- the overall size
|
||||||
(maxWriteBatchSize sd)
|
(maxWriteBatchSize sd)
|
||||||
deletes
|
deletes
|
||||||
|
let chunks =
|
||||||
|
if ordered
|
||||||
|
then takeRightsUpToLeft preChunks
|
||||||
|
else rights preChunks
|
||||||
forM_ chunks (deleteBlock ordered col)
|
forM_ chunks (deleteBlock ordered col)
|
||||||
return DeleteResult
|
return DeleteResult
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue