Replace IORefs with interruptibleFor
This commit is contained in:
parent
d9db9bca52
commit
c6bd17f1f4
1 changed files with 32 additions and 49 deletions
|
@ -651,19 +651,11 @@ update' ordered col updateDocs = do
|
||||||
updates
|
updates
|
||||||
let lens = map length chunks
|
let lens = map length chunks
|
||||||
let lSums = 0 : (zipWith (+) lSums lens)
|
let lSums = 0 : (zipWith (+) lSums lens)
|
||||||
errorDetected <- liftIO $ newIORef False
|
|
||||||
ctx <- ask
|
ctx <- ask
|
||||||
blocks <- forM (zip lSums chunks) $ \b -> liftIO $ do
|
blocks <- liftIO $ interruptibleFor ordered (zip lSums chunks) $ \b -> liftIO $ do
|
||||||
ed <- readIORef errorDetected
|
ur <- runReaderT (updateBlock ordered col b) ctx
|
||||||
if ed && ordered
|
return ur
|
||||||
then return $ UpdateResult True 0 Nothing [] [] []
|
|
||||||
else do
|
|
||||||
ur <- runReaderT (updateBlock ordered col b) ctx
|
|
||||||
when (failed ur) $ do
|
|
||||||
writeIORef errorDetected True
|
|
||||||
return ur
|
|
||||||
`catch` \(e :: SomeException) -> do
|
`catch` \(e :: SomeException) -> do
|
||||||
writeIORef errorDetected True
|
|
||||||
return $ UpdateResult True 0 Nothing [] [] [] -- TODO probably should be revised
|
return $ UpdateResult True 0 Nothing [] [] [] -- TODO probably should be revised
|
||||||
let failedTotal = or $ map failed blocks
|
let failedTotal = or $ map failed blocks
|
||||||
let updatedTotal = sum $ map nMatched blocks
|
let updatedTotal = sum $ map nMatched blocks
|
||||||
|
@ -720,46 +712,37 @@ updateBlockLegacy :: (MonadIO m)
|
||||||
updateBlockLegacy ordered col (prevCount, docs) = do
|
updateBlockLegacy ordered col (prevCount, docs) = do
|
||||||
db <- thisDatabase
|
db <- thisDatabase
|
||||||
ctx <- ask
|
ctx <- ask
|
||||||
errorDetected <- liftIO $ newIORef False
|
results <- liftIO $
|
||||||
results <-
|
interruptibleFor ordered (zip [prevCount, (prevCount + 1) ..] docs) $ \(i, updateDoc) -> do
|
||||||
liftIO $ forM (zip [prevCount, (prevCount + 1) ..] docs) $ \(i, updateDoc) -> do
|
let doc = (at "u" updateDoc) :: Document
|
||||||
ed <- readIORef errorDetected
|
let sel = (at "q" updateDoc) :: Document
|
||||||
if ed && ordered
|
let upsrt = if at "upsert" updateDoc then [Upsert] else []
|
||||||
then do
|
let multi = if at "multi" updateDoc then [MultiUpdate] else []
|
||||||
return $ UpdateResult True 0 Nothing [] [] []
|
mRes <- runReaderT (write (Update (db <.> col) (upsrt ++ multi) sel doc)) ctx
|
||||||
else do
|
case mRes of
|
||||||
let doc = (at "u" updateDoc) :: Document
|
Nothing -> return $ UpdateResult False 0 Nothing [] [] []
|
||||||
let sel = (at "q" updateDoc) :: Document
|
Just resDoc -> do
|
||||||
let upsrt = if at "upsert" updateDoc then [Upsert] else []
|
let em = lookup "err" resDoc
|
||||||
let multi = if at "multi" updateDoc then [MultiUpdate] else []
|
let eCode = lookup "code" resDoc
|
||||||
mRes <- runReaderT (write (Update (db <.> col) (upsrt ++ multi) sel doc)) ctx
|
let wtimeout = fromMaybe False $ lookup "wtimeout" resDoc
|
||||||
case mRes of
|
if isNothing em && isNothing eCode
|
||||||
Nothing -> return $ UpdateResult False 0 Nothing [] [] []
|
then do
|
||||||
Just resDoc -> do
|
let n = at "n" resDoc
|
||||||
let em = lookup "err" resDoc
|
let ups = do
|
||||||
let eCode = lookup "code" resDoc
|
upsValue <- lookup "upserted" resDoc
|
||||||
let wtimeout = fromMaybe False $ lookup "wtimeout" resDoc
|
return $ Upserted i upsValue
|
||||||
if isNothing em && isNothing eCode
|
return $ UpdateResult False n Nothing (maybeToList ups) [] []
|
||||||
|
else do
|
||||||
|
let defaultCode = if wtimeout then 64 else 24
|
||||||
|
let errV = fromMaybe "unknown error" em
|
||||||
|
let c = fromMaybe defaultCode eCode
|
||||||
|
if wtimeout
|
||||||
then do
|
then do
|
||||||
let n = at "n" resDoc
|
return $ UpdateResult True 0 Nothing [] [] [WriteConcernError c errV]
|
||||||
let ups = do
|
|
||||||
upsValue <- lookup "upserted" resDoc
|
|
||||||
return $ Upserted i upsValue
|
|
||||||
return $ UpdateResult False n Nothing (maybeToList ups) [] []
|
|
||||||
else do
|
else do
|
||||||
let defaultCode = if wtimeout then 64 else 24
|
return $ UpdateResult True 0 Nothing [] [WriteError i c errV] []
|
||||||
let errV = fromMaybe "unknown error" em
|
`catch` \(e :: SomeException) -> do
|
||||||
let c = fromMaybe defaultCode eCode
|
return $ UpdateResult True 0 Nothing [] [WriteError i 0 (show e)] []
|
||||||
if wtimeout
|
|
||||||
then do
|
|
||||||
writeIORef errorDetected True
|
|
||||||
return $ UpdateResult True 0 Nothing [] [] [WriteConcernError c errV]
|
|
||||||
else do
|
|
||||||
writeIORef errorDetected True
|
|
||||||
return $ UpdateResult True 0 Nothing [] [WriteError i c errV] []
|
|
||||||
`catch` \(e :: SomeException) -> do
|
|
||||||
writeIORef errorDetected True
|
|
||||||
return $ UpdateResult True 0 Nothing [] [WriteError i 0 (show e)] []
|
|
||||||
return $ foldl1' mergeUpdateResults results
|
return $ foldl1' mergeUpdateResults results
|
||||||
|
|
||||||
mergeUpdateResults :: UpdateResult -> UpdateResult -> UpdateResult
|
mergeUpdateResults :: UpdateResult -> UpdateResult -> UpdateResult
|
||||||
|
|
Loading…
Reference in a new issue