Compare commits
1 commit
master
...
test-again
Author | SHA1 | Date | |
---|---|---|---|
|
71111d45f1 |
29 changed files with 746 additions and 2583 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
@ -1,6 +1,3 @@
|
|||
dist/
|
||||
cabal.sandbox.config
|
||||
.cabal-sandbox/
|
||||
.stack-work/
|
||||
dist-newstyle/*
|
||||
!dist-newstyle/config
|
102
.travis.yml
102
.travis.yml
|
@ -1,25 +1,26 @@
|
|||
# See https://github.com/hvr/multi-ghc-travis for more information.
|
||||
|
||||
sudo: required
|
||||
|
||||
services:
|
||||
- docker
|
||||
|
||||
env:
|
||||
# We use CABALVER=1.22 everywhere because it uses the flag --enable-coverage
|
||||
# instead of --enable-library-coverage used by older versions.
|
||||
#- GHCVER=7.8.4 CABALVER=1.22 MONGO=2.6.12
|
||||
#- GHCVER=7.10.3 CABALVER=1.22 MONGO=2.6.12
|
||||
#- GHCVER=8.0.2 CABALVER=1.24 MONGO=2.6.12
|
||||
- GHCVER=8.4.2 CABALVER=2.2 MONGO=3.6 STACKAGE=nightly
|
||||
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-11.6
|
||||
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.6 STACKAGE=lts-9.21
|
||||
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.0 STACKAGE=lts-11.6
|
||||
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.0 STACKAGE=lts-9.21
|
||||
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.4 STACKAGE=lts-11.6
|
||||
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.4 STACKAGE=lts-9.21
|
||||
- GHCVER=8.2.2 CABALVER=1.24 MONGO=3.2 STACKAGE=lts-11.6
|
||||
- GHCVER=8.0.2 CABALVER=1.24 MONGO=3.2 STACKAGE=lts-9.21
|
||||
- GHCVER=7.6.3 CABALVER=1.22 MONGO=2.4.14
|
||||
- GHCVER=7.8.4 CABALVER=1.22 MONGO=2.4.14
|
||||
- GHCVER=7.10.1 CABALVER=1.22 MONGO=2.4.14
|
||||
- GHCVER=7.6.3 CABALVER=1.22 MONGO=2.6.12
|
||||
- GHCVER=7.8.4 CABALVER=1.22 MONGO=2.6.12
|
||||
- GHCVER=7.10.1 CABALVER=1.22 MONGO=2.6.12
|
||||
- GHCVER=7.6.3 CABALVER=1.22 MONGO=3.0.12
|
||||
- GHCVER=7.8.4 CABALVER=1.22 MONGO=3.0.12
|
||||
- GHCVER=7.10.1 CABALVER=1.22 MONGO=3.0.12
|
||||
- GHCVER=7.6.3 CABALVER=1.22 MONGO=3.2.6
|
||||
- GHCVER=7.8.4 CABALVER=1.22 MONGO=3.2.6
|
||||
- GHCVER=7.10.1 CABALVER=1.22 MONGO=3.2.6
|
||||
- GHCVER=head CABALVER=head MONGO=3.2.6
|
||||
|
||||
matrix:
|
||||
allow_failures:
|
||||
# The text here should match the last line above exactly.
|
||||
- env: GHCVER=head CABALVER=head MONGO=3.2.6
|
||||
|
||||
before_install:
|
||||
|
||||
|
@ -28,38 +29,29 @@ before_install:
|
|||
- travis_retry sudo apt-get install cabal-install-$CABALVER ghc-$GHCVER
|
||||
- export PATH=$HOME/.cabal/bin:/opt/ghc/$GHCVER/bin:/opt/cabal/$CABALVER/bin:$PATH
|
||||
- cabal --version
|
||||
#- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv EA312927
|
||||
#- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
|
||||
#- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 0C49F3730359A14518585931BC711F9BA15703C6
|
||||
#- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.4 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.4.list
|
||||
#- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.2 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.2.list
|
||||
#- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list
|
||||
#- echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list
|
||||
#- sudo apt-get update
|
||||
#- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo apt-get install mongodb-10gen=$MONGO; else sudo apt-get install --allow-downgrades -y mongodb-org=$MONGO mongodb-org-server=$MONGO mongodb-org-shell=$MONGO mongodb-org-tools=$MONGO; fi
|
||||
#- ls /etc/init.d
|
||||
#- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo service mongodb start; fi
|
||||
#- sudo service --status-all
|
||||
#- sudo service mongod start
|
||||
#- sleep 15 #mongo may not be responded directly. See http://docs.travis-ci.com/user/database-setup/#MongoDB
|
||||
#- ps axf | grep mongo
|
||||
#- sudo netstat -apn
|
||||
#- mongo --version
|
||||
- sudo docker pull mongo:$MONGO
|
||||
- sudo docker run -d -p 27017:27017 mongo:$MONGO
|
||||
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv EA312927
|
||||
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
|
||||
- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.2 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.2.list
|
||||
- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list
|
||||
- echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list
|
||||
- sudo apt-get update
|
||||
- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo apt-get install mongodb-10gen=$MONGO; else sudo apt-get install -y mongodb-org=$MONGO mongodb-org-server=$MONGO mongodb-org-shell=$MONGO mongodb-org-tools=$MONGO; fi
|
||||
- ls /etc/init.d
|
||||
- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo service mongodb start; fi
|
||||
- sleep 15 #mongo may not be responded directly. See http://docs.travis-ci.com/user/database-setup/#MongoDB
|
||||
- ps axf | grep mongo
|
||||
- netstat -apn
|
||||
- mongo --version
|
||||
|
||||
install:
|
||||
- travis_retry cabal update
|
||||
# Install the combined dependencies for this package and all other packages
|
||||
# needed to reduce conflicts.
|
||||
- cabal sandbox init
|
||||
- wget https://www.stackage.org/$STACKAGE/cabal.config
|
||||
- sed -e '/mongoDB/d' cabal.config > cabal.config.new
|
||||
- mv cabal.config.new cabal.config
|
||||
- cabal install --only-dependencies --enable-tests --enable-benchmarks --force-reinstalls
|
||||
- cabal install --only-dependencies --enable-tests
|
||||
|
||||
script:
|
||||
- cabal configure --enable-tests -v2 --enable-benchmarks
|
||||
- cabal configure --enable-tests -v2
|
||||
- cabal build
|
||||
# cabal test fails due a to hpc error. Using run-cabal-test instead.
|
||||
# - cabal test --show-details=always
|
||||
|
@ -77,25 +69,9 @@ script:
|
|||
echo "expected '$SRC_TGZ' not found";
|
||||
exit 1;
|
||||
fi
|
||||
|
||||
jobs:
|
||||
include:
|
||||
- stage: deploy
|
||||
env: GHCVER=8.0.2 CABALVER=1.24 MONGO=3.6
|
||||
before_install:
|
||||
- travis_retry sudo add-apt-repository -y ppa:hvr/ghc
|
||||
- travis_retry sudo apt-get update
|
||||
- travis_retry sudo apt-get install cabal-install-$CABALVER
|
||||
- export PATH=$HOME/.cabal/bin:/opt/ghc/$GHCVER/bin:/opt/cabal/$CABALVER/bin:$PATH
|
||||
- cabal --version
|
||||
install: skip
|
||||
script: skip
|
||||
deploy:
|
||||
provider: hackage
|
||||
username: VictorDenisov
|
||||
password:
|
||||
secure: DPYlqRN09gFp06paLj9bRBzpxTkqkZzfsTrU3j0WiqRzqUMeWEeiZNAkIE/maC9xrEuuYBTk0KlSdz+esF4kjfyRQFTxb9CvXrZ474qHozVLC01vh/av5bGZBDQOwgzJrJNVpfl+g+EADOicz9/nhPXiAd7nCQIv/2s/xM1Yj1U=
|
||||
on:
|
||||
repo: mongodb-haskell/mongodb
|
||||
tags: true
|
||||
|
||||
- git clone https://github.com/yesodweb/persistent
|
||||
- cd persistent
|
||||
- "cabal install ./persistent ./persistent-mongoDB --only-dependencies"
|
||||
- cd persistent-test
|
||||
- cabal install -fmongodb --enable-tests
|
||||
- "cabal configure -fmongodb --enable-tests && cabal test"
|
||||
|
|
111
CHANGELOG.md
111
CHANGELOG.md
|
@ -2,117 +2,6 @@
|
|||
All notable changes to this project will be documented in this file.
|
||||
This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Package_versioning_policy).
|
||||
|
||||
* Get rid of `MonadFail` constraints in `Database.MongoDB.Query`
|
||||
|
||||
## [2.7.1.2] - 2022-10-26
|
||||
|
||||
### Added
|
||||
- Support of OP_MSG protocol
|
||||
- Clarifications in the documentation
|
||||
- Allow optional TLS parameters
|
||||
|
||||
## [2.7.1.1] - 2021-06-14
|
||||
|
||||
### Fixed
|
||||
- Sample code
|
||||
|
||||
### Added
|
||||
- Clarification to the tutorial
|
||||
|
||||
## [2.7.1.0] - 2020-08-17
|
||||
|
||||
### Added
|
||||
- Function findCommand
|
||||
|
||||
## [2.7.0.1] - 2020-04-07
|
||||
|
||||
### Fixed
|
||||
- Error reporting for deletion of big messages
|
||||
- Formatting of docs
|
||||
|
||||
## [2.7.0.0] - 2020-02-08
|
||||
|
||||
### Fixed
|
||||
- Upgraded bson to compile with GHC 8.8
|
||||
|
||||
## [2.6.0.1] - 2020-02-01
|
||||
|
||||
### Fixed
|
||||
- Parsing hostname with underscores in readHostPortM.
|
||||
|
||||
## [2.6.0.0] - 2020-01-03
|
||||
|
||||
### Added
|
||||
- MonadFail. It's a standard for newer versions of Haskell,
|
||||
- Open replica sets over tls.
|
||||
|
||||
### Fixed
|
||||
- Support for unix domain socket connection,
|
||||
- Stubborn listener threads.
|
||||
|
||||
## [2.5.0.0] - 2019-06-14
|
||||
|
||||
### Fixed
|
||||
Compatibility with network 3.0 package
|
||||
|
||||
## [2.4.0.1] - 2019-03-03
|
||||
|
||||
### Fixed
|
||||
Doc for modify method
|
||||
|
||||
## [2.4.0.0] - 2018-05-03
|
||||
|
||||
### Fixed
|
||||
- GHC 8.4 compatibility. isEmptyChan is not available in base 4.11 anymore.
|
||||
|
||||
## [2.3.0.5] - 2018-03-15
|
||||
|
||||
### Fixed
|
||||
- Resource leak in SCRAM authentication
|
||||
|
||||
## [2.3.0.4] - 2018-02-11
|
||||
|
||||
### Fixed
|
||||
- Benchmark's build
|
||||
|
||||
## [2.3.0.3] - 2018-02-10
|
||||
|
||||
### Fixed
|
||||
- aggregate requires cursor in mongo 3.6
|
||||
|
||||
## [2.3.0.2] - 2018-01-28
|
||||
|
||||
### Fixed
|
||||
- Uploading files with GridFS
|
||||
|
||||
## [2.3.0.1] - 2017-12-28
|
||||
|
||||
### Removed
|
||||
- Log output that littered stdout in modify many commands.
|
||||
|
||||
## [2.3.0] - 2017-05-31
|
||||
|
||||
### Changed
|
||||
- Description of access function
|
||||
- Lift MonadBaseControl restriction
|
||||
- Update and delete results are squashed into one WriteResult type
|
||||
- Functions insertMany, updateMany, deleteMany are rewritten to properly report
|
||||
various errors
|
||||
|
||||
## [2.2.0] - 2017-04-08
|
||||
|
||||
### Added
|
||||
- GridFS implementation
|
||||
|
||||
### Fixed
|
||||
- Write functions hang when the connection is lost.
|
||||
|
||||
## [2.1.1] - 2016-08-13
|
||||
|
||||
### Changed
|
||||
- Interfaces of update and delete functions. They don't require MonadBaseControl
|
||||
anymore.
|
||||
|
||||
## [2.1.0] - 2016-06-21
|
||||
|
||||
### Added
|
||||
|
|
|
@ -1,54 +1,42 @@
|
|||
-- |
|
||||
-- Client interface to MongoDB database management system.
|
||||
--
|
||||
-- Simple example below.
|
||||
--
|
||||
-- @
|
||||
-- {\-\# LANGUAGE OverloadedStrings \#\-}
|
||||
-- {\-\# LANGUAGE ExtendedDefaultRules \#\-}
|
||||
--
|
||||
-- import Database.MongoDB
|
||||
-- import Control.Monad.Trans (liftIO)
|
||||
--
|
||||
-- main :: IO ()
|
||||
-- main = do
|
||||
-- pipe <- connect (host \"127.0.0.1\")
|
||||
-- e <- access pipe master \"baseball\" run
|
||||
-- close pipe
|
||||
-- print e
|
||||
--
|
||||
-- run :: Action IO ()
|
||||
-- run = do
|
||||
-- clearTeams
|
||||
-- insertTeams
|
||||
-- allTeams >>= printDocs \"All Teams\"
|
||||
-- nationalLeagueTeams >>= printDocs \"National League Teams\"
|
||||
-- newYorkTeams >>= printDocs \"New York Teams\"
|
||||
--
|
||||
-- clearTeams :: Action IO ()
|
||||
-- clearTeams = delete (select [] \"team\")
|
||||
--
|
||||
-- insertTeams :: Action IO [Value]
|
||||
-- insertTeams = insertMany \"team\" [
|
||||
-- [\"name\" =: \"Yankees\", \"home\" =: [\"city\" =: \"New York\", \"state\" =: \"NY\"], \"league\" =: \"American\"],
|
||||
-- [\"name\" =: \"Mets\", \"home\" =: [\"city\" =: \"New York\", \"state\" =: \"NY\"], \"league\" =: \"National\"],
|
||||
-- [\"name\" =: \"Phillies\", \"home\" =: [\"city\" =: \"Philadelphia\", \"state\" =: \"PA\"], \"league\" =: \"National\"],
|
||||
-- [\"name\" =: \"Red Sox\", \"home\" =: [\"city\" =: \"Boston\", \"state\" =: \"MA\"], \"league\" =: \"American\"] ]
|
||||
--
|
||||
-- allTeams :: Action IO [Document]
|
||||
-- allTeams = rest =<< find (select [] \"team\") {sort = [\"home.city\" =: 1]}
|
||||
--
|
||||
-- nationalLeagueTeams :: Action IO [Document]
|
||||
-- nationalLeagueTeams = rest =<< find (select [\"league\" =: \"National\"] \"team\")
|
||||
--
|
||||
-- newYorkTeams :: Action IO [Document]
|
||||
-- newYorkTeams = rest =<< find (select [\"home.state\" =: \"NY\"] \"team\") {project = [\"name\" =: 1, \"league\" =: 1]}
|
||||
--
|
||||
-- printDocs :: String -> [Document] -> Action IO ()
|
||||
-- printDocs title docs = liftIO $ putStrLn title >> mapM_ (print . exclude [\"_id\"]) docs
|
||||
--
|
||||
-- @
|
||||
--
|
||||
{- |
|
||||
Client interface to MongoDB database management system.
|
||||
|
||||
Simple example below. Use with language extensions /OverloadedStrings/ & /ExtendedDefaultRules/.
|
||||
|
||||
|
||||
> import Database.MongoDB
|
||||
> import Control.Monad.Trans (liftIO)
|
||||
>
|
||||
> main = do
|
||||
> pipe <- connect (host "127.0.0.1")
|
||||
> e <- access pipe master "baseball" run
|
||||
> close pipe
|
||||
> print e
|
||||
>
|
||||
> run = do
|
||||
> clearTeams
|
||||
> insertTeams
|
||||
> allTeams >>= printDocs "All Teams"
|
||||
> nationalLeagueTeams >>= printDocs "National League Teams"
|
||||
> newYorkTeams >>= printDocs "New York Teams"
|
||||
>
|
||||
> clearTeams = delete (select [] "team")
|
||||
>
|
||||
> insertTeams = insertMany "team" [
|
||||
> ["name" =: "Yankees", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "American"],
|
||||
> ["name" =: "Mets", "home" =: ["city" =: "New York", "state" =: "NY"], "league" =: "National"],
|
||||
> ["name" =: "Phillies", "home" =: ["city" =: "Philadelphia", "state" =: "PA"], "league" =: "National"],
|
||||
> ["name" =: "Red Sox", "home" =: ["city" =: "Boston", "state" =: "MA"], "league" =: "American"] ]
|
||||
>
|
||||
> allTeams = rest =<< find (select [] "team") {sort = ["home.city" =: 1]}
|
||||
>
|
||||
> nationalLeagueTeams = rest =<< find (select ["league" =: "National"] "team")
|
||||
>
|
||||
> newYorkTeams = rest =<< find (select ["home.state" =: "NY"] "team") {project = ["name" =: 1, "league" =: 1]}
|
||||
>
|
||||
> printDocs title docs = liftIO $ putStrLn title >> mapM_ (print . exclude ["_id"]) docs
|
||||
>
|
||||
-}
|
||||
|
||||
module Database.MongoDB (
|
||||
module Data.Bson,
|
||||
|
|
|
@ -42,6 +42,7 @@ import qualified Data.HashTable.IO as H
|
|||
import qualified Data.Set as Set
|
||||
|
||||
import Control.Monad.Trans (MonadIO, liftIO)
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Data.Bson (Document, Field(..), at, (=:), (=?), exclude, merge)
|
||||
import Data.Text (Text)
|
||||
|
||||
|
@ -76,8 +77,8 @@ renameCollection from to = do
|
|||
db <- thisDatabase
|
||||
useDb admin $ runCommand ["renameCollection" =: db <.> from, "to" =: db <.> to, "dropTarget" =: True]
|
||||
|
||||
dropCollection :: (MonadIO m, MonadFail m) => Collection -> Action m Bool
|
||||
-- ^ Delete the given collection! Return @True@ if collection existed (and was deleted); return @False@ if collection did not exist (and no action).
|
||||
dropCollection :: (MonadIO m) => Collection -> Action m Bool
|
||||
-- ^ Delete the given collection! Return True if collection existed (and was deleted); return False if collection did not exist (and no action).
|
||||
dropCollection coll = do
|
||||
resetIndexCache
|
||||
r <- runCommand ["drop" =: coll]
|
||||
|
@ -86,7 +87,7 @@ dropCollection coll = do
|
|||
fail $ "dropCollection failed: " ++ show r
|
||||
|
||||
validateCollection :: (MonadIO m) => Collection -> Action m Document
|
||||
-- ^ Validate the given collection, scanning the data and indexes for correctness. This operation takes a while.
|
||||
-- ^ This operation takes a while
|
||||
validateCollection coll = runCommand ["validate" =: coll]
|
||||
|
||||
-- ** Index
|
||||
|
@ -111,7 +112,7 @@ idxDocument Index{..} db = [
|
|||
"dropDups" =: iDropDups ] ++ (maybeToList $ fmap ((=:) "expireAfterSeconds") iExpireAfterSeconds)
|
||||
|
||||
index :: Collection -> Order -> Index
|
||||
-- ^ Spec of index of ordered keys on collection. 'iName' is generated from keys. 'iUnique' and 'iDropDups' are @False@.
|
||||
-- ^ Spec of index of ordered keys on collection. Name is generated from keys. Unique and dropDups are False.
|
||||
index coll keys = Index coll keys (genName keys) False False Nothing
|
||||
|
||||
genName :: Order -> IndexName
|
||||
|
@ -132,12 +133,12 @@ createIndex :: (MonadIO m) => Index -> Action m ()
|
|||
createIndex idx = insert_ "system.indexes" . idxDocument idx =<< thisDatabase
|
||||
|
||||
dropIndex :: (MonadIO m) => Collection -> IndexName -> Action m Document
|
||||
-- ^ Remove the index from the given collection.
|
||||
-- ^ Remove the index
|
||||
dropIndex coll idxName = do
|
||||
resetIndexCache
|
||||
runCommand ["deleteIndexes" =: coll, "index" =: idxName]
|
||||
|
||||
getIndexes :: MonadIO m => Collection -> Action m [Document]
|
||||
getIndexes :: (MonadIO m, MonadBaseControl IO m, Functor m) => Collection -> Action m [Document]
|
||||
-- ^ Get all indexes on this collection
|
||||
getIndexes coll = do
|
||||
db <- thisDatabase
|
||||
|
@ -190,31 +191,31 @@ resetIndexCache = do
|
|||
|
||||
-- ** User
|
||||
|
||||
allUsers :: MonadIO m => Action m [Document]
|
||||
allUsers :: (MonadIO m, MonadBaseControl IO m, Functor m) => Action m [Document]
|
||||
-- ^ Fetch all users of this database
|
||||
allUsers = map (exclude ["_id"]) `liftM` (rest =<< find
|
||||
allUsers = map (exclude ["_id"]) <$> (rest =<< find
|
||||
(select [] "system.users") {sort = ["user" =: (1 :: Int)], project = ["user" =: (1 :: Int), "readOnly" =: (1 :: Int)]})
|
||||
|
||||
addUser :: (MonadIO m)
|
||||
addUser :: (MonadBaseControl IO m, MonadIO m)
|
||||
=> Bool -> Username -> Password -> Action m ()
|
||||
-- ^ Add user with password with read-only access if the boolean argument is @True@, or read-write access if it's @False@
|
||||
-- ^ Add user with password with read-only access if bool is True or read-write access if bool is False
|
||||
addUser readOnly user pass = do
|
||||
mu <- findOne (select ["user" =: user] "system.users")
|
||||
let usr = merge ["readOnly" =: readOnly, "pwd" =: pwHash user pass] (maybe ["user" =: user] id mu)
|
||||
save "system.users" usr
|
||||
|
||||
removeUser :: (MonadIO m)
|
||||
removeUser :: (MonadIO m, MonadBaseControl IO m)
|
||||
=> Username -> Action m ()
|
||||
removeUser user = delete (select ["user" =: user] "system.users")
|
||||
|
||||
-- ** Database
|
||||
|
||||
admin :: Database
|
||||
-- ^ The \"admin\" database, which stores user authorization and authentication data plus other system collections.
|
||||
-- ^ \"admin\" database
|
||||
admin = "admin"
|
||||
|
||||
cloneDatabase :: (MonadIO m) => Database -> Host -> Action m Document
|
||||
-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use 'copyDatabase' in this case).
|
||||
-- ^ Copy database from given host to the server I am connected to. Fails and returns @"ok" = 0@ if we don't have permission to read from given server (use copyDatabase in this case).
|
||||
cloneDatabase db fromHost = useDb db $ runCommand ["clone" =: showHostPort fromHost]
|
||||
|
||||
copyDatabase :: (MonadIO m) => Database -> Host -> Maybe (Username, Password) -> Database -> Action m Document
|
||||
|
@ -238,11 +239,9 @@ repairDatabase db = useDb db $ runCommand ["repairDatabase" =: (1 :: Int)]
|
|||
-- ** Server
|
||||
|
||||
serverBuildInfo :: (MonadIO m) => Action m Document
|
||||
-- ^ Return a document containing the parameters used to compile the server instance.
|
||||
serverBuildInfo = useDb admin $ runCommand ["buildinfo" =: (1 :: Int)]
|
||||
|
||||
serverVersion :: (MonadIO m) => Action m Text
|
||||
-- ^ Return the version of the server instance.
|
||||
serverVersion = at "version" `liftM` serverBuildInfo
|
||||
|
||||
-- * Diagnostics
|
||||
|
@ -250,22 +249,18 @@ serverVersion = at "version" `liftM` serverBuildInfo
|
|||
-- ** Collection
|
||||
|
||||
collectionStats :: (MonadIO m) => Collection -> Action m Document
|
||||
-- ^ Return some storage statistics for the given collection.
|
||||
collectionStats coll = runCommand ["collstats" =: coll]
|
||||
|
||||
dataSize :: (MonadIO m) => Collection -> Action m Int
|
||||
-- ^ Return the total uncompressed size (in bytes) in memory of all records in the given collection. Does not include indexes.
|
||||
dataSize c = at "size" `liftM` collectionStats c
|
||||
|
||||
storageSize :: (MonadIO m) => Collection -> Action m Int
|
||||
-- ^ Return the total bytes allocated to the given collection. Does not include indexes.
|
||||
storageSize c = at "storageSize" `liftM` collectionStats c
|
||||
|
||||
totalIndexSize :: (MonadIO m) => Collection -> Action m Int
|
||||
-- ^ The total size in bytes of all indexes in this collection.
|
||||
totalIndexSize c = at "totalIndexSize" `liftM` collectionStats c
|
||||
|
||||
totalSize :: MonadIO m => Collection -> Action m Int
|
||||
totalSize :: (MonadIO m, MonadBaseControl IO m) => Collection -> Action m Int
|
||||
totalSize coll = do
|
||||
x <- storageSize coll
|
||||
xs <- mapM isize =<< getIndexes coll
|
||||
|
@ -275,45 +270,34 @@ totalSize coll = do
|
|||
|
||||
-- ** Profiling
|
||||
|
||||
-- | The available profiler levels.
|
||||
data ProfilingLevel
|
||||
= Off -- ^ No data collection.
|
||||
| Slow -- ^ Data collected only for slow operations. The slow operation time threshold is 100ms by default, but can be changed using 'setProfilingLevel'.
|
||||
| All -- ^ Data collected for all operations.
|
||||
deriving (Show, Enum, Eq)
|
||||
data ProfilingLevel = Off | Slow | All deriving (Show, Enum, Eq)
|
||||
|
||||
getProfilingLevel :: (MonadIO m) => Action m ProfilingLevel
|
||||
-- ^ Get the profiler level.
|
||||
getProfilingLevel = (toEnum . at "was") `liftM` runCommand ["profile" =: (-1 :: Int)]
|
||||
|
||||
type MilliSec = Int
|
||||
|
||||
setProfilingLevel :: (MonadIO m) => ProfilingLevel -> Maybe MilliSec -> Action m ()
|
||||
-- ^ Set the profiler level, and optionally the slow operation time threshold (in milliseconds).
|
||||
setProfilingLevel p mSlowMs =
|
||||
runCommand (["profile" =: fromEnum p] ++ ("slowms" =? mSlowMs)) >> return ()
|
||||
|
||||
-- ** Database
|
||||
|
||||
dbStats :: (MonadIO m) => Action m Document
|
||||
-- ^ Return some storage statistics for the given database.
|
||||
dbStats = runCommand ["dbstats" =: (1 :: Int)]
|
||||
|
||||
currentOp :: (MonadIO m) => Action m (Maybe Document)
|
||||
-- ^ See currently running operation on the database, if any
|
||||
currentOp = findOne (select [] "$cmd.sys.inprog")
|
||||
|
||||
-- | An operation indentifier.
|
||||
type OpNum = Int
|
||||
|
||||
killOp :: (MonadIO m) => OpNum -> Action m (Maybe Document)
|
||||
-- ^ Terminate the operation specified by the given 'OpNum'.
|
||||
killOp op = findOne (select ["op" =: op] "$cmd.sys.killop")
|
||||
|
||||
-- ** Server
|
||||
|
||||
serverStatus :: (MonadIO m) => Action m Document
|
||||
-- ^ Return a document with an overview of the state of the database.
|
||||
serverStatus = useDb admin $ runCommand ["serverStatus" =: (1 :: Int)]
|
||||
|
||||
|
||||
|
|
|
@ -17,29 +17,29 @@ module Database.MongoDB.Connection (
|
|||
Host(..), PortID(..), defaultPort, host, showHostPort, readHostPort,
|
||||
readHostPortM, globalConnectTimeout, connect, connect',
|
||||
-- * Replica Set
|
||||
ReplicaSetName, openReplicaSet, openReplicaSet', openReplicaSetTLS, openReplicaSetTLS',
|
||||
openReplicaSetSRV, openReplicaSetSRV', openReplicaSetSRV'', openReplicaSetSRV''',
|
||||
ReplicaSetName, openReplicaSet, openReplicaSet',
|
||||
ReplicaSet, primary, secondaryOk, routedHost, closeReplicaSet, replSetName
|
||||
) where
|
||||
|
||||
import Prelude hiding (lookup)
|
||||
import Data.IORef (IORef, newIORef, readIORef)
|
||||
import Data.List (intersect, partition, (\\), delete)
|
||||
import Data.Maybe (fromJust)
|
||||
|
||||
#if !MIN_VERSION_base(4,8,0)
|
||||
import Control.Applicative ((<$>))
|
||||
#endif
|
||||
|
||||
import Control.Monad (forM_, guard)
|
||||
import Control.Monad (forM_)
|
||||
import Network (HostName, PortID(..), connectTo)
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
import System.Timeout (timeout)
|
||||
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, anyChar, eof,
|
||||
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, eof,
|
||||
spaces, try, (<|>))
|
||||
import qualified Data.List as List
|
||||
|
||||
|
||||
import Control.Monad.Except (throwError)
|
||||
import Control.Monad.Identity (runIdentity)
|
||||
import Control.Monad.Error (throwError)
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
|
||||
readMVar)
|
||||
import Data.Bson (Document, at, (=:))
|
||||
|
@ -48,13 +48,11 @@ import Data.Text (Text)
|
|||
import qualified Data.Bson as B
|
||||
import qualified Data.Text as T
|
||||
|
||||
import Database.MongoDB.Internal.Network (Host(..), HostName, PortID(..), connectTo, lookupSeedList, lookupReplicaSetName)
|
||||
import Database.MongoDB.Internal.Protocol (Pipe, newPipe, close, isClosed)
|
||||
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE,
|
||||
updateAssocs, shuffle, mergesortM)
|
||||
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
|
||||
slaveOk, runCommand, retrieveServerData)
|
||||
import qualified Database.MongoDB.Transport.Tls as TLS (connect)
|
||||
|
||||
adminCommand :: Command -> Pipe -> IO Document
|
||||
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
|
||||
|
@ -64,6 +62,10 @@ adminCommand cmd pipe =
|
|||
failureToIOError (ConnectionFailure e) = e
|
||||
failureToIOError e = userError $ show e
|
||||
|
||||
-- * Host
|
||||
|
||||
data Host = Host HostName PortID deriving (Show, Eq, Ord)
|
||||
|
||||
defaultPort :: PortID
|
||||
-- ^ Default MongoDB port = 27017
|
||||
defaultPort = PortNumber 27017
|
||||
|
@ -74,50 +76,46 @@ host hostname = Host hostname defaultPort
|
|||
|
||||
showHostPort :: Host -> String
|
||||
-- ^ Display host as \"host:port\"
|
||||
-- TODO: Distinguish Service port
|
||||
showHostPort (Host hostname (PortNumber port)) = hostname ++ ":" ++ show port
|
||||
-- TODO: Distinguish Service and UnixSocket port
|
||||
showHostPort (Host hostname port) = hostname ++ ":" ++ portname where
|
||||
portname = case port of
|
||||
Service s -> s
|
||||
PortNumber p -> show p
|
||||
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
|
||||
showHostPort (Host _ (UnixSocket path)) = "unix:" ++ path
|
||||
UnixSocket s -> s
|
||||
#endif
|
||||
|
||||
readHostPortM :: (MonadFail m) => String -> m Host
|
||||
readHostPortM :: (Monad m) => String -> m Host
|
||||
-- ^ Read string \"hostname:port\" as @Host hosthame (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Fail if string does not match either syntax.
|
||||
|
||||
-- TODO: handle Service port
|
||||
-- TODO: handle Service and UnixSocket port
|
||||
readHostPortM = either (fail . show) return . parse parser "readHostPort" where
|
||||
hostname = many1 (letter <|> digit <|> char '-' <|> char '.' <|> char '_')
|
||||
hostname = many1 (letter <|> digit <|> char '-' <|> char '.')
|
||||
parser = do
|
||||
spaces
|
||||
h <- hostname
|
||||
try (spaces >> eof >> return (host h)) <|> do
|
||||
_ <- char ':'
|
||||
try ( do port :: Int <- read <$> many1 digit
|
||||
port :: Int <- read <$> many1 digit
|
||||
spaces >> eof
|
||||
return $ Host h (PortNumber $ fromIntegral port))
|
||||
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
|
||||
<|> do guard (h == "unix")
|
||||
p <- many1 anyChar
|
||||
eof
|
||||
return $ Host "" (UnixSocket p)
|
||||
#endif
|
||||
return $ Host h (PortNumber $ fromIntegral port)
|
||||
|
||||
readHostPort :: String -> Host
|
||||
-- ^ Read string \"hostname:port\" as @Host hostname (PortNumber port)@ or \"hostname\" as @host hostname@ (default port). Error if string does not match either syntax.
|
||||
readHostPort = fromJust . readHostPortM
|
||||
readHostPort = runIdentity . readHostPortM
|
||||
|
||||
type Secs = Double
|
||||
|
||||
globalConnectTimeout :: IORef Secs
|
||||
-- ^ 'connect' (and 'openReplicaSet') fails if it can't connect within this many seconds (default is 6 seconds). Use 'connect'' (and 'openReplicaSet'') if you want to ignore this global and specify your own timeout. Note, this timeout only applies to initial connection establishment, not when reading/writing to the connection.
|
||||
-- ^ 'connect' (and 'openReplicaSet') fails if it can't connect within this many seconds (default is 6 seconds). Use 'connect\'' (and 'openReplicaSet\'') if you want to ignore this global and specify your own timeout. Note, this timeout only applies to initial connection establishment, not when reading/writing to the connection.
|
||||
globalConnectTimeout = unsafePerformIO (newIORef 6)
|
||||
{-# NOINLINE globalConnectTimeout #-}
|
||||
|
||||
connect :: Host -> IO Pipe
|
||||
-- ^ Connect to Host returning pipelined TCP connection. Throw 'IOError' if connection refused or no response within 'globalConnectTimeout'.
|
||||
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within 'globalConnectTimeout'.
|
||||
connect h = readIORef globalConnectTimeout >>= flip connect' h
|
||||
|
||||
connect' :: Secs -> Host -> IO Pipe
|
||||
-- ^ Connect to Host returning pipelined TCP connection. Throw 'IOError' if connection refused or no response within given number of seconds.
|
||||
-- ^ Connect to Host returning pipelined TCP connection. Throw IOError if connection refused or no response within given number of seconds.
|
||||
connect' timeoutSecs (Host hostname port) = do
|
||||
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
|
||||
handle <- maybe (ioError $ userError "connect timed out") return mh
|
||||
|
@ -130,85 +128,32 @@ connect' timeoutSecs (Host hostname port) = do
|
|||
|
||||
type ReplicaSetName = Text
|
||||
|
||||
data TransportSecurity = Secure | Unsecure
|
||||
|
||||
-- | Maintains a connection (created on demand) to each server in the named replica set
|
||||
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs TransportSecurity
|
||||
data ReplicaSet = ReplicaSet ReplicaSetName (MVar [(Host, Maybe Pipe)]) Secs
|
||||
|
||||
replSetName :: ReplicaSet -> Text
|
||||
-- ^ Get the name of connected replica set.
|
||||
replSetName (ReplicaSet rsName _ _ _) = rsName
|
||||
-- ^ name of connected replica set
|
||||
replSetName (ReplicaSet rsName _ _) = rsName
|
||||
|
||||
openReplicaSet :: (ReplicaSetName, [Host]) -> IO ReplicaSet
|
||||
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSet'' instead.
|
||||
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSet\'' instead.
|
||||
openReplicaSet rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSet' rsSeed
|
||||
|
||||
openReplicaSet' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
|
||||
-- ^ Open connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members.
|
||||
openReplicaSet' timeoutSecs (rs, hosts) = _openReplicaSet timeoutSecs (rs, hosts, Unsecure)
|
||||
|
||||
openReplicaSetTLS :: (ReplicaSetName, [Host]) -> IO ReplicaSet
|
||||
-- ^ Open secure connections (on demand) to servers in the replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSetTLS'' instead.
|
||||
openReplicaSetTLS rsSeed = readIORef globalConnectTimeout >>= flip openReplicaSetTLS' rsSeed
|
||||
|
||||
openReplicaSetTLS' :: Secs -> (ReplicaSetName, [Host]) -> IO ReplicaSet
|
||||
-- ^ Open secure connections (on demand) to servers in replica set. Supplied hosts is seed list. At least one of them must be a live member of the named replica set, otherwise fail. Supplied seconds timeout is used for connect attempts to members.
|
||||
openReplicaSetTLS' timeoutSecs (rs, hosts) = _openReplicaSet timeoutSecs (rs, hosts, Secure)
|
||||
|
||||
_openReplicaSet :: Secs -> (ReplicaSetName, [Host], TransportSecurity) -> IO ReplicaSet
|
||||
_openReplicaSet timeoutSecs (rsName, seedList, transportSecurity) = do
|
||||
openReplicaSet' timeoutSecs (rsName, seedList) = do
|
||||
vMembers <- newMVar (map (, Nothing) seedList)
|
||||
let rs = ReplicaSet rsName vMembers timeoutSecs transportSecurity
|
||||
let rs = ReplicaSet rsName vMembers timeoutSecs
|
||||
_ <- updateMembers rs
|
||||
return rs
|
||||
|
||||
openReplicaSetSRV :: HostName -> IO ReplicaSet
|
||||
-- ^ Open /non-secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSetSRV''' instead.
|
||||
openReplicaSetSRV hostname = do
|
||||
timeoutSecs <- readIORef globalConnectTimeout
|
||||
_openReplicaSetSRV timeoutSecs Unsecure hostname
|
||||
|
||||
openReplicaSetSRV' :: HostName -> IO ReplicaSet
|
||||
-- ^ Open /secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. The value of 'globalConnectTimeout' at the time of this call is the timeout used for future member connect attempts. To use your own value call 'openReplicaSetSRV'''' instead.
|
||||
--
|
||||
-- The preferred connection method for cloud MongoDB providers. A typical connecting sequence is shown in the example below.
|
||||
--
|
||||
-- ==== __Example__
|
||||
-- > do
|
||||
-- > pipe <- openReplicatSetSRV' "cluster#.xxxxx.yyyyy.zzz"
|
||||
-- > is_auth <- access pipe master "admin" $ auth user_name password
|
||||
-- > unless is_auth (throwIO $ userError "Authentication failed!")
|
||||
openReplicaSetSRV' hostname = do
|
||||
timeoutSecs <- readIORef globalConnectTimeout
|
||||
_openReplicaSetSRV timeoutSecs Secure hostname
|
||||
|
||||
openReplicaSetSRV'' :: Secs -> HostName -> IO ReplicaSet
|
||||
-- ^ Open /non-secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. Supplied seconds timeout is used for connect attempts to members.
|
||||
openReplicaSetSRV'' timeoutSecs = _openReplicaSetSRV timeoutSecs Unsecure
|
||||
|
||||
openReplicaSetSRV''' :: Secs -> HostName -> IO ReplicaSet
|
||||
-- ^ Open /secure/ connections (on demand) to servers in a replica set. The seedlist and replica set name is fetched from the SRV and TXT DNS records for the given hostname. Supplied seconds timeout is used for connect attempts to members.
|
||||
openReplicaSetSRV''' timeoutSecs = _openReplicaSetSRV timeoutSecs Secure
|
||||
|
||||
_openReplicaSetSRV :: Secs -> TransportSecurity -> HostName -> IO ReplicaSet
|
||||
_openReplicaSetSRV timeoutSecs transportSecurity hostname = do
|
||||
replicaSetName <- lookupReplicaSetName hostname
|
||||
hosts <- lookupSeedList hostname
|
||||
case (replicaSetName, hosts) of
|
||||
(Nothing, _) -> throwError $ userError "Failed to lookup replica set name"
|
||||
(_, []) -> throwError $ userError "Failed to lookup replica set seedlist"
|
||||
(Just rsName, _) ->
|
||||
case transportSecurity of
|
||||
Secure -> openReplicaSetTLS' timeoutSecs (rsName, hosts)
|
||||
Unsecure -> openReplicaSet' timeoutSecs (rsName, hosts)
|
||||
|
||||
closeReplicaSet :: ReplicaSet -> IO ()
|
||||
-- ^ Close all connections to replica set
|
||||
closeReplicaSet (ReplicaSet _ vMembers _ _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
|
||||
closeReplicaSet (ReplicaSet _ vMembers _) = withMVar vMembers $ mapM_ (maybe (return ()) close . snd)
|
||||
|
||||
primary :: ReplicaSet -> IO Pipe
|
||||
-- ^ Return connection to current primary of replica set. Fail if no primary available.
|
||||
primary rs@(ReplicaSet rsName _ _ _) = do
|
||||
primary rs@(ReplicaSet rsName _ _) = do
|
||||
mHost <- statedPrimary <$> updateMembers rs
|
||||
case mHost of
|
||||
Just host' -> connection rs Nothing host'
|
||||
|
@ -227,7 +172,7 @@ routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO
|
|||
routedHost f rs = do
|
||||
info <- updateMembers rs
|
||||
hosts <- shuffle (possibleHosts info)
|
||||
let addIsPrimary h = (h, Just h == statedPrimary info)
|
||||
let addIsPrimary h = (h, if Just h == statedPrimary info then True else False)
|
||||
hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
|
||||
untilSuccess (connection rs Nothing) hosts'
|
||||
|
||||
|
@ -244,7 +189,7 @@ possibleHosts (_, info) = map readHostPort $ at "hosts" info
|
|||
|
||||
updateMembers :: ReplicaSet -> IO ReplicaInfo
|
||||
-- ^ Fetch replica info from any server and update members accordingly
|
||||
updateMembers rs@(ReplicaSet _ vMembers _ _) = do
|
||||
updateMembers rs@(ReplicaSet _ vMembers _) = do
|
||||
(host', info) <- untilSuccess (fetchReplicaInfo rs) =<< readMVar vMembers
|
||||
modifyMVar vMembers $ \members -> do
|
||||
let ((members', old), new) = intersection (map readHostPort $ at "hosts" info) members
|
||||
|
@ -258,7 +203,7 @@ updateMembers rs@(ReplicaSet _ vMembers _ _) = do
|
|||
|
||||
fetchReplicaInfo :: ReplicaSet -> (Host, Maybe Pipe) -> IO ReplicaInfo
|
||||
-- Connect to host and fetch replica info from host creating new connection if missing or closed (previously failed). Fail if not member of named replica set.
|
||||
fetchReplicaInfo rs@(ReplicaSet rsName _ _ _) (host', mPipe) = do
|
||||
fetchReplicaInfo rs@(ReplicaSet rsName _ _) (host', mPipe) = do
|
||||
pipe <- connection rs mPipe host'
|
||||
info <- adminCommand ["isMaster" =: (1 :: Int)] pipe
|
||||
case B.lookup "setName" info of
|
||||
|
@ -268,15 +213,11 @@ fetchReplicaInfo rs@(ReplicaSet rsName _ _ _) (host', mPipe) = do
|
|||
|
||||
connection :: ReplicaSet -> Maybe Pipe -> Host -> IO Pipe
|
||||
-- ^ Return new or existing connection to member of replica set. If pipe is already known for host it is given, but we still test if it is open.
|
||||
connection (ReplicaSet _ vMembers timeoutSecs transportSecurity) mPipe host' =
|
||||
connection (ReplicaSet _ vMembers timeoutSecs) mPipe host' =
|
||||
maybe conn (\p -> isClosed p >>= \bad -> if bad then conn else return p) mPipe
|
||||
where
|
||||
conn = modifyMVar vMembers $ \members -> do
|
||||
let (Host h p) = host'
|
||||
let conn' = case transportSecurity of
|
||||
Secure -> TLS.connect h p
|
||||
Unsecure -> connect' timeoutSecs host'
|
||||
let new = conn' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
|
||||
let new = connect' timeoutSecs host' >>= \pipe -> return (updateAssocs host' (Just pipe) members, pipe)
|
||||
case List.lookup host' members of
|
||||
Just (Just pipe) -> isClosed pipe >>= \bad -> if bad then new else return (members, pipe)
|
||||
_ -> new
|
||||
|
|
|
@ -1,191 +0,0 @@
|
|||
-- Author:
|
||||
-- Brent Tubbs <brent.tubbs@gmail.com>
|
||||
-- | MongoDB GridFS implementation
|
||||
{-# LANGUAGE OverloadedStrings, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, TypeFamilies, CPP, RankNTypes #-}
|
||||
|
||||
module Database.MongoDB.GridFS
|
||||
( Bucket
|
||||
, files, chunks
|
||||
, File
|
||||
, document, bucket
|
||||
-- ** Setup
|
||||
, openDefaultBucket
|
||||
, openBucket
|
||||
-- ** Query
|
||||
, findFile
|
||||
, findOneFile
|
||||
, fetchFile
|
||||
-- ** Delete
|
||||
, deleteFile
|
||||
-- ** Conduits
|
||||
, sourceFile
|
||||
, sinkFile
|
||||
)
|
||||
where
|
||||
|
||||
|
||||
import Control.Monad(when)
|
||||
import Control.Monad.IO.Class
|
||||
import Control.Monad.Trans(lift)
|
||||
|
||||
import Data.Conduit
|
||||
import Data.Digest.Pure.MD5
|
||||
import Data.Int
|
||||
import Data.Tagged(Tagged, untag)
|
||||
import Data.Text(Text, append)
|
||||
import Data.Time.Clock(getCurrentTime)
|
||||
import Database.MongoDB
|
||||
import Prelude
|
||||
import qualified Data.Bson as B
|
||||
import qualified Data.ByteString as S
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
|
||||
|
||||
defaultChunkSize :: Int64
|
||||
-- ^ The default chunk size is 256 kB
|
||||
defaultChunkSize = 256 * 1024
|
||||
|
||||
-- magic constant for md5Finalize
|
||||
md5BlockSizeInBytes :: Int
|
||||
md5BlockSizeInBytes = 64
|
||||
|
||||
|
||||
data Bucket = Bucket {files :: Text, chunks :: Text}
|
||||
-- ^ Files are stored in "buckets". You open a bucket with openDefaultBucket or openBucket
|
||||
|
||||
openDefaultBucket :: (Monad m, MonadIO m) => Action m Bucket
|
||||
-- ^ Open the default 'Bucket' (named "fs")
|
||||
openDefaultBucket = openBucket "fs"
|
||||
|
||||
openBucket :: (Monad m, MonadIO m) => Text -> Action m Bucket
|
||||
-- ^ Open a 'Bucket'
|
||||
openBucket name = do
|
||||
let filesCollection = name `append` ".files"
|
||||
let chunksCollection = name `append` ".chunks"
|
||||
ensureIndex $ index filesCollection ["filename" =: (1::Int), "uploadDate" =: (1::Int)]
|
||||
ensureIndex $ (index chunksCollection ["files_id" =: (1::Int), "n" =: (1::Int)]) { iUnique = True, iDropDups = True }
|
||||
return $ Bucket filesCollection chunksCollection
|
||||
|
||||
data File = File {bucket :: Bucket, document :: Document}
|
||||
|
||||
getChunk :: (MonadFail m, MonadIO m) => File -> Int -> Action m (Maybe S.ByteString)
|
||||
-- ^ Get a chunk of a file
|
||||
getChunk (File _bucket doc) i = do
|
||||
files_id <- B.look "_id" doc
|
||||
result <- findOne $ select ["files_id" := files_id, "n" =: i] $ chunks _bucket
|
||||
let content = at "data" <$> result
|
||||
case content of
|
||||
Just (Binary b) -> return (Just b)
|
||||
_ -> return Nothing
|
||||
|
||||
findFile :: MonadIO m => Bucket -> Selector -> Action m [File]
|
||||
-- ^ Find files in the bucket
|
||||
findFile _bucket sel = do
|
||||
cursor <- find $ select sel $ files _bucket
|
||||
results <- rest cursor
|
||||
return $ File _bucket <$> results
|
||||
|
||||
findOneFile :: MonadIO m => Bucket -> Selector -> Action m (Maybe File)
|
||||
-- ^ Find one file in the bucket
|
||||
findOneFile _bucket sel = do
|
||||
mdoc <- findOne $ select sel $ files _bucket
|
||||
return $ File _bucket <$> mdoc
|
||||
|
||||
fetchFile :: MonadIO m => Bucket -> Selector -> Action m File
|
||||
-- ^ Fetch one file in the bucket
|
||||
fetchFile _bucket sel = do
|
||||
doc <- fetch $ select sel $ files _bucket
|
||||
return $ File _bucket doc
|
||||
|
||||
deleteFile :: (MonadIO m, MonadFail m) => File -> Action m ()
|
||||
-- ^ Delete files in the bucket
|
||||
deleteFile (File _bucket doc) = do
|
||||
files_id <- B.look "_id" doc
|
||||
delete $ select ["_id" := files_id] $ files _bucket
|
||||
delete $ select ["files_id" := files_id] $ chunks _bucket
|
||||
|
||||
putChunk :: (Monad m, MonadIO m) => Bucket -> ObjectId -> Int -> L.ByteString -> Action m ()
|
||||
-- ^ Put a chunk in the bucket
|
||||
putChunk _bucket files_id i chunk = do
|
||||
insert_ (chunks _bucket) ["files_id" =: files_id, "n" =: i, "data" =: Binary (L.toStrict chunk)]
|
||||
|
||||
sourceFile :: (MonadFail m, MonadIO m) => File -> ConduitT File S.ByteString (Action m) ()
|
||||
-- ^ A producer for the contents of a file
|
||||
sourceFile file = yieldChunk 0 where
|
||||
yieldChunk i = do
|
||||
mbytes <- lift $ getChunk file i
|
||||
case mbytes of
|
||||
Just bytes -> yield bytes >> yieldChunk (i+1)
|
||||
Nothing -> return ()
|
||||
|
||||
-- Used to keep data during writing
|
||||
data FileWriter = FileWriter
|
||||
{ _fwChunkSize :: Int64
|
||||
, _fwBucket :: Bucket
|
||||
, _fwFilesId :: ObjectId
|
||||
, _fwChunkIndex :: Int
|
||||
, _fwSize :: Int64
|
||||
, _fwAcc :: L.ByteString
|
||||
, _fwMd5Context :: MD5Context
|
||||
, _fwMd5acc :: L.ByteString
|
||||
}
|
||||
|
||||
-- Finalize file, calculating md5 digest, saving the last chunk, and creating the file in the bucket
|
||||
finalizeFile :: (Monad m, MonadIO m) => Text -> FileWriter -> Action m File
|
||||
finalizeFile filename (FileWriter chunkSize _bucket files_id i size acc md5context md5acc) = do
|
||||
let md5digest = finalizeMD5 md5context (L.toStrict md5acc)
|
||||
when (L.length acc > 0) $ putChunk _bucket files_id i acc
|
||||
currentTimestamp <- liftIO getCurrentTime
|
||||
let doc = [ "_id" =: files_id
|
||||
, "length" =: size
|
||||
, "uploadDate" =: currentTimestamp
|
||||
, "md5" =: show md5digest
|
||||
, "chunkSize" =: chunkSize
|
||||
, "filename" =: filename
|
||||
]
|
||||
insert_ (files _bucket) doc
|
||||
return $ File _bucket doc
|
||||
|
||||
-- finalize the remainder and return the MD5Digest.
|
||||
finalizeMD5 :: MD5Context -> S.ByteString -> MD5Digest
|
||||
finalizeMD5 ctx remainder =
|
||||
md5Finalize ctx2 (S.drop lu remainder) -- can only handle max md5BlockSizeInBytes length
|
||||
where
|
||||
l = S.length remainder
|
||||
r = l `mod` md5BlockSizeInBytes
|
||||
lu = l - r
|
||||
ctx2 = md5Update ctx (S.take lu remainder)
|
||||
|
||||
-- Write as many chunks as can be written from the file writer
|
||||
writeChunks :: (Monad m, MonadIO m) => FileWriter -> L.ByteString -> Action m FileWriter
|
||||
writeChunks (FileWriter chunkSize _bucket files_id i size acc md5context md5acc) chunk = do
|
||||
-- Update md5 context
|
||||
let md5BlockLength = fromIntegral $ untag (blockLength :: Tagged MD5Digest Int)
|
||||
let md5acc_temp = (md5acc `L.append` chunk)
|
||||
let (md5context', md5acc') =
|
||||
if (L.length md5acc_temp < md5BlockLength)
|
||||
then (md5context, md5acc_temp)
|
||||
else let numBlocks = L.length md5acc_temp `div` md5BlockLength
|
||||
(current, remainder) = L.splitAt (md5BlockLength * numBlocks) md5acc_temp
|
||||
in (md5Update md5context (L.toStrict current), remainder)
|
||||
-- Update chunks
|
||||
let size' = (size + L.length chunk)
|
||||
let acc_temp = (acc `L.append` chunk)
|
||||
if (L.length acc_temp < chunkSize)
|
||||
then return (FileWriter chunkSize _bucket files_id i size' acc_temp md5context' md5acc')
|
||||
else do
|
||||
let (newChunk, acc') = L.splitAt chunkSize acc_temp
|
||||
putChunk _bucket files_id i newChunk
|
||||
writeChunks (FileWriter chunkSize _bucket files_id (i+1) size' acc' md5context' md5acc') L.empty
|
||||
|
||||
sinkFile :: (Monad m, MonadIO m) => Bucket -> Text -> ConduitT S.ByteString () (Action m) File
|
||||
-- ^ A consumer that creates a file in the bucket and puts all consumed data in it
|
||||
sinkFile _bucket filename = do
|
||||
files_id <- liftIO $ genObjectId
|
||||
awaitChunk $ FileWriter defaultChunkSize _bucket files_id 0 0 L.empty md5InitialContext L.empty
|
||||
where
|
||||
awaitChunk fw = do
|
||||
mchunk <- await
|
||||
case mchunk of
|
||||
Nothing -> lift (finalizeFile filename fw)
|
||||
Just chunk -> lift (writeChunks fw (L.fromStrict chunk)) >>= awaitChunk
|
|
@ -1,106 +0,0 @@
|
|||
-- | Compatibility layer for network package, including newtype 'PortID'
|
||||
{-# LANGUAGE CPP, OverloadedStrings #-}
|
||||
|
||||
module Database.MongoDB.Internal.Network (Host(..), PortID(..), N.HostName, connectTo,
|
||||
lookupReplicaSetName, lookupSeedList) where
|
||||
|
||||
#if !MIN_VERSION_network(2, 9, 0)
|
||||
|
||||
import qualified Network as N
|
||||
import System.IO (Handle)
|
||||
|
||||
#else
|
||||
|
||||
import Control.Exception (bracketOnError)
|
||||
import Network.BSD as BSD
|
||||
import qualified Network.Socket as N
|
||||
import System.IO (Handle, IOMode(ReadWriteMode))
|
||||
|
||||
#endif
|
||||
|
||||
import Data.ByteString.Char8 (pack, unpack)
|
||||
import Data.List (dropWhileEnd)
|
||||
import Data.Maybe (fromMaybe)
|
||||
import Data.Text (Text)
|
||||
import Network.DNS.Lookup (lookupSRV, lookupTXT)
|
||||
import Network.DNS.Resolver (defaultResolvConf, makeResolvSeed, withResolver)
|
||||
import Network.HTTP.Types.URI (parseQueryText)
|
||||
|
||||
|
||||
-- | Wraps network's 'PortNumber'
|
||||
-- Used to ease compatibility between older and newer network versions.
|
||||
data PortID = PortNumber N.PortNumber
|
||||
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
|
||||
| UnixSocket String
|
||||
#endif
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
|
||||
#if !MIN_VERSION_network(2, 9, 0)
|
||||
|
||||
-- Unwrap our newtype and use network's PortID and connectTo
|
||||
connectTo :: N.HostName -- Hostname
|
||||
-> PortID -- Port Identifier
|
||||
-> IO Handle -- Connected Socket
|
||||
connectTo hostname (PortNumber port) = N.connectTo hostname (N.PortNumber port)
|
||||
|
||||
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
|
||||
connectTo _ (UnixSocket path) = N.connectTo "" (N.UnixSocket path)
|
||||
#endif
|
||||
|
||||
#else
|
||||
|
||||
-- Copied implementation from network 2.8's 'connectTo', but using our 'PortID' newtype.
|
||||
-- https://github.com/haskell/network/blob/e73f0b96c9da924fe83f3c73488f7e69f712755f/Network.hs#L120-L129
|
||||
connectTo :: N.HostName -- Hostname
|
||||
-> PortID -- Port Identifier
|
||||
-> IO Handle -- Connected Socket
|
||||
connectTo hostname (PortNumber port) = do
|
||||
proto <- BSD.getProtocolNumber "tcp"
|
||||
bracketOnError
|
||||
(N.socket N.AF_INET N.Stream proto)
|
||||
N.close -- only done if there's an error
|
||||
(\sock -> do
|
||||
he <- BSD.getHostByName hostname
|
||||
N.connect sock (N.SockAddrInet port (hostAddress he))
|
||||
N.socketToHandle sock ReadWriteMode
|
||||
)
|
||||
|
||||
#if !defined(mingw32_HOST_OS) && !defined(cygwin32_HOST_OS) && !defined(_WIN32)
|
||||
connectTo _ (UnixSocket path) = do
|
||||
bracketOnError
|
||||
(N.socket N.AF_UNIX N.Stream 0)
|
||||
N.close
|
||||
(\sock -> do
|
||||
N.connect sock (N.SockAddrUnix path)
|
||||
N.socketToHandle sock ReadWriteMode
|
||||
)
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
-- * Host
|
||||
|
||||
data Host = Host N.HostName PortID deriving (Show, Eq, Ord)
|
||||
|
||||
lookupReplicaSetName :: N.HostName -> IO (Maybe Text)
|
||||
-- ^ Retrieves the replica set name from the TXT DNS record for the given hostname
|
||||
lookupReplicaSetName hostname = do
|
||||
rs <- makeResolvSeed defaultResolvConf
|
||||
res <- withResolver rs $ \resolver -> lookupTXT resolver (pack hostname)
|
||||
case res of
|
||||
Left _ -> pure Nothing
|
||||
Right [] -> pure Nothing
|
||||
Right (x:_) ->
|
||||
pure $ fromMaybe (Nothing :: Maybe Text) (lookup "replicaSet" $ parseQueryText x)
|
||||
|
||||
lookupSeedList :: N.HostName -> IO [Host]
|
||||
-- ^ Retrieves the replica set seed list from the SRV DNS record for the given hostname
|
||||
lookupSeedList hostname = do
|
||||
rs <- makeResolvSeed defaultResolvConf
|
||||
res <- withResolver rs $ \resolver -> lookupSRV resolver $ pack $ "_mongodb._tcp." ++ hostname
|
||||
case res of
|
||||
Left _ -> pure []
|
||||
Right srv -> pure $ map (\(_, _, por, tar) ->
|
||||
let tar' = dropWhileEnd (=='.') (unpack tar)
|
||||
in Host tar' (PortNumber . fromIntegral $ por)) srv
|
|
@ -4,8 +4,8 @@
|
|||
-- This module is not intended for direct use. Use the high-level interface at
|
||||
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.
|
||||
|
||||
{-# LANGUAGE RecordWildCards, OverloadedStrings #-}
|
||||
{-# LANGUAGE CPP, FlexibleContexts #-}
|
||||
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-}
|
||||
{-# LANGUAGE CPP, FlexibleContexts, TupleSections, TypeSynonymInstances #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
|
||||
|
@ -20,43 +20,41 @@
|
|||
module Database.MongoDB.Internal.Protocol (
|
||||
FullCollection,
|
||||
-- * Pipe
|
||||
Pipe, newPipe, newPipeWith, send, sendOpMsg, call, callOpMsg,
|
||||
Pipe, newPipe, newPipeWith, send, call,
|
||||
-- ** Notice
|
||||
Notice(..), InsertOption(..), UpdateOption(..), DeleteOption(..), CursorId,
|
||||
-- ** Request
|
||||
Request(..), QueryOption(..), Cmd (..), KillC(..),
|
||||
Request(..), QueryOption(..),
|
||||
-- ** Reply
|
||||
Reply(..), ResponseFlag(..), FlagBit(..),
|
||||
Reply(..), ResponseFlag(..),
|
||||
-- * Authentication
|
||||
Username, Password, Nonce, pwHash, pwKey,
|
||||
isClosed, close, ServerData(..), Pipeline(..), putOpMsg,
|
||||
bitOpMsg
|
||||
isClosed, close, ServerData(..), Pipeline(..)
|
||||
) where
|
||||
|
||||
#if !MIN_VERSION_base(4,8,0)
|
||||
import Control.Applicative ((<$>))
|
||||
#endif
|
||||
import Control.Monad ( forM, replicateM, unless, forever )
|
||||
import Data.Binary.Get (Get, runGet, getInt8)
|
||||
import Data.Binary.Put (Put, runPut, putInt8)
|
||||
import Data.Bits (bit, testBit, zeroBits)
|
||||
import Control.Monad (forM, replicateM, unless)
|
||||
import Data.Binary.Get (Get, runGet)
|
||||
import Data.Binary.Put (Put, runPut)
|
||||
import Data.Bits (bit, testBit)
|
||||
import Data.Int (Int32, Int64)
|
||||
import Data.IORef (IORef, newIORef, atomicModifyIORef)
|
||||
import System.IO (Handle)
|
||||
import System.IO.Error (doesNotExistErrorType, mkIOError)
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
import Data.Maybe (maybeToList, fromJust)
|
||||
import Data.Maybe (maybeToList)
|
||||
import GHC.Conc (ThreadStatus(..), threadStatus)
|
||||
import Control.Monad.STM (atomically)
|
||||
import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask)
|
||||
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan)
|
||||
import Control.Monad (forever)
|
||||
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
|
||||
import Control.Concurrent (ThreadId, forkIO, killThread)
|
||||
|
||||
import Control.Exception.Lifted (SomeException, mask_, onException, throwIO, try)
|
||||
import Control.Exception.Lifted (onException, throwIO, try)
|
||||
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
|
||||
import Control.Monad.Trans (MonadIO, liftIO)
|
||||
import Data.Bson (Document, (=:), merge, cast, valueAt, look)
|
||||
import Data.Bson (Document)
|
||||
import Data.Bson.Binary (getDocument, putDocument, getInt32, putInt32, getInt64,
|
||||
putInt64, putCString)
|
||||
import Data.Text (Text)
|
||||
|
@ -68,14 +66,11 @@ import qualified Data.Text.Encoding as TE
|
|||
import Database.MongoDB.Internal.Util (bitOr, byteStringHex)
|
||||
|
||||
import Database.MongoDB.Transport (Transport)
|
||||
import qualified Database.MongoDB.Transport as Tr
|
||||
|
||||
import qualified Database.MongoDB.Transport as T
|
||||
|
||||
#if MIN_VERSION_base(4,6,0)
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
||||
putMVar, readMVar, mkWeakMVar, isEmptyMVar)
|
||||
import GHC.List (foldl1')
|
||||
import Conduit (repeatWhileMC, (.|), runConduit, foldlC)
|
||||
putMVar, readMVar, mkWeakMVar)
|
||||
#else
|
||||
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
||||
putMVar, readMVar, addMVarFinalizer)
|
||||
|
@ -86,15 +81,13 @@ mkWeakMVar :: MVar a -> IO () -> IO ()
|
|||
mkWeakMVar = addMVarFinalizer
|
||||
#endif
|
||||
|
||||
|
||||
-- * Pipeline
|
||||
|
||||
-- | Thread-safe and pipelined connection
|
||||
data Pipeline = Pipeline
|
||||
{ vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it
|
||||
, responseQueue :: TChan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrives we pop the next thread and give it the response.
|
||||
, responseQueue :: Chan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
|
||||
, listenThread :: ThreadId
|
||||
, finished :: MVar ()
|
||||
, serverData :: ServerData
|
||||
}
|
||||
|
||||
|
@ -106,54 +99,25 @@ data ServerData = ServerData
|
|||
, maxBsonObjectSize :: Int
|
||||
, maxWriteBatchSize :: Int
|
||||
}
|
||||
deriving Show
|
||||
|
||||
-- | @'forkUnmaskedFinally' action and_then@ behaves the same as @'forkFinally' action and_then@, except that @action@ is run completely unmasked, whereas with 'forkFinally', @action@ is run with the same mask as the parent thread.
|
||||
forkUnmaskedFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
|
||||
forkUnmaskedFinally action and_then =
|
||||
mask_ $ forkIOWithUnmask $ \unmask ->
|
||||
try (unmask action) >>= and_then
|
||||
|
||||
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
|
||||
newPipeline :: ServerData -> Transport -> IO Pipeline
|
||||
newPipeline serverData stream = do
|
||||
vStream <- newMVar stream
|
||||
responseQueue <- atomically newTChan
|
||||
finished <- newEmptyMVar
|
||||
let drainReplies = do
|
||||
chanEmpty <- atomically $ isEmptyTChan responseQueue
|
||||
if chanEmpty
|
||||
then return ()
|
||||
else do
|
||||
var <- atomically $ readTChan responseQueue
|
||||
putMVar var $ Left $ mkIOError
|
||||
doesNotExistErrorType
|
||||
"Handle has been closed"
|
||||
Nothing
|
||||
Nothing
|
||||
drainReplies
|
||||
|
||||
responseQueue <- newChan
|
||||
rec
|
||||
let pipe = Pipeline{..}
|
||||
listenThread <- forkUnmaskedFinally (listen pipe) $ \_ -> do
|
||||
putMVar finished ()
|
||||
drainReplies
|
||||
|
||||
listenThread <- forkIO (listen pipe)
|
||||
_ <- mkWeakMVar vStream $ do
|
||||
killThread listenThread
|
||||
Tr.close stream
|
||||
T.close stream
|
||||
return pipe
|
||||
|
||||
isFinished :: Pipeline -> IO Bool
|
||||
isFinished Pipeline {finished} = do
|
||||
empty <- isEmptyMVar finished
|
||||
return $ not empty
|
||||
|
||||
close :: Pipeline -> IO ()
|
||||
-- ^ Close pipe and underlying connection
|
||||
close Pipeline{..} = do
|
||||
killThread listenThread
|
||||
Tr.close =<< readMVar vStream
|
||||
T.close =<< readMVar vStream
|
||||
|
||||
isClosed :: Pipeline -> IO Bool
|
||||
isClosed Pipeline{listenThread} = do
|
||||
|
@ -163,7 +127,6 @@ isClosed Pipeline{listenThread} = do
|
|||
ThreadFinished -> True
|
||||
ThreadBlocked _ -> False
|
||||
ThreadDied -> True
|
||||
|
||||
--isPipeClosed Pipeline{..} = isClosed =<< readMVar vHandle -- isClosed hangs while listen loop is waiting on read
|
||||
|
||||
listen :: Pipeline -> IO ()
|
||||
|
@ -172,10 +135,10 @@ listen Pipeline{..} = do
|
|||
stream <- readMVar vStream
|
||||
forever $ do
|
||||
e <- try $ readMessage stream
|
||||
var <- atomically $ readTChan responseQueue
|
||||
var <- readChan responseQueue
|
||||
putMVar var e
|
||||
case e of
|
||||
Left err -> Tr.close stream >> ioError err -- close and stop looping
|
||||
Left err -> T.close stream >> ioError err -- close and stop looping
|
||||
Right _ -> return ()
|
||||
|
||||
psend :: Pipeline -> Message -> IO ()
|
||||
|
@ -183,55 +146,24 @@ psend :: Pipeline -> Message -> IO ()
|
|||
-- Throw IOError and close pipeline if send fails
|
||||
psend p@Pipeline{..} !message = withMVar vStream (flip writeMessage message) `onException` close p
|
||||
|
||||
psendOpMsg :: Pipeline -> [Cmd] -> Maybe FlagBit -> Document -> IO ()-- IO (IO Response)
|
||||
psendOpMsg p@Pipeline{..} commands flagBit params =
|
||||
case flagBit of
|
||||
Just f -> case f of
|
||||
MoreToCome -> withMVar vStream (\t -> writeOpMsgMessage t (commands, Nothing) flagBit params) `onException` close p -- >> return (return (0, ReplyEmpty))
|
||||
_ -> error "moreToCome has to be set if no response is expected"
|
||||
_ -> error "moreToCome has to be set if no response is expected"
|
||||
|
||||
pcall :: Pipeline -> Message -> IO (IO Response)
|
||||
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
|
||||
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
|
||||
pcall p@Pipeline{..} message = do
|
||||
listenerStopped <- isFinished p
|
||||
if listenerStopped
|
||||
then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing
|
||||
else withMVar vStream doCall `onException` close p
|
||||
where
|
||||
pcall p@Pipeline{..} message = withMVar vStream doCall `onException` close p where
|
||||
doCall stream = do
|
||||
writeMessage stream message
|
||||
var <- newEmptyMVar
|
||||
liftIO $ atomically $ writeTChan responseQueue var
|
||||
return $ readMVar var >>= either throwIO return -- return promise
|
||||
|
||||
pcallOpMsg :: Pipeline -> Maybe (Request, RequestId) -> Maybe FlagBit -> Document -> IO (IO Response)
|
||||
-- ^ Send message to destination and return /promise/ of response from one message only. The destination must reply to the message (otherwise promises will have the wrong responses in them).
|
||||
-- Throw IOError and closes pipeline if send fails, likewise for promised response.
|
||||
pcallOpMsg p@Pipeline{..} message flagbit params = do
|
||||
listenerStopped <- isFinished p
|
||||
if listenerStopped
|
||||
then ioError $ mkIOError doesNotExistErrorType "Handle has been closed" Nothing Nothing
|
||||
else withMVar vStream doCall `onException` close p
|
||||
where
|
||||
doCall stream = do
|
||||
writeOpMsgMessage stream ([], message) flagbit params
|
||||
var <- newEmptyMVar
|
||||
-- put var into the response-queue so that it can
|
||||
-- fetch the latest response
|
||||
liftIO $ atomically $ writeTChan responseQueue var
|
||||
liftIO $ writeChan responseQueue var
|
||||
return $ readMVar var >>= either throwIO return -- return promise
|
||||
|
||||
-- * Pipe
|
||||
|
||||
type Pipe = Pipeline
|
||||
-- ^ Thread-safe TCP connection with pipelined requests. In long-running applications the user is expected to use it as a "client": create a `Pipe`
|
||||
-- at startup, use it as long as possible, watch out for possible timeouts, and close it on shutdown. Bearing in mind that disconnections may be triggered by MongoDB service providers, the user is responsible for re-creating their `Pipe` whenever necessary.
|
||||
-- ^ Thread-safe TCP connection with pipelined requests
|
||||
|
||||
newPipe :: ServerData -> Handle -> IO Pipe
|
||||
-- ^ Create pipe over handle
|
||||
newPipe sd handle = Tr.fromHandle handle >>= (newPipeWith sd)
|
||||
newPipe sd handle = T.fromHandle handle >>= (newPipeWith sd)
|
||||
|
||||
newPipeWith :: ServerData -> Transport -> IO Pipe
|
||||
-- ^ Create pipe over connection
|
||||
|
@ -241,12 +173,6 @@ send :: Pipe -> [Notice] -> IO ()
|
|||
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
|
||||
send pipe notices = psend pipe (notices, Nothing)
|
||||
|
||||
sendOpMsg :: Pipe -> [Cmd] -> Maybe FlagBit -> Document -> IO ()
|
||||
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
|
||||
sendOpMsg pipe commands@(Nc _ : _) flagBit params = psendOpMsg pipe commands flagBit params
|
||||
sendOpMsg pipe commands@(Kc _ : _) flagBit params = psendOpMsg pipe commands flagBit params
|
||||
sendOpMsg _ _ _ _ = error "This function only supports Cmd types wrapped in Nc or Kc type constructors"
|
||||
|
||||
call :: Pipe -> [Notice] -> Request -> IO (IO Reply)
|
||||
-- ^ Send notices and request as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
|
||||
call pipe notices request = do
|
||||
|
@ -257,73 +183,11 @@ call pipe notices request = do
|
|||
check requestId (responseTo, reply) = if requestId == responseTo then reply else
|
||||
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
|
||||
|
||||
callOpMsg :: Pipe -> Request -> Maybe FlagBit -> Document -> IO (IO Reply)
|
||||
-- ^ Send requests as a contiguous batch to server and return reply promise, which will block when invoked until reply arrives. This call and resulting promise will throw IOError if connection fails.
|
||||
callOpMsg pipe request flagBit params = do
|
||||
requestId <- genRequestId
|
||||
promise <- pcallOpMsg pipe (Just (request, requestId)) flagBit params
|
||||
promise' <- promise :: IO Response
|
||||
return $ snd <$> produce requestId promise'
|
||||
where
|
||||
-- We need to perform streaming here as within the OP_MSG protocol mongoDB expects
|
||||
-- our client to keep receiving messages after the MoreToCome flagbit was
|
||||
-- set by the server until our client receives an empty flagbit. After the
|
||||
-- first MoreToCome flagbit was set the responseTo field in the following
|
||||
-- headers will reference the cursorId that was set in the previous message.
|
||||
-- see:
|
||||
-- https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#moretocome-on-responses
|
||||
checkFlagBit p =
|
||||
case p of
|
||||
(_, r) ->
|
||||
case r of
|
||||
ReplyOpMsg{..} -> flagBits == [MoreToCome]
|
||||
-- This is called by functions using the OP_MSG protocol,
|
||||
-- so this has to be ReplyOpMsg
|
||||
_ -> error "Impossible"
|
||||
produce reqId p = runConduit $
|
||||
case p of
|
||||
(rt, r) ->
|
||||
case r of
|
||||
ReplyOpMsg{..} ->
|
||||
if flagBits == [MoreToCome]
|
||||
then yieldResponses .| foldlC mergeResponses p
|
||||
else return $ (rt, check reqId p)
|
||||
_ -> error "Impossible" -- see comment above
|
||||
yieldResponses = repeatWhileMC
|
||||
(do
|
||||
var <- newEmptyMVar
|
||||
liftIO $ atomically $ writeTChan (responseQueue pipe) var
|
||||
readMVar var >>= either throwIO return :: IO Response
|
||||
)
|
||||
checkFlagBit
|
||||
mergeResponses p@(rt,rep) p' =
|
||||
case (p, p') of
|
||||
((_, r), (_, r')) ->
|
||||
case (r, r') of
|
||||
(ReplyOpMsg _ sec _, ReplyOpMsg _ sec' _) -> do
|
||||
let (section, section') = (head sec, head sec')
|
||||
(cur, cur') = (maybe Nothing cast $ look "cursor" section,
|
||||
maybe Nothing cast $ look "cursor" section')
|
||||
case (cur, cur') of
|
||||
(Just doc, Just doc') -> do
|
||||
let (docs, docs') =
|
||||
( fromJust $ cast $ valueAt "nextBatch" doc :: [Document]
|
||||
, fromJust $ cast $ valueAt "nextBatch" doc' :: [Document])
|
||||
id' = fromJust $ cast $ valueAt "id" doc' :: Int32
|
||||
(rt, check id' (rt, rep{ sections = docs' ++ docs })) -- todo: avoid (++)
|
||||
-- Since we use this to process moreToCome messages, we
|
||||
-- know that there will be a nextBatch key in the document
|
||||
_ -> error "Impossible"
|
||||
_ -> error "Impossible" -- see comment above
|
||||
check requestId (responseTo, reply) = if requestId == responseTo then reply else
|
||||
error $ "expected response id (" ++ show responseTo ++ ") to match request id (" ++ show requestId ++ ")"
|
||||
|
||||
-- * Message
|
||||
|
||||
type Message = ([Notice], Maybe (Request, RequestId))
|
||||
-- ^ A write notice(s) with getLastError request, or just query request.
|
||||
-- Note, that requestId will be out of order because request ids will be generated for notices after the request id supplied was generated. This is ok because the mongo server does not care about order just uniqueness.
|
||||
type OpMsgMessage = ([Cmd], Maybe (Request, RequestId))
|
||||
|
||||
writeMessage :: Transport -> Message -> IO ()
|
||||
-- ^ Write message to connection
|
||||
|
@ -338,27 +202,8 @@ writeMessage conn (notices, mRequest) = do
|
|||
let s = runPut $ putRequest request requestId
|
||||
return $ (lenBytes s) `L.append` s
|
||||
|
||||
Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
|
||||
Tr.flush conn
|
||||
where
|
||||
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
|
||||
encodeSize = runPut . putInt32 . (+ 4)
|
||||
|
||||
writeOpMsgMessage :: Transport -> OpMsgMessage -> Maybe FlagBit -> Document -> IO ()
|
||||
-- ^ Write message to connection
|
||||
writeOpMsgMessage conn (notices, mRequest) flagBit params = do
|
||||
noticeStrings <- forM notices $ \n -> do
|
||||
requestId <- genRequestId
|
||||
let s = runPut $ putOpMsg n requestId flagBit params
|
||||
return $ (lenBytes s) `L.append` s
|
||||
|
||||
let requestString = do
|
||||
(request, requestId) <- mRequest
|
||||
let s = runPut $ putOpMsg (Req request) requestId flagBit params
|
||||
return $ (lenBytes s) `L.append` s
|
||||
|
||||
Tr.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
|
||||
Tr.flush conn
|
||||
T.write conn $ L.toStrict $ L.concat $ noticeStrings ++ (maybeToList requestString)
|
||||
T.flush conn
|
||||
where
|
||||
lenBytes bytes = encodeSize . toEnum . fromEnum $ L.length bytes
|
||||
encodeSize = runPut . putInt32 . (+ 4)
|
||||
|
@ -370,8 +215,8 @@ readMessage :: Transport -> IO Response
|
|||
-- ^ read response from a connection
|
||||
readMessage conn = readResp where
|
||||
readResp = do
|
||||
len <- fromEnum . decodeSize . L.fromStrict <$> Tr.read conn 4
|
||||
runGet getReply . L.fromStrict <$> Tr.read conn len
|
||||
len <- fromEnum . decodeSize . L.fromStrict <$> T.read conn 4
|
||||
runGet getReply . L.fromStrict <$> T.read conn len
|
||||
decodeSize = subtract 4 . runGet getInt32
|
||||
|
||||
type FullCollection = Text
|
||||
|
@ -388,7 +233,6 @@ type ResponseTo = RequestId
|
|||
|
||||
genRequestId :: (MonadIO m) => m RequestId
|
||||
-- ^ Generate fresh request id
|
||||
{-# NOINLINE genRequestId #-}
|
||||
genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where
|
||||
counter :: IORef RequestId
|
||||
counter = unsafePerformIO (newIORef 0)
|
||||
|
@ -403,13 +247,6 @@ putHeader opcode requestId = do
|
|||
putInt32 0
|
||||
putInt32 opcode
|
||||
|
||||
putOpMsgHeader :: Opcode -> RequestId -> Put
|
||||
-- ^ Note, does not write message length (first int32), assumes caller will write it
|
||||
putOpMsgHeader opcode requestId = do
|
||||
putInt32 requestId
|
||||
putInt32 0
|
||||
putInt32 opcode
|
||||
|
||||
getHeader :: Get (Opcode, ResponseTo)
|
||||
-- ^ Note, does not read message length (first int32), assumes it was already read
|
||||
getHeader = do
|
||||
|
@ -484,137 +321,6 @@ putNotice notice requestId = do
|
|||
putInt32 $ toEnum (length kCursorIds)
|
||||
mapM_ putInt64 kCursorIds
|
||||
|
||||
data KillC = KillC { killCursor :: Notice, kFullCollection:: FullCollection} deriving Show
|
||||
|
||||
data Cmd = Nc Notice | Req Request | Kc KillC deriving Show
|
||||
|
||||
data FlagBit =
|
||||
ChecksumPresent -- ^ The message ends with 4 bytes containing a CRC-32C checksum
|
||||
| MoreToCome -- ^ Another message will follow this one without further action from the receiver.
|
||||
| ExhaustAllowed -- ^ The client is prepared for multiple replies to this request using the moreToCome bit.
|
||||
deriving (Show, Eq, Enum)
|
||||
|
||||
|
||||
{-
|
||||
OP_MSG header == 16 byte
|
||||
+ 4 bytes flagBits
|
||||
+ 1 byte payload type = 1
|
||||
+ 1 byte payload type = 2
|
||||
+ 4 byte size of payload
|
||||
== 26 bytes opcode overhead
|
||||
+ X Full command document {insert: "test", writeConcern: {...}}
|
||||
+ Y command identifier ("documents", "deletes", "updates") ( + \0)
|
||||
-}
|
||||
putOpMsg :: Cmd -> RequestId -> Maybe FlagBit -> Document -> Put
|
||||
putOpMsg cmd requestId flagBit params = do
|
||||
let biT = maybe zeroBits (bit . bitOpMsg) flagBit:: Int32
|
||||
putOpMsgHeader opMsgOpcode requestId -- header
|
||||
case cmd of
|
||||
Nc n -> case n of
|
||||
Insert{..} -> do
|
||||
let (sec0, sec1Size) =
|
||||
prepSectionInfo
|
||||
iFullCollection
|
||||
(Just (iDocuments:: [Document]))
|
||||
(Nothing:: Maybe Document)
|
||||
("insert":: Text)
|
||||
("documents":: Text)
|
||||
params
|
||||
putInt32 biT -- flagBit
|
||||
putInt8 0 -- payload type 0
|
||||
putDocument sec0 -- payload
|
||||
putInt8 1 -- payload type 1
|
||||
putInt32 sec1Size -- size of section
|
||||
putCString "documents" -- identifier
|
||||
mapM_ putDocument iDocuments -- payload
|
||||
Update{..} -> do
|
||||
let doc = ["q" =: uSelector, "u" =: uUpdater]
|
||||
(sec0, sec1Size) =
|
||||
prepSectionInfo
|
||||
uFullCollection
|
||||
(Nothing:: Maybe [Document])
|
||||
(Just doc)
|
||||
("update":: Text)
|
||||
("updates":: Text)
|
||||
params
|
||||
putInt32 biT
|
||||
putInt8 0
|
||||
putDocument sec0
|
||||
putInt8 1
|
||||
putInt32 sec1Size
|
||||
putCString "updates"
|
||||
putDocument doc
|
||||
Delete{..} -> do
|
||||
-- Setting limit to 1 here is ok, since this is only used by deleteOne
|
||||
let doc = ["q" =: dSelector, "limit" =: (1 :: Int32)]
|
||||
(sec0, sec1Size) =
|
||||
prepSectionInfo
|
||||
dFullCollection
|
||||
(Nothing:: Maybe [Document])
|
||||
(Just doc)
|
||||
("delete":: Text)
|
||||
("deletes":: Text)
|
||||
params
|
||||
putInt32 biT
|
||||
putInt8 0
|
||||
putDocument sec0
|
||||
putInt8 1
|
||||
putInt32 sec1Size
|
||||
putCString "deletes"
|
||||
putDocument doc
|
||||
_ -> error "The KillCursors command cannot be wrapped into a Nc type constructor. Please use the Kc type constructor"
|
||||
Req r -> case r of
|
||||
Query{..} -> do
|
||||
let n = T.splitOn "." qFullCollection
|
||||
db = head n
|
||||
sec0 = foldl1' merge [qProjector, [ "$db" =: db ], qSelector]
|
||||
putInt32 biT
|
||||
putInt8 0
|
||||
putDocument sec0
|
||||
GetMore{..} -> do
|
||||
let n = T.splitOn "." gFullCollection
|
||||
(db, coll) = (head n, last n)
|
||||
pre = ["getMore" =: gCursorId, "collection" =: coll, "$db" =: db, "batchSize" =: gBatchSize]
|
||||
putInt32 (bit $ bitOpMsg $ ExhaustAllowed)
|
||||
putInt8 0
|
||||
putDocument pre
|
||||
Kc k -> case k of
|
||||
KillC{..} -> do
|
||||
let n = T.splitOn "." kFullCollection
|
||||
(db, coll) = (head n, last n)
|
||||
case killCursor of
|
||||
KillCursors{..} -> do
|
||||
let doc = ["killCursors" =: coll, "cursors" =: kCursorIds, "$db" =: db]
|
||||
putInt32 biT
|
||||
putInt8 0
|
||||
putDocument doc
|
||||
-- Notices are already captured at the beginning, so all
|
||||
-- other cases are impossible
|
||||
_ -> error "impossible"
|
||||
where
|
||||
lenBytes bytes = toEnum . fromEnum $ L.length bytes:: Int32
|
||||
prepSectionInfo fullCollection documents document command identifier ps =
|
||||
let n = T.splitOn "." fullCollection
|
||||
(db, coll) = (head n, last n)
|
||||
in
|
||||
case documents of
|
||||
Just ds ->
|
||||
let
|
||||
sec0 = merge ps [command =: coll, "$db" =: db]
|
||||
s = sum $ map (lenBytes . runPut . putDocument) ds
|
||||
i = runPut $ putCString identifier
|
||||
-- +4 bytes for the type 1 section size that has to be
|
||||
-- transported in addition to the type 1 section document
|
||||
sec1Size = s + lenBytes i + 4
|
||||
in (sec0, sec1Size)
|
||||
Nothing ->
|
||||
let
|
||||
sec0 = merge ps [command =: coll, "$db" =: db]
|
||||
s = runPut $ putDocument $ fromJust document
|
||||
i = runPut $ putCString identifier
|
||||
sec1Size = lenBytes s + lenBytes i + 4
|
||||
in (sec0, sec1Size)
|
||||
|
||||
iBit :: InsertOption -> Int32
|
||||
iBit KeepGoing = bit 0
|
||||
|
||||
|
@ -634,11 +340,6 @@ dBit SingleRemove = bit 0
|
|||
dBits :: [DeleteOption] -> Int32
|
||||
dBits = bitOr . map dBit
|
||||
|
||||
bitOpMsg :: FlagBit -> Int
|
||||
bitOpMsg ChecksumPresent = 0
|
||||
bitOpMsg MoreToCome = 1
|
||||
bitOpMsg ExhaustAllowed = 16
|
||||
|
||||
-- ** Request
|
||||
|
||||
-- | A request is a message that is sent with a 'Reply' expected in return
|
||||
|
@ -648,8 +349,8 @@ data Request =
|
|||
qFullCollection :: FullCollection,
|
||||
qSkip :: Int32, -- ^ Number of initial matching documents to skip
|
||||
qBatchSize :: Int32, -- ^ The number of document to return in each batch response from the server. 0 means use Mongo default. Negative means close cursor after first batch and use absolute value as batch size.
|
||||
qSelector :: Document, -- ^ @[]@ = return all documents in collection
|
||||
qProjector :: Document -- ^ @[]@ = return whole document
|
||||
qSelector :: Document, -- ^ \[\] = return all documents in collection
|
||||
qProjector :: Document -- ^ \[\] = return whole document
|
||||
} | GetMore {
|
||||
gFullCollection :: FullCollection,
|
||||
gBatchSize :: Int32,
|
||||
|
@ -657,15 +358,13 @@ data Request =
|
|||
deriving (Show, Eq)
|
||||
|
||||
data QueryOption =
|
||||
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point – for example if the final object it references were deleted. Thus, you should be prepared to requery on @CursorNotFound@ exception.
|
||||
TailableCursor -- ^ Tailable means cursor is not closed when the last data is retrieved. Rather, the cursor marks the final object's position. You can resume using the cursor later, from where it was located, if more data were received. Like any "latent cursor", the cursor may become invalid at some point – for example if the final object it references were deleted. Thus, you should be prepared to requery on CursorNotFound exception.
|
||||
| SlaveOK -- ^ Allow query of replica slave. Normally these return an error except for namespace "local".
|
||||
| NoCursorTimeout -- ^ The server normally times out idle cursors after 10 minutes to prevent a memory leak in case a client forgets to close a cursor. Set this option to allow a cursor to live forever until it is closed.
|
||||
| AwaitData -- ^ Use with TailableCursor. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal.
|
||||
|
||||
-- | Exhaust -- ^ Stream the data down full blast in multiple "more" packages, on the assumption that the client will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: the client is not allowed to not read all the data unless it closes the connection.
|
||||
-- Exhaust commented out because not compatible with current `Pipeline` implementation
|
||||
|
||||
| Partial -- ^ Get partial results from a /mongos/ if some shards are down, instead of throwing an error.
|
||||
| Partial -- ^ Get partial results from a _mongos_ if some shards are down, instead of throwing an error.
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- *** Binary format
|
||||
|
@ -674,9 +373,6 @@ qOpcode :: Request -> Opcode
|
|||
qOpcode Query{} = 2004
|
||||
qOpcode GetMore{} = 2005
|
||||
|
||||
opMsgOpcode :: Opcode
|
||||
opMsgOpcode = 2013
|
||||
|
||||
putRequest :: Request -> RequestId -> Put
|
||||
putRequest request requestId = do
|
||||
putHeader (qOpcode request) requestId
|
||||
|
@ -700,7 +396,7 @@ qBit SlaveOK = bit 2
|
|||
qBit NoCursorTimeout = bit 4
|
||||
qBit AwaitData = bit 5
|
||||
--qBit Exhaust = bit 6
|
||||
qBit Database.MongoDB.Internal.Protocol.Partial = bit 7
|
||||
qBit Partial = bit 7
|
||||
|
||||
qBits :: [QueryOption] -> Int32
|
||||
qBits = bitOr . map qBit
|
||||
|
@ -713,13 +409,7 @@ data Reply = Reply {
|
|||
rCursorId :: CursorId, -- ^ 0 = cursor finished
|
||||
rStartingFrom :: Int32,
|
||||
rDocuments :: [Document]
|
||||
}
|
||||
| ReplyOpMsg {
|
||||
flagBits :: [FlagBit],
|
||||
sections :: [Document],
|
||||
checksum :: Maybe Int32
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
} deriving (Show, Eq)
|
||||
|
||||
data ResponseFlag =
|
||||
CursorNotFound -- ^ Set when getMore is called but the cursor id is not valid at the server. Returned with zero results.
|
||||
|
@ -735,18 +425,6 @@ replyOpcode = 1
|
|||
getReply :: Get (ResponseTo, Reply)
|
||||
getReply = do
|
||||
(opcode, responseTo) <- getHeader
|
||||
if opcode == 2013
|
||||
then do
|
||||
-- Notes:
|
||||
-- Checksum bits that are set by the server don't seem to be supported by official drivers.
|
||||
-- See: https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/message.py#L1423
|
||||
flagBits <- rFlagsOpMsg <$> getInt32
|
||||
_ <- getInt8
|
||||
sec0 <- getDocument
|
||||
let sections = [sec0]
|
||||
checksum = Nothing
|
||||
return (responseTo, ReplyOpMsg{..})
|
||||
else do
|
||||
unless (opcode == replyOpcode) $ fail $ "expected reply opcode (1) but got " ++ show opcode
|
||||
rResponseFlags <- rFlags <$> getInt32
|
||||
rCursorId <- getInt64
|
||||
|
@ -758,15 +436,6 @@ getReply = do
|
|||
rFlags :: Int32 -> [ResponseFlag]
|
||||
rFlags bits = filter (testBit bits . rBit) [CursorNotFound ..]
|
||||
|
||||
-- See https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst#flagbits
|
||||
rFlagsOpMsg :: Int32 -> [FlagBit]
|
||||
rFlagsOpMsg bits = isValidFlag bits
|
||||
where isValidFlag bt =
|
||||
let setBits = map fst $ filter (\(_,b) -> b == True) $ zip ([0..31] :: [Int32]) $ map (testBit bt) [0 .. 31]
|
||||
in if any (\n -> not $ elem n [0,1,16]) setBits
|
||||
then error "Unsopported bit was set"
|
||||
else filter (testBit bt . bitOpMsg) [ChecksumPresent ..]
|
||||
|
||||
rBit :: ResponseFlag -> Int
|
||||
rBit CursorNotFound = 0
|
||||
rBit QueryError = 1
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
-- | Miscellaneous general functions
|
||||
-- | Miscellaneous general functions and Show, Eq, and Ord instances for PortID
|
||||
|
||||
{-# LANGUAGE FlexibleInstances, UndecidableInstances, StandaloneDeriving #-}
|
||||
{-# LANGUAGE CPP #-}
|
||||
-- PortID instances
|
||||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
|
||||
module Database.MongoDB.Internal.Util where
|
||||
|
||||
|
@ -12,19 +14,26 @@ import Control.Exception (handle, throwIO, Exception)
|
|||
import Control.Monad (liftM, liftM2)
|
||||
import Data.Bits (Bits, (.|.))
|
||||
import Data.Word (Word8)
|
||||
import Network (PortID(..))
|
||||
import Numeric (showHex)
|
||||
import System.Random (newStdGen)
|
||||
import System.Random.Shuffle (shuffle')
|
||||
|
||||
import qualified Data.ByteString as S
|
||||
|
||||
import Control.Monad.Except (MonadError(..))
|
||||
import Control.Monad.Error (MonadError(..), Error(..))
|
||||
import Control.Monad.Trans (MonadIO, liftIO)
|
||||
import Data.Bson
|
||||
import Data.Text (Text)
|
||||
|
||||
import qualified Data.Text as T
|
||||
|
||||
#if !MIN_VERSION_network(2, 4, 1)
|
||||
deriving instance Show PortID
|
||||
deriving instance Eq PortID
|
||||
#endif
|
||||
deriving instance Ord PortID
|
||||
|
||||
-- | A monadic sort implementation derived from the non-monadic one in ghc's Prelude
|
||||
mergesortM :: Monad m => (a -> a -> m Ordering) -> [a] -> m [a]
|
||||
mergesortM cmp = mergesortM' cmp . map wrap
|
||||
|
@ -56,16 +65,13 @@ shuffle :: [a] -> IO [a]
|
|||
-- ^ Randomly shuffle items in list
|
||||
shuffle list = shuffle' list (length list) <$> newStdGen
|
||||
|
||||
loop :: Monad m => m (Maybe a) -> m [a]
|
||||
loop :: (Functor m, Monad m) => m (Maybe a) -> m [a]
|
||||
-- ^ Repeatedy execute action, collecting results, until it returns Nothing
|
||||
loop act = act >>= maybe (return []) (\a -> (a :) `liftM` loop act)
|
||||
loop act = act >>= maybe (return []) (\a -> (a :) <$> loop act)
|
||||
|
||||
untilSuccess :: (MonadError e m) => (a -> m b) -> [a] -> m b
|
||||
untilSuccess :: (MonadError e m, Error e) => (a -> m b) -> [a] -> m b
|
||||
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw 'strMsg' error if list is empty.
|
||||
untilSuccess = untilSuccess' (error "empty untilSuccess")
|
||||
-- Use 'error' copying behavior in removed 'Control.Monad.Error.Error' instance:
|
||||
-- instance Error Failure where strMsg = error
|
||||
-- 'fail' is treated the same as a programming 'error'. In other words, don't use it.
|
||||
untilSuccess = untilSuccess' (strMsg "empty untilSuccess")
|
||||
|
||||
untilSuccess' :: (MonadError e m) => e -> (a -> m b) -> [a] -> m b
|
||||
-- ^ Apply action to elements one at a time until one succeeds. Throw last error if all fail. Throw given error if list is empty
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -1,5 +1,6 @@
|
|||
{-# LANGUAGE CPP #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
|
||||
#if (__GLASGOW_HASKELL__ >= 706)
|
||||
{-# LANGUAGE RecursiveDo #-}
|
||||
|
@ -20,44 +21,40 @@ ATTENTION!!! Be aware that this module is highly experimental and is
|
|||
barely tested. The current implementation doesn't verify server's identity.
|
||||
It only allows you to connect to a mongodb server using TLS protocol.
|
||||
-}
|
||||
|
||||
module Database.MongoDB.Transport.Tls
|
||||
( connect
|
||||
, connectWithTlsParams
|
||||
)
|
||||
(connect)
|
||||
where
|
||||
|
||||
import Data.IORef
|
||||
import Data.Monoid
|
||||
import qualified Data.ByteString as ByteString
|
||||
import qualified Data.ByteString.Lazy as Lazy.ByteString
|
||||
import Data.Default.Class (def)
|
||||
import Control.Applicative ((<$>))
|
||||
import Control.Exception (bracketOnError)
|
||||
import Control.Monad (when, unless)
|
||||
import System.IO
|
||||
import Database.MongoDB.Internal.Protocol (Pipe, newPipeWith)
|
||||
import Database.MongoDB (Pipe)
|
||||
import Database.MongoDB.Internal.Protocol (newPipeWith)
|
||||
import Database.MongoDB.Transport (Transport(Transport))
|
||||
import qualified Database.MongoDB.Transport as T
|
||||
import System.IO.Error (mkIOError, eofErrorType)
|
||||
import Database.MongoDB.Internal.Network (connectTo, HostName, PortID)
|
||||
import Network (connectTo, HostName, PortID)
|
||||
import qualified Network.TLS as TLS
|
||||
import qualified Network.TLS.Extra.Cipher as TLS
|
||||
import Database.MongoDB.Query (access, slaveOk, retrieveServerData)
|
||||
|
||||
-- | Connect to mongodb using TLS
|
||||
connect :: HostName -> PortID -> IO Pipe
|
||||
connect host port = connectWithTlsParams params host port
|
||||
where
|
||||
params = (TLS.defaultParamsClient host "")
|
||||
{ TLS.clientSupported = def
|
||||
{ TLS.supportedCiphers = TLS.ciphersuite_default }
|
||||
, TLS.clientHooks = def
|
||||
{ TLS.onServerCertificate = \_ _ _ _ -> return [] }
|
||||
}
|
||||
connect host port = bracketOnError (connectTo host port) hClose $ \handle -> do
|
||||
|
||||
-- | Connect to mongodb using TLS using provided TLS client parameters
|
||||
connectWithTlsParams :: TLS.ClientParams -> HostName -> PortID -> IO Pipe
|
||||
connectWithTlsParams clientParams host port = bracketOnError (connectTo host port) hClose $ \handle -> do
|
||||
context <- TLS.contextNew handle clientParams
|
||||
let params = (TLS.defaultParamsClient host "")
|
||||
{ TLS.clientSupported = def
|
||||
{ TLS.supportedCiphers = TLS.ciphersuite_all}
|
||||
, TLS.clientHooks = def
|
||||
{ TLS.onServerCertificate = \_ _ _ _ -> return []}
|
||||
}
|
||||
context <- TLS.contextNew handle params
|
||||
TLS.handshake context
|
||||
|
||||
conn <- tlsConnection context
|
||||
|
|
67
README.md
67
README.md
|
@ -1,7 +1,7 @@
|
|||
This is the Haskell MongoDB driver (client). [MongoDB](http://www.mongodb.org) is a free, scalable, fast, document database management system. This driver lets you connect to a MongoDB server, and update and query its data. It also lets you do adminstrative tasks, like create an index or look at performance statistics.
|
||||
|
||||
[![Join the chat at https://gitter.im/mongodb-haskell/mongodb](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mongodb-haskell/mongodb?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
|
||||
[![Build Status](https://travis-ci.org/mongodb-haskell/mongodb.svg?branch=master)](https://travis-ci.org/mongodb-haskell/mongodb)
|
||||
[![Build Status](https://travis-ci.org/mongodb-haskell/mongodb.svg)](https://travis-ci.org/mongodb-haskell/mongodb)
|
||||
|
||||
### Documentation
|
||||
|
||||
|
@ -11,68 +11,3 @@ This is the Haskell MongoDB driver (client). [MongoDB](http://www.mongodb.org) i
|
|||
* [MapReduce example](http://github.com/mongodb-haskell/mongodb/blob/master/doc/map-reduce-example.md)
|
||||
* [Driver design](https://github.com/mongodb-haskell/mongodb/blob/master/doc/Article1.md)
|
||||
* [MongoDB DBMS](http://www.mongodb.org)
|
||||
|
||||
### Dev Environment
|
||||
|
||||
It's important for this library to be tested with various versions of mongodb
|
||||
server and with different ghc versions. In order to achieve this we use docker
|
||||
containers and docker-compose. This repository contains two files: docker-compose.yml
|
||||
and reattach.sh.
|
||||
|
||||
Docker compose file describes two containers.
|
||||
|
||||
One container is for running mongodb server. If you want a different version of
|
||||
mongodb server you need to change the tag of mongo image in the
|
||||
docker-compose.yml. In order to start your mongodb server you need to run:
|
||||
|
||||
```
|
||||
docker-compose up -d mongodb
|
||||
```
|
||||
|
||||
In order to stop your containers without loosing the data inside of it:
|
||||
|
||||
```
|
||||
docker-compose stop mongodb
|
||||
```
|
||||
|
||||
Restart:
|
||||
|
||||
```
|
||||
docker-compose start mongodb
|
||||
```
|
||||
|
||||
If you want to remove the mongodb container and start from scratch then:
|
||||
|
||||
```
|
||||
docker-compose stop mongodb
|
||||
docker-compose rm mongodb
|
||||
docker-compose up -d mongodb
|
||||
```
|
||||
|
||||
The other container is for compiling your code. By specifying the tag of the image
|
||||
you can change the version of ghc you will be using. If you never started this
|
||||
container then you need:
|
||||
|
||||
```
|
||||
docker-compose run mongodb-haskell
|
||||
```
|
||||
|
||||
It will start the container and mount your working directory to
|
||||
`/opt/mongodb-haskell` If you exit the bash cli then the conainer will stop.
|
||||
In order to reattach to an old stopped container you need to run script
|
||||
`reattach.sh`. If you run `docker-compose run` again it will create another
|
||||
container and all work made by cabal will be lost. `reattach.sh` is a
|
||||
workaround for docker-compose's inability to pick up containers that exited.
|
||||
|
||||
When you are done with testing you need to run:
|
||||
```
|
||||
docker-compose stop mongodb
|
||||
```
|
||||
|
||||
Next time you will need to do:
|
||||
```
|
||||
docker-compose start mongodb
|
||||
reattach.sh
|
||||
```
|
||||
It will start your stopped container with mongodb server and pick up the stopped
|
||||
container with haskell.
|
||||
|
|
0
Setup.lhs
Normal file → Executable file
0
Setup.lhs
Normal file → Executable file
|
@ -1,4 +1,3 @@
|
|||
{-# LANGUAGE CPP #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ExtendedDefaultRules #-}
|
||||
|
||||
|
@ -8,12 +7,7 @@ import Database.MongoDB (Action, Document, Document, Value, access,
|
|||
close, connect, delete, exclude, find,
|
||||
host, insertMany, master, project, rest,
|
||||
select, sort, (=:))
|
||||
|
||||
#if (__GLASGOW_HASKELL__ >= 800)
|
||||
import Control.Monad.IO.Class (liftIO)
|
||||
#else
|
||||
import Control.Monad.Trans (liftIO)
|
||||
#endif
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
|
|
|
@ -12,10 +12,9 @@ Start a haskell session:
|
|||
$ ghci
|
||||
> :set prompt "> "
|
||||
|
||||
Import the MongoDB driver library, and set OverloadedStrings so literal strings are converted to UTF-8 automatically and ExtendedDefaultRules for using BSON fields with basic types.
|
||||
Import the MongoDB driver library, and set OverloadedStrings so literal strings are converted to UTF-8 automatically.
|
||||
|
||||
> :set -XOverloadedStrings
|
||||
> :set -XExtendedDefaultRules
|
||||
> import Database.MongoDB
|
||||
|
||||
### Connecting
|
||||
|
@ -36,7 +35,7 @@ A DB read or write operation is called a DB `Action`. A DB Action is a monad so
|
|||
|
||||
> access pipe master "test" allCollections
|
||||
|
||||
`access` throw `Failure` if there is any issue. Failure means there was a connection failure, or a read/write failure like cursor expired or duplicate key insert.
|
||||
`access` return either Left `Failure` or Right result. Failure means there was a connection failure, or a read/write failure like cursor expired or duplicate key insert.
|
||||
|
||||
`master` is an `AccessMode`. Access mode indicates how reads and writes will be performed. Its three modes are: `ReadStaleOk`, `UnconfirmedWrites`, and `ConfirmWrites GetLastErrorParams`. `master` is just short hand for `ConfirmWrites []`. The first mode may be used against a slave or a master server, the last two must be used against a master server.
|
||||
|
||||
|
|
|
@ -1,16 +0,0 @@
|
|||
version: '3'
|
||||
services:
|
||||
mongodb:
|
||||
ports:
|
||||
- 27017:27017
|
||||
image: mongo:3.6
|
||||
mongodb-haskell:
|
||||
image: phadej/ghc:8.0.2
|
||||
environment:
|
||||
- HASKELL_MONGODB_TEST_HOST=mongodb
|
||||
entrypoint:
|
||||
- /bin/bash
|
||||
volumes:
|
||||
- ./:/opt/mongodb-haskell
|
||||
|
||||
# vim: ts=2 et sw=2 ai
|
|
@ -1,5 +1,5 @@
|
|||
Name: mongoDB
|
||||
Version: 2.7.1.2
|
||||
Version: 2.1.0
|
||||
Synopsis: Driver (client) for MongoDB, a free, scalable, fast, document
|
||||
DBMS
|
||||
Description: This package lets you connect to MongoDB servers and
|
||||
|
@ -10,74 +10,50 @@ Category: Database
|
|||
Homepage: https://github.com/mongodb-haskell/mongodb
|
||||
Bug-reports: https://github.com/mongodb-haskell/mongodb/issues
|
||||
Author: Tony Hannan
|
||||
Maintainer: Victor Denisov <denisovenator@gmail.com>
|
||||
Maintainer: Fedor Gogolev <knsd@knsd.net>
|
||||
Copyright: Copyright (c) 2010-2012 10gen Inc.
|
||||
License: Apache-2.0
|
||||
License: OtherLicense
|
||||
License-file: LICENSE
|
||||
Cabal-version: >= 1.10
|
||||
Build-type: Simple
|
||||
Stability: alpha
|
||||
Extra-Source-Files: CHANGELOG.md
|
||||
|
||||
-- Imitated from https://github.com/mongodb-haskell/bson/pull/18
|
||||
Flag _old-network
|
||||
description: Control whether to use <http://hackage.haskell.org/package/network-bsd network-bsd>
|
||||
manual: False
|
||||
|
||||
Library
|
||||
GHC-options: -Wall
|
||||
GHC-prof-options: -auto-all-exported
|
||||
default-language: Haskell2010
|
||||
|
||||
Build-depends: array -any
|
||||
, base <5
|
||||
, binary -any
|
||||
, bson >= 0.3 && < 0.5
|
||||
, bson >= 0.3 && < 0.4
|
||||
, text
|
||||
, bytestring -any
|
||||
, containers -any
|
||||
, conduit
|
||||
, conduit-extra
|
||||
, mtl >= 2
|
||||
, cryptohash -any
|
||||
, network -any
|
||||
, parsec -any
|
||||
, random -any
|
||||
, random-shuffle -any
|
||||
, resourcet
|
||||
, monad-control >= 0.3.1
|
||||
, lifted-base >= 0.1.0.3
|
||||
, pureMD5
|
||||
, stm
|
||||
, tagged
|
||||
, tls >= 1.3.0
|
||||
, time
|
||||
, tls >= 1.2.0
|
||||
, data-default-class -any
|
||||
, transformers
|
||||
, transformers-base >= 0.4.1
|
||||
, hashtables >= 1.1.2.0
|
||||
, base16-bytestring >= 0.1.1.6
|
||||
, base64-bytestring >= 1.0.0.1
|
||||
, nonce >= 1.0.5
|
||||
, fail
|
||||
, dns
|
||||
, http-types
|
||||
|
||||
if flag(_old-network)
|
||||
-- "Network.BSD" is only available in network < 2.9
|
||||
build-depends: network < 2.9
|
||||
else
|
||||
-- "Network.BSD" has been moved into its own package `network-bsd`
|
||||
build-depends: network >= 3.0
|
||||
, network-bsd >= 2.7 && < 2.9
|
||||
, nonce >= 1.0.2
|
||||
|
||||
Exposed-modules: Database.MongoDB
|
||||
Database.MongoDB.Admin
|
||||
Database.MongoDB.Connection
|
||||
Database.MongoDB.GridFS
|
||||
Database.MongoDB.Query
|
||||
Database.MongoDB.Transport
|
||||
Database.MongoDB.Transport.Tls
|
||||
Other-modules: Database.MongoDB.Internal.Network
|
||||
Database.MongoDB.Internal.Protocol
|
||||
Other-modules: Database.MongoDB.Internal.Protocol
|
||||
Database.MongoDB.Internal.Util
|
||||
|
||||
Source-repository head
|
||||
|
@ -87,9 +63,6 @@ Source-repository head
|
|||
test-suite test
|
||||
hs-source-dirs: test
|
||||
main-is: Main.hs
|
||||
other-modules: Spec
|
||||
, QuerySpec
|
||||
, TestImport
|
||||
ghc-options: -Wall -with-rtsopts "-K64m"
|
||||
type: exitcode-stdio-1.0
|
||||
build-depends: mongoDB
|
||||
|
@ -114,15 +87,14 @@ Benchmark bench
|
|||
, base64-bytestring
|
||||
, base16-bytestring
|
||||
, binary -any
|
||||
, bson >= 0.3 && < 0.5
|
||||
, data-default-class -any
|
||||
, bson >= 0.3 && < 0.4
|
||||
, text
|
||||
, bytestring -any
|
||||
, containers -any
|
||||
, mtl >= 2
|
||||
, cryptohash -any
|
||||
, nonce >= 1.0.5
|
||||
, stm
|
||||
, network -any
|
||||
, nonce
|
||||
, parsec -any
|
||||
, random -any
|
||||
, random-shuffle -any
|
||||
|
@ -130,19 +102,6 @@ Benchmark bench
|
|||
, lifted-base >= 0.1.0.3
|
||||
, transformers-base >= 0.4.1
|
||||
, hashtables >= 1.1.2.0
|
||||
, fail
|
||||
, dns
|
||||
, http-types
|
||||
, criterion
|
||||
, tls >= 1.3.0
|
||||
|
||||
if flag(_old-network)
|
||||
-- "Network.BSD" is only available in network < 2.9
|
||||
build-depends: network < 2.9
|
||||
else
|
||||
-- "Network.BSD" has been moved into its own package `network-bsd`
|
||||
build-depends: network >= 3.0
|
||||
, network-bsd >= 2.7 && < 2.9
|
||||
|
||||
default-language: Haskell2010
|
||||
default-extensions: OverloadedStrings
|
||||
|
|
|
@ -1,3 +0,0 @@
|
|||
#!/bin/bash
|
||||
|
||||
docker start -ai mongodb_mongodb-haskell_run_1
|
|
@ -1,4 +0,0 @@
|
|||
resolver: lts-9.21
|
||||
flags:
|
||||
mongoDB:
|
||||
_old-network: true
|
|
@ -1,4 +0,0 @@
|
|||
resolver: lts-11.22
|
||||
flags:
|
||||
mongoDB:
|
||||
_old-network: true
|
|
@ -1,4 +0,0 @@
|
|||
resolver: lts-12.26
|
||||
flags:
|
||||
mongoDB:
|
||||
_old-network: true
|
|
@ -1,6 +0,0 @@
|
|||
resolver: lts-13.23
|
||||
extra-deps:
|
||||
- git: git@github.com:hvr/bson.git # https://github.com/mongodb-haskell/bson/pull/18
|
||||
commit: 2fc8d04120c0758201762b8e22254aeb6d574f41
|
||||
- network-bsd-2.8.1.0
|
||||
- network-3.1.0.0
|
|
@ -1,4 +0,0 @@
|
|||
resolver: lts-13.23
|
||||
flags:
|
||||
mongoDB:
|
||||
_old-network: true
|
69
stack.yaml
69
stack.yaml
|
@ -1,69 +0,0 @@
|
|||
# This file was automatically generated by 'stack init'
|
||||
#
|
||||
# Some commonly used options have been documented as comments in this file.
|
||||
# For advanced use and comprehensive documentation of the format, please see:
|
||||
# https://docs.haskellstack.org/en/stable/yaml_configuration/
|
||||
|
||||
# Resolver to choose a 'specific' stackage snapshot or a compiler version.
|
||||
# A snapshot resolver dictates the compiler version and the set of packages
|
||||
# to be used for project dependencies. For example:
|
||||
#
|
||||
# resolver: lts-3.5
|
||||
# resolver: nightly-2015-09-21
|
||||
# resolver: ghc-7.10.2
|
||||
#
|
||||
# The location of a snapshot can be provided as a file or url. Stack assumes
|
||||
# a snapshot provided as a file might change, whereas a url resource does not.
|
||||
#
|
||||
# resolver: ./custom-snapshot.yaml
|
||||
# resolver: https://example.com/snapshots/2018-01-01.yaml
|
||||
resolver:
|
||||
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/21.yaml
|
||||
|
||||
# User packages to be built.
|
||||
# Various formats can be used as shown in the example below.
|
||||
#
|
||||
# packages:
|
||||
# - some-directory
|
||||
# - https://example.com/foo/bar/baz-0.0.2.tar.gz
|
||||
# subdirs:
|
||||
# - auto-update
|
||||
# - wai
|
||||
packages:
|
||||
- .
|
||||
# Dependency packages to be pulled from upstream that are not in the resolver.
|
||||
# These entries can reference officially published versions as well as
|
||||
# forks / in-progress versions pinned to a git hash. For example:
|
||||
#
|
||||
# extra-deps:
|
||||
# - acme-missiles-0.3
|
||||
# - git: https://github.com/commercialhaskell/stack.git
|
||||
# commit: e7b331f14bcffb8367cd58fbfc8b40ec7642100a
|
||||
#
|
||||
# extra-deps: []
|
||||
|
||||
# Override default flag values for local packages and extra-deps
|
||||
flags:
|
||||
mongoDB:
|
||||
_old-network: false
|
||||
|
||||
# Extra package databases containing global packages
|
||||
# extra-package-dbs: []
|
||||
|
||||
# Control whether we use the GHC we find on the path
|
||||
# system-ghc: true
|
||||
#
|
||||
# Require a specific version of stack, using version ranges
|
||||
# require-stack-version: -any # Default
|
||||
# require-stack-version: ">=2.7"
|
||||
#
|
||||
# Override the architecture used by stack, especially useful on Windows
|
||||
# arch: i386
|
||||
# arch: x86_64
|
||||
#
|
||||
# Extra directories used by stack for building
|
||||
# extra-include-dirs: [/path/to/dir]
|
||||
# extra-lib-dirs: [/path/to/dir]
|
||||
#
|
||||
# Allow a newer minor version of GHC than the snapshot specifies
|
||||
# compiler-check: newer-minor
|
|
@ -1,13 +0,0 @@
|
|||
# This file was autogenerated by Stack.
|
||||
# You should not edit this file by hand.
|
||||
# For more information, please see the documentation at:
|
||||
# https://docs.haskellstack.org/en/stable/lock_files
|
||||
|
||||
packages: []
|
||||
snapshots:
|
||||
- completed:
|
||||
size: 586110
|
||||
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/21.yaml
|
||||
sha256: ce4fb8d44f3c6c6032060a02e0ebb1bd29937c9a70101c1517b92a87d9515160
|
||||
original:
|
||||
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/18/21.yaml
|
|
@ -5,15 +5,11 @@ import Database.MongoDB.Connection (connect, host)
|
|||
import Database.MongoDB.Query (access, slaveOk)
|
||||
import Data.Text (unpack)
|
||||
import Test.Hspec.Runner
|
||||
import System.Environment (getEnv)
|
||||
import System.IO.Error (catchIOError)
|
||||
import TestImport
|
||||
import qualified Spec
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
mongodbHost <- getEnv mongodbHostEnvVariable `catchIOError` (\_ -> return "localhost")
|
||||
p <- connect $ host mongodbHost
|
||||
p <- connect $ host "localhost"
|
||||
version <- access p slaveOk "admin" serverVersion
|
||||
putStrLn $ "Running tests with mongodb version: " ++ (unpack version)
|
||||
hspecWith defaultConfig Spec.spec
|
||||
|
|
|
@ -5,9 +5,7 @@ module QuerySpec (spec) where
|
|||
import Data.String (IsString(..))
|
||||
import TestImport
|
||||
import Control.Exception
|
||||
import Control.Monad (forM_, when)
|
||||
import System.Environment (getEnv)
|
||||
import System.IO.Error (catchIOError)
|
||||
import Control.Monad (forM_)
|
||||
import qualified Data.List as L
|
||||
|
||||
import qualified Data.Text as T
|
||||
|
@ -17,17 +15,11 @@ testDBName = "mongodb-haskell-test"
|
|||
|
||||
db :: Action IO a -> IO a
|
||||
db action = do
|
||||
mongodbHost <- getEnv mongodbHostEnvVariable `catchIOError` (\_ -> return "localhost")
|
||||
pipe <- connect (host mongodbHost)
|
||||
pipe <- connect (host "127.0.0.1")
|
||||
result <- access pipe master testDBName action
|
||||
close pipe
|
||||
return result
|
||||
|
||||
getWireVersion :: IO Int
|
||||
getWireVersion = db $ do
|
||||
sd <- retrieveServerData
|
||||
return $ maxWireVersion sd
|
||||
|
||||
withCleanDatabase :: ActionWith () -> IO ()
|
||||
withCleanDatabase action = dropDB >> action () >> dropDB >> return ()
|
||||
where
|
||||
|
@ -43,21 +35,6 @@ insertDuplicateWith testInsert = do
|
|||
]
|
||||
return ()
|
||||
|
||||
insertUsers :: IO ()
|
||||
insertUsers = db $
|
||||
insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]]
|
||||
, ["_id" =: "joe", "joined" =: parseDate "2012-07-02", "likes" =: ["tennis", "golf", "swimming"]]
|
||||
, ["_id" =: "jill", "joined" =: parseDate "2013-11-17", "likes" =: ["cricket", "golf"]]
|
||||
]
|
||||
|
||||
pendingIfMongoVersion :: ((Integer, Integer) -> Bool) -> SpecWith () -> Spec
|
||||
pendingIfMongoVersion invalidVersion = before $ do
|
||||
version <- db $ extractVersion . T.splitOn "." . at "version" <$> runCommand1 "buildinfo"
|
||||
when (invalidVersion version) $ pendingWith "This test does not run in the current database version"
|
||||
where
|
||||
extractVersion (major:minor:_) = (read $ T.unpack major, read $ T.unpack minor)
|
||||
extractVersion _ = error "Invalid version specification"
|
||||
|
||||
bigDocument :: Document
|
||||
bigDocument = (flip map) [1..10000] $ \i -> (fromString $ "team" ++ (show i)) =: ("team " ++ (show i) ++ " name")
|
||||
|
||||
|
@ -191,7 +168,7 @@ spec = around withCleanDatabase $ do
|
|||
|
||||
liftIO $ (length returnedDocs) `shouldBe` 1000
|
||||
it "skips one too big document" $ do
|
||||
(db $ insertAll_ "hugeDocCollection" [hugeDocument]) `shouldThrow` anyException
|
||||
db $ insertAll_ "hugeDocCollection" [hugeDocument]
|
||||
db $ do
|
||||
cur <- find $ (select [] "hugeDocCollection") {limit = 100000, batchSize = 100000}
|
||||
returnedDocs <- rest cur
|
||||
|
@ -212,8 +189,6 @@ spec = around withCleanDatabase $ do
|
|||
|
||||
describe "updateMany" $ do
|
||||
it "updates value" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_id <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
|
||||
result <- db $ rest =<< find (select [] "team")
|
||||
result `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "American"]]
|
||||
|
@ -223,8 +198,6 @@ spec = around withCleanDatabase $ do
|
|||
updatedResult <- db $ rest =<< find (select [] "team")
|
||||
updatedResult `shouldBe` [["_id" =: _id, "name" =: "Yankees", "league" =: "European"]]
|
||||
it "upserts value" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
c <- db $ count (select [] "team")
|
||||
c `shouldBe` 0
|
||||
_ <- db $ updateMany "team" [( []
|
||||
|
@ -234,8 +207,6 @@ spec = around withCleanDatabase $ do
|
|||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
map L.sort updatedResult `shouldBe` [["league" =: "MLB", "name" =: "Giants"]]
|
||||
it "updates all documents with Multi enabled" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
|
||||
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
|
||||
|
@ -247,8 +218,6 @@ spec = around withCleanDatabase $ do
|
|||
, ["league" =: "MLB", "name" =: "Yankees"]
|
||||
]
|
||||
it "updates one document when there is no Multi option" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "MiLB"]
|
||||
_ <- db $ updateMany "team" [( ["name" =: "Yankees"]
|
||||
|
@ -260,8 +229,6 @@ spec = around withCleanDatabase $ do
|
|||
, ["league" =: "MiLB", "name" =: "Yankees"]
|
||||
]
|
||||
it "can process different updates" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American"]
|
||||
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB"]
|
||||
_ <- db $ updateMany "team" [ ( ["name" =: "Yankees"]
|
||||
|
@ -278,11 +245,9 @@ spec = around withCleanDatabase $ do
|
|||
, ["league" =: "MiLB", "name" =: "Yankees"]
|
||||
]
|
||||
it "can process different updates" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
|
||||
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
|
||||
updateResult <- (db $ updateMany "team" [ ( ["name" =: "Yankees"]
|
||||
(db $ updateMany "team" [ ( ["name" =: "Yankees"]
|
||||
, ["$inc" =: ["score" =: (1 :: Int)]]
|
||||
, []
|
||||
)
|
||||
|
@ -290,15 +255,12 @@ spec = around withCleanDatabase $ do
|
|||
, ["$inc" =: ["score" =: (2 :: Int)]]
|
||||
, []
|
||||
)
|
||||
])
|
||||
failed updateResult `shouldBe` True
|
||||
]) `shouldThrow` anyException
|
||||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
|
||||
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (1 :: Int)]
|
||||
]
|
||||
it "can handle big updates" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
let docs = (flip map) [0..20000] $ \i ->
|
||||
["name" =: (T.pack $ "name " ++ (show i))]
|
||||
ids <- db $ insertAll "bigCollection" docs
|
||||
|
@ -313,11 +275,9 @@ spec = around withCleanDatabase $ do
|
|||
|
||||
describe "updateAll" $ do
|
||||
it "can process different updates" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insert "team" ["name" =: "Yankees", "league" =: "American", "score" =: (Nothing :: Maybe Int)]
|
||||
_ <- db $ insert "team" ["name" =: "Giants" , "league" =: "MiLB", "score" =: (1 :: Int)]
|
||||
updateResult <- (db $ updateAll "team" [ ( ["name" =: "Yankees"]
|
||||
(db $ updateAll "team" [ ( ["name" =: "Yankees"]
|
||||
, ["$inc" =: ["score" =: (1 :: Int)]]
|
||||
, []
|
||||
)
|
||||
|
@ -325,33 +285,11 @@ spec = around withCleanDatabase $ do
|
|||
, ["$inc" =: ["score" =: (2 :: Int)]]
|
||||
, []
|
||||
)
|
||||
])
|
||||
failed updateResult `shouldBe` True
|
||||
]) `shouldThrow` anyException
|
||||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
(L.sort $ map L.sort updatedResult) `shouldBe` [ ["league" =: "American", "name" =: "Yankees", "score" =: (Nothing :: Maybe Int)]
|
||||
, ["league" =: "MiLB" , "name" =: "Giants" , "score" =: (3 :: Int)]
|
||||
]
|
||||
it "returns correct number of matched and modified" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
|
||||
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
|
||||
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myField" =: "newValue"]], [MultiUpdate])]
|
||||
nMatched res `shouldBe` 2
|
||||
nModified res `shouldBe` (Just 2)
|
||||
it "returns correct number of upserted" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myfield" =: "newValue"]], [Upsert])]
|
||||
(length $ upserted res) `shouldBe` 1
|
||||
it "updates only one doc without multi update" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
|
||||
_ <- db $ insertMany "testCollection" [["myField" =: "myValue"], ["myField2" =: "myValue2"]]
|
||||
res <- db $ updateMany "testCollection" [(["myField" =: "myValue"], ["$set" =: ["myField" =: "newValue"]], [])]
|
||||
nMatched res `shouldBe` 1
|
||||
nModified res `shouldBe` (Just 1)
|
||||
|
||||
describe "delete" $ do
|
||||
it "actually deletes something" $ do
|
||||
|
@ -393,8 +331,6 @@ spec = around withCleanDatabase $ do
|
|||
|
||||
describe "deleteMany" $ do
|
||||
it "actually deletes something" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insert "team" ["name" =: ("Giants" :: String)]
|
||||
_ <- db $ insert "team" ["name" =: ("Yankees" :: String)]
|
||||
_ <- db $ deleteMany "team" [ (["name" =: ("Giants" :: String)], [])
|
||||
|
@ -405,8 +341,6 @@ spec = around withCleanDatabase $ do
|
|||
|
||||
describe "deleteAll" $ do
|
||||
it "actually deletes something" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insert "team" [ "name" =: ("Giants" :: String)
|
||||
, "score" =: (Nothing :: Maybe Int)
|
||||
]
|
||||
|
@ -419,21 +353,12 @@ spec = around withCleanDatabase $ do
|
|||
updatedResult <- db $ rest =<< find ((select [] "team") {project = ["_id" =: (0 :: Int)]})
|
||||
length updatedResult `shouldBe` 0
|
||||
it "can handle big deletes" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
let docs = (flip map) [0..20000] $ \i ->
|
||||
["name" =: (T.pack $ "name " ++ (show i))]
|
||||
_ <- db $ insertAll "bigCollection" docs
|
||||
_ <- db $ deleteAll "bigCollection" $ map (\d -> (d, [])) docs
|
||||
updatedResult <- db $ rest =<< find ((select [] "bigCollection") {project = ["_id" =: (0 :: Int)]})
|
||||
length updatedResult `shouldBe` 0
|
||||
it "returns correct result" $ do
|
||||
wireVersion <- getWireVersion
|
||||
when (wireVersion > 1) $ do
|
||||
_ <- db $ insert "testCollection" [ "myField" =: "myValue" ]
|
||||
_ <- db $ insert "testCollection" [ "myField" =: "myValue" ]
|
||||
res <- db $ deleteAll "testCollection" [ (["myField" =: "myValue"], []) ]
|
||||
nRemoved res `shouldBe` 2
|
||||
|
||||
describe "allCollections" $ do
|
||||
it "returns all collections in a database" $ do
|
||||
|
@ -443,34 +368,13 @@ spec = around withCleanDatabase $ do
|
|||
collections <- db $ allCollections
|
||||
liftIO $ (L.sort collections) `shouldContain` ["team1", "team2", "team3"]
|
||||
|
||||
describe "aggregate" $ before_ insertUsers $
|
||||
describe "aggregate" $ do
|
||||
it "aggregates to normalize and sort documents" $ do
|
||||
db $ insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]]
|
||||
, ["_id" =: "joe", "joined" =: parseDate "2012-07-02", "likes" =: ["tennis", "golf", "swimming"]]
|
||||
, ["_id" =: "jill", "joined" =: parseDate "2013-11-17", "likes" =: ["cricket", "golf"]]
|
||||
]
|
||||
result <- db $ aggregate "users" [ ["$project" =: ["name" =: ["$toUpper" =: "$_id"], "_id" =: 0]]
|
||||
, ["$sort" =: ["name" =: 1]]
|
||||
]
|
||||
result `shouldBe` [["name" =: "JANE"], ["name" =: "JILL"], ["name" =: "JOE"]]
|
||||
|
||||
-- This feature was introduced in MongoDB version 3.2
|
||||
-- https://docs.mongodb.com/manual/reference/command/find/
|
||||
describe "findCommand" $ pendingIfMongoVersion (< (3,2)) $
|
||||
context "when mongo version is 3.2 or superior" $ before insertUsers $ do
|
||||
it "fetches all the records" $ do
|
||||
result <- db $ rest =<< findCommand (select [] "users")
|
||||
length result `shouldBe` 3
|
||||
|
||||
it "filters the records" $ do
|
||||
result <- db $ rest =<< findCommand (select ["_id" =: "joe"] "users")
|
||||
length result `shouldBe` 1
|
||||
|
||||
it "projects the records" $ do
|
||||
result <- db $ rest =<< findCommand
|
||||
(select [] "users") { project = [ "_id" =: 1 ] }
|
||||
result `shouldBe` [["_id" =: "jane"], ["_id" =: "joe"], ["_id" =: "jill"]]
|
||||
|
||||
it "sorts the records" $ do
|
||||
result <- db $ rest =<< findCommand
|
||||
(select [] "users") { project = [ "_id" =: 1 ]
|
||||
, sort = [ "_id" =: 1 ]
|
||||
}
|
||||
result `shouldBe` [["_id" =: "jane"], ["_id" =: "jill"], ["_id" =: "joe"]]
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ module TestImport (
|
|||
import Test.Hspec as Export hiding (Selector)
|
||||
import Database.MongoDB as Export
|
||||
import Control.Monad.Trans as Export (MonadIO, liftIO)
|
||||
import Data.Maybe (fromJust)
|
||||
import Data.Time (ParseTime, UTCTime)
|
||||
import qualified Data.Time as Time
|
||||
|
||||
|
@ -17,7 +18,6 @@ import qualified Data.Time as Time
|
|||
import Data.Time.Format (defaultTimeLocale, iso8601DateFormat)
|
||||
#else
|
||||
import System.Locale (defaultTimeLocale, iso8601DateFormat)
|
||||
import Data.Maybe (fromJust)
|
||||
#endif
|
||||
|
||||
parseTime :: ParseTime t => String -> String -> t
|
||||
|
@ -32,6 +32,3 @@ parseDate = parseTime (iso8601DateFormat Nothing)
|
|||
|
||||
parseDateTime :: String -> UTCTime
|
||||
parseDateTime = parseTime (iso8601DateFormat (Just "%H:%M:%S"))
|
||||
|
||||
mongodbHostEnvVariable :: String
|
||||
mongodbHostEnvVariable = "HASKELL_MONGODB_TEST_HOST"
|
||||
|
|
Loading…
Reference in a new issue