Compare commits

..

1 commit

Author SHA1 Message Date
Greg Weber
71111d45f1 run against the persistent-mongoDB test suite 2016-06-22 00:29:04 -07:00
29 changed files with 746 additions and 2583 deletions

3
.gitignore vendored
View file

@ -1,6 +1,3 @@
dist/ dist/
cabal.sandbox.config cabal.sandbox.config
.cabal-sandbox/ .cabal-sandbox/
.stack-work/
dist-newstyle/*
!dist-newstyle/config

View file

@ -1,25 +1,26 @@
# See https://github.com/hvr/multi-ghc-travis for more information. # See https://github.com/hvr/multi-ghc-travis for more information.
sudo: required
services:
- docker
env: env:
# We use CABALVER=1.22 everywhere because it uses the flag --enable-coverage # We use CABALVER=1.22 everywhere because it uses the flag --enable-coverage
# instead of --enable-library-coverage used by older versions. # instead of --enable-library-coverage used by older versions.
#- GHCVER=7.8.4 CABALVER=1.22 MONGO=2.6.12 - GHCVER=7.6.3 CABALVER=1.22 MONGO=2.4.14
#- GHCVER=7.10.3 CABALVER=1.22 MONGO=2.6.12 - GHCVER=7.8.4 CABALVER=1.22 MONGO=2.4.14
#- GHCVER=8.0.2 CABALVER=1.24 MONGO=2.6.12 - GHCVER=7.10.1 CABALVER=1.22 MONGO=2.4.14
- GHCVER=8.4.2 CABALVER=2.2 MONGO=3.6 STACKAGE=nightly - GHCVER=7.6.3 CABALVER=1.22 MONGO=2.6.12
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-11.6 - GHCVER=7.8.4 CABALVER=1.22 MONGO=2.6.12
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-9.21 - GHCVER=7.10.1 CABALVER=1.22 MONGO=2.6.12
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.0 STACKAGE=lts-11.6 - GHCVER=7.6.3 CABALVER=1.22 MONGO=3.0.12
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.0 STACKAGE=lts-9.21 - GHCVER=7.8.4 CABALVER=1.22 MONGO=3.0.12
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.4 STACKAGE=lts-11.6 - GHCVER=7.10.1 CABALVER=1.22 MONGO=3.0.12
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.4 STACKAGE=lts-9.21 - GHCVER=7.6.3 CABALVER=1.22 MONGO=3.2.6
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.2 STACKAGE=lts-11.6 - GHCVER=7.8.4 CABALVER=1.22 MONGO=3.2.6
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.2 STACKAGE=lts-9.21 - GHCVER=7.10.1 CABALVER=1.22 MONGO=3.2.6
- GHCVER=head CABALVER=head MONGO=3.2.6
matrix:
allow_failures:
# The text here should match the last line above exactly.
- env: GHCVER=head CABALVER=head MONGO=3.2.6
before_install: before_install:
@ -28,38 +29,29 @@ before_install:
- travis_retry sudo apt-get install cabal-install-$CABALVER ghc-$GHCVER - travis_retry sudo apt-get install cabal-install-$CABALVER ghc-$GHCVER
- export PATH=$HOME/.cabal/bin:/opt/ghc/$GHCVER/bin:/opt/cabal/$CABALVER/bin:$PATH - export PATH=$HOME/.cabal/bin:/opt/ghc/$GHCVER/bin:/opt/cabal/$CABALVER/bin:$PATH
- cabal --version - cabal --version
#- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv EA312927 - sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv EA312927
#- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10 - sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
#- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 0C49F3730359A14518585931BC711F9BA15703C6 - echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.2 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.2.list
#- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.4 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.4.list - echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list
#- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.2 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.2.list - echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list
#- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list - sudo apt-get update
#- echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list - if [[ ${MONGO:0:3} == "2.4" ]]; then sudo apt-get install mongodb-10gen=$MONGO; else sudo apt-get install -y mongodb-org=$MONGO mongodb-org-server=$MONGO mongodb-org-shell=$MONGO mongodb-org-tools=$MONGO; fi
#- sudo apt-get update - ls /etc/init.d
#- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo apt-get install mongodb-10gen=$MONGO; else sudo apt-get install --allow-downgrades -y mongodb-org=$MONGO mongodb-org-server=$MONGO mongodb-org-shell=$MONGO mongodb-org-tools=$MONGO; fi - if [[ ${MONGO:0:3} == "2.4" ]]; then sudo service mongodb start; fi
#- ls /etc/init.d - sleep 15 #mongo may not be responded directly. See http://docs.travis-ci.com/user/database-setup/#MongoDB
#- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo service mongodb start; fi - ps axf | grep mongo
#- sudo service --status-all - netstat -apn
#- sudo service mongod start - mongo --version
#- sleep 15 #mongo may not be responded directly. See http://docs.travis-ci.com/user/database-setup/#MongoDB
#- ps axf | grep mongo
#- sudo netstat -apn
#- mongo --version
- sudo docker pull mongo:$MONGO
- sudo docker run -d -p 27017:27017 mongo:$MONGO
install: install:
- travis_retry cabal update - travis_retry cabal update
# Install the combined dependencies for this package and all other packages # Install the combined dependencies for this package and all other packages
# needed to reduce conflicts. # needed to reduce conflicts.
- cabal sandbox init - cabal sandbox init
- wget https://www.stackage.org/$STACKAGE/cabal.config - cabal install --only-dependencies --enable-tests
- sed -e '/mongoDB/d' cabal.config > cabal.config.new
- mv cabal.config.new cabal.config
- cabal install --only-dependencies --enable-tests --enable-benchmarks --force-reinstalls
script: script:
- cabal configure --enable-tests -v2 --enable-benchmarks - cabal configure --enable-tests -v2
- cabal build - cabal build
# cabal test fails due a to hpc error. Using run-cabal-test instead. # cabal test fails due a to hpc error. Using run-cabal-test instead.
# - cabal test --show-details=always # - cabal test --show-details=always
@ -77,25 +69,9 @@ script:
echo "expected '$SRC_TGZ' not found"; echo "expected '$SRC_TGZ' not found";
exit 1; exit 1;
fi fi
- git clone https://github.com/yesodweb/persistent
jobs: - cd persistent
include: - "cabal install ./persistent ./persistent-mongoDB --only-dependencies"
- stage: deploy - cd persistent-test
env: GHCVER=8.0.2 CABALVER=1.24 MONGO=3.6 - cabal install -fmongodb --enable-tests
before_install: - "cabal configure -fmongodb --enable-tests && cabal test"
- travis_retry sudo add-apt-repository -y ppa:hvr/ghc
- travis_retry sudo apt-get update
- travis_retry sudo apt-get install cabal-install-$CABALVER
- export PATH=$HOME/.cabal/bin:/opt/ghc/$GHCVER/bin:/opt/cabal/$CABALVER/bin:$PATH
- cabal --version
install: skip
script: skip
deploy:
provider: hackage
username: VictorDenisov
password:
secure: DPYlqRN09gFp06paLj9bRBzpxTkqkZzfsTrU3j0WiqRzqUMeWEeiZNAkIE/maC9xrEuuYBTk0KlSdz+esF4kjfyRQFTxb9CvXrZ474qHozVLC01vh/av5bGZBDQOwgzJrJNVpfl+g+EADOicz9/nhPXiAd7nCQIv/2s/xM1Yj1U=
on:
repo: mongodb-haskell/mongodb
tags: true

View file

@ -2,117 +2,6 @@
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Package_versioning_policy). This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Package_versioning_policy).
* Get rid of `MonadFail` constraints in `Database.MongoDB.Query`
## [2.7.1.2] - 2022-10-26
### Added
- Support of OP_MSG protocol
- Clarifications in the documentation
- Allow optional TLS parameters
## [2.7.1.1] - 2021-06-14
### Fixed
- Sample code
### Added
- Clarification to the tutorial
## [2.7.1.0] - 2020-08-17
### Added
- Function findCommand
## [2.7.0.1] - 2020-04-07
### Fixed
- Error reporting for deletion of big messages
- Formatting of docs
## [2.7.0.0] - 2020-02-08
### Fixed
- Upgraded bson to compile with GHC 8.8
## [2.6.0.1] - 2020-02-01
### Fixed
- Parsing hostname with underscores in readHostPortM.
## [2.6.0.0] - 2020-01-03
### Added
- MonadFail. It's a standard for newer versions of Haskell,
- Open replica sets over tls.
### Fixed
- Support for unix domain socket connection,
- Stubborn listener threads.
## [2.5.0.0] - 2019-06-14
### Fixed
Compatibility with network 3.0 package
## [2.4.0.1] - 2019-03-03
### Fixed
Doc for modify method
## [2.4.0.0] - 2018-05-03
### Fixed
- GHC 8.4 compatibility. isEmptyChan is not available in base 4.11 anymore.
## [2.3.0.5] - 2018-03-15
### Fixed
- Resource leak in SCRAM authentication
## [2.3.0.4] - 2018-02-11
### Fixed
- Benchmark's build
## [2.3.0.3] - 2018-02-10
### Fixed
- aggregate requires cursor in mongo 3.6
## [2.3.0.2] - 2018-01-28
### Fixed
- Uploading files with GridFS
## [2.3.0.1] - 2017-12-28
### Removed
- Log output that littered stdout in modify many commands.
## [2.3.0] - 2017-05-31
### Changed
- Description of access function
- Lift MonadBaseControl restriction
- Update and delete results are squashed into one WriteResult type
- Functions insertMany, updateMany, deleteMany are rewritten to properly report
various errors
## [2.2.0] - 2017-04-08
### Added
- GridFS implementation
### Fixed
- Write functions hang when the connection is lost.
## [2.1.1] - 2016-08-13
### Changed
- Interfaces of update and delete functions. They don't require MonadBaseControl
anymore.
## [2.1.0] - 2016-06-21 ## [2.1.0] - 2016-06-21
### Added ### Added

View file

@ -1,54 +1,42 @@
-- | {- |
-- Client interface to MongoDB database management system. Client interface to MongoDB database management system.
--
-- Simple example below. Simple example below. Use with language extensions /OverloadedStrings/ & /ExtendedDefaultRules/.
--
-- @
-- {\-\# LANGUAGE OverloadedStrings \#\-} > import Database.MongoDB
-- {\-\# LANGUAGE ExtendedDefaultRules \#\-} > import Control.Monad.Trans (liftIO)
-- >
-- import Database.MongoDB > main = do
-- import Control.Monad.Trans (liftIO) > pipe <- connect (host "127.0.0.1")
-- > e <- access pipe master "baseball" run
-- main :: IO () > close pipe
-- main = do > print e
-- pipe <- connect (host \"127.0.0.1\") >
-- e <- access pipe master \"baseball\" run > run = do
-- close pipe > clearTeams
-- print e > insertTeams
-- > allTeams >>= printDocs "All Teams"
-- run :: Action IO () > nationalLeagueTeams >>= printDocs "National League Teams"
-- run = do > newYorkTeams >>= printDocs "New York Teams"
-- clearTeams >
-- insertTeams > clearTeams = delete (select [] "team")
-- allTeams >>= printDocs \"All Teams\" >
-- nationalLeagueTeams >>= printDocs \"National League Teams\" > insertTeams = insertMany "team" [
-- newYorkTeams >>= printDocs \"New York Teams\" > ["name" =: "Yankees", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "American"],
-- > ["name" =: "Mets", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "National"],
-- clearTeams :: Action IO () > ["name" =: "Phillies", "home" =: ["city" =: "Philadelphia", "state" =: "PA"], "league" =: "National"],
-- clearTeams = delete (select [] \"team\") > ["name" =: "Red Sox", "home" =: ["city" =: "Boston", "state" =: "MA"], "league" =: "American"] ]
-- >
-- insertTeams :: Action IO [Value] > allTeams = rest =<< find (select [] "team") {sort = ["home.city" =: 1]}
-- insertTeams = insertMany \"team\" [ >
-- [\"name\" =: \"Yankees\", \"home\" =: [\"city\" =: \"New York\", \"state\" =: \"NY\"], \"league\" =: \"American\"], > nationalLeagueTeams = rest =<< find (select ["league" =: "National"] "team")
-- [\"name\" =: \"Mets\", \"home\" =: [\"city\" =: \"New York\", \"state\" =: \"NY\"], \"league\" =: \"National\"], >
-- [\"name\" =: \"Phillies\", \"home\" =: [\"city\" =: \"Philadelphia\", \"state\" =: \"PA\"], \"league\" =: \"National\"], > newYorkTeams = rest =<< find (select ["home.state" =: "NY"] "team") {project = ["name" =: 1, "league" =: 1]}
-- [\"name\" =: \"Red Sox\", \"home\" =: [\"city\" =: \"Boston\", \"state\" =: \"MA\"], \"league\" =: \"American\"] ] >
-- > printDocs title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs
-- allTeams :: Action IO [Document] >
-- allTeams = rest =<< find (select [] \"team\") {sort = [\"home.city\" =: 1]} -}
--
-- nationalLeagueTeams :: Action IO [Document]
-- nationalLeagueTeams = rest =<< find (select [\"league\" =: \"National\"] \"team\")
--
-- newYorkTeams :: Action IO [Document]
-- newYorkTeams = rest =<< find (select [\"home.state\" =: \"NY\"] \"team\") {project = [\"name\" =: 1, \"league\" =: 1]}
--
-- printDocs :: String -> [Document] -> Action IO ()
-- printDocs title docs = liftIO $ putStrLn title >> mapM_ (print . exclude [\"_id\"]) docs
--
-- @
--
module Database.MongoDB ( module Database.MongoDB (
module Data.Bson, module Data.Bson,

View file

@ -42,6 +42,7 @@ import qualified Data.HashTable.IO as H
import qualified Data.Set as Set import qualified Data.Set as Set
import Control.Monad.Trans (MonadIO, liftIO) import Control.Monad.Trans (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Bson (Document, Field(..), at, (=:), (=?), exclude, merge) import Data.Bson (Document, Field(..), at, (=:), (=?), exclude, merge)
import Data.Text (Text) import Data.Text (Text)
@ -76,8 +77,8 @@ renameCollection from to = do
db <- thisDatabase db <- thisDatabase
useDb admin $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True] useDb admin $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
dropCollection :: (MonadIO m, MonadFail m) => Collection -> Action m Bool dropCollection :: (MonadIO m) => Collection -> Action m Bool
-- ^ Delete the given collection! Return @True@ if collection existed (and was deleted); return @False@ if collection did not exist (and no action). -- ^ Delete the given collection! Return True if collection existed (and was deleted); return False if collection did not exist (and no action).
dropCollection coll = do dropCollection coll = do
resetIndexCache resetIndexCache
r <- runCommand ["drop" =: coll] r <- runCommand ["drop" =: coll]
@ -86,7 +87,7 @@ dropCollection coll = do
fail $ "dropCollection failed: " ++ show r fail $ "dropCollection failed: " ++ show r
validateCollection :: (MonadIO m) => Collection -> Action m Document validateCollection :: (MonadIO m) => Collection -> Action m Document
-- ^ Validate the given collection, scanning the data and indexes for correctness. This operation takes a while. -- ^ This operation takes a while
validateCollection coll = runCommand ["validate" =: coll] validateCollection coll = runCommand ["validate" =: coll]
-- ** Index -- ** Index
@ -111,7 +112,7 @@ idxDocument Index{..} db = [
"dropDups" =: iDropDups ] ++ (maybeToList $ fmap ((=:) "expireAfterSeconds") iExpireAfterSeconds) "dropDups" =: iDropDups ] ++ (maybeToList $ fmap ((=:) "expireAfterSeconds") iExpireAfterSeconds)
index :: Collection -> Order -> Index index :: Collection -> Order -> Index
-- ^ Spec of index of ordered keys on collection. 'iName' is generated from keys. 'iUnique' and 'iDropDups' are @False@. -- ^ Spec of index of ordered keys on collection. Name is generated from keys. Unique and dropDups are False.
index coll keys = Index coll keys (genName keys) False False Nothing index coll keys = Index coll keys (genName keys) False False Nothing
genName :: Order -> IndexName genName :: Order -> IndexName
@ -132,12 +133,12 @@ createIndex :: (MonadIO m) => Index -> Action m ()
createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase
dropIndex :: (MonadIO m) => Collection -> IndexName -> Action m Document dropIndex :: (MonadIO m) => Collection -> IndexName -> Action m Document
-- ^ Remove the index from the given collection. -- ^ Remove the index
dropIndex coll idxName = do dropIndex coll idxName = do
resetIndexCache resetIndexCache
runCommand ["deleteIndexes" =: coll, "index" =: idxName] runCommand ["deleteIndexes" =: coll, "index" =: idxName]
getIndexes :: MonadIO m => Collection -> Action m [Document] getIndexes :: (MonadIO m, MonadBaseControl IO m, Functor m) => Collection -> Action m [Document]
-- ^ Get all indexes on this collection -- ^ Get all indexes on this collection
getIndexes coll = do getIndexes coll = do
db <- thisDatabase db <- thisDatabase
@ -190,31 +191,31 @@ resetIndexCache = do
-- ** User -- ** User
allUsers :: MonadIO m => Action m [Document] allUsers :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Document]
-- ^ Fetch all users of this database -- ^ Fetch all users of this database
allUsers = map (exclude ["_id"]) `liftM` (rest =<< find allUsers = map (exclude ["_id"]) <$> (rest =<< find
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]}) (select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
addUser :: (MonadIO m) addUser :: (MonadBaseControl IO m, MonadIO m)
=> Bool -> Username -> Password -> Action m () => Bool -> Username -> Password -> Action m ()
-- ^ Add user with password with read-only access if the boolean argument is @True@, or read-write access if it's @False@ -- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
addUser readOnly user pass = do addUser readOnly user pass = do
mu <- findOne (select ["user" =: user] "system.users") mu <- findOne (select ["user" =: user] "system.users")
let usr = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu) let usr = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
save "system.users" usr save "system.users" usr
removeUser :: (MonadIO m) removeUser :: (MonadIO m, MonadBaseControl IO m)
=> Username -> Action m () => Username -> Action m ()
removeUser user = delete (select ["user" =: user] "system.users") removeUser user = delete (select ["user" =: user] "system.users")
-- ** Database -- ** Database
admin :: Database admin :: Database
-- ^ The \"admin\" database, which stores user authorization and authentication data plus other system collections. -- ^ \"admin\" database
admin = "admin" admin = "admin"
cloneDatabase :: (MonadIO m) => Database -> Host -> Action m Document cloneDatabase :: (MonadIO m) => Database -> Host -> Action m Document
-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use 'copyDatabase' in this case). -- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case).
cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromHost] cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromHost]
copyDatabase :: (MonadIO m) => Database -> Host -> Maybe (Username, Password) -> Database -> Action m Document copyDatabase :: (MonadIO m) => Database -> Host -> Maybe (Username, Password) -> Database -> Action m Document
@ -238,11 +239,9 @@ repairDatabase db = useDb db $ runCommand ["repairDatabase" =: (1 :: Int)]
-- ** Server -- ** Server
serverBuildInfo :: (MonadIO m) => Action m Document serverBuildInfo :: (MonadIO m) => Action m Document
-- ^ Return a document containing the parameters used to compile the server instance.
serverBuildInfo = useDb admin $ runCommand ["buildinfo" =: (1 :: Int)] serverBuildInfo = useDb admin $ runCommand ["buildinfo" =: (1 :: Int)]
serverVersion :: (MonadIO m) => Action m Text serverVersion :: (MonadIO m) => Action m Text
-- ^ Return the version of the server instance.
serverVersion = at "version" `liftM` serverBuildInfo serverVersion = at "version" `liftM` serverBuildInfo
-- * Diagnostics -- * Diagnostics
@ -250,22 +249,18 @@ serverVersion = at "version" `liftM` serverBuildInfo
-- ** Collection -- ** Collection
collectionStats :: (MonadIO m) => Collection -> Action m Document collectionStats :: (MonadIO m) => Collection -> Action m Document
-- ^ Return some storage statistics for the given collection.
collectionStats coll = runCommand ["collstats" =: coll] collectionStats coll = runCommand ["collstats" =: coll]
dataSize :: (MonadIO m) => Collection -> Action m Int dataSize :: (MonadIO m) => Collection -> Action m Int
-- ^ Return the total uncompressed size (in bytes) in memory of all records in the given collection. Does not include indexes.
dataSize c = at "size" `liftM` collectionStats c dataSize c = at "size" `liftM` collectionStats c
storageSize :: (MonadIO m) => Collection -> Action m Int storageSize :: (MonadIO m) => Collection -> Action m Int
-- ^ Return the total bytes allocated to the given collection. Does not include indexes.
storageSize c = at "storageSize" `liftM` collectionStats c storageSize c = at "storageSize" `liftM` collectionStats c
totalIndexSize :: (MonadIO m) => Collection -> Action m Int totalIndexSize :: (MonadIO m) => Collection -> Action m Int
-- ^ The total size in bytes of all indexes in this collection.
totalIndexSize c = at "totalIndexSize" `liftM` collectionStats c totalIndexSize c = at "totalIndexSize" `liftM` collectionStats c
totalSize :: MonadIO m => Collection -> Action m Int totalSize :: (MonadIO m, MonadBaseControl IO m) => Collection -> Action m Int
totalSize coll = do totalSize coll = do
x <- storageSize coll x <- storageSize coll
xs <- mapM isize =<< getIndexes coll xs <- mapM isize =<< getIndexes coll
@ -275,45 +270,34 @@ totalSize coll = do
-- ** Profiling -- ** Profiling
-- | The available profiler levels. data ProfilingLevel = Off | Slow | All deriving (Show, Enum, Eq)
data ProfilingLevel
= Off -- ^ No data collection.
| Slow -- ^ Data collected only for slow operations. The slow operation time threshold is 100ms by default, but can be changed using 'setProfilingLevel'.
| All -- ^ Data collected for all operations.
deriving (Show, Enum, Eq)
getProfilingLevel :: (MonadIO m) => Action m ProfilingLevel getProfilingLevel :: (MonadIO m) => Action m ProfilingLevel
-- ^ Get the profiler level.
getProfilingLevel = (toEnum . at "was") `liftM` runCommand ["profile" =: (-1 :: Int)] getProfilingLevel = (toEnum . at "was") `liftM` runCommand ["profile" =: (-1 :: Int)]
type MilliSec = Int type MilliSec = Int
setProfilingLevel :: (MonadIO m) => ProfilingLevel -> Maybe MilliSec -> Action m () setProfilingLevel :: (MonadIO m) => ProfilingLevel -> Maybe MilliSec -> Action m ()
-- ^ Set the profiler level, and optionally the slow operation time threshold (in milliseconds).
setProfilingLevel p mSlowMs = setProfilingLevel p mSlowMs =
runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return () runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return ()
-- ** Database -- ** Database
dbStats :: (MonadIO m) => Action m Document dbStats :: (MonadIO m) => Action m Document
-- ^ Return some storage statistics for the given database.
dbStats = runCommand ["dbstats" =: (1 :: Int)] dbStats = runCommand ["dbstats" =: (1 :: Int)]
currentOp :: (MonadIO m) => Action m (Maybe Document) currentOp :: (MonadIO m) => Action m (Maybe Document)
-- ^ See currently running operation on the database, if any -- ^ See currently running operation on the database, if any
currentOp = findOne (select [] "$cmd.sys.inprog") currentOp = findOne (select [] "$cmd.sys.inprog")
-- | An operation indentifier.
type OpNum = Int type OpNum = Int
killOp :: (MonadIO m) => OpNum -> Action m (Maybe Document) killOp :: (MonadIO m) => OpNum -> Action m (Maybe Document)
-- ^ Terminate the operation specified by the given 'OpNum'.
killOp op = findOne (select ["op" =: op] "$cmd.sys.killop") killOp op = findOne (select ["op" =: op] "$cmd.sys.killop")
-- ** Server -- ** Server
serverStatus :: (MonadIO m) => Action m Document serverStatus :: (MonadIO m) => Action m Document
-- ^ Return a document with an overview of the state of the database.
serverStatus = useDb admin $ runCommand ["serverStatus" =: (1 :: Int)] serverStatus = useDb admin $ runCommand ["serverStatus" =: (1 :: Int)]

View file

@ -17,29 +17,29 @@ module Database.MongoDB.Connection (
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort, Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
readHostPortM, globalConnectTimeout, connect, connect', readHostPortM, globalConnectTimeout, connect, connect',
-- * Replica Set -- * Replica Set
ReplicaSetName, openReplicaSet, openReplicaSet', openReplicaSetTLS, openReplicaSetTLS', ReplicaSetName, openReplicaSet, openReplicaSet',
openReplicaSetSRV, openReplicaSetSRV', openReplicaSetSRV'', openReplicaSetSRV''',
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
) where ) where
import Prelude hiding (lookup) import Prelude hiding (lookup)
import Data.IORef (IORef, newIORef, readIORef) import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersect, partition, (\\), delete) import Data.List (intersect, partition, (\\), delete)
import Data.Maybe (fromJust)
#if !MIN_VERSION_base(4,8,0) #if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>)) import Control.Applicative ((<$>))
#endif #endif
import Control.Monad (forM_, guard) import Control.Monad (forM_)
import Network (HostName, PortID(..), connectTo)
import System.IO.Unsafe (unsafePerformIO) import System.IO.Unsafe (unsafePerformIO)
import System.Timeout (timeout) import System.Timeout (timeout)
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, anyChar, eof, import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, eof,
spaces, try, (<|>)) spaces, try, (<|>))
import qualified Data.List as List import qualified Data.List as List
import Control.Monad.Except (throwError) import Control.Monad.Identity (runIdentity)
import Control.Monad.Error (throwError)
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar, import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
readMVar) readMVar)
import Data.Bson (Document, at, (=:)) import Data.Bson (Document, at, (=:))
@ -48,13 +48,11 @@ import Data.Text (Text)
import qualified Data.Bson as B import qualified Data.Bson as B
import qualified Data.Text as T import qualified Data.Text as T
import Database.MongoDB.Internal.Network (Host(..), HostName, PortID(..), connectTo, lookupSeedList, lookupReplicaSetName)
import Database.MongoDB.Internal.Protocol (Pipe, newPipe, close, isClosed) import Database.MongoDB.Internal.Protocol (Pipe, newPipe, close, isClosed)
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE, import Database.MongoDB.Internal.Util (untilSuccess, liftIOE,
updateAssocs, shuffle, mergesortM) updateAssocs, shuffle, mergesortM)
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access, import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
slaveOk, runCommand, retrieveServerData) slaveOk, runCommand, retrieveServerData)
import qualified Database.MongoDB.Transport.Tls as TLS (connect)
adminCommand :: Command -> Pipe -> IO Document adminCommand :: Command -> Pipe -> IO Document
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails. -- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
@ -64,6 +62,10 @@ adminCommand cmd pipe =
failureToIOError (ConnectionFailure e) = e failureToIOError (ConnectionFailure e) = e
failureToIOError e = userError $ show e failureToIOError e = userError $ show e
-- * Host
data Host = Host HostName PortID deriving (Show, Eq, Ord)
defaultPort :: PortID defaultPort :: PortID
-- ^ Default MongoDB port = 27017 -- ^ Default MongoDB port = 27017
defaultPort = PortNumber 27017 defaultPort = PortNumber 27017
@ -74,50 +76,46 @@ host hostname = Host hostname defaultPort
showHostPort :: Host -> String showHostPort :: Host -> String
-- ^ Display host as \"host:port\" -- ^ Display host as \"host:port\"
-- TODO: Distinguish Service port -- TODO: Distinguish Service and UnixSocket port
showHostPort (Host hostname (PortNumber port)) = hostname ++ ":" ++ show port showHostPort (Host hostname port) = hostname ++ ":" ++ portname where
portname = case port of
Service s -> s
PortNumber p -> show p
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32) #if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
showHostPort (Host _ (UnixSocket path)) = "unix:" ++ path UnixSocket s -> s
#endif #endif
readHostPortM :: (MonadFail m) => String -> m Host readHostPortM :: (Monad m) => String -> m Host
-- ^ Read string \"hostname:port\" as @Host hosthame (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax. -- ^ Read string \"hostname:port\" as @Host hosthame (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax.
-- TODO: handle Service and UnixSocket port
-- TODO: handle Service port
readHostPortM = either (fail . show) return . parse parser "readHostPort" where readHostPortM = either (fail . show) return . parse parser "readHostPort" where
hostname = many1 (letter <|> digit <|> char '-' <|> char '.' <|> char '_') hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
parser = do parser = do
spaces spaces
h <- hostname h <- hostname
try (spaces >> eof >> return (host h)) <|> do try (spaces >> eof >> return (host h)) <|> do
_ <- char ':' _ <- char ':'
try ( do port :: Int <- read <$> many1 digit port :: Int <- read <$> many1 digit
spaces >> eof spaces >> eof
return $ Host h (PortNumber $ fromIntegral port)) return $ Host h (PortNumber $ fromIntegral port)
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
<|> do guard (h == "unix")
p <- many1 anyChar
eof
return $ Host "" (UnixSocket p)
#endif
readHostPort :: String -> Host readHostPort :: String -> Host
-- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax. -- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
readHostPort = fromJust . readHostPortM readHostPort = runIdentity . readHostPortM
type Secs = Double type Secs = Double
globalConnectTimeout :: IORef Secs globalConnectTimeout :: IORef Secs
-- ^ 'connect' (and 'openReplicaSet') fails if it can't connect within this many seconds (default is 6 seconds). Use 'connect'' (and 'openReplicaSet'') if you want to ignore this global and specify your own timeout. Note, this timeout only applies to initial connection establishment, not when reading/writing to the connection. -- ^ 'connect' (and 'openReplicaSet') fails if it can't connect within this many seconds (default is 6 seconds). Use 'connect\'' (and 'openReplicaSet\'') if you want to ignore this global and specify your own timeout. Note, this timeout only applies to initial connection establishment, not when reading/writing to the connection.
globalConnectTimeout = unsafePerformIO (newIORef 6) globalConnectTimeout = unsafePerformIO (newIORef 6)
{-# NOINLINE globalConnectTimeout #-} {-# NOINLINE globalConnectTimeout #-}
connect :: Host -> IO Pipe connect :: Host -> IO Pipe
-- ^ Connect to Host returning pipelined TCP connection. Throw 'IOError' if connection refused or no response within 'globalConnectTimeout'. -- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within 'globalConnectTimeout'.
connect h = readIORef globalConnectTimeout >>= flip connect' h connect h = readIORef globalConnectTimeout >>= flip connect' h
connect' :: Secs -> Host -> IO Pipe connect' :: Secs -> Host -> IO Pipe
-- ^ Connect to Host returning pipelined TCP connection. Throw 'IOError' if connection refused or no response within given number of seconds. -- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within given number of seconds.
connect' timeoutSecs (Host hostname port) = do connect' timeoutSecs (Host hostname port) = do
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port) mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
handle <- maybe (ioError $ userError "connect timed out") return mh handle <- maybe (ioError $ userError "connect timed out") return mh
@ -130,85 +128,32 @@ connect' timeoutSecs (Host hostname port) = do
type ReplicaSetName = Text type ReplicaSetName = Text
data TransportSecurity = Secure | Unsecure
-- | Maintains a connection (created on demand) to each server in the named replica set -- | Maintains a connection (created on demand) to each server in the named replica set
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs TransportSecurity data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs
replSetName :: ReplicaSet -> Text replSetName :: ReplicaSet -> Text
-- ^ Get the name of connected replica set. -- ^ name of connected replica set
replSetName (ReplicaSet rsName _ _ _) = rsName replSetName (ReplicaSet rsName _ _) = rsName
openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSet'' instead. -- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSet\'' instead.
openReplicaSet rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSet' rsSeed openReplicaSet rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSet' rsSeed
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members. -- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members.
openReplicaSet' timeoutSecs (rs, hosts) = _openReplicaSet timeoutSecs (rs, hosts, Unsecure) openReplicaSet' timeoutSecs (rsName, seedList) = do
openReplicaSetTLS :: (ReplicaSetName, [Host]) -> IO ReplicaSet
-- ^ Open secure connections (on demand) to servers in the replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSetTLS'' instead.
openReplicaSetTLS rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSetTLS' rsSeed
openReplicaSetTLS' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
-- ^ Open secure connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members.
openReplicaSetTLS' timeoutSecs (rs, hosts) = _openReplicaSet timeoutSecs (rs, hosts, Secure)
_openReplicaSet :: Secs -> (ReplicaSetName, [Host], TransportSecurity) -> IO ReplicaSet
_openReplicaSet timeoutSecs (rsName, seedList, transportSecurity) = do
vMembers <- newMVar (map (, Nothing) seedList) vMembers <- newMVar (map (, Nothing) seedList)
let rs = ReplicaSet rsName vMembers timeoutSecs transportSecurity let rs = ReplicaSet rsName vMembers timeoutSecs
_ <- updateMembers rs _ <- updateMembers rs
return rs return rs
openReplicaSetSRV :: HostName -> IO ReplicaSet
-- ^ Open /non-secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSetSRV''' instead.
openReplicaSetSRV hostname = do
timeoutSecs <- readIORef globalConnectTimeout
_openReplicaSetSRV timeoutSecs Unsecure hostname
openReplicaSetSRV' :: HostName -> IO ReplicaSet
-- ^ Open /secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSetSRV'''' instead.
--
-- The preferred connection method for cloud MongoDB providers. A typical connecting sequence is shown in the example below.
--
-- ==== __Example__
-- > do
-- > pipe <- openReplicatSetSRV' "cluster#.xxxxx.yyyyy.zzz"
-- > is_auth <- access pipe master "admin" $ auth user_name password
-- > unless is_auth (throwIO $ userError "Authentication failed!")
openReplicaSetSRV' hostname = do
timeoutSecs <- readIORef globalConnectTimeout
_openReplicaSetSRV timeoutSecs Secure hostname
openReplicaSetSRV'' :: Secs -> HostName -> IO ReplicaSet
-- ^ Open /non-secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. Supplied seconds timeout is used for connect attempts to members.
openReplicaSetSRV'' timeoutSecs = _openReplicaSetSRV timeoutSecs Unsecure
openReplicaSetSRV''' :: Secs -> HostName -> IO ReplicaSet
-- ^ Open /secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. Supplied seconds timeout is used for connect attempts to members.
openReplicaSetSRV''' timeoutSecs = _openReplicaSetSRV timeoutSecs Secure
_openReplicaSetSRV :: Secs -> TransportSecurity -> HostName -> IO ReplicaSet
_openReplicaSetSRV timeoutSecs transportSecurity hostname = do
replicaSetName <- lookupReplicaSetName hostname
hosts <- lookupSeedList hostname
case (replicaSetName, hosts) of
(Nothing, _) -> throwError $ userError "Failed to lookup replica set name"
(_, []) -> throwError $ userError "Failed to lookup replica set seedlist"
(Just rsName, _) ->
case transportSecurity of
Secure -> openReplicaSetTLS' timeoutSecs (rsName, hosts)
Unsecure -> openReplicaSet' timeoutSecs (rsName, hosts)
closeReplicaSet :: ReplicaSet -> IO () closeReplicaSet :: ReplicaSet -> IO ()
-- ^ Close all connections to replica set -- ^ Close all connections to replica set
closeReplicaSet (ReplicaSet _ vMembers _ _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd) closeReplicaSet (ReplicaSet _ vMembers _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
primary :: ReplicaSet -> IO Pipe primary :: ReplicaSet -> IO Pipe
-- ^ Return connection to current primary of replica set. Fail if no primary available. -- ^ Return connection to current primary of replica set. Fail if no primary available.
primary rs@(ReplicaSet rsName _ _ _) = do primary rs@(ReplicaSet rsName _ _) = do
mHost <- statedPrimary <$> updateMembers rs mHost <- statedPrimary <$> updateMembers rs
case mHost of case mHost of
Just host' -> connection rs Nothing host' Just host' -> connection rs Nothing host'
@ -227,7 +172,7 @@ routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO
routedHost f rs = do routedHost f rs = do
info <- updateMembers rs info <- updateMembers rs
hosts <- shuffle (possibleHosts info) hosts <- shuffle (possibleHosts info)
let addIsPrimary h = (h, Just h == statedPrimary info) let addIsPrimary h = (h, if Just h == statedPrimary info then True else False)
hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
untilSuccess (connection rs Nothing) hosts' untilSuccess (connection rs Nothing) hosts'
@ -244,7 +189,7 @@ possibleHosts (_, info) = map readHostPort $ at "hosts" info
updateMembers :: ReplicaSet -> IO ReplicaInfo updateMembers :: ReplicaSet -> IO ReplicaInfo
-- ^ Fetch replica info from any server and update members accordingly -- ^ Fetch replica info from any server and update members accordingly
updateMembers rs@(ReplicaSet _ vMembers _ _) = do updateMembers rs@(ReplicaSet _ vMembers _) = do
(host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers (host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
modifyMVar vMembers $ \members -> do modifyMVar vMembers $ \members -> do
let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
@ -258,7 +203,7 @@ updateMembers rs@(ReplicaSet _ vMembers _ _) = do
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
-- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set. -- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set.
fetchReplicaInfo rs@(ReplicaSet rsName _ _ _) (host', mPipe) = do fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
pipe <- connection rs mPipe host' pipe <- connection rs mPipe host'
info <- adminCommand ["isMaster" =: (1 :: Int)] pipe info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
case B.lookup "setName" info of case B.lookup "setName" info of
@ -268,15 +213,11 @@ fetchReplicaInfo rs@(ReplicaSet rsName _ _ _) (host', mPipe) = do
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
-- ^ Return new or existing connection to member of replica set. If pipe is already known for host it is given, but we still test if it is open. -- ^ Return new or existing connection to member of replica set. If pipe is already known for host it is given, but we still test if it is open.
connection (ReplicaSet _ vMembers timeoutSecs transportSecurity) mPipe host' = connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' =
maybe conn (\p -> isClosed p >>= \bad -> if bad then conn else return p) mPipe maybe conn (\p -> isClosed p >>= \bad -> if bad then conn else return p) mPipe
where where
conn = modifyMVar vMembers $ \members -> do conn = modifyMVar vMembers $ \members -> do
let (Host h p) = host' let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
let conn' = case transportSecurity of
Secure -> TLS.connect h p
Unsecure -> connect' timeoutSecs host'
let new = conn' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
case List.lookup host' members of case List.lookup host' members of
Just (Just pipe) -> isClosed pipe >>= \bad -> if bad then new else return (members, pipe) Just (Just pipe) -> isClosed pipe >>= \bad -> if bad then new else return (members, pipe)
_ -> new _ -> new

View file

@ -1,191 +0,0 @@
-- Author:
-- Brent Tubbs <brent.tubbs@gmail.com>
-- | MongoDB GridFS implementation
{-# LANGUAGE OverloadedStrings, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, TypeFamilies, CPP, RankNTypes #-}
module Database.MongoDB.GridFS
( Bucket
, files, chunks
, File
, document, bucket
-- ** Setup
, openDefaultBucket
, openBucket
-- ** Query
, findFile
, findOneFile
, fetchFile
-- ** Delete
, deleteFile
-- ** Conduits
, sourceFile
, sinkFile
)
where
import Control.Monad(when)
import Control.Monad.IO.Class
import Control.Monad.Trans(lift)
import Data.Conduit
import Data.Digest.Pure.MD5
import Data.Int
import Data.Tagged(Tagged, untag)
import Data.Text(Text, append)
import Data.Time.Clock(getCurrentTime)
import Database.MongoDB
import Prelude
import qualified Data.Bson as B
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
defaultChunkSize :: Int64
-- ^ The default chunk size is 256 kB
defaultChunkSize = 256 * 1024

-- Size in bytes of one MD5 input block. 'finalizeMD5' may only pass
-- md5Finalize a remainder shorter than this, so callers split on this boundary.
md5BlockSizeInBytes :: Int
md5BlockSizeInBytes = 64
data Bucket = Bucket {files :: Text, chunks :: Text}
-- ^ Files are stored in \"buckets\": @files@ names the metadata collection
-- (e.g. @fs.files@) and @chunks@ names the chunk collection (e.g. @fs.chunks@).
-- Open a bucket with 'openDefaultBucket' or 'openBucket'.
openDefaultBucket :: (Monad m, MonadIO m) => Action m Bucket
-- ^ Open the default 'Bucket' (named \"fs\")
openDefaultBucket = openBucket "fs"
openBucket :: (Monad m, MonadIO m) => Text -> Action m Bucket
-- ^ Open a 'Bucket' with the given name prefix, ensuring the indexes on its
-- @<name>.files@ and @<name>.chunks@ collections exist.
openBucket name = do
    let filesCollection = name `append` ".files"
    let chunksCollection = name `append` ".chunks"
    -- Index for filename/date lookups; the unique (files_id, n) index guards
    -- against duplicate chunks for the same file.
    -- NOTE(review): iDropDups was removed server-side in MongoDB 3.0 and is
    -- ignored by newer servers -- confirm against the targeted server versions.
    ensureIndex $ index filesCollection ["filename" =: (1::Int), "uploadDate" =: (1::Int)]
    ensureIndex $ (index chunksCollection ["files_id" =: (1::Int), "n" =: (1::Int)]) { iUnique = True, iDropDups = True }
    return $ Bucket filesCollection chunksCollection
data File = File {bucket :: Bucket, document :: Document}
getChunk :: (MonadFail m, MonadIO m) => File -> Int -> Action m (Maybe S.ByteString)
-- ^ Get chunk number @i@ of the file, or 'Nothing' when no such chunk exists
-- or the chunk document does not hold binary data.
getChunk (File _bucket doc) i = do
    files_id <- B.look "_id" doc
    result <- findOne $ select ["files_id" := files_id, "n" =: i] $ chunks _bucket
    -- Use total B.lookup instead of the partial 'at': a chunk document that
    -- lacks a "data" field now yields Nothing rather than an 'error' call.
    case result >>= B.lookup "data" of
        Just (Binary b) -> return (Just b)
        _ -> return Nothing
findFile :: MonadIO m => Bucket -> Selector -> Action m [File]
-- ^ Run the selector against the bucket's files collection and wrap each
-- matching metadata document as a 'File'.
findFile bkt sel = do
    docs <- rest =<< find (select sel (files bkt))
    return (map (File bkt) docs)
findOneFile :: MonadIO m => Bucket -> Selector -> Action m (Maybe File)
-- ^ Like 'findFile' but yields at most one match.
findOneFile bkt sel = fmap (File bkt) <$> findOne (select sel (files bkt))
fetchFile :: MonadIO m => Bucket -> Selector -> Action m File
-- ^ Like 'findOneFile' but fails (via 'fetch') when nothing matches.
fetchFile bkt sel = File bkt <$> fetch (select sel (files bkt))
deleteFile :: (MonadIO m, MonadFail m) => File -> Action m ()
-- ^ Delete the file's metadata document and all of its chunks from the bucket.
deleteFile (File _bucket doc) = do
    files_id <- B.look "_id" doc
    -- Remove the metadata document, then every chunk referencing its _id.
    delete $ select ["_id" := files_id] $ files _bucket
    delete $ select ["files_id" := files_id] $ chunks _bucket
putChunk :: (Monad m, MonadIO m) => Bucket -> ObjectId -> Int -> L.ByteString -> Action m ()
-- ^ Insert chunk number @i@ of the file identified by @files_id@ into the
-- bucket's chunks collection, storing the bytes strictly as BSON 'Binary'.
putChunk _bucket files_id i chunk = do
    insert_ (chunks _bucket) ["files_id" =: files_id, "n" =: i, "data" =: Binary (L.toStrict chunk)]
sourceFile :: (MonadFail m, MonadIO m) => File -> ConduitT File S.ByteString (Action m) ()
-- ^ A producer for the contents of a file: yields the chunks in order starting
-- at chunk 0 and terminates at the first missing chunk.
sourceFile file = yieldChunk 0 where
    yieldChunk i = do
        mbytes <- lift $ getChunk file i
        case mbytes of
            Just bytes -> yield bytes >> yieldChunk (i+1)
            Nothing -> return ()
-- | Accumulated state while streaming a file into the bucket ('sinkFile').
data FileWriter = FileWriter
    { _fwChunkSize :: Int64        -- ^ target size of each stored chunk
    , _fwBucket :: Bucket          -- ^ destination bucket
    , _fwFilesId :: ObjectId       -- ^ _id of the file being written
    , _fwChunkIndex :: Int         -- ^ index (n) of the next chunk to store
    , _fwSize :: Int64             -- ^ total bytes consumed so far
    , _fwAcc :: L.ByteString       -- ^ bytes not yet flushed as a chunk
    , _fwMd5Context :: MD5Context  -- ^ running MD5 over full blocks consumed so far
    , _fwMd5acc :: L.ByteString    -- ^ bytes not yet folded into the MD5 context
    }
-- | Finalize the file: fold the buffered remainder into the MD5 digest, flush
-- the last (possibly short) chunk, and insert the metadata document into the
-- files collection. Returns the newly created 'File'.
finalizeFile :: (Monad m, MonadIO m) => Text -> FileWriter -> Action m File
finalizeFile filename (FileWriter chunkSize _bucket files_id i size acc md5context md5acc) = do
    let md5digest = finalizeMD5 md5context (L.toStrict md5acc)
    -- A trailing chunk is only written when leftover bytes remain.
    when (L.length acc > 0) $ putChunk _bucket files_id i acc
    currentTimestamp <- liftIO getCurrentTime
    let doc = [ "_id" =: files_id
              , "length" =: size
              , "uploadDate" =: currentTimestamp
              , "md5" =: show md5digest
              , "chunkSize" =: chunkSize
              , "filename" =: filename
              ]
    insert_ (files _bucket) doc
    return $ File _bucket doc
-- | Finalize the digest: feed the whole 64-byte blocks of the remainder into
-- the context, then hand 'md5Finalize' only the final partial block (it can
-- handle at most 'md5BlockSizeInBytes' bytes).
finalizeMD5 :: MD5Context -> S.ByteString -> MD5Digest
finalizeMD5 ctx remainder =
    md5Finalize ctx2 (S.drop lu remainder) -- can only handle max md5BlockSizeInBytes length
  where
    l = S.length remainder           -- total leftover bytes
    r = l `mod` md5BlockSizeInBytes  -- bytes past the last full block
    lu = l - r                       -- length rounded down to a block multiple
    ctx2 = md5Update ctx (S.take lu remainder)
-- | Fold one piece of input into the writer: advance the running MD5 in whole
-- MD5 blocks and flush every completed chunk of '_fwChunkSize' bytes to the
-- bucket, returning the updated writer state.
writeChunks :: (Monad m, MonadIO m) => FileWriter -> L.ByteString -> Action m FileWriter
writeChunks (FileWriter chunkSize _bucket files_id i size acc md5context md5acc) chunk = do
    -- Update md5 context: md5Update is only fed whole multiples of the MD5
    -- block length; the tail stays buffered in md5acc'.
    let md5BlockLength = fromIntegral $ untag (blockLength :: Tagged MD5Digest Int)
    let md5acc_temp = (md5acc `L.append` chunk)
    let (md5context', md5acc') =
            if (L.length md5acc_temp < md5BlockLength)
              then (md5context, md5acc_temp)
              else let numBlocks = L.length md5acc_temp `div` md5BlockLength
                       (current, remainder) = L.splitAt (md5BlockLength * numBlocks) md5acc_temp
                   in (md5Update md5context (L.toStrict current), remainder)
    -- Update chunks: flush one full chunk at a time, recursing with an empty
    -- input until less than one whole chunk remains buffered.
    let size' = (size + L.length chunk)
    let acc_temp = (acc `L.append` chunk)
    if (L.length acc_temp < chunkSize)
      then return (FileWriter chunkSize _bucket files_id i size' acc_temp md5context' md5acc')
      else do
        let (newChunk, acc') = L.splitAt chunkSize acc_temp
        putChunk _bucket files_id i newChunk
        writeChunks (FileWriter chunkSize _bucket files_id (i+1) size' acc' md5context' md5acc') L.empty
sinkFile :: (Monad m, MonadIO m) => Bucket -> Text -> ConduitT S.ByteString () (Action m) File
-- ^ A consumer that creates a file in the bucket and puts all consumed data in
-- it. Chunks are flushed as they fill; the metadata document is written once
-- upstream is exhausted, and the resulting 'File' is returned.
sinkFile _bucket filename = do
    files_id <- liftIO $ genObjectId
    awaitChunk $ FileWriter defaultChunkSize _bucket files_id 0 0 L.empty md5InitialContext L.empty
  where
    awaitChunk fw = do
        mchunk <- await
        case mchunk of
            -- Upstream finished: flush the remainder and create the file doc.
            Nothing -> lift (finalizeFile filename fw)
            Just chunk -> lift (writeChunks fw (L.fromStrict chunk)) >>= awaitChunk

View file

@ -1,106 +0,0 @@
-- | Compatibility layer for network package, including newtype 'PortID'
{-# LANGUAGE CPP, OverloadedStrings #-}
module Database.MongoDB.Internal.Network (Host(..), PortID(..), N.HostName, connectTo,
lookupReplicaSetName, lookupSeedList) where
#if !MIN_VERSION_network(2, 9, 0)
import qualified Network as N
import System.IO (Handle)
#else
import Control.Exception (bracketOnError)
import Network.BSD as BSD
import qualified Network.Socket as N
import System.IO (Handle, IOMode(ReadWriteMode))
#endif
import Data.ByteString.Char8 (pack, unpack)
import Data.List (dropWhileEnd)
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Network.DNS.Lookup (lookupSRV, lookupTXT)
import Network.DNS.Resolver (defaultResolvConf, makeResolvSeed, withResolver)
import Network.HTTP.Types.URI (parseQueryText)
-- | Wraps network's 'PortNumber'.
-- Used to ease compatibility between older and newer network versions.
data PortID = PortNumber N.PortNumber
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
             | UnixSocket String  -- ^ Unix-domain socket path (POSIX platforms only)
#endif
    deriving (Eq, Ord, Show)
#if !MIN_VERSION_network(2, 9, 0)
-- With older network, unwrap our newtype and delegate to network's own
-- PortID and connectTo.
connectTo :: N.HostName -- Hostname
          -> PortID     -- Port Identifier
          -> IO Handle  -- Connected Socket
connectTo hostname (PortNumber port) = N.connectTo hostname (N.PortNumber port)
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
connectTo _ (UnixSocket path) = N.connectTo "" (N.UnixSocket path)
#endif
#else
-- Copied implementation from network 2.8's 'connectTo', but using our 'PortID' newtype.
-- https://github.com/haskell/network/blob/e73f0b96c9da924fe83f3c73488f7e69f712755f/Network.hs#L120-L129
-- NOTE(review): resolves via getHostByName and AF_INET, so IPv6-only hosts
-- cannot be reached on this code path -- confirm whether that matters for
-- supported deployments.
connectTo :: N.HostName -- Hostname
          -> PortID     -- Port Identifier
          -> IO Handle  -- Connected Socket
connectTo hostname (PortNumber port) = do
    proto <- BSD.getProtocolNumber "tcp"
    bracketOnError
        (N.socket N.AF_INET N.Stream proto)
        N.close -- only done if there's an error
        (\sock -> do
            he <- BSD.getHostByName hostname
            N.connect sock (N.SockAddrInet port (hostAddress he))
            N.socketToHandle sock ReadWriteMode
        )
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
connectTo _ (UnixSocket path) = do
    bracketOnError
        (N.socket N.AF_UNIX N.Stream 0)
        N.close
        (\sock -> do
            N.connect sock (N.SockAddrUnix path)
            N.socketToHandle sock ReadWriteMode
        )
#endif
#endif
-- * Host

-- | A MongoDB server address: hostname plus 'PortID'.
data Host = Host N.HostName PortID deriving (Show, Eq, Ord)
lookupReplicaSetName :: N.HostName -> IO (Maybe Text)
-- ^ Retrieves the replica set name from the TXT DNS record for the given hostname
lookupReplicaSetName hostname = do
    seed <- makeResolvSeed defaultResolvConf
    answer <- withResolver seed $ \resolver -> lookupTXT resolver (pack hostname)
    case answer of
        Right (txt:_) ->
            -- The TXT payload is a query string; "replicaSet" may be absent or
            -- present without a value, hence the double-Maybe collapse.
            pure $ fromMaybe (Nothing :: Maybe Text) (lookup "replicaSet" $ parseQueryText txt)
        _ -> pure Nothing
lookupSeedList :: N.HostName -> IO [Host]
-- ^ Retrieves the replica set seed list from the SRV DNS record for the given hostname
lookupSeedList hostname = do
    seed <- makeResolvSeed defaultResolvConf
    answer <- withResolver seed $ \resolver ->
        lookupSRV resolver (pack ("_mongodb._tcp." ++ hostname))
    case answer of
        Left _ -> pure []
        Right records -> pure (map toHost records)
  where
    -- SRV records carry (priority, weight, port, target); targets come back
    -- fully qualified, so strip the trailing dot.
    toHost (_, _, prt, target) =
        Host (dropWhileEnd (== '.') (unpack target)) (PortNumber . fromIntegral $ prt)

View file

@ -4,8 +4,8 @@
-- This module is not intended for direct use. Use the high-level interface at -- This module is not intended for direct use. Use the high-level interface at
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead. -- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.
{-# LANGUAGE RecordWildCards, OverloadedStrings #-} {-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-}
{-# LANGUAGE CPP, FlexibleContexts #-} {-# LANGUAGE CPP, FlexibleContexts, TupleSections, TypeSynonymInstances #-}
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-} {-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
{-# LANGUAGE BangPatterns #-} {-# LANGUAGE BangPatterns #-}
@ -20,43 +20,41 @@
module Database.MongoDB.Internal.Protocol ( module Database.MongoDB.Internal.Protocol (
FullCollection, FullCollection,
-- * Pipe -- * Pipe
Pipe, newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg, Pipe, newPipe, newPipeWith, send, call,
-- ** Notice -- ** Notice
Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId, Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
-- ** Request -- ** Request
Request(..), QueryOption(..), Cmd (..), KillC(..), Request(..), QueryOption(..),
-- ** Reply -- ** Reply
Reply(..), ResponseFlag(..), FlagBit(..), Reply(..), ResponseFlag(..),
-- * Authentication -- * Authentication
Username, Password, Nonce, pwHash, pwKey, Username, Password, Nonce, pwHash, pwKey,
isClosed, close, ServerData(..), Pipeline(..), putOpMsg, isClosed, close, ServerData(..), Pipeline(..)
bitOpMsg
) where ) where
#if !MIN_VERSION_base(4,8,0) #if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>)) import Control.Applicative ((<$>))
#endif #endif
import Control.Monad ( forM, replicateM, unless, forever ) import Control.Monad (forM, replicateM, unless)
import Data.Binary.Get (Get, runGet, getInt8) import Data.Binary.Get (Get, runGet)
import Data.Binary.Put (Put, runPut, putInt8) import Data.Binary.Put (Put, runPut)
import Data.Bits (bit, testBit, zeroBits) import Data.Bits (bit, testBit)
import Data.Int (Int32, Int64) import Data.Int (Int32, Int64)
import Data.IORef (IORef, newIORef, atomicModifyIORef) import Data.IORef (IORef, newIORef, atomicModifyIORef)
import System.IO (Handle) import System.IO (Handle)
import System.IO.Error (doesNotExistErrorType, mkIOError)
import System.IO.Unsafe (unsafePerformIO) import System.IO.Unsafe (unsafePerformIO)
import Data.Maybe (maybeToList, fromJust) import Data.Maybe (maybeToList)
import GHC.Conc (ThreadStatus(..), threadStatus) import GHC.Conc (ThreadStatus(..), threadStatus)
import Control.Monad.STM (atomically) import Control.Monad (forever)
import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask) import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan) import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try) import Control.Exception.Lifted (onException, throwIO, try)
import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy as L
import Control.Monad.Trans (MonadIO, liftIO) import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson (Document, (=:), merge, cast, valueAt, look) import Data.Bson (Document)
import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64, import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64,
putInt64, putCString) putInt64, putCString)
import Data.Text (Text) import Data.Text (Text)
@ -68,14 +66,11 @@ import qualified Data.Text.Encoding as TE
import Database.MongoDB.Internal.Util (bitOr, byteStringHex) import Database.MongoDB.Internal.Util (bitOr, byteStringHex)
import Database.MongoDB.Transport (Transport) import Database.MongoDB.Transport (Transport)
import qualified Database.MongoDB.Transport as Tr import qualified Database.MongoDB.Transport as T
#if MIN_VERSION_base(4,6,0) #if MIN_VERSION_base(4,6,0)
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
putMVar, readMVar, mkWeakMVar, isEmptyMVar) putMVar, readMVar, mkWeakMVar)
import GHC.List (foldl1')
import Conduit (repeatWhileMC, (.|), runConduit, foldlC)
#else #else
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar, import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
putMVar, readMVar, addMVarFinalizer) putMVar, readMVar, addMVarFinalizer)
@ -86,15 +81,13 @@ mkWeakMVar :: MVar a -> IO () -> IO ()
mkWeakMVar = addMVarFinalizer mkWeakMVar = addMVarFinalizer
#endif #endif
-- * Pipeline -- * Pipeline
-- | Thread-safe and pipelined connection -- | Thread-safe and pipelined connection
data Pipeline = Pipeline data Pipeline = Pipeline
{ vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it { vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it
, responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrives we pop the next thread and give it the response. , responseQueue :: Chan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
, listenThread :: ThreadId , listenThread :: ThreadId
, finished :: MVar ()
, serverData :: ServerData , serverData :: ServerData
} }
@ -106,54 +99,25 @@ data ServerData = ServerData
, maxBsonObjectSize :: Int , maxBsonObjectSize :: Int
, maxWriteBatchSize :: Int , maxWriteBatchSize :: Int
} }
deriving Show
-- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread.
forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkUnmaskedFinally action and_then =
mask_ $ forkIOWithUnmask $ \unmask ->
try (unmask action) >>= and_then
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle. -- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
newPipeline :: ServerData -> Transport -> IO Pipeline newPipeline :: ServerData -> Transport -> IO Pipeline
newPipeline serverData stream = do newPipeline serverData stream = do
vStream <- newMVar stream vStream <- newMVar stream
responseQueue <- atomically newTChan responseQueue <- newChan
finished <- newEmptyMVar
let drainReplies = do
chanEmpty <- atomically $ isEmptyTChan responseQueue
if chanEmpty
then return ()
else do
var <- atomically $ readTChan responseQueue
putMVar var $ Left $ mkIOError
doesNotExistErrorType
"Handle has been closed"
Nothing
Nothing
drainReplies
rec rec
let pipe = Pipeline{..} let pipe = Pipeline{..}
listenThread <- forkUnmaskedFinally (listen pipe) $ \_ -> do listenThread <- forkIO (listen pipe)
putMVar finished ()
drainReplies
_ <- mkWeakMVar vStream $ do _ <- mkWeakMVar vStream $ do
killThread listenThread killThread listenThread
Tr.close stream T.close stream
return pipe return pipe
isFinished :: Pipeline -> IO Bool
isFinished Pipeline {finished} = do
empty <- isEmptyMVar finished
return $ not empty
close :: Pipeline -> IO () close :: Pipeline -> IO ()
-- ^ Close pipe and underlying connection -- ^ Close pipe and underlying connection
close Pipeline{..} = do close Pipeline{..} = do
killThread listenThread killThread listenThread
Tr.close =<< readMVar vStream T.close =<< readMVar vStream
isClosed :: Pipeline -> IO Bool isClosed :: Pipeline -> IO Bool
isClosed Pipeline{listenThread} = do isClosed Pipeline{listenThread} = do
@ -163,7 +127,6 @@ isClosed Pipeline{listenThread} = do
ThreadFinished -> True ThreadFinished -> True
ThreadBlocked _ -> False ThreadBlocked _ -> False
ThreadDied -> True ThreadDied -> True
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read --isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
listen :: Pipeline -> IO () listen :: Pipeline -> IO ()
@ -172,10 +135,10 @@ listen Pipeline{..} = do
stream <- readMVar vStream stream <- readMVar vStream
forever $ do forever $ do
e <- try $ readMessage stream e <- try $ readMessage stream
var <- atomically $ readTChan responseQueue var <- readChan responseQueue
putMVar var e putMVar var e
case e of case e of
Left err -> Tr.close stream >> ioError err -- close and stop looping Left err -> T.close stream >> ioError err -- close and stop looping
Right _ -> return () Right _ -> return ()
psend :: Pipeline -> Message -> IO () psend :: Pipeline -> Message -> IO ()
@ -183,55 +146,24 @@ psend :: Pipeline -> Message -> IO ()
-- Throw IOError and close pipeline if send fails -- Throw IOError and close pipeline if send fails
psend p@Pipeline{..} !message = withMVar vStream (flip writeMessage message) `onException` close p psend p@Pipeline{..} !message = withMVar vStream (flip writeMessage message) `onException` close p
psendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()-- IO (IO Response)
psendOpMsg p@Pipeline{..} commands flagBit params =
case flagBit of
Just f -> case f of
MoreToCome -> withMVar vStream (\t -> writeOpMsgMessage t (commands, Nothing) flagBit params) `onException` close p -- >> return (return (0, ReplyEmpty))
_ -> error "moreToCome has to be set if no response is expected"
_ -> error "moreToCome has to be set if no response is expected"
pcall :: Pipeline -> Message -> IO (IO Response) pcall :: Pipeline -> Message -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them). -- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response. -- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcall p@Pipeline{..} message = do pcall p@Pipeline{..} message = withMVar vStream doCall `onException` close p where
listenerStopped <- isFinished p
if listenerStopped
then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing
else withMVar vStream doCall `onException` close p
where
doCall stream = do doCall stream = do
writeMessage stream message writeMessage stream message
var <- newEmptyMVar var <- newEmptyMVar
liftIO $ atomically $ writeTChan responseQueue var liftIO $ writeChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise
pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response)
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
pcallOpMsg p@Pipeline{..} message flagbit params = do
listenerStopped <- isFinished p
if listenerStopped
then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing
else withMVar vStream doCall `onException` close p
where
doCall stream = do
writeOpMsgMessage stream ([], message) flagbit params
var <- newEmptyMVar
-- put var into the response-queue so that it can
-- fetch the latest response
liftIO $ atomically $ writeTChan responseQueue var
return $ readMVar var >>= either throwIO return -- return promise return $ readMVar var >>= either throwIO return -- return promise
-- * Pipe -- * Pipe
type Pipe = Pipeline type Pipe = Pipeline
-- ^ Thread-safe TCP connection with pipelined requests. In long-running applications the user is expected to use it as a "client": create a `Pipe` -- ^ Thread-safe TCP connection with pipelined requests
-- at startup, use it as long as possible, watch out for possible timeouts, and close it on shutdown. Bearing in mind that disconnections may be triggered by MongoDB service providers, the user is responsible for re-creating their `Pipe` whenever necessary.
newPipe :: ServerData -> Handle -> IO Pipe newPipe :: ServerData -> Handle -> IO Pipe
-- ^ Create pipe over handle -- ^ Create pipe over handle
newPipe sd handle = Tr.fromHandle handle >>= (newPipeWith sd) newPipe sd handle = T.fromHandle handle >>= (newPipeWith sd)
newPipeWith :: ServerData -> Transport -> IO Pipe newPipeWith :: ServerData -> Transport -> IO Pipe
-- ^ Create pipe over connection -- ^ Create pipe over connection
@ -241,12 +173,6 @@ send :: Pipe -> [Notice] -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails. -- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
send pipe notices = psend pipe (notices, Nothing) send pipe notices = psend pipe (notices, Nothing)
sendOpMsg :: Pipe -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
sendOpMsg pipe commands@(Nc _ : _) flagBit params = psendOpMsg pipe commands flagBit params
sendOpMsg pipe commands@(Kc _ : _) flagBit params = psendOpMsg pipe commands flagBit params
sendOpMsg _ _ _ _ = error "This function only supports Cmd types wrapped in Nc or Kc type constructors"
call :: Pipe -> [Notice] -> Request -> IO (IO Reply) call :: Pipe -> [Notice] -> Request -> IO (IO Reply)
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails. -- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
call pipe notices request = do call pipe notices request = do
@ -257,73 +183,11 @@ call pipe notices request = do
check requestId (responseTo, reply) = if requestId == responseTo then reply else check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")" error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply)
-- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
callOpMsg pipe request flagBit params = do
requestId <- genRequestId
promise <- pcallOpMsg pipe (Just (request, requestId)) flagBit params
promise' <- promise :: IO Response
return $ snd <$> produce requestId promise'
where
-- We need to perform streaming here as within the OP_MSG protocol mongoDB expects
-- our client to keep receiving messages after the MoreToCome flagbit was
-- set by the server until our client receives an empty flagbit. After the
-- first MoreToCome flagbit was set the responseTo field in the following
-- headers will reference the cursorId that was set in the previous message.
-- see:
-- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses
checkFlagBit p =
case p of
(_, r) ->
case r of
ReplyOpMsg{..} -> flagBits == [MoreToCome]
-- This is called by functions using the OP_MSG protocol,
-- so this has to be ReplyOpMsg
_ -> error "Impossible"
produce reqId p = runConduit $
case p of
(rt, r) ->
case r of
ReplyOpMsg{..} ->
if flagBits == [MoreToCome]
then yieldResponses .| foldlC mergeResponses p
else return $ (rt, check reqId p)
_ -> error "Impossible" -- see comment above
yieldResponses = repeatWhileMC
(do
var <- newEmptyMVar
liftIO $ atomically $ writeTChan (responseQueue pipe) var
readMVar var >>= either throwIO return :: IO Response
)
checkFlagBit
mergeResponses p@(rt,rep) p' =
case (p, p') of
((_, r), (_, r')) ->
case (r, r') of
(ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do
let (section, section') = (head sec, head sec')
(cur, cur') = (maybe Nothing cast $ look "cursor" section,
maybe Nothing cast $ look "cursor" section')
case (cur, cur') of
(Just doc, Just doc') -> do
let (docs, docs') =
( fromJust $ cast $ valueAt "nextBatch" doc :: [Document]
, fromJust $ cast $ valueAt "nextBatch" doc' :: [Document])
id' = fromJust $ cast $ valueAt "id" doc' :: Int32
(rt, check id' (rt, rep{ sections = docs' ++ docs })) -- todo: avoid (++)
-- Since we use this to process moreToCome messages, we
-- know that there will be a nextBatch key in the document
_ -> error "Impossible"
_ -> error "Impossible" -- see comment above
check requestId (responseTo, reply) = if requestId == responseTo then reply else
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
-- * Message -- * Message
type Message = ([Notice], Maybe (Request, RequestId)) type Message = ([Notice], Maybe (Request, RequestId))
-- ^ A write notice(s) with getLastError request, or just query request. -- ^ A write notice(s) with getLastError request, or just query request.
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness. -- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
type OpMsgMessage = ([Cmd], Maybe (Request, RequestId))
writeMessage :: Transport -> Message -> IO () writeMessage :: Transport -> Message -> IO ()
-- ^ Write message to connection -- ^ Write message to connection
@ -338,27 +202,8 @@ writeMessage conn (notices, mRequest) = do
let s = runPut $ putRequest request requestId let s = runPut $ putRequest request requestId
return $ (lenBytes s) `L.append` s return $ (lenBytes s) `L.append` s
Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString) T.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
Tr.flush conn T.flush conn
where
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
encodeSize = runPut . putInt32 . (+ 4)
writeOpMsgMessage :: Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
-- ^ Write message to connection
writeOpMsgMessage conn (notices, mRequest) flagBit params = do
noticeStrings <- forM notices $ \n -> do
requestId <- genRequestId
let s = runPut $ putOpMsg n requestId flagBit params
return $ (lenBytes s) `L.append` s
let requestString = do
(request, requestId) <- mRequest
let s = runPut $ putOpMsg (Req request) requestId flagBit params
return $ (lenBytes s) `L.append` s
Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
Tr.flush conn
where where
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
encodeSize = runPut . putInt32 . (+ 4) encodeSize = runPut . putInt32 . (+ 4)
@ -370,8 +215,8 @@ readMessage :: Transport -> IO Response
-- ^ read response from a connection -- ^ read response from a connection
readMessage conn = readResp where readMessage conn = readResp where
readResp = do readResp = do
len <- fromEnum . decodeSize . L.fromStrict <$> Tr.read conn 4 len <- fromEnum . decodeSize . L.fromStrict <$> T.read conn 4
runGet getReply . L.fromStrict <$> Tr.read conn len runGet getReply . L.fromStrict <$> T.read conn len
decodeSize = subtract 4 . runGet getInt32 decodeSize = subtract 4 . runGet getInt32
type FullCollection = Text type FullCollection = Text
@ -388,7 +233,6 @@ type ResponseTo = RequestId
genRequestId :: (MonadIO m) => m RequestId genRequestId :: (MonadIO m) => m RequestId
-- ^ Generate fresh request id -- ^ Generate fresh request id
{-# NOINLINE genRequestId #-}
genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where
counter :: IORef RequestId counter :: IORef RequestId
counter = unsafePerformIO (newIORef 0) counter = unsafePerformIO (newIORef 0)
@ -403,13 +247,6 @@ putHeader opcode requestId = do
putInt32 0 putInt32 0
putInt32 opcode putInt32 opcode
putOpMsgHeader :: Opcode -> RequestId -> Put
-- ^ Note, does not write message length (first int32), assumes caller will write it
putOpMsgHeader opcode requestId = do
putInt32 requestId
putInt32 0
putInt32 opcode
getHeader :: Get (Opcode, ResponseTo) getHeader :: Get (Opcode, ResponseTo)
-- ^ Note, does not read message length (first int32), assumes it was already read -- ^ Note, does not read message length (first int32), assumes it was already read
getHeader = do getHeader = do
@ -484,137 +321,6 @@ putNotice notice requestId = do
putInt32 $ toEnum (length kCursorIds) putInt32 $ toEnum (length kCursorIds)
mapM_ putInt64 kCursorIds mapM_ putInt64 kCursorIds
data KillC = KillC { killCursor :: Notice, kFullCollection:: FullCollection} deriving Show
data Cmd = Nc Notice | Req Request | Kc KillC deriving Show
data FlagBit =
ChecksumPresent -- ^ The message ends with 4 bytes containing a CRC-32C checksum
| MoreToCome -- ^ Another message will follow this one without further action from the receiver.
| ExhaustAllowed -- ^ The client is prepared for multiple replies to this request using the moreToCome bit.
deriving (Show, Eq, Enum)
{-
OP_MSG header == 16 byte
+ 4 bytes flagBits
+ 1 byte payload type = 1
+ 1 byte payload type = 2
+ 4 byte size of payload
== 26 bytes opcode overhead
+ X Full command document {insert: "test", writeConcern: {...}}
+ Y command identifier ("documents", "deletes", "updates") ( + \0)
-}
putOpMsg :: Cmd -> RequestId -> Maybe FlagBit -> Document -> Put
putOpMsg cmd requestId flagBit params = do
let biT = maybe zeroBits (bit . bitOpMsg) flagBit:: Int32
putOpMsgHeader opMsgOpcode requestId -- header
case cmd of
Nc n -> case n of
Insert{..} -> do
let (sec0, sec1Size) =
prepSectionInfo
iFullCollection
(Just (iDocuments:: [Document]))
(Nothing:: Maybe Document)
("insert":: Text)
("documents":: Text)
params
putInt32 biT -- flagBit
putInt8 0 -- payload type 0
putDocument sec0 -- payload
putInt8 1 -- payload type 1
putInt32 sec1Size -- size of section
putCString "documents" -- identifier
mapM_ putDocument iDocuments -- payload
Update{..} -> do
let doc = ["q" =: uSelector, "u" =: uUpdater]
(sec0, sec1Size) =
prepSectionInfo
uFullCollection
(Nothing:: Maybe [Document])
(Just doc)
("update":: Text)
("updates":: Text)
params
putInt32 biT
putInt8 0
putDocument sec0
putInt8 1
putInt32 sec1Size
putCString "updates"
putDocument doc
Delete{..} -> do
-- Setting limit to 1 here is ok, since this is only used by deleteOne
let doc = ["q" =: dSelector, "limit" =: (1 :: Int32)]
(sec0, sec1Size) =
prepSectionInfo
dFullCollection
(Nothing:: Maybe [Document])
(Just doc)
("delete":: Text)
("deletes":: Text)
params
putInt32 biT
putInt8 0
putDocument sec0
putInt8 1
putInt32 sec1Size
putCString "deletes"
putDocument doc
_ -> error "The KillCursors command cannot be wrapped into a Nc type constructor. Please use the Kc type constructor"
Req r -> case r of
Query{..} -> do
let n = T.splitOn "." qFullCollection
db = head n
sec0 = foldl1' merge [qProjector, [ "$db" =: db ], qSelector]
putInt32 biT
putInt8 0
putDocument sec0
GetMore{..} -> do
let n = T.splitOn "." gFullCollection
(db, coll) = (head n, last n)
pre = ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize]
putInt32 (bit $ bitOpMsg $ ExhaustAllowed)
putInt8 0
putDocument pre
Kc k -> case k of
KillC{..} -> do
let n = T.splitOn "." kFullCollection
(db, coll) = (head n, last n)
case killCursor of
KillCursors{..} -> do
let doc = ["killCursors" =: coll, "cursors" =: kCursorIds, "$db" =: db]
putInt32 biT
putInt8 0
putDocument doc
-- Notices are already captured at the beginning, so all
-- other cases are impossible
_ -> error "impossible"
where
lenBytes bytes = toEnum . fromEnum $ L.length bytes:: Int32
prepSectionInfo fullCollection documents document command identifier ps =
let n = T.splitOn "." fullCollection
(db, coll) = (head n, last n)
in
case documents of
Just ds ->
let
sec0 = merge ps [command =: coll, "$db" =: db]
s = sum $ map (lenBytes . runPut . putDocument) ds
i = runPut $ putCString identifier
-- +4 bytes for the type 1 section size that has to be
-- transported in addition to the type 1 section document
sec1Size = s + lenBytes i + 4
in (sec0, sec1Size)
Nothing ->
let
sec0 = merge ps [command =: coll, "$db" =: db]
s = runPut $ putDocument $ fromJust document
i = runPut $ putCString identifier
sec1Size = lenBytes s + lenBytes i + 4
in (sec0, sec1Size)
iBit :: InsertOption -> Int32 iBit :: InsertOption -> Int32
iBit KeepGoing = bit 0 iBit KeepGoing = bit 0
@ -634,11 +340,6 @@ dBit SingleRemove = bit 0
dBits :: [DeleteOption] -> Int32 dBits :: [DeleteOption] -> Int32
dBits = bitOr . map dBit dBits = bitOr . map dBit
bitOpMsg :: FlagBit -> Int
bitOpMsg ChecksumPresent = 0
bitOpMsg MoreToCome = 1
bitOpMsg ExhaustAllowed = 16
-- ** Request -- ** Request
-- | A request is a message that is sent with a 'Reply' expected in return -- | A request is a message that is sent with a 'Reply' expected in return
@ -648,8 +349,8 @@ data Request =
qFullCollection :: FullCollection, qFullCollection :: FullCollection,
qSkip :: Int32, -- ^ Number of initial matching documents to skip qSkip :: Int32, -- ^ Number of initial matching documents to skip
qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size. qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
qSelector :: Document, -- ^ @[]@ = return all documents in collection qSelector :: Document, -- ^ \[\] = return all documents in collection
qProjector :: Document -- ^ @[]@ = return whole document qProjector :: Document -- ^ \[\] = return whole document
} | GetMore { } | GetMore {
gFullCollection :: FullCollection, gFullCollection :: FullCollection,
gBatchSize :: Int32, gBatchSize :: Int32,
@ -657,15 +358,13 @@ data Request =
deriving (Show, Eq) deriving (Show, Eq)
data QueryOption = data QueryOption =
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point for example if the final object it references were deleted. Thus, you should be prepared to requery on @CursorNotFound@ exception. TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception.
| SlaveOK -- ^ Allow query of replica slave. Normally these return an error except for namespace "local". | SlaveOK -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
| NoCursorTimeout -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed. | NoCursorTimeout -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed.
| AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal. | AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
-- | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection. -- | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
-- Exhaust commented out because not compatible with current `Pipeline` implementation -- Exhaust commented out because not compatible with current `Pipeline` implementation
| Partial -- ^ Get partial results from a _mongos_ if some shards are down, instead of throwing an error.
| Partial -- ^ Get partial results from a /mongos/ if some shards are down, instead of throwing an error.
deriving (Show, Eq) deriving (Show, Eq)
-- *** Binary format -- *** Binary format
@ -674,9 +373,6 @@ qOpcode :: Request -> Opcode
qOpcode Query{} = 2004 qOpcode Query{} = 2004
qOpcode GetMore{} = 2005 qOpcode GetMore{} = 2005
opMsgOpcode :: Opcode
opMsgOpcode = 2013
putRequest :: Request -> RequestId -> Put putRequest :: Request -> RequestId -> Put
putRequest request requestId = do putRequest request requestId = do
putHeader (qOpcode request) requestId putHeader (qOpcode request) requestId
@ -700,7 +396,7 @@ qBit SlaveOK = bit 2
qBit NoCursorTimeout = bit 4 qBit NoCursorTimeout = bit 4
qBit AwaitData = bit 5 qBit AwaitData = bit 5
--qBit Exhaust = bit 6 --qBit Exhaust = bit 6
qBit Database.MongoDB.Internal.Protocol.Partial = bit 7 qBit Partial = bit 7
qBits :: [QueryOption] -> Int32 qBits :: [QueryOption] -> Int32
qBits = bitOr . map qBit qBits = bitOr . map qBit
@ -713,13 +409,7 @@ data Reply = Reply {
rCursorId :: CursorId, -- ^ 0 = cursor finished rCursorId :: CursorId, -- ^ 0 = cursor finished
rStartingFrom :: Int32, rStartingFrom :: Int32,
rDocuments :: [Document] rDocuments :: [Document]
} } deriving (Show, Eq)
| ReplyOpMsg {
flagBits :: [FlagBit],
sections :: [Document],
checksum :: Maybe Int32
}
deriving (Show, Eq)
data ResponseFlag = data ResponseFlag =
CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results. CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
@ -735,38 +425,17 @@ replyOpcode = 1
getReply :: Get (ResponseTo, Reply) getReply :: Get (ResponseTo, Reply)
getReply = do getReply = do
(opcode, responseTo) <- getHeader (opcode, responseTo) <- getHeader
if opcode == 2013 unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
then do rResponseFlags <- rFlags <$> getInt32
-- Notes: rCursorId <- getInt64
-- Checksum bits that are set by the server don't seem to be supported by official drivers. rStartingFrom <- getInt32
-- See: https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/message.py#L1423 numDocs <- fromIntegral <$> getInt32
flagBits <- rFlagsOpMsg <$> getInt32 rDocuments <- replicateM numDocs getDocument
_ <- getInt8 return (responseTo, Reply{..})
sec0 <- getDocument
let sections = [sec0]
checksum = Nothing
return (responseTo, ReplyOpMsg{..})
else do
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
rResponseFlags <- rFlags <$> getInt32
rCursorId <- getInt64
rStartingFrom <- getInt32
numDocs <- fromIntegral <$> getInt32
rDocuments <- replicateM numDocs getDocument
return (responseTo, Reply{..})
rFlags :: Int32 -> [ResponseFlag] rFlags :: Int32 -> [ResponseFlag]
rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..] rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..]
-- See https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#flagbits
rFlagsOpMsg :: Int32 -> [FlagBit]
rFlagsOpMsg bits = isValidFlag bits
where isValidFlag bt =
let setBits = map fst $ filter (\(_,b) -> b == True) $ zip ([0..31] :: [Int32]) $ map (testBit bt) [0 .. 31]
in if any (\n -> not $ elem n [0,1,16]) setBits
then error "Unsopported bit was set"
else filter (testBit bt . bitOpMsg) [ChecksumPresent ..]
rBit :: ResponseFlag -> Int rBit :: ResponseFlag -> Int
rBit CursorNotFound = 0 rBit CursorNotFound = 0
rBit QueryError = 1 rBit QueryError = 1

View file

@ -1,7 +1,9 @@
-- | Miscellaneous general functions -- | Miscellaneous general functions and Show, Eq, and Ord instances for PortID
{-# LANGUAGE FlexibleInstances, UndecidableInstances, StandaloneDeriving #-} {-# LANGUAGE FlexibleInstances, UndecidableInstances, StandaloneDeriving #-}
{-# LANGUAGE CPP #-} {-# LANGUAGE CPP #-}
-- PortID instances
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Database.MongoDB.Internal.Util where module Database.MongoDB.Internal.Util where
@ -12,19 +14,26 @@ import Control.Exception (handle, throwIO, Exception)
import Control.Monad (liftM, liftM2) import Control.Monad (liftM, liftM2)
import Data.Bits (Bits, (.|.)) import Data.Bits (Bits, (.|.))
import Data.Word (Word8) import Data.Word (Word8)
import Network (PortID(..))
import Numeric (showHex) import Numeric (showHex)
import System.Random (newStdGen) import System.Random (newStdGen)
import System.Random.Shuffle (shuffle') import System.Random.Shuffle (shuffle')
import qualified Data.ByteString as S import qualified Data.ByteString as S
import Control.Monad.Except (MonadError(..)) import Control.Monad.Error (MonadError(..), Error(..))
import Control.Monad.Trans (MonadIO, liftIO) import Control.Monad.Trans (MonadIO, liftIO)
import Data.Bson import Data.Bson
import Data.Text (Text) import Data.Text (Text)
import qualified Data.Text as T import qualified Data.Text as T
#if !MIN_VERSION_network(2, 4, 1)
deriving instance Show PortID
deriving instance Eq PortID
#endif
deriving instance Ord PortID
-- | A monadic sort implementation derived from the non-monadic one in ghc's Prelude -- | A monadic sort implementation derived from the non-monadic one in ghc's Prelude
mergesortM :: Monad m => (a -> a -> m Ordering) -> [a] -> m [a] mergesortM :: Monad m => (a -> a -> m Ordering) -> [a] -> m [a]
mergesortM cmp = mergesortM' cmp . map wrap mergesortM cmp = mergesortM' cmp . map wrap
@ -56,16 +65,13 @@ shuffle :: [a] -> IO [a]
-- ^ Randomly shuffle items in list -- ^ Randomly shuffle items in list
shuffle list = shuffle' list (length list) <$> newStdGen shuffle list = shuffle' list (length list) <$> newStdGen
loop :: Monad m => m (Maybe a) -> m [a] loop :: (Functor m, Monad m) => m (Maybe a) -> m [a]
-- ^ Repeatedy execute action, collecting results, until it returns Nothing -- ^ Repeatedy execute action, collecting results, until it returns Nothing
loop act = act >>= maybe (return []) (\a -> (a :) `liftM` loop act) loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act)
untilSuccess :: (MonadError e m) => (a -> m b) -> [a] -> m b untilSuccess :: (MonadError e m, Error e) => (a -> m b) -> [a] -> m b
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw 'strMsg' error if list is empty. -- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw 'strMsg' error if list is empty.
untilSuccess = untilSuccess' (error "empty untilSuccess") untilSuccess = untilSuccess' (strMsg "empty untilSuccess")
-- Use 'error' copying behavior in removed 'Control.Monad.Error.Error' instance:
-- instance Error Failure where strMsg = error
-- 'fail' is treated the same as a programming 'error'. In other words, don't use it.
untilSuccess' :: (MonadError e m) => e -> (a -> m b) -> [a] -> m b untilSuccess' :: (MonadError e m) => e -> (a -> m b) -> [a] -> m b
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw given error if list is empty -- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw given error if list is empty

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,6 @@
{-# LANGUAGE CPP #-} {-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
#if (__GLASGOW_HASKELL__ >= 706) #if (__GLASGOW_HASKELL__ >= 706)
{-# LANGUAGE RecursiveDo #-} {-# LANGUAGE RecursiveDo #-}
@ -20,44 +21,40 @@ ATTENTION!!! Be aware that this module is highly experimental and is
barely tested. The current implementation doesn't verify server's identity. barely tested. The current implementation doesn't verify server's identity.
It only allows you to connect to a mongodb server using TLS protocol. It only allows you to connect to a mongodb server using TLS protocol.
-} -}
module Database.MongoDB.Transport.Tls module Database.MongoDB.Transport.Tls
( connect (connect)
, connectWithTlsParams
)
where where
import Data.IORef import Data.IORef
import Data.Monoid
import qualified Data.ByteString as ByteString import qualified Data.ByteString as ByteString
import qualified Data.ByteString.Lazy as Lazy.ByteString import qualified Data.ByteString.Lazy as Lazy.ByteString
import Data.Default.Class (def) import Data.Default.Class (def)
import Control.Applicative ((<$>))
import Control.Exception (bracketOnError) import Control.Exception (bracketOnError)
import Control.Monad (when, unless) import Control.Monad (when, unless)
import System.IO import System.IO
import Database.MongoDB.Internal.Protocol (Pipe, newPipeWith) import Database.MongoDB (Pipe)
import Database.MongoDB.Internal.Protocol (newPipeWith)
import Database.MongoDB.Transport (Transport(Transport)) import Database.MongoDB.Transport (Transport(Transport))
import qualified Database.MongoDB.Transport as T import qualified Database.MongoDB.Transport as T
import System.IO.Error (mkIOError, eofErrorType) import System.IO.Error (mkIOError, eofErrorType)
import Database.MongoDB.Internal.Network (connectTo, HostName, PortID) import Network (connectTo, HostName, PortID)
import qualified Network.TLS as TLS import qualified Network.TLS as TLS
import qualified Network.TLS.Extra.Cipher as TLS import qualified Network.TLS.Extra.Cipher as TLS
import Database.MongoDB.Query (access, slaveOk, retrieveServerData) import Database.MongoDB.Query (access, slaveOk, retrieveServerData)
-- | Connect to mongodb using TLS -- | Connect to mongodb using TLS
connect :: HostName -> PortID -> IO Pipe connect :: HostName -> PortID -> IO Pipe
connect host port = connectWithTlsParams params host port connect host port = bracketOnError (connectTo host port) hClose $ \handle -> do
where
params = (TLS.defaultParamsClient host "")
{ TLS.clientSupported = def
{ TLS.supportedCiphers = TLS.ciphersuite_default }
, TLS.clientHooks = def
{ TLS.onServerCertificate = \_ _ _ _ -> return [] }
}
-- | Connect to mongodb using TLS using provided TLS client parameters let params = (TLS.defaultParamsClient host "")
connectWithTlsParams :: TLS.ClientParams -> HostName -> PortID -> IO Pipe { TLS.clientSupported = def
connectWithTlsParams clientParams host port = bracketOnError (connectTo host port) hClose $ \handle -> do { TLS.supportedCiphers = TLS.ciphersuite_all}
context <- TLS.contextNew handle clientParams , TLS.clientHooks = def
{ TLS.onServerCertificate = \_ _ _ _ -> return []}
}
context <- TLS.contextNew handle params
TLS.handshake context TLS.handshake context
conn <- tlsConnection context conn <- tlsConnection context

View file

@ -1,7 +1,7 @@
This is the Haskell MongoDB driver (client). [MongoDB](http://www.mongodb.org) is a free, scalable, fast, document database management system. This driver lets you connect to a MongoDB server, and update and query its data. It also lets you do adminstrative tasks, like create an index or look at performance statistics. This is the Haskell MongoDB driver (client). [MongoDB](http://www.mongodb.org) is a free, scalable, fast, document database management system. This driver lets you connect to a MongoDB server, and update and query its data. It also lets you do adminstrative tasks, like create an index or look at performance statistics.
[![Join the chat at https://gitter.im/mongodb-haskell/mongodb](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mongodb-haskell/mongodb?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![Join the chat at https://gitter.im/mongodb-haskell/mongodb](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mongodb-haskell/mongodb?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Build Status](https://travis-ci.org/mongodb-haskell/mongodb.svg?branch=master)](https://travis-ci.org/mongodb-haskell/mongodb) [![Build Status](https://travis-ci.org/mongodb-haskell/mongodb.svg)](https://travis-ci.org/mongodb-haskell/mongodb)
### Documentation ### Documentation
@ -11,68 +11,3 @@ This is the Haskell MongoDB driver (client). [MongoDB](http://www.mongodb.org) i
* [MapReduce example](http://github.com/mongodb-haskell/mongodb/blob/master/doc/map-reduce-example.md) * [MapReduce example](http://github.com/mongodb-haskell/mongodb/blob/master/doc/map-reduce-example.md)
* [Driver design](https://github.com/mongodb-haskell/mongodb/blob/master/doc/Article1.md) * [Driver design](https://github.com/mongodb-haskell/mongodb/blob/master/doc/Article1.md)
* [MongoDB DBMS](http://www.mongodb.org) * [MongoDB DBMS](http://www.mongodb.org)
### Dev Environment
It's important for this library to be tested with various versions of mongodb
server and with different ghc versions. In order to achieve this we use docker
containers and docker-compose. This repository contains two files: docker-compose.yml
and reattach.sh.
Docker compose file describes two containers.
One container is for running mongodb server. If you want a different version of
mongodb server you need to change the tag of mongo image in the
docker-compose.yml. In order to start your mongodb server you need to run:
```
docker-compose up -d mongodb
```
In order to stop your containers without loosing the data inside of it:
```
docker-compose stop mongodb
```
Restart:
```
docker-compose start mongodb
```
If you want to remove the mongodb container and start from scratch then:
```
docker-compose stop mongodb
docker-compose rm mongodb
docker-compose up -d mongodb
```
The other container is for compiling your code. By specifying the tag of the image
you can change the version of ghc you will be using. If you never started this
container then you need:
```
docker-compose run mongodb-haskell
```
It will start the container and mount your working directory to
`/opt/mongodb-haskell` If you exit the bash cli then the conainer will stop.
In order to reattach to an old stopped container you need to run script
`reattach.sh`. If you run `docker-compose run` again it will create another
container and all work made by cabal will be lost. `reattach.sh` is a
workaround for docker-compose's inability to pick up containers that exited.
When you are done with testing you need to run:
```
docker-compose stop mongodb
```
Next time you will need to do:
```
docker-compose start mongodb
reattach.sh
```
It will start your stopped container with mongodb server and pick up the stopped
container with haskell.

0
Setup.lhs Normal file → Executable file
View file

View file

@ -1,4 +1,3 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ExtendedDefaultRules #-} {-# LANGUAGE ExtendedDefaultRules #-}
@ -8,12 +7,7 @@ import Database.MongoDB (Action, Document, Document, Value, access,
close, connect, delete, exclude, find, close, connect, delete, exclude, find,
host, insertMany, master, project, rest, host, insertMany, master, project, rest,
select, sort, (=:)) select, sort, (=:))
#if (__GLASGOW_HASKELL__ >= 800)
import Control.Monad.IO.Class (liftIO)
#else
import Control.Monad.Trans (liftIO) import Control.Monad.Trans (liftIO)
#endif
main :: IO () main :: IO ()
main = do main = do

View file

@ -12,10 +12,9 @@ Start a haskell session:
$ ghci $ ghci
> :set prompt "> " > :set prompt "> "
Import the MongoDB driver library, and set OverloadedStrings so literal strings are converted to UTF-8 automatically and ExtendedDefaultRules for using BSON fields with basic types. Import the MongoDB driver library, and set OverloadedStrings so literal strings are converted to UTF-8 automatically.
> :set -XOverloadedStrings > :set -XOverloadedStrings
> :set -XExtendedDefaultRules
> import Database.MongoDB > import Database.MongoDB
### Connecting ### Connecting
@ -36,7 +35,7 @@ A DB read or write operation is called a DB `Action`. A DB Action is a monad so
> access pipe master "test" allCollections > access pipe master "test" allCollections
`access` throw `Failure` if there is any issue. Failure means there was a connection failure, or a read/write failure like cursor expired or duplicate key insert. `access` return either Left `Failure` or Right result. Failure means there was a connection failure, or a read/write failure like cursor expired or duplicate key insert.
`master` is an `AccessMode`. Access mode indicates how reads and writes will be performed. Its three modes are: `ReadStaleOk`, `UnconfirmedWrites`, and `ConfirmWrites GetLastErrorParams`. `master` is just short hand for `ConfirmWrites []`. The first mode may be used against a slave or a master server, the last two must be used against a master server. `master` is an `AccessMode`. Access mode indicates how reads and writes will be performed. Its three modes are: `ReadStaleOk`, `UnconfirmedWrites`, and `ConfirmWrites GetLastErrorParams`. `master` is just short hand for `ConfirmWrites []`. The first mode may be used against a slave or a master server, the last two must be used against a master server.

View file

@ -1,16 +0,0 @@
version: '3'
services:
mongodb:
ports:
- 27017:27017
image: mongo:3.6
mongodb-haskell:
image: phadej/ghc:8.0.2
environment:
- HASKELL_MONGODB_TEST_HOST=mongodb
entrypoint:
- /bin/bash
volumes:
- ./:/opt/mongodb-haskell
# vim: ts=2 et sw=2 ai

View file

@ -1,5 +1,5 @@
Name: mongoDB Name: mongoDB
Version: 2.7.1.2 Version: 2.1.0
Synopsis: Driver (client) for MongoDB, a free, scalable, fast, document Synopsis: Driver (client) for MongoDB, a free, scalable, fast, document
DBMS DBMS
Description: This package lets you connect to MongoDB servers and Description: This package lets you connect to MongoDB servers and
@ -10,74 +10,50 @@ Category: Database
Homepage: https://github.com/mongodb-haskell/mongodb Homepage: https://github.com/mongodb-haskell/mongodb
Bug-reports: https://github.com/mongodb-haskell/mongodb/issues Bug-reports: https://github.com/mongodb-haskell/mongodb/issues
Author: Tony Hannan Author: Tony Hannan
Maintainer: Victor Denisov <denisovenator@gmail.com> Maintainer: Fedor Gogolev <knsd@knsd.net>
Copyright: Copyright (c) 2010-2012 10gen Inc. Copyright: Copyright (c) 2010-2012 10gen Inc.
License: Apache-2.0 License: OtherLicense
License-file: LICENSE License-file: LICENSE
Cabal-version: >= 1.10 Cabal-version: >= 1.10
Build-type: Simple Build-type: Simple
Stability: alpha Stability: alpha
Extra-Source-Files: CHANGELOG.md Extra-Source-Files: CHANGELOG.md
-- Imitated from https://github.com/mongodb-haskell/bson/pull/18
Flag _old-network
description: Control whether to use <http://hackage.haskell.org/package/network-bsd network-bsd>
manual: False
Library Library
GHC-options: -Wall GHC-options: -Wall
GHC-prof-options: -auto-all-exported
default-language: Haskell2010 default-language: Haskell2010
Build-depends: array -any Build-depends: array -any
, base <5 , base <5
, binary -any , binary -any
, bson >= 0.3 && < 0.5 , bson >= 0.3 && < 0.4
, text , text
, bytestring -any , bytestring -any
, containers -any , containers -any
, conduit
, conduit-extra
, mtl >= 2 , mtl >= 2
, cryptohash -any , cryptohash -any
, network -any
, parsec -any , parsec -any
, random -any , random -any
, random-shuffle -any , random-shuffle -any
, resourcet
, monad-control >= 0.3.1 , monad-control >= 0.3.1
, lifted-base >= 0.1.0.3 , lifted-base >= 0.1.0.3
, pureMD5 , tls >= 1.2.0
, stm
, tagged
, tls >= 1.3.0
, time
, data-default-class -any , data-default-class -any
, transformers
, transformers-base >= 0.4.1 , transformers-base >= 0.4.1
, hashtables >= 1.1.2.0 , hashtables >= 1.1.2.0
, base16-bytestring >= 0.1.1.6 , base16-bytestring >= 0.1.1.6
, base64-bytestring >= 1.0.0.1 , base64-bytestring >= 1.0.0.1
, nonce >= 1.0.5 , nonce >= 1.0.2
, fail
, dns
, http-types
if flag(_old-network)
-- "Network.BSD" is only available in network < 2.9
build-depends: network < 2.9
else
-- "Network.BSD" has been moved into its own package `network-bsd`
build-depends: network >= 3.0
, network-bsd >= 2.7 && < 2.9
Exposed-modules: Database.MongoDB Exposed-modules: Database.MongoDB
Database.MongoDB.Admin Database.MongoDB.Admin
Database.MongoDB.Connection Database.MongoDB.Connection
Database.MongoDB.GridFS
Database.MongoDB.Query Database.MongoDB.Query
Database.MongoDB.Transport Database.MongoDB.Transport
Database.MongoDB.Transport.Tls Database.MongoDB.Transport.Tls
Other-modules: Database.MongoDB.Internal.Network Other-modules: Database.MongoDB.Internal.Protocol
Database.MongoDB.Internal.Protocol
Database.MongoDB.Internal.Util Database.MongoDB.Internal.Util
Source-repository head Source-repository head
@ -87,9 +63,6 @@ Source-repository head
test-suite test test-suite test
hs-source-dirs: test hs-source-dirs: test
main-is: Main.hs main-is: Main.hs
other-modules: Spec
, QuerySpec
, TestImport
ghc-options: -Wall -with-rtsopts "-K64m" ghc-options: -Wall -with-rtsopts "-K64m"
type: exitcode-stdio-1.0 type: exitcode-stdio-1.0
build-depends: mongoDB build-depends: mongoDB
@ -114,15 +87,14 @@ Benchmark bench
, base64-bytestring , base64-bytestring
, base16-bytestring , base16-bytestring
, binary -any , binary -any
, bson >= 0.3 && < 0.5 , bson >= 0.3 && < 0.4
, data-default-class -any
, text , text
, bytestring -any , bytestring -any
, containers -any , containers -any
, mtl >= 2 , mtl >= 2
, cryptohash -any , cryptohash -any
, nonce >= 1.0.5 , network -any
, stm , nonce
, parsec -any , parsec -any
, random -any , random -any
, random-shuffle -any , random-shuffle -any
@ -130,19 +102,6 @@ Benchmark bench
, lifted-base >= 0.1.0.3 , lifted-base >= 0.1.0.3
, transformers-base >= 0.4.1 , transformers-base >= 0.4.1
, hashtables >= 1.1.2.0 , hashtables >= 1.1.2.0
, fail
, dns
, http-types
, criterion , criterion
, tls >= 1.3.0
if flag(_old-network)
-- "Network.BSD" is only available in network < 2.9
build-depends: network < 2.9
else
-- "Network.BSD" has been moved into its own package `network-bsd`
build-depends: network >= 3.0
, network-bsd >= 2.7 && < 2.9
default-language: Haskell2010 default-language: Haskell2010
default-extensions: OverloadedStrings default-extensions: OverloadedStrings

View file

@ -1,3 +0,0 @@
#!/bin/bash
docker start -ai mongodb_mongodb-haskell_run_1

View file

@ -1,4 +0,0 @@
resolver: lts-9.21
flags:
mongoDB:
_old-network: true

View file

@ -1,4 +0,0 @@
resolver: lts-11.22
flags:
mongoDB:
_old-network: true

View file

@ -1,4 +0,0 @@
resolver: lts-12.26
flags:
mongoDB:
_old-network: true

View file

@ -1,6 +0,0 @@
resolver: lts-13.23
extra-deps:
- git: git@github.com:hvr/bson.git # https://github.com/mongodb-haskell/bson/pull/18
commit: 2fc8d04120c0758201762b8e22254aeb6d574f41
- network-bsd-2.8.1.0
- network-3.1.0.0

View file

@ -1,4 +0,0 @@
resolver: lts-13.23
flags:
mongoDB:
_old-network: true

View file

@ -1,69 +0,0 @@
# This file was automatically generated by 'stack init'
#
# Some commonly used options have been documented as comments in this file.
# For advanced use and comprehensive documentation of the format, please see:
# https://docs.haskellstack.org/en/stable/yaml_configuration/
# Resolver to choose a 'specific' stackage snapshot or a compiler version.
# A snapshot resolver dictates the compiler version and the set of packages
# to be used for project dependencies. For example:
#
# resolver: lts-3.5
# resolver: nightly-2015-09-21
# resolver: ghc-7.10.2
#
# The location of a snapshot can be provided as a file or url. Stack assumes
# a snapshot provided as a file might change, whereas a url resource does not.
#
# resolver: ./custom-snapshot.yaml
# resolver: https://example.com/snapshots/2018-01-01.yaml
resolver:
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/21.yaml
# User packages to be built.
# Various formats can be used as shown in the example below.
#
# packages:
# - some-directory
# - https://example.com/foo/bar/baz-0.0.2.tar.gz
# subdirs:
# - auto-update
# - wai
packages:
- .
# Dependency packages to be pulled from upstream that are not in the resolver.
# These entries can reference officially published versions as well as
# forks / in-progress versions pinned to a git hash. For example:
#
# extra-deps:
# - acme-missiles-0.3
# - git: https://github.com/commercialhaskell/stack.git
# commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a
#
# extra-deps: []
# Override default flag values for local packages and extra-deps
flags:
mongoDB:
_old-network: false
# Extra package databases containing global packages
# extra-package-dbs: []
# Control whether we use the GHC we find on the path
# system-ghc: true
#
# Require a specific version of stack, using version ranges
# require-stack-version: -any # Default
# require-stack-version: ">=2.7"
#
# Override the architecture used by stack, especially useful on Windows
# arch: i386
# arch: x86_64
#
# Extra directories used by stack for building
# extra-include-dirs: [/path/to/dir]
# extra-lib-dirs: [/path/to/dir]
#
# Allow a newer minor version of GHC than the snapshot specifies
# compiler-check: newer-minor

View file

@ -1,13 +0,0 @@
# This file was autogenerated by Stack.
# You should not edit this file by hand.
# For more information, please see the documentation at:
# https://docs.haskellstack.org/en/stable/lock_files
packages: []
snapshots:
- completed:
size: 586110
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/21.yaml
sha256: ce4fb8d44f3c6c6032060a02e0ebb1bd29937c9a70101c1517b92a87d9515160
original:
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/21.yaml

View file

@ -5,15 +5,11 @@ import Database.MongoDB.Connection (connect, host)
import Database.MongoDB.Query (access, slaveOk) import Database.MongoDB.Query (access, slaveOk)
import Data.Text (unpack) import Data.Text (unpack)
import Test.Hspec.Runner import Test.Hspec.Runner
import System.Environment (getEnv)
import System.IO.Error (catchIOError)
import TestImport
import qualified Spec import qualified Spec
main :: IO () main :: IO ()
main = do main = do
mongodbHost <- getEnv mongodbHostEnvVariable `catchIOError` (\_ -> return "localhost") p <- connect $ host "localhost"
p <- connect $ host mongodbHost
version <- access p slaveOk "admin" serverVersion version <- access p slaveOk "admin" serverVersion
putStrLn $ "Running tests with mongodb version: " ++ (unpack version) putStrLn $ "Running tests with mongodb version: " ++ (unpack version)
hspecWith defaultConfig Spec.spec hspecWith defaultConfig Spec.spec

View file

@ -5,9 +5,7 @@ module QuerySpec (spec) where
import Data.String (IsString(..)) import Data.String (IsString(..))
import TestImport import TestImport
import Control.Exception import Control.Exception
import Control.Monad (forM_, when) import Control.Monad (forM_)
import System.Environment (getEnv)
import System.IO.Error (catchIOError)
import qualified Data.List as L import qualified Data.List as L
import qualified Data.Text as T import qualified Data.Text as T
@ -17,17 +15,11 @@ testDBName = "mongodb-haskell-test"
db :: Action IO a -> IO a db :: Action IO a -> IO a
db action = do db action = do
mongodbHost <- getEnv mongodbHostEnvVariable `catchIOError` (\_ -> return "localhost") pipe <- connect (host "127.0.0.1")
pipe <- connect (host mongodbHost)
result <- access pipe master testDBName action result <- access pipe master testDBName action
close pipe close pipe
return result return result
getWireVersion :: IO Int
getWireVersion = db $ do
sd <- retrieveServerData
return $ maxWireVersion sd
withCleanDatabase :: ActionWith () -> IO () withCleanDatabase :: ActionWith () -> IO ()
withCleanDatabase action = dropDB >> action () >> dropDB >> return () withCleanDatabase action = dropDB >> action () >> dropDB >> return ()
where where
@ -43,21 +35,6 @@ insertDuplicateWith testInsert = do
] ]
return () return ()
insertUsers :: IO ()
insertUsers = db $
insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]]
, ["_id" =: "joe", "joined" =: parseDate "2012-07-02", "likes" =: ["tennis", "golf", "swimming"]]
, ["_id" =: "jill", "joined" =: parseDate "2013-11-17", "likes" =: ["cricket", "golf"]]
]
pendingIfMongoVersion :: ((Integer, Integer) -> Bool) -> SpecWith () -> Spec
pendingIfMongoVersion invalidVersion = before $ do
version <- db $ extractVersion . T.splitOn "." . at "version" <$> runCommand1 "buildinfo"
when (invalidVersion version) $ pendingWith "This test does not run in the current database version"
where
extractVersion (major:minor:_) = (read $ T.unpack major, read $ T.unpack minor)
extractVersion _ = error "Invalid version specification"
bigDocument :: Document bigDocument :: Document
bigDocument = (flip map) [1..10000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name") bigDocument = (flip map) [1..10000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name")
@ -130,8 +107,8 @@ spec = around withCleanDatabase $ do
describe "insertAll" $ do describe "insertAll" $ do
it "inserts documents to the collection and returns their _ids" $ do it "inserts documents to the collection and returns their _ids" $ do
(_id1:_id2:_) <- db $ insertAll "team" [ ["name" =: "Yankees", "league" =: "American"] (_id1:_id2:_) <- db $ insertAll "team" [ ["name" =: "Yankees", "league" =: "American"]
, ["name" =: "Dodgers", "league" =: "American"] , ["name" =: "Dodgers", "league" =: "American"]
] ]
result <- db $ rest =<< find (select [] "team") result <- db $ rest =<< find (select [] "team")
result `shouldBe` [["_id" =: _id1, "name" =: "Yankees", "league" =: "American"] result `shouldBe` [["_id" =: _id1, "name" =: "Yankees", "league" =: "American"]
,["_id" =: _id2, "name" =: "Dodgers", "league" =: "American"] ,["_id" =: _id2, "name" =: "Dodgers", "league" =: "American"]
@ -191,7 +168,7 @@ spec = around withCleanDatabase $ do
liftIO $ (length returnedDocs) `shouldBe` 1000 liftIO $ (length returnedDocs) `shouldBe` 1000
it "skips one too big document" $ do it "skips one too big document" $ do
(db $ insertAll_ "hugeDocCollection" [hugeDocument]) `shouldThrow` anyException db $ insertAll_ "hugeDocCollection" [hugeDocument]
db $ do db $ do
cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000} cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000}
returnedDocs <- rest cur returnedDocs <- rest cur
@ -212,146 +189,107 @@ spec = around withCleanDatabase $ do
describe "updateMany" $ do describe "updateMany" $ do
it "updates value" $ do it "updates value" $ do
wireVersion <- getWireVersion _id <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
when (wireVersion > 1) $ do result <- db $ rest =<< find (select [] "team")
_id <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"] result `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "American"]]
result <- db $ rest =<< find (select [] "team") _ <- db $ updateMany "team" [([ "_id" =: _id]
result `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "American"]] , ["$set" =: ["league" =: "European"]]
_ <- db $ updateMany "team" [([ "_id" =: _id] , [])]
, ["$set" =: ["league" =: "European"]] updatedResult <- db $ rest =<< find (select [] "team")
, [])] updatedResult `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "European"]]
updatedResult <- db $ rest =<< find (select [] "team")
updatedResult `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "European"]]
it "upserts value" $ do it "upserts value" $ do
wireVersion <- getWireVersion c <- db $ count (select [] "team")
when (wireVersion > 1) $ do c `shouldBe` 0
c <- db $ count (select [] "team") _ <- db $ updateMany "team" [( []
c `shouldBe` 0 , ["name" =: "Giants", "league" =: "MLB"]
_ <- db $ updateMany "team" [( [] , [Upsert]
, ["name" =: "Giants", "league" =: "MLB"] )]
, [Upsert] updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
)] map L.sort updatedResult `shouldBe` [["league" =: "MLB", "name" =: "Giants"]]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
map L.sort updatedResult `shouldBe` [["league" =: "MLB", "name" =: "Giants"]]
it "updates all documents with Multi enabled" $ do it "updates all documents with Multi enabled" $ do
wireVersion <- getWireVersion _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
when (wireVersion > 1) $ do _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"] _ <- db $ updateMany "team" [( ["name" =: "Yankees"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"] , ["$set" =: ["league" =: "MLB"]]
_ <- db $ updateMany "team" [( ["name" =: "Yankees"] , [MultiUpdate]
, ["$set" =: ["league" =: "MLB"]] )]
, [MultiUpdate] updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
)] (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) , ["league" =: "MLB", "name" =: "Yankees"]
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"] ]
, ["league" =: "MLB", "name" =: "Yankees"]
]
it "updates one document when there is no Multi option" $ do it "updates one document when there is no Multi option" $ do
wireVersion <- getWireVersion _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
when (wireVersion > 1) $ do _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"] _ <- db $ updateMany "team" [( ["name" =: "Yankees"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"] , ["$set" =: ["league" =: "MLB"]]
_ <- db $ updateMany "team" [( ["name" =: "Yankees"] , []
, ["$set" =: ["league" =: "MLB"]] )]
, [] updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
)] (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) , ["league" =: "MiLB", "name" =: "Yankees"]
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB", "name" =: "Yankees"] ]
, ["league" =: "MiLB", "name" =: "Yankees"]
]
it "can process different updates" $ do it "can process different updates" $ do
wireVersion <- getWireVersion _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
when (wireVersion > 1) $ do _ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB"]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"] _ <- db $ updateMany "team" [ ( ["name" =: "Yankees"]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB"] , ["$set" =: ["league" =: "MiLB"]]
_ <- db $ updateMany "team" [ ( ["name" =: "Yankees"] , []
, ["$set" =: ["league" =: "MiLB"]] )
, [] , ( ["name" =: "Giants"]
) , ["$set" =: ["league" =: "MLB"]]
, ( ["name" =: "Giants"] , []
, ["$set" =: ["league" =: "MLB"]] )
, [] ]
) updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
] (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB" , "name" =: "Giants"]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) , ["league" =: "MiLB", "name" =: "Yankees"]
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "MLB" , "name" =: "Giants"] ]
, ["league" =: "MiLB", "name" =: "Yankees"]
]
it "can process different updates" $ do it "can process different updates" $ do
wireVersion <- getWireVersion _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
when (wireVersion > 1) $ do _ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)] (db $ updateMany "team" [ ( ["name" =: "Yankees"]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)] , ["$inc" =: ["score" =: (1 :: Int)]]
updateResult <- (db $ updateMany "team" [ ( ["name" =: "Yankees"] , []
, ["$inc" =: ["score" =: (1 :: Int)]] )
, [] , ( ["name" =: "Giants"]
) , ["$inc" =: ["score" =: (2 :: Int)]]
, ( ["name" =: "Giants"] , []
, ["$inc" =: ["score" =: (2 :: Int)]] )
, [] ]) `shouldThrow` anyException
) updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
]) (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
failed updateResult `shouldBe` True , ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (1 :: Int)]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) ]
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (1 :: Int)]
]
it "can handle big updates" $ do it "can handle big updates" $ do
wireVersion <- getWireVersion let docs = (flip map) [0..20000] $ \i ->
when (wireVersion > 1) $ do ["name" =: (T.pack $ "name " ++ (show i))]
let docs = (flip map) [0..20000] $ \i -> ids <- db $ insertAll "bigCollection" docs
["name" =: (T.pack $ "name " ++ (show i))] let updateDocs = (flip map) ids (\i -> ( [ "_id" =: i]
ids <- db $ insertAll "bigCollection" docs , ["$set" =: ["name" =: ("name " ++ (show i))]]
let updateDocs = (flip map) ids (\i -> ( [ "_id" =: i] , []
, ["$set" =: ["name" =: ("name " ++ (show i))]] ))
, [] _ <- db $ updateMany "team" updateDocs
)) updatedResult <- db $ rest =<< find (select [] "team")
_ <- db $ updateMany "team" updateDocs forM_ updatedResult $ \r -> let (i :: ObjectId) = "_id" `at` r
updatedResult <- db $ rest =<< find (select [] "team") in (("name" `at` r) :: String) `shouldBe` ("name" ++ (show i))
forM_ updatedResult $ \r -> let (i :: ObjectId) = "_id" `at` r
in (("name" `at` r) :: String) `shouldBe` ("name" ++ (show i))
describe "updateAll" $ do describe "updateAll" $ do
it "can process different updates" $ do it "can process different updates" $ do
wireVersion <- getWireVersion _ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
when (wireVersion > 1) $ do _ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)] (db $ updateAll "team" [ ( ["name" =: "Yankees"]
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)] , ["$inc" =: ["score" =: (1 :: Int)]]
updateResult <- (db $ updateAll "team" [ ( ["name" =: "Yankees"] , []
, ["$inc" =: ["score" =: (1 :: Int)]] )
, [] , ( ["name" =: "Giants"]
) , ["$inc" =: ["score" =: (2 :: Int)]]
, ( ["name" =: "Giants"] , []
, ["$inc" =: ["score" =: (2 :: Int)]] )
, [] ]) `shouldThrow` anyException
) updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
]) (L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
failed updateResult `shouldBe` True , ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (3 :: Int)]
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]}) ]
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (3 :: Int)]
]
it "returns correct number of matched and modified" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myField" =: "newValue"]], [MultiUpdate])]
nMatched res `shouldBe` 2
nModified res `shouldBe` (Just 2)
it "returns correct number of upserted" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myfield" =: "newValue"]], [Upsert])]
(length $ upserted res) `shouldBe` 1
it "updates only one doc without multi update" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myField" =: "newValue"]], [])]
nMatched res `shouldBe` 1
nModified res `shouldBe` (Just 1)
describe "delete" $ do describe "delete" $ do
it "actually deletes something" $ do it "actually deletes something" $ do
@ -393,47 +331,34 @@ spec = around withCleanDatabase $ do
describe "deleteMany" $ do describe "deleteMany" $ do
it "actually deletes something" $ do it "actually deletes something" $ do
wireVersion <- getWireVersion _ <- db $ insert "team" ["name" =: ("Giants" :: String)]
when (wireVersion > 1) $ do _ <- db $ insert "team" ["name" =: ("Yankees" :: String)]
_ <- db $ insert "team" ["name" =: ("Giants" :: String)] _ <- db $ deleteMany "team" [ (["name" =: ("Giants" :: String)], [])
_ <- db $ insert "team" ["name" =: ("Yankees" :: String)] , (["name" =: ("Yankees" :: String)], [])
_ <- db $ deleteMany "team" [ (["name" =: ("Giants" :: String)], []) ]
, (["name" =: ("Yankees" :: String)], []) updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
] length updatedResult `shouldBe` 0
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
describe "deleteAll" $ do describe "deleteAll" $ do
it "actually deletes something" $ do it "actually deletes something" $ do
wireVersion <- getWireVersion _ <- db $ insert "team" [ "name" =: ("Giants" :: String)
when (wireVersion > 1) $ do , "score" =: (Nothing :: Maybe Int)
_ <- db $ insert "team" [ "name" =: ("Giants" :: String) ]
, "score" =: (Nothing :: Maybe Int) _ <- db $ insert "team" [ "name" =: ("Yankees" :: String)
] , "score" =: (1 :: Int)
_ <- db $ insert "team" [ "name" =: ("Yankees" :: String) ]
, "score" =: (1 :: Int) _ <- db $ deleteAll "team" [ (["name" =: ("Giants" :: String)], [])
] , (["name" =: ("Yankees" :: String)], [])
_ <- db $ deleteAll "team" [ (["name" =: ("Giants" :: String)], []) ]
, (["name" =: ("Yankees" :: String)], []) updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
] length updatedResult `shouldBe` 0
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
it "can handle big deletes" $ do it "can handle big deletes" $ do
wireVersion <- getWireVersion let docs = (flip map) [0..20000] $ \i ->
when (wireVersion > 1) $ do ["name" =: (T.pack $ "name " ++ (show i))]
let docs = (flip map) [0..20000] $ \i -> _ <- db $ insertAll "bigCollection" docs
["name" =: (T.pack $ "name " ++ (show i))] _ <- db $ deleteAll "bigCollection" $ map (\d -> (d, [])) docs
_ <- db $ insertAll "bigCollection" docs updatedResult <- db $ rest =<< find ((select [] "bigCollection") {project = ["_id" =: (0 :: Int)]})
_ <- db $ deleteAll "bigCollection" $ map (\d -> (d, [])) docs length updatedResult `shouldBe` 0
updatedResult <- db $ rest =<< find ((select [] "bigCollection") {project = ["_id" =: (0 :: Int)]})
length updatedResult `shouldBe` 0
it "returns correct result" $ do
wireVersion <- getWireVersion
when (wireVersion > 1) $ do
_ <- db $ insert "testCollection" [ "myField" =: "myValue" ]
_ <- db $ insert "testCollection" [ "myField" =: "myValue" ]
res <- db $ deleteAll "testCollection" [ (["myField" =: "myValue"], []) ]
nRemoved res `shouldBe` 2
describe "allCollections" $ do describe "allCollections" $ do
it "returns all collections in a database" $ do it "returns all collections in a database" $ do
@ -443,34 +368,13 @@ spec = around withCleanDatabase $ do
collections <- db $ allCollections collections <- db $ allCollections
liftIO $ (L.sort collections) `shouldContain` ["team1", "team2", "team3"] liftIO $ (L.sort collections) `shouldContain` ["team1", "team2", "team3"]
describe "aggregate" $ before_ insertUsers $ describe "aggregate" $ do
it "aggregates to normalize and sort documents" $ do it "aggregates to normalize and sort documents" $ do
db $ insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]]
, ["_id" =: "joe", "joined" =: parseDate "2012-07-02", "likes" =: ["tennis", "golf", "swimming"]]
, ["_id" =: "jill", "joined" =: parseDate "2013-11-17", "likes" =: ["cricket", "golf"]]
]
result <- db $ aggregate "users" [ ["$project" =: ["name" =: ["$toUpper" =: "$_id"], "_id" =: 0]] result <- db $ aggregate "users" [ ["$project" =: ["name" =: ["$toUpper" =: "$_id"], "_id" =: 0]]
, ["$sort" =: ["name" =: 1]] , ["$sort" =: ["name" =: 1]]
] ]
result `shouldBe` [["name" =: "JANE"], ["name" =: "JILL"], ["name" =: "JOE"]] result `shouldBe` [["name" =: "JANE"], ["name" =: "JILL"], ["name" =: "JOE"]]
-- This feature was introduced in MongoDB version 3.2
-- https://docs.mongodb.com/manual/reference/command/find/
describe "findCommand" $ pendingIfMongoVersion (< (3,2)) $
context "when mongo version is 3.2 or superior" $ before insertUsers $ do
it "fetches all the records" $ do
result <- db $ rest =<< findCommand (select [] "users")
length result `shouldBe` 3
it "filters the records" $ do
result <- db $ rest =<< findCommand (select ["_id" =: "joe"] "users")
length result `shouldBe` 1
it "projects the records" $ do
result <- db $ rest =<< findCommand
(select [] "users") { project = [ "_id" =: 1 ] }
result `shouldBe` [["_id" =: "jane"], ["_id" =: "joe"], ["_id" =: "jill"]]
it "sorts the records" $ do
result <- db $ rest =<< findCommand
(select [] "users") { project = [ "_id" =: 1 ]
, sort = [ "_id" =: 1 ]
}
result `shouldBe` [["_id" =: "jane"], ["_id" =: "jill"], ["_id" =: "joe"]]

View file

@ -8,6 +8,7 @@ module TestImport (
import Test.Hspec as Export hiding (Selector) import Test.Hspec as Export hiding (Selector)
import Database.MongoDB as Export import Database.MongoDB as Export
import Control.Monad.Trans as Export (MonadIO, liftIO) import Control.Monad.Trans as Export (MonadIO, liftIO)
import Data.Maybe (fromJust)
import Data.Time (ParseTime, UTCTime) import Data.Time (ParseTime, UTCTime)
import qualified Data.Time as Time import qualified Data.Time as Time
@ -17,7 +18,6 @@ import qualified Data.Time as Time
import Data.Time.Format (defaultTimeLocale, iso8601DateFormat) import Data.Time.Format (defaultTimeLocale, iso8601DateFormat)
#else #else
import System.Locale (defaultTimeLocale, iso8601DateFormat) import System.Locale (defaultTimeLocale, iso8601DateFormat)
import Data.Maybe (fromJust)
#endif #endif
parseTime :: ParseTime t => String -> String -> t parseTime :: ParseTime t => String -> String -> t
@ -32,6 +32,3 @@ parseDate = parseTime (iso8601DateFormat Nothing)
parseDateTime :: String -> UTCTime parseDateTime :: String -> UTCTime
parseDateTime = parseTime (iso8601DateFormat (Just "%H:%M:%S")) parseDateTime = parseTime (iso8601DateFormat (Just "%H:%M:%S"))
mongodbHostEnvVariable :: String
mongodbHostEnvVariable = "HASKELL_MONGODB_TEST_HOST"