Insert the list of documents into chunks
This commit is contained in:
parent
a632e8ff55
commit
3a4ebcb23b
3 changed files with 115 additions and 15 deletions
|
@ -46,8 +46,8 @@ module Database.MongoDB.Query (
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Prelude hiding (lookup)
|
import Prelude hiding (lookup)
|
||||||
import Control.Exception (Exception, throwIO)
|
import Control.Exception (Exception, throwIO, throw)
|
||||||
import Control.Monad (unless, replicateM, liftM)
|
import Control.Monad (unless, replicateM, liftM, forM)
|
||||||
import Data.Int (Int32, Int64)
|
import Data.Int (Int32, Int64)
|
||||||
import Data.Maybe (listToMaybe, catMaybes, isNothing)
|
import Data.Maybe (listToMaybe, catMaybes, isNothing)
|
||||||
import Data.Word (Word32)
|
import Data.Word (Word32)
|
||||||
|
@ -70,9 +70,11 @@ import Control.Monad.Error (Error(..))
|
||||||
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
|
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
|
||||||
import Control.Monad.Trans (MonadIO, liftIO)
|
import Control.Monad.Trans (MonadIO, liftIO)
|
||||||
import Control.Monad.Trans.Control (MonadBaseControl(..))
|
import Control.Monad.Trans.Control (MonadBaseControl(..))
|
||||||
|
import Data.Binary.Put (runPut)
|
||||||
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
|
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
|
||||||
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
|
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
|
||||||
(=?), (!?), Val(..))
|
(=?), (!?), Val(..))
|
||||||
|
import Data.Bson.Binary (putDocument)
|
||||||
import Data.Text (Text)
|
import Data.Text (Text)
|
||||||
import qualified Data.Text as T
|
import qualified Data.Text as T
|
||||||
|
|
||||||
|
@ -90,6 +92,7 @@ import qualified Database.MongoDB.Internal.Protocol as P
|
||||||
|
|
||||||
import qualified Crypto.Nonce as Nonce
|
import qualified Crypto.Nonce as Nonce
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
|
import qualified Data.ByteString.Lazy as LBS
|
||||||
import qualified Data.ByteString.Base16 as B16
|
import qualified Data.ByteString.Base16 as B16
|
||||||
import qualified Data.ByteString.Base64 as B64
|
import qualified Data.ByteString.Base64 as B64
|
||||||
import qualified Data.ByteString.Char8 as B
|
import qualified Data.ByteString.Char8 as B
|
||||||
|
@ -388,7 +391,7 @@ write notice = asks mongoWriteMode >>= \mode -> case mode of
|
||||||
|
|
||||||
insert :: (MonadIO m) => Collection -> Document -> Action m Value
|
insert :: (MonadIO m) => Collection -> Document -> Action m Value
|
||||||
-- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied
|
-- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied
|
||||||
insert col doc = head `liftM` insertMany col [doc]
|
insert col doc = head `liftM` insertBlock [] col [doc]
|
||||||
|
|
||||||
insert_ :: (MonadIO m) => Collection -> Document -> Action m ()
|
insert_ :: (MonadIO m) => Collection -> Document -> Action m ()
|
||||||
-- ^ Same as 'insert' except don't return _id
|
-- ^ Same as 'insert' except don't return _id
|
||||||
|
@ -410,9 +413,36 @@ insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
|
||||||
-- ^ Same as 'insertAll' except don't return _ids
|
-- ^ Same as 'insertAll' except don't return _ids
|
||||||
insertAll_ col docs = insertAll col docs >> return ()
|
insertAll_ col docs = insertAll col docs >> return ()
|
||||||
|
|
||||||
insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value]
|
insertCommandDocument :: [InsertOption] -> Collection -> [Document] -> Document
-- ^ Build the document of an @insert@ command for the given collection and
-- documents. The @ordered@ field is 'False' only when 'KeepGoing' is among
-- the options.
insertCommandDocument opts col docs =
  [ "insert" =: col
  , "ordered" =: ordered
  , "documents" =: docs
  ]
  where
    ordered = KeepGoing `notElem` opts
