Merge pull request #50 from VictorDenisov/all_collections
All collections
This commit is contained in:
commit
726109588b
10 changed files with 136 additions and 32 deletions
36
.travis.yml
36
.travis.yml
|
@ -1,27 +1,47 @@
|
|||
# See https://github.com/hvr/multi-ghc-travis for more information.
|
||||
|
||||
services:
|
||||
- mongodb
|
||||
|
||||
env:
|
||||
# We use CABALVER=1.22 everywhere because it uses the flag --enable-coverage
|
||||
# instead of --enable-library-coverage used by older versions.
|
||||
- GHCVER=7.6.3 CABALVER=1.22
|
||||
- GHCVER=7.8.4 CABALVER=1.22
|
||||
- GHCVER=7.10.1 CABALVER=1.22
|
||||
- GHCVER=head CABALVER=head
|
||||
- GHCVER=7.6.3 CABALVER=1.22 MONGO=2.4.14
|
||||
- GHCVER=7.8.4 CABALVER=1.22 MONGO=2.4.14
|
||||
- GHCVER=7.10.1 CABALVER=1.22 MONGO=2.4.14
|
||||
- GHCVER=7.6.3 CABALVER=1.22 MONGO=2.6.12
|
||||
- GHCVER=7.8.4 CABALVER=1.22 MONGO=2.6.12
|
||||
- GHCVER=7.10.1 CABALVER=1.22 MONGO=2.6.12
|
||||
- GHCVER=7.6.3 CABALVER=1.22 MONGO=3.0.12
|
||||
- GHCVER=7.8.4 CABALVER=1.22 MONGO=3.0.12
|
||||
- GHCVER=7.10.1 CABALVER=1.22 MONGO=3.0.12
|
||||
- GHCVER=7.6.3 CABALVER=1.22 MONGO=3.2.6
|
||||
- GHCVER=7.8.4 CABALVER=1.22 MONGO=3.2.6
|
||||
- GHCVER=7.10.1 CABALVER=1.22 MONGO=3.2.6
|
||||
- GHCVER=head CABALVER=head MONGO=3.2.6
|
||||
|
||||
matrix:
|
||||
allow_failures:
|
||||
# The text here should match the last line above exactly.
|
||||
- env: GHCVER=head CABALVER=head
|
||||
- env: GHCVER=head CABALVER=head MONGO=3.2.6
|
||||
|
||||
before_install:
|
||||
|
||||
- travis_retry sudo add-apt-repository -y ppa:hvr/ghc
|
||||
- travis_retry sudo apt-get update
|
||||
- travis_retry sudo apt-get install cabal-install-$CABALVER ghc-$GHCVER
|
||||
- export PATH=$HOME/.cabal/bin:/opt/ghc/$GHCVER/bin:/opt/cabal/$CABALVER/bin:$PATH
|
||||
- cabal --version
|
||||
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv EA312927
|
||||
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
|
||||
- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.2 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.2.list
|
||||
- echo "deb http://repo.mongodb.org/apt/ubuntu precise/mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list
|
||||
- echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list
|
||||
- sudo apt-get update
|
||||
- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo apt-get install mongodb-10gen=$MONGO; else sudo apt-get install -y mongodb-org=$MONGO mongodb-org-server=$MONGO mongodb-org-shell=$MONGO mongodb-org-tools=$MONGO; fi
|
||||
- ls /etc/init.d
|
||||
- if [[ ${MONGO:0:3} == "2.4" ]]; then sudo service mongodb start; fi
|
||||
- sleep 15 #mongo may not be responded directly. See http://docs.travis-ci.com/user/database-setup/#MongoDB
|
||||
- ps axf | grep mongo
|
||||
- netstat -apn
|
||||
- mongo --version
|
||||
|
||||
install:
|
||||
- travis_retry cabal update
|
||||
|
|
|
@ -10,6 +10,9 @@ This project adheres to [Package Versioning Policy](https://wiki.haskell.org/Pac
|
|||
### Removed
|
||||
- System.IO.Pipeline module
|
||||
|
||||
### Fixed
|
||||
- allCollections request for mongo versions above 3.0
|
||||
|
||||
## [2.0.10] - 2015-12-22
|
||||
|
||||
### Fixed
|
||||
|
|
|
@ -2,6 +2,12 @@
|
|||
|
||||
{-# LANGUAGE CPP, OverloadedStrings, ScopedTypeVariables, TupleSections #-}
|
||||
|
||||
#if (__GLASGOW_HASKELL__ >= 706)
|
||||
{-# LANGUAGE RecursiveDo #-}
|
||||
#else
|
||||
{-# LANGUAGE DoRec #-}
|
||||
#endif
|
||||
|
||||
module Database.MongoDB.Connection (
|
||||
-- * Util
|
||||
Secs,
|
||||
|
@ -46,7 +52,7 @@ import Database.MongoDB.Internal.Protocol (Pipe, newPipe, close, isClosed)
|
|||
import Database.MongoDB.Internal.Util (untilSuccess, liftIOE,
|
||||
updateAssocs, shuffle, mergesortM)
|
||||
import Database.MongoDB.Query (Command, Failure(ConnectionFailure), access,
|
||||
slaveOk, runCommand)
|
||||
slaveOk, runCommand, retrieveServerData)
|
||||
|
||||
adminCommand :: Command -> Pipe -> IO Document
|
||||
-- ^ Run command against admin database on server connected to pipe. Fail if connection fails.
|
||||
|
@ -113,7 +119,10 @@ connect' :: Secs -> Host -> IO Pipe
|
|||
connect' timeoutSecs (Host hostname port) = do
|
||||
mh <- timeout (round $ timeoutSecs * 1000000) (connectTo hostname port)
|
||||
handle <- maybe (ioError $ userError "connect timed out") return mh
|
||||
newPipe handle
|
||||
rec
|
||||
p <- newPipe sd handle
|
||||
sd <- access p slaveOk "admin" retrieveServerData
|
||||
return p
|
||||
|
||||
-- * Replica Set
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ module Database.MongoDB.Internal.Protocol (
|
|||
Reply(..), ResponseFlag(..),
|
||||
-- * Authentication
|
||||
Username, Password, Nonce, pwHash, pwKey,
|
||||
isClosed, close
|
||||
isClosed, close, ServerData(..), Pipeline(..)
|
||||
) where
|
||||
|
||||
#if !MIN_VERSION_base(4,8,0)
|
||||
|
@ -83,15 +83,22 @@ mkWeakMVar = addMVarFinalizer
|
|||
-- * Pipeline
|
||||
|
||||
-- | Thread-safe and pipelined connection
|
||||
data Pipeline = Pipeline {
|
||||
vStream :: MVar Transport, -- ^ Mutex on handle, so only one thread at a time can write to it
|
||||
responseQueue :: Chan (MVar (Either IOError Response)), -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
|
||||
listenThread :: ThreadId
|
||||
data Pipeline = Pipeline
|
||||
{ vStream :: MVar Transport -- ^ Mutex on handle, so only one thread at a time can write to it
|
||||
, responseQueue :: Chan (MVar (Either IOError Response)) -- ^ Queue of threads waiting for responses. Every time a response arrive we pop the next thread and give it the response.
|
||||
, listenThread :: ThreadId
|
||||
, serverData :: ServerData
|
||||
}
|
||||
|
||||
data ServerData = ServerData
|
||||
{ isMaster :: Bool
|
||||
, minWireVersion :: Int
|
||||
, maxWireVersion :: Int
|
||||
}
|
||||
|
||||
-- | Create new Pipeline over given handle. You should 'close' pipeline when finished, which will also close handle. If pipeline is not closed but eventually garbage collected, it will be closed along with handle.
|
||||
newPipeline :: Transport -> IO Pipeline
|
||||
newPipeline stream = do
|
||||
newPipeline :: ServerData -> Transport -> IO Pipeline
|
||||
newPipeline serverData stream = do
|
||||
vStream <- newMVar stream
|
||||
responseQueue <- newChan
|
||||
rec
|
||||
|
@ -150,13 +157,13 @@ pcall p@Pipeline{..} message = withMVar vStream doCall `onException` close p wh
|
|||
type Pipe = Pipeline
|
||||
-- ^ Thread-safe TCP connection with pipelined requests
|
||||
|
||||
newPipe :: Handle -> IO Pipe
|
||||
newPipe :: ServerData -> Handle -> IO Pipe
|
||||
-- ^ Create pipe over handle
|
||||
newPipe handle = T.fromHandle handle >>= newPipeWith
|
||||
newPipe sd handle = T.fromHandle handle >>= (newPipeWith sd)
|
||||
|
||||
newPipeWith :: Transport -> IO Pipe
|
||||
newPipeWith :: ServerData -> Transport -> IO Pipe
|
||||
-- ^ Create pipe over connection
|
||||
newPipeWith conn = newPipeline conn
|
||||
newPipeWith sd conn = newPipeline sd conn
|
||||
|
||||
send :: Pipe -> [Notice] -> IO ()
|
||||
-- ^ Send notices as a contiguous batch to server with no reply. Throw IOError if connection fails.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
-- | Query and update documents
|
||||
|
||||
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, DeriveDataTypeable #-}
|
||||
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, DeriveDataTypeable, ScopedTypeVariables #-}
|
||||
|
||||
module Database.MongoDB.Query (
|
||||
-- * Monad
|
||||
|
@ -42,13 +42,13 @@ module Database.MongoDB.Query (
|
|||
MRResult, mapReduce, runMR, runMR',
|
||||
-- * Command
|
||||
Command, runCommand, runCommand1,
|
||||
eval,
|
||||
eval, retrieveServerData
|
||||
) where
|
||||
|
||||
import Prelude hiding (lookup)
|
||||
import Control.Exception (Exception, throwIO)
|
||||
import Control.Monad (unless, replicateM, liftM)
|
||||
import Data.Int (Int32)
|
||||
import Data.Int (Int32, Int64)
|
||||
import Data.Maybe (listToMaybe, catMaybes, isNothing)
|
||||
import Data.Word (Word32)
|
||||
#if !MIN_VERSION_base(4,8,0)
|
||||
|
@ -72,7 +72,7 @@ import Control.Monad.Trans (MonadIO, liftIO)
|
|||
import Control.Monad.Trans.Control (MonadBaseControl(..))
|
||||
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
|
||||
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
|
||||
(=?))
|
||||
(=?), (!?), Val(..))
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
|
||||
|
@ -84,7 +84,7 @@ import Database.MongoDB.Internal.Protocol (Reply(..), QueryOption(..),
|
|||
Request(GetMore, qOptions, qSkip,
|
||||
qFullCollection, qBatchSize,
|
||||
qSelector, qProjector),
|
||||
pwKey)
|
||||
pwKey, ServerData(..))
|
||||
import Database.MongoDB.Internal.Util (loop, liftIOE, true1, (<.>))
|
||||
import qualified Database.MongoDB.Internal.Protocol as P
|
||||
|
||||
|
@ -99,6 +99,7 @@ import qualified Crypto.MAC.HMAC as HMAC
|
|||
import Data.Bits (xor)
|
||||
import qualified Data.Map as Map
|
||||
import Text.Read (readMaybe)
|
||||
import Data.Maybe (fromMaybe)
|
||||
|
||||
#if !MIN_VERSION_base(4,6,0)
|
||||
--mkWeakMVar = addMVarFinalizer
|
||||
|
@ -296,6 +297,16 @@ parseSCRAM :: B.ByteString -> Map.Map B.ByteString B.ByteString
|
|||
parseSCRAM = Map.fromList . fmap cleanup . (fmap $ T.breakOn "=") . T.splitOn "," . T.pack . B.unpack
|
||||
where cleanup (t1, t2) = (B.pack $ T.unpack t1, B.pack . T.unpack $ T.drop 1 t2)
|
||||
|
||||
retrieveServerData :: (MonadIO m) => Action m ServerData
|
||||
retrieveServerData = do
|
||||
d <- runCommand1 "isMaster"
|
||||
let newSd = ServerData
|
||||
{ isMaster = (fromMaybe False $ lookup "ismaster" d)
|
||||
, minWireVersion = (fromMaybe 0 $ lookup "minWireVersion" d)
|
||||
, maxWireVersion = (fromMaybe 0 $ lookup "maxWireVersion" d)
|
||||
}
|
||||
return newSd
|
||||
|
||||
-- * Collection
|
||||
|
||||
type Collection = Text
|
||||
|
@ -304,9 +315,28 @@ type Collection = Text
|
|||
allCollections :: (MonadIO m, MonadBaseControl IO m) => Action m [Collection]
|
||||
-- ^ List all collections in this database
|
||||
allCollections = do
|
||||
p <- asks mongoPipe
|
||||
let sd = P.serverData p
|
||||
if (maxWireVersion sd <= 2)
|
||||
then do
|
||||
db <- thisDatabase
|
||||
docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
|
||||
return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs
|
||||
else do
|
||||
r <- runCommand1 "listCollections"
|
||||
let curData = do
|
||||
(Doc curDoc) <- r !? "cursor"
|
||||
(curId :: Int64) <- curDoc !? "id"
|
||||
(curNs :: Text) <- curDoc !? "ns"
|
||||
(firstBatch :: [Value]) <- curDoc !? "firstBatch"
|
||||
return $ (curId, curNs, ((catMaybes (map cast' firstBatch)) :: [Document]))
|
||||
case curData of
|
||||
Nothing -> return []
|
||||
Just (curId, curNs, firstBatch) -> do
|
||||
db <- thisDatabase
|
||||
nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch
|
||||
docs <- rest nc
|
||||
return $ catMaybes $ map (\d -> (d !? "name")) docs
|
||||
where
|
||||
dropDbPrefix = T.tail . T.dropWhile (/= '.')
|
||||
isSpecial db col = T.any (== '$') col && db <.> col /= "local.oplog.$main"
|
||||
|
|
|
@ -1,6 +1,13 @@
|
|||
{-# LANGUAGE CPP #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
|
||||
#if (__GLASGOW_HASKELL__ >= 706)
|
||||
{-# LANGUAGE RecursiveDo #-}
|
||||
#else
|
||||
{-# LANGUAGE DoRec #-}
|
||||
#endif
|
||||
|
||||
{-|
|
||||
Module : MongoDB TLS
|
||||
Description : TLS transport for mongodb
|
||||
|
@ -36,6 +43,7 @@ import System.IO.Error (mkIOError, eofErrorType)
|
|||
import Network (connectTo, HostName, PortID)
|
||||
import qualified Network.TLS as TLS
|
||||
import qualified Network.TLS.Extra.Cipher as TLS
|
||||
import Database.MongoDB.Query (access, slaveOk, retrieveServerData)
|
||||
|
||||
-- | Connect to mongodb using TLS
|
||||
connect :: HostName -> PortID -> IO Pipe
|
||||
|
@ -51,7 +59,10 @@ connect host port = bracketOnError (connectTo host port) hClose $ \handle -> do
|
|||
TLS.handshake context
|
||||
|
||||
conn <- tlsConnection context
|
||||
newPipeWith conn
|
||||
rec
|
||||
p <- newPipeWith sd conn
|
||||
sd <- access p slaveOk "admin" retrieveServerData
|
||||
return p
|
||||
|
||||
tlsConnection :: TLS.Context -> IO Transport
|
||||
tlsConnection ctx = do
|
||||
|
|
|
@ -62,7 +62,7 @@ Source-repository head
|
|||
|
||||
test-suite test
|
||||
hs-source-dirs: test
|
||||
main-is: Spec.hs
|
||||
main-is: Main.hs
|
||||
ghc-options: -Wall -with-rtsopts "-K32m"
|
||||
type: exitcode-stdio-1.0
|
||||
build-depends: mongoDB
|
||||
|
|
15
test/Main.hs
Normal file
15
test/Main.hs
Normal file
|
@ -0,0 +1,15 @@
|
|||
module Main where
|
||||
|
||||
import Database.MongoDB.Admin (serverVersion)
|
||||
import Database.MongoDB.Connection (connect, host)
|
||||
import Database.MongoDB.Query (access, slaveOk)
|
||||
import Data.Text (unpack)
|
||||
import Test.Hspec.Runner
|
||||
import qualified Spec
|
||||
|
||||
main :: IO ()
|
||||
main = do
|
||||
p <- connect $ host "localhost"
|
||||
version <- access p slaveOk "admin" serverVersion
|
||||
putStrLn $ "Running tests with mongodb version: " ++ (unpack version)
|
||||
hspecWith defaultConfig Spec.spec
|
|
@ -4,6 +4,7 @@
|
|||
module QuerySpec (spec) where
|
||||
import TestImport
|
||||
import Control.Exception
|
||||
import qualified Data.List as L
|
||||
|
||||
import qualified Data.Text as T
|
||||
|
||||
|
@ -147,6 +148,14 @@ spec = around withCleanDatabase $ do
|
|||
|
||||
liftIO $ (length returnedDocs) `shouldBe` 6001
|
||||
|
||||
describe "allCollections" $ do
|
||||
it "returns all collections in a database" $ do
|
||||
_ <- db $ insert "team1" ["name" =: "Yankees", "league" =: "American"]
|
||||
_ <- db $ insert "team2" ["name" =: "Yankees", "league" =: "American"]
|
||||
_ <- db $ insert "team3" ["name" =: "Yankees", "league" =: "American"]
|
||||
collections <- db $ allCollections
|
||||
liftIO $ (L.sort collections) `shouldContain` ["team1", "team2", "team3"]
|
||||
|
||||
describe "aggregate" $ do
|
||||
it "aggregates to normalize and sort documents" $ do
|
||||
db $ insertAll_ "users" [ ["_id" =: "jane", "joined" =: parseDate "2011-03-02", "likes" =: ["golf", "racquetball"]]
|
||||
|
|
|
@ -1 +1 @@
|
|||
{-# OPTIONS_GHC -F -pgmF hspec-discover #-}
|
||||
{-# OPTIONS_GHC -F -pgmF hspec-discover -optF --module-name=Spec #-}
|
||||
|
|
Loading…
Reference in a new issue