feat(rpc): background wallet sync

This commit is contained in:
Rene Vergara 2024-10-08 08:20:52 -05:00
parent 7410eed991
commit c75316ddd7
Signed by: pitmutt
GPG key ID: 65122AD495A7F5B2
7 changed files with 430 additions and 188 deletions

View file

@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `getnewaccount` RPC method
- `getnewaddress` RPC method
- `getoperationstatus` RPC method
- `sendmany` RPC method
- Function `prepareTxV2` implementing `PrivacyPolicy`
### Changed
@ -27,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Detection of changes in database schema for automatic re-scan
- Block tracking for chain re-org detection
- Refactored `ZcashPool`
- Preventing write operations to occur during wallet sync
## [0.6.0.0-beta]

View file

@ -2,15 +2,22 @@
module Server where
import Control.Concurrent (forkIO, threadDelay)
import Control.Exception (throwIO, try)
import Control.Monad (when)
import Control.Monad (forever, when)
import Data.Configurator
import Network.Wai.Handler.Warp (run)
import Servant
import ZcashHaskell.Types (ZebraGetBlockChainInfo(..), ZebraGetInfo(..))
import Zenith.Core (checkBlockChain, checkZebra)
import Zenith.DB (initDb)
import Zenith.RPC (State(..), ZenithRPC(..), authenticate, zenithServer)
import Zenith.RPC
( State(..)
, ZenithRPC(..)
, authenticate
, scanZebra
, zenithServer
)
import Zenith.Scanner (rescanZebra)
import Zenith.Types (Config(..))
@ -39,6 +46,12 @@ main = do
Left e2 -> throwIO $ userError e2
Right x' -> do
when x' $ rescanZebra zebraHost zebraPort dbFilePath
_ <-
forkIO $
forever $ do
_ <-
scanZebra dbFilePath zebraHost zebraPort (zgb_net chainInfo)
threadDelay 90000000
let myState =
State
(zgb_net chainInfo)

View file