|
||||||
|
|
||||||
|
insert' :: (MonadIO m)
        => [InsertOption] -> Collection -> [Document] -> Action m [Value]
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied.
-- The documents are split into chunks with 'splitAtLimit', using the
-- @maxBsonObjectSize@ and @maxWriteBatchSize@ of the pipe's server data, and
-- every chunk is inserted with 'insertBlock'.
insert' opts col docs = do
  p <- asks mongoPipe
  let sd = P.serverData p
  -- Size of an insert command that carries no documents at all.
  let docSize = sizeOfDocument $ insertCommandDocument opts col []
  -- The size of the auxiliary part of the insert document has to be
  -- subtracted from the overall size available for the documents.
  -- (Plain comment on purpose: a Haddock "^" marker is not valid inside
  -- an expression.)
  let sizeLimit = maxBsonObjectSize sd - docSize
  chunks <- forM (splitAtLimit opts sizeLimit (maxWriteBatchSize sd) docs)
                 (insertBlock opts col)
  return $ concat chunks
|
||||||
|
|
||||||
|
insertBlock :: (MonadIO m)
|
||||||
|
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
|
||||||
|
-- ^ This will fail if the list of documents exceeds the size or count restrictions
|
||||||
|
insertBlock _ _ [] = return []
|
||||||
|
insertBlock opts col docs = do
|
||||||
db <- thisDatabase
|
db <- thisDatabase
|
||||||
docs' <- liftIO $ mapM assignId docs
|
docs' <- liftIO $ mapM assignId docs
|
||||||
|
|
||||||
|
@ -423,20 +453,52 @@ insert' opts col docs = do
|
||||||
write (Insert (db <.> col) opts docs')
|
write (Insert (db <.> col) opts docs')
|
||||||
return $ map (valueAt "_id") docs'
|
return $ map (valueAt "_id") docs'
|
||||||
else do
|
else do
|
||||||
doc <- runCommand $
|
doc <- runCommand $ insertCommandDocument opts col docs'
|
||||||
[ "insert" =: col
|
|
||||||
, "ordered" =: (KeepGoing `notElem` opts)
|
|
||||||
, "documents" =: docs'
|
|
||||||
]
|
|
||||||
liftIO $ putStrLn $ show doc
|
liftIO $ putStrLn $ show doc
|
||||||
case (look "writeErrors" doc, look "writeConcernError" doc) of
|
case (look "writeErrors" doc, look "writeConcernError" doc) of
|
||||||
(Nothing, Nothing) -> return $ map (valueAt "_id") docs'
|
(Nothing, Nothing) -> return $ map (valueAt "_id") docs'
|
||||||
(Just err, Nothing) -> do
|
(Just err, Nothing) -> do
|
||||||
liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err)
|
liftIO $ throwIO $ WriteFailure
|
||||||
|
(maybe 0 id $ lookup "ok" doc)
|
||||||
|
(show err)
|
||||||
(Nothing, Just err) -> do
|
(Nothing, Just err) -> do
|
||||||
liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err)
|
liftIO $ throwIO $ WriteFailure
|
||||||
|
(maybe 0 id $ lookup "ok" doc)
|
||||||
|
(show err)
|
||||||
(Just err, Just writeConcernErr) -> do
|
(Just err, Just writeConcernErr) -> do
|
||||||
liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err ++ show writeConcernErr)
|
liftIO $ throwIO $ WriteFailure
|
||||||
|
(maybe 0 id $ lookup "ok" doc)
|
||||||
|
(show err ++ show writeConcernErr)
|
||||||
|
|
||||||
|
splitAtLimit :: [InsertOption] -> Int -> Int -> [Document] -> [[Document]]
-- ^ Split a list of documents into consecutive chunks, keeping the original
-- order. A chunk is closed as soon as adding the next document would push its
-- estimated size over @maxSize@ or its length over @maxCount@.
--
-- A document that does not fit into an empty chunk is skipped when
-- 'KeepGoing' is among the options; otherwise a 'WriteFailure' is thrown.
-- The exception is raised from pure code, so it only surfaces when the
-- offending chunk is evaluated.
--
-- A @maxCount@ below 1 is treated as 1, so every chunk can hold at least one
-- document and the splitting always terminates.
splitAtLimit opts maxSize maxCount list = chop (go 0 0 []) list
  where
    -- Without this clamp a non-positive maxCount makes 'go' hand back an
    -- empty chunk and the untouched input, and 'chop' would loop forever.
    countLimit :: Int
    countLimit = max 1 maxCount

    -- Would adding x to a chunk of curCount documents totalling curSize
    -- bytes exceed maxSize? The extra "2 + curCount" is the allowance for
    -- 2 brackets and curCount commas in the resulting document.
    tooBig :: Int -> Int -> Document -> Bool
    tooBig curSize curCount x =
      curSize + sizeOfDocument x + 2 + curCount > maxSize

    -- Arguments: accumulated size, accumulated count, chunk built so far
    -- (in reverse order) and the remaining input. Returns the finished chunk
    -- together with the documents that are left.
    go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document])
    go _ _ res [] = (reverse res, [])
    go curSize curCount [] (x:xs)
      | tooBig curSize curCount x =
          if KeepGoing `elem` opts
            then go curSize curCount [] xs -- Skip this document and insert the other documents.
            else throw $ WriteFailure 0 "One document is too big for the message"
    go curSize curCount res (x:xs)
      | tooBig curSize curCount x || curCount + 1 > countLimit =
          (reverse res, x:xs)
      | otherwise =
          go (curSize + sizeOfDocument x) (curCount + 1) (x:res) xs
