diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 2b1e0b2..5fcc617 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -63,6 +63,7 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar, import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer, readMVar, modifyMVar) #endif +import Control.Monad (when) import Control.Monad.Base (MonadBase) import Control.Monad.Error (Error(..)) import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local) @@ -505,7 +506,7 @@ distinct :: (MonadIO m) => Label -> Selection -> Action m [Value] -- ^ Fetch distinct values of field in selected documents distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel] -queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Limit) +queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. queryRequest isExplain Query{..} = do ctx <- ask @@ -515,7 +516,7 @@ queryRequest isExplain Query{..} = do qOptions = readModeOption rm ++ options qFullCollection = db <.> coll selection qSkip = fromIntegral skip - (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit + (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit) qProjector = project mOrder = if null sort then Nothing else Just ("$orderby" =: sort) mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing @@ -524,29 +525,29 @@ queryRequest isExplain Query{..} = do special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection -batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit) +batchSizeRemainingLimit :: BatchSize -> (Maybe Limit) -> (Int32, Maybe Limit) -- ^ Given batchSize and limit return P.qBatchSize and remaining limit -batchSizeRemainingLimit batchSize limit = if limit == 0 - then (fromIntegral batchSize', 0) -- no limit - else if 0 < batchSize' && batchSize' < limit - then (fromIntegral batchSize', limit - batchSize') - else (- fromIntegral limit, 1) - where batchSize' = if batchSize == 1 then 2 else batchSize - -- batchSize 1 is broken because server converts 1 to -1 meaning limit 1 +batchSizeRemainingLimit batchSize mLimit = + case mLimit of + Nothing -> (fromIntegral batchSize, Nothing) + Just limit -> + if 0 < batchSize && batchSize < limit + then (fromIntegral batchSize, Just limit) + else (fromIntegral limit, Just limit) type DelayedBatch = IO Batch -- ^ A promised batch which may fail -data Batch = Batch Limit CursorId [Document] --- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch. +data Batch = Batch (Maybe Limit) CursorId [Document] +-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is number of documents to return. Nothing means no limit. -request :: (MonadIO m) => [Notice] -> (Request, Limit) -> Action m DelayedBatch +request :: (MonadIO m) => [Notice] -> (Request, Maybe Limit) -> Action m DelayedBatch -- ^ Send notices and request and return promised batch request ns (req, remainingLimit) = do promise <- call ns req return $ fromReply remainingLimit =<< promise -fromReply :: Limit -> Reply -> DelayedBatch +fromReply :: Maybe Limit -> Reply -> DelayedBatch -- ^ Convert Reply to Batch or Failure fromReply limit Reply{..} = do mapM_ checkResponseFlag rResponseFlags @@ -582,20 +583,25 @@ nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document] -- ^ Return next batch of documents in query result, which will be empty if finished. nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do -- Pre-fetch next batch promise from server and return current batch. - Batch limit cid docs <- fulfill' fcol batchSize dBatch - dBatch' <- if cid /= 0 then nextBatch' fcol batchSize limit cid else return $ return (Batch 0 0 []) - return (dBatch', docs) + Batch mLimit cid docs <- fulfill' fcol batchSize dBatch + let newLimit = do + limit <- mLimit + return $ limit - (min limit $ fromIntegral $ length docs) + dBatch' <- if cid /= 0 && newLimit /= (Just 0) then nextBatch' fcol batchSize newLimit cid else return $ return (Batch (Just 0) 0 []) + when (newLimit == (Just 0)) $ unless (cid == 0) $ send [KillCursors [cid]] + return (dBatch', maybe docs (\l -> take (fromIntegral l) docs) mLimit) fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch -- Discard pre-fetched batch if empty with nonzero cid. fulfill' fcol batchSize dBatch = do b@(Batch limit cid docs) <- fulfill dBatch - if cid /= 0 && null docs + if cid /= 0 && null docs && (limit > (Just 0)) then nextBatch' fcol batchSize limit cid >>= fulfill else return b -nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Limit -> CursorId -> Action m DelayedBatch -nextBatch' fcol batchSize limit cid = request [] (GetMore fcol batchSize' cid, remLimit) +nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> (Maybe Limit) -> CursorId -> Action m DelayedBatch +nextBatch' fcol batchSize limit cid = do + request [] (GetMore fcol batchSize' cid, remLimit) where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document) @@ -604,16 +610,23 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where -- Pre-fetch next batch promise from server when last one in current batch is returned. -- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document) nextState dBatch = do - Batch limit cid docs <- fulfill' fcol batchSize dBatch - case docs of - doc : docs' -> do - dBatch' <- if null docs' && cid /= 0 - then nextBatch' fcol batchSize limit cid - else return $ return (Batch limit cid docs') - return (dBatch', Just doc) - [] -> if cid == 0 - then return (return $ Batch 0 0 [], Nothing) -- finished - else fmap (,Nothing) $ nextBatch' fcol batchSize limit cid + Batch mLimit cid docs <- fulfill' fcol batchSize dBatch + if mLimit == (Just 0) + then return (return $ Batch (Just 0) 0 [], Nothing) + else + case docs of + doc : docs' -> do + let newLimit = do + limit <- mLimit + return $ limit - 1 + dBatch' <- if null docs' && cid /= 0 && (newLimit > (Just 0)) + then nextBatch' fcol batchSize newLimit cid + else return $ return (Batch newLimit cid docs') + when (newLimit == (Just 0)) $ unless (cid == 0) $ send [KillCursors [cid]] + return (dBatch', Just doc) + [] -> if cid == 0 + then return (return $ Batch (Just 0) 0 [], Nothing) -- finished + else fmap (,Nothing) $ nextBatch' fcol batchSize mLimit cid nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document] -- ^ Return next N documents or less if end is reached @@ -627,7 +640,7 @@ closeCursor :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m () closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do Batch _ cid _ <- fulfill dBatch unless (cid == 0) $ send [KillCursors [cid]] - return $ (return $ Batch 0 0 [], ()) + return $ (return $ Batch (Just 0) 0 [], ()) isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool isCursorClosed (Cursor _ _ var) = do @@ -752,7 +765,7 @@ runMR mr = do Just (String coll) -> find $ query [] coll Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc) Just x -> error $ "unexpected map-reduce result field: " ++ show x - Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res) + Nothing -> newCursor "" "" 0 $ return $ Batch (Just 0) 0 (at "results" res) runMR' :: (MonadIO m) => MapReduce -> Action m MRResult -- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript).