From d123460b4077c4d918ece8d7aa83f559c9fd8b79 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Sun, 22 May 2016 17:38:07 -0700 Subject: [PATCH 1/4] Implement insert using command mechanism --- Database/MongoDB/Query.hs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index c336aa1..2b00e21 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -412,8 +412,28 @@ insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m insert' opts col docs = do db <- thisDatabase docs' <- liftIO $ mapM assignId docs - write (Insert (db <.> col) opts docs') - return $ map (valueAt "_id") docs' + + p <- asks mongoPipe + let sd = P.serverData p + if (maxWireVersion sd < 2) + then do + write (Insert (db <.> col) opts docs') + return $ map (valueAt "_id") docs' + else do + doc <- runCommand $ + [ "insert" =: col + , "ordered" =: (KeepGoing `notElem` opts) + , "documents" =: docs' + ] + liftIO $ putStrLn $ show doc + case (look "writeErrors" doc, look "writeConcernError" doc) of + (Nothing, Nothing) -> return $ map (valueAt "_id") docs' + (Just err, Nothing) -> do + liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err) + (Nothing, Just err) -> do + liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err) + (Just err, Just writeConcernErr) -> do + liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err ++ show writeConcernErr) assignId :: Document -> IO Document -- ^ Assign a unique value to _id field if missing From a632e8ff55ec7977b1f2871d60f474059434d83a Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Thu, 26 May 2016 23:03:55 -0700 Subject: [PATCH 2/4] Collect size restrictions from the server --- Database/MongoDB/Internal/Protocol.hs | 9 ++++++--- Database/MongoDB/Query.hs | 3 +++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/Database/MongoDB/Internal/Protocol.hs 
b/Database/MongoDB/Internal/Protocol.hs index 2bf0916..bf647ed 100644 --- a/Database/MongoDB/Internal/Protocol.hs +++ b/Database/MongoDB/Internal/Protocol.hs @@ -91,9 +91,12 @@ data Pipeline = Pipeline } data ServerData = ServerData - { isMaster :: Bool - , minWireVersion :: Int - , maxWireVersion :: Int + { isMaster :: Bool + , minWireVersion :: Int + , maxWireVersion :: Int + , maxMessageSizeBytes :: Int + , maxBsonObjectSize :: Int + , maxWriteBatchSize :: Int } -- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle. diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 2b00e21..7058c4b 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -304,6 +304,9 @@ retrieveServerData = do { isMaster = (fromMaybe False $ lookup "ismaster" d) , minWireVersion = (fromMaybe 0 $ lookup "minWireVersion" d) , maxWireVersion = (fromMaybe 0 $ lookup "maxWireVersion" d) + , maxMessageSizeBytes = (fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d) + , maxBsonObjectSize = (fromMaybe (16 * 1024 * 1024) $ lookup "maxBsonObjectSize" d) + , maxWriteBatchSize = (fromMaybe 1000 $ lookup "maxWriteBatchSize" d) } return newSd From 3a4ebcb23beaf21fbbb31b49825001429878ed16 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Sun, 29 May 2016 18:21:31 -0700 Subject: [PATCH 3/4] Insert the list of documents into chunks --- Database/MongoDB/Query.hs | 86 +++++++++++++++++++++++++++++++++------ mongoDB.cabal | 2 +- test/QuerySpec.hs | 42 ++++++++++++++++++- 3 files changed, 115 insertions(+), 15 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 7058c4b..a347dda 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -46,8 +46,8 @@ module Database.MongoDB.Query ( ) where import Prelude hiding (lookup) -import Control.Exception (Exception, throwIO) 
-import Control.Monad (unless, replicateM, liftM) +import Control.Exception (Exception, throwIO, throw) +import Control.Monad (unless, replicateM, liftM, forM) import Data.Int (Int32, Int64) import Data.Maybe (listToMaybe, catMaybes, isNothing) import Data.Word (Word32) @@ -70,9 +70,11 @@ import Control.Monad.Error (Error(..)) import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local) import Control.Monad.Trans (MonadIO, liftIO) import Control.Monad.Trans.Control (MonadBaseControl(..)) +import Data.Binary.Put (runPut) import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool), Javascript, at, valueAt, lookup, look, genObjectId, (=:), (=?), (!?), Val(..)) +import Data.Bson.Binary (putDocument) import Data.Text (Text) import qualified Data.Text as T @@ -90,6 +92,7 @@ import qualified Database.MongoDB.Internal.Protocol as P import qualified Crypto.Nonce as Nonce import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as LBS import qualified Data.ByteString.Base16 as B16 import qualified Data.ByteString.Base64 as B64 import qualified Data.ByteString.Char8 as B @@ -388,7 +391,7 @@ write notice = asks mongoWriteMode >>= \mode -> case mode of insert :: (MonadIO m) => Collection -> Document -> Action m Value -- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied -insert col doc = head `liftM` insertMany col [doc] +insert col doc = head `liftM` insertBlock [] col [doc] insert_ :: (MonadIO m) => Collection -> Document -> Action m () -- ^ Same as 'insert' except don't return _id @@ -410,9 +413,36 @@ insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m () -- ^ Same as 'insertAll' except don't return _ids insertAll_ col docs = insertAll col docs >> return () -insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value] +insertCommandDocument :: [InsertOption] -> Collection -> [Document] -> Document 
+insertCommandDocument opts col docs = + [ "insert" =: col + , "ordered" =: (KeepGoing `notElem` opts) + , "documents" =: docs + ] + +insert' :: (MonadIO m) + => [InsertOption] -> Collection -> [Document] -> Action m [Value] -- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied insert' opts col docs = do + p <- asks mongoPipe + let sd = P.serverData p + let docSize = sizeOfDocument $ insertCommandDocument opts col [] + chunks <- forM (splitAtLimit + opts + (maxBsonObjectSize sd - docSize) + -- ^ size of auxiliary part of insert + -- document should be subtracted from + -- the overall size + (maxWriteBatchSize sd) + docs) + (insertBlock opts col) + return $ concat chunks + +insertBlock :: (MonadIO m) + => [InsertOption] -> Collection -> [Document] -> Action m [Value] +-- ^ This will fail if the list of documents is bigger than restrictions +insertBlock _ _ [] = return [] +insertBlock opts col docs = do db <- thisDatabase docs' <- liftIO $ mapM assignId docs @@ -423,20 +453,52 @@ insert' opts col docs = do write (Insert (db <.> col) opts docs') return $ map (valueAt "_id") docs' else do - doc <- runCommand $ - [ "insert" =: col - , "ordered" =: (KeepGoing `notElem` opts) - , "documents" =: docs' - ] + doc <- runCommand $ insertCommandDocument opts col docs' liftIO $ putStrLn $ show doc case (look "writeErrors" doc, look "writeConcernError" doc) of (Nothing, Nothing) -> return $ map (valueAt "_id") docs' (Just err, Nothing) -> do - liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err) + liftIO $ throwIO $ WriteFailure + (maybe 0 id $ lookup "ok" doc) + (show err) (Nothing, Just err) -> do - liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err) + liftIO $ throwIO $ WriteFailure + (maybe 0 id $ lookup "ok" doc) + (show err) (Just err, Just writeConcernErr) -> do - liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "ok" doc) (show err ++ show writeConcernErr) + 
liftIO $ throwIO $ WriteFailure + (maybe 0 id $ lookup "ok" doc) + (show err ++ show writeConcernErr) + +splitAtLimit :: [InsertOption] -> Int -> Int -> [Document] -> [[Document]] +splitAtLimit opts maxSize maxCount list = chop (go 0 0 []) list + where + go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document]) + go _ _ res [] = (reverse res, []) + go curSize curCount [] (x:xs) | + ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) = + if (KeepGoing `elem` opts) + then + go curSize curCount [] xs -- Skip this document and insert the other documents. + else + throw $ WriteFailure 0 "One document is too big for the message" + go curSize curCount res (x:xs) = + if ( ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) + -- we have ^ 2 brackets and curCount commas in + -- the document that we need to take into + -- account + || ((curCount + 1) > maxCount)) + then + (reverse res, x:xs) + else + go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs + + chop :: ([a] -> (b, [a])) -> [a] -> [b] + chop _ [] = [] + chop f as = let (b, as') = f as in b : chop f as' + +sizeOfDocument :: Document -> Int +sizeOfDocument d = fromIntegral $ LBS.length $ runPut $ putDocument d assignId :: Document -> IO Document -- ^ Assign a unique value to _id field if missing diff --git a/mongoDB.cabal b/mongoDB.cabal index 11a4ec1..eb15c53 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -63,7 +63,7 @@ Source-repository head test-suite test hs-source-dirs: test main-is: Main.hs - ghc-options: -Wall -with-rtsopts "-K32m" + ghc-options: -Wall -with-rtsopts "-K64m" type: exitcode-stdio-1.0 build-depends: mongoDB , base diff --git a/test/QuerySpec.hs b/test/QuerySpec.hs index ebbea72..57ee7e7 100644 --- a/test/QuerySpec.hs +++ b/test/QuerySpec.hs @@ -2,6 +2,7 @@ {-# OPTIONS_GHC -fno-warn-type-defaults #-} module QuerySpec (spec) where +import Data.String (IsString(..)) import TestImport import Control.Exception import qualified Data.List as L @@ -33,6 +34,15 @@ 
insertDuplicateWith testInsert = do ] return () +bigDocument :: Document +bigDocument = (flip map) [1..10000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name") + +fineGrainedBigDocument :: Document +fineGrainedBigDocument = (flip map) [1..1000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name") + +hugeDocument :: Document +hugeDocument = (flip map) [1..1000000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name") + spec :: Spec spec = around withCleanDatabase $ do describe "useDb" $ do @@ -77,9 +87,12 @@ spec = around withCleanDatabase $ do describe "insertMany_" $ do it "inserts documents to the collection and returns nothing" $ do ids <- db $ insertMany_ "team" [ ["name" =: "Yankees", "league" =: "American"] - , ["name" =: "Dodgers", "league" =: "American"] - ] + , ["name" =: "Dodgers", "league" =: "American"] + ] ids `shouldBe` () + it "fails if the document is too big" $ do + (db $ insertMany_ "hugeDocCollection" [hugeDocument]) `shouldThrow` anyException + context "Insert a document with duplicating key" $ do before (insertDuplicateWith insertMany_ `catch` \(_ :: Failure) -> return ()) $ do @@ -136,6 +149,31 @@ spec = around withCleanDatabase $ do liftIO $ (length returnedDocs) `shouldBe` 100000 + describe "insertAll_" $ do + it "inserts big documents" $ do + let docs = replicate 100 bigDocument + db $ insertAll_ "bigDocCollection" docs + db $ do + cur <- find $ (select [] "bigDocCollection") {limit = 100000, batchSize = 100000} + returnedDocs <- rest cur + + liftIO $ (length returnedDocs) `shouldBe` 100 + it "inserts fine grained big documents" $ do + let docs = replicate 1000 fineGrainedBigDocument + db $ insertAll_ "bigDocFineGrainedCollection" docs + db $ do + cur <- find $ (select [] "bigDocFineGrainedCollection") {limit = 100000, batchSize = 100000} + returnedDocs <- rest cur + + liftIO $ (length returnedDocs) `shouldBe` 1000 + it "skips one too big document" $ do + db $ 
insertAll_ "hugeDocCollection" [hugeDocument] + db $ do + cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000} + returnedDocs <- rest cur + + liftIO $ (length returnedDocs) `shouldBe` 0 + describe "rest" $ do it "returns all documents from the collection" $ do let docs = (flip map) [0..6000] $ \i -> From 884d0e223d4a45cb9818b5771873d16998f53a3c Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Mon, 30 May 2016 12:30:20 -0700 Subject: [PATCH 4/4] Add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a179be..4032efa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Pac ### Added - TLS implementation. So far it is an experimental feature. +- Documents are inserted with the `insert` command when the MongoDB server version is >= 2.6 ### Removed - System.IO.Pipeline module