wallet: Handle some peer closed cases better (#14642)

* Move some logs to `DEBUG` for disconnected peers

* Introduce `_add_coin_states` to log and cleanup cache in one place only

* Bail out early for disconnected peers in some places

* Consider renaming from #14460
This commit is contained in:
dustinface 2023-04-07 02:48:19 +07:00 committed by GitHub
parent d819d8ffc5
commit bd571de422
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 34 deletions

View File

@ -505,14 +505,9 @@ class WalletNode:
peer = matching_peer[0]
async with self.wallet_state_manager.db_wrapper.writer():
self.log.info(f"retrying coin_state: {state}")
try:
await self.wallet_state_manager.add_coin_states(
[state], peer, None if fork_height == 0 else fork_height
)
except Exception as e:
self.log.exception(f"Exception while adding states.. : {e}")
else:
await self.wallet_state_manager.blockchain.clean_block_records()
await self.wallet_state_manager.add_coin_states(
[state], peer, None if fork_height == 0 else fork_height
)
except asyncio.CancelledError:
self.log.info("Retry task cancelled, exiting.")
raise
@ -864,20 +859,11 @@ class WalletNode:
f"new coin state received ({inner_idx_start}-"
f"{inner_idx_start + len(inner_states) - 1}/ {len(items)})"
)
try:
await self.wallet_state_manager.add_coin_states(valid_states, peer, fork_height)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Exception while adding state: {e} {tb}")
else:
await self.wallet_state_manager.blockchain.clean_block_records()
await self.wallet_state_manager.add_coin_states(valid_states, peer, fork_height)
except Exception as e:
tb = traceback.format_exc()
if self._shut_down:
self.log.debug(f"Shutting down while adding state : {e} {tb}")
else:
self.log.error(f"Exception while adding state: {e} {tb}")
log_level = logging.DEBUG if peer.closed or self._shut_down else logging.ERROR
self.log.log(log_level, f"validate_and_add failed - exception: {e}, traceback: {tb}")
idx = 1
# Keep chunk size below 1000 just in case, windows has sqlite limits of 999 per query
@ -894,16 +880,9 @@ class WalletNode:
return False
if trusted:
async with self.wallet_state_manager.db_wrapper.writer():
try:
self.log.info(f"new coin state received ({idx}-{idx + len(states) - 1}/ {len(items)})")
await self.wallet_state_manager.add_coin_states(states, peer, fork_height)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Error adding states.. {e} {tb}")
self.log.info(f"new coin state received ({idx}-{idx + len(states) - 1}/ {len(items)})")
if not await self.wallet_state_manager.add_coin_states(states, peer, fork_height):
return False
else:
await self.wallet_state_manager.blockchain.clean_block_records()
else:
while len(all_tasks) >= target_concurrent_tasks:
all_tasks = [task for task in all_tasks if not task.done()]
@ -1306,6 +1285,8 @@ class WalletNode:
Returns all state that is valid and included in the blockchain proved by the weight proof. If return_old_states
is False, only new states that are not in the coin_store are returned.
"""
if peer.closed:
return False
# Only use the cache if we are talking about states before the fork point. If we are evaluating something
# in a reorg, we cannot use the cache, since we don't know if it's actually in the new chain after the reorg.
if can_use_peer_request_cache(coin_state, peer_request_cache, fork_height):
@ -1512,10 +1493,8 @@ class WalletNode:
start, end, peer_request_cache, all_peers
)
if blocks is None:
if self._shut_down:
self.log.debug(f"Shutting down, block fetching from: {start} to {end} canceled.")
else:
self.log.error(f"Error fetching blocks {start} {end}")
log_level = logging.DEBUG if self._shut_down or peer.closed else logging.ERROR
self.log.log(log_level, f"Error fetching blocks {start} {end}")
return False
if compare_to_recent and weight_proof.recent_chain_data[0].header_hash != blocks[-1].header_hash:

View File

@ -5,6 +5,7 @@ import json
import logging
import multiprocessing.context
import time
import traceback
from contextlib import asynccontextmanager
from pathlib import Path
from secrets import token_bytes
@ -965,7 +966,7 @@ class WalletStateManager:
wallet_identifier = WalletIdentifier.create(new_nft_wallet)
return wallet_identifier
async def add_coin_states(
async def _add_coin_states(
self,
coin_states: List[CoinState],
peer: WSChiaConnection,
@ -989,6 +990,8 @@ class WalletStateManager:
local_records = await self.coin_store.get_coin_records(coin_names)
for coin_name, coin_state in zip(coin_names, coin_states):
if peer.closed:
raise ConnectionError("Connection closed")
self.log.debug("Add coin state: %s: %s", coin_name, coin_state)
local_record = local_records.get(coin_name)
rollback_wallets = None
@ -1340,6 +1343,23 @@ class WalletStateManager:
await self.retry_store.remove_state(coin_state)
continue
async def add_coin_states(
self,
coin_states: List[CoinState],
peer: WSChiaConnection,
fork_height: Optional[uint32],
) -> bool:
try:
await self._add_coin_states(coin_states, peer, fork_height)
except Exception as e:
log_level = logging.DEBUG if peer.closed else logging.ERROR
self.log.log(log_level, f"add_coin_states failed - exception {e}, traceback: {traceback.format_exc()}")
return False
await self.blockchain.clean_block_records()
return True
async def have_a_pool_wallet_with_launched_id(self, launcher_id: bytes32) -> bool:
for wallet_id, wallet in self.wallets.items():
if wallet.type() == WalletType.POOLING_WALLET: