diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2a179be..4032efa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Pac
 
 ### Added
 - TLS implementation. So far it is an experimental feature.
+- Insert using command syntax with mongo server >= 2.6
 
 ### Removed
 - System.IO.Pipeline module
diff --git a/Database/MongoDB/Internal/Protocol.hs b/Database/MongoDB/Internal/Protocol.hs
index 2bf0916..bf647ed 100644
--- a/Database/MongoDB/Internal/Protocol.hs
+++ b/Database/MongoDB/Internal/Protocol.hs
@@ -91,9 +91,12 @@ data Pipeline = Pipeline
     }
 
 data ServerData = ServerData
-                { isMaster       :: Bool
-                , minWireVersion :: Int
-                , maxWireVersion :: Int
+                { isMaster            :: Bool -- ^ filled from the @ismaster@ reply field by @retrieveServerData@ (Query.hs); 'False' when the field is missing
+                , minWireVersion      :: Int  -- ^ filled from the @minWireVersion@ reply field; 0 when the field is missing
+                , maxWireVersion      :: Int  -- ^ filled from the @maxWireVersion@ reply field; 0 when missing. Query.hs uses the legacy insert message when this is below 2
+                , maxMessageSizeBytes :: Int  -- ^ filled from the @maxMessageSizeBytes@ reply field; 48000000 when the field is missing
+                , maxBsonObjectSize   :: Int  -- ^ filled from the @maxBsonObjectSize@ reply field; 16 MiB when missing. Bounds the size of one insert command document in Query.hs
+                , maxWriteBatchSize   :: Int  -- ^ filled from the @maxWriteBatchSize@ reply field; 1000 when missing. Bounds the number of documents per insert command in Query.hs
                 }
 
 -- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs
index c336aa1..a347dda 100644
--- a/Database/MongoDB/Query.hs
+++ b/Database/MongoDB/Query.hs
@@ -46,8 +46,8 @@ module Database.MongoDB.Query (
 ) where
 
 import Prelude hiding (lookup)
-import Control.Exception (Exception, throwIO)
-import Control.Monad (unless, replicateM, liftM)
+import Control.Exception (Exception, throwIO, throw)
+import Control.Monad (unless, replicateM, liftM, forM)
 import Data.Int (Int32, Int64)
 import Data.Maybe (listToMaybe, catMaybes, isNothing)
 import Data.Word (Word32)
@@ -70,9 +70,11 @@ import Control.Monad.Error (Error(..))
 import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
 import Control.Monad.Trans (MonadIO, liftIO)
 import Control.Monad.Trans.Control (MonadBaseControl(..))
+import Data.Binary.Put (runPut)
 import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
                   Javascript, at, valueAt, lookup, look, genObjectId, (=:),
                   (=?), (!?), Val(..))
+import Data.Bson.Binary (putDocument)
 import Data.Text (Text)
 import qualified Data.Text as T
 
@@ -90,6 +92,7 @@ import qualified Database.MongoDB.Internal.Protocol as P
 
 import qualified Crypto.Nonce as Nonce
 import qualified Data.ByteString as BS
+import qualified Data.ByteString.Lazy as LBS
 import qualified Data.ByteString.Base16 as B16
 import qualified Data.ByteString.Base64 as B64
 import qualified Data.ByteString.Char8 as B
@@ -304,6 +307,9 @@ retrieveServerData = do
                 { isMaster = (fromMaybe False $ lookup "ismaster" d)
                 , minWireVersion = (fromMaybe 0 $ lookup "minWireVersion" d)
                 , maxWireVersion = (fromMaybe 0 $ lookup "maxWireVersion" d)
+                , maxMessageSizeBytes = (fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d)
+                , maxBsonObjectSize = (fromMaybe (16 * 1024 * 1024) $ lookup "maxBsonObjectSize" d)
+                , maxWriteBatchSize = (fromMaybe 1000 $ lookup "maxWriteBatchSize" d)
                 }
   return newSd
 
