2010-10-27 20:13:23 +00:00
{- | Low-level messaging between this client and the MongoDB server, see Mongo Wire Protocol (<http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol>).

This module is not intended for direct use. Use the high-level interface at "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -}
2010-06-15 03:14:40 +00:00
2010-12-20 02:08:53 +00:00
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings, FlexibleContexts, TupleSections, TypeSynonymInstances, MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
2010-06-15 03:14:40 +00:00
module Database.MongoDB.Internal.Protocol (
2011-07-05 14:37:01 +00:00
MasterOrSlaveOk ( .. ) ,
FullCollection ,
2010-10-27 20:13:23 +00:00
-- * Pipe
2010-12-20 02:08:53 +00:00
Pipe , send , call ,
2010-06-21 15:06:20 +00:00
-- * Message
2011-07-05 14:37:01 +00:00
writeMessage , readMessage ,
2010-06-21 15:06:20 +00:00
-- ** Notice
Notice ( .. ) , UpdateOption ( .. ) , DeleteOption ( .. ) , CursorId ,
-- ** Request
Request ( .. ) , QueryOption ( .. ) ,
2010-06-15 03:14:40 +00:00
-- ** Reply
2010-07-03 17:15:30 +00:00
Reply ( .. ) , ResponseFlag ( .. ) ,
2010-06-15 03:14:40 +00:00
-- * Authentication
Username , Password , Nonce , pwHash , pwKey
) where
2010-06-21 15:06:20 +00:00
import Prelude as X
import Control.Applicative ( ( <$> ) )
2010-12-20 02:08:53 +00:00
import Control.Arrow ( ( *** ) )
import Data.ByteString.Lazy as B ( length , hPut )
2011-07-05 14:37:01 +00:00
import System.IO.Pipeline ( IOE , Pipeline )
import qualified System.IO.Pipeline as P ( send , call )
import System.IO ( Handle )
2010-06-21 15:06:20 +00:00
import Data.Bson ( Document , UString )
2010-06-15 03:14:40 +00:00
import Data.Bson.Binary
import Data.Binary.Put
import Data.Binary.Get
import Data.Int
import Data.Bits
import Data.IORef
import System.IO.Unsafe ( unsafePerformIO )
2011-03-11 16:11:14 +00:00
import qualified Crypto.Hash.MD5 as MD5 ( hash )
2010-06-21 15:06:20 +00:00
import Data.UString as U ( pack , append , toByteString )
2010-10-27 20:13:23 +00:00
import System.IO.Error as E ( try )
import Control.Monad.Error
2010-12-27 05:23:02 +00:00
import System.IO ( hFlush )
2011-07-05 14:37:01 +00:00
import Database.MongoDB.Internal.Util ( whenJust , hGetN , bitOr , byteStringHex )
2010-12-20 02:08:53 +00:00
2011-07-05 14:37:01 +00:00
-- * MasterOrSlaveOk
-- | Whether a connection must go to the master or may also use a slave.
data MasterOrSlaveOk
    = Master   -- ^ connect to master only
    | SlaveOk  -- ^ connect to a slave, or master if no slave available
    deriving (Show, Eq)
2010-12-20 02:08:53 +00:00
2010-10-27 20:13:23 +00:00
-- * Pipe
2010-06-15 03:14:40 +00:00
2011-07-05 14:37:01 +00:00
-- | Thread-safe TCP connection with pipelined requests.
-- It is a 'Pipeline' that carries 'Message's out and 'Response's back.
type Pipe = Pipeline Response Message
2010-06-15 03:14:40 +00:00
2010-12-20 02:08:53 +00:00
-- | Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send :: Pipe -> [Notice] -> IOE ()
send pipe notices = P.send pipe noticesOnly
  where
    -- A message whose request part is absent.
    noticesOnly = (notices, Nothing)
2010-06-15 03:14:40 +00:00
2010-12-20 02:08:53 +00:00
-- | Send notices and request as a contiguous batch to server and return reply promise,
-- which will block when invoked until reply arrives. This call and resulting promise
-- will throw IOError if connection fails.
call :: Pipe -> [Notice] -> Request -> IOE (IOE Reply)
call pipe notices request = do
    requestId <- genRequestId
    promise <- P.call pipe (notices, Just (request, requestId))
    return (fmap (verify requestId) promise)
  where
    -- The reply is only accepted when its response id echoes the id we sent.
    verify expectedId (responseTo, reply)
        | expectedId == responseTo = reply
        | otherwise =
            error $ " expected response id ( " ++ show responseTo ++ " ) to match request id ( " ++ show expectedId ++ " ) "
2010-06-15 03:14:40 +00:00
2010-12-27 05:23:02 +00:00
-- * Message
-- | A write notice(s) with getLastError request, or just query request.
--
-- Note that request ids will be out of order, because the ids for the notices
-- are generated after the supplied request id was generated. This is ok because
-- the mongo server does not care about order, just uniqueness.
type Message = ([Notice], Maybe (Request, RequestId))
2011-07-05 14:37:01 +00:00
-- | Write message to socket.
--
-- Every notice is written with a freshly generated request id, then the
-- optional request is written with the id it was paired with, and finally the
-- handle is flushed. IO errors are captured into the 'IOE' result.
writeMessage :: Handle -> Message -> IOE ()
writeMessage handle (notices, mRequest) = ErrorT . E.try $ do
    forM_ notices $ \notice -> do
        noticeId <- genRequestId
        writePacket (putNotice notice noticeId)
    whenJust mRequest $ \(request, requestId) ->
        writePacket (putRequest request requestId)
    hFlush handle
  where
    -- Emit one packet: an int32 length prefix followed by the serialized body.
    writePacket body = do
        hPut handle (runPut (putInt32 (payloadSize + 4)))
        hPut handle payload
      where
        payload = runPut body
        -- The 4 added above counts the length prefix itself.
        payloadSize = toEnum (fromEnum (B.length payload))
2010-12-27 05:23:02 +00:00
-- | Message received from a Mongo server in response to a Request,
-- paired with the id of the request it answers.
type Response = (ResponseTo, Reply)
2011-07-05 14:37:01 +00:00
-- | Read response from socket.
--
-- Reads the 4-byte length prefix, then the remaining bytes of the packet, and
-- decodes them with 'getReply'. IO errors are captured into the 'IOE' result.
readMessage :: Handle -> IOE Response
readMessage handle = ErrorT (E.try receive)
  where
    receive = do
        prefix <- hGetN handle 4
        -- The transmitted length includes the 4 prefix bytes already consumed.
        let bodyLength = fromEnum (runGet getInt32 prefix - 4)
        body <- hGetN handle bodyLength
        return (runGet getReply body)
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Database name and collection name with period (.) in between. Eg. \"myDb.myCollection\"
type FullCollection = UString
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- ** Header
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Numeric code written in the message header to identify the kind of message.
type Opcode = Int32

-- | A fresh request id is generated for every message
type RequestId = Int32

-- | The request id that a received message is a response to.
type ResponseTo = RequestId
2010-12-20 02:08:53 +00:00
-- | Process-wide counter backing 'genRequestId'.
--
-- This is deliberately a monomorphic, top-level binding marked @NOINLINE@ so
-- that exactly one 'IORef' is ever allocated. When the counter was a @where@
-- binding under the @MonadIO m@ constraint of 'genRequestId', sharing it
-- depended on the optimiser floating it out of the dictionary lambda; without
-- that, each use could allocate a fresh reference and hand out the same id.
requestIdCounter :: IORef RequestId
requestIdCounter = unsafePerformIO (newIORef 0)
{-# NOINLINE requestIdCounter #-}

-- | Generate fresh request id.
--
-- Atomically returns the current counter value and advances the counter by one.
genRequestId :: (MonadIO m) => m RequestId
genRequestId = liftIO $ atomicModifyIORef requestIdCounter $ \n -> (n + 1, n)
2010-06-21 15:06:20 +00:00
-- *** Binary format
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Serialize the message header: request id, a zero, then the opcode.
--
-- Note, does not write message length (first int32), assumes caller will write it
putHeader :: Opcode -> RequestId -> Put
putHeader opcode requestId = mapM_ putInt32 [requestId, responseTo, opcode]
  where
    -- The second header field (read back as the response-to id by 'getHeader')
    -- is always written as zero.
    responseTo = 0
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Parse a message header, yielding its opcode and response-to id.
--
-- Note, does not read message length (first int32), assumes it was already read
getHeader :: Get (Opcode, ResponseTo)
getHeader = do
    _senderRequestId <- getInt32  -- first field is read and discarded
    liftM2 swapped getInt32 getInt32
  where
    -- On the wire the response-to id precedes the opcode; the result lists the opcode first.
    swapped responseTo opcode = (opcode, responseTo)
-- ** Notice
-- | A notice is a message that is sent with no reply
data Notice
    = Insert
        { iFullCollection :: FullCollection
        , iDocuments :: [Document]
        }
    | Update
        { uFullCollection :: FullCollection
        , uOptions :: [UpdateOption]
        , uSelector :: Document
        , uUpdater :: Document
        }
    | Delete
        { dFullCollection :: FullCollection
        , dOptions :: [DeleteOption]
        , dSelector :: Document
        }
    | KillCursors
        { kCursorIds :: [CursorId]
        }
    deriving (Show, Eq)
2010-06-15 03:14:40 +00:00
-- | Flags that can accompany an 'Update' notice.
data UpdateOption
    = Upsert
    -- ^ If set, the database will insert the supplied object into the collection if no matching document is found
    | MultiUpdate
    -- ^ If set, the database will update all matching objects in the collection. Otherwise only updates first matching doc
    deriving (Show, Eq)
-- | Flags that can accompany a 'Delete' notice.
data DeleteOption
    = SingleRemove
    -- ^ If set, the database will remove only the first matching document in the collection. Otherwise all matching documents will be removed
    deriving (Show, Eq)
-- | 64-bit identifier of a cursor.
type CursorId = Int64
2010-06-21 15:06:20 +00:00
-- *** Binary format
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Opcode written in the header for each kind of 'Notice'.
nOpcode :: Notice -> Opcode
nOpcode notice = case notice of
    Update{} -> 2001
    Insert{} -> 2002
    Delete{} -> 2006
    KillCursors{} -> 2007
-- | Serialize a notice (without the leading length prefix) under the given request id.
--
-- Layout: header, an int32 that is always zero, then the constructor-specific body.
putNotice :: Notice -> RequestId -> Put
putNotice notice requestId = do
    putHeader (nOpcode notice) requestId
    putInt32 0
    putBody notice
  where
    putBody (Insert collection documents) = do
        putCString collection
        mapM_ putDocument documents
    putBody (Update collection options selector updater) = do
        putCString collection
        putInt32 (uBits options)
        putDocument selector
        putDocument updater
    putBody (Delete collection options selector) = do
        putCString collection
        putInt32 (dBits options)
        putDocument selector
    putBody (KillCursors cursorIds) = do
        -- Number of cursor ids first, then the ids themselves.
        putInt32 (toEnum (X.length cursorIds))
        mapM_ putInt64 cursorIds
2010-06-15 03:14:40 +00:00
-- | Bit mask of a single update option.
uBit :: UpdateOption -> Int32
uBit option = case option of
    Upsert -> bit 0
    MultiUpdate -> bit 1

-- | Combined bit mask of a list of update options.
uBits :: [UpdateOption] -> Int32
uBits options = bitOr (map uBit options)
-- | Bit mask of a single delete option.
dBit :: DeleteOption -> Int32
dBit option = case option of
    SingleRemove -> bit 0

-- | Combined bit mask of a list of delete options.
dBits :: [DeleteOption] -> Int32
dBits options = bitOr (map dBit options)
2010-06-21 15:06:20 +00:00
-- ** Request
-- | A request is a message that is sent with a 'Reply' returned
data Request
    = Query
        { qOptions :: [QueryOption]
        , qFullCollection :: FullCollection
        , qSkip :: Int32
        -- ^ Number of initial matching documents to skip
        , qBatchSize :: Int32
        -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
        , qSelector :: Document
        -- ^ \[\] = return all documents in collection
        , qProjector :: Document
        -- ^ \[\] = return whole document
        }
    | GetMore
        { gFullCollection :: FullCollection
        , gBatchSize :: Int32
        , gCursorId :: CursorId
        }
    deriving (Show, Eq)
-- | Flags that can be set on a 'Query' request.
data QueryOption
    = TailableCursor
    -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point - for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception.
    | SlaveOK
    -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
    | NoCursorTimeout
    -- ^ The server normally times out idle cursors after an inactivity period (10 minutes) to prevent excess memory use. Set this option to prevent that.
    | AwaitData
    -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
    --
    -- Not offered (kept as an ordinary comment so that Haddock does not attach it to anything):
    --   Exhaust: Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
    deriving (Show, Eq)
-- *** Binary format
-- | Opcode written in the header for each kind of 'Request'.
qOpcode :: Request -> Opcode
qOpcode request = case request of
    Query{} -> 2004
    GetMore{} -> 2005
-- | Serialize a request (without the leading length prefix) under the given request id.
putRequest :: Request -> RequestId -> Put
putRequest request requestId = putHeader (qOpcode request) requestId >> putBody request
  where
    putBody (Query options collection toSkip batchSize selector projector) = do
        putInt32 (qBits options)
        putCString collection
        putInt32 toSkip
        putInt32 batchSize
        putDocument selector
        -- The projector is only written when it is non-empty.
        unless (null projector) (putDocument projector)
    putBody (GetMore collection batchSize cursorId) = do
        putInt32 0
        putCString collection
        putInt32 batchSize
        putInt64 cursorId
2010-06-15 03:14:40 +00:00
-- | Bit mask of a single query option.
qBit :: QueryOption -> Int32
qBit option = case option of
    TailableCursor -> bit 1
    SlaveOK -> bit 2
    NoCursorTimeout -> bit 4
    AwaitData -> bit 5
    -- Exhaust would be bit 6

-- | Combined bit mask of a list of query options.
qBits :: [QueryOption] -> Int32
qBits options = bitOr (map qBit options)
2010-06-21 15:06:20 +00:00
-- ** Reply
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | A reply is a message received in response to a 'Request'
data Reply = Reply
    { rResponseFlags :: [ResponseFlag]
    , rCursorId :: CursorId
    -- ^ 0 = cursor finished
    , rStartingFrom :: Int32
    , rDocuments :: [Document]
    }
    deriving (Show, Eq)
2010-07-03 17:15:30 +00:00
-- | Flags that the server can set on a 'Reply'.
data ResponseFlag
    = CursorNotFound
    -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
    | QueryError
    -- ^ Query error. Returned with one document containing an "$err" field holding the error message.
    | AwaitCapable
    -- ^ For backward compatibility: Set when the server supports the AwaitData query option. If it doesn't, a replica slave client should sleep a little between getMore's
    deriving (Show, Eq, Enum)
2010-06-21 15:06:20 +00:00
-- * Binary format
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Opcode that 'getReply' requires in the header of a reply.
replyOpcode :: Opcode
replyOpcode = 1
-- | Parse a reply (after its length prefix), yielding the id of the request it
-- answers together with the decoded 'Reply'. Fails when the header does not
-- carry 'replyOpcode'.
getReply :: Get (ResponseTo, Reply)
getReply = do
    (opcode, responseTo) <- getHeader
    when (opcode /= replyOpcode) $
        fail (" expected reply opcode (1) but got " ++ show opcode)
    flags <- fmap rFlags getInt32
    cursorId <- getInt64
    startingFrom <- getInt32
    -- The document count precedes the documents themselves.
    documentCount <- getInt32
    documents <- replicateM (fromIntegral documentCount) getDocument
    return (responseTo, Reply flags cursorId startingFrom documents)
2010-06-15 03:14:40 +00:00
2010-07-03 17:15:30 +00:00
-- | Decode the response-flags word into the list of flags whose bit is set.
rFlags :: Int32 -> [ResponseFlag]
rFlags bits = [flag | flag <- [CursorNotFound ..], testBit bits (rBit flag)]

-- | Bit position of each response flag.
rBit :: ResponseFlag -> Int
rBit flag = case flag of
    CursorNotFound -> 0
    QueryError -> 1
    AwaitCapable -> 3
2010-06-21 15:06:20 +00:00
-- * Authentication
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | User name, as passed to 'pwHash' and 'pwKey'.
type Username = UString

-- | Plain-text password, as passed to 'pwHash' and 'pwKey'.
type Password = UString

-- | Nonce that 'pwKey' prepends before hashing.
type Nonce = UString
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Hex-encoded MD5 digest of @username ++ \":mongo:\" ++ password@.
--
-- The separator must be exactly @:mongo:@ with no surrounding whitespace;
-- any extra character changes the digest and it will no longer match the one
-- the server computes.
pwHash :: Username -> Password -> UString
pwHash u p = pack . byteStringHex . MD5.hash . toByteString $ u `U.append` ":mongo:" `U.append` p
2010-06-15 03:14:40 +00:00
2010-06-21 15:06:20 +00:00
-- | Hex-encoded MD5 digest of the nonce, the user name and the user's 'pwHash', concatenated in that order.
pwKey :: Nonce -> Username -> Password -> UString
pwKey n u p = pack (byteStringHex (MD5.hash (toByteString keyText)))
  where
    keyText = n `U.append` (u `U.append` pwHash u p)
2011-07-05 14:37:01 +00:00
{- Authors: Tony Hannan <tony@10gen.com>
   Copyright 2011 10gen Inc.
   Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: http://www.apache.org/licenses/LICENSE-2.0. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -}