diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 2b1e0b2..12c156a 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -63,6 +63,8 @@ import Control.Concurrent.MVar.Lifted (MVar, newMVar, mkWeakMVar, import Control.Concurrent.MVar.Lifted (MVar, newMVar, addMVarFinalizer, readMVar, modifyMVar) #endif +import Control.Applicative ((<$>)) +import Control.Monad (when) import Control.Monad.Base (MonadBase) import Control.Monad.Error (Error(..)) import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local) @@ -505,7 +507,7 @@ distinct :: (MonadIO m) => Label -> Selection -> Action m [Value] -- ^ Fetch distinct values of field in selected documents distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel] -queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Limit) +queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit) -- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute. queryRequest isExplain Query{..} = do ctx <- ask @@ -515,7 +517,7 @@ queryRequest isExplain Query{..} = do qOptions = readModeOption rm ++ options qFullCollection = db <.> coll selection qSkip = fromIntegral skip - (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize limit + (qBatchSize, remainingLimit) = batchSizeRemainingLimit batchSize (if limit == 0 then Nothing else Just limit) qProjector = project mOrder = if null sort then Nothing else Just ("$orderby" =: sort) mSnapshot = if snapshot then Just ("$snapshot" =: True) else Nothing @@ -524,29 +526,31 @@ queryRequest isExplain Query{..} = do special = catMaybes [mOrder, mSnapshot, mHint, mExplain] qSelector = if null special then s else ("$query" =: s) : special where s = selector selection -batchSizeRemainingLimit :: BatchSize -> Limit -> (Int32, Limit) +batchSizeRemainingLimit :: BatchSize -> (Maybe Limit) -> (Int32, Maybe Limit) -- ^ Given batchSize and limit return P.qBatchSize and remaining limit -batchSizeRemainingLimit batchSize limit = if limit == 0 - then (fromIntegral batchSize', 0) -- no limit - else if 0 < batchSize' && batchSize' < limit - then (fromIntegral batchSize', limit - batchSize') - else (- fromIntegral limit, 1) - where batchSize' = if batchSize == 1 then 2 else batchSize - -- batchSize 1 is broken because server converts 1 to -1 meaning limit 1 +batchSizeRemainingLimit batchSize mLimit = + let remaining = + case mLimit of + Nothing -> batchSize + Just limit -> + if 0 < batchSize && batchSize < limit + then batchSize + else limit + in (fromIntegral remaining, mLimit) type DelayedBatch = IO Batch -- ^ A promised batch which may fail -data Batch = Batch Limit CursorId [Document] --- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is remaining limit for next fetch. +data Batch = Batch (Maybe Limit) CursorId [Document] +-- ^ CursorId = 0 means cursor is finished. Documents is remaining documents to serve in current batch. Limit is number of documents to return. Nothing means no limit. -request :: (MonadIO m) => [Notice] -> (Request, Limit) -> Action m DelayedBatch +request :: (MonadIO m) => [Notice] -> (Request, Maybe Limit) -> Action m DelayedBatch -- ^ Send notices and request and return promised batch request ns (req, remainingLimit) = do promise <- call ns req return $ fromReply remainingLimit =<< promise -fromReply :: Limit -> Reply -> DelayedBatch +fromReply :: Maybe Limit -> Reply -> DelayedBatch -- ^ Convert Reply to Batch or Failure fromReply limit Reply{..} = do mapM_ checkResponseFlag rResponseFlags @@ -582,20 +586,31 @@ nextBatch :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m [Document] -- ^ Return next batch of documents in query result, which will be empty if finished. nextBatch (Cursor fcol batchSize var) = modifyMVar var $ \dBatch -> do -- Pre-fetch next batch promise from server and return current batch. - Batch limit cid docs <- fulfill' fcol batchSize dBatch - dBatch' <- if cid /= 0 then nextBatch' fcol batchSize limit cid else return $ return (Batch 0 0 []) - return (dBatch', docs) + Batch mLimit cid docs <- fulfill' fcol batchSize dBatch + let newLimit = do + limit <- mLimit + return $ limit - (min limit $ fromIntegral $ length docs) + let emptyBatch = return $ Batch (Just 0) 0 [] + let getNextBatch = nextBatch' fcol batchSize newLimit cid + let resultDocs = (maybe id (take . fromIntegral) mLimit) docs + case (cid, newLimit) of + (0, _) -> return (emptyBatch, resultDocs) + (_, Just 0) -> do + send [KillCursors [cid]] + return (emptyBatch, resultDocs) + (_, _) -> (, resultDocs) <$> getNextBatch fulfill' :: (MonadIO m) => FullCollection -> BatchSize -> DelayedBatch -> Action m Batch -- Discard pre-fetched batch if empty with nonzero cid. fulfill' fcol batchSize dBatch = do b@(Batch limit cid docs) <- fulfill dBatch - if cid /= 0 && null docs + if cid /= 0 && null docs && (limit > (Just 0)) then nextBatch' fcol batchSize limit cid >>= fulfill else return b -nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Limit -> CursorId -> Action m DelayedBatch -nextBatch' fcol batchSize limit cid = request [] (GetMore fcol batchSize' cid, remLimit) +nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> (Maybe Limit) -> CursorId -> Action m DelayedBatch +nextBatch' fcol batchSize limit cid = do + request [] (GetMore fcol batchSize' cid, remLimit) where (batchSize', remLimit) = batchSizeRemainingLimit batchSize limit next :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m (Maybe Document) @@ -604,16 +619,23 @@ next (Cursor fcol batchSize var) = modifyMVar var nextState where -- Pre-fetch next batch promise from server when last one in current batch is returned. -- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document) nextState dBatch = do - Batch limit cid docs <- fulfill' fcol batchSize dBatch - case docs of - doc : docs' -> do - dBatch' <- if null docs' && cid /= 0 - then nextBatch' fcol batchSize limit cid - else return $ return (Batch limit cid docs') - return (dBatch', Just doc) - [] -> if cid == 0 - then return (return $ Batch 0 0 [], Nothing) -- finished - else fmap (,Nothing) $ nextBatch' fcol batchSize limit cid + Batch mLimit cid docs <- fulfill' fcol batchSize dBatch + if mLimit == (Just 0) + then return (return $ Batch (Just 0) 0 [], Nothing) + else + case docs of + doc : docs' -> do + let newLimit = do + limit <- mLimit + return $ limit - 1 + dBatch' <- if null docs' && cid /= 0 && (newLimit > (Just 0)) + then nextBatch' fcol batchSize newLimit cid + else return $ return (Batch newLimit cid docs') + when (newLimit == (Just 0)) $ unless (cid == 0) $ send [KillCursors [cid]] + return (dBatch', Just doc) + [] -> if cid == 0 + then return (return $ Batch (Just 0) 0 [], Nothing) -- finished + else fmap (,Nothing) $ nextBatch' fcol batchSize mLimit cid nextN :: (MonadIO m, MonadBaseControl IO m) => Int -> Cursor -> Action m [Document] -- ^ Return next N documents or less if end is reached @@ -627,7 +649,7 @@ closeCursor :: (MonadIO m, MonadBaseControl IO m) => Cursor -> Action m () closeCursor (Cursor _ _ var) = modifyMVar var $ \dBatch -> do Batch _ cid _ <- fulfill dBatch unless (cid == 0) $ send [KillCursors [cid]] - return $ (return $ Batch 0 0 [], ()) + return $ (return $ Batch (Just 0) 0 [], ()) isCursorClosed :: (MonadIO m, MonadBase IO m) => Cursor -> Action m Bool isCursorClosed (Cursor _ _ var) = do @@ -752,7 +774,7 @@ runMR mr = do Just (String coll) -> find $ query [] coll Just (Doc doc) -> useDb (at "db" doc) $ find $ query [] (at "collection" doc) Just x -> error $ "unexpected map-reduce result field: " ++ show x - Nothing -> newCursor "" "" 0 $ return $ Batch 0 0 (at "results" res) + Nothing -> newCursor "" "" 0 $ return $ Batch (Just 0) 0 (at "results" res) runMR' :: (MonadIO m) => MapReduce -> Action m MRResult -- ^ Run MapReduce and return a MR result document containing stats and the results if Inlined. Error if the map/reduce failed (because of bad Javascript). diff --git a/mongoDB.cabal b/mongoDB.cabal index 481df2d..441cd58 100644 --- a/mongoDB.cabal +++ b/mongoDB.cabal @@ -67,6 +67,7 @@ test-suite test -- now. It's too difficult to support old versions of GHC and -- the new version of time. , old-locale + , text , time default-language: Haskell2010 diff --git a/test/QuerySpec.hs b/test/QuerySpec.hs index 8c5f93c..510d408 100644 --- a/test/QuerySpec.hs +++ b/test/QuerySpec.hs @@ -5,6 +5,8 @@ module QuerySpec (spec) where import TestImport import Control.Exception +import qualified Data.Text as T + testDBName :: Database testDBName = "mongodb-haskell-test" @@ -122,6 +124,17 @@ spec = around withCleanDatabase $ do it "raises exception" $ insertDuplicateWith insertAll_ `shouldThrow` anyException + describe "insertAll_" $ do + it "inserts documents and receives 100 000 of them" $ do + let docs = (flip map) [0..200000] $ \i -> + ["name" =: (T.pack $ "name " ++ (show i))] + db $ insertAll_ "bigCollection" docs + db $ do + cur <- find $ (select [] "bigCollection") {limit = 100000, batchSize = 100000} + returnedDocs <- rest cur + + liftIO $ (length returnedDocs) `shouldBe` 100000 + describe "aggregate" $ do it "aggregates to normalize and sort documents" $ do db $ insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]]