@@ -385,7 +391,7 @@ write notice = asks mongoWriteMode >>= \mode -> case mode of
 
 insert :: (MonadIO m) => Collection -> Document -> Action m Value
 -- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied
-insert col doc = head `liftM` insertMany col [doc]
+insert col doc = head `liftM` insertBlock [] col [doc]
 
 insert_ :: (MonadIO m) => Collection -> Document -> Action m ()
 -- ^ Same as 'insert' except don't return _id
@@ -407,13 +413,88 @@ insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
 -- ^ Same as 'insertAll' except don't return _ids
 insertAll_ col docs = insertAll col docs >> return ()
 
-insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value]
+insertCommandDocument :: [InsertOption] -> Collection -> [Document] -> Document
+-- ^ Build the @insert@ command document; @ordered@ is set unless 'KeepGoing' is among the options
+insertCommandDocument opts col docs =
+          [ "insert" =: col
+          , "ordered" =: (KeepGoing `notElem` opts)
+          , "documents" =: docs
+          ]
+
+insert' :: (MonadIO m)
+        => [InsertOption] -> Collection -> [Document] -> Action m [Value]
 -- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied
 insert' opts col docs = do
+  p <- asks mongoPipe
+  let sd = P.serverData p
+  let docSize = sizeOfDocument $ insertCommandDocument opts col []
+  chunks <- forM (splitAtLimit
+                      opts
+                      (maxBsonObjectSize sd - docSize)
+                              -- size of auxiliary part of insert
+                              -- document should be subtracted from
+                              -- the overall size
+                      (maxWriteBatchSize sd)
+                      docs)
+                 (insertBlock opts col)
+  return $ concat chunks
+
+insertBlock :: (MonadIO m)
+            => [InsertOption] -> Collection -> [Document] -> Action m [Value]
+-- ^ This will fail if the list of documents is bigger than restrictions
+insertBlock _ _ [] = return []
+insertBlock opts col docs = do
     db <- thisDatabase
     docs' <- liftIO $ mapM assignId docs
