Compare commits

..

1 commit

Author SHA1 Message Date
Greg Weber
71111d45f1 run against the persistent-mongoDB test suite 2016-06-22 00:29:04 -07:00
29 changed files with 746 additions and 2583 deletions

3
.gitignore vendored
View file

@ -1,6 +1,3 @@
dist/
cabal.sandbox.config
.cabal-sandbox/
.stack-work/
dist-newstyle/*
!dist-newstyle/config

View file

@ -1,25 +1,26 @@
# See https://github.com/hvr/multi-ghc-travis for more information.
sudo: required
services:
- docker
env:
# We use CABALVER=1.22 everywhere because it uses the flag --enable-coverage
# instead of --enable-library-coverage used by older versions.
#- GHCVER=7.8.4 CABALVER=1.22 MONGO=2.6.12
#- GHCVER=7.10.3 CABALVER=1.22 MONGO=2.6.12
#- GHCVER=8.0.2 CABALVER=1.24 MONGO=2.6.12
- GHCVER=8.4.2 CABALVER=2.2 MONGO=3.6 STACKAGE=nightly
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-11.6
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-9.21
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.0 STACKAGE=lts-11.6
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.0 STACKAGE=lts-9.21
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.4 STACKAGE=lts-11.6
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.4 STACKAGE=lts-9.21
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.2 STACKAGE=lts-11.6
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.2 STACKAGE=lts-9.21
- GHCVER=7.6.3 CABALVER=1.22 MONGO=2.4.14
- GHCVER=7.8.4 CABALVER=1.22 MONGO=2.4.14
- GHCVER=7.10.1 CABALVER=1.22 MONGO=2.4.14
- GHCVER=7.6.3 CABALVER=1.22 MONGO=2.6.12
- GHCVER=7.8.4 CABALVER=1.22 MONGO=2.6.12
- GHCVER=7.10.1 CABALVER=1.22 MONGO=2.6.12
- GHCVER=7.6.3 CABALVER=1.22 MONGO=3.0.12
- GHCVER=7.8.4 CABALVER=1.22 MONGO=3.0.12
- GHCVER=7.10.1 CABALVER=1.22 MONGO=3.0.12
- GHCVER=7.6.3 CABALVER=1.22 MONGO=3.2.6
- GHCVER=7.8.4 CABALVER=1.22 MONGO=3.2.6
- GHCVER=7.10.1 CABALVER=1.22 MONGO=3.2.6
- GHCVER=head CABALVER=head MONGO=3.2.6
matrix:
allow_failures:
# The text here should match the last line above exactly.
- env: GHCVER=head CABALVER=head MONGO=3.2.6
before_install:
@ -28,38 +29,29 @@ before_install:
- travis_retry sudo apt-get install cabal-install-$CABALVER ghc-$GHCVER
- export PATH=$HOME/.cabal/bin:/opt/ghc/$GHCVER/bin:/opt/cabal/$CABALVER/bin:$PATH
- cabal --version
#- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv EA312927
#- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
#- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 0C49F3730359A14518585931BC711F9BA15703C6
#- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.4 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.4.list
#- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.2 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.2.list
#- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list
#- echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list
#- sudo apt-get update
#- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo apt-get install mongodb-10gen=$MONGO; else sudo apt-get install --allow-downgrades -y mongodb-org=$MONGO mongodb-org-server=$MONGO mongodb-org-shell=$MONGO mongodb-org-tools=$MONGO; fi
#- ls /etc/init.d
#- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo service mongodb start; fi
#- sudo service --status-all
#- sudo service mongod start
#- sleep 15 #mongo may not be responded directly. See http://docs.travis-ci.com/user/database-setup/#MongoDB
#- ps axf | grep mongo
#- sudo netstat -apn
#- mongo --version
- sudo docker pull mongo:$MONGO
- sudo docker run -d -p 27017:27017 mongo:$MONGO
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv EA312927
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.2 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.2.list
- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list
- echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list
- sudo apt-get update
- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo apt-get install mongodb-10gen=$MONGO; else sudo apt-get install -y mongodb-org=$MONGO mongodb-org-server=$MONGO mongodb-org-shell=$MONGO mongodb-org-tools=$MONGO; fi
- ls /etc/init.d
- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo service mongodb start; fi
- sleep 15 #mongo may not be responded directly. See http://docs.travis-ci.com/user/database-setup/#MongoDB
- ps axf | grep mongo
- netstat -apn
- mongo --version
install:
- travis_retry cabal update
# Install the combined dependencies for this package and all other packages
# needed to reduce conflicts.
- cabal sandbox init
- wget https://www.stackage.org/$STACKAGE/cabal.config
- sed -e '/mongoDB/d' cabal.config > cabal.config.new
- mv cabal.config.new cabal.config
- cabal install --only-dependencies --enable-tests --enable-benchmarks --force-reinstalls
- cabal install --only-dependencies --enable-tests
script:
- cabal configure --enable-tests -v2 --enable-benchmarks
- cabal configure --enable-tests -v2
- cabal build
# cabal test fails due a to hpc error. Using run-cabal-test instead.
# - cabal test --show-details=always
@ -77,25 +69,9 @@ script:
echo "expected '$SRC_TGZ' not found";
exit 1;
fi
jobs:
include:
- stage: deploy
env: GHCVER=8.0.2 CABALVER=1.24 MONGO=3.6
before_install:
- travis_retry sudo add-apt-repository -y ppa:hvr/ghc
- travis_retry sudo apt-get update
- travis_retry sudo apt-get install cabal-install-$CABALVER
- export PATH=$HOME/.cabal/bin:/opt/ghc/$GHCVER/bin:/opt/cabal/$CABALVER/bin:$PATH
- cabal --version
install: skip
script: skip
deploy:
provider: hackage
username: VictorDenisov
password:
secure: DPYlqRN09gFp06paLj9bRBzpxTkqkZzfsTrU3j0WiqRzqUMeWEeiZNAkIE/maC9xrEuuYBTk0KlSdz+esF4kjfyRQFTxb9CvXrZ474qHozVLC01vh/av5bGZBDQOwgzJrJNVpfl+g+EADOicz9/nhPXiAd7nCQIv/2s/xM1Yj1U=
on:
repo: mongodb-haskell/mongodb
tags: true
- git clone https://github.com/yesodweb/persistent
- cd persistent
- "cabal install ./persistent ./persistent-mongoDB --only-dependencies"
- cd persistent-test
- cabal install -fmongodb --enable-tests
- "cabal configure -fmongodb --enable-tests && cabal test"

View file

@ -2,117 +2,6 @@
All notable changes to this project will be documented in this file.
This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Package_versioning_policy).
* Get rid of `MonadFail` constraints in `Database.MongoDB.Query`
## [2.7.1.2] - 2022-10-26
### Added
- Support of OP_MSG protocol
- Clarifications in the documentation
- Allow optional TLS parameters
## [2.7.1.1] - 2021-06-14
### Fixed
- Sample code
### Added
- Clarification to the tutorial
## [2.7.1.0] - 2020-08-17
### Added
- Function findCommand
## [2.7.0.1] - 2020-04-07
### Fixed
- Error reporting for deletion of big messages
- Formatting of docs
## [2.7.0.0] - 2020-02-08
### Fixed
- Upgraded bson to compile with GHC 8.8
## [2.6.0.1] - 2020-02-01
### Fixed
- Parsing hostname with underscores in readHostPortM.
## [2.6.0.0] - 2020-01-03
### Added
- MonadFail. It's a standard for newer versions of Haskell,
- Open replica sets over tls.
### Fixed
- Support for unix domain socket connection,
- Stubborn listener threads.
## [2.5.0.0] - 2019-06-14
### Fixed
Compatibility with network 3.0 package
## [2.4.0.1] - 2019-03-03
### Fixed
Doc for modify method
## [2.4.0.0] - 2018-05-03
### Fixed
- GHC 8.4 compatibility. isEmptyChan is not available in base 4.11 anymore.
## [2.3.0.5] - 2018-03-15
### Fixed
- Resource leak in SCRAM authentication
## [2.3.0.4] - 2018-02-11
### Fixed
- Benchmark's build
## [2.3.0.3] - 2018-02-10
### Fixed
- aggregate requires cursor in mongo 3.6
## [2.3.0.2] - 2018-01-28
### Fixed
- Uploading files with GridFS
## [2.3.0.1] - 2017-12-28
### Removed
- Log output that littered stdout in modify many commands.
## [2.3.0] - 2017-05-31
### Changed
- Description of access function
- Lift MonadBaseControl restriction
- Update and delete results are squashed into one WriteResult type
- Functions insertMany, updateMany, deleteMany are rewritten to properly report
various errors
## [2.2.0] - 2017-04-08
### Added
- GridFS implementation
### Fixed
- Write functions hang when the connection is lost.
## [2.1.1] - 2016-08-13
### Changed
- Interfaces of update and delete functions. They don't require MonadBaseControl
anymore.
## [2.1.0] - 2016-06-21
### Added

View file

@ -1,54 +1,42 @@
-- |
-- Client interface to MongoDB database management system.
--
-- Simple example below.
--
-- @
-- {\-\# LANGUAGE OverloadedStrings \#\-}
-- {\-\# LANGUAGE ExtendedDefaultRules \#\-}
--
-- import Database.MongoDB
-- import Control.Monad.Trans (liftIO)
--
-- main :: IO ()
-- main = do
-- pipe <- connect (host \"127.0.0.1\")
-- e <- access pipe master \"baseball\" run
-- close pipe
-- print e
--
-- run :: Action IO ()
-- run = do
-- clearTeams
-- insertTeams
-- allTeams >>= printDocs \"All Teams\"
-- nationalLeagueTeams >>= printDocs \"National League Teams\"
-- newYorkTeams >>= printDocs \"New York Teams\"
--
-- clearTeams :: Action IO ()
-- clearTeams = delete (select [] \"team\")
--
-- insertTeams :: Action IO [Value]
-- insertTeams = insertMany \"team\" [
-- [\"name\" =: \"Yankees\", \"home\" =: [\"city\" =: \"New York\", \"state\" =: \"NY\"], \"league\" =: \"American\"],
-- [\"name\" =: \"Mets\", \"home\" =: [\"city\" =: \"New York\", \"state\" =: \"NY\"], \"league\" =: \"National\"],
-- [\"name\" =: \"Phillies\", \"home\" =: [\"city\" =: \"Philadelphia\", \"state\" =: \"PA\"], \"league\" =: \"National\"],
-- [\"name\" =: \"Red Sox\", \"home\" =: [\"city\" =: \"Boston\", \"state\" =: \"MA\"], \"league\" =: \"American\"] ]
--
-- allTeams :: Action IO [Document]
-- allTeams = rest =<< find (select [] \"team\") {sort = [\"home.city\" =: 1]}
--
-- nationalLeagueTeams :: Action IO [Document]
-- nationalLeagueTeams = rest =<< find (select [\"league\" =: \"National\"] \"team\")
--
-- newYorkTeams :: Action IO [Document]
-- newYorkTeams = rest =<< find (select [\"home.state\" =: \"NY\"] \"team\") {project = [\"name\" =: 1, \"league\" =: 1]}
--
-- printDocs :: String -> [Document] -> Action IO ()
-- printDocs title docs = liftIO $ putStrLn title >> mapM_ (print . exclude [\"_id\"]) docs
--
-- @
--
{- |
Client interface to MongoDB database management system.
Simple example below. Use with language extensions /OverloadedStrings/ & /ExtendedDefaultRules/.
> import Database.MongoDB
> import Control.Monad.Trans (liftIO)
>
> main = do
> pipe <- connect (host "127.0.0.1")
> e <- access pipe master "baseball" run
> close pipe
> print e
>
> run = do
> clearTeams
> insertTeams
> allTeams >>= printDocs "All Teams"
> nationalLeagueTeams >>= printDocs "National League Teams"
> newYorkTeams >>= printDocs "New York Teams"
>
> clearTeams = delete (select [] "team")
>
> insertTeams = insertMany "team" [
> ["name" =: "Yankees", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "American"],
> ["name" =: "Mets", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "National"],
> ["name" =: "Phillies", "home" =: ["city" =: "Philadelphia", "state" =: "PA"], "league" =: "National"],
> ["name" =: "Red Sox", "home" =: ["city" =: "Boston", "state" =: "MA"], "league" =: "American"] ]
>
> allTeams = rest =<< find (select [] "team") {sort = ["home.city" =: 1]}
>
> nationalLeagueTeams = rest =<< find (select ["league" =: "National"] "team")
>
> newYorkTeams = rest =<< find (select ["home.state" =: "NY"] "team") {project = ["name" =: 1, "league" =: 1]}
>
> printDocs title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs
>
-}
module Database.MongoDB (
module Data.Bson,

View file

@ -42,6 +42,7 @@ import qualified Data.HashTable.IO as H
import qualified Data.Set as Set
import Control.Monad.Trans (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Bson (Document, Field(..), at, (=:), (=?), exclude, merge)
import Data.Text (Text)
@ -76,8 +77,8 @@ renameCollection from to = do
db <- thisDatabase
useDb admin $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
dropCollection :: (MonadIO m, MonadFail m) => Collection -> Action m Bool
-- ^ Delete the given collection! Return @True@ if collection existed (and was deleted); return @False@ if collection did not exist (and no action).
dropCollection :: (MonadIO m) => Collection -> Action m Bool
-- ^ Delete the given collection! Return True if collection existed (and was deleted); return False if collection did not exist (and no action).
dropCollection coll = do
resetIndexCache
r <- runCommand ["drop" =: coll]
@ -86,7 +87,7 @@ dropCollection coll = do
fail $ "dropCollection failed: " ++ show r
validateCollection :: (MonadIO m) => Collection -> Action m Document
-- ^ Validate the given collection, scanning the data and indexes for correctness. This operation takes a while.
-- ^ This operation takes a while
validateCollection coll = runCommand ["validate" =: coll]
-- ** Index
@ -111,7 +112,7 @@ idxDocument Index{..} db = [
"dropDups" =: iDropDups ] ++ (maybeToList $ fmap ((=:) "expireAfterSeconds") iExpireAfterSeconds)
index :: Collection -> Order -> Index
-- ^ Spec of index of ordered keys on collection. 'iName' is generated from keys. 'iUnique' and 'iDropDups' are @False@.
-- ^ Spec of index of ordered keys on collection. Name is generated from keys. Unique and dropDups are False.
index coll keys = Index coll keys (genName keys) False False Nothing
genName :: Order -> IndexName
@ -132,12 +133,12 @@ createIndex :: (MonadIO m) => Index -> Action m ()
createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase
dropIndex :: (MonadIO m) => Collection -> IndexName -> Action m Document
-- ^ Remove the index from the given collection.
-- ^ Remove the index
dropIndex coll idxName = do
resetIndexCache
runCommand ["deleteIndexes" =: coll, "index" =: idxName]
getIndexes :: MonadIO m => Collection -> Action m [Document]
getIndexes :: (MonadIO m, MonadBaseControl IO m, Functor m) => Collection -> Action m [Document]
-- ^ Get all indexes on this collection
getIndexes coll = do
db <- thisDatabase
@ -190,31 +191,31 @@ resetIndexCache = do
-- ** User
allUsers :: MonadIO m => Action m [Document]
allUsers :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Document]
-- ^ Fetch all users of this database
allUsers = map (exclude ["_id"]) `liftM` (rest =<< find
allUsers = map (exclude ["_id"]) <$> (rest =<< find
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
addUser :: (MonadIO m)
addUser :: (MonadBaseControl IO m, MonadIO m)
=> Bool -> Username -> Password -> Action m ()
-- ^ Add user with password with read-only access if the boolean argument is @True@, or read-write access if it's @False@
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
addUser readOnly user pass = do
mu <- findOne (select ["user" =: user] "system.users")
let usr = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
save "system.users" usr
removeUser :: (MonadIO m)
removeUser :: (MonadIO m, MonadBaseControl IO m)
=> Username -> Action m ()
removeUser user = delete (select ["user" =: user] "system.users")
-- ** Database
admin :: Database
-- ^ The \"admin\" database, which stores user authorization and authentication data plus other system collections.
-- ^ \"admin\" database
admin = "admin"
cloneDatabase :: (MonadIO m) => Database -> Host -> Action m Document
-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use 'copyDatabase' in this case).
-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case).
cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromHost]
copyDatabase :: (MonadIO m) => Database -> Host -> Maybe (Username, Password) -> Database -> Action m Document
@ -238,11 +239,9 @@ repairDatabase db = useDb db $ runCommand ["repairDatabase" =: (1 :: Int)]
-- ** Server
serverBuildInfo :: (MonadIO m) => Action m Document
-- ^ Return a document containing the parameters used to compile the server instance.
serverBuildInfo = useDb admin $ runCommand ["buildinfo" =: (1 :: Int)]
serverVersion :: (MonadIO m) => Action m Text
-- ^ Return the version of the server instance.
serverVersion = at "version" `liftM` serverBuildInfo
-- * Diagnostics
@ -250,22 +249,18 @@ serverVersion = at "version" `liftM` serverBuildInfo
-- ** Collection
collectionStats :: (MonadIO m) => Collection -> Action m Document
-- ^ Return some storage statistics for the given collection.
collectionStats coll = runCommand ["collstats" =: coll]
dataSize :: (MonadIO m) => Collection -> Action m Int
-- ^ Return the total uncompressed size (in bytes) in memory of all records in the given collection. Does not include indexes.
dataSize c = at "size" `liftM` collectionStats c
storageSize :: (MonadIO m) => Collection -> Action m Int
-- ^ Return the total bytes allocated to the given collection. Does not include indexes.
storageSize c = at "storageSize" `liftM` collectionStats c
totalIndexSize :: (MonadIO m) => Collection -> Action m Int
-- ^ The total size in bytes of all indexes in this collection.
totalIndexSize c = at "totalIndexSize" `liftM` collectionStats c
totalSize :: MonadIO m => Collection -> Action m Int
totalSize :: (MonadIO m, MonadBaseControl IO m) => Collection -> Action m Int
totalSize coll = do
x <- storageSize coll
xs <- mapM isize =<< getIndexes coll
@ -275,45 +270,34 @@ totalSize coll = do
-- ** Profiling
-- | The available profiler levels.
data ProfilingLevel
= Off -- ^ No data collection.
| Slow -- ^ Data collected only for slow operations. The slow operation time threshold is 100ms by default, but can be changed using 'setProfilingLevel'.
| All -- ^ Data collected for all operations.
deriving (Show, Enum, Eq)
data ProfilingLevel = Off | Slow | All deriving (Show, Enum, Eq)
getProfilingLevel :: (MonadIO m) => Action m ProfilingLevel
-- ^ Get the profiler level.
getProfilingLevel = (toEnum . at "was") `liftM` runCommand ["profile" =: (-1 :: Int)]
type MilliSec = Int
setProfilingLevel :: (MonadIO m) => ProfilingLevel -> Maybe MilliSec -> Action m ()
-- ^ Set the profiler level, and optionally the slow operation time threshold (in milliseconds).
setProfilingLevel p mSlowMs =
runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return ()
-- ** Database
dbStats :: (MonadIO m) => Action m Document
-- ^ Return some storage statistics for the given database.
dbStats = runCommand ["dbstats" =: (1 :: Int)]
currentOp :: (MonadIO m) => Action m (Maybe Document)
-- ^ See currently running operation on the database, if any
currentOp = findOne (select [] "$cmd.sys.inprog")
-- | An operation indentifier.
type OpNum = Int
killOp :: (MonadIO m) => OpNum -> Action m (Maybe Document)
-- ^ Terminate the operation specified by the given 'OpNum'.
killOp op = findOne (select ["op" =: op] "$cmd.sys.killop")
-- ** Server
serverStatus :: (MonadIO m) => Action m Document
-- ^ Return a document with an overview of the state of the database.
serverStatus = useDb admin $ runCommand ["serverStatus" =: (1 :: Int)]

View file

@ -17,29 +17,29 @@ module Database.MongoDB.Connection (
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
readHostPortM, globalConnectTimeout, connect, connect',
-- * Replica Set
ReplicaSetName, openReplicaSet, openReplicaSet', openReplicaSetTLS, openReplicaSetTLS',
openReplicaSetSRV, openReplicaSetSRV', openReplicaSetSRV'', openReplicaSetSRV''',
ReplicaSetName, openReplicaSet, openReplicaSet',
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
) where
import Prelude hiding (lookup)
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersect, partition, (\\), delete)
import Data.Maybe (fromJust)
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>))
#endif
import Control.Monad (forM_, guard)
import Control.Monad (forM_)
import Network (HostName, PortID(..), connectTo)
import System.IO.Unsafe (unsafePerformIO)
import System.Timeout (timeout)
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, anyChar, eof,
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, eof,
spaces, try, (<|>))
import qualified Data.List as List
import Control.Monad.Except (throwError)
import Control.Monad.Identity (runIdentity)
import Control.Monad.Error (throwError)
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
readMVar)
import Data.Bson (Document, at, (=:))
@ -48,13 +48,11 @@ import Data.Text (Text)
import qualified Data.Bson as B
import qualified Data.Text as T
import Database.MongoDB.Internal.Network (Host(..), HostName, PortID(..), connectTo, lookupSeedList, lookupReplicaSetName)
import Database.MongoDB.Internal.Protocol (Pipe, newPipe, close, isClosed)
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE,
updateAssocs, shuffle, mergesortM)
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
slaveOk, runCommand, retrieveServerData)
import qualified Database.MongoDB.Transport.Tls as TLS (connect)
adminCommand :: Command -> Pipe -> IO Document
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
@ -64,6 +62,10 @@ adminCommand cmd pipe =
failureToIOError (ConnectionFailure e) = e
failureToIOError e = userError $ show e
-- * Host
data Host = Host HostName PortID deriving (Show, Eq, Ord)
defaultPort :: PortID
-- ^ Default MongoDB port = 27017
defaultPort = PortNumber 27017
@ -74,50 +76,46 @@ host hostname = Host hostname defaultPort
showHostPort :: Host -> String
-- ^ Display host as \"host:port\"
-- TODO: Distinguish Service port
showHostPort (Host hostname (PortNumber port)) = hostname ++ ":" ++ show port
-- TODO: Distinguish Service and UnixSocket port
showHostPort (Host hostname port) = hostname ++ ":" ++ portname where
portname = case port of
Service s -> s
PortNumber p -> show p
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
showHostPort (Host _ (UnixSocket path)) = "unix:" ++ path
UnixSocket s -> s
#endif
readHostPortM :: (MonadFail m) => String -> m Host
readHostPortM :: (Monad m) => String -> m Host
-- ^ Read string \"hostname:port\" as @Host hosthame (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax.
-- TODO: handle Service port
-- TODO: handle Service and UnixSocket port
readHostPortM = either (fail . show) return . parse parser "readHostPort" where
hostname = many1 (letter <|> digit <|> char '-' <|> char '.' <|> char '_')
hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
parser = do
spaces
h <- hostname
try (spaces >> eof >> return (host h)) <|> do
_ <- char ':'
try ( do port :: Int <- read <$> many1 digit
spaces >> eof
return $ Host h (PortNumber $ fromIntegral port))
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
<|> do guard (h == "unix")
p <- many1 anyChar
eof
return $ Host "" (UnixSocket p)
#endif
port :: Int <- read <$> many1 digit
spaces >> eof
return $ Host h (PortNumber $ fromIntegral port)
readHostPort :: String -> Host
-- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
readHostPort = fromJust . readHostPortM
readHostPort = runIdentity . readHostPortM
type Secs = Double
globalConnectTimeout :: IORef Secs
-- ^ 'connect' (and 'openReplicaSet') fails if it can't connect within this many seconds (default is 6 seconds). Use 'connect'' (and 'openReplicaSet'') if you want to ignore this global and specify your own timeout. Note, this timeout only applies to initial connection establishment, not when reading/writing to the connection.
-- ^ 'connect' (and 'openReplicaSet') fails if it can't connect within this many seconds (default is 6 seconds). Use 'connect\'' (and 'openReplicaSet\'') if you want to ignore this global and specify your own timeout. Note, this timeout only applies to initial connection establishment, not when reading/writing to the connection.
globalConnectTimeout = unsafePerformIO (newIORef 6)
{-# NOINLINE globalConnectTimeout #-}
connect :: Host -> IO Pipe
-- ^ Connect to Host returning pipelined TCP connection. Throw 'IOError' if connection refused or no response within 'globalConnectTimeout'.
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within 'globalConnectTimeout'.
connect h = readIORef globalConnectTimeout >>= flip connect' h
connect' :: Secs -> Host -> IO Pipe
-- ^ Connect to Host returning pipelined TCP connection. Throw 'IOError' if connection refused or no response within given number of seconds.
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within given number of seconds.
connect' timeoutSecs (Host hostname port) = do
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
handle <- maybe (ioError $ userError "connect timed out") return mh
@ -130,85 +128,32 @@ connect' timeoutSecs (Host hostname port) = do
type ReplicaSetName = Text
data TransportSecurity = Secure | Unsecure
-- | Maintains a connection (created on demand) to each server in the named replica set
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs TransportSecurity
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs
replSetName :: ReplicaSet -> Text
-- ^ Get the name of connected replica set.
replSetName (ReplicaSet rsName _ _ _) = rsName
-- ^ name of connected replica set
replSetName (ReplicaSet rsName _ _) = rsName
openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSet'' instead.
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSet\'' instead.
openReplicaSet rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSet' rsSeed
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members.
openReplicaSet' timeoutSecs (rs, hosts) = _openReplicaSet timeoutSecs (rs, hosts, Unsecure)
openReplicaSetTLS :: (ReplicaSetName, [Host]) -> IO ReplicaSet
-- ^ Open secure connections (on demand) to servers in the replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSetTLS'' instead.
openReplicaSetTLS rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSetTLS' rsSeed
openReplicaSetTLS' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
-- ^ Open secure connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members.
openReplicaSetTLS' timeoutSecs (rs, hosts) = _openReplicaSet timeoutSecs (rs, hosts, Secure)
_openReplicaSet :: Secs -> (ReplicaSetName, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet timeoutSecs (rsName, seedList, transportSecurity) = do
openReplicaSet' timeoutSecs (rsName, seedList) = do
vMembers <- newMVar (map (, Nothing) seedList)
let rs = ReplicaSet rsName vMembers timeoutSecs transportSecurity
let rs = ReplicaSet rsName vMembers timeoutSecs
_ <- updateMembers rs
return rs
openReplicaSetSRV :: HostName -> IO ReplicaSet
-- ^ Open /non-secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSetSRV''' instead.
openReplicaSetSRV hostname = do
timeoutSecs <- readIORef globalConnectTimeout
_openReplicaSetSRV timeoutSecs Unsecure hostname
openReplicaSetSRV' :: HostName -> IO ReplicaSet
-- ^ Open /secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSetSRV'''' instead.
--
-- The preferred connection method for cloud MongoDB providers. A typical connecting sequence is shown in the example below.
--
-- ==== __Example__
-- > do
-- > pipe <- openReplicatSetSRV' "cluster#.xxxxx.yyyyy.zzz"
-- > is_auth <- access pipe master "admin" $ auth user_name password
-- > unless is_auth (throwIO $ userError "Authentication failed!")
openReplicaSetSRV' hostname = do
timeoutSecs <- readIORef globalConnectTimeout
_openReplicaSetSRV timeoutSecs Secure hostname
openReplicaSetSRV'' :: Secs -> HostName -> IO ReplicaSet
-- ^ Open /non-secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. Supplied seconds timeout is used for connect attempts to members.
openReplicaSetSRV'' timeoutSecs = _openReplicaSetSRV timeoutSecs Unsecure
openReplicaSetSRV''' :: Secs -> HostName -> IO ReplicaSet
-- ^ Open /secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. Supplied seconds timeout is used for connect attempts to members.
openReplicaSetSRV''' timeoutSecs = _openReplicaSetSRV timeoutSecs Secure
_openReplicaSetSRV :: Secs -> TransportSecurity -> HostName -> IO ReplicaSet
_openReplicaSetSRV timeoutSecs transportSecurity hostname = do
replicaSetName <- lookupReplicaSetName hostname
hosts <- lookupSeedList hostname
case (replicaSetName, hosts) of
(Nothing, _) -> throwError $ userError "Failed to lookup replica set name"
(_, []) -> throwError $ userError "Failed to lookup replica set seedlist"
(Just rsName, _) ->
case transportSecurity of
Secure -> openReplicaSetTLS' timeoutSecs (rsName, hosts)
Unsecure -> openReplicaSet' timeoutSecs (rsName, hosts)
closeReplicaSet :: ReplicaSet -> IO ()
-- ^ Close all connections to replica set
closeReplicaSet (ReplicaSet _ vMembers _ _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
closeReplicaSet (ReplicaSet _ vMembers _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
primary :: ReplicaSet -> IO Pipe
-- ^ Return connection to current primary of replica set. Fail if no primary available.
primary rs@(ReplicaSet rsName _ _ _) = do
primary rs@(ReplicaSet rsName _ _) = do
mHost <- statedPrimary <$> updateMembers rs
case mHost of
Just host' -> connection rs Nothing host'
@ -227,7 +172,7 @@ routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO
routedHost f rs = do
info <- updateMembers rs
hosts <- shuffle (possibleHosts info)
let addIsPrimary h = (h, Just h == statedPrimary info)
let addIsPrimary h = (h, if Just h == statedPrimary info then True else False)
hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
untilSuccess (connection rs Nothing) hosts'
@ -244,7 +189,7 @@ possibleHosts (_, info) = map readHostPort $ at "hosts" info
updateMembers :: ReplicaSet -> IO ReplicaInfo
-- ^ Fetch replica info from any server and update members accordingly
updateMembers rs@(ReplicaSet _ vMembers _ _) = do
updateMembers rs@(ReplicaSet _ vMembers _) = do
(host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
modifyMVar vMembers $ \members -> do
let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
@ -258,7 +203,7 @@ updateMembers rs@(ReplicaSet _ vMembers _ _) = do
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
-- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set.
fetchReplicaInfo rs@(ReplicaSet rsName _ _ _) (host', mPipe) = do
fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
pipe <- connection rs mPipe host'
info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
case B.lookup "setName" info of
@ -268,15 +213,11 @@ fetchReplicaInfo rs@(ReplicaSet rsName _ _ _) (host', mPipe) = do
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
-- ^ Return new or existing connection to member of replica set. If pipe is already known for host it is given, but we still test if it is open.
connection (ReplicaSet _ vMembers timeoutSecs transportSecurity) mPipe host' =
connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' =
maybe conn (\p -> isClosed p >>= \bad -> if bad then conn else return p) mPipe
where
conn = modifyMVar vMembers $ \members -> do
let (Host h p) = host'
let conn' = case transportSecurity of
Secure -> TLS.connect h p
Unsecure -> connect' timeoutSecs host'
let new = conn' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
case List.lookup host' members of
Just (Just pipe) -> isClosed pipe >>= \bad -> if bad then new else return (members, pipe)
_ -> new

View file

@ -1,191 +0,0 @@
-- Author:
-- Brent Tubbs <brent.tubbs@gmail.com>
-- | MongoDB GridFS implementation
{-# LANGUAGE OverloadedStrings, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, TypeFamilies, CPP, RankNTypes #-}
module Database.MongoDB.GridFS
( Bucket
, files, chunks
, File
, document, bucket
-- ** Setup
, openDefaultBucket
, openBucket
-- ** Query
, findFile
, findOneFile
, fetchFile
-- ** Delete
, deleteFile
-- ** Conduits
, sourceFile
, sinkFile
)
where
import Control.Monad(when)
import Control.Monad.IO.Class
import Control.Monad.Trans(lift)
import Data.Conduit
import Data.Digest.Pure.MD5
import Data.Int
import Data.Tagged(Tagged, untag)
import Data.Text(Text, append)
import Data.Time.Clock(getCurrentTime)
import Database.MongoDB
import Prelude
import qualified Data.Bson as B
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
defaultChunkSize :: Int64
-- ^ The default chunk size is 256 kB
defaultChunkSize = 256 * 1024
-- magic constant for md5Finalize
md5BlockSizeInBytes :: Int
md5BlockSizeInBytes = 64
data Bucket = Bucket {files :: Text, chunks :: Text}
-- ^ Files are stored in "buckets". You open a bucket with openDefaultBucket or openBucket
openDefaultBucket :: (Monad m, MonadIO m) => Action m Bucket
-- ^ Open the default 'Bucket' (named "fs")
openDefaultBucket = openBucket "fs"
openBucket :: (Monad m, MonadIO m) => Text -> Action m Bucket
-- ^ Open a 'Bucket'
openBucket name = do
let filesCollection = name `append` ".files"
let chunksCollection = name `append` ".chunks"
ensureIndex $ index filesCollection ["filename" =: (1::Int), "uploadDate" =: (1::Int)]
ensureIndex $ (index chunksCollection ["files_id" =: (1::Int), "n" =: (1::Int)]) { iUnique = True, iDropDups = True }
return $ Bucket filesCollection chunksCollection
data File = File {bucket :: Bucket, document :: Document}
getChunk :: (MonadFail m, MonadIO m) => File -> Int -> Action m (Maybe S.ByteString)
-- ^ Get a chunk of a file
getChunk (File _bucket doc) i = do
files_id <- B.look "_id" doc
result <- findOne $ select ["files_id" := files_id, "n" =: i] $ chunks _bucket
let content = at "data" <$> result
case content of
Just (Binary b) -> return (Just b)
_ -> return Nothing
findFile :: MonadIO m => Bucket -> Selector -> Action m [File]
-- ^ Find files in the bucket
findFile _bucket sel = do
cursor <- find $ select sel $ files _bucket
results <- rest cursor
return $ File _bucket <$> results
findOneFile :: MonadIO m => Bucket -> Selector -> Action m (Maybe File)
-- ^ Find one file in the bucket
findOneFile _bucket sel = do
mdoc <- findOne $ select sel $ files _bucket
return $ File _bucket <$> mdoc
fetchFile :: MonadIO m => Bucket -> Selector -> Action m File
-- ^ Fetch one file in the bucket
fetchFile _bucket sel = do
doc <- fetch $ select sel $ files _bucket
return $ File _bucket doc
deleteFile :: (MonadIO m, MonadFail m) => File -> Action m ()
-- ^ Delete files in the bucket
deleteFile (File _bucket doc) = do
files_id <- B.look "_id" doc
delete $ select ["_id" := files_id] $ files _bucket
delete $ select ["files_id" := files_id] $ chunks _bucket
putChunk :: (Monad m, MonadIO m) => Bucket -> ObjectId -> Int -> L.ByteString -> Action m ()
-- ^ Put a chunk in the bucket
putChunk _bucket files_id i chunk = do
insert_ (chunks _bucket) ["files_id" =: files_id, "n" =: i, "data" =: Binary (L.toStrict chunk)]
sourceFile :: (MonadFail m, MonadIO m) => File -> ConduitT File S.ByteString (Action m) ()
-- ^ A producer for the contents of a file
sourceFile file = yieldChunk 0 where
yieldChunk i = do
mbytes <- lift $ getChunk file i
case mbytes of
Just bytes -> yield bytes >> yieldChunk (i+1)
Nothing -> return ()
-- Used to keep data during writing
data FileWriter = FileWriter
{ _fwChunkSize :: Int64
, _fwBucket :: Bucket
, _fwFilesId :: ObjectId
, _fwChunkIndex :: Int
, _fwSize :: Int64
, _fwAcc :: L.ByteString
, _fwMd5Context :: MD5Context
, _fwMd5acc :: L.ByteString
}
-- Finalize file, calculating md5 digest, saving the last chunk, and creating the file in the bucket
finalizeFile :: (Monad m, MonadIO m) => Text -> FileWriter -> Action m File
finalizeFile filename (FileWriter chunkSize _bucket files_id i size acc md5context md5acc) = do
let md5digest = finalizeMD5 md5context (L.toStrict md5acc)
when (L.length acc > 0) $ putChunk _bucket files_id i acc
currentTimestamp <- liftIO getCurrentTime
let doc = [ "_id" =: files_id
, "length" =: size
, "uploadDate" =: currentTimestamp
, "md5" =: show md5digest
, "chunkSize" =: chunkSize
, "filename" =: filename
]
insert_ (files _bucket) doc
return $ File _bucket doc
-- finalize the remainder and return the MD5Digest.
finalizeMD5 :: MD5Context -> S.ByteString -> MD5Digest
finalizeMD5 ctx remainder =
md5Finalize ctx2 (S.drop lu remainder) -- can only handle max md5BlockSizeInBytes length
where
l = S.length remainder
r = l `mod` md5BlockSizeInBytes
lu = l - r
ctx2 = md5Update ctx (S.take lu remainder)
-- Write as many chunks as can be written from the file writer
writeChunks :: (Monad m, MonadIO m) => FileWriter -> L.ByteString -> Action m FileWriter
writeChunks (FileWriter chunkSize _bucket files_id i size acc md5context md5acc) chunk = do
-- Update md5 context
let md5BlockLength = fromIntegral $ untag (blockLength :: Tagged MD5Digest Int)
let md5acc_temp = (md5acc `L.append` chunk)
let (md5context', md5acc') =
if (L.length md5acc_temp < md5BlockLength)
then (md5context, md5acc_temp)
else let numBlocks = L.length md5acc_temp `div` md5BlockLength
(current, remainder) = L.splitAt (md5BlockLength * numBlocks) md5acc_temp
in (md5Update md5context (L.toStrict current), remainder)
-- Update chunks
let size' = (size + L.length chunk)
let acc_temp = (acc `L.append` chunk)
if (L.length acc_temp < chunkSize)
then return (FileWriter chunkSize _bucket files_id i size' acc_temp md5context' md5acc')
else do
let (newChunk, acc') = L.splitAt chunkSize acc_temp
putChunk _bucket files_id i newChunk
writeChunks (FileWriter chunkSize _bucket files_id (i+1) size' acc' md5context' md5acc') L.empty
sinkFile :: (Monad m, MonadIO m) => Bucket -> Text -> ConduitT S.ByteString () (Action m) File
-- ^ A consumer that creates a file in the bucket and puts all consumed data in it
sinkFile _bucket filename = do
files_id <- liftIO $ genObjectId
awaitChunk $ FileWriter defaultChunkSize _bucket files_id 0 0 L.empty md5InitialContext L.empty
where
awaitChunk fw = do
mchunk <- await
case mchunk of
Nothing -> lift (finalizeFile filename fw)
Just chunk -> lift (writeChunks fw (L.fromStrict chunk)) >>= awaitChunk

View file

@ -1,106 +0,0 @@
-- | Compatibility layer for network package, including newtype 'PortID'
{-# LANGUAGE CPP, OverloadedStrings #-}
module Database.MongoDB.Internal.Network (Host(..), PortID(..), N.HostName, connectTo,
lookupReplicaSetName, lookupSeedList) where
#if !MIN_VERSION_network(2, 9, 0)
import qualified Network as N
import System.IO (Handle)
#else
import Control.Exception (bracketOnError)
import Network.BSD as BSD
import qualified Network.Socket as N
import System.IO (Handle, IOMode(ReadWriteMode))
#endif
import Data.ByteString.Char8 (pack, unpack)
import Data.List (dropWhileEnd)
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Network.DNS.Lookup (lookupSRV, lookupTXT)
import Network.DNS.Resolver (defaultResolvConf, makeResolvSeed, withResolver)
import Network.HTTP.Types.URI (parseQueryText)
-- | Wraps network's 'PortNumber'
-- Used to ease compatibility between older and newer network versions.
data PortID = PortNumber N.PortNumber
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
| UnixSocket String
#endif
deriving (Eq, Ord, Show)
#if !MIN_VERSION_network(2, 9, 0)
-- Unwrap our newtype and use network's PortID and connectTo
connectTo :: N.HostName -- Hostname
-> PortID -- Port Identifier
-> IO Handle -- Connected Socket
connectTo hostname (PortNumber port) = N.connectTo hostname (N.PortNumber port)
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
connectTo _ (UnixSocket path) = N.connectTo "" (N.UnixSocket path)
#endif
#else
-- Copied implementation from network 2.8's 'connectTo', but using our 'PortID' newtype.
-- https://github.com/haskell/network/blob/e73f0b96c9da924fe83f3c73488f7e69f712755f/Network.hs#L120-L129
connectTo :: N.HostName -- Hostname
-> PortID -- Port Identifier
-> IO Handle -- Connected Socket
connectTo hostname (PortNumber port) = do
proto <- BSD.getProtocolNumber "tcp"
bracketOnError
(N.socket N.AF_INET N.Stream proto)
N.close -- only done if there's an error
(\sock -> do
he <- BSD.getHostByName hostname
N.connect sock (N.SockAddrInet port (hostAddress he))
N.socketToHandle sock ReadWriteMode
)
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
connectTo _ (UnixSocket path) = do
bracketOnError
(N.socket N.AF_UNIX N.Stream 0)
N.close
(\sock -> do
N.connect sock (N.SockAddrUnix path)
N.socketToHandle sock ReadWriteMode
)
#endif
#endif
-- * Host
data Host = Host N.HostName PortID deriving (Show, Eq, Ord)
lookupReplicaSetName :: N.HostName -> IO (Maybe Text)
-- ^ Retrieves the replica set name from the TXT DNS record for the given hostname
lookupReplicaSetName hostname = do
rs <- makeResolvSeed defaultResolvConf
res <- withResolver rs $ \resolver -> lookupTXT resolver (pack hostname)
case res of
Left _ -> pure Nothing
Right [] -> pure Nothing
Right (x:_) ->
pure $ fromMaybe (Nothing :: Maybe Text) (lookup "replicaSet" $ parseQueryText x)
lookupSeedList :: N.HostName -> IO [Host]
-- ^ Retrieves the replica set seed list from the SRV DNS record for the given hostname
lookupSeedList hostname = do
rs <- makeResolvSeed defaultResolvConf
res <- withResolver rs $ \resolver -> lookupSRV resolver $ pack $ "_mongodb._tcp." ++ hostname
case res of
Left _ -> pure []
Right srv -> pure $ map (\(_, _, por, tar) ->
let tar' = dropWhileEnd (=='.') (unpack tar)
in Host tar' (PortNumber . fromIntegral $ por)) srv

View file

@ -4,8 +4,8 @@
-- This module is not intended for direct use. Use the high-level interface at
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.
{-# LANGUAGE RecordWildCards, OverloadedStrings #-}
{-# LANGUAGE CPP, FlexibleContexts #-}
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-}
{-# LANGUAGE CPP, FlexibleContexts, TupleSections, TypeSynonymInstances #-}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
{-# LANGUAGE BangPatterns #-}
@ -20,43 +20,41 @@
module Database.MongoDB.Internal.Protocol (
FullCollection,
-- * Pipe
Pipe, newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg,
Pipe, newPipe, newPipeWith, send, call,
-- ** Notice
Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
-- ** Request
Request(..), QueryOption(..), Cmd (..), KillC(..),
Request(..), QueryOption(..),
-- ** Reply
Reply(..), ResponseFlag(..), FlagBit(..),
Reply(..), ResponseFlag(..),
-- * Authentication
Username, Password, Nonce, pwHash, pwKey,
isClosed, close, ServerData(..), Pipeline(..), putOpMsg,
bitOpMsg
isClosed, close, ServerData(..), Pipeline(..)
) where
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>))
#endif
import Control.Monad ( forM, replicateM, unless, forever )
import Data.Binary.Get (Get, runGet, getInt8)
import Data.Binary.Put (Put, runPut, putInt8)
import Data.Bits (bit, testBit, zeroBits)
import Control.Monad (forM, replicateM, unless)
import Data.Binary.Get (Get, runGet)
import Data.Binary.Put (Put, runPut)
import Data.Bits (bit, testBit)
import Data.Int (Int32, Int64)
import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO (Handle)
import System.IO.Error (doesNotExistErrorType, mkIOError)
import System.IO.Unsafe (unsafePerformIO)
import Data.Maybe (maybeToList, fromJust)
import Data.Maybe (maybeToList)
import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad.STM (atomically)
import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask)
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan)
import Control.Monad (forever)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try)
import Control.Exception.Lifted (onException, throwIO, try)
import qualified Data.ByteString.Lazy as L
import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson (Document, (=:), merge, cast, valueAt, look)
import Data.Bson (Document)
import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64,
putInt64, putCString)
import Data.Text (Text)
@ -68,14 +66,11 @@ import qualified Data.Text.Encoding as TE
import Database.MongoDB.Internal.Util (bitOr, byteStringHex)
import Database.MongoDB.Transport (Transport)
import qualified Database.MongoDB.Transport as Tr
import qualified Database.MongoDB.Transport as T
#if MIN_VERSION_base(4,6,0)
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
putMVar, readMVar, mkWeakMVar, isEmptyMVar)
import GHC.List (foldl1')
import Conduit (repeatWhileMC, (.|), runConduit, foldlC)
putMVar, readMVar, mkWeakMVar)
#else
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
putMVar, readMVar, addMVarFinalizer)
@ -86,15 +81,13 @@ mkWeakMVar :: MVar a -> IO () -> IO ()
mkWeakMVar = addMVarFinalizer
#endif
-- * Pipeline
-- | Thread-safe and pipelined connection
data Pipeline = Pipeline
{ vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it
, responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrives we pop the next thread and give it the response.
, responseQueue :: Chan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
, listenThread :: ThreadId
, finished :: MVar ()
, serverData :: ServerData
}
@ -106,54 +99,25 @@ data ServerData = ServerData
, maxBsonObjectSize :: Int
, maxWriteBatchSize :: Int
}
deriving Show
-- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread.
forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally action and_then =
mask_ $ forkIOWithUnmask $ \unmask ->
try (unmask action) >>= and_then
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline serverData stream = do
vStream <- newMVar stream
responseQueue <- atomically newTChan
finished <- newEmptyMVar
let drainReplies = do
chanEmpty <- atomically $ isEmptyTChan responseQueue
if chanEmpty
then return ()
else do
var <- atomically $ readTChan responseQueue
putMVar var $ Left $ mkIOError
doesNotExistErrorType
"Handle has been closed"
Nothing
Nothing
drainReplies
responseQueue <- newChan
rec
let pipe = Pipeline{..}
listenThread <- forkUnmaskedFinally (listen pipe) $ \_ -> do
putMVar finished ()
drainReplies
listenThread <- forkIO (listen pipe)
_ <- mkWeakMVar vStream $ do
killThread listenThread
Tr.close stream
T.close stream
return pipe
isFinished :: Pipeline -> IO Bool
isFinished Pipeline {finished} = do
empty <- isEmptyMVar finished
return $ not empty
close :: Pipeline -> IO ()
-- ^ Close pipe and underlying connection
close Pipeline{..} = do
killThread listenThread
Tr.close =<< readMVar vStream
T.close =<< readMVar vStream
isClosed :: Pipeline -> IO Bool
isClosed Pipeline{listenThread} = do
@ -163,7 +127,6 @@ isClosed Pipeline{listenThread} = do
ThreadFinished -> True
ThreadBlocked _ -> False
ThreadDied -> True
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
listen :: Pipeline -> IO ()
@ -172,10 +135,10 @@ listen Pipeline{..} = do
stream <- readMVar vStream
forever $ do
e <- try $ readMessage stream
var <- atomically $ readTChan responseQueue
var <- readChan responseQueue
putMVar var e
case e of
Left err -> Tr.close stream >> ioError err -- close and stop looping
Left err -> T.close stream >> ioError err -- close and stop looping
Right _ -> return ()
psend :: Pipeline -> Message -> IO ()
@ -183,55 +146,24 @@ psend :: Pipeline -> Message -> IO ()
-- Throw IOError and close pipeline if send fails
psend p@Pipeline{..} !message = withMVar vStream (flip writeMessage message) `onException` close p
psendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()-- IO (IO Response)
psendOpMsg p@Pipeline{..} commands flagBit params =
case flagBit of
Just f -> case f of
MoreToCome -> withMVar vStream (\t -> writeOpMsgMessage t (commands, Nothing) flagBit params) `onException` close p -- >> return (return (0, ReplyEmpty))
_ -> error "moreToCome has to be set if no response is expected"
_ -> error "moreToCome has to be set if no response is expected"
pcall :: Pipeline -> Message -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcall p@Pipeline{..} message = do
listenerStopped <- isFinished p
if listenerStopped
then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing
else withMVar vStream doCall `onException` close p
where
pcall p@Pipeline{..} message = withMVar vStream doCall `onException` close p where
doCall stream = do
writeMessage stream message
var <- newEmptyMVar
liftIO $ atomically $ writeTChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise
pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcallOpMsg p@Pipeline{..} message flagbit params = do
listenerStopped <- isFinished p
if listenerStopped
then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing
else withMVar vStream doCall `onException` close p
where
doCall stream = do
writeOpMsgMessage stream ([], message) flagbit params
var <- newEmptyMVar
-- put var into the response-queue so that it can
-- fetch the latest response
liftIO $ atomically $ writeTChan responseQueue var
liftIO $ writeChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise
-- * Pipe
type Pipe = Pipeline
-- ^ Thread-safe TCP connection with pipelined requests. In long-running applications the user is expected to use it as a "client": create a `Pipe`
-- at startup, use it as long as possible, watch out for possible timeouts, and close it on shutdown. Bearing in mind that disconnections may be triggered by MongoDB service providers, the user is responsible for re-creating their `Pipe` whenever necessary.
-- ^ Thread-safe TCP connection with pipelined requests
newPipe :: ServerData -> Handle -> IO Pipe
-- ^ Create pipe over handle
newPipe sd handle = Tr.fromHandle handle >>= (newPipeWith sd)
newPipe sd handle = T.fromHandle handle >>= (newPipeWith sd)
newPipeWith :: ServerData -> Transport -> IO Pipe
-- ^ Create pipe over connection
@ -241,12 +173,6 @@ send :: Pipe -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send pipe notices = psend pipe (notices, Nothing)
sendOpMsg :: Pipe -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
sendOpMsg pipe commands@(Nc _ : _) flagBit params = psendOpMsg pipe commands flagBit params
sendOpMsg pipe commands@(Kc _ : _) flagBit params = psendOpMsg pipe commands flagBit params
sendOpMsg _ _ _ _ = error "This function only supports Cmd types wrapped in Nc or Kc type constructors"
call :: Pipe -> [Notice] -> Request -> IO (IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
call pipe notices request = do
@ -257,73 +183,11 @@ call pipe notices request = do
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply)
-- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
callOpMsg pipe request flagBit params = do
requestId <- genRequestId
promise <- pcallOpMsg pipe (Just (request, requestId)) flagBit params
promise' <- promise :: IO Response
return $ snd <$> produce requestId promise'
where
-- We need to perform streaming here as within the OP_MSG protocol mongoDB expects
-- our client to keep receiving messages after the MoreToCome flagbit was
-- set by the server until our client receives an empty flagbit. After the
-- first MoreToCome flagbit was set the responseTo field in the following
-- headers will reference the cursorId that was set in the previous message.
-- see:
-- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses
checkFlagBit p =
case p of
(_, r) ->
case r of
ReplyOpMsg{..} -> flagBits == [MoreToCome]
-- This is called by functions using the OP_MSG protocol,
-- so this has to be ReplyOpMsg
_ -> error "Impossible"
produce reqId p = runConduit $
case p of
(rt, r) ->
case r of
ReplyOpMsg{..} ->
if flagBits == [MoreToCome]
then yieldResponses .| foldlC mergeResponses p
else return $ (rt, check reqId p)
_ -> error "Impossible" -- see comment above
yieldResponses = repeatWhileMC
(do
var <- newEmptyMVar
liftIO $ atomically $ writeTChan (responseQueue pipe) var
readMVar var >>= either throwIO return :: IO Response
)
checkFlagBit
mergeResponses p@(rt,rep) p' =
case (p, p') of
((_, r), (_, r')) ->
case (r, r') of
(ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do
let (section, section') = (head sec, head sec')
(cur, cur') = (maybe Nothing cast $ look "cursor" section,
maybe Nothing cast $ look "cursor" section')
case (cur, cur') of
(Just doc, Just doc') -> do
let (docs, docs') =
( fromJust $ cast $ valueAt "nextBatch" doc :: [Document]
, fromJust $ cast $ valueAt "nextBatch" doc' :: [Document])
id' = fromJust $ cast $ valueAt "id" doc' :: Int32
(rt, check id' (rt, rep{ sections = docs' ++ docs })) -- todo: avoid (++)
-- Since we use this to process moreToCome messages, we
-- know that there will be a nextBatch key in the document
_ -> error "Impossible"
_ -> error "Impossible" -- see comment above
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
-- * Message
type Message = ([Notice], Maybe (Request, RequestId))
-- ^ A write notice(s) with getLastError request, or just query request.
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
type OpMsgMessage = ([Cmd], Maybe (Request, RequestId))
writeMessage :: Transport -> Message -> IO ()
-- ^ Write message to connection
@ -338,27 +202,8 @@ writeMessage conn (notices, mRequest) = do
let s = runPut $ putRequest request requestId
return $ (lenBytes s) `L.append` s
Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
Tr.flush conn
where
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
encodeSize = runPut . putInt32 . (+ 4)
writeOpMsgMessage :: Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
-- ^ Write message to connection
writeOpMsgMessage conn (notices, mRequest) flagBit params = do
noticeStrings <- forM notices $ \n -> do
requestId <- genRequestId
let s = runPut $ putOpMsg n requestId flagBit params
return $ (lenBytes s) `L.append` s
let requestString = do
(request, requestId) <- mRequest
let s = runPut $ putOpMsg (Req request) requestId flagBit params
return $ (lenBytes s) `L.append` s
Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
Tr.flush conn
T.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
T.flush conn
where
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
encodeSize = runPut . putInt32 . (+ 4)
@ -370,8 +215,8 @@ readMessage :: Transport -> IO Response
-- ^ read response from a connection
readMessage conn = readResp where
readResp = do
len <- fromEnum . decodeSize . L.fromStrict <$> Tr.read conn 4
runGet getReply . L.fromStrict <$> Tr.read conn len
len <- fromEnum . decodeSize . L.fromStrict <$> T.read conn 4
runGet getReply . L.fromStrict <$> T.read conn len
decodeSize = subtract 4 . runGet getInt32
type FullCollection = Text
@ -388,7 +233,6 @@ type ResponseTo = RequestId
genRequestId :: (MonadIO m) => m RequestId
-- ^ Generate fresh request id
{-# NOINLINE genRequestId #-}
genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where
counter :: IORef RequestId
counter = unsafePerformIO (newIORef 0)
@ -403,13 +247,6 @@ putHeader opcode requestId = do
putInt32 0
putInt32 opcode
putOpMsgHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putOpMsgHeader opcode requestId = do
putInt32 requestId
putInt32 0
putInt32 opcode
getHeader :: Get (Opcode, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already read
getHeader = do
@ -484,137 +321,6 @@ putNotice notice requestId = do
putInt32 $ toEnum (length kCursorIds)
mapM_ putInt64 kCursorIds
data KillC = KillC { killCursor :: Notice, kFullCollection:: FullCollection} deriving Show
data Cmd = Nc Notice | Req Request | Kc KillC deriving Show
data FlagBit =
ChecksumPresent -- ^ The message ends with 4 bytes containing a CRC-32C checksum
| MoreToCome -- ^ Another message will follow this one without further action from the receiver.
| ExhaustAllowed -- ^ The client is prepared for multiple replies to this request using the moreToCome bit.
deriving (Show, Eq, Enum)
{-
OP_MSG header == 16 byte
+ 4 bytes flagBits
+ 1 byte payload type = 1
+ 1 byte payload type = 2
+ 4 byte size of payload
== 26 bytes opcode overhead
+ X Full command document {insert: "test", writeConcern: {...}}
+ Y command identifier ("documents", "deletes", "updates") ( + \0)
-}
putOpMsg :: Cmd -> RequestId -> Maybe FlagBit -> Document -> Put
putOpMsg cmd requestId flagBit params = do
let biT = maybe zeroBits (bit . bitOpMsg) flagBit:: Int32
putOpMsgHeader opMsgOpcode requestId -- header
case cmd of
Nc n -> case n of
Insert{..} -> do
let (sec0, sec1Size) =
prepSectionInfo
iFullCollection
(Just (iDocuments:: [Document]))
(Nothing:: Maybe Document)
("insert":: Text)
("documents":: Text)
params
putInt32 biT -- flagBit
putInt8 0 -- payload type 0
putDocument sec0 -- payload
putInt8 1 -- payload type 1
putInt32 sec1Size -- size of section
putCString "documents" -- identifier
mapM_ putDocument iDocuments -- payload
Update{..} -> do
let doc = ["q" =: uSelector, "u" =: uUpdater]
(sec0, sec1Size) =
prepSectionInfo
uFullCollection
(Nothing:: Maybe [Document])
(Just doc)
("update":: Text)
("updates":: Text)
params
putInt32 biT
putInt8 0
putDocument sec0
putInt8 1
putInt32 sec1Size
putCString "updates"
putDocument doc
Delete{..} -> do
-- Setting limit to 1 here is ok, since this is only used by deleteOne
let doc = ["q" =: dSelector, "limit" =: (1 :: Int32)]
(sec0, sec1Size) =
prepSectionInfo
dFullCollection
(Nothing:: Maybe [Document])
(Just doc)
("delete":: Text)
("deletes":: Text)
params
putInt32 biT
putInt8 0
putDocument sec0
putInt8 1
putInt32 sec1Size
putCString "deletes"
putDocument doc
_ -> error "The KillCursors command cannot be wrapped into a Nc type constructor. Please use the Kc type constructor"
Req r -> case r of
Query{..} -> do
let n = T.splitOn "." qFullCollection
db = head n
sec0 = foldl1' merge [qProjector, [ "$db" =: db ], qSelector]
putInt32 biT
putInt8 0
putDocument sec0
GetMore{..} -> do
let n = T.splitOn "." gFullCollection
(db, coll) = (head n, last n)
pre = ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize]
putInt32 (bit $ bitOpMsg $ ExhaustAllowed)
putInt8 0
putDocument pre
Kc k -> case k of
KillC{..} -> do
let n = T.splitOn "." kFullCollection
(db, coll) = (head n, last n)
case killCursor of
KillCursors{..} -> do
let doc = ["killCursors" =: coll, "cursors" =: kCursorIds, "$db" =: db]
putInt32 biT
putInt8 0
putDocument doc
-- Notices are already captured at the beginning, so all
-- other cases are impossible
_ -> error "impossible"
where
lenBytes bytes = toEnum . fromEnum $ L.length bytes:: Int32
prepSectionInfo fullCollection documents document command identifier ps =
let n = T.splitOn "." fullCollection
(db, coll) = (head n, last n)
in
case documents of
Just ds ->
let
sec0 = merge ps [command =: coll, "$db" =: db]
s = sum $ map (lenBytes . runPut . putDocument) ds
i = runPut $ putCString identifier
-- +4 bytes for the type 1 section size that has to be
-- transported in addition to the type 1 section document
sec1Size = s + lenBytes i + 4
in (sec0, sec1Size)
Nothing ->
let
sec0 = merge ps [command =: coll, "$db" =: db]
s = runPut $ putDocument $ fromJust document
i = runPut $ putCString identifier
sec1Size = lenBytes s + lenBytes i + 4
in (sec0, sec1Size)
iBit :: InsertOption -> Int32
iBit KeepGoing = bit 0
@ -634,11 +340,6 @@ dBit SingleRemove = bit 0
dBits :: [DeleteOption] -> Int32
dBits = bitOr . map dBit
bitOpMsg :: FlagBit -> Int
bitOpMsg ChecksumPresent = 0
bitOpMsg MoreToCome = 1
bitOpMsg ExhaustAllowed = 16
-- ** Request
-- | A request is a message that is sent with a 'Reply' expected in return
@ -648,8 +349,8 @@ data Request =
qFullCollection :: FullCollection,
qSkip :: Int32, -- ^ Number of initial matching documents to skip
qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
qSelector :: Document, -- ^ @[]@ = return all documents in collection
qProjector :: Document -- ^ @[]@ = return whole document
qSelector :: Document, -- ^ \[\] = return all documents in collection
qProjector :: Document -- ^ \[\] = return whole document
} | GetMore {
gFullCollection :: FullCollection,
gBatchSize :: Int32,
@ -657,15 +358,13 @@ data Request =
deriving (Show, Eq)
data QueryOption =
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point for example if the final object it references were deleted. Thus, you should be prepared to requery on @CursorNotFound@ exception.
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception.
| SlaveOK -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
| NoCursorTimeout -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed.
| AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
-- | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
-- Exhaust commented out because not compatible with current `Pipeline` implementation
| Partial -- ^ Get partial results from a /mongos/ if some shards are down, instead of throwing an error.
| Partial -- ^ Get partial results from a _mongos_ if some shards are down, instead of throwing an error.
deriving (Show, Eq)
-- *** Binary format
@ -674,9 +373,6 @@ qOpcode :: Request -> Opcode
qOpcode Query{} = 2004
qOpcode GetMore{} = 2005
opMsgOpcode :: Opcode
opMsgOpcode = 2013
putRequest :: Request -> RequestId -> Put
putRequest request requestId = do
putHeader (qOpcode request) requestId
@ -700,7 +396,7 @@ qBit SlaveOK = bit 2
qBit NoCursorTimeout = bit 4
qBit AwaitData = bit 5
--qBit Exhaust = bit 6
qBit Database.MongoDB.Internal.Protocol.Partial = bit 7
qBit Partial = bit 7
qBits :: [QueryOption] -> Int32
qBits = bitOr . map qBit
@ -713,13 +409,7 @@ data Reply = Reply {
rCursorId :: CursorId, -- ^ 0 = cursor finished
rStartingFrom :: Int32,
rDocuments :: [Document]
}
| ReplyOpMsg {
flagBits :: [FlagBit],
sections :: [Document],
checksum :: Maybe Int32
}
deriving (Show, Eq)
} deriving (Show, Eq)
data ResponseFlag =
CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
@ -735,38 +425,17 @@ replyOpcode = 1
getReply :: Get (ResponseTo, Reply)
getReply = do
(opcode, responseTo) <- getHeader
if opcode == 2013
then do
-- Notes:
-- Checksum bits that are set by the server don't seem to be supported by official drivers.
-- See: https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/message.py#L1423
flagBits <- rFlagsOpMsg <$> getInt32
_ <- getInt8
sec0 <- getDocument
let sections = [sec0]
checksum = Nothing
return (responseTo, ReplyOpMsg{..})
else do
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
rResponseFlags <- rFlags <$> getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- fromIntegral <$> getInt32
rDocuments <- replicateM numDocs getDocument
return (responseTo, Reply{..})
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
rResponseFlags <- rFlags <$> getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- fromIntegral <$> getInt32
rDocuments <- replicateM numDocs getDocument
return (responseTo, Reply{..})
rFlags :: Int32 -> [ResponseFlag]
rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..]
-- See https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#flagbits
rFlagsOpMsg :: Int32 -> [FlagBit]
rFlagsOpMsg bits = isValidFlag bits
where isValidFlag bt =
let setBits = map fst $ filter (\(_,b) -> b == True) $ zip ([0..31] :: [Int32]) $ map (testBit bt) [0 .. 31]
in if any (\n -> not $ elem n [0,1,16]) setBits
then error "Unsopported bit was set"
else filter (testBit bt . bitOpMsg) [ChecksumPresent ..]
rBit :: ResponseFlag -> Int
rBit CursorNotFound = 0
rBit QueryError = 1

View file

@ -1,7 +1,9 @@
-- | Miscellaneous general functions
-- | Miscellaneous general functions and Show, Eq, and Ord instances for PortID
{-# LANGUAGE FlexibleInstances, UndecidableInstances, StandaloneDeriving #-}
{-# LANGUAGE CPP #-}
-- PortID instances
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Database.MongoDB.Internal.Util where
@ -12,19 +14,26 @@ import Control.Exception (handle, throwIO, Exception)
import Control.Monad (liftM, liftM2)
import Data.Bits (Bits, (.|.))
import Data.Word (Word8)
import Network (PortID(..))
import Numeric (showHex)
import System.Random (newStdGen)
import System.Random.Shuffle (shuffle')
import qualified Data.ByteString as S
import Control.Monad.Except (MonadError(..))
import Control.Monad.Error (MonadError(..), Error(..))
import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson
import Data.Text (Text)
import qualified Data.Text as T
#if !MIN_VERSION_network(2, 4, 1)
deriving instance Show PortID
deriving instance Eq PortID
#endif
deriving instance Ord PortID
-- | A monadic sort implementation derived from the non-monadic one in ghc's Prelude
mergesortM :: Monad m => (a -> a -> m Ordering) -> [a] -> m [a]
mergesortM cmp = mergesortM' cmp . map wrap
@ -56,16 +65,13 @@ shuffle :: [a] -> IO [a]
-- ^ Randomly shuffle items in list
shuffle list = shuffle' list (length list) <$> newStdGen
loop :: Monad m => m (Maybe a) -> m [a]
loop :: (Functor m, Monad m) => m (Maybe a) -> m [a]
-- ^ Repeatedy execute action, collecting results, until it returns Nothing
loop act = act >>= maybe (return []) (\a -> (a :) `liftM` loop act)
loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act)
untilSuccess :: (MonadError e m) => (a -> m b) -> [a] -> m b
untilSuccess :: (MonadError e m, Error e) => (a -> m b) -> [a] -> m b
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw 'strMsg' error if list is empty.
untilSuccess = untilSuccess' (error "empty untilSuccess")
-- Use 'error' copying behavior in removed 'Control.Monad.Error.Error' instance:
-- instance Error Failure where strMsg = error
-- 'fail' is treated the same as a programming 'error'. In other words, don't use it.
untilSuccess = untilSuccess' (strMsg "empty untilSuccess")
untilSuccess' :: (MonadError e m) => e -> (a -> m b) -> [a] -> m b
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw given error if list is empty

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,6 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
#if (__GLASGOW_HASKELL__ >= 706)
{-# LANGUAGE RecursiveDo #-}
@ -20,44 +21,40 @@ ATTENTION!!! Be aware that this module is highly experimental and is
barely tested. The current implementation doesn't verify server's identity.
It only allows you to connect to a mongodb server using TLS protocol.
-}
module Database.MongoDB.Transport.Tls
( connect
, connectWithTlsParams
)
(connect)
where
import Data.IORef
import Data.Monoid
import qualified Data.ByteString as ByteString
import qualified Data.ByteString.Lazy as Lazy.ByteString
import Data.Default.Class (def)
import Control.Applicative ((<$>))
import Control.Exception (bracketOnError)
import Control.Monad (when, unless)
import System.IO
import Database.MongoDB.Internal.Protocol (Pipe, newPipeWith)
import Database.MongoDB (Pipe)
import Database.MongoDB.Internal.Protocol (newPipeWith)
import Database.MongoDB.Transport (Transport(Transport))
import qualified Database.MongoDB.Transport as T
import System.IO.Error (mkIOError, eofErrorType)
import Database.MongoDB.Internal.Network (connectTo, HostName, PortID)
import Network (connectTo, HostName, PortID)
import qualified Network.TLS as TLS
import qualified Network.TLS.Extra.Cipher as TLS
import Database.MongoDB.Query (access, slaveOk, retrieveServerData)
-- | Connect to mongodb using TLS
connect :: HostName -> PortID -> IO Pipe
connect host port = connectWithTlsParams params host port
where
params = (TLS.defaultParamsClient host "")
{ TLS.clientSupported = def
{ TLS.supportedCiphers = TLS.ciphersuite_default }
, TLS.clientHooks = def
{ TLS.onServerCertificate = \_ _ _ _ -> return [] }
}
connect host port = bracketOnError (connectTo host port) hClose $ \handle -> do
-- | Connect to mongodb using TLS using provided TLS client parameters
connectWithTlsParams :: TLS.ClientParams -> HostName -> PortID -> IO Pipe
connectWithTlsParams clientParams host port = bracketOnError (connectTo host port) hClose $ \handle -> do
context <- TLS.contextNew handle clientParams
let params = (TLS.defaultParamsClient host "")
{ TLS.clientSupported = def
{ TLS.supportedCiphers = TLS.ciphersuite_all}
, TLS.clientHooks = def
{ TLS.onServerCertificate = \_ _ _ _ -> return []}
}
context <- TLS.contextNew handle params
TLS.handshake context
conn <- tlsConnection context

View file

@ -1,7 +1,7 @@
This is the Haskell MongoDB driver (client). [MongoDB](http://www.mongodb.org) is a free, scalable, fast, document database management system. This driver lets you connect to a MongoDB server, and update and query its data. It also lets you do adminstrative tasks, like create an index or look at performance statistics.
[![Join the chat at https://gitter.im/mongodb-haskell/mongodb](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mongodb-haskell/mongodb?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Build Status](https://travis-ci.org/mongodb-haskell/mongodb.svg?branch=master)](https://travis-ci.org/mongodb-haskell/mongodb)
[![Build Status](https://travis-ci.org/mongodb-haskell/mongodb.svg)](https://travis-ci.org/mongodb-haskell/mongodb)
### Documentation
@ -11,68 +11,3 @@ This is the Haskell MongoDB driver (client). [MongoDB](http://www.mongodb.org) i
* [MapReduce example](http://github.com/mongodb-haskell/mongodb/blob/master/doc/map-reduce-example.md)
* [Driver design](https://github.com/mongodb-haskell/mongodb/blob/master/doc/Article1.md)
* [MongoDB DBMS](http://www.mongodb.org)
### Dev Environment
It's important for this library to be tested with various versions of mongodb
server and with different ghc versions. In order to achieve this we use docker
containers and docker-compose. This repository contains two files: docker-compose.yml
and reattach.sh.
Docker compose file describes two containers.
One container is for running mongodb server. If you want a different version of
mongodb server you need to change the tag of mongo image in the
docker-compose.yml. In order to start your mongodb server you need to run:
```
docker-compose up -d mongodb
```
In order to stop your containers without loosing the data inside of it:
```
docker-compose stop mongodb
```
Restart:
```
docker-compose start mongodb
```
If you want to remove the mongodb container and start from scratch then:
```
docker-compose stop mongodb
docker-compose rm mongodb
docker-compose up -d mongodb
```
The other container is for compiling your code. By specifying the tag of the image
you can change the version of ghc you will be using. If you never started this
container then you need:
```
docker-compose run mongodb-haskell
```
It will start the container and mount your working directory to
`/opt/mongodb-haskell` If you exit the bash cli then the conainer will stop.
In order to reattach to an old stopped container you need to run script
`reattach.sh`. If you run `docker-compose run` again it will create another
container and all work made by cabal will be lost. `reattach.sh` is a
workaround for docker-compose's inability to pick up containers that exited.
When you are done with testing you need to run:
```
docker-compose stop mongodb
```
Next time you will need to do:
```
docker-compose start mongodb
reattach.sh
```
It will start your stopped container with mongodb server and pick up the stopped
container with haskell.

0
Setup.lhs Normal file → Executable file
View file

View file

@ -1,4 +1,3 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ExtendedDefaultRules #-}
@ -8,12 +7,7 @@ import Database.MongoDB (Action, Document, Document, Value, access,
close, connect, delete, exclude, find,
host, insertMany, master, project, rest,
select, sort, (=:))
#if (__GLASGOW_HASKELL__ >= 800)
import Control.Monad.IO.Class (liftIO)
#else
import Control.Monad.Trans (liftIO)
#endif
main :: IO ()
main = do

View file

@ -12,10 +12,9 @@ Start a haskell session:
$ ghci
> :set prompt "> "
Import the MongoDB driver library, and set OverloadedStrings so literal strings are converted to UTF-8 automatically and ExtendedDefaultRules for using BSON fields with basic types.
Import the MongoDB driver library, and set OverloadedStrings so literal strings are converted to UTF-8 automatically.
> :set -XOverloadedStrings
> :set -XExtendedDefaultRules
> import Database.MongoDB
### Connecting
@ -36,7 +35,7 @@ A DB read or write operation is called a DB `Action`. A DB Action is a monad so
> access pipe master "test" allCollections
`access` throw `Failure` if there is any issue. Failure means there was a connection failure, or a read/write failure like cursor expired or duplicate key insert.
`access` return either Left `Failure` or Right result. Failure means there was a connection failure, or a read/write failure like cursor expired or duplicate key insert.
`master` is an `AccessMode`. Access mode indicates how reads and writes will be performed. Its three modes are: `ReadStaleOk`, `UnconfirmedWrites`, and `ConfirmWrites GetLastErrorParams`. `master` is just short hand for `ConfirmWrites []`. The first mode may be used against a slave or a master server, the last two must be used against a master server.

View file

@ -1,16 +0,0 @@
version: '3'
services:
mongodb:
ports:
- 27017:27017
image: mongo:3.6
mongodb-haskell:
image: phadej/ghc:8.0.2
environment:
- HASKELL_MONGODB_TEST_HOST=mongodb
entrypoint:
- /bin/bash
volumes:
- ./:/opt/mongodb-haskell
# vim: ts=2 et sw=2 ai

View file

@ -1,5 +1,5 @@
Name: mongoDB
Version: 2.7.1.2
Version: 2.1.0
Synopsis: Driver (client) for MongoDB, a free, scalable, fast, document
DBMS
Description: This package lets you connect to MongoDB servers and
@ -10,74 +10,50 @@ Category: Database
Homepage: https://github.com/mongodb-haskell/mongodb
Bug-reports: https://github.com/mongodb-haskell/mongodb/issues
Author: Tony Hannan
Maintainer: Victor Denisov <denisovenator@gmail.com>
Maintainer: Fedor Gogolev <knsd@knsd.net>
Copyright: Copyright (c) 2010-2012 10gen Inc.
License: Apache-2.0
License: OtherLicense
License-file: LICENSE
Cabal-version: >= 1.10
Build-type: Simple
Stability: alpha
Extra-Source-Files: CHANGELOG.md
-- Imitated from https://github.com/mongodb-haskell/bson/pull/18
Flag _old-network
description: Control whether to use <http://hackage.haskell.org/package/network-bsd network-bsd>
manual: False
Library
GHC-options: -Wall
GHC-prof-options: -auto-all-exported
default-language: Haskell2010
Build-depends: array -any
, base <5
, binary -any
, bson >= 0.3 && < 0.5
, bson >= 0.3 && < 0.4
, text
, bytestring -any
, containers -any
, conduit
, conduit-extra
, mtl >= 2
, cryptohash -any
, network -any
, parsec -any
, random -any
, random-shuffle -any
, resourcet
, monad-control >= 0.3.1
, lifted-base >= 0.1.0.3
, pureMD5
, stm
, tagged
, tls >= 1.3.0
, time
, tls >= 1.2.0
, data-default-class -any
, transformers
, transformers-base >= 0.4.1
, hashtables >= 1.1.2.0
, base16-bytestring >= 0.1.1.6
, base64-bytestring >= 1.0.0.1
, nonce >= 1.0.5
, fail
, dns
, http-types
if flag(_old-network)
-- "Network.BSD" is only available in network < 2.9
build-depends: network < 2.9
else
-- "Network.BSD" has been moved into its own package `network-bsd`
build-depends: network >= 3.0
, network-bsd >= 2.7 && < 2.9
, nonce >= 1.0.2
Exposed-modules: Database.MongoDB
Database.MongoDB.Admin
Database.MongoDB.Connection
Database.MongoDB.GridFS
Database.MongoDB.Query
Database.MongoDB.Transport
Database.MongoDB.Transport.Tls
Other-modules: Database.MongoDB.Internal.Network
Database.MongoDB.Internal.Protocol
Other-modules: Database.MongoDB.Internal.Protocol
Database.MongoDB.Internal.Util
Source-repository head
@ -87,9 +63,6 @@ Source-repository head
test-suite test
hs-source-dirs: test
main-is: Main.hs
other-modules: Spec
, QuerySpec
, TestImport
ghc-options: -Wall -with-rtsopts "-K64m"
type: exitcode-stdio-1.0
build-depends: mongoDB
@ -114,15 +87,14 @@ Benchmark bench
, base64-bytestring
, base16-bytestring
, binary -any
, bson >= 0.3 && < 0.5
, data-default-class -any
, bson >= 0.3 && < 0.4
, text
, bytestring -any
, containers -any
, mtl >= 2
, cryptohash -any
, nonce >= 1.0.5
, stm
, network -any
, nonce
, parsec -any
, random -any
, random-shuffle -any
@ -130,19 +102,6 @@ Benchmark bench
, lifted-base >= 0.1.0.3
, transformers-base >= 0.4.1
, hashtables >= 1.1.2.0
, fail
, dns
, http-types
, criterion
, tls >= 1.3.0
if flag(_old-network)
-- "Network.BSD" is only available in network < 2.9
build-depends: network < 2.9
else
-- "Network.BSD" has been moved into its own package `network-bsd`
build-depends: network >= 3.0
, network-bsd >= 2.7 && < 2.9
default-language: Haskell2010
default-extensions: OverloadedStrings

View file

@ -1,3 +0,0 @@
#!/bin/bash
docker start -ai mongodb_mongodb-haskell_run_1

View file

@ -1,4 +0,0 @@
resolver: lts-9.21
flags:
mongoDB:
_old-network: true

View file

@ -1,4 +0,0 @@
resolver: lts-11.22
flags:
mongoDB:
_old-network: true

View file

@ -1,4 +0,0 @@
resolver: lts-12.26
flags:
mongoDB:
_old-network: true

View file

@ -1,6 +0,0 @@
resolver: lts-13.23
extra-deps:
- git: git@github.com:hvr/bson.git # https://github.com/mongodb-haskell/bson/pull/18
commit: 2fc8d04120c0758201762b8e22254aeb6d574f41
- network-bsd-2.8.1.0
- network-3.1.0.0

View file

@ -1,4 +0,0 @@
resolver: lts-13.23
flags:
mongoDB:
_old-network: true

View file

@ -1,69 +0,0 @@
# This file was automatically generated by 'stack init'
#
# Some commonly used options have been documented as comments in this file.
# For advanced use and comprehensive documentation of the format, please see:
# https://docs.haskellstack.org/en/stable/yaml_configuration/
# Resolver to choose a 'specific' stackage snapshot or a compiler version.
# A snapshot resolver dictates the compiler version and the set of packages
# to be used for project dependencies. For example:
#
# resolver: lts-3.5
# resolver: nightly-2015-09-21
# resolver: ghc-7.10.2
#
# The location of a snapshot can be provided as a file or url. Stack assumes
# a snapshot provided as a file might change, whereas a url resource does not.
#
# resolver: ./custom-snapshot.yaml
# resolver: https://example.com/snapshots/2018-01-01.yaml
resolver:
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/21.yaml
# User packages to be built.
# Various formats can be used as shown in the example below.
#
# packages:
# - some-directory
# - https://example.com/foo/bar/baz-0.0.2.tar.gz
# subdirs:
# - auto-update
# - wai
packages:
- .
# Dependency packages to be pulled from upstream that are not in the resolver.
# These entries can reference officially published versions as well as
# forks / in-progress versions pinned to a git hash. For example:
#
# extra-deps:
# - acme-missiles-0.3
# - git: https://github.com/commercialhaskell/stack.git
# commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a
#
# extra-deps: []
# Override default flag values for local packages and extra-deps
flags:
mongoDB:
_old-network: false
# Extra package databases containing global packages
# extra-package-dbs: []
# Control whether we use the GHC we find on the path
# system-ghc: true
#
# Require a specific version of stack, using version ranges
# require-stack-version: -any # Default
# require-stack-version: ">=2.7"
#
# Override the architecture used by stack, especially useful on Windows
# arch: i386
# arch: x86_64
#
# Extra directories used by stack for building
# extra-include-dirs: [/path/to/dir]
# extra-lib-dirs: [/path/to/dir]
#
# Allow a newer minor version of GHC than the snapshot specifies
# compiler-check: newer-minor

View file

@ -1,13 +0,0 @@
# This file was autogenerated by Stack.
# You should not edit this file by hand.
# For more information, please see the documentation at:
# https://docs.haskellstack.org/en/stable/lock_files
packages: []
snapshots:
- completed:
size: 586110
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/21.yaml
sha256: ce4fb8d44f3c6c6032060a02e0ebb1bd29937c9a70101c1517b92a87d9515160
original:
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/21.yaml

View file

@ -5,15 +5,11 @@ import Database.MongoDB.Connection (connect, host)
import Database.MongoDB.Query (access, slaveOk)
import Data.Text (unpack)
import Test.Hspec.Runner
import System.Environment (getEnv)
import System.IO.Error (catchIOError)
import TestImport
import qualified Spec
main :: IO ()
main = do
mongodbHost <- getEnv mongodbHostEnvVariable `catchIOError` (\_ -> return "localhost")
p <- connect $ host mongodbHost
p <- connect $ host "localhost"
version <- access p slaveOk "admin" serverVersion
putStrLn $ "Running tests with mongodb version: " ++ (unpack version)
hspecWith defaultConfig Spec.spec

View file

@ -5,9 +5,7 @@ module QuerySpec (spec) where
import Data.String (IsString(..))
import TestImport
import Control.Exception
import Control.Monad (forM_, when)
import System.Environment (getEnv)
import System.IO.Error (catchIOError)
import Control.Monad (forM_)
import qualified Data.List as L
import qualified Data.Text as T
@ -17,17 +15,11 @@ testDBName = "mongodb-haskell-test"
db :: Action IO a -> IO a
db action = do
mongodbHost <- getEnv mongodbHostEnvVariable `catchIOError` (\_ -> return "localhost")
pipe <- connect (host mongodbHost)
pipe <- connect (host "127.0.0.1")
result <- access pipe master testDBName action
close pipe
return result
getWireVersion :: IO Int
getWireVersion = db $ do
sd <- retrieveServerData
return $ maxWireVersion sd
withCleanDatabase :: ActionWith () -> IO ()
withCleanDatabase action = dropDB >> action () >> dropDB >> return ()
where
@ -43,21 +35,6 @@ insertDuplicateWith testInsert = do
]
return ()
insertUsers :: IO ()
insertUsers = db $
insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]]
, ["_id" =: "joe", "joined" =: parseDate "2012-07-02", "likes" =: ["tennis", "golf", "swimming"]]
, ["_id" =: "jill", "joined" =: parseDate "2013-11-17", "likes" =: ["cricket", "golf"]]
]
pendingIfMongoVersion :: ((Integer, Integer) -> Bool) -> SpecWith () -> Spec
pendingIfMongoVersion invalidVersion = before $ do
version <- db $ extractVersion . T.splitOn "." . at "version" <$> runCommand1 "buildinfo"
when (invalidVersion version) $ pendingWith "This test does not run in the current database version"
where
extractVersion (major:minor:_) = (read $ T.unpack major, read $ T.unpack minor)
extractVersion _ = error "Invalid version specification"
bigDocument :: Document
bigDocument = (flip map) [1..10000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name")
@ -130,8 +107,8 @@ spec = around withCleanDatabase $ do
describe "insertAll" $ do
it "inserts documents to the collection and returns their _ids" $ do
(_id1:_id2:_) <- db $ insertAll "team" [ ["name" =: "Yankees", "league" =: "American"]
, ["name" =: "Dodgers", "league" =: "American"]
]
, ["name" =: "Dodgers", "league" =: "American"]
]
result <- db $ rest =<< find (select [] "team")
result `shouldBe` [["_id" =: _id1, "name" =: "Yankees", "league" =: "American"]
,["_id" =: _id2, "name" =: "Dodgers", "league" =: "American"]
@ -191,7 +168,7 @@ spec = around withCleanDatabase $ do
liftIO $ (length returnedDocs) `shouldBe` 1000
it "skips one too big document" $ do
(db $ insertAll_ "hugeDocCollection" [hugeDocument]) `shouldThrow` anyException
db $ insertAll_ "hugeDocCollection" [hugeDocument]
db $ do
cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000}
returnedDocs <- rest cur
@ -212,146 +189,107 @@ spec = around withCleanDatabase $ do
describe "updateMany" $ do
it "updates value" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_id <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
result <- db $ rest =<< find (select [] "team")
result `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "American"]]
_ <- db $ updateMany "team" [([ "_id" =: _id]
, ["$set" =: ["league" =: "European"]]
, [])]
updatedResult <- db $ rest =<< find (select [] "team")
updatedResult `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "European"]]
_id <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
result <- db $ rest =<< find (select [] "team")
result `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "American"]]
_ <- db $ updateMany "team" [([ "_id" =: _id]
, ["$set" =: ["league" =: "European"]]
, [])]
updatedResult <- db $ rest =<< find (select [] "team")
updatedResult `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "European"]]
it "upserts value" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
c <- db $ count (select [] "team")
c `shouldBe` 0
_ <- db $ updateMany "team" [( []
, ["name" =: "Giants", "league" =: "MLB"]
, [Upsert]
)]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
map L.sort updatedResult `shouldBe` [["league" =: "MLB", "name" =: "Giants"]]
c <- db $ count (select [] "team")
c `shouldBe` 0
_ <- db $ updateMany "team" [( []
, ["name" =: "Giants", "league" =: "MLB"]
, [Upsert]
)]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
map L.sort updatedResult `shouldBe` [["league" =: "MLB", "name" =: "Giants"]]
it "updates all documents with Multi enabled" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
, ["$set" =: ["league" =: "MLB"]]
, [MultiUpdate]
)]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"]
, ["league" =: "MLB", "name" =: "Yankees"]
]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
, ["$set" =: ["league" =: "MLB"]]
, [MultiUpdate]
)]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"]
, ["league" =: "MLB", "name" =: "Yankees"]
]
it "updates one document when there is no Multi option" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
, ["$set" =: ["league" =: "MLB"]]
, []
)]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"]
, ["league" =: "MiLB", "name" =: "Yankees"]
]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
, ["$set" =: ["league" =: "MLB"]]
, []
)]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"]
, ["league" =: "MiLB", "name" =: "Yankees"]
]
it "can process different updates" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB"]
_ <- db $ updateMany "team" [ ( ["name" =: "Yankees"]
, ["$set" =: ["league" =: "MiLB"]]
, []
)
, ( ["name" =: "Giants"]
, ["$set" =: ["league" =: "MLB"]]
, []
)
]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB" , "name" =: "Giants"]
, ["league" =: "MiLB", "name" =: "Yankees"]
]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB"]
_ <- db $ updateMany "team" [ ( ["name" =: "Yankees"]
, ["$set" =: ["league" =: "MiLB"]]
, []
)
, ( ["name" =: "Giants"]
, ["$set" =: ["league" =: "MLB"]]
, []
)
]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB" , "name" =: "Giants"]
, ["league" =: "MiLB", "name" =: "Yankees"]
]
it "can process different updates" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
updateResult <- (db $ updateMany "team" [ ( ["name" =: "Yankees"]
, ["$inc" =: ["score" =: (1 :: Int)]]
, []
)
, ( ["name" =: "Giants"]
, ["$inc" =: ["score" =: (2 :: Int)]]
, []
)
])
failed updateResult `shouldBe` True
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (1 :: Int)]
]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
(db $ updateMany "team" [ ( ["name" =: "Yankees"]
, ["$inc" =: ["score" =: (1 :: Int)]]
, []
)
, ( ["name" =: "Giants"]
, ["$inc" =: ["score" =: (2 :: Int)]]
, []
)
]) `shouldThrow` anyException
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (1 :: Int)]
]
it "can handle big updates" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
let docs = (flip map) [0..20000] $ \i ->
["name" =: (T.pack $ "name " ++ (show i))]
ids <- db $ insertAll "bigCollection" docs
let updateDocs = (flip map) ids (\i -> ( [ "_id" =: i]
, ["$set" =: ["name" =: ("name " ++ (show i))]]
, []
))
_ <- db $ updateMany "team" updateDocs
updatedResult <- db $ rest =<< find (select [] "team")
forM_ updatedResult $ \r -> let (i :: ObjectId) = "_id" `at` r
in (("name" `at` r) :: String) `shouldBe` ("name" ++ (show i))
let docs = (flip map) [0..20000] $ \i ->
["name" =: (T.pack $ "name " ++ (show i))]
ids <- db $ insertAll "bigCollection" docs
let updateDocs = (flip map) ids (\i -> ( [ "_id" =: i]
, ["$set" =: ["name" =: ("name " ++ (show i))]]
, []
))
_ <- db $ updateMany "team" updateDocs
updatedResult <- db $ rest =<< find (select [] "team")
forM_ updatedResult $ \r -> let (i :: ObjectId) = "_id" `at` r
in (("name" `at` r) :: String) `shouldBe` ("name" ++ (show i))
describe "updateAll" $ do
it "can process different updates" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
updateResult <- (db $ updateAll "team" [ ( ["name" =: "Yankees"]
, ["$inc" =: ["score" =: (1 :: Int)]]
, []
)
, ( ["name" =: "Giants"]
, ["$inc" =: ["score" =: (2 :: Int)]]
, []
)
])
failed updateResult `shouldBe` True
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (3 :: Int)]
]
it "returns correct number of matched and modified" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myField" =: "newValue"]], [MultiUpdate])]
nMatched res `shouldBe` 2
nModified res `shouldBe` (Just 2)
it "returns correct number of upserted" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myfield" =: "newValue"]], [Upsert])]
(length $ upserted res) `shouldBe` 1
it "updates only one doc without multi update" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myField" =: "newValue"]], [])]
nMatched res `shouldBe` 1
nModified res `shouldBe` (Just 1)
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
(db $ updateAll "team" [ ( ["name" =: "Yankees"]
, ["$inc" =: ["score" =: (1 :: Int)]]
, []
)
, ( ["name" =: "Giants"]
, ["$inc" =: ["score" =: (2 :: Int)]]
, []
)
]) `shouldThrow` anyException
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (3 :: Int)]
]
describe "delete" $ do
it "actually deletes something" $ do
@ -393,47 +331,34 @@ spec = around withCleanDatabase $ do
describe "deleteMany" $ do
it "actually deletes something" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" ["name" =: ("Giants" :: String)]
_ <- db $ insert "team" ["name" =: ("Yankees" :: String)]
_ <- db $ deleteMany "team" [ (["name" =: ("Giants" :: String)], [])
, (["name" =: ("Yankees" :: String)], [])
]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
_ <- db $ insert "team" ["name" =: ("Giants" :: String)]
_ <- db $ insert "team" ["name" =: ("Yankees" :: String)]
_ <- db $ deleteMany "team" [ (["name" =: ("Giants" :: String)], [])
, (["name" =: ("Yankees" :: String)], [])
]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
describe "deleteAll" $ do
it "actually deletes something" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "team" [ "name" =: ("Giants" :: String)
, "score" =: (Nothing :: Maybe Int)
]
_ <- db $ insert "team" [ "name" =: ("Yankees" :: String)
, "score" =: (1 :: Int)
]
_ <- db $ deleteAll "team" [ (["name" =: ("Giants" :: String)], [])
, (["name" =: ("Yankees" :: String)], [])
]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
_ <- db $ insert "team" [ "name" =: ("Giants" :: String)
, "score" =: (Nothing :: Maybe Int)
]
_ <- db $ insert "team" [ "name" =: ("Yankees" :: String)
, "score" =: (1 :: Int)
]
_ <- db $ deleteAll "team" [ (["name" =: ("Giants" :: String)], [])
, (["name" =: ("Yankees" :: String)], [])
]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
it "can handle big deletes" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
let docs = (flip map) [0..20000] $ \i ->
["name" =: (T.pack $ "name " ++ (show i))]
_ <- db $ insertAll "bigCollection" docs
_ <- db $ deleteAll "bigCollection" $ map (\d -> (d, [])) docs
updatedResult <- db $ rest =<< find ((select [] "bigCollection") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
it "returns correct result" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "testCollection" [ "myField" =: "myValue" ]
_ <- db $ insert "testCollection" [ "myField" =: "myValue" ]
res <- db $ deleteAll "testCollection" [ (["myField" =: "myValue"], []) ]
nRemoved res `shouldBe` 2
let docs = (flip map) [0..20000] $ \i ->
["name" =: (T.pack $ "name " ++ (show i))]
_ <- db $ insertAll "bigCollection" docs
_ <- db $ deleteAll "bigCollection" $ map (\d -> (d, [])) docs
updatedResult <- db $ rest =<< find ((select [] "bigCollection") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
describe "allCollections" $ do
it "returns all collections in a database" $ do
@ -443,34 +368,13 @@ spec = around withCleanDatabase $ do
collections <- db $ allCollections
liftIO $ (L.sort collections) `shouldContain` ["team1", "team2", "team3"]
describe "aggregate" $ before_ insertUsers $
describe "aggregate" $ do
it "aggregates to normalize and sort documents" $ do
db $ insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]]
, ["_id" =: "joe", "joined" =: parseDate "2012-07-02", "likes" =: ["tennis", "golf", "swimming"]]
, ["_id" =: "jill", "joined" =: parseDate "2013-11-17", "likes" =: ["cricket", "golf"]]
]
result <- db $ aggregate "users" [ ["$project" =: ["name" =: ["$toUpper" =: "$_id"], "_id" =: 0]]
, ["$sort" =: ["name" =: 1]]
]
result `shouldBe` [["name" =: "JANE"], ["name" =: "JILL"], ["name" =: "JOE"]]
-- This feature was introduced in MongoDB version 3.2
-- https://docs.mongodb.com/manual/reference/command/find/
describe "findCommand" $ pendingIfMongoVersion (< (3,2)) $
context "when mongo version is 3.2 or superior" $ before insertUsers $ do
it "fetches all the records" $ do
result <- db $ rest =<< findCommand (select [] "users")
length result `shouldBe` 3
it "filters the records" $ do
result <- db $ rest =<< findCommand (select ["_id" =: "joe"] "users")
length result `shouldBe` 1
it "projects the records" $ do
result <- db $ rest =<< findCommand
(select [] "users") { project = [ "_id" =: 1 ] }
result `shouldBe` [["_id" =: "jane"], ["_id" =: "joe"], ["_id" =: "jill"]]
it "sorts the records" $ do
result <- db $ rest =<< findCommand
(select [] "users") { project = [ "_id" =: 1 ]
, sort = [ "_id" =: 1 ]
}
result `shouldBe` [["_id" =: "jane"], ["_id" =: "jill"], ["_id" =: "joe"]]

View file

@ -8,6 +8,7 @@ module TestImport (
import Test.Hspec as Export hiding (Selector)
import Database.MongoDB as Export
import Control.Monad.Trans as Export (MonadIO, liftIO)
import Data.Maybe (fromJust)
import Data.Time (ParseTime, UTCTime)
import qualified Data.Time as Time
@ -17,7 +18,6 @@ import qualified Data.Time as Time
import Data.Time.Format (defaultTimeLocale, iso8601DateFormat)
#else
import System.Locale (defaultTimeLocale, iso8601DateFormat)
import Data.Maybe (fromJust)
#endif
parseTime :: ParseTime t => String -> String -> t
@ -32,6 +32,3 @@ parseDate = parseTime (iso8601DateFormat Nothing)
parseDateTime :: String -> UTCTime
parseDateTime = parseTime (iso8601DateFormat (Just "%H:%M:%S"))
mongodbHostEnvVariable :: String
mongodbHostEnvVariable = "HASKELL_MONGODB_TEST_HOST"