@ -102,6 +102,7 @@ import Zenith.Types
, PrivacyPolicy(..)
, UnifiedAddressDB(..)
, ZcashNetDB(..)
, ZenithStatus(..)
)
import Zenith.Utils
( displayTaz
@ -752,6 +753,10 @@ scanZebra dbP zHost zPort b eChan znet = do
pool <- liftIO $ runNoLoggingT $ initPool dbP
dbBlock <- liftIO $ getMaxBlock pool $ ZcashNetDB znet
chkBlock <- liftIO $ checkIntegrity dbP zHost zPort dbBlock 1
syncChk <- liftIO $ isSyncing pool
if syncChk
then liftIO $ BC.writeBChan eChan $ TickMsg "Sync alread in progress"
else do
logDebugN $
"dbBlock: " <>
T.pack (show dbBlock) <> " chkBlock: " <> T.pack (show chkBlock)
@ -762,24 +767,31 @@ scanZebra dbP zHost zPort b eChan znet = do
else max chkBlock b
if sb > zgb_blocks bStatus || sb < 1
then do
liftIO $ BC.writeBChan eChan $ TickMsg "Invalid starting block for scan"
liftIO $
BC.writeBChan eChan $ TickMsg "Invalid starting block for scan"
else do
let bList = [(sb + 1) .. (zgb_blocks bStatus)]
if not (null bList)
then do
let step =
(1.0 :: Float) / fromIntegral (zgb_blocks bStatus - (sb + 1))
(1.0 :: Float) /
fromIntegral (zgb_blocks bStatus - (sb + 1))
_ <- liftIO $ startSync pool
mapM_ (liftIO . processBlock pool step) bList
else liftIO $ BC.writeBChan eChan $ TickVal 1.0
confUp <-
liftIO $ try $ updateConfs zHost zPort pool :: LoggingT
IO
(Either IOError ())
case confUp of
Left _e0 ->
Left _e0 -> do
_ <- liftIO $ completeSync pool Failed
liftIO $
BC.writeBChan eChan $ TickMsg "Failed to update unconfirmed transactions"
Right _ -> return ()
BC.writeBChan eChan $
TickMsg "Failed to update unconfirmed transactions"
Right _ -> do
_ <- liftIO $ completeSync pool Successful
return ()
else liftIO $ BC.writeBChan eChan $ TickVal 1.0
where
processBlock :: ConnectionPool -> Float -> Int -> IO ()
processBlock pool step bl = do
@ -791,7 +803,9 @@ scanZebra dbP zHost zPort b eChan znet = do
"getblock"
[Data.Aeson.String $ T.pack $ show bl, jsonNumber 1]
case r of
Left e1 -> liftIO $ BC.writeBChan eChan $ TickMsg e1
Left e1 -> do
_ <- liftIO $ completeSync pool Failed
liftIO $ BC.writeBChan eChan $ TickMsg e1
Right blk -> do
r2 <-
liftIO $
@ -801,7 +815,9 @@ scanZebra dbP zHost zPort b eChan znet = do
"getblock"
[Data.Aeson.String $ T.pack $ show bl, jsonNumber 0]
case r2 of
Left e2 -> liftIO $ BC.writeBChan eChan $ TickMsg e2
Left e2 -> do
_ <- liftIO $ completeSync pool Failed
liftIO $ BC.writeBChan eChan $ TickMsg e2
Right hb -> do
let blockTime = getBlockTime hb
bi <-

View file

@ -291,6 +291,13 @@ share
result T.Text Maybe
UniqueOp uuid
deriving Show Eq
ChainSync
name T.Text
start UTCTime
end UTCTime Maybe
status ZenithStatus
UniqueSync name
deriving Show Eq
|]
-- ** Type conversions
@ -2329,6 +2336,46 @@ finalizeOperation pool op status result = do
]
where_ (ops ^. OperationId ==. val op)
-- * Chain sync
-- | Check if the wallet is currently running a sync
isSyncing :: ConnectionPool -> IO Bool
isSyncing pool = do
s <-
runNoLoggingT $
PS.retryOnBusy $
flip PS.runSqlPool pool $
selectOne $ do
r <- from $ table @ChainSync
where_ $ r ^. ChainSyncStatus ==. val Processing
pure r
case s of
Nothing -> return False
Just _ -> return True
-- | Record the start of a sync
startSync :: ConnectionPool -> IO ()
startSync pool = do
start <- getCurrentTime
_ <-
runNoLoggingT $
PS.retryOnBusy $
flip PS.runSqlPool pool $
upsert (ChainSync "Internal" start Nothing Processing) []
return ()
-- | Complete a sync
completeSync :: ConnectionPool -> ZenithStatus -> IO ()
completeSync pool st = do
end <- getCurrentTime
_ <-
runNoLoggingT $
PS.retryOnBusy $
flip PS.runSqlPool pool $
update $ \s -> do
set s [ChainSyncEnd =. val (Just end), ChainSyncStatus =. val st]
where_ (s ^. ChainSyncName ==. val "Internal")
return ()
-- | Rewind the data store to a given block height
rewindWalletData :: ConnectionPool -> Int -> IO ()
rewindWalletData pool b = do

View file

@ -1475,7 +1475,6 @@ handleEvent wenv node model evt =
res <- liftIO $ updateAdrsInAdrBook pool d a a
return $ ShowMessage "Address Book entry updated!!"
-- model & recipientValid .~ ((model ^. privacyChoice) == Low) ]
scanZebra :: T.Text -> T.Text -> Int -> ZcashNet -> (AppEvent -> IO ()) -> IO ()
scanZebra dbPath zHost zPort net sendMsg = do
bStatus <- liftIO $ checkBlockChain zHost zPort
@ -1483,6 +1482,10 @@ scanZebra dbPath zHost zPort net sendMsg = do
b <- liftIO $ getMinBirthdayHeight pool
dbBlock <- getMaxBlock pool $ ZcashNetDB net
chkBlock <- checkIntegrity dbPath zHost zPort dbBlock 1
syncChk <- isSyncing pool
if syncChk
then sendMsg (ShowError "Sync already in progress")
else do
unless (chkBlock == dbBlock) $ rewindWalletData pool chkBlock
let sb =
if chkBlock == dbBlock
@ -1495,12 +1498,19 @@ scanZebra dbPath zHost zPort net sendMsg = do
if not (null bList)
then do
let step = (1.0 :: Float) / fromIntegral (length bList)
_ <- startSync pool
mapM_ (processBlock pool step) bList
else sendMsg (SyncVal 1.0)
confUp <- try $ updateConfs zHost zPort pool :: IO (Either IOError ())
confUp <-
try $ updateConfs zHost zPort pool :: IO (Either IOError ())
case confUp of
Left _e0 -> sendMsg (ShowError "Failed to update unconfirmed transactions")
Right _ -> return ()
Left _e0 -> do
_ <- completeSync pool Failed
sendMsg
(ShowError "Failed to update unconfirmed transactions")
Right _ -> do
_ <- completeSync pool Successful
return ()
else sendMsg (SyncVal 1.0)
where
processBlock :: ConnectionPool -> Float -> Int -> IO ()
processBlock pool step bl = do
@ -1512,7 +1522,9 @@ scanZebra dbPath zHost zPort net sendMsg = do
"getblock"
[Data.Aeson.String $ showt bl, jsonNumber 1]
case r of
Left e1 -> sendMsg (ShowError $ showt e1)
Left e1 -> do
_ <- completeSync pool Failed
sendMsg (ShowError $ showt e1)
Right blk -> do
r2 <-
liftIO $
@ -1522,7 +1534,9 @@ scanZebra dbPath zHost zPort net sendMsg = do
"getblock"
[Data.Aeson.String $ showt bl, jsonNumber 0]
case r2 of
Left e2 -> sendMsg (ShowError $ showt e2)
Left e2 -> do
_ <- completeSync pool Failed
sendMsg (ShowError $ showt e2)
Right hb -> do
let blockTime = getBlockTime hb
bi <-

