2010-06-15 03:14:40 +00:00
{- | Low - level messaging between this client and the MongoDB server. See Mongo Wire Protocol (<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>).
This module i s n o t i n t e n d e d f o r d i r e c t u s e . Use the high - level interface at " Database.MongoDB.Query " instead . - }
{- # LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings # -}
module Database.MongoDB.Internal.Protocol (
-- * FullCollection
FullCollection ,
-- * Write
Insert ( .. ) , insert ,
Update ( .. ) , UpdateOption ( .. ) , update ,
Delete ( .. ) , DeleteOption ( .. ) , delete ,
-- * Read
Query ( .. ) , QueryOption ( .. ) , query ,
GetMore ( .. ) , getMore ,
-- ** Reply
Reply ( .. ) ,
-- ** Cursor
CursorId , killCursors ,
-- * Authentication
Username , Password , Nonce , pwHash , pwKey
) where
import Prelude as P
import Database.MongoDB.Internal.Connection ( Op , sendBytes , flushBytes , receiveBytes )
import Data.Bson
import Data.Bson.Binary
import Data.UString as U ( pack , append , toByteString )
import Data.ByteString.Lazy as B ( length , append )
import Data.Binary.Put
import Data.Binary.Get
import Data.Int
import Data.Bits
import Control.Monad.Reader
import Control.Applicative ( ( <$> ) )
import Data.IORef
import System.IO.Unsafe ( unsafePerformIO )
import Data.Digest.OpenSSL.MD5 ( md5sum )
import Database.MongoDB.Util ( bitOr , ( <.> ) )
-- * Authentication
type Username = UString
type Password = UString
type Nonce = UString
pwHash :: Username -> Password -> UString
pwHash u p = pack . md5sum . toByteString $ u ` U . append ` " :mongo: " ` U . append ` p
pwKey :: Nonce -> Username -> Password -> UString
pwKey n u p = pack . md5sum . toByteString . U . append n . U . append u $ pwHash u p
-- * FullCollection
type FullCollection = UString
-- ^ Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"
-- * Request / response
insert :: Insert -> Op ()
-- ^ Insert documents into collection
insert = send_ . putInsert
update :: Update -> Op ()
-- ^ Update documents in collection matching selector using updater
update = send_ . putUpdate
delete :: Delete -> Op ()
-- ^ Delete documents in collection matching selector
delete = send_ . putDelete
killCursors :: [ CursorId ] -> Op ()
-- ^ Close cursors on server because we will not be getting anymore documents from them
killCursors = send_ . putKillCursors . KillCursors
query :: Query -> Op Reply
-- ^ Return first batch of documents in collection matching selector and a cursor-id for getting remaining documents (see 'getMore')
query q = do
requestId <- send ( putQuery q )
( reply , responseTo ) <- receive getReply
unless ( responseTo == requestId ) $ fail " expected response id to match query request id "
return reply
getMore :: GetMore -> Op Reply
-- ^ Get next batch of documents from cursor
getMore g = do
requestId <- send ( putGetMore g )
( reply , responseTo ) <- receive getReply
unless ( responseTo == requestId ) $ fail " expected response id to match get-more request id "
return reply
-- ** Send / receive
type RequestId = Int32
-- ^ A fresh request id is generated for every message
genRequestId :: IO RequestId
-- ^ Generate fresh request id
genRequestId = atomicModifyIORef counter $ \ n -> ( n + 1 , n ) where
counter :: IORef RequestId
counter = unsafePerformIO ( newIORef 0 )
{- # NOINLINE counter # -}
type ResponseTo = RequestId
send_ :: ( RequestId -> Put ) -> Op ()
send_ x = send x >> return ()
send :: ( RequestId -> Put ) -> Op RequestId
send rput = do
requestId <- liftIO genRequestId
let bytes = runPut ( rput requestId )
let lengthBytes = runPut . putInt32 $ ( toEnum . fromEnum ) ( B . length bytes + 4 )
sendBytes ( B . append lengthBytes bytes )
flushBytes
return requestId
receive :: Get a -> Op a
receive getMess = do
messageLength <- fromIntegral . runGet getInt32 <$> receiveBytes 4
runGet getMess <$> receiveBytes ( messageLength - 4 )
-- * Messages
data Insert = Insert {
iFullCollection :: FullCollection ,
iDocuments :: [ Document ]
} deriving ( Show , Eq )
data Update = Update {
uFullCollection :: FullCollection ,
uOptions :: [ UpdateOption ] ,
uSelector :: Document ,
uUpdater :: Document
} deriving ( Show , Eq )
data UpdateOption =
Upsert -- ^ If set, the database will insert the supplied object into the collection if no matching document is found
| MultiUpdate -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc
deriving ( Show , Eq )
data Delete = Delete {
dFullCollection :: FullCollection ,
dOptions :: [ DeleteOption ] ,
dSelector :: Document
} deriving ( Show , Eq )
data DeleteOption = SingleRemove -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed
deriving ( Show , Eq )
data Query = Query {
qOptions :: [ QueryOption ] ,
qFullCollection :: FullCollection ,
qSkip :: Int32 , -- ^ Number of initial matching documents to skip
qBatchSize :: Int32 , -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
qSelector :: Document , -- ^ \[\] = return all documents in collection
qProjector :: Document -- ^ \[\] = return whole document
} deriving ( Show , Eq )
data QueryOption =
TailableCursor |
SlaveOK |
2010-06-15 20:15:37 +00:00
NoCursorTimeout -- Never timeout the cursor. When not set, the cursor will die if idle for more than 10 minutes.
2010-06-15 03:14:40 +00:00
deriving ( Show , Eq )
data GetMore = GetMore {
gFullCollection :: FullCollection ,
gBatchSize :: Int32 ,
gCursorId :: CursorId
} deriving ( Show , Eq )
newtype KillCursors = KillCursors {
kCursorIds :: [ CursorId ]
} deriving ( Show , Eq )
data Reply = Reply {
rResponseFlag :: Int32 , -- ^ 0 = success, non-zero = failure
rCursorId :: CursorId , -- ^ 0 = cursor finished
rStartingFrom :: Int32 ,
rDocuments :: [ Document ]
} deriving ( Show , Eq )
type CursorId = Int64
-- ** Messages binary format
type Opcode = Int32
-- ^ Code for each message type
replyOpcode , updateOpcode , insertOpcode , queryOpcode , getMoreOpcode , deleteOpcode , killCursorsOpcode :: Opcode
replyOpcode = 1
updateOpcode = 2001
insertOpcode = 2002
queryOpcode = 2004
getMoreOpcode = 2005
deleteOpcode = 2006
killCursorsOpcode = 2007
putUpdate :: Update -> RequestId -> Put
putUpdate Update { .. } = putMessage updateOpcode $ do
putInt32 0
putCString uFullCollection
putInt32 ( uBits uOptions )
putDocument uSelector
putDocument uUpdater
uBit :: UpdateOption -> Int32
uBit Upsert = bit 0
uBit MultiUpdate = bit 1
uBits :: [ UpdateOption ] -> Int32
uBits = bitOr . map uBit
putInsert :: Insert -> RequestId -> Put
putInsert Insert { .. } = putMessage insertOpcode $ do
putInt32 0
putCString iFullCollection
mapM_ putDocument iDocuments
putDelete :: Delete -> RequestId -> Put
putDelete Delete { .. } = putMessage deleteOpcode $ do
putInt32 0
putCString dFullCollection
putInt32 ( dBits dOptions )
putDocument dSelector
dBit :: DeleteOption -> Int32
dBit SingleRemove = bit 0
dBits :: [ DeleteOption ] -> Int32
dBits = bitOr . map dBit
putQuery :: Query -> RequestId -> Put
putQuery Query { .. } = putMessage queryOpcode $ do
putInt32 ( qBits qOptions )
putCString qFullCollection
putInt32 qSkip
putInt32 qBatchSize
putDocument qSelector
unless ( null qProjector ) ( putDocument qProjector )
qBit :: QueryOption -> Int32
qBit TailableCursor = bit 1
qBit SlaveOK = bit 2
qBit NoCursorTimeout = bit 4
qBits :: [ QueryOption ] -> Int32
qBits = bitOr . map qBit
putGetMore :: GetMore -> RequestId -> Put
putGetMore GetMore { .. } = putMessage getMoreOpcode $ do
putInt32 0
putCString gFullCollection
putInt32 gBatchSize
putInt64 gCursorId
putKillCursors :: KillCursors -> RequestId -> Put
putKillCursors KillCursors { .. } = putMessage killCursorsOpcode $ do
putInt32 0
putInt32 $ toEnum ( P . length kCursorIds )
mapM_ putInt64 kCursorIds
getReply :: Get ( Reply , ResponseTo )
getReply = getMessage replyOpcode $ do
rResponseFlag <- getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- getInt32
rDocuments <- replicateM ( fromIntegral numDocs ) getDocument
return $ Reply { .. }
-- *** Message header
putMessage :: Opcode -> Put -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putMessage opcode messageBodyPut requestId = do
putInt32 requestId
putInt32 0
putInt32 opcode
messageBodyPut
getMessage :: Opcode -> Get a -> Get ( a , ResponseTo )
-- ^ Note, does not read message length (first int32), assumes it was already read
getMessage expectedOpcode getMessageBody = do
_requestId <- getInt32
responseTo <- getInt32
opcode <- getInt32
unless ( opcode == expectedOpcode ) $
fail $ " expected opcode " ++ show expectedOpcode ++ " but got " ++ show opcode
body <- getMessageBody
return ( body , responseTo )
{- Authors: Tony Hannan <tony@10gen.com>
Copyright 2010 10 gen Inc .
Licensed under the Apache License , Version 2.0 ( the " License " ) ; you may not use this file except in compliance with the License . You may obtain a copy of the License at : http :// www . apache . org / licenses / LICENSE - 2.0 . Unless required by applicable law or agreed to in writing , software distributed under the License is distributed on an " AS IS " BASIS , WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied . See the License for the specific language governing permissions and limitations under the License . - }