Ms.sync cancel (#10244)

* Start fixing other issues

* Fork point, sync cancelling, random peer

* Reduce logging

* Improve performance and have a fallback peer for fetching

* Disconnect not synced peers

* Make sure try catch doesn't fail

* Fix lint

* Update chia/wallet/wallet_node.py

Co-authored-by: Kyle Altendorf <sda@fstab.net>

* Pylint has a bug so ignore pylint for this line

Co-authored-by: Kyle Altendorf <sda@fstab.net>
This commit is contained in:
Mariano Sorgente 2022-02-17 16:35:41 -05:00 committed by GitHub
parent fad3f88b13
commit 52e439ccbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 172 additions and 77 deletions

View File

@ -30,6 +30,7 @@ VALID_REPLY_MESSAGE_MAP = {
pmt.request_signage_point_or_end_of_sub_slot: [pmt.respond_signage_point, pmt.respond_end_of_sub_slot],
pmt.request_compact_vdf: [pmt.respond_compact_vdf],
pmt.request_peers: [pmt.respond_peers],
pmt.request_header_blocks: [pmt.respond_header_blocks, pmt.reject_header_blocks],
}

View File

@ -383,7 +383,7 @@ class WalletRpcApi:
async def get_height_info(self, request: Dict):
assert self.service.wallet_state_manager is not None
height = self.service.wallet_state_manager.blockchain.get_peak_height()
height = await self.service.wallet_state_manager.blockchain.get_finished_sync_up_to()
return {"height": height}
async def get_network_info(self, request: Dict):

View File

@ -80,9 +80,9 @@ class Service:
chia_ca_crt, chia_ca_key = chia_ssl_ca_paths(root_path, self.config)
inbound_rlp = self.config.get("inbound_rate_limit_percent")
outbound_rlp = self.config.get("outbound_rate_limit_percent")
if NodeType == NodeType.WALLET:
if node_type == NodeType.WALLET:
inbound_rlp = service_config.get("inbound_rate_limit_percent", inbound_rlp)
outbound_rlp = service_config.get("outbound_rate_limit_percent", 60)
outbound_rlp = 60
assert inbound_rlp and outbound_rlp
self._server = ChiaServer(

View File

@ -1,4 +1,6 @@
import asyncio
import logging
import random
from typing import List, Optional, Tuple, Union, Dict
from chia.consensus.constants import ConsensusConstants
@ -14,6 +16,9 @@ from chia.protocols.wallet_protocol import (
CoinState,
RespondToPhUpdates,
RespondToCoinUpdates,
RespondHeaderBlocks,
RequestHeaderBlocks,
RejectHeaderBlocks,
)
from chia.server.ws_connection import WSChiaConnection
from chia.types.blockchain_format.coin import hash_coin_list, Coin
@ -22,7 +27,7 @@ from chia.types.full_block import FullBlock
from chia.types.header_block import HeaderBlock
from chia.util.ints import uint32
from chia.util.merkle_set import confirm_not_included_already_hashed, confirm_included_already_hashed, MerkleSet
from chia.wallet.util.peer_request_cache import PeerRequestCache
log = logging.getLogger(__name__)
@ -270,3 +275,75 @@ def get_block_challenge(
curr = all_blocks.get(curr.prev_header_hash, None)
challenge = reversed_challenge_hashes[challenges_to_look_for - 1]
return challenge
def last_change_height_cs(cs: CoinState) -> uint32:
    """Return the height at which this coin state last changed.

    The spent height takes precedence over the created height; a coin state
    with neither (not yet on chain) maps to height 0.
    """
    for height in (cs.spent_height, cs.created_height):
        if height is not None:
            return height
    return uint32(0)
async def _fetch_header_blocks_inner(
    all_peers: List[WSChiaConnection], selected_peer_node_id: bytes32, request: RequestHeaderBlocks
) -> Optional[RespondHeaderBlocks]:
    """Fetch a batch of header blocks from a random peer, with one retry on rejection.

    A peer that answers with RejectHeaderBlocks is not synced, so its connection
    is closed and a different peer (preferring ``selected_peer_node_id`` when it
    was not the rejecting peer) is given a single retry.

    Returns the RespondHeaderBlocks on success, or None when there are no peers,
    the response is unrecognized, or the retry also fails.
    """
    if len(all_peers) == 0:
        return None
    random_peer: WSChiaConnection = random.choice(all_peers)
    res = await random_peer.request_header_blocks(request)
    if isinstance(res, RespondHeaderBlocks):
        return res
    if isinstance(res, RejectHeaderBlocks):
        # Peer is not synced, close connection
        await random_peer.close()
        bad_peer_id = random_peer.peer_node_id
        # Never retry against the peer that just rejected us; the original code
        # compared a connection object to a node id here, which never matched.
        remaining = [p for p in all_peers if p.peer_node_id != bad_peer_id]
        if len(remaining) == 0:
            # No more peers to fetch from
            return None
        if selected_peer_node_id != bad_peer_id:
            # Prefer the originally selected peer if it is still available;
            # fall back to a random peer instead of indexing an empty list.
            preferred = [p for p in remaining if p.peer_node_id == selected_peer_node_id]
            random_peer = preferred[0] if preferred else random.choice(remaining)
        else:
            # The selected peer was the one that rejected us; pick another.
            random_peer = random.choice(remaining)
        # Retry once with the replacement peer.
        res = await random_peer.request_header_blocks(request)
        if isinstance(res, RespondHeaderBlocks):
            return res
    return None
async def fetch_header_blocks_in_range(
    start: uint32,
    end: uint32,
    peer_request_cache: PeerRequestCache,
    all_peers: List[WSChiaConnection],
    selected_peer_id: bytes32,
) -> Optional[List[HeaderBlock]]:
    """Fetch header blocks for heights ``start``..``end`` (inclusive) in 32-block batches.

    Requests are aligned to 32-block boundaries so identical ranges can be
    de-duplicated through ``peer_request_cache.block_requests``; concurrent
    callers share the same in-flight asyncio.Task. Returns None when any batch
    cannot be fetched from the available peers.
    """
    blocks: List[HeaderBlock] = []
    for i in range(start - (start % 32), end + 1, 32):
        request_start = min(uint32(i), end)
        request_end = min(uint32(i + 31), end)
        if (request_start, request_end) in peer_request_cache.block_requests:
            # Reuse the (possibly still running) request for this exact range.
            res_h_blocks_task: asyncio.Task = peer_request_cache.block_requests[(request_start, request_end)]
            if res_h_blocks_task.done():
                res_h_blocks: Optional[RespondHeaderBlocks] = res_h_blocks_task.result()
            else:
                res_h_blocks = await res_h_blocks_task
        else:
            request_header_blocks = RequestHeaderBlocks(request_start, request_end)
            res_h_blocks_task = asyncio.create_task(
                _fetch_header_blocks_inner(all_peers, selected_peer_id, request_header_blocks)
            )
            peer_request_cache.block_requests[(request_start, request_end)] = res_h_blocks_task
            res_h_blocks = await res_h_blocks_task
        if res_h_blocks is None:
            # Evict the failed task so a later call can retry this range instead
            # of being stuck with a permanently cached failure.
            peer_request_cache.block_requests.pop((request_start, request_end), None)
            return None
        # Aligned batches can include heights below `start`; drop those.
        blocks.extend([bl for bl in res_h_blocks.header_blocks if bl.height >= start])
    return blocks

View File

@ -181,7 +181,8 @@ class WalletBlockchain(BlockchainInterface):
return await self._basic_store.get_object("PEAK_BLOCK", HeaderBlock)
async def set_finished_sync_up_to(self, height: uint32):
await self._basic_store.set_object("FINISHED_SYNC_UP_TO", height)
if height > await self.get_finished_sync_up_to():
await self._basic_store.set_object("FINISHED_SYNC_UP_TO", height)
async def get_finished_sync_up_to(self):
h: Optional[uint32] = await self._basic_store.get_object("FINISHED_SYNC_UP_TO", uint32)

View File

@ -1,6 +1,7 @@
import asyncio
import json
import logging
import random
import time
import traceback
from asyncio import CancelledError
@ -32,7 +33,6 @@ from chia.protocols.wallet_protocol import (
RequestSESInfo,
RespondSESInfo,
RequestHeaderBlocks,
RespondHeaderBlocks,
)
from chia.server.node_discovery import WalletPeers
from chia.server.outbound_message import Message, NodeType, make_msg
@ -61,6 +61,8 @@ from chia.wallet.util.wallet_sync_utils import (
fetch_last_tx_from_peer,
subscribe_to_phs,
subscribe_to_coin_updates,
last_change_height_cs,
fetch_header_blocks_in_range,
)
from chia.wallet.wallet_coin_record import WalletCoinRecord
from chia.wallet.wallet_state_manager import WalletStateManager
@ -505,6 +507,8 @@ class WalletNode:
self.rollback_request_caches(fork_height)
await self.update_ui()
# We only process new state updates to avoid slow reprocessing. We set the sync height after adding
# things, so we don't have to reprocess these later. There can be many things in ph_update_res.
already_checked_ph: Set[bytes32] = set()
continue_while: bool = True
all_puzzle_hashes: List[bytes32] = await self.get_puzzle_hashes_to_subscribe()
@ -516,7 +520,10 @@ class WalletNode:
[p for p in chunk if p not in already_checked_ph], full_node, 0
)
ph_update_res = list(filter(is_new_state_update, ph_update_res))
await self.receive_state_from_peer(ph_update_res, full_node)
ph_update_res.sort(key=last_change_height_cs)
if not await self.receive_state_from_peer(ph_update_res, full_node, update_finished_height=True):
# If something goes wrong, abort sync
return
already_checked_ph.update(chunk)
# Check if new puzzle hashes have been created
@ -529,18 +536,23 @@ class WalletNode:
break
self.log.info(f"Successfully subscribed and updated {len(already_checked_ph)} puzzle hashes")
# The number of coin id updates are usually going to be significantly less than ph updates, so we can
# sync from 0 every time.
continue_while = False
all_coin_ids: List[bytes32] = await self.get_coin_ids_to_subscribe(fork_height)
all_coin_ids: List[bytes32] = await self.get_coin_ids_to_subscribe(0)
already_checked_coin_ids: Set[bytes32] = set()
while continue_while:
one_k_chunks = chunks(all_coin_ids, 1000)
for chunk in one_k_chunks:
c_update_res: List[CoinState] = await subscribe_to_coin_updates(chunk, full_node, 0)
c_update_res = list(filter(is_new_state_update, c_update_res))
await self.receive_state_from_peer(c_update_res, full_node)
c_update_res.sort(key=last_change_height_cs)
if not await self.receive_state_from_peer(c_update_res, full_node):
# If something goes wrong, abort sync
return
already_checked_coin_ids.update(chunk)
all_coin_ids = await self.get_coin_ids_to_subscribe(fork_height)
all_coin_ids = await self.get_coin_ids_to_subscribe(0)
continue_while = False
for coin_id in all_coin_ids:
if coin_id not in already_checked_coin_ids:
@ -548,8 +560,8 @@ class WalletNode:
break
self.log.info(f"Successfully subscribed and updated {len(already_checked_coin_ids)} coin ids")
if target_height > await self.wallet_state_manager.blockchain.get_finished_sync_up_to():
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(target_height)
# Only update this fully when the entire sync has completed
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(target_height)
if trusted:
self.local_node_synced = True
@ -570,13 +582,14 @@ class WalletNode:
fork_height: Optional[uint32] = None,
height: Optional[uint32] = None,
header_hash: Optional[bytes32] = None,
):
update_finished_height: bool = False,
) -> bool:
# Adds the state to the wallet state manager. If the peer is trusted, we do not validate. If the peer is
# untrusted we do, but we might not add the state, since we need to receive the new_peak message as well.
assert self.wallet_state_manager is not None
trusted = self.is_trusted(peer)
# Validate states in parallel, apply serial
# TODO: optimize fetching
if self.validation_semaphore is None:
self.validation_semaphore = asyncio.Semaphore(6)
if self.new_state_lock is None:
@ -589,41 +602,54 @@ class WalletNode:
if fork_height is not None:
cache.clear_after_height(fork_height)
all_tasks = []
all_tasks: List[asyncio.Task] = []
target_concurrent_tasks: int = 20
num_concurrent_tasks: int = 0
async def receive_and_validate(inner_state: CoinState, inner_idx: int):
try:
assert self.validation_semaphore is not None
async with self.validation_semaphore:
assert self.wallet_state_manager is not None
if header_hash is not None:
assert height is not None
self.add_state_to_race_cache(header_hash, height, inner_state)
self.log.info(f"Added to race cache: {height}, {inner_state}")
if trusted:
valid = True
else:
valid = await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height)
if valid:
self.log.info(f"new coin state received ({inner_idx + 1} / {len(items)})")
assert self.new_state_lock is not None
async with self.new_state_lock:
await self.wallet_state_manager.new_coin_state([inner_state], peer, fork_height)
if update_finished_height:
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(
last_change_height_cs(inner_state)
)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Exception while adding state: {e} {tb}")
finally:
nonlocal num_concurrent_tasks
num_concurrent_tasks -= 1 # pylint: disable=E0602
for idx, potential_state in enumerate(items):
async def receive_and_validate(inner_state: CoinState, inner_idx: int):
assert self.wallet_state_manager is not None
assert self.validation_semaphore is not None
# if height is not None:
async with self.validation_semaphore:
try:
if header_hash is not None:
assert height is not None
self.add_state_to_race_cache(header_hash, height, inner_state)
self.log.info(f"Added to race cache: {height}, {inner_state}")
if trusted:
valid = True
else:
valid = await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height)
if valid:
self.log.info(f"new coin state received ({inner_idx + 1} / {len(items)})")
assert self.new_state_lock is not None
async with self.new_state_lock:
await self.wallet_state_manager.new_coin_state([inner_state], peer, fork_height)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Exception while adding state: {e} {tb}")
task = receive_and_validate(potential_state, idx)
all_tasks.append(task)
while len(self.validation_semaphore._waiters) > 20:
self.log.debug("sleeping 2 sec")
await asyncio.sleep(2)
if self.server is None:
self.log.error("No server")
return False
if peer.peer_node_id not in self.server.all_connections:
self.log.error(f"Disconnected from peer {peer.peer_node_id} host {peer.peer_host}")
return False
while num_concurrent_tasks >= target_concurrent_tasks:
await asyncio.sleep(1)
all_tasks.append(asyncio.create_task(receive_and_validate(potential_state, idx)))
num_concurrent_tasks += 1
await asyncio.gather(*all_tasks)
await self.update_ui()
return True
async def get_coins_with_puzzle_hash(self, puzzle_hash) -> List[CoinState]:
assert self.wallet_state_manager is not None
@ -691,7 +717,7 @@ class WalletNode:
assert self.server is not None
nodes = self.server.get_full_node_connections()
if len(nodes) > 0:
return nodes[0]
return random.choice(nodes)
else:
return None
@ -804,12 +830,16 @@ class WalletNode:
assert weight_proof is not None
old_proof = self.wallet_state_manager.blockchain.synced_weight_proof
if syncing:
fork_point: int = max(0, current_height - 32)
# This usually happens the first time we start up the wallet. We roll back slightly to be
# safe, but we don't want to rollback too much (hence 16)
fork_point: int = max(0, current_height - 16)
else:
fork_point = max(0, current_height - 50000)
# In this case we will not rollback so it's OK to check some older updates as well, to ensure
# that no recent transactions are being hidden.
fork_point = 0
if old_proof is not None:
# If the weight proof fork point is in the past, rollback more to ensure we don't have duplicate
# state
# state.
wp_fork_point = self.wallet_state_manager.weight_proof_handler.get_fork_point(
old_proof, weight_proof
)
@ -868,13 +898,14 @@ class WalletNode:
ph_updates: List[CoinState] = await subscribe_to_phs(phs, peer, uint32(0))
coin_updates: List[CoinState] = await subscribe_to_coin_updates(all_coin_ids, peer, uint32(0))
peer_new_peak_height, peer_new_peak_hash = self.node_peaks[peer.peer_node_id]
await self.receive_state_from_peer(
success = await self.receive_state_from_peer(
ph_updates + coin_updates,
peer,
height=peer_new_peak_height,
header_hash=peer_new_peak_hash,
)
self.synced_peers.add(peer.peer_node_id)
if success:
self.synced_peers.add(peer.peer_node_id)
else:
if peak_hb is not None and new_peak.weight <= peak_hb.weight:
# Don't process blocks at the same weight
@ -891,10 +922,7 @@ class WalletNode:
self.wallet_state_manager.set_sync_mode(False)
self.log.info(f"Finished processing new peak of {new_peak.height}")
if (
peer.peer_node_id in self.synced_peers
and new_peak.height > await self.wallet_state_manager.blockchain.get_finished_sync_up_to()
):
if peer.peer_node_id in self.synced_peers:
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(new_peak.height)
await self.wallet_state_manager.new_peak(new_peak)
@ -1034,6 +1062,7 @@ class WalletNode:
and current_spent_height == spent_height
and current.confirmed_block_height == confirmed_height
):
peer_request_cache.states_validated[coin_state.get_hash()] = coin_state
return True
reorg_mode = False
@ -1051,6 +1080,8 @@ class WalletNode:
else:
request = RequestHeaderBlocks(confirmed_height, confirmed_height)
res = await peer.request_header_blocks(request)
if res is None:
return False
state_block = res.header_blocks[0]
peer_request_cache.blocks[confirmed_height] = state_block
@ -1137,6 +1168,7 @@ class WalletNode:
self, block: HeaderBlock, peer: WSChiaConnection, peer_request_cache: PeerRequestCache
) -> bool:
assert self.wallet_state_manager is not None
assert self.server is not None
if self.wallet_state_manager.blockchain.contains_height(block.height):
stored_hash = self.wallet_state_manager.blockchain.height_to_hash(block.height)
stored_record = self.wallet_state_manager.blockchain.try_block_record(stored_hash)
@ -1192,29 +1224,13 @@ class WalletNode:
self.log.error("Failed validation 2")
return False
blocks: List[HeaderBlock] = []
for i in range(start - (start % 32), end + 1, 32):
request_start = min(uint32(i), end)
request_end = min(uint32(i + 31), end)
request_h_response = RequestHeaderBlocks(request_start, request_end)
if (request_start, request_end) in peer_request_cache.block_requests:
self.log.info(f"Using cache for blocks {request_start} - {request_end}")
res_h_blocks: Optional[RespondHeaderBlocks] = peer_request_cache.block_requests[
(request_start, request_end)
]
else:
start_time = time.time()
res_h_blocks = await peer.request_header_blocks(request_h_response)
if res_h_blocks is None:
self.log.error("Failed validation 2.5")
return False
end_time = time.time()
peer_request_cache.block_requests[(request_start, request_end)] = res_h_blocks
self.log.info(
f"Fetched blocks: {request_start} - {request_end} | duration: {end_time - start_time}"
)
assert res_h_blocks is not None
blocks.extend([bl for bl in res_h_blocks.header_blocks if bl.height >= start])
all_peers = self.server.get_full_node_connections()
blocks: Optional[List[HeaderBlock]] = await fetch_header_blocks_in_range(
start, end, peer_request_cache, all_peers, peer.peer_node_id
)
if blocks is None:
self.log.error(f"Error fetching blocks {start} {end}")
return False
if compare_to_recent and weight_proof.recent_chain_data[0].header_hash != blocks[-1].header_hash:
self.log.error("Failed validation 3")