Fix issue with finalizing blocks while syncing

This commit is contained in:
Mariano Sorgente 2019-11-20 13:35:38 +09:00
parent 0ed0227b3f
commit 55a08b36c7
4 changed files with 61 additions and 18 deletions

View File

@ -80,17 +80,25 @@ class FullNode:
async def _send_challenges_to_timelords(self, delivery: Delivery = Delivery.BROADCAST) -> \
AsyncGenerator[OutboundMessage, None]:
"""
Sends all of the current heads to all timelord peers.
Sends all of the current heads (as well as Pos infos) to all timelord peers.
"""
requests: List[timelord_protocol.ChallengeStart] = []
challenge_requests: List[timelord_protocol.ChallengeStart] = []
pos_info_requests: List[timelord_protocol.ProofOfSpaceInfo] = []
async with (await self.store.get_lock()):
for head in self.blockchain.get_current_tips():
assert head.challenge
challenge_hash = head.challenge.get_hash()
requests.append(timelord_protocol.ChallengeStart(challenge_hash, head.challenge.total_weight))
challenge_requests.append(timelord_protocol.ChallengeStart(challenge_hash, head.challenge.total_weight))
for request in requests:
leader_height, leader_timestamp = await self.store.get_unfinished_block_leader()
leader_infos = [tup[0] for tup in list((await self.store.get_unfinished_blocks()).items())
if tup[1].height == leader_height]
for chall, iters in leader_infos:
pos_info_requests.append(timelord_protocol.ProofOfSpaceInfo(chall, iters))
for request in challenge_requests:
yield OutboundMessage(NodeType.TIMELORD, Message("challenge_start", request), delivery)
if request in pos_info_requests:
yield OutboundMessage(NodeType.TIMELORD, Message("proof_of_space_info", request), delivery)
async def _on_connect(self) -> AsyncGenerator[OutboundMessage, None]:
"""
@ -127,6 +135,7 @@ class FullNode:
"""
introducer = self.config['introducer_peer']
introducer_peerinfo = PeerInfo(introducer['host'], introducer['port'])
async def introducer_client():
async def on_connect():
msg = Message("request_peers", peer_protocol.RequestPeers())
@ -163,6 +172,7 @@ class FullNode:
highest_weight: uint64 = uint64(0)
tip_block: FullBlock
tip_height = 0
caught_up = False
# Based on responses from peers about the current heads, see which head is the heaviest
# (similar to longest chain rule).
@ -177,15 +187,18 @@ class FullNode:
tip_height = block.header_block.challenge.height
if highest_weight <= max([t.weight for t in self.blockchain.get_current_tips()]):
log.info("Not performing sync, already caught up.")
await self.store.set_sync_mode(False)
await self.store.clear_sync_information()
return
caught_up = True
if caught_up:
async for msg in self._finish_sync():
yield msg
return
assert tip_block
log.info(f"Tip block {tip_block.header_hash} tip height {tip_block.height}")
# Now, we download all of the headers in order to verify the weight
# TODO: use queue here, request a few at a time
# TODO: send multiple API calls out at once
timeout = 20
timeout = 30
sleep_interval = 3
total_time_slept = 0
headers: List[HeaderBlock] = []
@ -260,12 +273,25 @@ class FullNode:
log.info(f"Took {time.time() - start} seconds to validate and add block {block.height}.")
assert max([h.height for h in self.blockchain.get_current_tips()]) >= height
await self.store.set_proof_of_time_estimate_ips(await self.blockchain.get_next_ips(block.header_hash))
log.info(f"Finished sync up to height {tip_height}")
async for msg in self._finish_sync():
yield msg
async def _finish_sync(self):
"""
Finalize sync by setting sync mode to False, clearing all sync information, and adding any final
blocks that we have finalized recently.
"""
async with (await self.store.get_lock()):
log.info(f"Finished sync up to height {tip_height}")
potential_fut_blocks = (await self.store.get_potential_future_blocks()).copy()
await self.store.set_sync_mode(False)
await self.store.clear_sync_information()
for block in potential_fut_blocks:
async for msg in self.block(peer_protocol.Block(block)):
yield msg
# Update farmers and timelord with most recent information
async for msg in self._send_challenges_to_timelords():
yield msg
@ -483,8 +509,15 @@ class FullNode:
unfinished_block_obj.header_block.header)
new_full_block: FullBlock = FullBlock(new_header_block, unfinished_block_obj.body)
async for msg in self.block(peer_protocol.Block(new_full_block)):
yield msg
async with (await self.store.get_lock()):
sync_mode = await self.store.get_sync_mode()
if sync_mode:
async with (await self.store.get_lock()):
await self.store.add_potential_future_block(new_full_block)
else:
async for msg in self.block(peer_protocol.Block(new_full_block)):
yield msg
# PEER PROTOCOL
@api_request

