Return update result for old versions of mongodb
This commit is contained in:
parent
55df5635f1
commit
399e2c3443
1 changed files with 72 additions and 20 deletions
|
@ -50,7 +50,7 @@ import Prelude hiding (lookup)
|
||||||
import Control.Exception (Exception, throwIO, throw)
|
import Control.Exception (Exception, throwIO, throw)
|
||||||
import Control.Monad (unless, replicateM, liftM, forM, forM_)
|
import Control.Monad (unless, replicateM, liftM, forM, forM_)
|
||||||
import Data.Int (Int32, Int64)
|
import Data.Int (Int32, Int64)
|
||||||
import Data.Maybe (listToMaybe, catMaybes, isNothing)
|
import Data.Maybe (listToMaybe, catMaybes, isNothing, maybeToList)
|
||||||
import Data.Word (Word32)
|
import Data.Word (Word32)
|
||||||
#if !MIN_VERSION_base(4,8,0)
|
#if !MIN_VERSION_base(4,8,0)
|
||||||
import Data.Monoid (mappend)
|
import Data.Monoid (mappend)
|
||||||
|
@ -75,8 +75,10 @@ import Control.Monad.Trans.Control (MonadBaseControl(..))
|
||||||
import Data.Binary.Put (runPut)
|
import Data.Binary.Put (runPut)
|
||||||
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
|
import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool),
|
||||||
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
|
Javascript, at, valueAt, lookup, look, genObjectId, (=:),
|
||||||
(=?), (!?), Val(..))
|
(=?), (!?), Val(..), ObjectId)
|
||||||
import Data.Bson.Binary (putDocument)
|
import Data.Bson.Binary (putDocument)
|
||||||
|
import Data.Either (lefts, rights)
|
||||||
|
import Data.Maybe (fromJust, isJust)
|
||||||
import Data.Text (Text)
|
import Data.Text (Text)
|
||||||
import qualified Data.Text as T
|
import qualified Data.Text as T
|
||||||
|
|
||||||
|
@ -148,6 +150,22 @@ type GetLastError = Document
|
||||||
-- ^ Parameters for getLastError command. For example @[\"w\" =: 2]@ tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See <http://www.mongodb.org/display/DOCS/Last+Error+Commands> for more options.
|
-- ^ Parameters for getLastError command. For example @[\"w\" =: 2]@ tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See <http://www.mongodb.org/display/DOCS/Last+Error+Commands> for more options.
|
||||||
|
|
||||||
data UpdateResult = UpdateResult
|
data UpdateResult = UpdateResult
|
||||||
|
{ nMatched :: Int
|
||||||
|
, nModified :: Maybe Int -- Mongodb server before 2.6 doesn't allow to calculate this value. It's Nothing if we fail to do so
|
||||||
|
, upserted :: [Upserted]
|
||||||
|
, writeErrors :: [WriteError]
|
||||||
|
} deriving Show
|
||||||
|
|
||||||
|
data Upserted = Upserted
|
||||||
|
{ upsertedIndex :: Int
|
||||||
|
, upsertedId :: ObjectId
|
||||||
|
} deriving Show
|
||||||
|
|
||||||
|
data WriteError = WriteError
|
||||||
|
{ errIndex :: Int
|
||||||
|
, errCode :: Int
|
||||||
|
, errMsg :: String
|
||||||
|
} deriving Show
|
||||||
|
|
||||||
data DeleteResult = DeleteResult
|
data DeleteResult = DeleteResult
|
||||||
|
|
||||||
|
@ -371,12 +389,13 @@ data WriteMode =
|
||||||
| Confirm GetLastError -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. This is acomplished by sending the getLastError command, with given 'GetLastError' parameters, after every write.
|
| Confirm GetLastError -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. This is acomplished by sending the getLastError command, with given 'GetLastError' parameters, after every write.
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
write :: Notice -> Action IO ()
|
write :: Notice -> Action IO (Maybe Document)
|
||||||
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error.
|
-- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error.
|
||||||
write notice = asks mongoWriteMode >>= \mode -> case mode of
|
write notice = asks mongoWriteMode >>= \mode -> case mode of
|
||||||
NoConfirm -> do
|
NoConfirm -> do
|
||||||
pipe <- asks mongoPipe
|
pipe <- asks mongoPipe
|
||||||
liftIOE ConnectionFailure $ P.send pipe [notice]
|
liftIOE ConnectionFailure $ P.send pipe [notice]
|
||||||
|
return Nothing
|
||||||
Confirm params -> do
|
Confirm params -> do
|
||||||
let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
|
let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd"
|
||||||
pipe <- asks mongoPipe
|
pipe <- asks mongoPipe
|
||||||
|
@ -384,9 +403,7 @@ write notice = asks mongoWriteMode >>= \mode -> case mode of
|
||||||
r <- queryRequest False q {limit = 1}
|
r <- queryRequest False q {limit = 1}
|
||||||
rr <- liftIO $ request pipe [notice] r
|
rr <- liftIO $ request pipe [notice] r
|
||||||
fulfill rr
|
fulfill rr
|
||||||
case lookup "err" doc of
|
return $ Just doc
|
||||||
Nothing -> return ()
|
|
||||||
Just err -> liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "code" doc) err
|
|
||||||
|
|
||||||
-- ** Insert
|
-- ** Insert
|
||||||
|
|
||||||
|
@ -456,7 +473,11 @@ insertBlock opts col docs = do
|
||||||
let sd = P.serverData p
|
let sd = P.serverData p
|
||||||
if (maxWireVersion sd < 2)
|
if (maxWireVersion sd < 2)
|
||||||
then do
|
then do
|
||||||
liftDB $ write (Insert (db <.> col) opts docs')
|
res <- liftDB $ write (Insert (db <.> col) opts docs')
|
||||||
|
when (isJust res) $ do
|
||||||
|
let jRes = fromJust res
|
||||||
|
let e = lookup "err" jRes
|
||||||
|
when (isJust e) $ liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "code" jRes) (fromJust e)
|
||||||
return $ map (valueAt "_id") docs'
|
return $ map (valueAt "_id") docs'
|
||||||
else do
|
else do
|
||||||
mode <- asks mongoWriteMode
|
mode <- asks mongoWriteMode
|
||||||
|
@ -612,33 +633,55 @@ update' ordered col updateDocs = do
|
||||||
-- the overall size
|
-- the overall size
|
||||||
(maxWriteBatchSize sd)
|
(maxWriteBatchSize sd)
|
||||||
updates
|
updates
|
||||||
forM_ chunks (updateBlock ordered col)
|
let lens = map length chunks
|
||||||
return UpdateResult
|
let lSums = 0 : (zipWith (+) lSums lens)
|
||||||
|
blocks <- forM (zip lSums chunks) (updateBlock ordered col) -- TODO update block can throw exception which will cause other blocks to fail. It's important when ordered is false
|
||||||
|
let updatedTotal = sum $ map nMatched blocks
|
||||||
|
let modifiedTotal =
|
||||||
|
if all isNothing $ map nModified blocks
|
||||||
|
then Nothing
|
||||||
|
else Just $ sum $ catMaybes $ map nModified blocks
|
||||||
|
|
||||||
|
let upsertedTotal = concat $ map upserted blocks
|
||||||
|
return $ UpdateResult updatedTotal modifiedTotal upsertedTotal []
|
||||||
|
|
||||||
updateBlock :: (MonadIO m)
|
updateBlock :: (MonadIO m)
|
||||||
=> Bool -> Collection -> [Document] -> Action m ()
|
=> Bool -> Collection -> (Int, [Document]) -> Action m UpdateResult
|
||||||
updateBlock ordered col docs = do
|
updateBlock ordered col (prevCount, docs) = do
|
||||||
p <- asks mongoPipe
|
p <- asks mongoPipe
|
||||||
let sd = P.serverData p
|
let sd = P.serverData p
|
||||||
if (maxWireVersion sd < 2)
|
if (maxWireVersion sd < 2)
|
||||||
then do
|
then do
|
||||||
db <- thisDatabase
|
db <- thisDatabase
|
||||||
ctx <- ask
|
ctx <- ask
|
||||||
errors <-
|
results <-
|
||||||
liftIO $ forM docs $ \updateDoc -> do
|
liftIO $ forM (zip [prevCount, (prevCount + 1) ..] docs) $ \(i, updateDoc) -> do
|
||||||
let doc = (at "u" updateDoc) :: Document
|
let doc = (at "u" updateDoc) :: Document
|
||||||
let sel = (at "q" updateDoc) :: Document
|
let sel = (at "q" updateDoc) :: Document
|
||||||
let upsrt = if at "upsert" updateDoc then [Upsert] else []
|
let upsrt = if at "upsert" updateDoc then [Upsert] else []
|
||||||
let multi = if at "multi" updateDoc then [MultiUpdate] else []
|
let multi = if at "multi" updateDoc then [MultiUpdate] else []
|
||||||
runReaderT (write (Update (db <.> col) (upsrt ++ multi) sel doc)) ctx
|
mRes <- runReaderT (write (Update (db <.> col) (upsrt ++ multi) sel doc)) ctx
|
||||||
return Nothing
|
case mRes of
|
||||||
|
Nothing -> return $ Right $ UpdateResult 0 Nothing [] []
|
||||||
|
Just resDoc -> do
|
||||||
|
let em = lookup "err" resDoc
|
||||||
|
case em of
|
||||||
|
Nothing -> do
|
||||||
|
let n = at "n" resDoc
|
||||||
|
let ups = do
|
||||||
|
upsValue <- lookup "upserted" resDoc
|
||||||
|
return $ Upserted i upsValue
|
||||||
|
return $ Right $ UpdateResult n Nothing (maybeToList ups) []
|
||||||
|
Just errV -> do
|
||||||
|
return $ Left $ WriteError i (at "code" resDoc) errV
|
||||||
`catch` \(e :: SomeException) -> do
|
`catch` \(e :: SomeException) -> do
|
||||||
when ordered $ liftIO $ throwIO e
|
when ordered $ liftIO $ throwIO e
|
||||||
return $ Just e
|
return $ Left $ WriteError i 0 (show e)
|
||||||
let onlyErrors = catMaybes errors
|
let onlyErrors = lefts results
|
||||||
if not $ null onlyErrors
|
let onlyUpdates = rights results
|
||||||
then liftIO $ throwIO $ WriteFailure 0 (show onlyErrors)
|
let totalnMatched = sum $ map nMatched onlyUpdates
|
||||||
else return ()
|
let totalUpserted = concat $ map upserted onlyUpdates
|
||||||
|
return $ UpdateResult totalnMatched Nothing totalUpserted onlyErrors
|
||||||
else do
|
else do
|
||||||
mode <- asks mongoWriteMode
|
mode <- asks mongoWriteMode
|
||||||
let writeConcern = case mode of
|
let writeConcern = case mode of
|
||||||
|
@ -660,6 +703,15 @@ updateBlock ordered col docs = do
|
||||||
(maybe 0 id $ lookup "ok" doc)
|
(maybe 0 id $ lookup "ok" doc)
|
||||||
(show err ++ show writeConcernErr)
|
(show err ++ show writeConcernErr)
|
||||||
|
|
||||||
|
let upsertedDocs = fromMaybe [] (doc !? "upserted")
|
||||||
|
return $ UpdateResult (at "n" doc) (at "nModified" doc) (map docToUpserted upsertedDocs) []
|
||||||
|
|
||||||
|
docToUpserted :: Document -> Upserted
|
||||||
|
docToUpserted doc = Upserted ind uid
|
||||||
|
where
|
||||||
|
ind = at "index" doc
|
||||||
|
uid = at "_id" doc
|
||||||
|
|
||||||
-- ** Delete
|
-- ** Delete
|
||||||
|
|
||||||
delete :: (MonadIO m)
|
delete :: (MonadIO m)
|
||||||
|
|
Loading…
Reference in a new issue