View file

@ -14,8 +14,9 @@ module Zenith.RPC where
import Control.Concurrent (forkIO)
import Control.Exception (try)
import Control.Monad (unless, when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Logger (runFileLoggingT, runNoLoggingT)
import Control.Monad.Logger (runFileLoggingT, runNoLoggingT, runStderrLoggingT)
import Data.Aeson
import qualified Data.HexString as H
import Data.Int
@ -27,7 +28,8 @@ import qualified Data.UUID as U
import Data.UUID.V4 (nextRandom)
import qualified Data.Vector as V
import Database.Esqueleto.Experimental
( entityKey
( ConnectionPool
, entityKey
, entityVal
, fromSqlKey
, toSqlKey
@ -36,13 +38,27 @@ import Servant
import Text.Read (readMaybe)
import ZcashHaskell.Keys (generateWalletSeedPhrase)
import ZcashHaskell.Orchard (parseAddress)
import ZcashHaskell.Types (RpcError(..), Scope(..), ZcashNet(..))
import ZcashHaskell.Utils (makeZebraCall)
import Zenith.Core (createCustomWalletAddress, createZcashAccount, prepareTxV2)
import ZcashHaskell.Types
( BlockResponse(..)
, RpcError(..)
, Scope(..)
, ZcashNet(..)
, ZebraGetBlockChainInfo(..)
)
import ZcashHaskell.Utils (getBlockTime, makeZebraCall)
import Zenith.Core
( checkBlockChain
, createCustomWalletAddress
, createZcashAccount
, prepareTxV2
, syncWallet
)
import Zenith.DB
( Operation(..)
, ZcashAccount(..)
, ZcashBlock(..)
, ZcashWallet(..)
, completeSync
, finalizeOperation
, findNotesByAddress
, getAccountById
@ -53,24 +69,32 @@ import Zenith.DB
, getLastSyncBlock
, getMaxAccount
, getMaxAddress
, getMaxBlock
, getMinBirthdayHeight
, getOperation
, getPoolBalance
, getUnconfPoolBalance
, getWalletNotes
, getWallets
, initPool
, isSyncing
, rewindWalletData
, saveAccount
, saveAddress
, saveBlock
, saveOperation
, saveWallet
, startSync
, toZcashAccountAPI
, toZcashAddressAPI
, toZcashWalletAPI
, walletExists
)
import Zenith.Scanner (checkIntegrity, processTx, updateConfs)
import Zenith.Types
( AccountBalance(..)
, Config(..)
, HexStringDB(..)
, PhraseDB(..)
, PrivacyPolicy(..)
, ProposedNote(..)
@ -622,8 +646,16 @@ zenithServer state = getinfo :<|> handleRPC
case parameters req of
NameParams t -> do
let dbPath = w_dbPath state
sP <- liftIO generateWalletSeedPhrase
pool <- liftIO $ runNoLoggingT $ initPool dbPath
syncChk <- liftIO $ isSyncing pool
if syncChk
then return $
ErrorResponse
(callId req)
(-32012)
"The Zenith server is syncing, please try again later."
else do
sP <- liftIO generateWalletSeedPhrase
r <-
liftIO $
saveWallet pool $
@ -650,6 +682,14 @@ zenithServer state = getinfo :<|> handleRPC
NameIdParams t i -> do
let dbPath = w_dbPath state
pool <- liftIO $ runNoLoggingT $ initPool dbPath
syncChk <- liftIO $ isSyncing pool
if syncChk
then return $
ErrorResponse
(callId req)
(-32012)
"The Zenith server is syncing, please try again later."
else do
w <- liftIO $ walletExists pool i
case w of
Just w' -> do
@ -677,7 +717,10 @@ zenithServer state = getinfo :<|> handleRPC
fromSqlKey $ entityKey x
Nothing ->
return $
ErrorResponse (callId req) (-32008) "Wallet does not exist."
ErrorResponse
(callId req)
(-32008)
"Wallet does not exist."
_anyOtherParams ->
return $ ErrorResponse (callId req) (-32602) "Invalid params"
GetNewAddress ->
@ -686,7 +729,16 @@ zenithServer state = getinfo :<|> handleRPC
let dbPath = w_dbPath state
let net = w_network state
pool <- liftIO $ runNoLoggingT $ initPool dbPath
acc <- liftIO $ getAccountById pool $ toSqlKey $ fromIntegral i
syncChk <- liftIO $ isSyncing pool
if syncChk
then return $
ErrorResponse
(callId req)
(-32012)
"The Zenith server is syncing, please try again later."
else do
acc <-
liftIO $ getAccountById pool $ toSqlKey $ fromIntegral i
case acc of
Just acc' -> do
maxAddr <-
@ -705,7 +757,9 @@ zenithServer state = getinfo :<|> handleRPC
case dbAddr of
Just nAddr -> do
return $
NewAddrResponse (callId req) (toZcashAddressAPI nAddr)
NewAddrResponse
(callId req)
(toZcashAddressAPI nAddr)
Nothing ->
return $
ErrorResponse
@ -714,7 +768,10 @@ zenithServer state = getinfo :<|> handleRPC
"Entity with that name already exists."
Nothing ->
return $
ErrorResponse (callId req) (-32006) "Account does not exist."
ErrorResponse
(callId req)
(-32006)
"Account does not exist."
_anyOtherParams ->
return $ ErrorResponse (callId req) (-32602) "Invalid params"
GetOperationStatus ->
@ -739,15 +796,29 @@ zenithServer state = getinfo :<|> handleRPC
let zPort = w_port state
let znet = w_network state
pool <- liftIO $ runNoLoggingT $ initPool dbPath
syncChk <- liftIO $ isSyncing pool
if syncChk
then return $
ErrorResponse
(callId req)
(-32012)
"The Zenith server is syncing, please try again later."
else do
opid <- liftIO nextRandom
startTime <- liftIO getCurrentTime
opkey <-
liftIO $
saveOperation pool $
Operation (ZenithUuid opid) startTime Nothing Processing Nothing
Operation
(ZenithUuid opid)
startTime
Nothing
Processing
Nothing
case opkey of
Nothing ->
return $ ErrorResponse (callId req) (-32010) "Internal Error"
return $
ErrorResponse (callId req) (-32010) "Internal Error"
Just opkey' -> do
acc <-
liftIO $ getAccountById pool $ toSqlKey $ fromIntegral a
@ -812,3 +883,67 @@ authenticate config = BasicAuthCheck check
packRpcResponse :: ToJSON a => T.Text -> a -> Value
packRpcResponse i x =
object ["jsonrpc" .= ("2.0" :: String), "id" .= i, "result" .= x]
scanZebra :: T.Text -> T.Text -> Int -> ZcashNet -> IO ()
scanZebra dbPath zHost zPort net = do
bStatus <- checkBlockChain zHost zPort
pool <- runNoLoggingT $ initPool dbPath
b <- getMinBirthdayHeight pool
dbBlock <- getMaxBlock pool $ ZcashNetDB net
chkBlock <- checkIntegrity dbPath zHost zPort dbBlock 1
syncChk <- isSyncing pool
unless syncChk $ do
unless (chkBlock == dbBlock) $ rewindWalletData pool chkBlock
let sb =
if chkBlock == dbBlock
then max dbBlock b
else max chkBlock b
unless (sb > zgb_blocks bStatus || sb < 1) $ do
let bList = [(sb + 1) .. (zgb_blocks bStatus)]
unless (null bList) $ do
_ <- startSync pool
mapM_ (processBlock pool) bList
confUp <- try $ updateConfs zHost zPort pool :: IO (Either IOError ())
case confUp of
Left _e0 -> do
_ <- completeSync pool Failed
return ()
Right _ -> do
wals <- getWallets pool net
runStderrLoggingT $
mapM_
(syncWallet (Config dbPath zHost zPort "user" "pwd" 8080))
wals
_ <- completeSync pool Successful
return ()
where
processBlock :: ConnectionPool -> Int -> IO ()
processBlock pool bl = do
r <-
makeZebraCall
zHost
zPort
"getblock"
[Data.Aeson.String $ T.pack (show bl), jsonNumber 1]
case r of
Left _ -> completeSync pool Failed
Right blk -> do
r2 <-
makeZebraCall
zHost
zPort
"getblock"
[Data.Aeson.String $ T.pack (show bl), jsonNumber 0]
case r2 of
Left _ -> completeSync pool Failed
Right hb -> do
let blockTime = getBlockTime hb
bi <-
saveBlock pool $
ZcashBlock
(fromIntegral $ bl_height blk)
(HexStringDB $ bl_hash blk)
(fromIntegral $ bl_confirmations blk)
blockTime
(ZcashNetDB net)
mapM_ (processTx zHost zPort bi pool) $ bl_txs blk

View file

@ -37,6 +37,7 @@ import Zenith.DB
, ZcashBlockId
, clearWalletData
, clearWalletTransactions
, completeSync
, getBlock
, getMaxBlock
, getMinBirthdayHeight
@ -47,10 +48,16 @@ import Zenith.DB
, saveBlock
, saveConfs
, saveTransaction
, startSync
, updateWalletSync
, upgradeQrTable
)
import Zenith.Types (Config(..), HexStringDB(..), ZcashNetDB(..))
import Zenith.Types
( Config(..)
, HexStringDB(..)
, ZcashNetDB(..)
, ZenithStatus(..)
)
import Zenith.Utils (jsonNumber)
-- | Function to scan the Zcash blockchain through the Zebra node and populate the Zenith database
@ -74,6 +81,7 @@ rescanZebra host port dbFilePath = do
upgradeQrTable pool1
clearWalletTransactions pool1
clearWalletData pool1
_ <- startSync pool1
dbBlock <- getMaxBlock pool1 znet
b <- liftIO $ getMinBirthdayHeight pool1
let sb = max dbBlock b
@ -99,6 +107,7 @@ rescanZebra host port dbFilePath = do
{-mapM_ (processBlock host port pool2 pg2 znet) bl2 `concurrently_`-}
{-mapM_ (processBlock host port pool3 pg3 znet) bl3-}
print "Please wait..."
_ <- completeSync pool1 Successful
print "Rescan complete"
-- | Function to process a raw block and extract the transaction information
@ -119,7 +128,9 @@ processBlock host port pool pg net b = do
"getblock"
[Data.Aeson.String $ T.pack $ show b, jsonNumber 1]
case r of
Left e -> liftIO $ throwIO $ userError e
Left e -> do
_ <- completeSync pool Failed
liftIO $ throwIO $ userError e
Right blk -> do
r2 <-
liftIO $
@ -129,7 +140,9 @@ processBlock host port pool pg net b = do
"getblock"
[Data.Aeson.String $ T.pack $ show b, jsonNumber 0]
case r2 of
Left e2 -> liftIO $ throwIO $ userError e2
Left e2 -> do
_ <- completeSync pool Failed
liftIO $ throwIO $ userError e2
Right hb -> do
let blockTime = getBlockTime hb
bi <-
@ -160,7 +173,9 @@ processTx host port bt pool t = do
"getrawtransaction"
[Data.Aeson.String $ toText t, jsonNumber 1]
case r of
Left e -> liftIO $ throwIO $ userError e
Left e -> do
_ <- completeSync pool Failed
liftIO $ throwIO $ userError e
Right rawTx -> do
case readZebraTransaction (ztr_hex rawTx) of
Nothing -> return ()