View File

@ -230,7 +230,8 @@ class ChiaServer:
and nothing is yielded.
"""
# Send handshake message
outbound_handshake = Message("handshake", Handshake(protocol_version, self._node_id, uint16(self._port), self._local_type))
outbound_handshake = Message("handshake", Handshake(protocol_version, self._node_id, uint16(self._port),
self._local_type))
try:
await connection.send(outbound_handshake)
@ -286,7 +287,7 @@ class ChiaServer:
# Read one message at a time, forever
yield (connection, message)
except asyncio.IncompleteReadError:
log.warning(f"Received EOF from {connection.get_peername()}, closing connection.")
log.info(f"Received EOF from {connection.get_peername()}, closing connection.")
except ConnectionError:
log.warning(f"Connection error by peer {connection.get_peername()}, closing connection.")
finally:

View File

@ -63,6 +63,10 @@ async def main():
full_node._start_bg_tasks()
log.info("Waiting to connect to some peers...")
await asyncio.sleep(3)
log.info(f"Connected to {len(server.global_connections.get_connections())} peers.")
if connect_to_farmer and not server_closed:
peer_info = PeerInfo(full_node.config['farmer_peer']['host'],
full_node.config['farmer_peer']['port'])
@ -73,10 +77,6 @@ async def main():
full_node.config['timelord_peer']['port'])
_ = await server.start_client(peer_info, None)
log.info("Waiting to connect to some peers...")
await asyncio.sleep(3)
log.info(f"Connected to {len(server.global_connections.get_connections())} peers.")
if not server_closed:
try:
async for msg in full_node._sync():

View File

@ -1,4 +1,4 @@
from typing import Tuple, Optional, Dict, Counter
from typing import Tuple, Optional, Dict, Counter, List
import collections
from asyncio import Lock, Event
from src.types.proof_of_space import ProofOfSpace
@ -29,6 +29,8 @@ class FullNodeStore:
# Event, which gets set whenever we receive the block at each height. Waited for by sync().
self.potential_blocks_received: Dict[uint32, Event] = {}
self.potential_future_blocks: List[FullBlock] = []
# These are the blocks that we created, but don't have the PoS from farmer yet,
# keyed from the proof of space hash
self.candidate_blocks: Dict[bytes32, Tuple[Body, HeaderData, ProofOfSpace]] = {}
@ -62,6 +64,7 @@ class FullNodeStore:
self.potential_headers.clear()
self.potential_blocks.clear()
self.potential_blocks_received.clear()
self.potential_future_blocks.clear()
async def add_potential_head(self, header_hash: bytes32):
self.potential_heads[header_hash] += 1
@ -93,6 +96,12 @@ class FullNodeStore:
async def get_potential_blocks_received(self, height: uint32) -> Event:
return self.potential_blocks_received[height]
async def add_potential_future_block(self, block: FullBlock):
self.potential_future_blocks.append(block)
async def get_potential_future_blocks(self):
return self.potential_future_blocks
async def add_candidate_block(self, pos_hash: bytes32, block: Tuple[Body, HeaderData, ProofOfSpace]):
self.candidate_blocks[pos_hash] = block