Improve trusted sync performance (#10377)

* Improve trusted sync performance

* Fix broken CLI

* Decrease timestamp cache size

* Add all valid states at the right time
This commit is contained in:
Mariano Sorgente 2022-02-22 21:37:51 -05:00 committed by GitHub
parent 001effae61
commit c844016019
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 25 deletions

View File

@ -459,7 +459,7 @@ class WSChiaConnection:
await asyncio.sleep(3)
return None
else:
self.log.warning(
self.log.debug(
f"Peer surpassed rate limit {self.peer_host}, message: {message_type}, "
f"port {self.peer_port} but not disconnecting"
)

View File

@ -4,7 +4,7 @@ from typing import Optional
from chia.protocols.wallet_protocol import CoinState, RespondSESInfo
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.header_block import HeaderBlock
from chia.util.ints import uint32
from chia.util.ints import uint32, uint64
from chia.util.lru_cache import LRUCache
@ -13,18 +13,24 @@ class PeerRequestCache:
_block_requests: LRUCache # (start, end) -> RequestHeaderBlocks
_ses_requests: LRUCache # height -> Ses request
_states_validated: LRUCache # coin state hash -> last change height, or None for reorg
_timestamps: LRUCache # block height -> timestamp
def __init__(self):
self._blocks = LRUCache(100)
self._block_requests = LRUCache(100)
self._ses_requests = LRUCache(100)
self._states_validated = LRUCache(1000)
self._timestamps = LRUCache(1000)
def get_block(self, height: uint32) -> Optional[HeaderBlock]:
return self._blocks.get(height)
def add_to_blocks(self, header_block: HeaderBlock) -> None:
self._blocks.put(header_block.height, header_block)
if header_block.is_transaction_block:
assert header_block.foliage_transaction_block is not None
if self._timestamps.get(header_block.height) is None:
self._timestamps.put(header_block.height, header_block.foliage_transaction_block.timestamp)
def get_block_request(self, start: uint32, end: uint32) -> Optional[asyncio.Task]:
return self._block_requests.get((start, end))
@ -49,6 +55,9 @@ class PeerRequestCache:
cs_height = coin_state.created_height
self._states_validated.put(coin_state.get_hash(), cs_height)
def get_height_timestamp(self, height: uint32) -> Optional[uint64]:
return self._timestamps.get(height)
def clear_after_height(self, height: int):
# Remove any cached item which relates to an event that happened at a height above height.
new_blocks = LRUCache(self._blocks.capacity)
@ -75,6 +84,12 @@ class PeerRequestCache:
new_states_validated.put(k, cs_height)
self._states_validated = new_states_validated
new_timestamps = LRUCache(self._timestamps.capacity)
for h, ts in self._timestamps.cache.items():
if h <= height:
new_timestamps.put(h, ts)
self._timestamps = new_timestamps
async def can_use_peer_request_cache(
coin_state: CoinState, peer_request_cache: PeerRequestCache, fork_height: Optional[uint32]

View File

@ -320,13 +320,13 @@ async def fetch_header_blocks_in_range(
res_h_blocks_task: Optional[asyncio.Task] = peer_request_cache.get_block_request(request_start, request_end)
if res_h_blocks_task is not None:
log.info(f"Using cache for: {start}-{end}")
log.debug(f"Using cache for: {start}-{end}")
if res_h_blocks_task.done():
res_h_blocks: Optional[RespondHeaderBlocks] = res_h_blocks_task.result()
else:
res_h_blocks = await res_h_blocks_task
else:
log.info(f"Fetching: {start}-{end}")
log.debug(f"Fetching: {start}-{end}")
request_header_blocks = RequestHeaderBlocks(request_start, request_end)
res_h_blocks_task = asyncio.create_task(_fetch_header_blocks_inner(all_peers, request_header_blocks))
peer_request_cache.add_to_block_requests(request_start, request_end, res_h_blocks_task)

View File

@ -605,27 +605,35 @@ class WalletNode:
target_concurrent_tasks: int = 20
num_concurrent_tasks: int = 0
async def receive_and_validate(inner_state: CoinState, inner_idx: int):
async def receive_and_validate(inner_states: List[CoinState], inner_idx_start: int):
try:
assert self.validation_semaphore is not None
async with self.validation_semaphore:
assert self.wallet_state_manager is not None
if header_hash is not None:
assert height is not None
self.add_state_to_race_cache(header_hash, height, inner_state)
self.log.info(f"Added to race cache: {height}, {inner_state}")
for inner_state in inner_states:
self.add_state_to_race_cache(header_hash, height, inner_state)
self.log.info(f"Added to race cache: {height}, {inner_state}")
if trusted:
valid = True
valid_states = inner_states
else:
valid = await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height)
if valid:
self.log.info(f"new coin state received ({inner_idx + 1} / {len(items)})")
valid_states = [
inner_state
for inner_state in inner_states
if await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height)
]
if len(valid_states) > 0:
self.log.info(
f"new coin state received ({inner_idx_start}-"
f"{inner_idx_start + len(inner_states) - 1}/ {len(items) + 1})"
)
assert self.new_state_lock is not None
async with self.new_state_lock:
await self.wallet_state_manager.new_coin_state([inner_state], peer, fork_height)
await self.wallet_state_manager.new_coin_state(valid_states, peer, fork_height)
if update_finished_height:
await self.wallet_state_manager.blockchain.set_finished_sync_up_to(
last_change_height_cs(inner_state)
last_change_height_cs(inner_states[-1])
)
except Exception as e:
tb = traceback.format_exc()
@ -634,7 +642,11 @@ class WalletNode:
nonlocal num_concurrent_tasks
num_concurrent_tasks -= 1 # pylint: disable=E0602
for idx, potential_state in enumerate(items):
idx = 1
# Keep chunk size below 1000 just in case, windows has sqlite limits of 999 per query
# Untrusted has a smaller batch size since validation has to happen which takes a while
chunk_size: int = 900 if trusted else 10
for states in chunks(items, chunk_size):
if self.server is None:
self.log.error("No server")
return False
@ -642,9 +654,10 @@ class WalletNode:
self.log.error(f"Disconnected from peer {peer.peer_node_id} host {peer.peer_host}")
return False
while num_concurrent_tasks >= target_concurrent_tasks:
await asyncio.sleep(1)
all_tasks.append(asyncio.create_task(receive_and_validate(potential_state, idx)))
await asyncio.sleep(0.1)
all_tasks.append(asyncio.create_task(receive_and_validate(states, idx)))
num_concurrent_tasks += 1
idx += len(states)
await asyncio.gather(*all_tasks)
await self.update_ui()
@ -740,21 +753,19 @@ class WalletNode:
return self.height_to_time[height]
for cache in self.untrusted_caches.values():
block: Optional[HeaderBlock] = cache.get_block(height)
if block is not None:
if (
block.foliage_transaction_block is not None
and block.foliage_transaction_block.timestamp is not None
):
self.height_to_time[height] = block.foliage_transaction_block.timestamp
return block.foliage_transaction_block.timestamp
cache_ts: Optional[uint64] = cache.get_height_timestamp(height)
if cache_ts is not None:
return cache_ts
peer: Optional[WSChiaConnection] = self.get_full_node_peer()
if peer is None:
raise ValueError("Cannot fetch timestamp, no peers")
self.log.debug(f"Fetching block at height: {height}")
last_tx_block: Optional[HeaderBlock] = await fetch_last_tx_from_peer(height, peer)
if last_tx_block is None:
raise ValueError(f"Error fetching blocks from peer {peer.get_peer_info()}")
assert last_tx_block.foliage_transaction_block is not None
self.get_cache_for_peer(peer).add_to_blocks(last_tx_block)
return last_tx_block.foliage_transaction_block.timestamp
async def new_peak_wallet(self, new_peak: wallet_protocol.NewPeakWallet, peer: WSChiaConnection):
@ -798,6 +809,8 @@ class WalletNode:
# disconnected), we assume that the full node will continue to give us state updates, so we do
# not need to resync.
if peer.peer_node_id not in self.synced_peers:
if new_peak.height - current_height > self.LONG_SYNC_THRESHOLD:
self.wallet_state_manager.set_sync_mode(True)
await self.long_sync(new_peak.height, peer, uint32(max(0, current_height - 256)), rollback=True)
self.wallet_state_manager.set_sync_mode(False)
else:
@ -914,7 +927,7 @@ class WalletNode:
for potential_height in range(backtrack_fork_height + 1, new_peak.height + 1):
header_hash = self.wallet_state_manager.blockchain.height_to_hash(uint32(potential_height))
if header_hash in self.race_cache:
self.log.debug(f"Receiving race state: {self.race_cache[header_hash]}")
self.log.info(f"Receiving race state: {self.race_cache[header_hash]}")
await self.receive_state_from_peer(list(self.race_cache[header_hash]), peer)
self.wallet_state_manager.state_changed("new_block")