Handle the case when mongodb returns less documents than cursor requested
If we request certain amount of values from a cursor the mongo db server can return less than requested. So, if we provide -100000 then mongodb may return 97899 and close the cursor. Instead of relying on negative values this implementation will use only positive numbers and will close the cursor itself as soon as the driver receives enough results. It fixes the issue #24 from github.
This commit is contained in:
parent
cb912cb952
commit
5e72c8ad61
1 changed files with 45 additions and 32 deletions
|
@ -63,6 +63,7 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar,
|
||||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
|
import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer,
|
||||||
readMVar, modifyMVar)
|
readMVar, modifyMVar)
|
||||||
#endif
|
#endif
|
||||||
|
import Control.Monad (when)
|
||||||
import Control.Monad.Base (MonadBase)
|
import Control.Monad.Base (MonadBase)
|
||||||
import Control.Monad.Error (Error(..))
|
import Control.Monad.Error (Error(..))
|
||||||
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
|
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
|
||||||
|
@ -505,7 +506,7 @@ distinct :: (MonadIO m) => Label -> Selection -> Action m [Value]
|
||||||
-- ^ Fetch distinct values of field in selected documents
|
-- ^ Fetch distinct values of field in selected documents
|
||||||
distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
|
distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
|
||||||
|
|
||||||
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Limit)
|
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit)
|
||||||
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
|
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
|
||||||
queryRequest isExplain Query{..} = do
|
queryRequest isExplain Query{..} = do
|
||||||
ctx <- ask
|
ctx <- ask
|
||||||
|
@ -515,7 +516,7 @@ queryRequest isExplain Query{..} = do
|
||||||
qOptions = readModeOption rm ++ options
|
qOptions = readModeOption rm ++ options
|
||||||
qFullCollection = db <.> coll selection
|
qFullCollection = db <.> coll selection
|
||||||
qSkip = fromIntegral skip
|
qSkip = fromIntegral skip
|
||||||
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit
|
(qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit)
|
||||||
qProjector = project
|
qProjector = project
|
||||||
mOrder = if null sort then Nothing else Just ("$orderby" =: sort)
|
mOrder = if null sort then Nothing else Just ("$orderby" =: sort)
|
||||||
mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing
|
mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing
|
||||||
|
@ -524,29 +525,29 @@ queryRequest isExplain Query{..} = do
|
||||||
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
|
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
|
||||||
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
|
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
|
||||||
|
|
||||||
batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit)
|
batchSizeRemainingLimit :: BatchSize -> (Maybe Limit) -> (Int32, Maybe Limit)
|
||||||
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit
|
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit
|
||||||
batchSizeRemainingLimit batchSize limit = if limit == 0
|
batchSizeRemainingLimit batchSize mLimit =
|
||||||
then (fromIntegral batchSize', 0) -- no limit
|
case mLimit of
|
||||||
else if 0 < batchSize' && batchSize' < limit
|
Nothing -> (fromIntegral batchSize, Nothing)
|
||||||
then (fromIntegral batchSize', limit - batchSize')
|
Just limit ->
|
||||||
else (- fromIntegral limit, 1)
|
if 0 < batchSize && batchSize < limit
|
||||||
where batchSize' = if batchSize == 1 then 2 else batchSize
|
then (fromIntegral batchSize, Just limit)
|
||||||
-- batchSize 1 is broken because server converts 1 to -1 meaning limit 1
|
else (fromIntegral limit, Just limit)
|
||||||
|
|
||||||
type DelayedBatch = IO Batch
|
type DelayedBatch = IO Batch
|
||||||
-- ^ A promised batch which may fail
|
-- ^ A promised batch which may fail
|
||||||
|
|
||||||
data Batch = Batch Limit CursorId [Document]
|
data Batch = Batch (Maybe Limit) CursorId [Document]
|
||||||
-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch.
|
-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is number of documents to return. Nothing means no limit.
|
||||||
|
|
||||||
request :: (MonadIO m) => [Notice] -> (Request, Limit) -> Action m DelayedBatch
|
request :: (MonadIO m) => [Notice] -> (Request, Maybe Limit) -> Action m DelayedBatch
|
||||||
-- ^ Send notices and request and return promised batch
|
-- ^ Send notices and request and return promised batch
|
||||||
request ns (req, remainingLimit) = do
|
request ns (req, remainingLimit) = do
|
||||||
promise <- call ns req
|
promise <- call ns req
|
||||||
return $ fromReply remainingLimit =<< promise
|
return $ fromReply remainingLimit =<< promise
|
||||||
|
|
||||||
fromReply :: Limit -> Reply -> DelayedBatch
|
fromReply :: Maybe Limit -> Reply -> DelayedBatch
|
||||||
-- ^ Convert Reply to Batch or Failure
|
-- ^ Convert Reply to Batch or Failure
|
||||||
fromReply limit Reply{..} = do
|
fromReply limit Reply{..} = do
|
||||||
mapM_ checkResponseFlag rResponseFlags
|
mapM_ checkResponseFlag rResponseFlags
|
||||||
|
@ -582,20 +583,25 @@ nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document]
|
||||||
-- ^ Return next batch of documents in query result, which will be empty if finished.
|
-- ^ Return next batch of documents in query result, which will be empty if finished.
|
||||||
nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
|
nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do
|
||||||
-- Pre-fetch next batch promise from server and return current batch.
|
-- Pre-fetch next batch promise from server and return current batch.
|
||||||
Batch limit cid docs <- fulfill' fcol batchSize dBatch
|
Batch mLimit cid docs <- fulfill' fcol batchSize dBatch
|
||||||
dBatch' <- if cid /= 0 then nextBatch' fcol batchSize limit cid else return $ return (Batch 0 0 [])
|
let newLimit = do
|
||||||
return (dBatch', docs)
|
limit <- mLimit
|
||||||
|
return $ limit - (min limit $ fromIntegral $ length docs)
|
||||||
|
dBatch' <- if cid /= 0 && newLimit /= (Just 0) then nextBatch' fcol batchSize newLimit cid else return $ return (Batch (Just 0) 0 [])
|
||||||
|
when (newLimit == (Just 0)) $ unless (cid == 0) $ send [KillCursors [cid]]
|
||||||
|
return (dBatch', maybe docs (\l -> take (fromIntegral l) docs) mLimit)
|
||||||
|
|
||||||
fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch
|
fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch
|
||||||
-- Discard pre-fetched batch if empty with nonzero cid.
|
-- Discard pre-fetched batch if empty with nonzero cid.
|
||||||
fulfill' fcol batchSize dBatch = do
|
fulfill' fcol batchSize dBatch = do
|
||||||
b@(Batch limit cid docs) <- fulfill dBatch
|
b@(Batch limit cid docs) <- fulfill dBatch
|
||||||
if cid /= 0 && null docs
|
if cid /= 0 && null docs && (limit > (Just 0))
|
||||||
then nextBatch' fcol batchSize limit cid >>= fulfill
|
then nextBatch' fcol batchSize limit cid >>= fulfill
|
||||||
else return b
|
else return b
|
||||||
|
|
||||||
nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Limit -> CursorId -> Action m DelayedBatch
|
nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> (Maybe Limit) -> CursorId -> Action m DelayedBatch
|
||||||
nextBatch' fcol batchSize limit cid = request [] (GetMore fcol batchSize' cid, remLimit)
|
nextBatch' fcol batchSize limit cid = do
|
||||||
|
request [] (GetMore fcol batchSize' cid, remLimit)
|
||||||
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
|
where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit
|
||||||
|
|
||||||
next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document)
|
next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document)
|
||||||
|
@ -604,16 +610,23 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where
|
||||||
-- Pre-fetch next batch promise from server when last one in current batch is returned.
|
-- Pre-fetch next batch promise from server when last one in current batch is returned.
|
||||||
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
|
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
|
||||||
nextState dBatch = do
|
nextState dBatch = do
|
||||||
Batch limit cid docs <- fulfill' fcol batchSize dBatch
|
Batch mLimit cid docs <- fulfill' fcol batchSize dBatch
|
||||||
|
if mLimit == (Just 0)
|
||||||
|
then return (return $ Batch (Just 0) 0 [], Nothing)
|
||||||
|
else
|
||||||
case docs of
|
case docs of
|
||||||
doc : docs' -> do
|
doc : docs' -> do
|
||||||
dBatch' <- if null docs' && cid /= 0
|
let newLimit = do
|
||||||
then nextBatch' fcol batchSize limit cid
|
limit <- mLimit
|
||||||
else return $ return (Batch limit cid docs')
|
return $ limit - 1
|
||||||
|
dBatch' <- if null docs' && cid /= 0 && (newLimit > (Just 0))
|
||||||
|
then nextBatch' fcol batchSize newLimit cid
|
||||||
|
else return $ return (Batch newLimit cid docs')
|
||||||
|
when (newLimit == (Just 0)) $ unless (cid == 0) $ send [KillCursors [cid]]
|
||||||
return (dBatch', Just doc)
|
return (dBatch', Just doc)
|
||||||
[] -> if cid == 0
|
[] -> if cid == 0
|
||||||
then return (return $ Batch 0 0 [], Nothing) -- finished
|
then return (return $ Batch (Just 0) 0 [], Nothing) -- finished
|
||||||
else fmap (,Nothing) $ nextBatch' fcol batchSize limit cid
|
else fmap (,Nothing) $ nextBatch' fcol batchSize mLimit cid
|
||||||
|
|
||||||
nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document]
|
nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document]
|
||||||
-- ^ Return next N documents or less if end is reached
|
-- ^ Return next N documents or less if end is reached
|
||||||
|
@ -627,7 +640,7 @@ closeCursor :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m ()
|
||||||
closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do
|
closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do
|
||||||
Batch _ cid _ <- fulfill dBatch
|
Batch _ cid _ <- fulfill dBatch
|
||||||
unless (cid == 0) $ send [KillCursors [cid]]
|
unless (cid == 0) $ send [KillCursors [cid]]
|
||||||
return $ (return $ Batch 0 0 [], ())
|
return $ (return $ Batch (Just 0) 0 [], ())
|
||||||
|
|
||||||
isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool
|
isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool
|
||||||
isCursorClosed (Cursor _ _ var) = do
|
isCursorClosed (Cursor _ _ var) = do
|
||||||
|
@ -752,7 +765,7 @@ runMR mr = do
|
||||||
Just (String coll) -> find $ query [] coll
|
Just (String coll) -> find $ query [] coll
|
||||||
Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc)
|
Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc)
|
||||||
Just x -> error $ "unexpected map-reduce result field: " ++ show x
|
Just x -> error $ "unexpected map-reduce result field: " ++ show x
|
||||||
Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res)
|
Nothing -> newCursor "" "" 0 $ return $ Batch (Just 0) 0 (at "results" res)
|
||||||
|
|
||||||
runMR' :: (MonadIO m) => MapReduce -> Action m MRResult
|
runMR' :: (MonadIO m) => MapReduce -> Action m MRResult
|
||||||
-- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript).
|
-- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript).
|
||||||
|
|
Loading…
Reference in a new issue