From f5a946c0e00ea8d5d2e38d2499ad5f3552bea451 Mon Sep 17 00:00:00 2001 From: "Scott R. Parish" Date: Sat, 16 Jan 2010 15:54:39 -0600 Subject: [PATCH] query now returns a cursor, nextDoc iterates over the cursor --- Database/MongoDB.hs | 119 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 97 insertions(+), 22 deletions(-) diff --git a/Database/MongoDB.hs b/Database/MongoDB.hs index 251be1e..5fc577a 100644 --- a/Database/MongoDB.hs +++ b/Database/MongoDB.hs @@ -2,6 +2,7 @@ module Database.MongoDB ( connect, connectOnPort, delete, insert, insertMany, query, remove, update, + nextDoc, Collection, FieldSelector, NumToSkip, NumToReturn, RequestID, Selector, Opcode(..), QueryOpt(..), @@ -43,6 +44,15 @@ connectOnPort host port = do nsRef <- newIORef ns return $ Connection { cHandle = h, cRand = nsRef } +data Cursor = Cursor { + curCon :: Connection, + curID :: IORef Int64, + curNumToRet :: Int32, + curCol :: Collection, + curDocBytes :: IORef L.ByteString, + curClosed :: IORef Bool + } + data Opcode = OP_REPLY -- 1 Reply to a client request. responseTo is set | OP_MSG -- 1000 generic msg command followed by a string @@ -135,8 +145,10 @@ insertMany c col docs = do return reqID query :: Connection -> Collection -> [QueryOpt] -> NumToSkip -> NumToReturn -> - Selector -> Maybe FieldSelector -> IO [BSONObject] + Selector -> Maybe FieldSelector -> IO Cursor query c col opts skip ret sel fsel = do + let h = cHandle c + let body = runPut $ do putI32 $ fromQueryOpts opts putCol col @@ -147,8 +159,24 @@ query c col opts skip ret sel fsel = do Nothing -> putNothing Just fsel -> put fsel (reqID, msg) <- packMsg c OP_QUERY body - L.hPut (cHandle c) msg - getReply c reqID + L.hPut h msg + + hdr <- getHeader h + assert (OP_REPLY == hOp hdr) $ return () + assert (hRespTo hdr == reqID) $ return () + reply <- getReply h + assert (rRespFlags reply == 0) $ return () + docBytes <- (L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20) >>= newIORef + closed <- newIORef False + cid <- newIORef $ rCursorID reply + return $ Cursor { + curCon = c, + curID = cid, + curNumToRet = ret, + curCol = col, + curDocBytes = docBytes, + curClosed = closed + } update :: Connection -> Collection -> [UpdateFlag] -> Selector -> BSONObject -> IO RequestID @@ -178,29 +206,76 @@ data Reply = Reply { rNumReturned :: Int32 } deriving (Show) -getReply :: Connection -> RequestID -> IO [BSONObject] -getReply c reqID = do - let h = cHandle c +getHeader h = do hdrBytes <- L.hGet h 16 - let hdr = flip runGet hdrBytes $ do - msgLen <- getI32 - reqID <- getI32 - respTo <- getI32 - op <- getI32 - return $ Hdr msgLen reqID respTo $ toOpcode op + return $ flip runGet hdrBytes $ do + msgLen <- getI32 + reqID <- getI32 + respTo <- getI32 + op <- getI32 + return $ Hdr msgLen reqID respTo $ toOpcode op + +getReply h = do + replyBytes <- L.hGet h 20 + return $ flip runGet replyBytes $ do + respFlags <- getI32 + cursorID <- getI64 + startFrom <- getI32 + numReturned <- getI32 + return $ (Reply respFlags cursorID startFrom numReturned) + + +nextDoc :: Cursor -> IO (Maybe BSONObject) +nextDoc cur = do + closed <- readIORef $ curClosed cur + case closed of + True -> return Nothing + False -> do + docBytes <- readIORef $ curDocBytes cur + cid <- readIORef $ curID cur + case L.length docBytes of + 0 -> if cid == 0 + then writeIORef (curClosed cur) True >> return Nothing + else getMore cur + _ -> do + let (doc, docBytes') = getFirstDoc docBytes + writeIORef (curDocBytes cur) docBytes' + return $ Just doc + +getFirstDoc docBytes = flip runGet docBytes $ do + doc <- get + docBytes' <- getRemainingLazyByteString + return (doc, docBytes') + +getMore :: Cursor -> IO (Maybe BSONObject) +getMore cur = do + let h = cHandle $ curCon cur + + cid <- readIORef $ curID cur + let body = runPut $ do + putI32 0 + putCol $ curCol cur + putI32 $ curNumToRet cur + putI64 cid + (reqID, msg) <- packMsg (curCon cur) OP_GET_MORE body + L.hPut h msg + + hdr <- getHeader h assert (OP_REPLY == hOp hdr) $ return () assert (hRespTo hdr == reqID) $ return () - replyBytes <- L.hGet h 20 - let reply = flip runGet replyBytes $ do - respFlags <- getI32 - cursorID <- getI64 - startFrom <- getI32 - numReturned <- getI32 - return $ (Reply respFlags cursorID startFrom numReturned) + reply <- getReply h assert (rRespFlags reply == 0) $ return () - docBytes <- L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20 - return $ flip runGet docBytes $ do - forM [1 .. rNumReturned reply] $ \_ -> get + cid <- readIORef (curID cur) + case rCursorID reply of + 0 -> writeIORef (curID cur) 0 + ncid -> assert (ncid == cid) $ return () + docBytes <- (L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20) + case L.length docBytes of + 0 -> writeIORef (curClosed cur) True >> return Nothing + _ -> do + let (doc, docBytes') = getFirstDoc docBytes + writeIORef (curDocBytes cur) docBytes' + return $ Just doc putCol col = putByteString (pack col) >> putNull