Merge pull request #53 from VictorDenisov/insert_command
Insert command accordingly to the latest mongodb spec.
This commit is contained in:
commit
87fbed4971
5 changed files with 139 additions and 12 deletions
|
@ -6,6 +6,7 @@ This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Pac
|
|||
|
||||
### Added
|
||||
- TLS implementation. So far it is an experimental feature.
|
||||
- Insert using command syntax with mongo server >= 2.6
|
||||
|
||||
### Removed
|
||||
- System.IO.Pipeline module
|
||||
|
|
|
@ -91,9 +91,12 @@ data Pipeline = Pipeline
|
|||
}
|
||||
|
||||
data ServerData = ServerData
|
||||
{ isMaster :: Bool
|
||||
, minWireVersion :: Int
|
||||
, maxWireVersion :: Int
|
||||
{ isMaster :: Bool
|
||||
, minWireVersion :: Int
|
||||
, maxWireVersion :: Int
|
||||
, maxMessageSizeBytes :: Int
|
||||
, maxBsonObjectSize :: Int
|
||||
, maxWriteBatchSize :: Int
|
||||
}
|
||||
|
||||
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
|
||||
|
|
|
@ -46,8 +46,8 @@ module Database.MongoDB.Query (
|
|||
) where
|
||||
|
||||
import Prelude hiding (lookup)
|
||||
import Control.Exception (Exception, throwIO)
|
||||
import Control.Monad (unless, replicateM, liftM)
|
||||
import Control.Exception (Exception, throwIO, throw)
|
||||
import Control.Monad (unless, replicateM, liftM, forM)
|
||||
import Data.Int (Int32, Int64)
|
||||
import Data.Maybe (listToMaybe, catMaybes, isNothing)
|
||||
import Data.Word (Word32)
|
||||
|
@ -70,9 +70,11 @@ import Control.Monad.Error (Error(..))
|
|||
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
|
||||
import Control.Monad.Trans (MonadIO, liftIO)
|
||||
import Control.Monad.Trans.Control (MonadBaseControl(..))
|
||||
import Data.Binary.Put (runPut)
|
||||
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
|
||||
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
|
||||
(=?), (!?), Val(..))
|
||||
import Data.Bson.Binary (putDocument)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
|
||||
|
@ -90,6 +92,7 @@ import qualified Database.MongoDB.Internal.Protocol as P
|
|||
|
||||
import qualified Crypto.Nonce as Nonce
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.ByteString.Lazy as LBS
|
||||
import qualified Data.ByteString.Base16 as B16
|
||||
import qualified Data.ByteString.Base64 as B64
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
|
@ -304,6 +307,9 @@ retrieveServerData = do
|
|||
{ isMaster = (fromMaybe False $ lookup "ismaster" d)
|
||||
, minWireVersion = (fromMaybe 0 $ lookup "minWireVersion" d)
|
||||
, maxWireVersion = (fromMaybe 0 $ lookup "maxWireVersion" d)
|
||||
, maxMessageSizeBytes = (fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d)
|
||||
, maxBsonObjectSize = (fromMaybe (16 * 1024 * 1024) $ lookup "maxBsonObjectSize" d)
|
||||
, maxWriteBatchSize = (fromMaybe 1000 $ lookup "maxWriteBatchSize" d)
|
||||
}
|
||||
return newSd
|
||||
|
||||
|
@ -385,7 +391,7 @@ write notice = asks mongoWriteMode >>= \mode -> case mode of
|
|||
|
||||
insert :: (MonadIO m) => Collection -> Document -> Action m Value
|
||||
-- ^ Insert document into collection and return its \"_id\" value, which is created automatically if not supplied
|
||||
insert col doc = head `liftM` insertMany col [doc]
|
||||
insert col doc = head `liftM` insertBlock [] col [doc]
|
||||
|
||||
insert_ :: (MonadIO m) => Collection -> Document -> Action m ()
|
||||
-- ^ Same as 'insert' except don't return _id
|
||||
|
@ -407,13 +413,92 @@ insertAll_ :: (MonadIO m) => Collection -> [Document] -> Action m ()
|
|||
-- ^ Same as 'insertAll' except don't return _ids
|
||||
insertAll_ col docs = insertAll col docs >> return ()
|
||||
|
||||
insert' :: (MonadIO m) => [InsertOption] -> Collection -> [Document] -> Action m [Value]
|
||||
insertCommandDocument :: [InsertOption] -> Collection -> [Document] -> Document
|
||||
insertCommandDocument opts col docs =
|
||||
[ "insert" =: col
|
||||
, "ordered" =: (KeepGoing `notElem` opts)
|
||||
, "documents" =: docs
|
||||
]
|
||||
|
||||
insert' :: (MonadIO m)
|
||||
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
|
||||
-- ^ Insert documents into collection and return their \"_id\" values, which are created automatically if not supplied
|
||||
insert' opts col docs = do
|
||||
p <- asks mongoPipe
|
||||
let sd = P.serverData p
|
||||
let docSize = sizeOfDocument $ insertCommandDocument opts col []
|
||||
chunks <- forM (splitAtLimit
|
||||
opts
|
||||
(maxBsonObjectSize sd - docSize)
|
||||
-- ^ size of auxiliary part of insert
|
||||
-- document should be subtracted from
|
||||
-- the overall size
|
||||
(maxWriteBatchSize sd)
|
||||
docs)
|
||||
(insertBlock opts col)
|
||||
return $ concat chunks
|
||||
|
||||
insertBlock :: (MonadIO m)
|
||||
=> [InsertOption] -> Collection -> [Document] -> Action m [Value]
|
||||
-- ^ This will fail if the list of documents is bigger than restrictions
|
||||
insertBlock _ _ [] = return []
|
||||
insertBlock opts col docs = do
|
||||
db <- thisDatabase
|
||||
docs' <- liftIO $ mapM assignId docs
|
||||
write (Insert (db <.> col) opts docs')
|
||||
return $ map (valueAt "_id") docs'
|
||||
|
||||
p <- asks mongoPipe
|
||||
let sd = P.serverData p
|
||||
if (maxWireVersion sd < 2)
|
||||
then do
|
||||
write (Insert (db <.> col) opts docs')
|
||||
return $ map (valueAt "_id") docs'
|
||||
else do
|
||||
doc <- runCommand $ insertCommandDocument opts col docs'
|
||||
liftIO $ putStrLn $ show doc
|
||||
case (look "writeErrors" doc, look "writeConcernError" doc) of
|
||||
(Nothing, Nothing) -> return $ map (valueAt "_id") docs'
|
||||
(Just err, Nothing) -> do
|
||||
liftIO $ throwIO $ WriteFailure
|
||||
(maybe 0 id $ lookup "ok" doc)
|
||||
(show err)
|
||||
(Nothing, Just err) -> do
|
||||
liftIO $ throwIO $ WriteFailure
|
||||
(maybe 0 id $ lookup "ok" doc)
|
||||
(show err)
|
||||
(Just err, Just writeConcernErr) -> do
|
||||
liftIO $ throwIO $ WriteFailure
|
||||
(maybe 0 id $ lookup "ok" doc)
|
||||
(show err ++ show writeConcernErr)
|
||||
|
||||
splitAtLimit :: [InsertOption] -> Int -> Int -> [Document] -> [[Document]]
|
||||
splitAtLimit opts maxSize maxCount list = chop (go 0 0 []) list
|
||||
where
|
||||
go :: Int -> Int -> [Document] -> [Document] -> ([Document], [Document])
|
||||
go _ _ res [] = (reverse res, [])
|
||||
go curSize curCount [] (x:xs) |
|
||||
((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) =
|
||||
if (KeepGoing `elem` opts)
|
||||
then
|
||||
go curSize curCount [] xs -- Skip this document and insert the other documents.
|
||||
else
|
||||
throw $ WriteFailure 0 "One document is too big for the message"
|
||||
go curSize curCount res (x:xs) =
|
||||
if ( ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize)
|
||||
-- we have ^ 2 brackets and curCount commas in
|
||||
-- the document that we need to take into
|
||||
-- account
|
||||
|| ((curCount + 1) > maxCount))
|
||||
then
|
||||
(reverse res, x:xs)
|
||||
else
|
||||
go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs
|
||||
|
||||
chop :: ([a] -> (b, [a])) -> [a] -> [b]
|
||||
chop _ [] = []
|
||||
chop f as = let (b, as') = f as in b : chop f as'
|
||||
|
||||
sizeOfDocument :: Document -> Int
|
||||
sizeOfDocument d = fromIntegral $ LBS.length $ runPut $ putDocument d
|
||||
|
||||
assignId :: Document -> IO Document
|
||||
-- ^ Assign a unique value to _id field if missing
|
||||
|
|
|
@ -63,7 +63,7 @@ Source-repository head
|
|||
test-suite test
|
||||
hs-source-dirs: test
|
||||
main-is: Main.hs
|
||||
ghc-options: -Wall -with-rtsopts "-K32m"
|
||||
ghc-options: -Wall -with-rtsopts "-K64m"
|
||||
type: exitcode-stdio-1.0
|
||||
build-depends: mongoDB
|
||||
, base
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
|
||||
|
||||
module QuerySpec (spec) where
|
||||
import Data.String (IsString(..))
|
||||
import TestImport
|
||||
import Control.Exception
|
||||
import qualified Data.List as L
|
||||
|
@ -33,6 +34,15 @@ insertDuplicateWith testInsert = do
|
|||
]
|
||||
return ()
|
||||
|
||||
bigDocument :: Document
|
||||
bigDocument = (flip map) [1..10000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name")
|
||||
|
||||
fineGrainedBigDocument :: Document
|
||||
fineGrainedBigDocument = (flip map) [1..1000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name")
|
||||
|
||||
hugeDocument :: Document
|
||||
hugeDocument = (flip map) [1..1000000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name")
|
||||
|
||||
spec :: Spec
|
||||
spec = around withCleanDatabase $ do
|
||||
describe "useDb" $ do
|
||||
|
@ -77,9 +87,12 @@ spec = around withCleanDatabase $ do
|
|||
describe "insertMany_" $ do
|
||||
it "inserts documents to the collection and returns nothing" $ do
|
||||
ids <- db $ insertMany_ "team" [ ["name" =: "Yankees", "league" =: "American"]
|
||||
, ["name" =: "Dodgers", "league" =: "American"]
|
||||
]
|
||||
, ["name" =: "Dodgers", "league" =: "American"]
|
||||
]
|
||||
ids `shouldBe` ()
|
||||
it "fails if the document is too big" $ do
|
||||
(db $ insertMany_ "hugeDocCollection" [hugeDocument]) `shouldThrow` anyException
|
||||
|
||||
|
||||
context "Insert a document with duplicating key" $ do
|
||||
before (insertDuplicateWith insertMany_ `catch` \(_ :: Failure) -> return ()) $ do
|
||||
|
@ -136,6 +149,31 @@ spec = around withCleanDatabase $ do
|
|||
|
||||
liftIO $ (length returnedDocs) `shouldBe` 100000
|
||||
|
||||
describe "insertAll_" $ do
|
||||
it "inserts big documents" $ do
|
||||
let docs = replicate 100 bigDocument
|
||||
db $ insertAll_ "bigDocCollection" docs
|
||||
db $ do
|
||||
cur <- find $ (select [] "bigDocCollection") {limit = 100000, batchSize = 100000}
|
||||
returnedDocs <- rest cur
|
||||
|
||||
liftIO $ (length returnedDocs) `shouldBe` 100
|
||||
it "inserts fine grained big documents" $ do
|
||||
let docs = replicate 1000 fineGrainedBigDocument
|
||||
db $ insertAll_ "bigDocFineGrainedCollection" docs
|
||||
db $ do
|
||||
cur <- find $ (select [] "bigDocFineGrainedCollection") {limit = 100000, batchSize = 100000}
|
||||
returnedDocs <- rest cur
|
||||
|
||||
liftIO $ (length returnedDocs) `shouldBe` 1000
|
||||
it "skips one too big document" $ do
|
||||
db $ insertAll_ "hugeDocCollection" [hugeDocument]
|
||||
db $ do
|
||||
cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000}
|
||||
returnedDocs <- rest cur
|
||||
|
||||
liftIO $ (length returnedDocs) `shouldBe` 0
|
||||
|
||||
describe "rest" $ do
|
||||
it "returns all documents from the collection" $ do
|
||||
let docs = (flip map) [0..6000] $ \i ->
|
||||
|
|
Loading…
Reference in a new issue