|
||||||
|
|
||||||
|
chop :: ([a] -> (b, [a])) -> [a] -> [b]
-- ^ Repeatedly apply the given function to the remaining input, collecting
-- the first component of every result, until the input is empty.
chop step = loop
  where
    loop [] = []
    loop pending = piece : loop remaining
      where
        (piece, remaining) = step pending
|
||||||
|
|
||||||
|
sizeOfDocument :: Document -> Int
-- ^ Number of bytes produced when the document is serialised with 'putDocument'.
sizeOfDocument = fromIntegral . LBS.length . runPut . putDocument
|
||||||
|
|
||||||
assignId :: Document -> IO Document
|
assignId :: Document -> IO Document
|
||||||
-- ^ Assign a unique value to _id field if missing
|
-- ^ Assign a unique value to _id field if missing
|
||||||
|
|
|
@ -63,7 +63,7 @@ Source-repository head
|
||||||
test-suite test
|
test-suite test
|
||||||
hs-source-dirs: test
|
hs-source-dirs: test
|
||||||
main-is: Main.hs
|
main-is: Main.hs
|
||||||
ghc-options: -Wall -with-rtsopts "-K32m"
|
ghc-options: -Wall -with-rtsopts "-K64m"
|
||||||
type: exitcode-stdio-1.0
|
type: exitcode-stdio-1.0
|
||||||
build-depends: mongoDB
|
build-depends: mongoDB
|
||||||
, base
|
, base
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
|
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
|
||||||
|
|
||||||
module QuerySpec (spec) where
|
module QuerySpec (spec) where
|
||||||
|
import Data.String (IsString(..))
|
||||||
import TestImport
|
import TestImport
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import qualified Data.List as L
|
import qualified Data.List as L
|
||||||
|
@ -33,6 +34,15 @@ insertDuplicateWith testInsert = do
|
||||||
]
|
]
|
||||||
return ()
|
return ()
|
||||||
|
|
||||||
|
-- | Build a flat document with the fields @team1@ .. @team\<n\>@, where field
-- @team\<i\>@ holds the string @"team \<i\> name"@.
teamsDocument :: Int -> Document
teamsDocument n =
  [ fromString ("team" ++ show i) =: ("team " ++ show i ++ " name")
  | i <- [1 .. n]
  ]

