query now returns a cursor, nextDoc iterates over the cursor
This commit is contained in:
parent
8805ea4a9e
commit
f5a946c0e0
1 changed files with 97 additions and 22 deletions
|
@ -2,6 +2,7 @@ module Database.MongoDB
|
|||
(
|
||||
connect, connectOnPort,
|
||||
delete, insert, insertMany, query, remove, update,
|
||||
nextDoc,
|
||||
Collection, FieldSelector, NumToSkip, NumToReturn, RequestID, Selector,
|
||||
Opcode(..),
|
||||
QueryOpt(..),
|
||||
|
@ -43,6 +44,15 @@ connectOnPort host port = do
|
|||
nsRef <- newIORef ns
|
||||
return $ Connection { cHandle = h, cRand = nsRef }
|
||||
|
||||
data Cursor = Cursor {
|
||||
curCon :: Connection,
|
||||
curID :: IORef Int64,
|
||||
curNumToRet :: Int32,
|
||||
curCol :: Collection,
|
||||
curDocBytes :: IORef L.ByteString,
|
||||
curClosed :: IORef Bool
|
||||
}
|
||||
|
||||
data Opcode
|
||||
= OP_REPLY -- 1 Reply to a client request. responseTo is set
|
||||
| OP_MSG -- 1000 generic msg command followed by a string
|
||||
|
@ -135,8 +145,10 @@ insertMany c col docs = do
|
|||
return reqID
|
||||
|
||||
query :: Connection -> Collection -> [QueryOpt] -> NumToSkip -> NumToReturn ->
|
||||
Selector -> Maybe FieldSelector -> IO [BSONObject]
|
||||
Selector -> Maybe FieldSelector -> IO Cursor
|
||||
query c col opts skip ret sel fsel = do
|
||||
let h = cHandle c
|
||||
|
||||
let body = runPut $ do
|
||||
putI32 $ fromQueryOpts opts
|
||||
putCol col
|
||||
|
@ -147,8 +159,24 @@ query c col opts skip ret sel fsel = do
|
|||
Nothing -> putNothing
|
||||
Just fsel -> put fsel
|
||||
(reqID, msg) <- packMsg c OP_QUERY body
|
||||
L.hPut (cHandle c) msg
|
||||
getReply c reqID
|
||||
L.hPut h msg
|
||||
|
||||
hdr <- getHeader h
|
||||
assert (OP_REPLY == hOp hdr) $ return ()
|
||||
assert (hRespTo hdr == reqID) $ return ()
|
||||
reply <- getReply h
|
||||
assert (rRespFlags reply == 0) $ return ()
|
||||
docBytes <- (L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20) >>= newIORef
|
||||
closed <- newIORef False
|
||||
cid <- newIORef $ rCursorID reply
|
||||
return $ Cursor {
|
||||
curCon = c,
|
||||
curID = cid,
|
||||
curNumToRet = ret,
|
||||
curCol = col,
|
||||
curDocBytes = docBytes,
|
||||
curClosed = closed
|
||||
}
|
||||
|
||||
update :: Connection -> Collection ->
|
||||
[UpdateFlag] -> Selector -> BSONObject -> IO RequestID
|
||||
|
@ -178,29 +206,76 @@ data Reply = Reply {
|
|||
rNumReturned :: Int32
|
||||
} deriving (Show)
|
||||
|
||||
getReply :: Connection -> RequestID -> IO [BSONObject]
|
||||
getReply c reqID = do
|
||||
let h = cHandle c
|
||||
getHeader h = do
|
||||
hdrBytes <- L.hGet h 16
|
||||
let hdr = flip runGet hdrBytes $ do
|
||||
return $ flip runGet hdrBytes $ do
|
||||
msgLen <- getI32
|
||||
reqID <- getI32
|
||||
respTo <- getI32
|
||||
op <- getI32
|
||||
return $ Hdr msgLen reqID respTo $ toOpcode op
|
||||
assert (OP_REPLY == hOp hdr) $ return ()
|
||||
assert (hRespTo hdr == reqID) $ return ()
|
||||
|
||||
getReply h = do
|
||||
replyBytes <- L.hGet h 20
|
||||
let reply = flip runGet replyBytes $ do
|
||||
return $ flip runGet replyBytes $ do
|
||||
respFlags <- getI32
|
||||
cursorID <- getI64
|
||||
startFrom <- getI32
|
||||
numReturned <- getI32
|
||||
return $ (Reply respFlags cursorID startFrom numReturned)
|
||||
|
||||
|
||||
nextDoc :: Cursor -> IO (Maybe BSONObject)
|
||||
nextDoc cur = do
|
||||
closed <- readIORef $ curClosed cur
|
||||
case closed of
|
||||
True -> return Nothing
|
||||
False -> do
|
||||
docBytes <- readIORef $ curDocBytes cur
|
||||
cid <- readIORef $ curID cur
|
||||
case L.length docBytes of
|
||||
0 -> if cid == 0
|
||||
then writeIORef (curClosed cur) True >> return Nothing
|
||||
else getMore cur
|
||||
_ -> do
|
||||
let (doc, docBytes') = getFirstDoc docBytes
|
||||
writeIORef (curDocBytes cur) docBytes'
|
||||
return $ Just doc
|
||||
|
||||
getFirstDoc docBytes = flip runGet docBytes $ do
|
||||
doc <- get
|
||||
docBytes' <- getRemainingLazyByteString
|
||||
return (doc, docBytes')
|
||||
|
||||
getMore :: Cursor -> IO (Maybe BSONObject)
|
||||
getMore cur = do
|
||||
let h = cHandle $ curCon cur
|
||||
|
||||
cid <- readIORef $ curID cur
|
||||
let body = runPut $ do
|
||||
putI32 0
|
||||
putCol $ curCol cur
|
||||
putI32 $ curNumToRet cur
|
||||
putI64 cid
|
||||
(reqID, msg) <- packMsg (curCon cur) OP_GET_MORE body
|
||||
L.hPut h msg
|
||||
|
||||
hdr <- getHeader h
|
||||
assert (OP_REPLY == hOp hdr) $ return ()
|
||||
assert (hRespTo hdr == reqID) $ return ()
|
||||
reply <- getReply h
|
||||
assert (rRespFlags reply == 0) $ return ()
|
||||
docBytes <- L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20
|
||||
return $ flip runGet docBytes $ do
|
||||
forM [1 .. rNumReturned reply] $ \_ -> get
|
||||
cid <- readIORef (curID cur)
|
||||
case rCursorID reply of
|
||||
0 -> writeIORef (curID cur) 0
|
||||
ncid -> assert (ncid == cid) $ return ()
|
||||
docBytes <- (L.hGet h $ fromIntegral $ hMsgLen hdr - 16 - 20)
|
||||
case L.length docBytes of
|
||||
0 -> writeIORef (curClosed cur) True >> return Nothing
|
||||
_ -> do
|
||||
let (doc, docBytes') = getFirstDoc docBytes
|
||||
writeIORef (curDocBytes cur) docBytes'
|
||||
return $ Just doc
|
||||
|
||||
putCol col = putByteString (pack col) >> putNull
|
||||
|
||||
|
|
Loading…
Reference in a new issue