-    write (Insert (db <.> col) opts docs')
-    return $ map (valueAt "_id") docs'
+
+    p <- asks mongoPipe
+    let sd = P.serverData p
+    if (maxWireVersion sd < 2)
+      then do
+        -- wire version below 2: fall back to the legacy insert message
+        write (Insert (db <.> col) opts docs')
+        return $ map (valueAt "_id") docs'
+      else do
+        doc <- runCommand $ insertCommandDocument opts col docs'
+        case (look "writeErrors" doc, look "writeConcernError" doc) of
+          (Nothing, Nothing) -> return $ map (valueAt "_id") docs'
+          (writeErrs, writeConcernErr) ->
+            -- report whichever errors are present, write errors first
+            liftIO $ throwIO $ WriteFailure
+                                    (fromMaybe 0 $ lookup "ok" doc)
+                                    (concatMap show $ catMaybes [writeErrs, writeConcernErr])
+
+splitAtLimit :: [InsertOption] -> Int -> Int -> [Document] -> [[Document]]
+-- ^ Split documents into chunks limited by serialized size and by document count. A single document over the size limit is skipped with 'KeepGoing', otherwise a 'WriteFailure' is thrown
+splitAtLimit opts maxSize maxCount list = chop (go 0 0 []) list
+  where
+    go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document])
+    go _ _ res [] = (reverse res, [])
+    go curSize curCount [] (x:xs) |
+      ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) =
+        if (KeepGoing `elem` opts)
+          then
+            go curSize curCount [] xs -- Skip this document and insert the other documents.
+          else
+            throw $ WriteFailure 0 "One document is too big for the message"
+    go curSize curCount res (x:xs) =
+      if (   ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize)
+                                 -- we have ^ 2 brackets and curCount commas in
+                                 -- the document that we need to take into
+                                 -- account
+          || ((curCount + 1) > maxCount))
+        then
+          (reverse res, x:xs)
+        else
+          go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs
+
+    chop :: ([a] -> (b, [a])) -> [a] -> [b]
+    chop _ [] = []
+    chop f as = let (b, as') = f as in b : chop f as'
+
+sizeOfDocument :: Document -> Int
+-- ^ Length in bytes of the document as serialized by 'putDocument'
+sizeOfDocument d = fromIntegral $ LBS.length $ runPut $ putDocument d
 
 assignId :: Document -> IO Document
 -- ^ Assign a unique value to _id field if missing
diff --git a/mongoDB.cabal b/mongoDB.cabal
index 11a4ec1..eb15c53 100644
--- a/mongoDB.cabal
+++ b/mongoDB.cabal
@@ -63,7 +63,7 @@ Source-repository head
 test-suite test
     hs-source-dirs: test
     main-is: Main.hs
-    ghc-options: -Wall -with-rtsopts "-K32m"
+    ghc-options: -Wall -with-rtsopts "-K64m"
     type: exitcode-stdio-1.0
     build-depends: mongoDB
                  , base
diff --git a/test/QuerySpec.hs b/test/QuerySpec.hs
index ebbea72..57ee7e7 100644
--- a/test/QuerySpec.hs
+++ b/test/QuerySpec.hs
@@ -2,6 +2,7 @@
 {-# OPTIONS_GHC -fno-warn-type-defaults #-}
 
 module QuerySpec (spec) where
+import Data.String (IsString(..))
 import TestImport
 import Control.Exception
 import qualified Data.List as L
@@ -33,6 +34,15 @@ insertDuplicateWith testInsert = do
     ]
   return ()
 
+bigDocument :: Document
+bigDocument = (flip map) [1..10000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name")
+
+fineGrainedBigDocument :: Document
+fineGrainedBigDocument = (flip map) [1..1000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name")
+
+hugeDocument :: Document
+hugeDocument = (flip map) [1..1000000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name")
+
 spec :: Spec
 spec = around withCleanDatabase $ do
   describe "useDb" $ do
@@ -77,9 +87,12 @@ spec = around withCleanDatabase $ do
   describe "insertMany_" $ do
     it "inserts documents to the collection and returns nothing" $ do
       ids <- db $ insertMany_ "team" [ ["name" =: "Yankees", "league" =: "American"]
-                                    , ["name" =: "Dodgers", "league" =: "American"]
-                                    ]
+                                     , ["name" =: "Dodgers", "league" =: "American"]
+                                     ]
       ids `shouldBe` ()
+    it "fails if the document is too big" $ do
+      (db $ insertMany_ "hugeDocCollection" [hugeDocument]) `shouldThrow` anyException
+
 
     context "Insert a document with duplicating key" $ do
       before (insertDuplicateWith insertMany_ `catch` \(_ :: Failure) -> return ()) $ do
@@ -136,6 +149,31 @@ spec = around withCleanDatabase $ do
 
       liftIO $ (length returnedDocs) `shouldBe` 100000
 
+  describe "insertAll_" $ do
+    it "inserts big documents" $ do
+      let docs = replicate 100 bigDocument
+      db $ insertAll_ "bigDocCollection" docs
+      db $ do
+        cur <- find $ (select [] "bigDocCollection") {limit = 100000, batchSize = 100000}
+        returnedDocs <- rest cur
+
+        liftIO $ (length returnedDocs) `shouldBe` 100
+    it "inserts fine grained big documents" $ do
+      let docs = replicate 1000 fineGrainedBigDocument
+      db $ insertAll_ "bigDocFineGrainedCollection" docs
+      db $ do
+        cur <- find $ (select [] "bigDocFineGrainedCollection") {limit = 100000, batchSize = 100000}
+        returnedDocs <- rest cur
+
+        liftIO $ (length returnedDocs) `shouldBe` 1000
+    it "skips one too big document" $ do
+      db $ insertAll_ "hugeDocCollection" [hugeDocument]
+      db $ do
+        cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000}
+        returnedDocs <- rest cur
+
+        liftIO $ (length returnedDocs) `shouldBe` 0
+
   describe "rest" $ do
     it "returns all documents from the collection" $ do
       let docs = (flip map) [0..6000] $ \i ->