Introduce sync_store.Peak (#13615)

dustinface authored on 2022-11-18 17:44:13 +01:00, committed by GitHub
parent 9b1989b014
commit 78e367665b
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 79 deletions
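The core of the change: the loose (header_hash, height, weight) tuples that SyncStore previously handed around are replaced by a small Peak dataclass, so call sites read named attributes instead of positional indices. A minimal, self-contained sketch of the shape of the change; the bytes32/uint32/uint128 stand-ins below exist only so the snippet runs without the chia package, and Peak mirrors the dataclass added to sync_store.py further down:

```python
from dataclasses import dataclass

# Stand-ins for chia's bytes32 / uint32 / uint128 types, so this sketch runs on its own.
bytes32 = bytes
uint32 = int
uint128 = int


@dataclass
class Peak:  # mirrors the dataclass introduced in sync_store.py below
    header_hash: bytes32
    height: uint32
    weight: uint128


peak = Peak(header_hash=b"\x00" * 32, height=10, weight=500)

# Old style:  peak_hash, peak_height, peak_weight = peak_store[peer_id]
# New style:  named attribute access on the Peak object
assert (peak.header_hash, peak.height, peak.weight) == (b"\x00" * 32, 10, 500)
```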

View File

@@ -279,7 +279,10 @@ class FullNode:
peak_store = None
for con in connections:
if peak_store is not None and con.peer_node_id in peak_store:
- peak_hash, peak_height, peak_weight = peak_store[con.peer_node_id]
+ peak = peak_store[con.peer_node_id]
+ peak_height = peak.height
+ peak_hash = peak.header_hash
+ peak_weight = peak.weight
else:
peak_height = None
peak_hash = None
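For context on the hunk above: get_peak_of_each_peer() now yields Peak objects keyed by peer node id, so the connection-status loop reads attributes rather than unpacking a tuple. A rough, hedged usage sketch; the chia.full_node.sync_store module path, the no-argument SyncStore() construction, and the peer id values are assumptions for illustration:

```python
from chia.full_node.sync_store import SyncStore  # module path assumed from the repo layout
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint128

store = SyncStore()  # all fields appear to have defaults, so an empty store should construct
peer_id = std_hash(b"peer-0")  # hypothetical peer node id
store.peer_has_block(std_hash(b"block10"), peer_id, uint128(500), uint32(10), True)

peak_store = store.get_peak_of_each_peer()  # Dict[bytes32, Peak]
for con_peer_id in [peer_id]:
    if con_peer_id in peak_store:
        peak = peak_store[con_peer_id]
        peak_hash, peak_height, peak_weight = peak.header_hash, peak.height, peak.weight
    else:
        peak_hash = peak_height = peak_weight = None
```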
@@ -705,21 +708,20 @@ class FullNode:
if self.sync_store.get_sync_mode():
# If peer connects while we are syncing, check if they have the block we are syncing towards
- peak_sync_hash = self.sync_store.get_sync_target_hash()
- peak_sync_height = self.sync_store.get_sync_target_height()
- if peak_sync_hash is not None and request.header_hash != peak_sync_hash and peak_sync_height is not None:
- peak_peers: Set[bytes32] = self.sync_store.get_peers_that_have_peak([peak_sync_hash])
+ target_peak = self.sync_store.target_peak
+ if target_peak is not None and request.header_hash != target_peak.header_hash:
+ peak_peers: Set[bytes32] = self.sync_store.get_peers_that_have_peak([target_peak.header_hash])
# Don't ask if we already know this peer has the peak
if peer.peer_node_id not in peak_peers:
target_peak_response: Optional[RespondBlock] = await peer.request_block(
- full_node_protocol.RequestBlock(uint32(peak_sync_height), False), timeout=10
+ full_node_protocol.RequestBlock(target_peak.height, False), timeout=10
)
if target_peak_response is not None and isinstance(target_peak_response, RespondBlock):
self.sync_store.peer_has_block(
- peak_sync_hash,
+ target_peak.header_hash,
peer.peer_node_id,
target_peak_response.block.weight,
- peak_sync_height,
+ target_peak.height,
False,
)
else:
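The hunk above swaps the get_sync_target_hash()/get_sync_target_height() pair for the single sync_store.target_peak attribute, so one None check covers hash and height together. A hedged sketch of that guard pattern; the module path and the incoming header hash are illustrative assumptions:

```python
from chia.full_node.sync_store import SyncStore  # module path assumed
from chia.util.hash import std_hash

store = SyncStore()
request_header_hash = std_hash(b"announced-block")  # hypothetical hash from a new_peak message

target_peak = store.target_peak  # Optional[Peak]; None until a sync target has been chosen
if target_peak is not None and request_header_hash != target_peak.header_hash:
    # One object now carries header_hash, height and weight, so no second getter is needed
    peak_peers = store.get_peers_that_have_peak([target_peak.header_hash])
```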
@@ -968,7 +970,7 @@ class FullNode:
# Wait until we have 3 peaks or up to a max of 30 seconds
peaks = []
for i in range(300):
- peaks = [tup[0] for tup in self.sync_store.get_peak_of_each_peer().values()]
+ peaks = [peak.header_hash for peak in self.sync_store.get_peak_of_each_peer().values()]
if len(self.sync_store.get_peers_that_have_peak(peaks)) < 3:
if self._shut_down:
return None
@@ -984,10 +986,10 @@ class FullNode:
if target_peak is None:
raise RuntimeError("Not performing sync, no peaks collected")
- heaviest_peak_hash, heaviest_peak_height, heaviest_peak_weight = target_peak
- self.sync_store.set_peak_target(heaviest_peak_hash, heaviest_peak_height)
- self.log.info(f"Selected peak {heaviest_peak_height}, {heaviest_peak_hash}")
+ self.sync_store.target_peak = target_peak
+ self.log.info(f"Selected peak {target_peak}")
# Check which peers are updated to this height
peers: List[bytes32] = []
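In the hunk above, the heaviest reported peak is used directly as the sync target instead of being unpacked and re-packed through set_peak_target(). Roughly, under the same module-path and construction assumptions as the earlier sketches:

```python
from chia.full_node.sync_store import SyncStore  # module path assumed
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint128

store = SyncStore()
store.peer_has_block(std_hash(b"block10"), std_hash(b"peer-0"), uint128(500), uint32(10), True)

target_peak = store.get_heaviest_peak()  # Optional[Peak]
if target_peak is None:
    raise RuntimeError("Not performing sync, no peaks collected")
store.target_peak = target_peak  # replaces set_peak_target(heaviest_peak_hash, heaviest_peak_height)
print(f"Selected peak {target_peak}")  # the dataclass repr shows header_hash, height and weight
```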
@@ -996,48 +998,45 @@ class FullNode:
if peer.connection_type == NodeType.FULL_NODE:
peers.append(peer.peer_node_id)
coroutines.append(
- peer.request_block(
- full_node_protocol.RequestBlock(uint32(heaviest_peak_height), True), timeout=10
- )
+ peer.request_block(full_node_protocol.RequestBlock(target_peak.height, True), timeout=10)
)
for i, target_peak_response in enumerate(await asyncio.gather(*coroutines)):
if target_peak_response is not None and isinstance(target_peak_response, RespondBlock):
self.sync_store.peer_has_block(
- heaviest_peak_hash, peers[i], heaviest_peak_weight, heaviest_peak_height, False
+ target_peak.header_hash, peers[i], target_peak.weight, target_peak.height, False
)
# TODO: disconnect from peer which gave us the heaviest_peak, if nobody has the peak
- peer_ids: Set[bytes32] = self.sync_store.get_peers_that_have_peak([heaviest_peak_hash])
+ peer_ids: Set[bytes32] = self.sync_store.get_peers_that_have_peak([target_peak.header_hash])
peers_with_peak: List[WSChiaConnection] = [
c for c in self.server.all_connections.values() if c.peer_node_id in peer_ids
]
# Request weight proof from a random peer
- self.log.info(f"Total of {len(peers_with_peak)} peers with peak {heaviest_peak_height}")
+ self.log.info(f"Total of {len(peers_with_peak)} peers with peak {target_peak.height}")
weight_proof_peer: WSChiaConnection = random.choice(peers_with_peak)
self.log.info(
- f"Requesting weight proof from peer {weight_proof_peer.peer_host} up to height"
- f" {heaviest_peak_height}"
+ f"Requesting weight proof from peer {weight_proof_peer.peer_host} up to height" f" {target_peak.height}"
)
cur_peak: Optional[BlockRecord] = self.blockchain.get_peak()
- if cur_peak is not None and heaviest_peak_weight <= cur_peak.weight:
+ if cur_peak is not None and target_peak.weight <= cur_peak.weight:
raise ValueError("Not performing sync, already caught up.")
wp_timeout = 360
if "weight_proof_timeout" in self.config:
wp_timeout = self.config["weight_proof_timeout"]
self.log.debug(f"weight proof timeout is {wp_timeout} sec")
- request = full_node_protocol.RequestProofOfWeight(heaviest_peak_height, heaviest_peak_hash)
+ request = full_node_protocol.RequestProofOfWeight(target_peak.height, target_peak.header_hash)
response = await weight_proof_peer.request_proof_of_weight(request, timeout=wp_timeout)
# Disconnect from this peer, because they have not behaved properly
if response is None or not isinstance(response, full_node_protocol.RespondProofOfWeight):
await weight_proof_peer.close(600)
raise RuntimeError(f"Weight proof did not arrive in time from peer: {weight_proof_peer.peer_host}")
- if response.wp.recent_chain_data[-1].reward_chain_block.height != heaviest_peak_height:
+ if response.wp.recent_chain_data[-1].reward_chain_block.height != target_peak.height:
await weight_proof_peer.close(600)
raise RuntimeError(f"Weight proof had the wrong height: {weight_proof_peer.peer_host}")
- if response.wp.recent_chain_data[-1].reward_chain_block.weight != heaviest_peak_weight:
+ if response.wp.recent_chain_data[-1].reward_chain_block.weight != target_peak.weight:
await weight_proof_peer.close(600)
raise RuntimeError(f"Weight proof had the wrong weight: {weight_proof_peer.peer_host}")
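The remaining renames in this hunk follow the same pattern: the heaviest_peak_height/hash/weight locals become attribute reads on target_peak, including the weight-proof request and the checks on its response. A sketch under the same assumed import paths; the response checks are shown as comments because no real RespondProofOfWeight is constructed here:

```python
from chia.full_node.sync_store import Peak  # import path assumed
from chia.protocols import full_node_protocol
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint128

target_peak = Peak(std_hash(b"block10"), uint32(10), uint128(500))  # illustrative values

request = full_node_protocol.RequestProofOfWeight(target_peak.height, target_peak.header_hash)
# After the peer answers with `response`, the same fields drive the sanity checks:
#   response.wp.recent_chain_data[-1].reward_chain_block.height == target_peak.height
#   response.wp.recent_chain_data[-1].reward_chain_block.weight == target_peak.weight
```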
@@ -1058,13 +1057,13 @@ class FullNode:
await weight_proof_peer.close(600)
raise ValueError("Weight proof validation failed")
- self.log.info(f"Re-checked peers: total of {len(peers_with_peak)} peers with peak {heaviest_peak_height}")
+ self.log.info(f"Re-checked peers: total of {len(peers_with_peak)} peers with peak {target_peak.height}")
self.sync_store.set_sync_mode(True)
self._state_changed("sync_mode")
# Ensures that the fork point does not change
async with self._blockchain_lock_high_priority:
await self.blockchain.warmup(fork_point)
- await self.sync_from_fork_point(fork_point, heaviest_peak_height, heaviest_peak_hash, summaries)
+ await self.sync_from_fork_point(fork_point, target_peak.height, target_peak.header_hash, summaries)
except asyncio.CancelledError:
self.log.warning("Syncing failed, CancelledError")
except Exception as e:

View File

@@ -4,7 +4,7 @@ import asyncio
import logging
from collections import OrderedDict as orderedDict
from dataclasses import dataclass, field
- from typing import Dict, List, Optional, OrderedDict, Set, Tuple
+ from typing import Dict, List, Optional, OrderedDict, Set
import typing_extensions
@@ -14,6 +14,13 @@ from chia.util.ints import uint32, uint128
log = logging.getLogger(__name__)
+ @dataclass
+ class Peak:
+ header_hash: bytes32
+ height: uint32
+ weight: uint128
@typing_extensions.final
@dataclass
class SyncStore:
@@ -22,28 +29,16 @@ class SyncStore:
long_sync: bool = False
# Header hash : peer node id
peak_to_peer: OrderedDict[bytes32, Set[bytes32]] = field(default_factory=orderedDict)
- # peer node id : [header_hash, height, weight]
- peer_to_peak: Dict[bytes32, Tuple[bytes32, uint32, uint128]] = field(default_factory=dict)
- # Peak hash we are syncing towards
- sync_target_header_hash: Optional[bytes32] = None
- # Peak height we are syncing towards
- sync_target_height: Optional[uint32] = None
+ # peer node id : Peak
+ peer_to_peak: Dict[bytes32, Peak] = field(default_factory=dict)
+ # Peak we are syncing towards
+ target_peak: Optional[Peak] = None
peers_changed: asyncio.Event = field(default_factory=asyncio.Event)
# Set of nodes which we are batch syncing from
batch_syncing: Set[bytes32] = field(default_factory=set)
# Set of nodes which we are backtrack syncing from, and how many threads
backtrack_syncing: Dict[bytes32, int] = field(default_factory=dict)
- def set_peak_target(self, peak_hash: bytes32, target_height: uint32) -> None:
- self.sync_target_header_hash = peak_hash
- self.sync_target_height = target_height
- def get_sync_target_hash(self) -> Optional[bytes32]:
- return self.sync_target_header_hash
- def get_sync_target_height(self) -> Optional[uint32]:
- return self.sync_target_height
def set_sync_mode(self, sync_mode: bool) -> None:
self.sync_mode = sync_mode
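Net effect on the SyncStore surface: set_peak_target(), get_sync_target_hash() and get_sync_target_height() are removed, and callers use the target_peak attribute instead. In sketch form, with the old calls shown as comments and the import paths assumed as before; the concrete values are purely illustrative:

```python
from chia.full_node.sync_store import Peak, SyncStore  # module path assumed
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint128

store = SyncStore()

# Old API:
#   store.set_peak_target(std_hash(b"1"), 100)
#   store.get_sync_target_hash()   -> Optional[bytes32]
#   store.get_sync_target_height() -> Optional[uint32]

# New API: one optional attribute holds everything about the sync target
store.target_peak = Peak(std_hash(b"1"), uint32(100), uint128(0))
assert store.target_peak.header_hash == std_hash(b"1")
assert store.target_peak.height == 100
```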
@@ -66,7 +61,7 @@ class SyncStore:
Adds a record that a certain peer has a block.
"""
- if header_hash == self.sync_target_header_hash:
+ if self.target_peak is not None and header_hash == self.target_peak.header_hash:
self.peers_changed.set()
if header_hash in self.peak_to_peer:
self.peak_to_peer[header_hash].add(peer_id)
@@ -75,11 +70,11 @@ class SyncStore:
if len(self.peak_to_peer) > 256: # nice power of two
item = self.peak_to_peer.popitem(last=False) # Remove the oldest entry
# sync target hash is used throughout the sync process and should not be deleted.
- if item[0] == self.sync_target_header_hash:
+ if self.target_peak is not None and item[0] == self.target_peak.header_hash:
self.peak_to_peer[item[0]] = item[1] # Put it back in if it was the sync target
self.peak_to_peer.popitem(last=False) # Remove the oldest entry again
if new_peak:
- self.peer_to_peak[peer_id] = (header_hash, height, weight)
+ self.peer_to_peak[peer_id] = Peak(header_hash, height, weight)
def get_peers_that_have_peak(self, header_hashes: List[bytes32]) -> Set[bytes32]:
"""
@@ -93,19 +88,19 @@ class SyncStore:
node_ids.add(node_id)
return node_ids
- def get_peak_of_each_peer(self) -> Dict[bytes32, Tuple[bytes32, uint32, uint128]]:
+ def get_peak_of_each_peer(self) -> Dict[bytes32, Peak]:
"""
Returns: dictionary of peer id to peak information.
"""
ret = {}
- for peer_id, v in self.peer_to_peak.items():
- if v[0] not in self.peak_to_peer:
+ for peer_id, peak in self.peer_to_peak.items():
+ if peak.header_hash not in self.peak_to_peer:
continue
- ret[peer_id] = v
+ ret[peer_id] = peak
return ret
- def get_heaviest_peak(self) -> Optional[Tuple[bytes32, uint32, uint128]]:
+ def get_heaviest_peak(self) -> Optional[Peak]:
"""
Returns: the header_hash, height, and weight of the heaviest block that one of our peers has notified
us of.
@@ -113,18 +108,14 @@ class SyncStore:
if len(self.peer_to_peak) == 0:
return None
- heaviest_peak_hash: Optional[bytes32] = None
- heaviest_peak_weight: uint128 = uint128(0)
- heaviest_peak_height: Optional[uint32] = None
- for peer_id, (peak_hash, height, weight) in self.peer_to_peak.items():
- if peak_hash not in self.peak_to_peer:
+ heaviest_peak: Optional[Peak] = None
+ for peak in self.peer_to_peak.values():
+ if peak.header_hash not in self.peak_to_peer:
continue
- if heaviest_peak_hash is None or weight > heaviest_peak_weight:
- heaviest_peak_hash = peak_hash
- heaviest_peak_weight = weight
- heaviest_peak_height = height
- assert heaviest_peak_hash is not None and heaviest_peak_weight is not None and heaviest_peak_height is not None
- return heaviest_peak_hash, heaviest_peak_height, heaviest_peak_weight
+ if heaviest_peak is None or peak.weight > heaviest_peak.weight:
+ heaviest_peak = peak
+ assert heaviest_peak is not None
+ return heaviest_peak
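Putting the sync_store changes together: peer_has_block() now records a Peak per peer, and both get_peak_of_each_peer() and get_heaviest_peak() hand those objects back, mirroring how the updated test further down exercises the store. A hedged end-to-end sketch under the same module-path and construction assumptions as the earlier snippets:

```python
from chia.full_node.sync_store import SyncStore  # module path assumed
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint128

store = SyncStore()
peer_a, peer_b = std_hash(b"peer-a"), std_hash(b"peer-b")

# peer_has_block(header_hash, peer_id, weight, height, new_peak)
store.peer_has_block(std_hash(b"block10"), peer_a, uint128(500), uint32(10), True)
store.peer_has_block(std_hash(b"block1"), peer_b, uint128(300), uint32(1), True)

assert store.get_peak_of_each_peer()[peer_a].height == 10
heaviest = store.get_heaviest_peak()
assert heaviest is not None and heaviest.weight == 500
assert store.get_peers_that_have_peak([std_hash(b"block10")]) == {peer_a}
```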
async def clear_sync_info(self) -> None:
"""

View File

@@ -156,9 +156,9 @@ class FullNodeRpcApi:
sync_tip_height: Optional[uint32] = uint32(0)
if sync_mode:
- if self.service.sync_store.get_sync_target_height() is not None:
- sync_tip_height = self.service.sync_store.get_sync_target_height()
- assert sync_tip_height is not None
+ target_peak = self.service.sync_store.target_peak
+ if target_peak is not None:
+ sync_tip_height = target_peak.height
if peak is not None:
sync_progress_height: uint32 = peak.height
# Don't display we're syncing towards 0, instead show 'Syncing height/height'
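On the RPC side, the sync height reported while syncing now comes straight from target_peak.height rather than from get_sync_target_height(). A small sketch of that read, with the surrounding RPC service object replaced by a bare store and the values chosen purely for illustration (paths assumed as before):

```python
from chia.full_node.sync_store import Peak, SyncStore  # module path assumed
from chia.util.hash import std_hash
from chia.util.ints import uint32, uint128

store = SyncStore()
store.target_peak = Peak(std_hash(b"tip"), uint32(4242), uint128(9000))  # illustrative target

sync_tip_height = uint32(0)
target_peak = store.target_peak
if target_peak is not None:
    sync_tip_height = target_peak.height  # height shown in the "Syncing height/height" display
assert sync_tip_height == 4242
```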

View File

@@ -19,10 +19,6 @@ class TestStore:
# clear sync info
await store.clear_sync_info()
- store.set_peak_target(std_hash(b"1"), 100)
- assert store.get_sync_target_hash() == std_hash(b"1")
- assert store.get_sync_target_height() == 100
peer_ids = [std_hash(bytes([a])) for a in range(3)]
assert store.get_peers_that_have_peak([]) == set()
@@ -36,26 +32,26 @@ class TestStore:
store.peer_has_block(std_hash(b"block10"), peer_ids[2], 500, 10, False)
store.peer_has_block(std_hash(b"block1"), peer_ids[2], 300, 1, False)
- assert store.get_heaviest_peak()[0] == std_hash(b"block10")
- assert store.get_heaviest_peak()[1] == 10
- assert store.get_heaviest_peak()[2] == 500
+ assert store.get_heaviest_peak().header_hash == std_hash(b"block10")
+ assert store.get_heaviest_peak().height == 10
+ assert store.get_heaviest_peak().weight == 500
assert len(store.get_peak_of_each_peer()) == 2
store.peer_has_block(std_hash(b"block1"), peer_ids[2], 500, 1, True)
assert len(store.get_peak_of_each_peer()) == 3
- assert store.get_peak_of_each_peer()[peer_ids[0]][2] == 500
- assert store.get_peak_of_each_peer()[peer_ids[1]][2] == 300
- assert store.get_peak_of_each_peer()[peer_ids[2]][2] == 500
+ assert store.get_peak_of_each_peer()[peer_ids[0]].weight == 500
+ assert store.get_peak_of_each_peer()[peer_ids[1]].weight == 300
+ assert store.get_peak_of_each_peer()[peer_ids[2]].weight == 500
assert store.get_peers_that_have_peak([std_hash(b"block1")]) == set(peer_ids)
assert store.get_peers_that_have_peak([std_hash(b"block10")]) == {peer_ids[0], peer_ids[2]}
store.peer_disconnected(peer_ids[0])
- assert store.get_heaviest_peak()[2] == 500
+ assert store.get_heaviest_peak().weight == 500
assert len(store.get_peak_of_each_peer()) == 2
assert store.get_peers_that_have_peak([std_hash(b"block10")]) == {peer_ids[2]}
store.peer_disconnected(peer_ids[2])
- assert store.get_heaviest_peak()[2] == 300
+ assert store.get_heaviest_peak().weight == 300
store.peer_has_block(std_hash(b"block30"), peer_ids[0], 700, 30, True)
- assert store.get_peak_of_each_peer()[peer_ids[0]][2] == 700
- assert store.get_heaviest_peak()[2] == 700
+ assert store.get_peak_of_each_peer()[peer_ids[0]].weight == 700
+ assert store.get_heaviest_peak().weight == 700