atomic rollback for wallet (#10799)

* atomic rollback for wallet
This commit is contained in:
wjblanke 2022-03-22 11:35:35 -07:00 committed by GitHub
parent b8e1f3b9a9
commit 171abb03ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 62 additions and 49 deletions

View File

@ -80,7 +80,7 @@ class PeerRequestCache:
new_states_validated = LRUCache(self._states_validated.capacity)
for k, cs_height in self._states_validated.cache.items():
if cs_height is not None:
if cs_height is not None and cs_height <= height:
new_states_validated.put(k, cs_height)
self._states_validated = new_states_validated

View File

@ -467,6 +467,28 @@ class WalletNode:
if self.wallet_peers is not None:
await self.wallet_peers.on_connect(peer)
async def perform_atomic_rollback(self, fork_height: int, cache: Optional[PeerRequestCache] = None):
assert self.wallet_state_manager is not None
self.log.info(f"perform_atomic_rollback to {fork_height}")
async with self.wallet_state_manager.db_wrapper.lock:
try:
await self.wallet_state_manager.db_wrapper.begin_transaction()
await self.wallet_state_manager.reorg_rollback(fork_height)
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(fork_height)
if cache is None:
self.rollback_request_caches(fork_height)
else:
cache.clear_after_height(fork_height)
await self.wallet_state_manager.db_wrapper.commit_transaction()
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Exception while perform_atomic_rollback: {e} {tb}")
await self.wallet_state_manager.db_wrapper.rollback_transaction()
await self.wallet_state_manager.coin_store.rebuild_wallet_cache()
await self.wallet_state_manager.tx_store.rebuild_tx_cache()
await self.wallet_state_manager.pool_store.rebuild_cache()
raise
async def long_sync(
self,
target_height: uint32,
@ -500,8 +522,8 @@ class WalletNode:
start_time = time.time()
if rollback:
await self.wallet_state_manager.reorg_rollback(fork_height)
self.rollback_request_caches(fork_height)
# we should clear all peers since this is a full rollback
await self.perform_atomic_rollback(fork_height)
await self.update_ui()
# We only process new state updates to avoid slow reprocessing. We set the sync height after adding
@ -590,14 +612,21 @@ class WalletNode:
if self.validation_semaphore is None:
self.validation_semaphore = asyncio.Semaphore(6)
# Rollback is handled in wallet_short_sync_backtrack for untrusted peers, so we don't need to do it here.
# Also it's not safe to rollback, an untrusted peer can give us old fork point and make our TX dissapear.
# wallet_short_sync_backtrack can safely rollback because we validated the weight for the new peak so we
# know the peer is telling the truth about the reorg.
# If there is a fork, we need to ensure that we roll back in trusted mode to properly handle reorgs
if trusted and fork_height is not None and height is not None and fork_height != height - 1:
await self.wallet_state_manager.reorg_rollback(fork_height)
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(fork_height)
cache: PeerRequestCache = self.get_cache_for_peer(peer)
if fork_height is not None:
cache.clear_after_height(fork_height)
self.log.info(f"Rolling back to {fork_height}")
if trusted and fork_height is not None and height is not None and fork_height != height - 1:
# only one peer told us to rollback so only clear for that peer
await self.perform_atomic_rollback(fork_height, cache=cache)
else:
if fork_height is not None:
# only one peer told us to rollback so only clear for that peer
cache.clear_after_height(fork_height)
self.log.info(f"clear_after_height {fork_height} for peer {peer}")
all_tasks: List[asyncio.Task] = []
target_concurrent_tasks: int = 20
@ -630,7 +659,6 @@ class WalletNode:
if self.wallet_state_manager is None:
return
try:
await self.wallet_state_manager.db_wrapper.commit_transaction()
await self.wallet_state_manager.db_wrapper.begin_transaction()
await self.wallet_state_manager.new_coin_state(valid_states, peer, fork_height)
@ -674,13 +702,12 @@ class WalletNode:
async with self.wallet_state_manager.db_wrapper.lock:
try:
self.log.info(f"new coin state received ({idx}-" f"{idx + len(states) - 1}/ {len(items)})")
await self.wallet_state_manager.db_wrapper.commit_transaction()
await self.wallet_state_manager.db_wrapper.begin_transaction()
await self.wallet_state_manager.new_coin_state(states, peer, fork_height)
await self.wallet_state_manager.db_wrapper.commit_transaction()
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(
last_change_height_cs(states[-1]) - 1, in_transaction=True
)
await self.wallet_state_manager.db_wrapper.commit_transaction()
except Exception as e:
await self.wallet_state_manager.db_wrapper.rollback_transaction()
await self.wallet_state_manager.coin_store.rebuild_wallet_cache()
@ -1030,9 +1057,9 @@ class WalletNode:
peak_height = self.wallet_state_manager.blockchain.get_peak_height()
if fork_height < peak_height:
self.log.info(f"Rolling back to {fork_height}")
await self.wallet_state_manager.reorg_rollback(fork_height)
# we should clear all peers since this is a full rollback
await self.perform_atomic_rollback(fork_height)
await self.update_ui()
self.rollback_request_caches(fork_height)
if peak is not None:
assert header_block.weight >= peak.weight

View File

@ -4,7 +4,6 @@ import logging
import multiprocessing
import multiprocessing.context
import time
import traceback
from collections import defaultdict
from pathlib import Path
from secrets import token_bytes
@ -1103,41 +1102,28 @@ class WalletStateManager:
Rolls back and updates the coin_store and transaction store. It's possible this height
is the tip, or even beyond the tip.
"""
try:
await self.db_wrapper.commit_transaction()
await self.db_wrapper.begin_transaction()
await self.coin_store.rollback_to_block(height)
reorged: List[TransactionRecord] = await self.tx_store.get_transaction_above(height)
await self.tx_store.rollback_to_block(height)
for record in reorged:
if record.type in [
TransactionType.OUTGOING_TX,
TransactionType.OUTGOING_TRADE,
TransactionType.INCOMING_TRADE,
]:
await self.tx_store.tx_reorged(record, in_transaction=True)
self.tx_pending_changed()
await self.coin_store.rollback_to_block(height)
reorged: List[TransactionRecord] = await self.tx_store.get_transaction_above(height)
await self.tx_store.rollback_to_block(height)
for record in reorged:
if record.type in [
TransactionType.OUTGOING_TX,
TransactionType.OUTGOING_TRADE,
TransactionType.INCOMING_TRADE,
]:
await self.tx_store.tx_reorged(record, in_transaction=True)
self.tx_pending_changed()
# Removes wallets that were created from a blockchain transaction which got reorged.
remove_ids = []
for wallet_id, wallet in self.wallets.items():
if wallet.type() == WalletType.POOLING_WALLET.value:
remove: bool = await wallet.rewind(height, in_transaction=True)
if remove:
remove_ids.append(wallet_id)
for wallet_id in remove_ids:
await self.user_store.delete_wallet(wallet_id, in_transaction=True)
self.wallets.pop(wallet_id)
await self.db_wrapper.commit_transaction()
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Exception while rolling back: {e} {tb}")
await self.db_wrapper.rollback_transaction()
await self.coin_store.rebuild_wallet_cache()
await self.tx_store.rebuild_tx_cache()
await self.pool_store.rebuild_cache()
raise
# Removes wallets that were created from a blockchain transaction which got reorged.
remove_ids = []
for wallet_id, wallet in self.wallets.items():
if wallet.type() == WalletType.POOLING_WALLET.value:
remove: bool = await wallet.rewind(height, in_transaction=True)
if remove:
remove_ids.append(wallet_id)
for wallet_id in remove_ids:
await self.user_store.delete_wallet(wallet_id, in_transaction=True)
self.wallets.pop(wallet_id)
async def _await_closed(self) -> None:
await self.db_connection.close()