Fix for bad behavior when using TailableCursor.

`Database.MongoDB.Query.next` and `nextBatch` prefetch a promise of the next
batch of documents from the server whenever the current batch has been
exhausted. The following call to `next` or `nextBatch` fulfills that promise
(thereby turning it into a concrete batch of documents) and then returns one or
more documents in the batch.

The old behavior was to raise an exception if an empty batch with a nonzero
cursor ID was encountered. This is normal when using tailable cursors, so a
change was required.

Now, `Nothing` is returned with the still-live cursor ID, instead of raising
the exception. Also, prefetched empty batches with nonzero cursor IDs are
refetched once per call to avoid stale data.

This new prefetching behavior does not affect the performance of `next`, except
when calling it repeatedly on a tailable cursor with no new data. In those
(generally avoidable) cases, each call to `next` results in two server calls
instead of one.
This commit is contained in:
Michael S. Craig 2011-12-12 10:31:14 -05:00
parent 6faad5d866
commit fa95b65fad

View file

@ -496,10 +496,16 @@ nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
-- ^ Return next batch of documents in query result, which will be empty if finished. -- ^ Return next batch of documents in query result, which will be empty if finished.
nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
-- Pre-fetch next batch promise from server and return current batch. -- Pre-fetch next batch promise from server and return current batch.
Batch limit cid docs <- fulfill dBatch -- Discard pre-fetched batch if empty with nonzero cid.
Batch limit cid docs <- fulfill' dBatch
dBatch' <- if cid /= 0 then nextBatch' limit cid else return $ return (Batch 0 0 []) dBatch' <- if cid /= 0 then nextBatch' limit cid else return $ return (Batch 0 0 [])
return (dBatch', docs) return (dBatch', docs)
where where
fulfill' dBatch = do
b@(Batch limit cid docs) <- fulfill dBatch
if cid /= 0 && null docs
then nextBatch' limit cid >>= fulfill
else return b
nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit) nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
@ -507,9 +513,10 @@ next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document
-- ^ Return next document in query result, or Nothing if finished. -- ^ Return next document in query result, or Nothing if finished.
next (Cursor fcol batchSize var) = modifyMVar var nextState where next (Cursor fcol batchSize var) = modifyMVar var nextState where
-- Pre-fetch next batch promise from server when last one in current batch is returned. -- Pre-fetch next batch promise from server when last one in current batch is returned.
-- Discard pre-fetched batch if empty with nonzero cid.
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document) -- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
nextState dBatch = do nextState dBatch = do
Batch limit cid docs <- fulfill dBatch Batch limit cid docs <- fulfill' dBatch
case docs of case docs of
doc : docs' -> do doc : docs' -> do
dBatch' <- if null docs' && cid /= 0 dBatch' <- if null docs' && cid /= 0
@ -518,7 +525,12 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where
return (dBatch', Just doc) return (dBatch', Just doc)
[] -> if cid == 0 [] -> if cid == 0
then return (return $ Batch 0 0 [], Nothing) -- finished then return (return $ Batch 0 0 [], Nothing) -- finished
else error $ "server returned empty batch but says more results on server" else fmap (,Nothing) $ nextBatch' limit cid
fulfill' dBatch = do
b@(Batch limit cid docs) <- fulfill dBatch
if cid /= 0 && null docs
then nextBatch' limit cid >>= fulfill
else return b
nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit) nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit