From 5e72c8ad610a7aa050dbb1f23d830d07aeaac427 Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Wed, 17 Jun 2015 00:24:12 -0700 Subject: [PATCH 1/3] Handle the case when mongodb returns less documents than cursor requested If we request certain amount of values from a cursor the mongo db server can return less than requested. So, if we provide -100000 then mongodb may return 97899 and close the cursor. Instead of relying on negative values this implementation will use only positive numbers and will close the cursor itself as soon as the driver receives enough results. It fixes the issue #24 from github. --- Database/MongoDB/Query.hs | 77 +++++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 32 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 2b1e0b2..5fcc617 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -63,6 +63,7 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar, import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer, readMVar, modifyMVar) #endif +import Control.Monad (when) import Control.Monad.Base (MonadBase) import Control.Monad.Error (Error(..)) import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local) @@ -505,7 +506,7 @@ distinct :: (MonadIO m) => Label -> Selection -> Action m [Value] -- ^ Fetch distinct values of field in selected documents distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel] -queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Limit) +queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. queryRequest isExplain Query{..} = do ctx <- ask @@ -515,7 +516,7 @@ queryRequest isExplain Query{..} = do qOptions = readModeOption rm ++ options qFullCollection = db <.> coll selection qSkip = fromIntegral skip - (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit + (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit) qProjector = project mOrder = if null sort then Nothing else Just ("$orderby" =: sort) mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing @@ -524,29 +525,29 @@ queryRequest isExplain Query{..} = do special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection -batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit) +batchSizeRemainingLimit :: BatchSize -> (Maybe Limit) -> (Int32, Maybe Limit) -- ^ Given batchSize and limit return P.qBatchSize and remaining limit -batchSizeRemainingLimit batchSize limit = if limit == 0 - then (fromIntegral batchSize', 0) -- no limit - else if 0 < batchSize' && batchSize' < limit - then (fromIntegral batchSize', limit - batchSize') - else (- fromIntegral limit, 1) - where batchSize' = if batchSize == 1 then 2 else batchSize - -- batchSize 1 is broken because server converts 1 to -1 meaning limit 1 +batchSizeRemainingLimit batchSize mLimit = + case mLimit of + Nothing -> (fromIntegral batchSize, Nothing) + Just limit -> + if 0 < batchSize && batchSize < limit + then (fromIntegral batchSize, Just limit) + else (fromIntegral limit, Just limit) type DelayedBatch = IO Batch -- ^ A promised batch which may fail -data Batch = Batch Limit CursorId [Document] --- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch. +data Batch = Batch (Maybe Limit) CursorId [Document] +-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is number of documents to return. Nothing means no limit. -request :: (MonadIO m) => [Notice] -> (Request, Limit) -> Action m DelayedBatch +request :: (MonadIO m) => [Notice] -> (Request, Maybe Limit) -> Action m DelayedBatch -- ^ Send notices and request and return promised batch request ns (req, remainingLimit) = do promise <- call ns req return $ fromReply remainingLimit =<< promise -fromReply :: Limit -> Reply -> DelayedBatch +fromReply :: Maybe Limit -> Reply -> DelayedBatch -- ^ Convert Reply to Batch or Failure fromReply limit Reply{..} = do mapM_ checkResponseFlag rResponseFlags @@ -582,20 +583,25 @@ nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document] -- ^ Return next batch of documents in query result, which will be empty if finished. nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do -- Pre-fetch next batch promise from server and return current batch. - Batch limit cid docs <- fulfill' fcol batchSize dBatch - dBatch' <- if cid /= 0 then nextBatch' fcol batchSize limit cid else return $ return (Batch 0 0 []) - return (dBatch', docs) + Batch mLimit cid docs <- fulfill' fcol batchSize dBatch + let newLimit = do + limit <- mLimit + return $ limit - (min limit $ fromIntegral $ length docs) + dBatch' <- if cid /= 0 && newLimit /= (Just 0) then nextBatch' fcol batchSize newLimit cid else return $ return (Batch (Just 0) 0 []) + when (newLimit == (Just 0)) $ unless (cid == 0) $ send [KillCursors [cid]] + return (dBatch', maybe docs (\l -> take (fromIntegral l) docs) mLimit) fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch -- Discard pre-fetched batch if empty with nonzero cid. fulfill' fcol batchSize dBatch = do b@(Batch limit cid docs) <- fulfill dBatch - if cid /= 0 && null docs + if cid /= 0 && null docs && (limit > (Just 0)) then nextBatch' fcol batchSize limit cid >>= fulfill else return b -nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Limit -> CursorId -> Action m DelayedBatch -nextBatch' fcol batchSize limit cid = request [] (GetMore fcol batchSize' cid, remLimit) +nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> (Maybe Limit) -> CursorId -> Action m DelayedBatch +nextBatch' fcol batchSize limit cid = do + request [] (GetMore fcol batchSize' cid, remLimit) where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document) @@ -604,16 +610,23 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where -- Pre-fetch next batch promise from server when last one in current batch is returned. -- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document) nextState dBatch = do - Batch limit cid docs <- fulfill' fcol batchSize dBatch - case docs of - doc : docs' -> do - dBatch' <- if null docs' && cid /= 0 - then nextBatch' fcol batchSize limit cid - else return $ return (Batch limit cid docs') - return (dBatch', Just doc) - [] -> if cid == 0 - then return (return $ Batch 0 0 [], Nothing) -- finished - else fmap (,Nothing) $ nextBatch' fcol batchSize limit cid + Batch mLimit cid docs <- fulfill' fcol batchSize dBatch + if mLimit == (Just 0) + then return (return $ Batch (Just 0) 0 [], Nothing) + else + case docs of + doc : docs' -> do + let newLimit = do + limit <- mLimit + return $ limit - 1 + dBatch' <- if null docs' && cid /= 0 && (newLimit > (Just 0)) + then nextBatch' fcol batchSize newLimit cid + else return $ return (Batch newLimit cid docs') + when (newLimit == (Just 0)) $ unless (cid == 0) $ send [KillCursors [cid]] + return (dBatch', Just doc) + [] -> if cid == 0 + then return (return $ Batch (Just 0) 0 [], Nothing) -- finished + else fmap (,Nothing) $ nextBatch' fcol batchSize mLimit cid nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document] -- ^ Return next N documents or less if end is reached @@ -627,7 +640,7 @@ closeCursor :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m () closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do Batch _ cid _ <- fulfill dBatch unless (cid == 0) $ send [KillCursors [cid]] - return $ (return $ Batch 0 0 [], ()) + return $ (return $ Batch (Just 0) 0 [], ()) isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool isCursorClosed (Cursor _ _ var) = do @@ -752,7 +765,7 @@ runMR mr = do Just (String coll) -> find $ query [] coll Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc) Just x -> error $ "unexpected map-reduce result field: " ++ show x - Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res) + Nothing -> newCursor "" "" 0 $ return $ Batch (Just 0) 0 (at "results" res) runMR' :: (MonadIO m) => MapReduce -> Action m MRResult -- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript). From 0038e4163ce0a28ecef3ca09f449b2b55056dffa Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Sat, 20 Jun 2015 21:10:35 -0700 Subject: [PATCH 2/3] Add big test --- mongoDB.cabal | 1 + test/QuerySpec.hs | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/mongoDB.cabal b/mongoDB.cabal index 481df2d..441cd58 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -67,6 +67,7 @@ test-suite test -- now. It's too difficult to support old versions of GHC and -- the new version of time. , old-locale + , text , time default-language: Haskell2010 diff --git a/test/QuerySpec.hs b/test/QuerySpec.hs index 8c5f93c..510d408 100644 --- a/test/QuerySpec.hs +++ b/test/QuerySpec.hs @@ -5,6 +5,8 @@ module QuerySpec (spec) where import TestImport import Control.Exception +import qualified Data.Text as T + testDBName :: Database testDBName = "mongodb-haskell-test" @@ -122,6 +124,17 @@ spec = around withCleanDatabase $ do it "raises exception" $ insertDuplicateWith insertAll_ `shouldThrow` anyException + describe "insertAll_" $ do + it "inserts documents and receives 100 000 of them" $ do + let docs = (flip map) [0..200000] $ \i -> + ["name" =: (T.pack $ "name " ++ (show i))] + db $ insertAll_ "bigCollection" docs + db $ do + cur <- find $ (select [] "bigCollection") {limit = 100000, batchSize = 100000} + returnedDocs <- rest cur + + liftIO $ (length returnedDocs) `shouldBe` 100000 + describe "aggregate" $ do it "aggregates to normalize and sort documents" $ do db $ insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]] From dca5ae051a7531ada3108730bbaad7af8d16456c Mon Sep 17 00:00:00 2001 From: Victor Denisov Date: Fri, 31 Jul 2015 03:25:01 -0700 Subject: [PATCH 3/3] Apply reviewer's comments --- Database/MongoDB/Query.hs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 5fcc617..12c156a 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -63,6 +63,7 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar, import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer, readMVar, modifyMVar) #endif +import Control.Applicative ((<$>)) import Control.Monad (when) import Control.Monad.Base (MonadBase) import Control.Monad.Error (Error(..)) @@ -528,12 +529,14 @@ queryRequest isExplain Query{..} = do batchSizeRemainingLimit :: BatchSize -> (Maybe Limit) -> (Int32, Maybe Limit) -- ^ Given batchSize and limit return P.qBatchSize and remaining limit batchSizeRemainingLimit batchSize mLimit = - case mLimit of - Nothing -> (fromIntegral batchSize, Nothing) - Just limit -> - if 0 < batchSize && batchSize < limit - then (fromIntegral batchSize, Just limit) - else (fromIntegral limit, Just limit) + let remaining = + case mLimit of + Nothing -> batchSize + Just limit -> + if 0 < batchSize && batchSize < limit + then batchSize + else limit + in (fromIntegral remaining, mLimit) type DelayedBatch = IO Batch -- ^ A promised batch which may fail @@ -587,9 +590,15 @@ nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do let newLimit = do limit <- mLimit return $ limit - (min limit $ fromIntegral $ length docs) - dBatch' <- if cid /= 0 && newLimit /= (Just 0) then nextBatch' fcol batchSize newLimit cid else return $ return (Batch (Just 0) 0 []) - when (newLimit == (Just 0)) $ unless (cid == 0) $ send [KillCursors [cid]] - return (dBatch', maybe docs (\l -> take (fromIntegral l) docs) mLimit) + let emptyBatch = return $ Batch (Just 0) 0 [] + let getNextBatch = nextBatch' fcol batchSize newLimit cid + let resultDocs = (maybe id (take . fromIntegral) mLimit) docs + case (cid, newLimit) of + (0, _) -> return (emptyBatch, resultDocs) + (_, Just 0) -> do + send [KillCursors [cid]] + return (emptyBatch, resultDocs) + (_, _) -> (, resultDocs) <$> getNextBatch fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch -- Discard pre-fetched batch if empty with nonzero cid.