Merge pull request #25 from VictorDenisov/master

Handle the case when mongodb returns less documents than cursor reque…
This commit is contained in:
Greg Weber 2015-07-31 17:59:04 -07:00
commit f385dade31
3 changed files with 68 additions and 32 deletions

View file

@ -63,6 +63,8 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar,
import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer, import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
readMVar, modifyMVar) readMVar, modifyMVar)
#endif #endif
import Control.Applicative ((<$>))
import Control.Monad (when)
import Control.Monad.Base (MonadBase) import Control.Monad.Base (MonadBase)
import Control.Monad.Error (Error(..)) import Control.Monad.Error (Error(..))
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local) import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
@ -505,7 +507,7 @@ distinct :: (MonadIO m) => Label -> Selection -> Action m [Value]
-- ^ Fetch distinct values of field in selected documents -- ^ Fetch distinct values of field in selected documents
distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel] distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Limit) queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit)
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
queryRequest isExplain Query{..} = do queryRequest isExplain Query{..} = do
ctx <- ask ctx <- ask
@ -515,7 +517,7 @@ queryRequest isExplain Query{..} = do
qOptions = readModeOption rm ++ options qOptions = readModeOption rm ++ options
qFullCollection = db <.> coll selection qFullCollection = db <.> coll selection
qSkip = fromIntegral skip qSkip = fromIntegral skip
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit)
qProjector = project qProjector = project
mOrder = if null sort then Nothing else Just ("$orderby" =: sort) mOrder = if null sort then Nothing else Just ("$orderby" =: sort)
mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing
@ -524,29 +526,31 @@ queryRequest isExplain Query{..} = do
special = catMaybes [mOrder, mSnapshot, mHint, mExplain] special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit) batchSizeRemainingLimit :: BatchSize -> (Maybe Limit) -> (Int32, Maybe Limit)
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit -- ^ Given batchSize and limit return P.qBatchSize and remaining limit
batchSizeRemainingLimit batchSize limit = if limit == 0 batchSizeRemainingLimit batchSize mLimit =
then (fromIntegral batchSize', 0) -- no limit let remaining =
else if 0 < batchSize' && batchSize' < limit case mLimit of
then (fromIntegral batchSize', limit - batchSize') Nothing -> batchSize
else (- fromIntegral limit, 1) Just limit ->
where batchSize' = if batchSize == 1 then 2 else batchSize if 0 < batchSize && batchSize < limit
-- batchSize 1 is broken because server converts 1 to -1 meaning limit 1 then batchSize
else limit
in (fromIntegral remaining, mLimit)
type DelayedBatch = IO Batch type DelayedBatch = IO Batch
-- ^ A promised batch which may fail -- ^ A promised batch which may fail
data Batch = Batch Limit CursorId [Document] data Batch = Batch (Maybe Limit) CursorId [Document]
-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch. -- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is number of documents to return. Nothing means no limit.
request :: (MonadIO m) => [Notice] -> (Request, Limit) -> Action m DelayedBatch request :: (MonadIO m) => [Notice] -> (Request, Maybe Limit) -> Action m DelayedBatch
-- ^ Send notices and request and return promised batch -- ^ Send notices and request and return promised batch
request ns (req, remainingLimit) = do request ns (req, remainingLimit) = do
promise <- call ns req promise <- call ns req
return $ fromReply remainingLimit =<< promise return $ fromReply remainingLimit =<< promise
fromReply :: Limit -> Reply -> DelayedBatch fromReply :: Maybe Limit -> Reply -> DelayedBatch
-- ^ Convert Reply to Batch or Failure -- ^ Convert Reply to Batch or Failure
fromReply limit Reply{..} = do fromReply limit Reply{..} = do
mapM_ checkResponseFlag rResponseFlags mapM_ checkResponseFlag rResponseFlags
@ -582,20 +586,31 @@ nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
-- ^ Return next batch of documents in query result, which will be empty if finished. -- ^ Return next batch of documents in query result, which will be empty if finished.
nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
-- Pre-fetch next batch promise from server and return current batch. -- Pre-fetch next batch promise from server and return current batch.
Batch limit cid docs <- fulfill' fcol batchSize dBatch Batch mLimit cid docs <- fulfill' fcol batchSize dBatch
dBatch' <- if cid /= 0 then nextBatch' fcol batchSize limit cid else return $ return (Batch 0 0 []) let newLimit = do
return (dBatch', docs) limit <- mLimit
return $ limit - (min limit $ fromIntegral $ length docs)
let emptyBatch = return $ Batch (Just 0) 0 []
let getNextBatch = nextBatch' fcol batchSize newLimit cid
let resultDocs = (maybe id (take . fromIntegral) mLimit) docs
case (cid, newLimit) of
(0, _) -> return (emptyBatch, resultDocs)
(_, Just 0) -> do
send [KillCursors [cid]]
return (emptyBatch, resultDocs)
(_, _) -> (, resultDocs) <$> getNextBatch
fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch
-- Discard pre-fetched batch if empty with nonzero cid. -- Discard pre-fetched batch if empty with nonzero cid.
fulfill' fcol batchSize dBatch = do fulfill' fcol batchSize dBatch = do
b@(Batch limit cid docs) <- fulfill dBatch b@(Batch limit cid docs) <- fulfill dBatch
if cid /= 0 && null docs if cid /= 0 && null docs && (limit > (Just 0))
then nextBatch' fcol batchSize limit cid >>= fulfill then nextBatch' fcol batchSize limit cid >>= fulfill
else return b else return b
nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Limit -> CursorId -> Action m DelayedBatch nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> (Maybe Limit) -> CursorId -> Action m DelayedBatch
nextBatch' fcol batchSize limit cid = request [] (GetMore fcol batchSize' cid, remLimit) nextBatch' fcol batchSize limit cid = do
request [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document) next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document)
@ -604,16 +619,23 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where
-- Pre-fetch next batch promise from server when last one in current batch is returned. -- Pre-fetch next batch promise from server when last one in current batch is returned.
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document) -- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
nextState dBatch = do nextState dBatch = do
Batch limit cid docs <- fulfill' fcol batchSize dBatch Batch mLimit cid docs <- fulfill' fcol batchSize dBatch
if mLimit == (Just 0)
then return (return $ Batch (Just 0) 0 [], Nothing)
else
case docs of case docs of
doc : docs' -> do doc : docs' -> do
dBatch' <- if null docs' && cid /= 0 let newLimit = do
then nextBatch' fcol batchSize limit cid limit <- mLimit
else return $ return (Batch limit cid docs') return $ limit - 1
dBatch' <- if null docs' && cid /= 0 && (newLimit > (Just 0))
then nextBatch' fcol batchSize newLimit cid
else return $ return (Batch newLimit cid docs')
when (newLimit == (Just 0)) $ unless (cid == 0) $ send [KillCursors [cid]]
return (dBatch', Just doc) return (dBatch', Just doc)
[] -> if cid == 0 [] -> if cid == 0
then return (return $ Batch 0 0 [], Nothing) -- finished then return (return $ Batch (Just 0) 0 [], Nothing) -- finished
else fmap (,Nothing) $ nextBatch' fcol batchSize limit cid else fmap (,Nothing) $ nextBatch' fcol batchSize mLimit cid
nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document] nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document]
-- ^ Return next N documents or less if end is reached -- ^ Return next N documents or less if end is reached
@ -627,7 +649,7 @@ closeCursor :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m ()
closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do
Batch _ cid _ <- fulfill dBatch Batch _ cid _ <- fulfill dBatch
unless (cid == 0) $ send [KillCursors [cid]] unless (cid == 0) $ send [KillCursors [cid]]
return $ (return $ Batch 0 0 [], ()) return $ (return $ Batch (Just 0) 0 [], ())
isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool
isCursorClosed (Cursor _ _ var) = do isCursorClosed (Cursor _ _ var) = do
@ -752,7 +774,7 @@ runMR mr = do
Just (String coll) -> find $ query [] coll Just (String coll) -> find $ query [] coll
Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc) Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc)
Just x -> error $ "unexpected map-reduce result field: " ++ show x Just x -> error $ "unexpected map-reduce result field: " ++ show x
Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res) Nothing -> newCursor "" "" 0 $ return $ Batch (Just 0) 0 (at "results" res)
runMR' :: (MonadIO m) => MapReduce -> Action m MRResult runMR' :: (MonadIO m) => MapReduce -> Action m MRResult
-- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript). -- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript).

View file

@ -67,6 +67,7 @@ test-suite test
-- now. It's too difficult to support old versions of GHC and -- now. It's too difficult to support old versions of GHC and
-- the new version of time. -- the new version of time.
, old-locale , old-locale
, text
, time , time
default-language: Haskell2010 default-language: Haskell2010

View file

@ -5,6 +5,8 @@ module QuerySpec (spec) where
import TestImport import TestImport
import Control.Exception import Control.Exception
import qualified Data.Text as T
testDBName :: Database testDBName :: Database
testDBName = "mongodb-haskell-test" testDBName = "mongodb-haskell-test"
@ -122,6 +124,17 @@ spec = around withCleanDatabase $ do
it "raises exception" $ it "raises exception" $
insertDuplicateWith insertAll_ `shouldThrow` anyException insertDuplicateWith insertAll_ `shouldThrow` anyException
describe "insertAll_" $ do
it "inserts documents and receives 100 000 of them" $ do
let docs = (flip map) [0..200000] $ \i ->
["name" =: (T.pack $ "name " ++ (show i))]
db $ insertAll_ "bigCollection" docs
db $ do
cur <- find $ (select [] "bigCollection") {limit = 100000, batchSize = 100000}
returnedDocs <- rest cur
liftIO $ (length returnedDocs) `shouldBe` 100000
describe "aggregate" $ do describe "aggregate" $ do
it "aggregates to normalize and sort documents" $ do it "aggregates to normalize and sort documents" $ do
db $ insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]] db $ insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]]