Merge pull request #12 from mkscrg/master

Fix for tailable cursor
This commit is contained in:
Tony Hannan 2012-01-23 16:11:43 -08:00
commit 5c2a296fa1

View file

@ -496,10 +496,16 @@ nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
-- ^ Return next batch of documents in query result, which will be empty if finished. -- ^ Return next batch of documents in query result, which will be empty if finished.
nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
-- Pre-fetch next batch promise from server and return current batch. -- Pre-fetch next batch promise from server and return current batch.
Batch limit cid docs <- fulfill dBatch -- Discard pre-fetched batch if empty with nonzero cid.
Batch limit cid docs <- fulfill' dBatch
dBatch' <- if cid /= 0 then nextBatch' limit cid else return $ return (Batch 0 0 []) dBatch' <- if cid /= 0 then nextBatch' limit cid else return $ return (Batch 0 0 [])
return (dBatch', docs) return (dBatch', docs)
where where
fulfill' dBatch = do
b@(Batch limit cid docs) <- fulfill dBatch
if cid /= 0 && null docs
then nextBatch' limit cid >>= fulfill
else return b
nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit) nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
@ -507,9 +513,10 @@ next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document
-- ^ Return next document in query result, or Nothing if finished. -- ^ Return next document in query result, or Nothing if finished.
next (Cursor fcol batchSize var) = modifyMVar var nextState where next (Cursor fcol batchSize var) = modifyMVar var nextState where
-- Pre-fetch next batch promise from server when last one in current batch is returned. -- Pre-fetch next batch promise from server when last one in current batch is returned.
-- Discard pre-fetched batch if empty with nonzero cid.
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document) -- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
nextState dBatch = do nextState dBatch = do
Batch limit cid docs <- fulfill dBatch Batch limit cid docs <- fulfill' dBatch
case docs of case docs of
doc : docs' -> do doc : docs' -> do
dBatch' <- if null docs' && cid /= 0 dBatch' <- if null docs' && cid /= 0
@ -518,7 +525,12 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where
return (dBatch', Just doc) return (dBatch', Just doc)
[] -> if cid == 0 [] -> if cid == 0
then return (return $ Batch 0 0 [], Nothing) -- finished then return (return $ Batch 0 0 [], Nothing) -- finished
else error $ "server returned empty batch but says more results on server" else fmap (,Nothing) $ nextBatch' limit cid
fulfill' dBatch = do
b@(Batch limit cid docs) <- fulfill dBatch
if cid /= 0 && null docs
then nextBatch' limit cid >>= fulfill
else return b
nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit) nextBatch' limit cid = request [] (GetMore fcol batchSize' cid, remLimit)
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit