From 3a4ebcb23beaf21fbbb31b49825001429878ed16 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Sun, 29 May 2016 18:21:31 -0700 Subject: [PATCH] Insert the list of documents into chunks --- Database/MongoDB/Query.hs | 86 +++++++++++++++++++++++++++++++++------ mongoDB.cabal | 2 +- test/QuerySpec.hs | 42 ++++++++++++++++++- 3 files changed, 115 insertions(+), 15 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 7058c4b..a347dda 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -46,8 +46,8 @@ module Database.MongoDB.Query ( ) where import Prelude hiding (lookup) -import Control.Exception (Exception, throwIO) -import Control.Monad (unless, replicateM, liftM) +import Control.Exception (Exception, throwIO, throw) +import Control.Monad (unless, replicateM, liftM, forM) import Data.Int (Int32, Int64) import Data.Maybe (listToMaybe, catMaybes, isNothing) import Data.Word (Word32) @@ -70,9 +70,11 @@ import Control.Monad.Error (Error(..)) import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local) import Control.Monad.Trans (MonadIO, liftIO) import Control.Monad.Trans.Control (MonadBaseControl(..)) +import Data.Binary.Put (runPut) import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool), Javascript, at, valueAt, lookup, look, genObjectId, (=:), (=?), (!?), Val(..)) +import Data.Bson.Binary (putDocument) import Data.Text (Text) import qualified Data.Text as T @@ -90,6 +92,7 @@ import qualified Database.MongoDB.Internal.Protocol as P import qualified Crypto.Nonce as Nonce import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as LBS import qualified Data.ByteString.Base16 as B16 import qualified Data.ByteString.Base64 as B64 import qualified Data.ByteString.Char8 as B @@ -388,7 +391,7 @@ write notice = asks mongoWriteMode >>= \mode -> case mode of insert :: (MonadIO m) => Collection -> Document -> Action m Value -- ^ Insert document into collection and 
return its \"_id\" value, which is created automatically if not supplied -insert col doc = head `liftM` insertMany col [doc] +insert col doc = head `liftM` insertBlock [] col [doc] insert_ :: (MonadIO m) => Collection -> Document -> Action m () -- ^ Same as 'insert' except don't return _id @@ -410,9 +413,36 @@ insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m () -- ^ Same as 'insertAll' except don't return _ids insertAll_ col docs = insertAll col docs >> return () -insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value] +insertCommandDocument :: [InsertOption] -> Collection -> [Document] -> Document +insertCommandDocument opts col docs = + [ "insert" =: col + , "ordered" =: (KeepGoing `notElem` opts) + , "documents" =: docs + ] + +insert' :: (MonadIO m) + => [InsertOption] -> Collection -> [Document] -> Action m [Value] -- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied insert' opts col docs = do + p <- asks mongoPipe + let sd = P.serverData p + let docSize = sizeOfDocument $ insertCommandDocument opts col [] + chunks <- forM (splitAtLimit + opts + (maxBsonObjectSize sd - docSize) + -- size of auxiliary part of insert + -- document should be subtracted from + -- the overall size + (maxWriteBatchSize sd) + docs) + (insertBlock opts col) + return $ concat chunks + +insertBlock :: (MonadIO m) + => [InsertOption] -> Collection -> [Document] -> Action m [Value] +-- ^ This will fail if the list of documents is bigger than restrictions +insertBlock _ _ [] = return [] +insertBlock opts col docs = do db <- thisDatabase docs' <- liftIO $ mapM assignId docs @@ -423,20 +453,52 @@ insert' opts col docs = do write (Insert (db <.> col) opts docs') return $ map (valueAt "_id") docs' else do - doc <- runCommand $ - [ "insert" =: col - , "ordered" =: (KeepGoing `notElem` opts) - , "documents" =: docs' - ] + doc <- runCommand $ insertCommandDocument opts col docs' 
liftIO $ putStrLn $ show doc case (look "writeErrors" doc, look "writeConcernError" doc) of (Nothing, Nothing) -> return $ map (valueAt "_id") docs' (Just err, Nothing) -> do - liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err) + liftIO $ throwIO $ WriteFailure + (maybe 0 id $ lookup "ok" doc) + (show err) (Nothing, Just err) -> do - liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err) + liftIO $ throwIO $ WriteFailure + (maybe 0 id $ lookup "ok" doc) + (show err) (Just err, Just writeConcernErr) -> do - liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err ++ show writeConcernErr) + liftIO $ throwIO $ WriteFailure + (maybe 0 id $ lookup "ok" doc) + (show err ++ show writeConcernErr) + +splitAtLimit :: [InsertOption] -> Int -> Int -> [Document] -> [[Document]] +splitAtLimit opts maxSize maxCount list = chop (go 0 0 []) list + where + go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document]) + go _ _ res [] = (reverse res, []) + go curSize curCount [] (x:xs) | + ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) = + if (KeepGoing `elem` opts) + then + go curSize curCount [] xs -- Skip this document and insert the other documents. 
+ else + throw $ WriteFailure 0 "One document is too big for the message" + go curSize curCount res (x:xs) = + if ( ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) + -- we have ^ 2 brackets and curCount commas in + -- the document that we need to take into + -- account + || ((curCount + 1) > maxCount)) + then + (reverse res, x:xs) + else + go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs + + chop :: ([a] -> (b, [a])) -> [a] -> [b] + chop _ [] = [] + chop f as = let (b, as') = f as in b : chop f as' + +sizeOfDocument :: Document -> Int +sizeOfDocument d = fromIntegral $ LBS.length $ runPut $ putDocument d assignId :: Document -> IO Document -- ^ Assign a unique value to _id field if missing diff --git a/mongoDB.cabal b/mongoDB.cabal index 11a4ec1..eb15c53 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -63,7 +63,7 @@ Source-repository head test-suite test hs-source-dirs: test main-is: Main.hs - ghc-options: -Wall -with-rtsopts "-K32m" + ghc-options: -Wall -with-rtsopts "-K64m" type: exitcode-stdio-1.0 build-depends: mongoDB , base diff --git a/test/QuerySpec.hs b/test/QuerySpec.hs index ebbea72..57ee7e7 100644 --- a/test/QuerySpec.hs +++ b/test/QuerySpec.hs @@ -2,6 +2,7 @@ {-# OPTIONS_GHC -fno-warn-type-defaults #-} module QuerySpec (spec) where +import Data.String (IsString(..)) import TestImport import Control.Exception import qualified Data.List as L @@ -33,6 +34,15 @@ insertDuplicateWith testInsert = do ] return () +bigDocument :: Document +bigDocument = (flip map) [1..10000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name") + +fineGrainedBigDocument :: Document +fineGrainedBigDocument = (flip map) [1..1000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name") + +hugeDocument :: Document +hugeDocument = (flip map) [1..1000000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name") + spec :: Spec spec = around withCleanDatabase $ do describe "useDb" $ do @@ -77,9 
+87,12 @@ spec = around withCleanDatabase $ do describe "insertMany_" $ do it "inserts documents to the collection and returns nothing" $ do ids <- db $ insertMany_ "team" [ ["name" =: "Yankees", "league" =: "American"] - , ["name" =: "Dodgers", "league" =: "American"] - ] + , ["name" =: "Dodgers", "league" =: "American"] + ] ids `shouldBe` () + it "fails if the document is too big" $ do + (db $ insertMany_ "hugeDocCollection" [hugeDocument]) `shouldThrow` anyException + context "Insert a document with duplicating key" $ do before (insertDuplicateWith insertMany_ `catch` \(_ :: Failure) -> return ()) $ do @@ -136,6 +149,31 @@ spec = around withCleanDatabase $ do liftIO $ (length returnedDocs) `shouldBe` 100000 + describe "insertAll_" $ do + it "inserts big documents" $ do + let docs = replicate 100 bigDocument + db $ insertAll_ "bigDocCollection" docs + db $ do + cur <- find $ (select [] "bigDocCollection") {limit = 100000, batchSize = 100000} + returnedDocs <- rest cur + + liftIO $ (length returnedDocs) `shouldBe` 100 + it "inserts fine grained big documents" $ do + let docs = replicate 1000 fineGrainedBigDocument + db $ insertAll_ "bigDocFineGrainedCollection" docs + db $ do + cur <- find $ (select [] "bigDocFineGrainedCollection") {limit = 100000, batchSize = 100000} + returnedDocs <- rest cur + + liftIO $ (length returnedDocs) `shouldBe` 1000 + it "skips one too big document" $ do + db $ insertAll_ "hugeDocCollection" [hugeDocument] + db $ do + cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000} + returnedDocs <- rest cur + + liftIO $ (length returnedDocs) `shouldBe` 0 + describe "rest" $ do it "returns all documents from the collection" $ do let docs = (flip map) [0..6000] $ \i ->