wallet short batch sync (#6112)

* short batch sync

* switch log for fixing ci

* switch log level back

* increase test time

* new line

* test fix

* change sem to lock

* change sem to lock

* refactor

* move weight check under lock

* short sync tests

* lint

* remove duplicate code

* remove duplicate code

* add await

* fix
This commit is contained in:
Almog De Paz 2021-06-29 21:25:54 +03:00 committed by GitHub
parent 748f7825ad
commit e5eff54149
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 115 additions and 68 deletions

View File

@ -35,6 +35,8 @@ def service_kwargs_for_wallet(
trusted_peer = full_node_config["ssl"]["public_crt"]
config["trusted_peers"] = {}
config["trusted_peers"]["local_node"] = trusted_peer
if "short_sync_blocks_behind_threshold" not in config:
config["short_sync_blocks_behind_threshold"] = 20
node = WalletNode(
config,
keychain,

View File

@ -405,3 +405,5 @@ wallet:
trusted_peers:
trusted_node_1: "config/ssl/full_node/public_full_node.crt"
short_sync_blocks_behind_threshold: 20

View File

@ -105,7 +105,7 @@ class WalletNode:
self.server = None
self.wsm_close_task = None
self.sync_task: Optional[asyncio.Task] = None
self.new_peak_lock: Optional[asyncio.Lock] = None
self.new_peak_lock = asyncio.Lock()
self.logged_in_fingerprint: Optional[int] = None
self.peer_task = None
self.logged_in = False
@ -417,63 +417,55 @@ class WalletNode:
elif result == ReceiveBlockResult.INVALID_BLOCK:
self.log.info(f"Invalid block from peer: {peer.get_peer_info()} {error}")
await peer.close()
return None
return
else:
self.log.debug(f"Result: {result}")
async def new_peak_wallet(self, peak: wallet_protocol.NewPeakWallet, peer: WSChiaConnection):
if self.wallet_state_manager is None:
return None
return
if self.wallet_state_manager.blockchain.contains_block(peak.header_hash):
self.log.debug(f"known peak {peak.header_hash}")
return
if self.wallet_state_manager.sync_mode:
self.last_new_peak_messages.put(peer, peak)
return
curr_peak = self.wallet_state_manager.blockchain.get_peak()
if curr_peak is not None and curr_peak.weight >= peak.weight:
return None
if self.new_peak_lock is None:
self.new_peak_lock = asyncio.Lock()
async with self.new_peak_lock:
curr_peak = self.wallet_state_manager.blockchain.get_peak()
if curr_peak is not None and curr_peak.weight >= peak.weight:
return
request = wallet_protocol.RequestBlockHeader(peak.height)
response: Optional[RespondBlockHeader] = await peer.request_block_header(request)
if response is None or not isinstance(response, RespondBlockHeader) or response.header_block is None:
return None
self.log.warning(f"bad peak response from peer {response}")
return
header_block = response.header_block
if (curr_peak is None and header_block.height < self.constants.WEIGHT_PROOF_RECENT_BLOCKS) or (
curr_peak is not None and curr_peak.height > header_block.height - 200
curr_peak_height = 0 if curr_peak is None else curr_peak.height
if (curr_peak_height == 0 and peak.height < self.constants.WEIGHT_PROOF_RECENT_BLOCKS) or (
curr_peak_height > peak.height - 200
):
top = header_block
blocks = [top]
# Fetch blocks backwards until we hit the one that we have,
# then complete them with additions / removals going forward
while not self.wallet_state_manager.blockchain.contains_block(top.prev_header_hash) and top.height > 0:
request_prev = wallet_protocol.RequestBlockHeader(top.height - 1)
response_prev: Optional[RespondBlockHeader] = await peer.request_block_header(request_prev)
if response_prev is None:
return None
if not isinstance(response_prev, RespondBlockHeader):
return None
prev_head = response_prev.header_block
blocks.append(prev_head)
top = prev_head
blocks.reverse()
await self.complete_blocks(blocks, peer)
await self.wallet_state_manager.create_more_puzzle_hashes()
elif header_block.height >= self.constants.WEIGHT_PROOF_RECENT_BLOCKS:
if peak.height <= curr_peak_height + self.config["short_sync_blocks_behind_threshold"]:
await self.wallet_short_sync_backtrack(header_block, peer)
else:
await self.batch_sync_to_peak(curr_peak_height, peak)
elif peak.height >= self.constants.WEIGHT_PROOF_RECENT_BLOCKS:
# Request weight proof
# Sync if PoW validates
if self.wallet_state_manager.sync_mode:
self.last_new_peak_messages.put(peer, peak)
return None
weight_request = RequestProofOfWeight(header_block.height, header_block.header_hash)
weight_request = RequestProofOfWeight(peak.height, peak.header_hash)
weight_proof_response: RespondProofOfWeight = await peer.request_proof_of_weight(
weight_request, timeout=360
)
if weight_proof_response is None:
return None
return
weight_proof = weight_proof_response.wp
if self.wallet_state_manager is None:
return None
return
if self.server is not None and self.server.is_trusted_peer(peer, self.config["trusted_peers"]):
valid, fork_point = self.wallet_state_manager.weight_proof_handler.get_fork_point_no_validations(
weight_proof
@ -488,7 +480,7 @@ class WalletNode:
f" recent blocks num ,{len(weight_proof.recent_chain_data)}"
)
self.log.debug(f"{weight_proof}")
return None
return
self.log.info(f"Validated, fork point is {fork_point}")
self.wallet_state_manager.sync_store.add_potential_fork_point(
header_block.header_hash, uint32(fork_point)
@ -496,6 +488,51 @@ class WalletNode:
self.wallet_state_manager.sync_store.add_potential_peak(header_block)
self.start_sync()
async def wallet_short_sync_backtrack(self, header_block, peer):
top = header_block
blocks = [top]
# Fetch blocks backwards until we hit the one that we have,
# then complete them with additions / removals going forward
while not self.wallet_state_manager.blockchain.contains_block(top.prev_header_hash) and top.height > 0:
request_prev = wallet_protocol.RequestBlockHeader(top.height - 1)
response_prev: Optional[RespondBlockHeader] = await peer.request_block_header(request_prev)
if response_prev is None or not isinstance(response_prev, RespondBlockHeader):
raise RuntimeError("bad block header response from peer while syncing")
prev_head = response_prev.header_block
blocks.append(prev_head)
top = prev_head
blocks.reverse()
await self.complete_blocks(blocks, peer)
await self.wallet_state_manager.create_more_puzzle_hashes()
async def batch_sync_to_peak(self, fork_height, peak):
advanced_peak = False
batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS
for i in range(max(0, fork_height - 1), peak.height, batch_size):
start_height = i
end_height = min(peak.height, start_height + batch_size)
peers = self.server.get_full_node_connections()
added = False
for peer in peers:
try:
added, advanced_peak = await self.fetch_blocks_and_validate(
peer, uint32(start_height), uint32(end_height), None if advanced_peak else fork_height
)
if added:
break
except Exception as e:
await peer.close()
exc = traceback.format_exc()
self.log.error(f"Error while trying to fetch from peer:{e} {exc}")
if not added:
raise RuntimeError(f"Was not able to add blocks {start_height}-{end_height}")
curr_peak = self.wallet_state_manager.blockchain.get_peak()
assert peak is not None
self.wallet_state_manager.blockchain.clean_block_record(
min(end_height, curr_peak.height) - self.constants.BLOCKS_CACHE_SIZE
)
def start_sync(self) -> None:
self.log.info("self.sync_event.set()")
self.sync_event.set()
@ -611,35 +648,7 @@ class WalletNode:
if fork_height is None:
fork_height = uint32(0)
await self.wallet_state_manager.blockchain.warmup(fork_height)
batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS
advanced_peak = False
for i in range(max(0, fork_height - 1), peak_height, batch_size):
start_height = i
end_height = min(peak_height, start_height + batch_size)
peers = self.server.get_full_node_connections()
added = False
for peer in peers:
try:
added, advanced_peak = await self.fetch_blocks_and_validate(
peer, uint32(start_height), uint32(end_height), None if advanced_peak else fork_height
)
if added:
break
except Exception as e:
await peer.close()
exc = traceback.format_exc()
self.log.error(f"Error while trying to fetch from peer:{e} {exc}")
if not added:
raise RuntimeError(f"Was not able to add blocks {start_height}-{end_height}")
peak = self.wallet_state_manager.blockchain.get_peak()
assert peak is not None
self.wallet_state_manager.blockchain.clean_block_record(
min(
end_height - self.constants.BLOCKS_CACHE_SIZE,
peak.height - self.constants.BLOCKS_CACHE_SIZE,
)
)
await self.batch_sync_to_peak(fork_height, peak)
async def fetch_blocks_and_validate(
self,

View File

@ -2,6 +2,7 @@
import asyncio
import pytest
from colorlog import logging
from chia.consensus.block_rewards import calculate_base_farmer_reward, calculate_pool_reward
from chia.protocols import full_node_protocol
@ -22,6 +23,9 @@ def wallet_height_at_least(wallet_node, h):
return False
log = logging.getLogger(__name__)
@pytest.fixture(scope="session")
def event_loop():
loop = asyncio.get_event_loop()
@ -70,6 +74,35 @@ class TestWalletSync:
100, wallet_height_at_least, True, wallet_node, len(default_400_blocks) + num_blocks - 5 - 1
)
@pytest.mark.asyncio
async def test_backtrack_sync_wallet(self, wallet_node, default_400_blocks):
full_node_api, wallet_node, full_node_server, wallet_server = wallet_node
for block in default_400_blocks[:20]:
await full_node_api.full_node.respond_block(full_node_protocol.RespondBlock(block))
await wallet_server.start_client(PeerInfo(self_hostname, uint16(full_node_server._port)), None)
# The second node should eventually catch up to the first one, and have the
# same tip at height num_blocks - 1.
await time_out_assert(100, wallet_height_at_least, True, wallet_node, 19)
# Tests a reorg with the wallet
@pytest.mark.asyncio
async def test_short_batch_sync_wallet(self, wallet_node, default_400_blocks):
full_node_api, wallet_node, full_node_server, wallet_server = wallet_node
for block in default_400_blocks[:200]:
await full_node_api.full_node.respond_block(full_node_protocol.RespondBlock(block))
await wallet_server.start_client(PeerInfo(self_hostname, uint16(full_node_server._port)), None)
# The second node should eventually catch up to the first one, and have the
# same tip at height num_blocks - 1.
await time_out_assert(100, wallet_height_at_least, True, wallet_node, 199)
# Tests a reorg with the wallet
@pytest.mark.asyncio
async def test_long_sync_wallet(self, wallet_node, default_1000_blocks, default_400_blocks):
@ -90,6 +123,7 @@ class TestWalletSync:
for block in default_1000_blocks:
await full_node_api.full_node.respond_block(full_node_protocol.RespondBlock(block))
log.info(f"wallet node height is {wallet_node.wallet_state_manager.blockchain._peak_height}")
await time_out_assert(600, wallet_height_at_least, True, wallet_node, len(default_1000_blocks) - 1)
await disconnect_all_and_reconnect(wallet_server, full_node_server)