diff --git a/Database/MongoDB/Query.hs b/Database/MongoDB/Query.hs index 1f580dc..c8a787e 100644 --- a/Database/MongoDB/Query.hs +++ b/Database/MongoDB/Query.hs @@ -50,7 +50,7 @@ import Prelude hiding (lookup) import Control.Exception (Exception, throwIO, throw) import Control.Monad (unless, replicateM, liftM, forM, forM_) import Data.Int (Int32, Int64) -import Data.Maybe (listToMaybe, catMaybes, isNothing) +import Data.Maybe (listToMaybe, catMaybes, isNothing, maybeToList) import Data.Word (Word32) #if !MIN_VERSION_base(4,8,0) import Data.Monoid (mappend) @@ -75,8 +75,10 @@ import Control.Monad.Trans.Control (MonadBaseControl(..)) import Data.Binary.Put (runPut) import Data.Bson (Document, Field(..), Label, Val, Value(String, Doc, Bool), Javascript, at, valueAt, lookup, look, genObjectId, (=:), - (=?), (!?), Val(..)) + (=?), (!?), Val(..), ObjectId) import Data.Bson.Binary (putDocument) +import Data.Either (lefts, rights) +import Data.Maybe (fromJust, isJust) import Data.Text (Text) import qualified Data.Text as T @@ -148,6 +150,22 @@ type GetLastError = Document -- ^ Parameters for getLastError command. For example @[\"w\" =: 2]@ tells the server to wait for the write to reach at least two servers in replica set before acknowledging. See for more options. data UpdateResult = UpdateResult + { nMatched :: Int + , nModified :: Maybe Int -- Mongodb server before 2.6 doesn't allow to calculate this value. It's Nothing if we fail to do so + , upserted :: [Upserted] + , writeErrors :: [WriteError] + } deriving Show + +data Upserted = Upserted + { upsertedIndex :: Int + , upsertedId :: ObjectId + } deriving Show + +data WriteError = WriteError + { errIndex :: Int + , errCode :: Int + , errMsg :: String + } deriving Show data DeleteResult = DeleteResult @@ -371,12 +389,13 @@ data WriteMode = | Confirm GetLastError -- ^ Receive an acknowledgment after every write, and raise exception if one says the write failed. This is acomplished by sending the getLastError command, with given 'GetLastError' parameters, after every write. deriving (Show, Eq) -write :: Notice -> Action IO () +write :: Notice -> Action IO (Maybe Document) -- ^ Send write to server, and if write-mode is 'Safe' then include getLastError request and raise 'WriteFailure' if it reports an error. write notice = asks mongoWriteMode >>= \mode -> case mode of NoConfirm -> do pipe <- asks mongoPipe liftIOE ConnectionFailure $ P.send pipe [notice] + return Nothing Confirm params -> do let q = query (("getlasterror" =: (1 :: Int)) : params) "$cmd" pipe <- asks mongoPipe @@ -384,9 +403,7 @@ write notice = asks mongoWriteMode >>= \mode -> case mode of r <- queryRequest False q {limit = 1} rr <- liftIO $ request pipe [notice] r fulfill rr - case lookup "err" doc of - Nothing -> return () - Just err -> liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "code" doc) err + return $ Just doc -- ** Insert @@ -456,7 +473,11 @@ insertBlock opts col docs = do let sd = P.serverData p if (maxWireVersion sd < 2) then do - liftDB $ write (Insert (db <.> col) opts docs') + res <- liftDB $ write (Insert (db <.> col) opts docs') + when (isJust res) $ do + let jRes = fromJust res + let e = lookup "err" jRes + when (isJust e) $ liftIO $ throwIO $ WriteFailure (maybe 0 id $ lookup "code" jRes) (fromJust e) return $ map (valueAt "_id") docs' else do mode <- asks mongoWriteMode @@ -612,33 +633,55 @@ update' ordered col updateDocs = do -- the overall size (maxWriteBatchSize sd) updates - forM_ chunks (updateBlock ordered col) - return UpdateResult + let lens = map length chunks + let lSums = 0 : (zipWith (+) lSums lens) + blocks <- forM (zip lSums chunks) (updateBlock ordered col) -- TODO update block can throw exception which will cause other blocks to fail. It's important when ordered is false + let updatedTotal = sum $ map nMatched blocks + let modifiedTotal = + if all isNothing $ map nModified blocks + then Nothing + else Just $ sum $ catMaybes $ map nModified blocks + + let upsertedTotal = concat $ map upserted blocks + return $ UpdateResult updatedTotal modifiedTotal upsertedTotal [] updateBlock :: (MonadIO m) - => Bool -> Collection -> [Document] -> Action m () -updateBlock ordered col docs = do + => Bool -> Collection -> (Int, [Document]) -> Action m UpdateResult +updateBlock ordered col (prevCount, docs) = do p <- asks mongoPipe let sd = P.serverData p if (maxWireVersion sd < 2) then do db <- thisDatabase ctx <- ask - errors <- - liftIO $ forM docs $ \updateDoc -> do + results <- + liftIO $ forM (zip [prevCount, (prevCount + 1) ..] docs) $ \(i, updateDoc) -> do let doc = (at "u" updateDoc) :: Document let sel = (at "q" updateDoc) :: Document let upsrt = if at "upsert" updateDoc then [Upsert] else [] let multi = if at "multi" updateDoc then [MultiUpdate] else [] - runReaderT (write (Update (db <.> col) (upsrt ++ multi) sel doc)) ctx - return Nothing + mRes <- runReaderT (write (Update (db <.> col) (upsrt ++ multi) sel doc)) ctx + case mRes of + Nothing -> return $ Right $ UpdateResult 0 Nothing [] [] + Just resDoc -> do + let em = lookup "err" resDoc + case em of + Nothing -> do + let n = at "n" resDoc + let ups = do + upsValue <- lookup "upserted" resDoc + return $ Upserted i upsValue + return $ Right $ UpdateResult n Nothing (maybeToList ups) [] + Just errV -> do + return $ Left $ WriteError i (at "code" resDoc) errV `catch` \(e :: SomeException) -> do when ordered $ liftIO $ throwIO e - return $ Just e - let onlyErrors = catMaybes errors - if not $ null onlyErrors - then liftIO $ throwIO $ WriteFailure 0 (show onlyErrors) - else return () + return $ Left $ WriteError i 0 (show e) + let onlyErrors = lefts results + let onlyUpdates = rights results + let totalnMatched = sum $ map nMatched onlyUpdates + let totalUpserted = concat $ map upserted onlyUpdates + return $ UpdateResult totalnMatched Nothing totalUpserted onlyErrors else do mode <- asks mongoWriteMode let writeConcern = case mode of @@ -660,6 +703,15 @@ updateBlock ordered col docs = do (maybe 0 id $ lookup "ok" doc) (show err ++ show writeConcernErr) + let upsertedDocs = fromMaybe [] (doc !? "upserted") + return $ UpdateResult (at "n" doc) (at "nModified" doc) (map docToUpserted upsertedDocs) [] + +docToUpserted :: Document -> Upserted +docToUpserted doc = Upserted ind uid + where + ind = at "index" doc + uid = at "_id" doc + -- ** Delete delete :: (MonadIO m)