Cleaning, updating codebase to avoid warnings/infos
Updating to avoid deprecated Producer/Consumer from `conduit`. Removed unused imports. Removing superfluous brackets. Simplifying a few function bodies with catMaybe, fromMaybe, mapMaybe.
This commit is contained in:
parent
5980bc18b2
commit
727bdef020
9 changed files with 238 additions and 214 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -2,3 +2,5 @@ dist/
|
||||||
cabal.sandbox.config
|
cabal.sandbox.config
|
||||||
.cabal-sandbox/
|
.cabal-sandbox/
|
||||||
.stack-work/
|
.stack-work/
|
||||||
|
dist-newstyle/*
|
||||||
|
!dist-newstyle/config
|
|
@ -33,7 +33,6 @@ import Control.Applicative ((<$>))
|
||||||
#endif
|
#endif
|
||||||
import Control.Concurrent (forkIO, threadDelay)
|
import Control.Concurrent (forkIO, threadDelay)
|
||||||
import Control.Monad (forever, unless, liftM)
|
import Control.Monad (forever, unless, liftM)
|
||||||
import Control.Monad.Fail(MonadFail)
|
|
||||||
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
|
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
|
||||||
import Data.Maybe (maybeToList)
|
import Data.Maybe (maybeToList)
|
||||||
import Data.Set (Set)
|
import Data.Set (Set)
|
||||||
|
|
|
@ -32,7 +32,6 @@ import Control.Applicative ((<$>))
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
import Control.Monad (forM_, guard)
|
import Control.Monad (forM_, guard)
|
||||||
import Control.Monad.Fail(MonadFail)
|
|
||||||
import System.IO.Unsafe (unsafePerformIO)
|
import System.IO.Unsafe (unsafePerformIO)
|
||||||
import System.Timeout (timeout)
|
import System.Timeout (timeout)
|
||||||
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, anyChar, eof,
|
import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, anyChar, eof,
|
||||||
|
@ -40,7 +39,6 @@ import Text.ParserCombinators.Parsec (parse, many1, letter, digit, char, anyChar
|
||||||
import qualified Data.List as List
|
import qualified Data.List as List
|
||||||
|
|
||||||
|
|
||||||
import Control.Monad.Identity (runIdentity)
|
|
||||||
import Control.Monad.Except (throwError)
|
import Control.Monad.Except (throwError)
|
||||||
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
|
import Control.Concurrent.MVar.Lifted (MVar, newMVar, withMVar, modifyMVar,
|
||||||
readMVar)
|
readMVar)
|
||||||
|
@ -229,7 +227,7 @@ routedHost :: ((Host, Bool) -> (Host, Bool) -> IO Ordering) -> ReplicaSet -> IO
|
||||||
routedHost f rs = do
|
routedHost f rs = do
|
||||||
info <- updateMembers rs
|
info <- updateMembers rs
|
||||||
hosts <- shuffle (possibleHosts info)
|
hosts <- shuffle (possibleHosts info)
|
||||||
let addIsPrimary h = (h, if Just h == statedPrimary info then True else False)
|
let addIsPrimary h = (h, Just h == statedPrimary info)
|
||||||
hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
|
hosts' <- mergesortM (\a b -> f (addIsPrimary a) (addIsPrimary b)) hosts
|
||||||
untilSuccess (connection rs Nothing) hosts'
|
untilSuccess (connection rs Nothing) hosts'
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
-- Author:
|
-- Author:
|
||||||
-- Brent Tubbs <brent.tubbs@gmail.com>
|
-- Brent Tubbs <brent.tubbs@gmail.com>
|
||||||
-- | MongoDB GridFS implementation
|
-- | MongoDB GridFS implementation
|
||||||
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, RankNTypes #-}
|
{-# LANGUAGE OverloadedStrings, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, TypeFamilies, CPP, RankNTypes #-}
|
||||||
|
|
||||||
module Database.MongoDB.GridFS
|
module Database.MongoDB.GridFS
|
||||||
( Bucket
|
( Bucket
|
||||||
|
@ -23,10 +23,8 @@ module Database.MongoDB.GridFS
|
||||||
)
|
)
|
||||||
where
|
where
|
||||||
|
|
||||||
import Control.Applicative((<$>))
|
|
||||||
|
|
||||||
import Control.Monad(when)
|
import Control.Monad(when)
|
||||||
import Control.Monad.Fail(MonadFail)
|
|
||||||
import Control.Monad.IO.Class
|
import Control.Monad.IO.Class
|
||||||
import Control.Monad.Trans(lift)
|
import Control.Monad.Trans(lift)
|
||||||
|
|
||||||
|
@ -64,7 +62,7 @@ openBucket :: (Monad m, MonadIO m) => Text -> Action m Bucket
|
||||||
openBucket name = do
|
openBucket name = do
|
||||||
let filesCollection = name `append` ".files"
|
let filesCollection = name `append` ".files"
|
||||||
let chunksCollection = name `append` ".chunks"
|
let chunksCollection = name `append` ".chunks"
|
||||||
ensureIndex $ (index filesCollection ["filename" =: (1::Int), "uploadDate" =: (1::Int)])
|
ensureIndex $ index filesCollection ["filename" =: (1::Int), "uploadDate" =: (1::Int)]
|
||||||
ensureIndex $ (index chunksCollection ["files_id" =: (1::Int), "n" =: (1::Int)]) { iUnique = True, iDropDups = True }
|
ensureIndex $ (index chunksCollection ["files_id" =: (1::Int), "n" =: (1::Int)]) { iUnique = True, iDropDups = True }
|
||||||
return $ Bucket filesCollection chunksCollection
|
return $ Bucket filesCollection chunksCollection
|
||||||
|
|
||||||
|
@ -72,9 +70,9 @@ data File = File {bucket :: Bucket, document :: Document}
|
||||||
|
|
||||||
getChunk :: (MonadFail m, MonadIO m) => File -> Int -> Action m (Maybe S.ByteString)
|
getChunk :: (MonadFail m, MonadIO m) => File -> Int -> Action m (Maybe S.ByteString)
|
||||||
-- ^ Get a chunk of a file
|
-- ^ Get a chunk of a file
|
||||||
getChunk (File bucket doc) i = do
|
getChunk (File _bucket doc) i = do
|
||||||
files_id <- B.look "_id" doc
|
files_id <- B.look "_id" doc
|
||||||
result <- findOne $ select ["files_id" := files_id, "n" =: i] $ chunks bucket
|
result <- findOne $ select ["files_id" := files_id, "n" =: i] $ chunks _bucket
|
||||||
let content = at "data" <$> result
|
let content = at "data" <$> result
|
||||||
case content of
|
case content of
|
||||||
Just (Binary b) -> return (Just b)
|
Just (Binary b) -> return (Just b)
|
||||||
|
@ -82,36 +80,36 @@ getChunk (File bucket doc) i = do
|
||||||
|
|
||||||
findFile :: MonadIO m => Bucket -> Selector -> Action m [File]
|
findFile :: MonadIO m => Bucket -> Selector -> Action m [File]
|
||||||
-- ^ Find files in the bucket
|
-- ^ Find files in the bucket
|
||||||
findFile bucket sel = do
|
findFile _bucket sel = do
|
||||||
cursor <- find $ select sel $ files bucket
|
cursor <- find $ select sel $ files _bucket
|
||||||
results <- rest cursor
|
results <- rest cursor
|
||||||
return $ File bucket <$> results
|
return $ File _bucket <$> results
|
||||||
|
|
||||||
findOneFile :: MonadIO m => Bucket -> Selector -> Action m (Maybe File)
|
findOneFile :: MonadIO m => Bucket -> Selector -> Action m (Maybe File)
|
||||||
-- ^ Find one file in the bucket
|
-- ^ Find one file in the bucket
|
||||||
findOneFile bucket sel = do
|
findOneFile _bucket sel = do
|
||||||
mdoc <- findOne $ select sel $ files bucket
|
mdoc <- findOne $ select sel $ files _bucket
|
||||||
return $ File bucket <$> mdoc
|
return $ File _bucket <$> mdoc
|
||||||
|
|
||||||
fetchFile :: MonadIO m => Bucket -> Selector -> Action m File
|
fetchFile :: MonadIO m => Bucket -> Selector -> Action m File
|
||||||
-- ^ Fetch one file in the bucket
|
-- ^ Fetch one file in the bucket
|
||||||
fetchFile bucket sel = do
|
fetchFile _bucket sel = do
|
||||||
doc <- fetch $ select sel $ files bucket
|
doc <- fetch $ select sel $ files _bucket
|
||||||
return $ File bucket doc
|
return $ File _bucket doc
|
||||||
|
|
||||||
deleteFile :: (MonadIO m, MonadFail m) => File -> Action m ()
|
deleteFile :: (MonadIO m, MonadFail m) => File -> Action m ()
|
||||||
-- ^ Delete files in the bucket
|
-- ^ Delete files in the bucket
|
||||||
deleteFile (File bucket doc) = do
|
deleteFile (File _bucket doc) = do
|
||||||
files_id <- B.look "_id" doc
|
files_id <- B.look "_id" doc
|
||||||
delete $ select ["_id" := files_id] $ files bucket
|
delete $ select ["_id" := files_id] $ files _bucket
|
||||||
delete $ select ["files_id" := files_id] $ chunks bucket
|
delete $ select ["files_id" := files_id] $ chunks _bucket
|
||||||
|
|
||||||
putChunk :: (Monad m, MonadIO m) => Bucket -> ObjectId -> Int -> L.ByteString -> Action m ()
|
putChunk :: (Monad m, MonadIO m) => Bucket -> ObjectId -> Int -> L.ByteString -> Action m ()
|
||||||
-- ^ Put a chunk in the bucket
|
-- ^ Put a chunk in the bucket
|
||||||
putChunk bucket files_id i chunk = do
|
putChunk _bucket files_id i chunk = do
|
||||||
insert_ (chunks bucket) ["files_id" =: files_id, "n" =: i, "data" =: Binary (L.toStrict chunk)]
|
insert_ (chunks _bucket) ["files_id" =: files_id, "n" =: i, "data" =: Binary (L.toStrict chunk)]
|
||||||
|
|
||||||
sourceFile :: (MonadFail m, MonadIO m) => File -> Producer (Action m) S.ByteString
|
sourceFile :: (MonadFail m, MonadIO m) => File -> ConduitT File S.ByteString (Action m) ()
|
||||||
-- ^ A producer for the contents of a file
|
-- ^ A producer for the contents of a file
|
||||||
sourceFile file = yieldChunk 0 where
|
sourceFile file = yieldChunk 0 where
|
||||||
yieldChunk i = do
|
yieldChunk i = do
|
||||||
|
@ -134,19 +132,19 @@ data FileWriter = FileWriter
|
||||||
|
|
||||||
-- Finalize file, calculating md5 digest, saving the last chunk, and creating the file in the bucket
|
-- Finalize file, calculating md5 digest, saving the last chunk, and creating the file in the bucket
|
||||||
finalizeFile :: (Monad m, MonadIO m) => Text -> FileWriter -> Action m File
|
finalizeFile :: (Monad m, MonadIO m) => Text -> FileWriter -> Action m File
|
||||||
finalizeFile filename (FileWriter chunkSize bucket files_id i size acc md5context md5acc) = do
|
finalizeFile filename (FileWriter chunkSize _bucket files_id i size acc md5context md5acc) = do
|
||||||
let md5digest = finalizeMD5 md5context (L.toStrict md5acc)
|
let md5digest = finalizeMD5 md5context (L.toStrict md5acc)
|
||||||
when (L.length acc > 0) $ putChunk bucket files_id i acc
|
when (L.length acc > 0) $ putChunk _bucket files_id i acc
|
||||||
currentTimestamp <- liftIO $ getCurrentTime
|
currentTimestamp <- liftIO getCurrentTime
|
||||||
let doc = [ "_id" =: files_id
|
let doc = [ "_id" =: files_id
|
||||||
, "length" =: size
|
, "length" =: size
|
||||||
, "uploadDate" =: currentTimestamp
|
, "uploadDate" =: currentTimestamp
|
||||||
, "md5" =: show (md5digest)
|
, "md5" =: show md5digest
|
||||||
, "chunkSize" =: chunkSize
|
, "chunkSize" =: chunkSize
|
||||||
, "filename" =: filename
|
, "filename" =: filename
|
||||||
]
|
]
|
||||||
insert_ (files bucket) doc
|
insert_ (files _bucket) doc
|
||||||
return $ File bucket doc
|
return $ File _bucket doc
|
||||||
|
|
||||||
-- finalize the remainder and return the MD5Digest.
|
-- finalize the remainder and return the MD5Digest.
|
||||||
finalizeMD5 :: MD5Context -> S.ByteString -> MD5Digest
|
finalizeMD5 :: MD5Context -> S.ByteString -> MD5Digest
|
||||||
|
@ -160,7 +158,7 @@ finalizeMD5 ctx remainder =
|
||||||
|
|
||||||
-- Write as many chunks as can be written from the file writer
|
-- Write as many chunks as can be written from the file writer
|
||||||
writeChunks :: (Monad m, MonadIO m) => FileWriter -> L.ByteString -> Action m FileWriter
|
writeChunks :: (Monad m, MonadIO m) => FileWriter -> L.ByteString -> Action m FileWriter
|
||||||
writeChunks (FileWriter chunkSize bucket files_id i size acc md5context md5acc) chunk = do
|
writeChunks (FileWriter chunkSize _bucket files_id i size acc md5context md5acc) chunk = do
|
||||||
-- Update md5 context
|
-- Update md5 context
|
||||||
let md5BlockLength = fromIntegral $ untag (blockLength :: Tagged MD5Digest Int)
|
let md5BlockLength = fromIntegral $ untag (blockLength :: Tagged MD5Digest Int)
|
||||||
let md5acc_temp = (md5acc `L.append` chunk)
|
let md5acc_temp = (md5acc `L.append` chunk)
|
||||||
|
@ -174,17 +172,17 @@ writeChunks (FileWriter chunkSize bucket files_id i size acc md5context md5acc)
|
||||||
let size' = (size + L.length chunk)
|
let size' = (size + L.length chunk)
|
||||||
let acc_temp = (acc `L.append` chunk)
|
let acc_temp = (acc `L.append` chunk)
|
||||||
if (L.length acc_temp < chunkSize)
|
if (L.length acc_temp < chunkSize)
|
||||||
then return (FileWriter chunkSize bucket files_id i size' acc_temp md5context' md5acc')
|
then return (FileWriter chunkSize _bucket files_id i size' acc_temp md5context' md5acc')
|
||||||
else do
|
else do
|
||||||
let (newChunk, acc') = L.splitAt chunkSize acc_temp
|
let (newChunk, acc') = L.splitAt chunkSize acc_temp
|
||||||
putChunk bucket files_id i newChunk
|
putChunk _bucket files_id i newChunk
|
||||||
writeChunks (FileWriter chunkSize bucket files_id (i+1) size' acc' md5context' md5acc') L.empty
|
writeChunks (FileWriter chunkSize _bucket files_id (i+1) size' acc' md5context' md5acc') L.empty
|
||||||
|
|
||||||
sinkFile :: (Monad m, MonadIO m) => Bucket -> Text -> Consumer S.ByteString (Action m) File
|
sinkFile :: (Monad m, MonadIO m) => Bucket -> Text -> ConduitT S.ByteString () (Action m) File
|
||||||
-- ^ A consumer that creates a file in the bucket and puts all consumed data in it
|
-- ^ A consumer that creates a file in the bucket and puts all consumed data in it
|
||||||
sinkFile bucket filename = do
|
sinkFile _bucket filename = do
|
||||||
files_id <- liftIO $ genObjectId
|
files_id <- liftIO $ genObjectId
|
||||||
awaitChunk $ FileWriter defaultChunkSize bucket files_id 0 0 L.empty md5InitialContext L.empty
|
awaitChunk $ FileWriter defaultChunkSize _bucket files_id 0 0 L.empty md5InitialContext L.empty
|
||||||
where
|
where
|
||||||
awaitChunk fw = do
|
awaitChunk fw = do
|
||||||
mchunk <- await
|
mchunk <- await
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
-- | Compatibility layer for network package, including newtype 'PortID'
|
-- | Compatibility layer for network package, including newtype 'PortID'
|
||||||
{-# LANGUAGE CPP, GeneralizedNewtypeDeriving, OverloadedStrings #-}
|
{-# LANGUAGE CPP, OverloadedStrings #-}
|
||||||
|
|
||||||
module Database.MongoDB.Internal.Network (Host(..), PortID(..), N.HostName, connectTo,
|
module Database.MongoDB.Internal.Network (Host(..), PortID(..), N.HostName, connectTo,
|
||||||
lookupReplicaSetName, lookupSeedList) where
|
lookupReplicaSetName, lookupSeedList) where
|
||||||
|
|
||||||
|
|
||||||
#if !MIN_VERSION_network(2, 9, 0)
|
#if !MIN_VERSION_network(2, 9, 0)
|
||||||
|
|
||||||
import qualified Network as N
|
import qualified Network as N
|
||||||
|
@ -20,7 +19,7 @@ import System.IO (Handle, IOMode(ReadWriteMode))
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
import Data.ByteString.Char8 (pack, unpack)
|
import Data.ByteString.Char8 (pack, unpack)
|
||||||
import Data.List (dropWhileEnd, lookup)
|
import Data.List (dropWhileEnd)
|
||||||
import Data.Maybe (fromMaybe)
|
import Data.Maybe (fromMaybe)
|
||||||
import Data.Text (Text)
|
import Data.Text (Text)
|
||||||
import Network.DNS.Lookup (lookupSRV, lookupTXT)
|
import Network.DNS.Lookup (lookupSRV, lookupTXT)
|
||||||
|
@ -60,7 +59,7 @@ connectTo hostname (PortNumber port) = do
|
||||||
proto <- BSD.getProtocolNumber "tcp"
|
proto <- BSD.getProtocolNumber "tcp"
|
||||||
bracketOnError
|
bracketOnError
|
||||||
(N.socket N.AF_INET N.Stream proto)
|
(N.socket N.AF_INET N.Stream proto)
|
||||||
(N.close) -- only done if there's an error
|
N.close -- only done if there's an error
|
||||||
(\sock -> do
|
(\sock -> do
|
||||||
he <- BSD.getHostByName hostname
|
he <- BSD.getHostByName hostname
|
||||||
N.connect sock (N.SockAddrInet port (hostAddress he))
|
N.connect sock (N.SockAddrInet port (hostAddress he))
|
||||||
|
@ -71,7 +70,7 @@ connectTo hostname (PortNumber port) = do
|
||||||
connectTo _ (UnixSocket path) = do
|
connectTo _ (UnixSocket path) = do
|
||||||
bracketOnError
|
bracketOnError
|
||||||
(N.socket N.AF_UNIX N.Stream 0)
|
(N.socket N.AF_UNIX N.Stream 0)
|
||||||
(N.close)
|
N.close
|
||||||
(\sock -> do
|
(\sock -> do
|
||||||
N.connect sock (N.SockAddrUnix path)
|
N.connect sock (N.SockAddrUnix path)
|
||||||
N.socketToHandle sock ReadWriteMode
|
N.socketToHandle sock ReadWriteMode
|
||||||
|
|
|
@ -4,8 +4,8 @@
|
||||||
-- This module is not intended for direct use. Use the high-level interface at
|
-- This module is not intended for direct use. Use the high-level interface at
|
||||||
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.
|
-- "Database.MongoDB.Query" and "Database.MongoDB.Connection" instead.
|
||||||
|
|
||||||
{-# LANGUAGE RecordWildCards, StandaloneDeriving, OverloadedStrings #-}
|
{-# LANGUAGE RecordWildCards, OverloadedStrings #-}
|
||||||
{-# LANGUAGE CPP, FlexibleContexts, TupleSections, TypeSynonymInstances #-}
|
{-# LANGUAGE CPP, FlexibleContexts #-}
|
||||||
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
|
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances, UndecidableInstances #-}
|
||||||
{-# LANGUAGE BangPatterns #-}
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ module Database.MongoDB.Internal.Protocol (
|
||||||
#if !MIN_VERSION_base(4,8,0)
|
#if !MIN_VERSION_base(4,8,0)
|
||||||
import Control.Applicative ((<$>))
|
import Control.Applicative ((<$>))
|
||||||
#endif
|
#endif
|
||||||
import Control.Monad (forM, replicateM, unless)
|
import Control.Monad ( forM, replicateM, unless, forever )
|
||||||
import Data.Binary.Get (Get, runGet)
|
import Data.Binary.Get (Get, runGet)
|
||||||
import Data.Binary.Put (Put, runPut)
|
import Data.Binary.Put (Put, runPut)
|
||||||
import Data.Bits (bit, testBit)
|
import Data.Bits (bit, testBit)
|
||||||
|
@ -46,7 +46,6 @@ import System.IO.Error (doesNotExistErrorType, mkIOError)
|
||||||
import System.IO.Unsafe (unsafePerformIO)
|
import System.IO.Unsafe (unsafePerformIO)
|
||||||
import Data.Maybe (maybeToList)
|
import Data.Maybe (maybeToList)
|
||||||
import GHC.Conc (ThreadStatus(..), threadStatus)
|
import GHC.Conc (ThreadStatus(..), threadStatus)
|
||||||
import Control.Monad (forever)
|
|
||||||
import Control.Monad.STM (atomically)
|
import Control.Monad.STM (atomically)
|
||||||
import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask)
|
import Control.Concurrent (ThreadId, killThread, forkIOWithUnmask)
|
||||||
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan)
|
import Control.Concurrent.STM.TChan (TChan, newTChan, readTChan, writeTChan, isEmptyTChan)
|
||||||
|
@ -70,6 +69,7 @@ import Database.MongoDB.Internal.Util (bitOr, byteStringHex)
|
||||||
import Database.MongoDB.Transport (Transport)
|
import Database.MongoDB.Transport (Transport)
|
||||||
import qualified Database.MongoDB.Transport as Tr
|
import qualified Database.MongoDB.Transport as Tr
|
||||||
|
|
||||||
|
|
||||||
#if MIN_VERSION_base(4,6,0)
|
#if MIN_VERSION_base(4,6,0)
|
||||||
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
import Control.Concurrent.MVar.Lifted (MVar, newEmptyMVar, newMVar, withMVar,
|
||||||
putMVar, readMVar, mkWeakMVar, isEmptyMVar)
|
putMVar, readMVar, mkWeakMVar, isEmptyMVar)
|
||||||
|
@ -83,6 +83,7 @@ mkWeakMVar :: MVar a -> IO () -> IO ()
|
||||||
mkWeakMVar = addMVarFinalizer
|
mkWeakMVar = addMVarFinalizer
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
-- * Pipeline
|
-- * Pipeline
|
||||||
|
|
||||||
-- | Thread-safe and pipelined connection
|
-- | Thread-safe and pipelined connection
|
||||||
|
@ -270,6 +271,7 @@ type ResponseTo = RequestId
|
||||||
|
|
||||||
genRequestId :: (MonadIO m) => m RequestId
|
genRequestId :: (MonadIO m) => m RequestId
|
||||||
-- ^ Generate fresh request id
|
-- ^ Generate fresh request id
|
||||||
|
{-# NOINLINE genRequestId #-}
|
||||||
genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where
|
genRequestId = liftIO $ atomicModifyIORef counter $ \n -> (n + 1, n) where
|
||||||
counter :: IORef RequestId
|
counter :: IORef RequestId
|
||||||
counter = unsafePerformIO (newIORef 0)
|
counter = unsafePerformIO (newIORef 0)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
-- | Query and update documents
|
-- | Query and update documents
|
||||||
|
|
||||||
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, GeneralizedNewtypeDeriving, StandaloneDeriving, TypeSynonymInstances, TypeFamilies, CPP, DeriveDataTypeable, ScopedTypeVariables, BangPatterns #-}
|
{-# LANGUAGE OverloadedStrings, RecordWildCards, NamedFieldPuns, TupleSections, FlexibleContexts, FlexibleInstances, UndecidableInstances, MultiParamTypeClasses, TypeFamilies, CPP, DeriveDataTypeable, ScopedTypeVariables, BangPatterns #-}
|
||||||
|
|
||||||
module Database.MongoDB.Query (
|
module Database.MongoDB.Query (
|
||||||
-- * Monad
|
-- * Monad
|
||||||
|
@ -46,69 +46,92 @@ module Database.MongoDB.Query (
|
||||||
eval, retrieveServerData, ServerData(..)
|
eval, retrieveServerData, ServerData(..)
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Prelude hiding (lookup)
|
|
||||||
import Control.Exception (Exception, throwIO)
|
|
||||||
import Control.Monad (unless, replicateM, liftM, liftM2)
|
|
||||||
import Control.Monad.Fail(MonadFail)
|
|
||||||
import Data.Default.Class (Default(..))
|
|
||||||
import Data.Int (Int32, Int64)
|
|
||||||
import Data.Either (lefts, rights)
|
|
||||||
import Data.List (foldl1')
|
|
||||||
import Data.Maybe (listToMaybe, catMaybes, isNothing)
|
|
||||||
import Data.Word (Word32)
|
|
||||||
#if !MIN_VERSION_base(4,8,0)
|
|
||||||
import Data.Monoid (mappend)
|
|
||||||
#endif
|
|
||||||
import Data.Typeable (Typeable)
|
|
||||||
import System.Mem.Weak (Weak)
|
|
||||||
|
|
||||||
import qualified Control.Concurrent.MVar as MV
|
import qualified Control.Concurrent.MVar as MV
|
||||||
#if MIN_VERSION_base(4,6,0)
|
import Control.Concurrent.MVar.Lifted
|
||||||
import Control.Concurrent.MVar.Lifted (MVar,
|
( MVar,
|
||||||
readMVar)
|
readMVar,
|
||||||
#else
|
)
|
||||||
import Control.Concurrent.MVar.Lifted (MVar, addMVarFinalizer,
|
import Control.Exception (Exception, catch, throwIO)
|
||||||
readMVar)
|
import Control.Monad
|
||||||
#endif
|
( liftM2,
|
||||||
import Control.Applicative ((<$>))
|
replicateM,
|
||||||
import Control.Exception (catch)
|
unless,
|
||||||
import Control.Monad (when, void)
|
void,
|
||||||
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask, asks, local)
|
when,
|
||||||
|
)
|
||||||
|
import Control.Monad.Reader (MonadReader, ReaderT, ask, asks, local, runReaderT)
|
||||||
import Control.Monad.Trans (MonadIO, liftIO)
|
import Control.Monad.Trans (MonadIO, liftIO)
|
||||||
import Data.Binary.Put (runPut)
|
|
||||||
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
|
|
||||||
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
|
|
||||||
(=?), (!?), Val(..), ObjectId, Value(..))
|
|
||||||
import Data.Bson.Binary (putDocument)
|
|
||||||
import Data.Text (Text)
|
|
||||||
import qualified Data.Text as T
|
|
||||||
|
|
||||||
import Database.MongoDB.Internal.Protocol (Reply(..), QueryOption(..),
|
|
||||||
ResponseFlag(..), InsertOption(..),
|
|
||||||
UpdateOption(..), DeleteOption(..),
|
|
||||||
CursorId, FullCollection, Username,
|
|
||||||
Password, Pipe, Notice(..),
|
|
||||||
Request(GetMore, qOptions, qSkip,
|
|
||||||
qFullCollection, qBatchSize,
|
|
||||||
qSelector, qProjector),
|
|
||||||
pwKey, ServerData(..))
|
|
||||||
import Database.MongoDB.Internal.Util (loop, liftIOE, true1, (<.>))
|
|
||||||
import qualified Database.MongoDB.Internal.Protocol as P
|
|
||||||
|
|
||||||
import qualified Crypto.Nonce as Nonce
|
|
||||||
import qualified Data.ByteString as BS
|
|
||||||
import qualified Data.ByteString.Lazy as LBS
|
|
||||||
import qualified Data.ByteString.Base16 as B16
|
|
||||||
import qualified Data.ByteString.Base64 as B64
|
|
||||||
import qualified Data.ByteString.Char8 as B
|
|
||||||
import qualified Data.Either as E
|
|
||||||
import qualified Crypto.Hash.MD5 as MD5
|
import qualified Crypto.Hash.MD5 as MD5
|
||||||
import qualified Crypto.Hash.SHA1 as SHA1
|
import qualified Crypto.Hash.SHA1 as SHA1
|
||||||
import qualified Crypto.MAC.HMAC as HMAC
|
import qualified Crypto.MAC.HMAC as HMAC
|
||||||
|
import qualified Crypto.Nonce as Nonce
|
||||||
|
import Data.Binary.Put (runPut)
|
||||||
import Data.Bits (xor)
|
import Data.Bits (xor)
|
||||||
|
import Data.Bson
|
||||||
|
( Document,
|
||||||
|
Field (..),
|
||||||
|
Javascript,
|
||||||
|
Label,
|
||||||
|
ObjectId,
|
||||||
|
Val (..),
|
||||||
|
Value (..),
|
||||||
|
at,
|
||||||
|
genObjectId,
|
||||||
|
look,
|
||||||
|
lookup,
|
||||||
|
valueAt,
|
||||||
|
(!?),
|
||||||
|
(=:),
|
||||||
|
(=?),
|
||||||
|
)
|
||||||
|
import Data.Bson.Binary (putDocument)
|
||||||
|
import qualified Data.ByteString as BS
|
||||||
|
import qualified Data.ByteString.Base16 as B16
|
||||||
|
import qualified Data.ByteString.Base64 as B64
|
||||||
|
import qualified Data.ByteString.Char8 as B
|
||||||
|
import qualified Data.ByteString.Lazy as LBS
|
||||||
|
import Data.Default.Class (Default (..))
|
||||||
|
import Data.Either (lefts, rights)
|
||||||
|
import qualified Data.Either as E
|
||||||
|
import Data.Functor ((<&>))
|
||||||
|
import Data.Int (Int32, Int64)
|
||||||
|
import Data.List (foldl1')
|
||||||
import qualified Data.Map as Map
|
import qualified Data.Map as Map
|
||||||
|
import Data.Maybe (catMaybes, fromMaybe, isNothing, listToMaybe, mapMaybe)
|
||||||
|
import Data.Text (Text)
|
||||||
|
import qualified Data.Text as T
|
||||||
|
import Data.Typeable (Typeable)
|
||||||
|
import Data.Word (Word32)
|
||||||
|
import Database.MongoDB.Internal.Protocol
|
||||||
|
( CursorId,
|
||||||
|
DeleteOption (..),
|
||||||
|
FullCollection,
|
||||||
|
InsertOption (..),
|
||||||
|
Notice (..),
|
||||||
|
Password,
|
||||||
|
Pipe,
|
||||||
|
QueryOption (..),
|
||||||
|
Reply (..),
|
||||||
|
Request
|
||||||
|
( GetMore,
|
||||||
|
qBatchSize,
|
||||||
|
qFullCollection,
|
||||||
|
qOptions,
|
||||||
|
qProjector,
|
||||||
|
qSelector,
|
||||||
|
qSkip
|
||||||
|
),
|
||||||
|
ResponseFlag (..),
|
||||||
|
ServerData (..),
|
||||||
|
UpdateOption (..),
|
||||||
|
Username,
|
||||||
|
pwKey,
|
||||||
|
)
|
||||||
|
import qualified Database.MongoDB.Internal.Protocol as P
|
||||||
|
import Database.MongoDB.Internal.Util (liftIOE, loop, true1, (<.>))
|
||||||
|
import System.Mem.Weak (Weak)
|
||||||
import Text.Read (readMaybe)
|
import Text.Read (readMaybe)
|
||||||
import Data.Maybe (fromMaybe)
|
import Prelude hiding (lookup)
|
||||||
|
|
||||||
-- * Monad
|
-- * Monad
|
||||||
|
|
||||||
|
@ -185,7 +208,7 @@ slaveOk = ReadStaleOk
|
||||||
|
|
||||||
accessMode :: (Monad m) => AccessMode -> Action m a -> Action m a
|
accessMode :: (Monad m) => AccessMode -> Action m a -> Action m a
|
||||||
-- ^ Run action with given 'AccessMode'
|
-- ^ Run action with given 'AccessMode'
|
||||||
accessMode mode act = local (\ctx -> ctx {mongoAccessMode = mode}) act
|
accessMode mode = local (\ctx -> ctx {mongoAccessMode = mode})
|
||||||
|
|
||||||
readMode :: AccessMode -> ReadMode
|
readMode :: AccessMode -> ReadMode
|
||||||
readMode ReadStaleOk = StaleOk
|
readMode ReadStaleOk = StaleOk
|
||||||
|
@ -227,7 +250,7 @@ type Database = Text
|
||||||
|
|
||||||
allDatabases :: (MonadIO m) => Action m [Database]
|
allDatabases :: (MonadIO m) => Action m [Database]
|
||||||
-- ^ List all databases residing on server
|
-- ^ List all databases residing on server
|
||||||
allDatabases = (map (at "name") . at "databases") `liftM` useDb "admin" (runCommand1 "listDatabases")
|
allDatabases = map (at "name") . at "databases" <$> useDb "admin" (runCommand1 "listDatabases")
|
||||||
|
|
||||||
thisDatabase :: (Monad m) => Action m Database
|
thisDatabase :: (Monad m) => Action m Database
|
||||||
-- ^ Current database in use
|
-- ^ Current database in use
|
||||||
|
@ -235,34 +258,34 @@ thisDatabase = asks mongoDatabase
|
||||||
|
|
||||||
useDb :: (Monad m) => Database -> Action m a -> Action m a
|
useDb :: (Monad m) => Database -> Action m a -> Action m a
|
||||||
-- ^ Run action against given database
|
-- ^ Run action against given database
|
||||||
useDb db act = local (\ctx -> ctx {mongoDatabase = db}) act
|
useDb db = local (\ctx -> ctx {mongoDatabase = db})
|
||||||
|
|
||||||
-- * Authentication
|
-- * Authentication
|
||||||
|
|
||||||
auth :: MonadIO m => Username -> Password -> Action m Bool
|
auth :: MonadIO m => Username -> Password -> Action m Bool
|
||||||
-- ^ Authenticate with the current database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe. SCRAM-SHA-1 will be used for server versions 3.0+, MONGO-CR for lower versions.
|
-- ^ Authenticate with the current database (if server is running in secure mode). Return whether authentication was successful or not. Reauthentication is required for every new pipe. SCRAM-SHA-1 will be used for server versions 3.0+, MONGO-CR for lower versions.
|
||||||
auth un pw = do
|
auth un pw = do
|
||||||
let serverVersion = liftM (at "version") $ useDb "admin" $ runCommand ["buildinfo" =: (1 :: Int)]
|
let serverVersion = fmap (at "version") $ useDb "admin" $ runCommand ["buildinfo" =: (1 :: Int)]
|
||||||
mmv <- liftM (readMaybe . T.unpack . head . T.splitOn ".") $ serverVersion
|
mmv <- readMaybe . T.unpack . head . T.splitOn "." <$> serverVersion
|
||||||
maybe (return False) performAuth mmv
|
maybe (return False) performAuth mmv
|
||||||
where
|
where
|
||||||
performAuth majorVersion =
|
performAuth majorVersion =
|
||||||
case (majorVersion >= (3 :: Int)) of
|
if majorVersion >= (3 :: Int)
|
||||||
True -> authSCRAMSHA1 un pw
|
then authSCRAMSHA1 un pw
|
||||||
False -> authMongoCR un pw
|
else authMongoCR un pw
|
||||||
|
|
||||||
authMongoCR :: (MonadIO m) => Username -> Password -> Action m Bool
|
authMongoCR :: (MonadIO m) => Username -> Password -> Action m Bool
|
||||||
-- ^ Authenticate with the current database, using the MongoDB-CR authentication mechanism (default in MongoDB server < 3.0)
|
-- ^ Authenticate with the current database, using the MongoDB-CR authentication mechanism (default in MongoDB server < 3.0)
|
||||||
authMongoCR usr pss = do
|
authMongoCR usr pss = do
|
||||||
n <- at "nonce" `liftM` runCommand ["getnonce" =: (1 :: Int)]
|
n <- at "nonce" <$> runCommand ["getnonce" =: (1 :: Int)]
|
||||||
true1 "ok" `liftM` runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
|
true1 "ok" <$> runCommand ["authenticate" =: (1 :: Int), "user" =: usr, "nonce" =: n, "key" =: pwKey n usr pss]
|
||||||
|
|
||||||
authSCRAMSHA1 :: MonadIO m => Username -> Password -> Action m Bool
|
authSCRAMSHA1 :: MonadIO m => Username -> Password -> Action m Bool
|
||||||
-- ^ Authenticate with the current database, using the SCRAM-SHA-1 authentication mechanism (default in MongoDB server >= 3.0)
|
-- ^ Authenticate with the current database, using the SCRAM-SHA-1 authentication mechanism (default in MongoDB server >= 3.0)
|
||||||
authSCRAMSHA1 un pw = do
|
authSCRAMSHA1 un pw = do
|
||||||
let hmac = HMAC.hmac SHA1.hash 64
|
let hmac = HMAC.hmac SHA1.hash 64
|
||||||
nonce <- liftIO (Nonce.withGenerator Nonce.nonce128 >>= return . B64.encode)
|
nonce <- liftIO (Nonce.withGenerator Nonce.nonce128 <&> B64.encode)
|
||||||
let firstBare = B.concat [B.pack $ "n=" ++ (T.unpack un) ++ ",r=", nonce]
|
let firstBare = B.concat [B.pack $ "n=" ++ T.unpack un ++ ",r=", nonce]
|
||||||
let client1 = ["saslStart" =: (1 :: Int), "mechanism" =: ("SCRAM-SHA-1" :: String), "payload" =: (B.unpack . B64.encode $ B.concat [B.pack "n,,", firstBare]), "autoAuthorize" =: (1 :: Int)]
|
let client1 = ["saslStart" =: (1 :: Int), "mechanism" =: ("SCRAM-SHA-1" :: String), "payload" =: (B.unpack . B64.encode $ B.concat [B.pack "n,,", firstBare]), "autoAuthorize" =: (1 :: Int)]
|
||||||
server1 <- runCommand client1
|
server1 <- runCommand client1
|
||||||
|
|
||||||
|
@ -286,7 +309,7 @@ authSCRAMSHA1 un pw = do
|
||||||
let clientFinal = B.concat [withoutProof, B.pack ",p=", pval]
|
let clientFinal = B.concat [withoutProof, B.pack ",p=", pval]
|
||||||
let serverKey = hmac saltedPass (B.pack "Server Key")
|
let serverKey = hmac saltedPass (B.pack "Server Key")
|
||||||
let serverSig = B64.encode $ hmac serverKey authMsg
|
let serverSig = B64.encode $ hmac serverKey authMsg
|
||||||
let client2 = ["saslContinue" =: (1 :: Int), "conversationId" =: (at "conversationId" server1 :: Int), "payload" =: (B.unpack $ B64.encode clientFinal)]
|
let client2 = ["saslContinue" =: (1 :: Int), "conversationId" =: (at "conversationId" server1 :: Int), "payload" =: B.unpack (B64.encode clientFinal)]
|
||||||
server2 <- runCommand client2
|
server2 <- runCommand client2
|
||||||
|
|
||||||
shortcircuit (true1 "ok" server2) $ do
|
shortcircuit (true1 "ok" server2) $ do
|
||||||
|
@ -317,19 +340,19 @@ scramHI digest salt iters = snd $ foldl com (u1, u1) [1..(iters-1)]
|
||||||
com (u,uc) _ = let u' = hmacd u in (u', BS.pack $ BS.zipWith xor uc u')
|
com (u,uc) _ = let u' = hmacd u in (u', BS.pack $ BS.zipWith xor uc u')
|
||||||
|
|
||||||
parseSCRAM :: B.ByteString -> Map.Map B.ByteString B.ByteString
|
parseSCRAM :: B.ByteString -> Map.Map B.ByteString B.ByteString
|
||||||
parseSCRAM = Map.fromList . fmap cleanup . (fmap $ T.breakOn "=") . T.splitOn "," . T.pack . B.unpack
|
parseSCRAM = Map.fromList . fmap (cleanup . T.breakOn "=") . T.splitOn "," . T.pack . B.unpack
|
||||||
where cleanup (t1, t2) = (B.pack $ T.unpack t1, B.pack . T.unpack $ T.drop 1 t2)
|
where cleanup (t1, t2) = (B.pack $ T.unpack t1, B.pack . T.unpack $ T.drop 1 t2)
|
||||||
|
|
||||||
retrieveServerData :: (MonadIO m) => Action m ServerData
|
retrieveServerData :: (MonadIO m) => Action m ServerData
|
||||||
retrieveServerData = do
|
retrieveServerData = do
|
||||||
d <- runCommand1 "isMaster"
|
d <- runCommand1 "isMaster"
|
||||||
let newSd = ServerData
|
let newSd = ServerData
|
||||||
{ isMaster = (fromMaybe False $ lookup "ismaster" d)
|
{ isMaster = fromMaybe False $ lookup "ismaster" d
|
||||||
, minWireVersion = (fromMaybe 0 $ lookup "minWireVersion" d)
|
, minWireVersion = fromMaybe 0 $ lookup "minWireVersion" d
|
||||||
, maxWireVersion = (fromMaybe 0 $ lookup "maxWireVersion" d)
|
, maxWireVersion = fromMaybe 0 $ lookup "maxWireVersion" d
|
||||||
, maxMessageSizeBytes = (fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d)
|
, maxMessageSizeBytes = fromMaybe 48000000 $ lookup "maxMessageSizeBytes" d
|
||||||
, maxBsonObjectSize = (fromMaybe (16 * 1024 * 1024) $ lookup "maxBsonObjectSize" d)
|
, maxBsonObjectSize = fromMaybe (16 * 1024 * 1024) $ lookup "maxBsonObjectSize" d
|
||||||
, maxWriteBatchSize = (fromMaybe 1000 $ lookup "maxWriteBatchSize" d)
|
, maxWriteBatchSize = fromMaybe 1000 $ lookup "maxWriteBatchSize" d
|
||||||
}
|
}
|
||||||
return newSd
|
return newSd
|
||||||
|
|
||||||
|
@ -343,11 +366,11 @@ allCollections :: MonadIO m => Action m [Collection]
|
||||||
allCollections = do
|
allCollections = do
|
||||||
p <- asks mongoPipe
|
p <- asks mongoPipe
|
||||||
let sd = P.serverData p
|
let sd = P.serverData p
|
||||||
if (maxWireVersion sd <= 2)
|
if maxWireVersion sd <= 2
|
||||||
then do
|
then do
|
||||||
db <- thisDatabase
|
db <- thisDatabase
|
||||||
docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
|
docs <- rest =<< find (query [] "system.namespaces") {sort = ["name" =: (1 :: Int)]}
|
||||||
return . filter (not . isSpecial db) . map dropDbPrefix $ map (at "name") docs
|
(return . filter (not . isSpecial db)) (map (dropDbPrefix . at "name") docs)
|
||||||
else do
|
else do
|
||||||
r <- runCommand1 "listCollections"
|
r <- runCommand1 "listCollections"
|
||||||
let curData = do
|
let curData = do
|
||||||
|
@ -355,14 +378,14 @@ allCollections = do
|
||||||
(curId :: Int64) <- curDoc !? "id"
|
(curId :: Int64) <- curDoc !? "id"
|
||||||
(curNs :: Text) <- curDoc !? "ns"
|
(curNs :: Text) <- curDoc !? "ns"
|
||||||
(firstBatch :: [Value]) <- curDoc !? "firstBatch"
|
(firstBatch :: [Value]) <- curDoc !? "firstBatch"
|
||||||
return $ (curId, curNs, ((catMaybes (map cast' firstBatch)) :: [Document]))
|
return (curId, curNs, mapMaybe cast' firstBatch :: [Document])
|
||||||
case curData of
|
case curData of
|
||||||
Nothing -> return []
|
Nothing -> return []
|
||||||
Just (curId, curNs, firstBatch) -> do
|
Just (curId, curNs, firstBatch) -> do
|
||||||
db <- thisDatabase
|
db <- thisDatabase
|
||||||
nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch
|
nc <- newCursor db curNs 0 $ return $ Batch Nothing curId firstBatch
|
||||||
docs <- rest nc
|
docs <- rest nc
|
||||||
return $ catMaybes $ map (\d -> (d !? "name")) docs
|
return $ mapMaybe (\d -> d !? "name") docs
|
||||||
where
|
where
|
||||||
dropDbPrefix = T.tail . T.dropWhile (/= '.')
|
dropDbPrefix = T.tail . T.dropWhile (/= '.')
|
||||||
isSpecial db col = T.any (== '$') col && db <.> col /= "local.oplog.$main"
|
isSpecial db col = T.any (== '$') col && db <.> col /= "local.oplog.$main"
|
||||||
|
@ -473,7 +496,7 @@ insert' opts col docs = do
|
||||||
NoConfirm -> ["w" =: (0 :: Int)]
|
NoConfirm -> ["w" =: (0 :: Int)]
|
||||||
Confirm params -> params
|
Confirm params -> params
|
||||||
let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern
|
let docSize = sizeOfDocument $ insertCommandDocument opts col [] writeConcern
|
||||||
let ordered = (not (KeepGoing `elem` opts))
|
let ordered = KeepGoing `notElem` opts
|
||||||
let preChunks = splitAtLimit
|
let preChunks = splitAtLimit
|
||||||
(maxBsonObjectSize sd - docSize)
|
(maxBsonObjectSize sd - docSize)
|
||||||
-- size of auxiliary part of insert
|
-- size of auxiliary part of insert
|
||||||
|
@ -487,7 +510,7 @@ insert' opts col docs = do
|
||||||
else rights preChunks
|
else rights preChunks
|
||||||
|
|
||||||
let lens = map length chunks
|
let lens = map length chunks
|
||||||
let lSums = 0 : (zipWith (+) lSums lens)
|
let lSums = 0 : zipWith (+) lSums lens
|
||||||
|
|
||||||
chunkResults <- interruptibleFor ordered (zip lSums chunks) $ insertBlock opts col
|
chunkResults <- interruptibleFor ordered (zip lSums chunks) $ insertBlock opts col
|
||||||
|
|
||||||
|
@ -508,13 +531,13 @@ insertBlock opts col (prevCount, docs) = do
|
||||||
|
|
||||||
p <- asks mongoPipe
|
p <- asks mongoPipe
|
||||||
let sd = P.serverData p
|
let sd = P.serverData p
|
||||||
if (maxWireVersion sd < 2)
|
if maxWireVersion sd < 2
|
||||||
then do
|
then do
|
||||||
res <- liftDB $ write (Insert (db <.> col) opts docs)
|
res <- liftDB $ write (Insert (db <.> col) opts docs)
|
||||||
let errorMessage = do
|
let errorMessage = do
|
||||||
jRes <- res
|
jRes <- res
|
||||||
em <- lookup "err" jRes
|
em <- lookup "err" jRes
|
||||||
return $ WriteFailure prevCount (maybe 0 id $ lookup "code" jRes) em
|
return $ WriteFailure prevCount (fromMaybe 0 $ lookup "code" jRes) em
|
||||||
-- In older versions of ^^ the protocol we can't really say which document failed.
|
-- In older versions of ^^ the protocol we can't really say which document failed.
|
||||||
-- So we just report the accumulated number of documents in the previous blocks.
|
-- So we just report the accumulated number of documents in the previous blocks.
|
||||||
|
|
||||||
|
@ -530,45 +553,45 @@ insertBlock opts col (prevCount, docs) = do
|
||||||
case (look "writeErrors" doc, look "writeConcernError" doc) of
|
case (look "writeErrors" doc, look "writeConcernError" doc) of
|
||||||
(Nothing, Nothing) -> return $ Right $ map (valueAt "_id") docs
|
(Nothing, Nothing) -> return $ Right $ map (valueAt "_id") docs
|
||||||
(Just (Array errs), Nothing) -> do
|
(Just (Array errs), Nothing) -> do
|
||||||
let writeErrors = map (anyToWriteError prevCount) $ errs
|
let writeErrors = map (anyToWriteError prevCount) errs
|
||||||
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
|
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
|
||||||
return $ Left $ CompoundFailure errorsWithFailureIndex
|
return $ Left $ CompoundFailure errorsWithFailureIndex
|
||||||
(Nothing, Just err) -> do
|
(Nothing, Just err) -> do
|
||||||
return $ Left $ WriteFailure
|
return $ Left $ WriteFailure
|
||||||
prevCount
|
prevCount
|
||||||
(maybe 0 id $ lookup "ok" doc)
|
(fromMaybe 0 $ lookup "ok" doc)
|
||||||
(show err)
|
(show err)
|
||||||
(Just (Array errs), Just writeConcernErr) -> do
|
(Just (Array errs), Just writeConcernErr) -> do
|
||||||
let writeErrors = map (anyToWriteError prevCount) $ errs
|
let writeErrors = map (anyToWriteError prevCount) errs
|
||||||
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
|
let errorsWithFailureIndex = map (addFailureIndex prevCount) writeErrors
|
||||||
return $ Left $ CompoundFailure $ (WriteFailure
|
return $ Left $ CompoundFailure $ WriteFailure
|
||||||
prevCount
|
prevCount
|
||||||
(maybe 0 id $ lookup "ok" doc)
|
(fromMaybe 0 $ lookup "ok" doc)
|
||||||
(show writeConcernErr)) : errorsWithFailureIndex
|
(show writeConcernErr) : errorsWithFailureIndex
|
||||||
(Just unknownValue, Nothing) -> do
|
(Just unknownValue, Nothing) -> do
|
||||||
return $ Left $ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
|
return $ Left $ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
|
||||||
(Just unknownValue, Just writeConcernErr) -> do
|
(Just unknownValue, Just writeConcernErr) -> do
|
||||||
return $ Left $ CompoundFailure $ [ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
|
return $ Left $ CompoundFailure [ ProtocolFailure prevCount $ "Expected array of errors. Received: " ++ show unknownValue
|
||||||
, WriteFailure prevCount (maybe 0 id $ lookup "ok" doc) $ show writeConcernErr]
|
, WriteFailure prevCount (fromMaybe 0 $ lookup "ok" doc) $ show writeConcernErr]
|
||||||
|
|
||||||
splitAtLimit :: Int -> Int -> [Document] -> [Either Failure [Document]]
|
splitAtLimit :: Int -> Int -> [Document] -> [Either Failure [Document]]
|
||||||
splitAtLimit maxSize maxCount list = chop (go 0 0 []) list
|
splitAtLimit maxSize maxCount list = chop (go 0 0 []) list
|
||||||
where
|
where
|
||||||
go :: Int -> Int -> [Document] -> [Document] -> ((Either Failure [Document]), [Document])
|
go :: Int -> Int -> [Document] -> [Document] -> (Either Failure [Document], [Document])
|
||||||
go _ _ res [] = (Right $ reverse res, [])
|
go _ _ res [] = (Right $ reverse res, [])
|
||||||
go curSize curCount [] (x:xs) |
|
go curSize curCount [] (x:xs) |
|
||||||
((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize) =
|
(curSize + sizeOfDocument x + 2 + curCount) > maxSize =
|
||||||
(Left $ WriteFailure 0 0 "One document is too big for the message", xs)
|
(Left $ WriteFailure 0 0 "One document is too big for the message", xs)
|
||||||
go curSize curCount res (x:xs) =
|
go curSize curCount res (x:xs) =
|
||||||
if ( ((curSize + (sizeOfDocument x) + 2 + curCount) > maxSize)
|
if ((curSize + sizeOfDocument x + 2 + curCount) > maxSize)
|
||||||
-- we have ^ 2 brackets and curCount commas in
|
-- we have ^ 2 brackets and curCount commas in
|
||||||
-- the document that we need to take into
|
-- the document that we need to take into
|
||||||
-- account
|
-- account
|
||||||
|| ((curCount + 1) > maxCount))
|
|| ((curCount + 1) > maxCount)
|
||||||
then
|
then
|
||||||
(Right $ reverse res, x:xs)
|
(Right $ reverse res, x:xs)
|
||||||
else
|
else
|
||||||
go (curSize + (sizeOfDocument x)) (curCount + 1) (x:res) xs
|
go (curSize + sizeOfDocument x) (curCount + 1) (x:res) xs
|
||||||
|
|
||||||
chop :: ([a] -> (b, [a])) -> [a] -> [b]
|
chop :: ([a] -> (b, [a])) -> [a] -> [b]
|
||||||
chop _ [] = []
|
chop _ [] = []
|
||||||
|
@ -581,7 +604,7 @@ assignId :: Document -> IO Document
|
||||||
-- ^ Assign a unique value to _id field if missing
|
-- ^ Assign a unique value to _id field if missing
|
||||||
assignId doc = if any (("_id" ==) . label) doc
|
assignId doc = if any (("_id" ==) . label) doc
|
||||||
then return doc
|
then return doc
|
||||||
else (\oid -> ("_id" =: oid) : doc) `liftM` genObjectId
|
else (\oid -> ("_id" =: oid) : doc) <$> genObjectId
|
||||||
|
|
||||||
-- ** Update
|
-- ** Update
|
||||||
|
|
||||||
|
@ -696,22 +719,21 @@ update' ordered col updateDocs = do
|
||||||
then takeRightsUpToLeft preChunks
|
then takeRightsUpToLeft preChunks
|
||||||
else rights preChunks
|
else rights preChunks
|
||||||
let lens = map length chunks
|
let lens = map length chunks
|
||||||
let lSums = 0 : (zipWith (+) lSums lens)
|
let lSums = 0 : zipWith (+) lSums lens
|
||||||
blocks <- interruptibleFor ordered (zip lSums chunks) $ \b -> do
|
blocks <- interruptibleFor ordered (zip lSums chunks) $ \b -> do
|
||||||
ur <- runReaderT (updateBlock ordered col b) ctx
|
runReaderT (updateBlock ordered col b) ctx
|
||||||
return ur
|
|
||||||
`catch` \(e :: Failure) -> do
|
`catch` \(e :: Failure) -> do
|
||||||
return $ WriteResult True 0 Nothing 0 [] [e] []
|
return $ WriteResult True 0 Nothing 0 [] [e] []
|
||||||
let failedTotal = or $ map failed blocks
|
let failedTotal = any failed blocks
|
||||||
let updatedTotal = sum $ map nMatched blocks
|
let updatedTotal = sum $ map nMatched blocks
|
||||||
let modifiedTotal =
|
let modifiedTotal =
|
||||||
if all isNothing $ map nModified blocks
|
if all (isNothing . nModified) blocks
|
||||||
then Nothing
|
then Nothing
|
||||||
else Just $ sum $ catMaybes $ map nModified blocks
|
else Just $ sum $ mapMaybe nModified blocks
|
||||||
let totalWriteErrors = concat $ map writeErrors blocks
|
let totalWriteErrors = concatMap writeErrors blocks
|
||||||
let totalWriteConcernErrors = concat $ map writeConcernErrors blocks
|
let totalWriteConcernErrors = concatMap writeConcernErrors blocks
|
||||||
|
|
||||||
let upsertedTotal = concat $ map upserted blocks
|
let upsertedTotal = concatMap upserted blocks
|
||||||
return $ WriteResult
|
return $ WriteResult
|
||||||
failedTotal
|
failedTotal
|
||||||
updatedTotal
|
updatedTotal
|
||||||
|
@ -728,7 +750,7 @@ updateBlock :: (MonadIO m)
|
||||||
updateBlock ordered col (prevCount, docs) = do
|
updateBlock ordered col (prevCount, docs) = do
|
||||||
p <- asks mongoPipe
|
p <- asks mongoPipe
|
||||||
let sd = P.serverData p
|
let sd = P.serverData p
|
||||||
if (maxWireVersion sd < 2)
|
if maxWireVersion sd < 2
|
||||||
then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6"
|
then liftIO $ ioError $ userError "updateMany doesn't support mongodb older than 2.6"
|
||||||
else do
|
else do
|
||||||
mode <- asks mongoWriteMode
|
mode <- asks mongoWriteMode
|
||||||
|
@ -751,7 +773,7 @@ updateBlock ordered col (prevCount, docs) = do
|
||||||
[ ProtocolFailure
|
[ ProtocolFailure
|
||||||
prevCount
|
prevCount
|
||||||
$ "Expected array of error docs, but received: "
|
$ "Expected array of error docs, but received: "
|
||||||
++ (show unknownErr)]
|
++ show unknownErr]
|
||||||
[]
|
[]
|
||||||
|
|
||||||
let writeConcernResults =
|
let writeConcernResults =
|
||||||
|
@ -778,9 +800,9 @@ updateBlock ordered col (prevCount, docs) = do
|
||||||
[ ProtocolFailure
|
[ ProtocolFailure
|
||||||
prevCount
|
prevCount
|
||||||
$ "Expected doc in writeConcernError, but received: "
|
$ "Expected doc in writeConcernError, but received: "
|
||||||
++ (show unknownErr)]
|
++ show unknownErr]
|
||||||
|
|
||||||
let upsertedList = map docToUpserted $ fromMaybe [] (doc !? "upserted")
|
let upsertedList = maybe [] (map docToUpserted) (doc !? "upserted")
|
||||||
let successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] []
|
let successResults = WriteResult False n (doc !? "nModified") 0 upsertedList [] []
|
||||||
return $ foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults]
|
return $ foldl1' mergeWriteResults [writeErrorsResults, writeConcernResults, successResults]
|
||||||
|
|
||||||
|
@ -799,10 +821,10 @@ mergeWriteResults :: WriteResult -> WriteResult -> WriteResult
|
||||||
mergeWriteResults
|
mergeWriteResults
|
||||||
(WriteResult failed1 nMatched1 nModified1 nDeleted1 upserted1 writeErrors1 writeConcernErrors1)
|
(WriteResult failed1 nMatched1 nModified1 nDeleted1 upserted1 writeErrors1 writeConcernErrors1)
|
||||||
(WriteResult failed2 nMatched2 nModified2 nDeleted2 upserted2 writeErrors2 writeConcernErrors2) =
|
(WriteResult failed2 nMatched2 nModified2 nDeleted2 upserted2 writeErrors2 writeConcernErrors2) =
|
||||||
(WriteResult
|
WriteResult
|
||||||
(failed1 || failed2)
|
(failed1 || failed2)
|
||||||
(nMatched1 + nMatched2)
|
(nMatched1 + nMatched2)
|
||||||
((liftM2 (+)) nModified1 nModified2)
|
(liftM2 (+) nModified1 nModified2)
|
||||||
(nDeleted1 + nDeleted2)
|
(nDeleted1 + nDeleted2)
|
||||||
-- This function is used in foldl1' function. The first argument is the accumulator.
|
-- This function is used in foldl1' function. The first argument is the accumulator.
|
||||||
-- The list in the accumulator is usually longer than the subsequent value which goes in the second argument.
|
-- The list in the accumulator is usually longer than the subsequent value which goes in the second argument.
|
||||||
|
@ -811,7 +833,6 @@ mergeWriteResults
|
||||||
(upserted2 ++ upserted1)
|
(upserted2 ++ upserted1)
|
||||||
(writeErrors2 ++ writeErrors1)
|
(writeErrors2 ++ writeErrors1)
|
||||||
(writeConcernErrors2 ++ writeConcernErrors1)
|
(writeConcernErrors2 ++ writeConcernErrors1)
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
docToUpserted :: Document -> Upserted
|
docToUpserted :: Document -> Upserted
|
||||||
|
@ -905,7 +926,7 @@ delete' ordered col deleteDocs = do
|
||||||
deletes
|
deletes
|
||||||
ctx <- ask
|
ctx <- ask
|
||||||
let lens = map (either (const 1) length) chunks
|
let lens = map (either (const 1) length) chunks
|
||||||
let lSums = 0 : (zipWith (+) lSums lens)
|
let lSums = 0 : zipWith (+) lSums lens
|
||||||
let failureResult e = return $ WriteResult True 0 Nothing 0 [] [e] []
|
let failureResult e = return $ WriteResult True 0 Nothing 0 [] [e] []
|
||||||
let doChunk b = runReaderT (deleteBlock ordered col b) ctx `catch` failureResult
|
let doChunk b = runReaderT (deleteBlock ordered col b) ctx `catch` failureResult
|
||||||
blockResult <- liftIO $ interruptibleFor ordered (zip lSums chunks) $ \(n, c) ->
|
blockResult <- liftIO $ interruptibleFor ordered (zip lSums chunks) $ \(n, c) ->
|
||||||
|
@ -924,7 +945,7 @@ deleteBlock :: (MonadIO m)
|
||||||
deleteBlock ordered col (prevCount, docs) = do
|
deleteBlock ordered col (prevCount, docs) = do
|
||||||
p <- asks mongoPipe
|
p <- asks mongoPipe
|
||||||
let sd = P.serverData p
|
let sd = P.serverData p
|
||||||
if (maxWireVersion sd < 2)
|
if maxWireVersion sd < 2
|
||||||
then liftIO $ ioError $ userError "deleteMany doesn't support mongodb older than 2.6"
|
then liftIO $ ioError $ userError "deleteMany doesn't support mongodb older than 2.6"
|
||||||
else do
|
else do
|
||||||
mode <- asks mongoWriteMode
|
mode <- asks mongoWriteMode
|
||||||
|
@ -948,7 +969,7 @@ deleteBlock ordered col (prevCount, docs) = do
|
||||||
[ ProtocolFailure
|
[ ProtocolFailure
|
||||||
prevCount
|
prevCount
|
||||||
$ "Expected array of error docs, but received: "
|
$ "Expected array of error docs, but received: "
|
||||||
++ (show unknownErr)]
|
++ show unknownErr]
|
||||||
[]
|
[]
|
||||||
let writeConcernResults =
|
let writeConcernResults =
|
||||||
case look "writeConcernError" doc of
|
case look "writeConcernError" doc of
|
||||||
|
@ -974,7 +995,7 @@ deleteBlock ordered col (prevCount, docs) = do
|
||||||
[ ProtocolFailure
|
[ ProtocolFailure
|
||||||
prevCount
|
prevCount
|
||||||
$ "Expected doc in writeConcernError, but received: "
|
$ "Expected doc in writeConcernError, but received: "
|
||||||
++ (show unknownErr)]
|
++ show unknownErr]
|
||||||
return $ foldl1' mergeWriteResults [successResults, writeErrorsResults, writeConcernResults]
|
return $ foldl1' mergeWriteResults [successResults, writeErrorsResults, writeConcernResults]
|
||||||
|
|
||||||
anyToWriteError :: Int -> Value -> Failure
|
anyToWriteError :: Int -> Value -> Failure
|
||||||
|
@ -1115,11 +1136,11 @@ findAndModifyOpts :: (MonadIO m, MonadFail m)
|
||||||
=> Query
|
=> Query
|
||||||
-> FindAndModifyOpts
|
-> FindAndModifyOpts
|
||||||
-> Action m (Either String (Maybe Document))
|
-> Action m (Either String (Maybe Document))
|
||||||
findAndModifyOpts (Query {
|
findAndModifyOpts Query {
|
||||||
selection = Select sel collection
|
selection = Select sel collection
|
||||||
, project = project
|
, project = project
|
||||||
, sort = sort
|
, sort = sort
|
||||||
}) famOpts = do
|
} famOpts = do
|
||||||
result <- runCommand
|
result <- runCommand
|
||||||
([ "findAndModify" := String collection
|
([ "findAndModify" := String collection
|
||||||
, "query" := Doc sel
|
, "query" := Doc sel
|
||||||
|
@ -1165,13 +1186,13 @@ explain q = do -- same as findOne but with explain set to true
|
||||||
|
|
||||||
count :: (MonadIO m) => Query -> Action m Int
|
count :: (MonadIO m) => Query -> Action m Int
|
||||||
-- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present)
|
-- ^ Fetch number of documents satisfying query (including effect of skip and/or limit if present)
|
||||||
count Query{selection = Select sel col, skip, limit} = at "n" `liftM` runCommand
|
count Query{selection = Select sel col, skip, limit} = at "n" <$> runCommand
|
||||||
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
|
(["count" =: col, "query" =: sel, "skip" =: (fromIntegral skip :: Int32)]
|
||||||
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
|
++ ("limit" =? if limit == 0 then Nothing else Just (fromIntegral limit :: Int32)))
|
||||||
|
|
||||||
distinct :: (MonadIO m) => Label -> Selection -> Action m [Value]
|
distinct :: (MonadIO m) => Label -> Selection -> Action m [Value]
|
||||||
-- ^ Fetch distinct values of field in selected documents
|
-- ^ Fetch distinct values of field in selected documents
|
||||||
distinct k (Select sel col) = at "values" `liftM` runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
|
distinct k (Select sel col) = at "values" <$> runCommand ["distinct" =: col, "key" =: k, "query" =: sel]
|
||||||
|
|
||||||
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit)
|
queryRequest :: (Monad m) => Bool -> Query -> Action m (Request, Maybe Limit)
|
||||||
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
|
-- ^ Translate Query to Protocol.Query. If first arg is true then add special $explain attribute.
|
||||||
|
@ -1192,7 +1213,7 @@ queryRequest isExplain Query{..} = do
|
||||||
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
|
special = catMaybes [mOrder, mSnapshot, mHint, mExplain]
|
||||||
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
|
qSelector = if null special then s else ("$query" =: s) : special where s = selector selection
|
||||||
|
|
||||||
batchSizeRemainingLimit :: BatchSize -> (Maybe Limit) -> (Int32, Maybe Limit)
|
batchSizeRemainingLimit :: BatchSize -> Maybe Limit -> (Int32, Maybe Limit)
|
||||||
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit
|
-- ^ Given batchSize and limit return P.qBatchSize and remaining limit
|
||||||
batchSizeRemainingLimit batchSize mLimit =
|
batchSizeRemainingLimit batchSize mLimit =
|
||||||
let remaining =
|
let remaining =
|
||||||
|
@ -1253,10 +1274,10 @@ nextBatch (Cursor fcol batchSize var) = liftDB $ modifyMVar var $ \dBatch -> do
|
||||||
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
|
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
|
||||||
let newLimit = do
|
let newLimit = do
|
||||||
limit <- mLimit
|
limit <- mLimit
|
||||||
return $ limit - (min limit $ fromIntegral $ length docs)
|
return $ limit - min limit (fromIntegral $ length docs)
|
||||||
let emptyBatch = return $ Batch (Just 0) 0 []
|
let emptyBatch = return $ Batch (Just 0) 0 []
|
||||||
let getNextBatch = nextBatch' fcol batchSize newLimit cid
|
let getNextBatch = nextBatch' fcol batchSize newLimit cid
|
||||||
let resultDocs = (maybe id (take . fromIntegral) mLimit) docs
|
let resultDocs = maybe id (take . fromIntegral) mLimit docs
|
||||||
case (cid, newLimit) of
|
case (cid, newLimit) of
|
||||||
(0, _) -> return (emptyBatch, resultDocs)
|
(0, _) -> return (emptyBatch, resultDocs)
|
||||||
(_, Just 0) -> do
|
(_, Just 0) -> do
|
||||||
|
@ -1269,11 +1290,11 @@ fulfill' :: FullCollection -> BatchSize -> DelayedBatch -> Action IO Batch
|
||||||
-- Discard pre-fetched batch if empty with nonzero cid.
|
-- Discard pre-fetched batch if empty with nonzero cid.
|
||||||
fulfill' fcol batchSize dBatch = do
|
fulfill' fcol batchSize dBatch = do
|
||||||
b@(Batch limit cid docs) <- fulfill dBatch
|
b@(Batch limit cid docs) <- fulfill dBatch
|
||||||
if cid /= 0 && null docs && (limit > (Just 0))
|
if cid /= 0 && null docs && (limit > Just 0)
|
||||||
then nextBatch' fcol batchSize limit cid >>= fulfill
|
then nextBatch' fcol batchSize limit cid >>= fulfill
|
||||||
else return b
|
else return b
|
||||||
|
|
||||||
nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> (Maybe Limit) -> CursorId -> Action m DelayedBatch
|
nextBatch' :: (MonadIO m) => FullCollection -> BatchSize -> Maybe Limit -> CursorId -> Action m DelayedBatch
|
||||||
nextBatch' fcol batchSize limit cid = do
|
nextBatch' fcol batchSize limit cid = do
|
||||||
pipe <- asks mongoPipe
|
pipe <- asks mongoPipe
|
||||||
liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit)
|
liftIO $ request pipe [] (GetMore fcol batchSize' cid, remLimit)
|
||||||
|
@ -1286,7 +1307,7 @@ next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where
|
||||||
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
|
-- nextState:: DelayedBatch -> Action m (DelayedBatch, Maybe Document)
|
||||||
nextState dBatch = do
|
nextState dBatch = do
|
||||||
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
|
Batch mLimit cid docs <- liftDB $ fulfill' fcol batchSize dBatch
|
||||||
if mLimit == (Just 0)
|
if mLimit == Just 0
|
||||||
then return (return $ Batch (Just 0) 0 [], Nothing)
|
then return (return $ Batch (Just 0) 0 [], Nothing)
|
||||||
else
|
else
|
||||||
case docs of
|
case docs of
|
||||||
|
@ -1294,10 +1315,10 @@ next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where
|
||||||
let newLimit = do
|
let newLimit = do
|
||||||
limit <- mLimit
|
limit <- mLimit
|
||||||
return $ limit - 1
|
return $ limit - 1
|
||||||
dBatch' <- if null docs' && cid /= 0 && ((newLimit > (Just 0)) || (isNothing newLimit))
|
dBatch' <- if null docs' && cid /= 0 && ((newLimit > Just 0) || isNothing newLimit)
|
||||||
then nextBatch' fcol batchSize newLimit cid
|
then nextBatch' fcol batchSize newLimit cid
|
||||||
else return $ return (Batch newLimit cid docs')
|
else return $ return (Batch newLimit cid docs')
|
||||||
when (newLimit == (Just 0)) $ unless (cid == 0) $ do
|
when (newLimit == Just 0) $ unless (cid == 0) $ do
|
||||||
pipe <- asks mongoPipe
|
pipe <- asks mongoPipe
|
||||||
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
|
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
|
||||||
return (dBatch', Just doc)
|
return (dBatch', Just doc)
|
||||||
|
@ -1309,7 +1330,7 @@ next (Cursor fcol batchSize var) = liftDB $ modifyMVar var nextState where
|
||||||
|
|
||||||
nextN :: MonadIO m => Int -> Cursor -> Action m [Document]
|
nextN :: MonadIO m => Int -> Cursor -> Action m [Document]
|
||||||
-- ^ Return next N documents or less if end is reached
|
-- ^ Return next N documents or less if end is reached
|
||||||
nextN n c = catMaybes `liftM` replicateM n (next c)
|
nextN n c = catMaybes <$> replicateM n (next c)
|
||||||
|
|
||||||
rest :: MonadIO m => Cursor -> Action m [Document]
|
rest :: MonadIO m => Cursor -> Action m [Document]
|
||||||
-- ^ Return remaining documents in query result
|
-- ^ Return remaining documents in query result
|
||||||
|
@ -1321,7 +1342,7 @@ closeCursor (Cursor _ _ var) = liftDB $ modifyMVar var $ \dBatch -> do
|
||||||
unless (cid == 0) $ do
|
unless (cid == 0) $ do
|
||||||
pipe <- asks mongoPipe
|
pipe <- asks mongoPipe
|
||||||
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
|
liftIOE ConnectionFailure $ P.send pipe [KillCursors [cid]]
|
||||||
return $ (return $ Batch (Just 0) 0 [], ())
|
return (return $ Batch (Just 0) 0 [], ())
|
||||||
|
|
||||||
isCursorClosed :: MonadIO m => Cursor -> Action m Bool
|
isCursorClosed :: MonadIO m => Cursor -> Action m Bool
|
||||||
isCursorClosed (Cursor _ _ var) = do
|
isCursorClosed (Cursor _ _ var) = do
|
||||||
|
@ -1404,7 +1425,7 @@ groupDocument Group{..} =
|
||||||
|
|
||||||
group :: (MonadIO m) => Group -> Action m [Document]
|
group :: (MonadIO m) => Group -> Action m [Document]
|
||||||
-- ^ Execute group query and return resulting aggregate value for each distinct key
|
-- ^ Execute group query and return resulting aggregate value for each distinct key
|
||||||
group g = at "retval" `liftM` runCommand ["group" =: groupDocument g]
|
group g = at "retval" <$> runCommand ["group" =: groupDocument g]
|
||||||
|
|
||||||
-- ** MapReduce
|
-- ** MapReduce
|
||||||
|
|
||||||
|
@ -1497,7 +1518,7 @@ type Command = Document
|
||||||
|
|
||||||
runCommand :: (MonadIO m) => Command -> Action m Document
|
runCommand :: (MonadIO m) => Command -> Action m Document
|
||||||
-- ^ Run command against the database and return its result
|
-- ^ Run command against the database and return its result
|
||||||
runCommand c = maybe err id `liftM` findOne (query c "$cmd") where
|
runCommand c = fromMaybe err <$> findOne (query c "$cmd") where
|
||||||
err = error $ "Nothing returned for command: " ++ show c
|
err = error $ "Nothing returned for command: " ++ show c
|
||||||
|
|
||||||
runCommand1 :: (MonadIO m) => Text -> Action m Document
|
runCommand1 :: (MonadIO m) => Text -> Action m Document
|
||||||
|
@ -1506,7 +1527,7 @@ runCommand1 c = runCommand [c =: (1 :: Int)]
|
||||||
|
|
||||||
eval :: (MonadIO m, Val v) => Javascript -> Action m v
|
eval :: (MonadIO m, Val v) => Javascript -> Action m v
|
||||||
-- ^ Run code on server
|
-- ^ Run code on server
|
||||||
eval code = at "retval" `liftM` runCommand ["$eval" =: code]
|
eval code = at "retval" <$> runCommand ["$eval" =: code]
|
||||||
|
|
||||||
modifyMVar :: MVar a -> (a -> Action IO (a, b)) -> Action IO b
|
modifyMVar :: MVar a -> (a -> Action IO (a, b)) -> Action IO b
|
||||||
modifyMVar v f = do
|
modifyMVar v f = do
|
||||||
|
@ -1516,6 +1537,7 @@ modifyMVar v f = do
|
||||||
mkWeakMVar :: MVar a -> Action IO () -> Action IO (Weak (MVar a))
|
mkWeakMVar :: MVar a -> Action IO () -> Action IO (Weak (MVar a))
|
||||||
mkWeakMVar m closing = do
|
mkWeakMVar m closing = do
|
||||||
ctx <- ask
|
ctx <- ask
|
||||||
|
|
||||||
#if MIN_VERSION_base(4,6,0)
|
#if MIN_VERSION_base(4,6,0)
|
||||||
liftIO $ MV.mkWeakMVar m $ runReaderT closing ctx
|
liftIO $ MV.mkWeakMVar m $ runReaderT closing ctx
|
||||||
#else
|
#else
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
{-# LANGUAGE CPP #-}
|
{-# LANGUAGE CPP #-}
|
||||||
{-# LANGUAGE OverloadedStrings #-}
|
{-# LANGUAGE OverloadedStrings #-}
|
||||||
{-# LANGUAGE RecordWildCards #-}
|
|
||||||
|
|
||||||
#if (__GLASGOW_HASKELL__ >= 706)
|
#if (__GLASGOW_HASKELL__ >= 706)
|
||||||
{-# LANGUAGE RecursiveDo #-}
|
{-# LANGUAGE RecursiveDo #-}
|
||||||
|
@ -21,16 +20,17 @@ ATTENTION!!! Be aware that this module is highly experimental and is
|
||||||
barely tested. The current implementation doesn't verify server's identity.
|
barely tested. The current implementation doesn't verify server's identity.
|
||||||
It only allows you to connect to a mongodb server using TLS protocol.
|
It only allows you to connect to a mongodb server using TLS protocol.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
module Database.MongoDB.Transport.Tls
|
module Database.MongoDB.Transport.Tls
|
||||||
(connect)
|
( connect
|
||||||
|
, connectWithTlsParams
|
||||||
|
)
|
||||||
where
|
where
|
||||||
|
|
||||||
import Data.IORef
|
import Data.IORef
|
||||||
import Data.Monoid
|
|
||||||
import qualified Data.ByteString as ByteString
|
import qualified Data.ByteString as ByteString
|
||||||
import qualified Data.ByteString.Lazy as Lazy.ByteString
|
import qualified Data.ByteString.Lazy as Lazy.ByteString
|
||||||
import Data.Default.Class (def)
|
import Data.Default.Class (def)
|
||||||
import Control.Applicative ((<$>))
|
|
||||||
import Control.Exception (bracketOnError)
|
import Control.Exception (bracketOnError)
|
||||||
import Control.Monad (when, unless)
|
import Control.Monad (when, unless)
|
||||||
import System.IO
|
import System.IO
|
||||||
|
@ -45,15 +45,19 @@ import Database.MongoDB.Query (access, slaveOk, retrieveServerData)
|
||||||
|
|
||||||
-- | Connect to mongodb using TLS
|
-- | Connect to mongodb using TLS
|
||||||
connect :: HostName -> PortID -> IO Pipe
|
connect :: HostName -> PortID -> IO Pipe
|
||||||
connect host port = bracketOnError (connectTo host port) hClose $ \handle -> do
|
connect host port = connectWithTlsParams params host port
|
||||||
|
where
|
||||||
let params = (TLS.defaultParamsClient host "")
|
params = (TLS.defaultParamsClient host "")
|
||||||
{ TLS.clientSupported = def
|
{ TLS.clientSupported = def
|
||||||
{ TLS.supportedCiphers = TLS.ciphersuite_default}
|
{ TLS.supportedCiphers = TLS.ciphersuite_default }
|
||||||
, TLS.clientHooks = def
|
, TLS.clientHooks = def
|
||||||
{ TLS.onServerCertificate = \_ _ _ _ -> return []}
|
{ TLS.onServerCertificate = \_ _ _ _ -> return [] }
|
||||||
}
|
}
|
||||||
context <- TLS.contextNew handle params
|
|
||||||
|
-- | Connect to mongodb using TLS using provided TLS client parameters
|
||||||
|
connectWithTlsParams :: TLS.ClientParams -> HostName -> PortID -> IO Pipe
|
||||||
|
connectWithTlsParams clientParams host port = bracketOnError (connectTo host port) hClose $ \handle -> do
|
||||||
|
context <- TLS.contextNew handle clientParams
|
||||||
TLS.handshake context
|
TLS.handshake context
|
||||||
|
|
||||||
conn <- tlsConnection context
|
conn <- tlsConnection context
|
||||||
|
|
BIN
dist-newstyle/cache/config
vendored
BIN
dist-newstyle/cache/config
vendored
Binary file not shown.
Loading…
Reference in a new issue