-- | Document with 10000 fields.
bigDocument :: Document
bigDocument = teamsDocument 10000

-- | Document with 1000 fields.
fineGrainedBigDocument :: Document
fineGrainedBigDocument = teamsDocument 1000

-- | Document with 1000000 fields; the specs use it as a document that is
-- expected to be too big to insert.
hugeDocument :: Document
hugeDocument = teamsDocument 1000000
|
||||||
|
|
||||||
spec :: Spec
|
spec :: Spec
|
||||||
spec = around withCleanDatabase $ do
|
spec = around withCleanDatabase $ do
|
||||||
describe "useDb" $ do
|
describe "useDb" $ do
|
||||||
|
@ -80,6 +90,9 @@ spec = around withCleanDatabase $ do
|
||||||
, ["name" =: "Dodgers", "league" =: "American"]
|
, ["name" =: "Dodgers", "league" =: "American"]
|
||||||
]
|
]
|
||||||
ids `shouldBe` ()
|
ids `shouldBe` ()
|
||||||
|
it "fails if the document is too big" $ do
|
||||||
|
(db $ insertMany_ "hugeDocCollection" [hugeDocument]) `shouldThrow` anyException
|
||||||
|
|
||||||
|
|
||||||
context "Insert a document with duplicating key" $ do
|
context "Insert a document with duplicating key" $ do
|
||||||
before (insertDuplicateWith insertMany_ `catch` \(_ :: Failure) -> return ()) $ do
|
before (insertDuplicateWith insertMany_ `catch` \(_ :: Failure) -> return ()) $ do
|
||||||
|
@ -136,6 +149,31 @@ spec = around withCleanDatabase $ do
|
||||||
|
|
||||||
liftIO $ (length returnedDocs) `shouldBe` 100000
|
liftIO $ (length returnedDocs) `shouldBe` 100000
|
||||||
|
|
||||||
|
describe "insertAll_" $ do
|
||||||
|
it "inserts big documents" $ do
|
||||||
|
let docs = replicate 100 bigDocument
|
||||||
|
db $ insertAll_ "bigDocCollection" docs
|
||||||
|
db $ do
|
||||||
|
cur <- find $ (select [] "bigDocCollection") {limit = 100000, batchSize = 100000}
|
||||||
|
returnedDocs <- rest cur
|
||||||
|
|
||||||
|
liftIO $ (length returnedDocs) `shouldBe` 100
|
||||||
|
it "inserts fine grained big documents" $ do
|
||||||
|
let docs = replicate 1000 fineGrainedBigDocument
|
||||||
|
db $ insertAll_ "bigDocFineGrainedCollection" docs
|
||||||
|
db $ do
|
||||||
|
cur <- find $ (select [] "bigDocFineGrainedCollection") {limit = 100000, batchSize = 100000}
|
||||||
|
returnedDocs <- rest cur
|
||||||
|
|
||||||
|
liftIO $ (length returnedDocs) `shouldBe` 1000
|
||||||
|
it "skips one too big document" $ do
|
||||||
|
db $ insertAll_ "hugeDocCollection" [hugeDocument]
|
||||||
|
db $ do
|
||||||
|
cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000}
|
||||||
|
returnedDocs <- rest cur
|
||||||
|
|
||||||
|
liftIO $ (length returnedDocs) `shouldBe` 0
|
||||||
|
|
||||||
describe "rest" $ do
|
describe "rest" $ do
|
||||||
it "returns all documents from the collection" $ do
|
it "returns all documents from the collection" $ do
|
||||||
let docs = (flip map) [0..6000] $ \i ->
|
let docs = (flip map) [0..6000] $ \i ->
|
||||||
|
|
Loading…
Reference in a new issue