Almost get sync working

Mariano Sorgente 2020-12-08 17:34:21 +09:00 committed by Yostra
parent a98255df8c
commit dbdb20c93f
6 changed files with 79 additions and 70 deletions

View File

@@ -97,15 +97,18 @@ class FullNode:
self.coin_store = await CoinStore.create(self.connection)
self.timelord_lock = asyncio.Lock()
self.log.info("Initializing blockchain from disk")
start_time = time.time()
self.blockchain = await Blockchain.create(self.coin_store, self.block_store, self.constants)
self.mempool_manager = MempoolManager(self.coin_store, self.constants)
self.weight_proof_handler = WeightProofHandler(self.constants, BlockCache(self.blockchain))
time_taken = time.time() - start_time
if self.blockchain.get_peak() is None:
self.log.info("Initialized with empty blockchain")
self.log.info(f"Initialized with empty blockchain time taken: {int(time_taken)}s")
else:
self.log.info(
f"Blockchain initialized to peak {self.blockchain.get_peak().header_hash} height"
f" {self.blockchain.get_peak().sub_block_height}"
f" {self.blockchain.get_peak().sub_block_height}, "
f"time taken: {int(time_taken)}s"
)
await self.mempool_manager.new_peak(self.blockchain.get_peak())
@@ -275,7 +278,7 @@ class FullNode:
# TODO: fix DOS issue. Attacker can request syncing to an invalid blockchain
await asyncio.sleep(2)
highest_weight: uint128 = uint128(0)
peak_height: uint32 = uint32(0)
target_peak_sb_height: uint32 = uint32(0)
sync_start_time = time.time()
# Based on responses from peers about the current heads, see which head is the heaviest
@@ -289,46 +292,57 @@ class FullNode:
for header_hash, potential_peak_block in potential_peaks:
if potential_peak_block.weight > highest_weight:
highest_weight = potential_peak_block.weightc
peak_height = potential_peak_block.sub_block_height
highest_weight = potential_peak_block.weight
target_peak_sb_height = potential_peak_block.sub_block_height
if self.blockchain.get_peak() is not None and highest_weight <= self.blockchain.get_peak().weight:
self.log.info("Not performing sync, already caught up.")
return
self.log.info(f"Peak height {peak_height}")
self.log.info(f"Peak height {target_peak_sb_height}")
peers: List[WSChiaConnection] = self.server.get_full_node_connections()
# find last ses
curr = self.blockchain.sub_blocks[self.blockchain.sub_height_to_hash[self.blockchain.peak_height]]
while True:
if self.blockchain.sub_blocks[curr.header_hash].sub_epoch_summary_included is not None:
break
curr = self.blockchain.sub_blocks[curr.prev_hash]
if self.blockchain.get_peak() is not None:
curr = self.blockchain.sub_blocks[self.blockchain.sub_height_to_hash[self.blockchain.peak_height]]
while curr.sub_block_height > 1:
if self.blockchain.sub_blocks[curr.header_hash].sub_epoch_summary_included is not None:
break
curr = self.blockchain.sub_blocks[curr.prev_hash]
# Finding the fork point allows us to only download headers and blocks from the fork point
fork_point_height: uint32 = curr.sub_block_height
self.log.info(f"Fork point at height {fork_point_height}")
total_proof_blocks = peak_height - curr.sub_block_height
# Finding the fork point allows us to only download headers and blocks from the fork point
# TODO (almog): fix
fork_point_height: int = -1
self.log.info(f"Fork point at height {fork_point_height}")
total_proof_sub_blocks = target_peak_sb_height - curr.sub_block_height
else:
fork_point_height: int = -1
# TODO: almog
# send weight proof message, continue on first response
response = await send_all_first_reply(
"request_proof_of_weight", full_node_protocol.RequestProofOfWeight(total_proof_blocks, peak_height), peers
)
# weight proof is from our latest known ses
# if validated continue else fail
if not self.weight_proof_handler.validate_weight_proof(response[0], curr):
self.log.error(f"invalid weight proof {response}")
return
# response = await send_all_first_reply(
# "request_proof_of_weight",
# full_node_protocol.RequestProofOfWeight(total_proof_sub_blocks, target_peak_sb_height),
# peers,
# )
# # weight proof is from our latest known ses
# # if validated continue else fail
# if not self.weight_proof_handler.validate_weight_proof(response[0], curr):
# self.log.error(f"invalid weight proof {response}")
# return
self.sync_peers_handler = SyncPeersHandler(
self.sync_store, peers, fork_point_height, self.blockchain, peak_height, self.server
self.sync_store, peers, fork_point_height, self.blockchain, target_peak_sb_height, self.server
)
# Start processing blocks that we have received (no block yet)
block_processor = SyncBlocksProcessor(self.sync_store, fork_point_height, uint32(peak_height), self.blockchain)
block_processor = SyncBlocksProcessor(
self.sync_store, fork_point_height, uint32(target_peak_sb_height), self.blockchain
)
block_processor_task = asyncio.create_task(block_processor.process())
peak: Optional[SubBlockRecord] = self.blockchain.get_peak()
while not self.sync_peers_handler.done():
self.log.info("Looping")
# Periodically checks for done, timeouts, shutdowns, new peers or disconnected peers.
if self._shut_down:
block_processor.shut_down()
@@ -365,7 +379,7 @@ class FullNode:
new_peak.prev_hash,
),
)
self.server.send_to_all([msg], NodeType.WALLET)
await self.server.send_to_all([msg], NodeType.WALLET)
self._state_changed("sub_block")
await asyncio.sleep(5)
@@ -377,10 +391,10 @@ class FullNode:
return
# A successful sync will leave the height at least as high as peak_height
assert self.blockchain.get_peak().sub_block_height >= peak_height
assert self.blockchain.get_peak().sub_block_height >= target_peak_sb_height
self.log.info(
f"Finished sync up to height {peak_height}. Total time: "
f"Finished sync up to height {target_peak_sb_height}. Total time: "
f"{round((time.time() - sync_start_time)/60, 2)} minutes."
)
@@ -431,14 +445,7 @@ class FullNode:
# This is a block we asked for during sync
if self.sync_peers_handler is not None:
requests = await self.sync_peers_handler.new_block(sub_block)
for req in requests:
msg = req.message
node_id = req.specific_peer_node_id
if node_id is not None:
await self.server.send_to_specific([msg], node_id)
else:
await self.server.send_to_all([msg], NodeType.FULL_NODE)
await self.sync_peers_handler.new_block(sub_block)
return
# Adds the block to seen, and check if it's seen before (which means header is in memory)
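
Note: the reworked sync entry point above only walks back to the last sub-epoch summary when a peak exists, temporarily pins fork_point_height to -1 (per the TODO), and sizes the weight-proof request as target_peak_sb_height minus the height of that last summary. A minimal, self-contained sketch of the walk-back, assuming simplified stand-in types (SimpleSubBlock and the toy chain are illustrative, not the project's structures):

from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class SimpleSubBlock:
    # Illustrative stand-in for a sub-block record
    header_hash: str
    prev_hash: str
    sub_block_height: int
    sub_epoch_summary_included: Optional[str]  # set when a summary is embedded

def last_ses_height(sub_blocks: Dict[str, SimpleSubBlock], peak_hash: str) -> int:
    # Walk back from the peak until a sub-epoch summary is found,
    # stopping at height 1, mirroring the loop added above.
    curr = sub_blocks[peak_hash]
    while curr.sub_block_height > 1:
        if curr.sub_epoch_summary_included is not None:
            break
        curr = sub_blocks[curr.prev_hash]
    return curr.sub_block_height

# Toy chain: summary embedded at height 2, local peak at height 4
chain = {
    "b1": SimpleSubBlock("b1", "genesis", 1, None),
    "b2": SimpleSubBlock("b2", "b1", 2, "ses"),
    "b3": SimpleSubBlock("b3", "b2", 3, None),
    "b4": SimpleSubBlock("b4", "b3", 4, None),
}
target_peak_sb_height = 10
total_proof_sub_blocks = target_peak_sb_height - last_ses_height(chain, "b4")
assert total_proof_sub_blocks == 8  # proof spans the 8 sub-blocks past the last summary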

View File

@@ -178,13 +178,11 @@ class FullNodeStore:
# Skip if already present
for slot, _, _ in self.finished_sub_slots:
if slot == eos:
log.warning("Already")
return []
if eos.challenge_chain.challenge_chain_end_of_slot_vdf.challenge != last_slot_ch:
# This slot does not append to our next slot
# This prevents other peers from appending fake VDFs to our cache
log.warning("Not append")
return None
# TODO: Fix
@@ -237,12 +235,10 @@ class FullNodeStore:
eos.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf.challenge
!= icc_start_challenge_hash
):
log.error("Bad icc")
return None
else:
# Empty slot after the peak
if eos.reward_chain.end_of_slot_vdf.challenge != last_slot_rc_hash:
log.error("empty slot after peak")
return None
if (
@@ -254,12 +250,11 @@ class FullNodeStore:
eos.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf.challenge
!= last_slot.infused_challenge_chain.get_hash()
):
log.warning("ANother bad icc")
return None
self.finished_sub_slots.append((eos, [None] * self.constants.NUM_SPS_SUB_SLOT, total_iters))
new_ips = []
new_ips: List[timelord_protocol.NewInfusionPointVDF] = []
for ip in self.future_ip_cache.get(eos.reward_chain.get_hash(), []):
new_ips.append(ip)
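
Note: the store hunks above mostly drop temporary warning logs, but the guard they decorated stays: an end-of-slot VDF is cached only when its challenge chains onto our latest finished slot, so peers cannot append fake VDFs. A small sketch of that acceptance rule over plain string hashes, assuming a hypothetical accept_end_of_slot helper rather than the store's real types:

from typing import List, Optional, Tuple

def accept_end_of_slot(
    finished_sub_slots: List[Tuple[str, str]],  # (slot_hash, challenge the slot builds on)
    slot_hash: str,
    slot_challenge: str,
    last_slot_hash: str,
) -> Optional[bool]:
    # Returns None when the slot is rejected (does not extend our last slot),
    # False when it was already cached, and True when it was appended.
    if any(cached_hash == slot_hash for cached_hash, _ in finished_sub_slots):
        return False  # already present, nothing to add
    if slot_challenge != last_slot_hash:
        # Does not append to our next slot; accepting it would let peers
        # grow the cache with VDFs from unrelated chains.
        return None
    finished_sub_slots.append((slot_hash, slot_challenge))
    return True

# Usage: a slot chaining onto "s1" is accepted, a duplicate or unrelated one is not
cache: List[Tuple[str, str]] = [("s1", "s0")]
assert accept_end_of_slot(cache, "s2", "s1", "s1") is True
assert accept_end_of_slot(cache, "s2", "s1", "s1") is False   # duplicate
assert accept_end_of_slot(cache, "sX", "bogus", "s2") is None  # rejected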

View File

@@ -17,14 +17,14 @@ class SyncBlocksProcessor:
def __init__(
self,
sync_store: SyncStore,
fork_height: uint32,
fork_height: int,
peak_height: uint32,
blockchain: Blockchain,
):
self.sync_store = sync_store
self.blockchain = blockchain
self.fork_height = fork_height
self.peak_height = peak_height
self.sync_store: SyncStore = sync_store
self.blockchain: Blockchain = blockchain
self.fork_height: int = fork_height
self.peak_height: uint32 = peak_height
self._shut_down = False
self.BATCH_SIZE = 10
self.SLEEP_INTERVAL = 10
@@ -90,6 +90,7 @@ class SyncBlocksProcessor:
raise RuntimeError(f"Invalid block {block.header_hash}")
assert self.blockchain.get_peak().sub_block_height >= block.sub_block_height
del self.sync_store.potential_blocks[block.sub_block_height]
self.sync_store.add_header_hashes_added(block.sub_block_height, block.header_hash)
log.info(
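
Note: with fork_height typed as a plain int above, -1 can mean that no sub-block below the target is shared and the download starts at height 0; the processor then fetches blocks in BATCH_SIZE chunks and records each added header hash for the peers handler. A tiny sketch of that batching arithmetic, where batch_ranges is a hypothetical helper and the batch size of 10 matches BATCH_SIZE above:

from typing import List, Tuple

def batch_ranges(fork_height: int, peak_height: int, batch_size: int = 10) -> List[Tuple[int, int]]:
    # Heights to fetch in batches, starting just above the fork point.
    # With fork_height == -1 (no common block), the first batch starts at height 0.
    start = fork_height + 1
    return [
        (h, min(h + batch_size - 1, peak_height))
        for h in range(start, peak_height + 1, batch_size)
    ]

# e.g. a fresh node syncing to height 25 requests (0, 9), (10, 19), (20, 25)
assert batch_ranges(-1, 25) == [(0, 9), (10, 19), (20, 25)]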

View File

@@ -6,7 +6,7 @@ from typing import Any, AsyncGenerator, Dict, List, Union
from src.consensus.blockchain import Blockchain
from src.full_node.sync_store import SyncStore
from src.protocols import full_node_protocol
from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage
from src.server.outbound_message import Message, OutboundMessage
from src.server.server import ChiaServer
from src.server.ws_connection import WSChiaConnection
from src.types.full_block import FullBlock
@@ -32,7 +32,7 @@ class SyncPeersHandler:
# and the time the request was sent.
current_outbound_sets: Dict[bytes32, Dict[uint32, uint64]]
sync_store: SyncStore
fully_validated_up_to: uint32
fully_validated_up_to: int
potential_blocks_received: Dict[uint32, asyncio.Event]
potential_blocks: Dict[uint32, Any]
@@ -40,7 +40,7 @@ class SyncPeersHandler:
self,
sync_store: SyncStore,
peers: List[WSChiaConnection],
fork_height: uint32,
fork_height: int,
blockchain: Blockchain,
peak_height: uint32,
server: ChiaServer,
@@ -117,8 +117,10 @@ class SyncPeersHandler:
# Refresh the fully_validated_up_to pointer
for height in range(self.fully_validated_up_to + 1, self.peak_height + 1):
if self.sync_store.get_header_hashes_added(uint32(height)) is not None:
log.info(f"FV: {height}")
self.fully_validated_up_to = uint32(height)
else:
log.info(f"NOT FV: {height}")
break
# Number of request slots
@@ -127,6 +129,7 @@ class SyncPeersHandler:
free_slots += self.MAX_REQUESTS_PER_PEER - len(request_set)
to_send: List[uint32] = []
# Finds a block height
for height in range(
self.fully_validated_up_to + 1,
@@ -166,8 +169,8 @@ class SyncPeersHandler:
# Add to peer request
node_id, request_set = outbound_sets_list[index % len(outbound_sets_list)]
request_set[uint32(height)] = uint64(int(time.time()))
request = full_node_protocol.RequestSubBlock(height, True)
log.warning(f"Requesting sb: {height}")
msg = Message("request_sub_block", request)
await self.server.send_to_specific([msg], node_id)
@@ -194,7 +197,7 @@ class SyncPeersHandler:
# remove block from request set
for node_id, request_set in self.current_outbound_sets.items():
request_set.pop(header_hash, None)
request_set.pop(block.sub_block_height, None)
# add to request sets
await self.add_to_request_sets()
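
Note: the handler above keeps fully_validated_up_to as an int for the same -1 fork-point convention, advances it only across consecutively validated heights, and budgets new requests from a per-peer cap (MAX_REQUESTS_PER_PEER). A simplified sketch of those two bookkeeping steps; the dict-backed store and the cap value used in the example are assumptions, not values shown in this diff:

from typing import Dict

def refresh_fully_validated(
    fully_validated_up_to: int,
    peak_height: int,
    header_hashes_added: Dict[int, str],
) -> int:
    # Advance the pointer while each next height already has a validated hash
    for height in range(fully_validated_up_to + 1, peak_height + 1):
        if header_hashes_added.get(height) is None:
            break
        fully_validated_up_to = height
    return fully_validated_up_to

def free_request_slots(
    outbound_sets: Dict[str, Dict[int, float]], max_requests_per_peer: int
) -> int:
    # Request slots still open across all peers (per-peer cap minus in-flight)
    return sum(max_requests_per_peer - len(in_flight) for in_flight in outbound_sets.values())

# Heights 1-3 are validated, 4 is missing, so the pointer stops at 3
assert refresh_fully_validated(0, 6, {1: "h1", 2: "h2", 3: "h3", 5: "h5"}) == 3
# One peer has a request in flight, the other none; a cap of 2 leaves 3 slots
assert free_request_slots({"peer_a": {7: 0.0}, "peer_b": {}}, max_requests_per_peer=2) == 3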

View File

@@ -257,7 +257,11 @@ class FullNodeDiscovery:
empty_tables = False
initiate_connection = self._num_needed_peers() > 0 or has_collision or is_feeler
if addr is not None and initiate_connection:
asyncio.create_task(self.server.start_client(addr, is_feeler=disconnect_after_handshake))
asyncio.create_task(
self.server.start_client(
addr, is_feeler=disconnect_after_handshake, on_connect=self.server.on_connect
)
)
sleep_interval = 1 + len(groups) * 0.5
sleep_interval = min(sleep_interval, self.peer_connect_interval)
await asyncio.sleep(sleep_interval)
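
Note: the discovery change above now hands the server's on_connect callback to start_client and launches each connection as a background task, then sleeps 1 + len(groups) * 0.5 seconds, capped at peer_connect_interval. A self-contained asyncio sketch of that fire-and-forget pattern; connect, handle_connected, and the constants in the call are illustrative stand-ins, not the project's API:

import asyncio

async def connect(addr: str, on_connect=None) -> None:
    # Stand-in for server.start_client: pretend to handshake, then notify
    await asyncio.sleep(0.01)
    if on_connect is not None:
        await on_connect(addr)

async def handle_connected(addr: str) -> None:
    print(f"connected to {addr}")

async def discovery_loop(addresses, num_groups: int, peer_connect_interval: float) -> None:
    for addr in addresses:
        # Fire-and-forget so discovery keeps cycling while the handshake runs
        asyncio.create_task(connect(addr, on_connect=handle_connected))
        # Same backoff shape as above: grows with the address groups, capped
        sleep_interval = min(1 + num_groups * 0.5, peer_connect_interval)
        await asyncio.sleep(sleep_interval)

asyncio.run(discovery_loop(["127.0.0.1:8444"], num_groups=2, peer_connect_interval=5.0))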

View File

@@ -30,37 +30,36 @@ class TestFullSync:
@pytest.mark.asyncio
async def test_basic_sync(self, two_nodes):
num_blocks = 40
blocks = bt.get_consecutive_blocks(test_constants, num_blocks, [], 10)
blocks = bt.get_consecutive_blocks(num_blocks)
full_node_1, full_node_2, server_1, server_2 = two_nodes
for i in range(1, num_blocks):
await full_node_1.full_node._respond_sub_block(full_node_protocol.RespondSubBlock(blocks[i]))
for block in blocks:
await full_node_1.full_node.respond_sub_block(full_node_protocol.RespondSubBlock(block))
await server_2.start_client(PeerInfo("localhost", uint16(server_1._port)), None)
# The second node should eventually catch up to the first one, and have the
# same tip at height num_blocks - 1 (or at least num_blocks - 3, in case we sync to a
# worse tip)
# same tip at height num_blocks - 1 (or at least num_blocks - 3, in case we sync to below the tip)
await time_out_assert(60, node_height_at_least, True, full_node_2, num_blocks - 3)
@pytest.mark.asyncio
async def test_short_sync(self, two_nodes):
num_blocks = 10
num_blocks_2 = 4
blocks = bt.get_consecutive_blocks(test_constants, num_blocks, [], 10)
blocks_2 = bt.get_consecutive_blocks(test_constants, num_blocks_2, [], 10, seed=b"123")
num_blocks = 15
num_blocks_2 = 12
blocks = bt.get_consecutive_blocks(num_blocks)
blocks_2 = bt.get_consecutive_blocks(num_blocks_2, seed=b"123")
full_node_1, full_node_2, server_1, server_2 = two_nodes
# 10 blocks to node_1
for i in range(1, num_blocks):
await full_node_1.full_node._respond_sub_block(full_node_protocol.RespondSubBlock(blocks[i]))
# 15 blocks to node_1
for block in blocks:
await full_node_1.full_node.respond_sub_block(full_node_protocol.RespondSubBlock(block))
# 4 different blocks to node_2
for i in range(1, num_blocks_2):
await full_node_2.full_node._respond_sub_block(full_node_protocol.RespondSubBlock(blocks_2[i]))
# 12 different blocks to node_2
for block in blocks_2:
await full_node_2.full_node.respond_sub_block(full_node_protocol.RespondSubBlock(block))
# 6th block from node_1 to node_2
await full_node_2.full_node._respond_sub_block(full_node_protocol.RespondSubBlock(blocks[5]))
# 13th block from node_1 to node_2
# await full_node_2.full_node.respond_sub_block(full_node_protocol.RespondSubBlock(blocks[-1]))
await server_2.start_client(PeerInfo("localhost", uint16(server_1._port)), None)
await time_out_assert(60, node_height_at_least, True, full_node_2, num_blocks - 1)