From 1823f84293ab30d3da936f2d9d6dcf0c7dbfc34e Mon Sep 17 00:00:00 2001 From: Yostra Date: Thu, 26 Nov 2020 01:22:13 -0500 Subject: [PATCH] rebase clean --- src/farmer.py | 78 +- src/farmer_api.py | 419 +++--- src/full_node/full_node.py | 1012 ++------------ src/full_node/full_node_api.py | 1919 +++++++++------------------ src/full_node/sync_peers_handler.py | 54 +- src/harvester_api.py | 212 +-- src/timelord_api.py | 127 +- src/timelord_new.py | 28 - 8 files changed, 1143 insertions(+), 2706 deletions(-) diff --git a/src/farmer.py b/src/farmer.py index 5763a5fca1c5..ea199bd2d3e6 100644 --- a/src/farmer.py +++ b/src/farmer.py @@ -1,24 +1,19 @@ +import asyncio import logging -from typing import Dict, List, Optional, Callable, Set, Tuple +from typing import Dict, List, Optional, Callable, Tuple from blspy import G1Element -from src.server.server import ChiaServer from src.server.ws_connection import WSChiaConnection from src.util.keychain import Keychain from src.consensus.constants import ConsensusConstants -from src.consensus.pot_iterations import ( - calculate_iterations_quality, - calculate_sp_interval_iters, -) + from src.protocols import farmer_protocol, harvester_protocol from src.server.outbound_message import Message, NodeType from src.types.proof_of_space import ProofOfSpace from src.types.sized_bytes import bytes32 -from src.types.pool_target import PoolTarget -from src.util.api_decorators import api_request -from src.util.ints import uint32, uint64 +from src.util.ints import uint64 from src.wallet.derive_keys import master_sk_to_farmer_sk, master_sk_to_pool_sk from src.util.chech32 import decode_puzzle_hash @@ -58,10 +53,9 @@ class Farmer: self.cache_clear_task: asyncio.Task self.constants = consensus_constants self._shut_down = False - self.server: Optional[ChiaServer] = None + self.server = None self.keychain = keychain self.state_changed_callback: Optional[Callable] = None - self.log = log if len(self._get_public_keys()) == 0: error_str = "No keys exist. Please run 'chia keys generate' or open the UI." @@ -94,25 +88,17 @@ class Farmer: async def _on_connect(self, peer: WSChiaConnection): # Sends a handshake to the harvester + msg = harvester_protocol.HarvesterHandshake( + self._get_public_keys(), + self.pool_public_keys, + ) if peer.connection_type is NodeType.HARVESTER: - h_msg = harvester_protocol.HarvesterHandshake( - self._get_public_keys(), self.pool_public_keys - ) - msg = Message("harvester_handshake", h_msg) + msg = Message("harvester_handshake", msg) await peer.send_message(msg) - if self.current_weight in self.challenges: - for posf in self.challenges[self.current_weight]: - message = harvester_protocol.NewChallenge(posf.challenge_hash) - msg = Message("new_challenge", message) - await peer.send_message(msg) - def _set_server(self, server): self.server = server - def _set_state_changed_callback(self, callback: Callable): - self.state_changed_callback = callback - def _state_changed(self, change: str): if self.state_changed_callback is not None: self.state_changed_callback(change) @@ -122,31 +108,21 @@ class Farmer: def _get_private_keys(self): all_sks = self.keychain.get_all_private_keys() - return [master_sk_to_farmer_sk(sk) for sk, _ in all_sks] + [ - master_sk_to_pool_sk(sk) for sk, _ in all_sks - ] + return [master_sk_to_farmer_sk(sk) for sk, _ in all_sks] + [master_sk_to_pool_sk(sk) for sk, _ in all_sks] - async def _get_required_iters( - self, challenge_hash: bytes32, quality_string: bytes32, plot_size: uint8 - ): - weight: uint128 = self.challenge_to_weight[challenge_hash] - difficulty: uint64 = uint64(0) - for posf in self.challenges[weight]: - if posf.challenge_hash == challenge_hash: - difficulty = posf.difficulty - if difficulty == 0: - raise RuntimeError("Did not find challenge") - - estimate_min = ( - self.proof_of_time_estimate_ips - * self.constants.BLOCK_TIME_TARGET - / self.constants.MIN_ITERS_PROPORTION - ) - estimate_min = uint64(int(estimate_min)) - number_iters: uint64 = calculate_iterations_quality( - quality_string, - plot_size, - difficulty, - estimate_min, - ) - return number_iters + async def _periodically_clear_cache_task(self): + time_slept: uint64 = uint64(0) + while not self._shut_down: + if time_slept > self.constants.SUB_SLOT_TIME_TARGET * 10: + removed_keys: List[bytes32] = [] + for key, add_time in self.cache_add_time.items(): + self.sps.pop(key, None) + self.proofs_of_space.pop(key, None) + self.quality_str_to_identifiers.pop(key, None) + self.number_of_responses.pop(key, None) + removed_keys.append(key) + for key in removed_keys: + self.cache_add_time.pop(key, None) + time_slept = uint64(0) + time_slept += 1 + await asyncio.sleep(1) diff --git a/src/farmer_api.py b/src/farmer_api.py index 7948c3d4773b..0ee93c74973e 100644 --- a/src/farmer_api.py +++ b/src/farmer_api.py @@ -2,7 +2,7 @@ from typing import Optional, Callable from blspy import AugSchemeMPL, G2Element -from src.consensus.pot_iterations import calculate_iterations_quality +from src.consensus.pot_iterations import calculate_iterations_quality, calculate_sp_interval_iters from src.farmer import Farmer from src.protocols import harvester_protocol, farmer_protocol from src.server.outbound_message import Message, NodeType @@ -23,278 +23,221 @@ class FarmerAPI: self.farmer.state_changed_callback = callback @api_request - async def challenge_response( - self, - challenge_response: harvester_protocol.ChallengeResponse, - ) -> Optional[Message]: + async def new_proof_of_space(self, new_proof_of_space: harvester_protocol.NewProofOfSpace): """ This is a response from the harvester, for a NewChallenge. Here we check if the proof of space is sufficiently good, and if so, we ask for the whole proof. """ - height: uint32 = self.farmer.challenge_to_height[ - challenge_response.challenge_hash - ] - number_iters = await self.farmer._get_required_iters( - challenge_response.challenge_hash, - challenge_response.quality_string, - challenge_response.plot_size, - ) - if height < 1000: # As the difficulty adjusts, don't fetch all qualities - if ( - challenge_response.challenge_hash - not in self.farmer.challenge_to_best_iters - ): - self.farmer.challenge_to_best_iters[ - challenge_response.challenge_hash - ] = number_iters - elif ( - number_iters - < self.farmer.challenge_to_best_iters[challenge_response.challenge_hash] - ): - self.farmer.challenge_to_best_iters[ - challenge_response.challenge_hash - ] = number_iters - else: - return None + if new_proof_of_space.proof.challenge_hash not in self.farmer.number_of_responses: + self.farmer.number_of_responses[new_proof_of_space.proof.challenge_hash] = 0 - estimate_secs: float = number_iters / self.farmer.proof_of_time_estimate_ips - if challenge_response.challenge_hash not in self.farmer.challenge_to_estimates: - self.farmer.challenge_to_estimates[challenge_response.challenge_hash] = [] - self.farmer.challenge_to_estimates[challenge_response.challenge_hash].append( - estimate_secs - ) - - self.farmer.log.info( - f"Estimate: {estimate_secs}, rate: {self.farmer.proof_of_time_estimate_ips}" - ) - if ( - estimate_secs < self.farmer.config["pool_share_threshold"] - or estimate_secs < self.farmer.config["propagate_threshold"] - ): - - request = harvester_protocol.RequestProofOfSpace( - challenge_response.challenge_hash, - challenge_response.plot_id, - challenge_response.response_number, + if self.farmer.number_of_responses[new_proof_of_space.proof.challenge_hash] >= 5: + self.farmer.log.warning( + f"Surpassed 5 PoSpace for one SP, no longer submitting PoSpace for signage point " + f"{new_proof_of_space.proof.challenge_hash}" ) + return - self.farmer._state_changed("challenge") - msg = Message("request_proof_of_space", request) - return msg - return None + if new_proof_of_space.proof.challenge_hash not in self.farmer.sps: + self.farmer.log.warning( + f"Received response for challenge that we do not have {new_proof_of_space.proof.challenge_hash}" + ) + return - @api_request - async def respond_proof_of_space( - self, response: harvester_protocol.RespondProofOfSpace - ) -> Optional[Message]: - """ - This is a response from the harvester with a proof of space. We check it's validity, - and request a pool partial, a header signature, or both, if the proof is good enough. - """ + sp = self.farmer.sps[new_proof_of_space.proof.challenge_hash] - challenge_hash: bytes32 = response.proof.challenge_hash - challenge_weight: uint128 = self.farmer.challenge_to_weight[challenge_hash] - difficulty: uint64 = uint64(0) - for posf in self.farmer.challenges[challenge_weight]: - if posf.challenge_hash == challenge_hash: - difficulty = posf.difficulty - if difficulty == 0: - raise RuntimeError("Did not find challenge") - - computed_quality_string = response.proof.verify_and_get_quality_string( - self.farmer.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG + computed_quality_string = new_proof_of_space.proof.verify_and_get_quality_string( + self.farmer.constants, new_proof_of_space.challenge_hash, new_proof_of_space.proof.challenge_hash ) if computed_quality_string is None: - raise RuntimeError("Invalid proof of space") + self.farmer.log.error(f"Invalid proof of space {new_proof_of_space.proof}") + return - self.farmer.harvester_responses_proofs[ - (response.proof.challenge_hash, response.plot_id, response.response_number) - ] = response.proof - self.farmer.harvester_responses_proof_hash_to_info[ - response.proof.get_hash() - ] = ( - response.proof.challenge_hash, - response.plot_id, - response.response_number, - ) + self.farmer.number_of_responses[new_proof_of_space.proof.challenge_hash] += 1 - estimate_min = ( - self.farmer.proof_of_time_estimate_ips - * self.farmer.constants.BLOCK_TIME_TARGET - / self.farmer.constants.MIN_ITERS_PROPORTION - ) - estimate_min = uint64(int(estimate_min)) - number_iters: uint64 = calculate_iterations_quality( + required_iters: uint64 = calculate_iterations_quality( computed_quality_string, - response.proof.size, - difficulty, - estimate_min, + new_proof_of_space.proof.size, + sp.difficulty, + new_proof_of_space.proof.challenge_hash, ) - estimate_secs: float = number_iters / self.farmer.proof_of_time_estimate_ips + # Double check that the iters are good + assert required_iters < calculate_sp_interval_iters(sp.slot_iterations, sp.sub_slot_iters) - if estimate_secs < self.farmer.config["pool_share_threshold"]: - # TODO: implement pooling - pass - if estimate_secs < self.farmer.config["propagate_threshold"]: - pool_pk = bytes(response.proof.pool_public_key) - if pool_pk not in self.farmer.pool_sks_map: - self.farmer.log.error( - f"Don't have the private key for the pool key used by harvester: {pool_pk.hex()}" + self.farmer._state_changed("proof") + + # Proceed at getting the signatures for this PoSpace + request = harvester_protocol.RequestSignatures( + new_proof_of_space.plot_identifier, + new_proof_of_space.proof.challenge_hash, + [sp.challenge_chain_sp, sp.reward_chain_sp], + ) + + if new_proof_of_space.proof.challenge_hash not in self.farmer.proofs_of_space: + self.farmer.proofs_of_space[new_proof_of_space.proof.challenge_hash] = [ + ( + new_proof_of_space.plot_identifier, + new_proof_of_space.proof, + ) + ] + else: + self.farmer.proofs_of_space[new_proof_of_space.proof.challenge_hash].append( + ( + new_proof_of_space.plot_identifier, + new_proof_of_space.proof, ) - return None - pool_target: PoolTarget = PoolTarget(self.farmer.pool_target, uint32(0)) - pool_target_signature: G2Element = AugSchemeMPL.sign( - self.farmer.pool_sks_map[pool_pk], bytes(pool_target) ) + self.farmer.quality_str_to_identifiers[computed_quality_string] = ( + new_proof_of_space.plot_identifier, + new_proof_of_space.proof.challenge_hash, + ) - request2 = farmer_protocol.RequestHeaderHash( - challenge_hash, - response.proof, - pool_target, - pool_target_signature, - self.farmer.wallet_target, - ) - msg = Message("request_header_hash", request2) - assert self.farmer.server is not None - await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE) - return None - return None + msg = Message("request_signatures", request) + return msg @api_request - async def respond_signature(self, response: harvester_protocol.RespondSignature): + async def respond_signatures(self, response: harvester_protocol.RespondSignatures): """ - Receives a signature on a block header hash, which is required for submitting - a block to the blockchain. + There are two cases: receiving signatures for sps, or receiving signatures for the block. """ - header_hash = response.message - proof_of_space: bytes32 = self.farmer.header_hash_to_pos[header_hash] - validates: bool = False - for sk in self.farmer._get_private_keys(): - pk = sk.get_g1() - if pk == response.farmer_pk: - agg_pk = ProofOfSpace.generate_plot_public_key(response.local_pk, pk) - assert agg_pk == proof_of_space.plot_public_key - farmer_share = AugSchemeMPL.sign(sk, header_hash, agg_pk) - agg_sig = AugSchemeMPL.aggregate( - [response.message_signature, farmer_share] - ) - validates = AugSchemeMPL.verify(agg_pk, header_hash, agg_sig) + if response.sp_hash not in self.farmer.sps: + self.farmer.log.warning(f"Do not have challenge hash {response.challenge_hash}") + return + is_sp_signatures: bool = False + sp = self.farmer.sps[response.sp_hash] + if response.sp_hash == response.message_signatures[0]: + assert sp.reward_chain_sp == response.message_signatures[1] + is_sp_signatures = True - if validates: - break - assert validates + pospace = None + for plot_identifier, _, candidate_pospace in self.farmer.proofs_of_space[response.sp_hash]: + if plot_identifier == response.plot_identifier: + pospace = candidate_pospace + assert pospace is not None - pos_hash: bytes32 = proof_of_space.get_hash() + if is_sp_signatures: + ( + challenge_chain_sp, + challenge_chain_sp_harv_sig, + ) = response.message_signatures[0] + reward_chain_sp, reward_chain_sp_harv_sig = response.message_signatures[1] + for sk in self.farmer._get_private_keys(): + pk = sk.get_g1() + if pk == response.farmer_pk: + agg_pk = ProofOfSpace.generate_plot_public_key(response.local_pk, pk) + assert agg_pk == pospace.plot_public_key + farmer_share_cc_sp = AugSchemeMPL.sign(sk, challenge_chain_sp, agg_pk) + agg_sig_cc_sp = AugSchemeMPL.aggregate([challenge_chain_sp_harv_sig, farmer_share_cc_sp]) + assert AugSchemeMPL.verify(agg_pk, challenge_chain_sp, agg_sig_cc_sp) - request = farmer_protocol.HeaderSignature(pos_hash, header_hash, agg_sig) - msg = Message("header_signature", request) - assert self.farmer.server is not None - await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE) + computed_quality_string = pospace.verify_and_get_quality_string( + self.farmer.constants, sp.challenge_hash, challenge_chain_sp + ) + + # This means it passes the sp filter + if computed_quality_string is not None: + farmer_share_rc_sp = AugSchemeMPL.sign(sk, reward_chain_sp, agg_pk) + agg_sig_rc_sp = AugSchemeMPL.aggregate([reward_chain_sp_harv_sig, farmer_share_rc_sp]) + assert AugSchemeMPL.verify(agg_pk, reward_chain_sp, agg_sig_rc_sp) + + pool_pk = bytes(pospace.pool_public_key) + if pool_pk not in self.farmer.pool_sks_map: + self.farmer.log.error(f"Don't have the private key for the pool key used by harvester: {pool_pk.hex()}") + return + pool_target: PoolTarget = PoolTarget(self.farmer.pool_target, uint32(0)) + pool_target_signature: G2Element = AugSchemeMPL.sign( + self.farmer.pool_sks_map[pool_pk], bytes(pool_target) + ) + request = farmer_protocol.DeclareProofOfSpace( + response.challenge_hash, + challenge_chain_sp, + sp.signage_point_index, + reward_chain_sp, + pospace, + agg_sig_cc_sp, + agg_sig_rc_sp, + self.farmer.wallet_target, + pool_target, + pool_target_signature, + ) + + msg = Message("declare_proof_of_space", request) + await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE) + return + else: + self.farmer.log.warning(f"Have invalid PoSpace {pospace}") + + else: + # This is a response with block signatures + for sk in self.farmer._get_private_keys(): + ( + foliage_sub_block_hash, + foliage_sub_block_sig_harvester, + ) = response.message_signatures[0] + ( + foliage_block_hash, + foliage_block_sig_harvester, + ) = response.message_signatures[1] + pk = sk.get_g1() + if pk == response.farmer_pk: + computed_quality_string = pospace.verify_and_get_quality_string(self.farmer.constants, None, None) + + agg_pk = ProofOfSpace.generate_plot_public_key(response.local_pk, pk) + assert agg_pk == pospace.plot_public_key + foliage_sub_block_sig_farmer = AugSchemeMPL.sign(sk, foliage_sub_block_hash, agg_pk) + foliage_block_sig_farmer = AugSchemeMPL.sign(sk, foliage_block_hash, agg_pk) + foliage_sub_block_agg_sig = AugSchemeMPL.aggregate( + [foliage_sub_block_sig_harvester, foliage_sub_block_sig_farmer] + ) + foliage_block_agg_sig = AugSchemeMPL.aggregate( + [foliage_block_sig_harvester, foliage_block_sig_farmer] + ) + assert AugSchemeMPL.verify(agg_pk, foliage_sub_block_hash, foliage_sub_block_agg_sig) + assert AugSchemeMPL.verify(agg_pk, foliage_block_hash, foliage_block_agg_sig) + + request = farmer_protocol.SignedValues( + computed_quality_string, + foliage_sub_block_agg_sig, + foliage_block_agg_sig, + ) + + msg = Message("signed_values", request) + await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE) """ FARMER PROTOCOL (FARMER <-> FULL NODE) """ @api_request - async def header_hash(self, response: farmer_protocol.HeaderHash): - """ - Full node responds with the hash of the created header - """ - header_hash: bytes32 = response.header_hash + async def new_signage_point(self, new_signage_point: farmer_protocol.NewSignagePoint): + message = harvester_protocol.NewSignagePoint( + new_signage_point.challenge_hash, + new_signage_point.difficulty, + new_signage_point.sub_slot_iters, + new_signage_point.signage_point_index, + new_signage_point.challenge_chain_sp, + ) - ( - challenge_hash, - plot_id, - response_number, - ) = self.farmer.harvester_responses_proof_hash_to_info[response.pos_hash] - pos = self.farmer.harvester_responses_proofs[ - challenge_hash, plot_id, response_number - ] - self.farmer.header_hash_to_pos[header_hash] = pos - - # TODO: only send to the harvester who made the proof of space, not all harvesters - request = harvester_protocol.RequestSignature(plot_id, header_hash) - - msg = Message("request_signature", request) - assert self.farmer.server is not None + msg = Message("new_signage_point", message) await self.farmer.server.send_to_all([msg], NodeType.HARVESTER) + self.farmer.sps[new_signage_point.challenge_chain_sp] = new_signage_point + self.farmer._state_changed("signage_point") @api_request - async def proof_of_space_finalized( - self, - proof_of_space_finalized: farmer_protocol.ProofOfSpaceFinalized, - ): - """ - Full node notifies farmer that a proof of space has been completed. It gets added to the - challenges list at that weight, and weight is updated if necessary - """ - get_proofs: bool = False - if ( - proof_of_space_finalized.weight >= self.farmer.current_weight - and proof_of_space_finalized.challenge_hash - not in self.farmer.seen_challenges - ): - # Only get proofs for new challenges, at a current or new weight - get_proofs = True - if proof_of_space_finalized.weight > self.farmer.current_weight: - self.farmer.current_weight = proof_of_space_finalized.weight + async def request_signed_values(self, full_node_request: farmer_protocol.RequestSignedValues): + if full_node_request.quality_string not in self.farmer.quality_str_to_identifiers: + self.farmer.log.error(f"Do not have quality string {full_node_request.quality_string}") + return - self.farmer.log.info( - f"\tCurrent weight set to {self.farmer.current_weight}" - ) - self.farmer.seen_challenges.add(proof_of_space_finalized.challenge_hash) - if proof_of_space_finalized.weight not in self.farmer.challenges: - self.farmer.challenges[proof_of_space_finalized.weight] = [ - proof_of_space_finalized - ] - else: - self.farmer.challenges[proof_of_space_finalized.weight].append( - proof_of_space_finalized - ) - self.farmer.challenge_to_weight[ - proof_of_space_finalized.challenge_hash - ] = proof_of_space_finalized.weight - self.farmer.challenge_to_height[ - proof_of_space_finalized.challenge_hash - ] = proof_of_space_finalized.height + plot_identifier, challenge_hash = self.farmer.quality_str_to_identifiers[full_node_request.quality_string] + request = harvester_protocol.RequestSignatures( + plot_identifier, + challenge_hash, + [ + full_node_request.foliage_sub_block_hash, + full_node_request.foliage_block_hash, + ], + ) - if get_proofs: - message = harvester_protocol.NewChallenge( - proof_of_space_finalized.challenge_hash - ) - - msg = Message("new_challenge", message) - assert self.farmer.server is not None - await self.farmer.server.send_to_all([msg], NodeType.HARVESTER) - # This allows the collection of estimates from the harvesters - self.farmer._state_changed("challenge") - - @api_request - async def proof_of_space_arrived( - self, - proof_of_space_arrived: farmer_protocol.ProofOfSpaceArrived, - ) -> Optional[Message]: - - """ - Full node notifies the farmer that a new proof of space was created. The farmer can use this - information to decide whether to propagate a proof. - """ - if proof_of_space_arrived.weight not in self.farmer.unfinished_challenges: - self.farmer.unfinished_challenges[proof_of_space_arrived.weight] = [] - else: - self.farmer.unfinished_challenges[proof_of_space_arrived.weight].append( - proof_of_space_arrived.quality_string - ) - return None - - @api_request - async def proof_of_time_rate( - self, - proof_of_time_rate: farmer_protocol.ProofOfTimeRate, - ) -> Optional[Message]: - """ - Updates our internal estimate of the iterations per second for the fastest proof of time - in the network. - """ - self.farmer.proof_of_time_estimate_ips = proof_of_time_rate.pot_estimate_ips - return None + msg = Message("request_signatures", request) + await self.farmer.server.send_to_all([msg], NodeType.HARVESTER) diff --git a/src/full_node/full_node.py b/src/full_node/full_node.py index 5159747b4d9a..a93ead316d48 100644 --- a/src/full_node/full_node.py +++ b/src/full_node/full_node.py @@ -1,16 +1,13 @@ import asyncio +import dataclasses import logging import time -from pathlib import Path -from typing import AsyncGenerator, Dict, Optional, Callable, Tuple, List -import random import traceback + from pathlib import Path from typing import AsyncGenerator, Optional, Dict, Callable, List, Tuple, Any import aiosqlite -from blspy import G2Element -from chiabip158 import PyBIP158 -import dataclasses +import src.server.ws_connection as ws from src.consensus.constants import ConsensusConstants from src.consensus.difficulty_adjustment import get_sub_slot_iters_and_difficulty @@ -20,42 +17,36 @@ from src.consensus.pot_iterations import ( is_overflow_sub_block, calculate_iterations_quality, ) +from src.consensus.pot_iterations import is_overflow_sub_block + from src.full_node.block_store import BlockStore from src.consensus.blockchain import Blockchain, ReceiveBlockResult from src.full_node.coin_store import CoinStore from src.full_node.full_node_store import FullNodeStore from src.consensus.make_sub_epoch_summary import next_sub_epoch_summary from src.full_node.mempool_manager import MempoolManager -from src.full_node.signage_point import SignagePoint from src.full_node.sub_block_record import SubBlockRecord from src.full_node.sync_blocks_processor import SyncBlocksProcessor from src.full_node.sync_peers_handler import SyncPeersHandler from src.full_node.sync_store import SyncStore from src.protocols import ( - farmer_protocol, full_node_protocol, timelord_protocol, wallet_protocol, ) -from src.protocols.full_node_protocol import AllHeaderHashes -from src.server.connection_utils import send_all_first_reply + from src.server.node_discovery import FullNodePeers from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage from src.server.server import ChiaServer -from src.types.end_of_slot_bundle import EndOfSubSlotBundle from src.server.ws_connection import WSChiaConnection -from src.types.challenge import Challenge +from src.types.end_of_slot_bundle import EndOfSubSlotBundle from src.types.full_block import FullBlock -from src.types.mempool_inclusion_status import MempoolInclusionStatus -from src.types.mempool_item import MempoolItem + from src.types.sized_bytes import bytes32 -from src.types.spend_bundle import SpendBundle from src.types.sub_epoch_summary import SubEpochSummary from src.types.unfinished_block import UnfinishedBlock -from src.util.api_decorators import api_request -from src.consensus.block_creation import create_unfinished_block, unfinished_block_to_full_block from src.util.errors import ConsensusError -from src.util.ints import uint32, uint64, uint128, uint8 +from src.util.ints import uint32, uint128, uint8 from src.util.path import mkdir, path_from_root OutboundMessageGenerator = AsyncGenerator[OutboundMessage, None] @@ -168,137 +159,61 @@ class FullNode: if self.state_changed_callback is not None: self.state_changed_callback(change) - async def _send_tips_to_farmers(self): - """ - Sends all of the current heads to all farmer peers. Also sends the latest - estimated proof of time rate, so farmer can calulate which proofs are good. - """ - - requests: List[farmer_protocol.ProofOfSpaceFinalized] = [] - async with self.blockchain.lock: - tips: List[Header] = self.blockchain.get_current_tips() - for tip in tips: - full_tip: Optional[FullBlock] = await self.block_store.get_block( - tip.header_hash - ) - assert full_tip is not None - challenge: Optional[Challenge] = self.blockchain.get_challenge(full_tip) - assert challenge is not None - challenge_hash = challenge.get_hash() - if tip.height > 0: - difficulty: uint64 = self.blockchain.get_next_difficulty( - self.blockchain.headers[tip.prev_header_hash] - ) - else: - difficulty = uint64(tip.weight) - requests.append( - farmer_protocol.ProofOfSpaceFinalized( - challenge_hash, tip.height, tip.weight, difficulty - ) - ) - full_block: Optional[FullBlock] = await self.block_store.get_block( - tips[0].header_hash - ) - assert full_block is not None - proof_of_time_min_iters: uint64 = self.blockchain.get_next_min_iters( - full_block - ) - proof_of_time_rate: uint64 = uint64( - proof_of_time_min_iters - * self.constants.MIN_ITERS_PROPORTION - // (self.constants.BLOCK_TIME_TARGET) - ) - rate_update = farmer_protocol.ProofOfTimeRate(proof_of_time_rate) - messages = [Message("proof_of_time_rate", rate_update)] - - for request in requests: - messages.append(Message("proof_of_space_finalized", request)) - - await self.server.send_to_all(messages, NodeType.FARMER) - - async def _send_challenges_to_timelords( - self, time_lords: List[WSChiaConnection] = None - ): + async def _send_peak_to_timelords(self): """ Sends all of the current peaks (as well as unfinished blocks) to timelords """ - full_messages = [] - timelord_messages = [] - - challenge_requests: List[timelord_protocol.ChallengeStart] = [] - pos_info_requests: List[timelord_protocol.ProofOfSpaceInfo] = [] - tips: List[Header] = self.blockchain.get_current_tips() - tips_blocks: List[Optional[FullBlock]] = [ - await self.block_store.get_block(tip.header_hash) for tip in tips - ] - for tip in tips_blocks: - assert tip is not None - challenge = self.blockchain.get_challenge(tip) - assert challenge is not None - challenge_requests.append( - timelord_protocol.ChallengeStart(challenge.get_hash(), tip.weight) + peak_block = await self.blockchain.get_full_peak() + peak = self.blockchain.sub_blocks[peak_block.header_hash] + difficulty = self.blockchain.get_next_difficulty(peak, False) + if peak is not None: + ses: Optional[SubEpochSummary] = next_sub_epoch_summary( + self.constants, + self.blockchain.sub_blocks, + self.blockchain.height_to_hash, + peak.signage_point_index, + peak.sub_slot_itsub_slot_iters, + peak_block, + ) + timelord_new_peak: timelord_protocol.NewPeak = timelord_protocol.NewPeak( + peak_block.reward_chain_sub_block, difficulty, peak.deficit, peak.sub_slot_iters, ses ) - tip_hashes = [tip.header_hash for tip in tips] - tip_infos = [ - (tup[0], tup[1]) - for tup in list( - (await self.full_node_store.get_unfinished_blocks()).items() - ) - if tup[1].prev_header_hash in tip_hashes - ] - for ((chall, iters), _) in tip_infos: - pos_info_requests.append(timelord_protocol.ProofOfSpaceInfo(chall, iters)) - - # Sends our best unfinished block (proof of space) to peer - for ((_, iters), block) in sorted(tip_infos, key=lambda t: t[0][1]): - if block.height < self.full_node_store.get_unfinished_block_leader()[0]: - continue - unfinished_block_msg = full_node_protocol.NewUnfinishedBlock( - block.prev_header_hash, iters, block.header_hash - ) - full_messages.append(Message("new_unfinished_block", unfinished_block_msg)) - break - for challenge_msg in challenge_requests: - timelord_messages.append(Message("challenge_start", challenge_msg)) - for pos_info_msg in pos_info_requests: - timelord_messages.append(Message("proof_of_space_info", pos_info_msg)) - - if self.server is not None: - await self.server.send_to_all(timelord_messages, NodeType.TIMELORD) - await self.server.send_to_all(full_messages, NodeType.FULL_NODE) + # Tell timelord about the new peak + msg = Message("new_peak", timelord_new_peak) + await self.server.send_to_all([msg], NodeType.TIMELORD) async def _on_connect(self, connection: WSChiaConnection): """ - Whenever we connect to another node / wallet, send them our current heads. Also send heads to farmers - and challenges to timelords. - """ - - self.full_node_peers.add_message( - "new_inbound_connection", connection.get_peer_info() + Whenever we connect to another node / wallet, send them our current heads. Also send heads to farmers + and challenges to timelords. + """ + peak_full: FullBlock = await self.blockchain.get_full_peak() + peak: SubBlockRecord = self.blockchain.sub_blocks[peak_full.header_hash] + request_node = full_node_protocol.NewPeak( + peak.header_hash, + peak.sub_block_height, + peak.weight, + peak.sub_block_height, + peak_full.reward_chain_sub_block.get_unfinished().get_hash(), ) if connection.connection_type is NodeType.FULL_NODE: - tips: List[Header] = self.blockchain.get_current_tips() - for t in tips: - request = full_node_protocol.NewTip(t.height, t.weight, t.header_hash) - msg = Message("new_tip", request) - await connection.send_message(msg) - # Send filter to node and request mempool items that are not in it my_filter = self.mempool_manager.get_filter() mempool_request = full_node_protocol.RequestMempoolTransactions(my_filter) + msg = Message("request_mempool_transactions", mempool_request) await connection.send_message(msg) - elif connection.connection_type is NodeType.WALLET: + + return Message("new_peak", request_node) + elif connection.connection_type is NodeType.WALLET: # If connected to a wallet, send the LCA - lca = self.blockchain.lca_block - new_lca = wallet_protocol.NewLCA(lca.header_hash, lca.height, lca.weight) - msg = Message("new_lca", new_lca) - await connection.send_message(msg) + request_wallet = wallet_protocol.NewPeak( + peak.header_hash, peak.sub_block_height, peak.weight, peak.sub_block_height + ) + return Message("new_peak", request_wallet) elif connection.connection_type is NodeType.TIMELORD: - await self._send_challenges_to_timelords() - elif connection.connection_type is NodeType.FARMER: - await self._send_tips_to_farmers() + await self._send_peak_to_timelords() async def _on_disconnect(self, connection: WSChiaConnection): self.log.info("peer disconnected") @@ -331,19 +246,13 @@ class FullNode: self.log.info("Waiting to receive peaks from peers.") self.sync_peers_handler = None self.sync_store.waiting_for_peaks = True - nodes = list(self.server.full_nodes.values()) - - # TODO: better way to tell that we have finished receiving tips + # TODO: better way to tell that we have finished receiving peaks # TODO: fix DOS issue. Attacker can request syncing to an invalid blockchain await asyncio.sleep(2) highest_weight: uint128 = uint128(0) - tip_block: Optional[FullBlock] = None - tip_height = 0 + peak_height: uint32 = uint32(0) sync_start_time = time.time() - if self.server is None: - return - # Based on responses from peers about the current heads, see which head is the heaviest # (similar to longest chain rule). self.sync_store.waiting_for_peaks = False @@ -373,7 +282,7 @@ class FullNode: peers: List[WSChiaConnection] = list(self.server.full_nodes.values()) self.sync_peers_handler = SyncPeersHandler( - self.sync_store, peers, fork_point_height, self.blockchain, peak_height + self.sync_store, peers, fork_point_height, self.blockchain, peak_height, self.server ) # Start processing blocks that we have received (no block yet) @@ -393,15 +302,17 @@ class FullNode: break if block_processor_task.done(): break + await self.sync_peers_handler.monitor_timeouts() cur_peers: List[WSChiaConnection] = [ con for id, con in self.server.all_connections.items() if ( - con.peer_node_id is not None - and con.connection_type == NodeType.FULL_NODE + con.peer_node_id is not None + and con.connection_type == NodeType.FULL_NODE ) ] + for node_id in cur_peers: if node_id not in peers: self.sync_peers_handler.new_node_connected(node_id) @@ -411,14 +322,11 @@ class FullNode: self.sync_peers_handler.node_disconnected(node_id) peers = cur_peers - async for msg in self.sync_peers_handler.add_to_request_sets(): - yield msg # Send more requests if we can + await self.sync_peers_handler.add_to_request_sets() new_peak = self.blockchain.get_peak() if new_peak != peak: - yield OutboundMessage( - NodeType.WALLET, - Message( + msg = Message( "new_peak", wallet_protocol.NewPeak( new_peak.header_hash, @@ -426,9 +334,8 @@ class FullNode: new_peak.weight, new_peak.prev_hash, ), - ), - Delivery.BROADCAST, - ) + ) + self.server.send_to_all([msg], NodeType.WALLET) self._state_changed("sub_block") await asyncio.sleep(5) @@ -464,148 +371,22 @@ class FullNode: for block in potential_fut_blocks: if self._shut_down: return - async for msg in self.respond_sub_block(full_node_protocol.RespondSubBlock(block)): - yield msg + await self.respond_sub_block(full_node_protocol.RespondSubBlock(block)) # Update timelords with most recent information - async for msg in self._send_peak_to_timelords(): - yield msg + await self._send_peak_to_timelords() peak: SubBlockRecord = self.blockchain.get_peak() request_wallet = wallet_protocol.NewPeak( peak.header_hash, peak.sub_block_height, peak.weight, peak.sub_block_height ) - yield OutboundMessage(NodeType.WALLET, Message("new_peak", request_wallet), Delivery.BROADCAST) + msg = Message("new_peak", request_wallet) + await self.server.send_to_all([msg], NodeType.WALLET) self._state_changed("sub_block") - @api_request - async def new_peak(self, request: full_node_protocol.NewPeak) -> OutboundMessageGenerator: - """ - A peer notifies us that they have added a new peak to their blockchain. If we don't have it, - we can ask for it. - """ - # Check if we have this block in the blockchain - if self.blockchain.contains_sub_block(request.header_hash): - return - - # TODO: potential optimization, don't request blocks that we have already sent out - request_transactions: bool = ( - self.full_node_store.get_unfinished_block(request.unfinished_reward_block_hash) is None - ) - message = Message( - "request_sub_block", - full_node_protocol.RequestSubBlock(request.sub_block_height, request_transactions), - ) - yield OutboundMessage(NodeType.FULL_NODE, message, Delivery.RESPOND) - - @api_request - async def new_transaction(self, transaction: full_node_protocol.NewTransaction) -> OutboundMessageGenerator: - """ - A peer notifies us of a new transaction. - Requests a full transaction if we haven't seen it previously, and if the fees are enough. - """ - # Ignore if syncing - if self.sync_store.get_sync_mode(): - return - # Ignore if already seen - if self.mempool_manager.seen(transaction.transaction_id): - return - - if self.mempool_manager.is_fee_enough(transaction.fees, transaction.cost): - request_tx = full_node_protocol.RequestTransaction(transaction.transaction_id) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("request_transaction", request_tx), - Delivery.RESPOND, - ) - - @api_request - async def request_transaction(self, request: full_node_protocol.RequestTransaction) -> OutboundMessageGenerator: - """ Peer has requested a full transaction from us. """ - # Ignore if syncing - if self.sync_store.get_sync_mode(): - return - spend_bundle = self.mempool_manager.get_spendbundle(request.transaction_id) - if spend_bundle is None: - return - - transaction = full_node_protocol.RespondTransaction(spend_bundle) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("respond_transaction", transaction), - Delivery.RESPOND, - ) - - self.log.info(f"sending transaction (tx_id: {spend_bundle.name()}) to peer") - - @api_request - async def respond_transaction(self, tx: full_node_protocol.RespondTransaction) -> OutboundMessageGenerator: - """ - Receives a full transaction from peer. - If tx is added to mempool, send tx_id to others. (new_transaction) - """ - # Ignore if syncing - if self.sync_store.get_sync_mode(): - return - - async with self.blockchain.lock: - # Ignore if we have already added this transaction - if self.mempool_manager.get_spendbundle(tx.transaction.name()) is not None: - return - cost, status, error = await self.mempool_manager.add_spendbundle(tx.transaction) - if status == MempoolInclusionStatus.SUCCESS: - self.log.info(f"Added transaction to mempool: {tx.transaction.name()}") - fees = tx.transaction.fees() - assert fees >= 0 - assert cost is not None - new_tx = full_node_protocol.NewTransaction( - tx.transaction.name(), - cost, - uint64(tx.transaction.fees()), - ) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("new_transaction", new_tx), - Delivery.BROADCAST_TO_OTHERS, - ) - else: - self.log.warning( - f"Was not able to add transaction with id {tx.transaction.name()}, {status} error: {error}" - ) - return - - @api_request - async def request_proof_of_weight(self, tx: full_node_protocol.RequestProofOfWeight) -> OutboundMessageGenerator: - # TODO(mariano/almog) - pass - - @api_request - async def respond_proof_of_weight(self, tx: full_node_protocol.RespondProofOfWeight) -> OutboundMessageGenerator: - # TODO(mariano/almog) - pass - - @api_request - async def request_sub_block(self, request_block: full_node_protocol.RequestSubBlock) -> OutboundMessageGenerator: - if request_block.height not in self.blockchain.height_to_hash: - return - block: Optional[FullBlock] = await self.block_store.get_full_block( - self.blockchain.height_to_hash[request_block.height] - ) - if block is not None: - if not request_block.include_transaction_block: - block = dataclasses.replace(block, transactions_generator=None) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("respond_block", full_node_protocol.RespondSubBlock(block)), - Delivery.RESPOND, - ) - return - return - - @api_request async def respond_sub_block( self, respond_sub_block: full_node_protocol.RespondSubBlock - ) -> OutboundMessageGenerator: + ): """ Receive a full block from a peer full node (or ourselves). """ @@ -619,17 +400,15 @@ class FullNode: # This is a block we asked for during sync if self.sync_peers_handler is not None: - resp: List[OutboundMessage] = await self.sync_peers_handler.new_block( - respond_block.block - ) - for req in resp: - type = req.peer_type + requests = await self.sync_peers_handler.new_block(sub_block) + for req in requests: + msg = req.message node_id = req.specific_peer_node_id - message = req.message - if node_id is None: - await self.server.send_to_all([message], type) + if node_id is not None: + await self.server.send_to_specific([msg], node_id) else: - await self.server.send_to_specific([message], node_id) + await self.server.send_to_all([msg], NodeType.FULL_NODE) + return # Adds the block to seen, and check if it's seen before (which means header is in memory) header_hash = sub_block.header.get_hash() @@ -675,16 +454,14 @@ class FullNode: ) try: # Performs sync, and catch exceptions so we don't close the connection - async for ret_msg in self._sync(): - yield ret_msg + await self._sync() except asyncio.CancelledError: self.log.error("Syncing failed, CancelledError") except Exception as e: tb = traceback.format_exc() self.log.error(f"Error with syncing: {type(e)}{tb}") finally: - async for ret_msg in self._finish_sync(): - yield ret_msg + await self._finish_sync() elif sub_block.height >= peak_height - 5: # Allows shallow reorgs by simply requesting the previous height repeatedly @@ -698,7 +475,7 @@ class FullNode: full_node_protocol.RequestSubBlock(uint32(sub_block.height - 1), True), ) self.full_node_store.add_disconnected_block(sub_block) - yield OutboundMessage(NodeType.FULL_NODE, msg, Delivery.RESPOND) + return msg return elif added == ReceiveBlockResult.NEW_PEAK: # Only propagate blocks which extend the blockchain (becomes one of the heads) @@ -754,23 +531,17 @@ class FullNode: uint8(0), added_eos.reward_chain.end_of_slot_vdf.challenge, ) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("new_signage_point_or_end_of_sub_slot", broadcast), - Delivery.BROADCAST, - ) + msg = Message("new_signage_point_or_end_of_sub_slot", broadcast) + self.server.send_to_all([msg], NodeType.FullNode) if new_peak.height % 1000 == 0: # Occasionally clear the seen list to keep it small self.full_node_store.clear_seen_unfinished_blocks() - async for msg in self._send_peak_to_timelords(): - yield msg + await self._send_peak_to_timelords() # Tell full nodes about the new peak - yield OutboundMessage( - NodeType.FULL_NODE, - Message( + msg = Message( "new_peak", full_node_protocol.NewPeak( sub_block.header_hash, @@ -779,13 +550,11 @@ class FullNode: fork_height, sub_block.reward_chain_sub_block.get_unfinished().get_hash(), ), - ), - Delivery.BROADCAST_TO_OTHERS, - ) + ) + self.server.send_to_all([msg], NodeType.FULL_NODE) + # Tell wallets about the new peak - yield OutboundMessage( - NodeType.WALLET, - Message( + msg = Message( "new_peak", wallet_protocol.NewPeak( sub_block.header_hash, @@ -793,9 +562,8 @@ class FullNode: sub_block.weight, fork_height, ), - ), - Delivery.BROADCAST, - ) + ) + self.server.send_to_all([msg], NodeType.Wallet) elif added == ReceiveBlockResult.ADDED_AS_ORPHAN: self.log.info(f"Received orphan block of height {sub_block.height}") @@ -808,8 +576,7 @@ class FullNode: # Recursively process the next block if we have it if next_block is not None: - async for ret_msg in self.respond_sub_block(full_node_protocol.RespondSubBlock(next_block)): - yield ret_msg + await self.respond_sub_block(full_node_protocol.RespondSubBlock(next_block)) # Removes all temporary data for old blocks clear_height = uint32(max(0, self.blockchain.get_peak().height - 50)) @@ -818,42 +585,10 @@ class FullNode: self.full_node_store.clear_unfinished_blocks_below(clear_height) self._state_changed("sub_block") - @api_request - async def new_unfinished_sub_block( - self, new_unfinished_sub_block: full_node_protocol.NewUnfinishedSubBlock - ) -> OutboundMessageGenerator: - if self.full_node_store.get_unfinished_block(new_unfinished_sub_block.unfinished_reward_hash) is not None: - return - yield OutboundMessage( - NodeType.FULL_NODE, - Message( - "request_unfinished_sub_block", - full_node_protocol.RequestUnfinishedSubBlock(new_unfinished_sub_block.unfinished_reward_hash), - ), - Delivery.RESPOND, - ) - - @api_request - async def request_unfinished_sub_block( - self, request_unfinished_sub_block: full_node_protocol.RequestUnfinishedSubBlock - ) -> OutboundMessageGenerator: - unfinished_block: Optional[UnfinishedBlock] = self.full_node_store.get_unfinished_block( - request_unfinished_sub_block.unfinished_reward_hash - ) - if unfinished_block is not None: - yield OutboundMessage( - NodeType.FULL_NODE, - Message( - "respond_unfinished_block", - full_node_protocol.RespondUnfinishedSubBlock(unfinished_block), - ), - Delivery.RESPOND, - ) - - @api_request - async def respond_unfinished_sub_block( - self, respond_unfinished_sub_block: full_node_protocol.RespondUnfinishedSubBlock - ) -> OutboundMessageGenerator: + async def _respond_unfinished_sub_block( + self, respond_unfinished_sub_block: full_node_protocol.RespondUnfinishedSubBlock, + peer: Optional[ws.WSChiaConnection] + ) -> Optional[Message]: """ We have received an unfinished sub-block, either created by us, or from another peer. We can validate it and if it's a good block, propagate it to other peers and @@ -911,570 +646,13 @@ class FullNode: ), ) - yield OutboundMessage( - NodeType.TIMELORD, - Message("new_unfinished_sub_block", timelord_request), - Delivery.BROADCAST, - ) + msg = Message("new_unfinished_sub_block", timelord_request) + self.server.send_to_all([msg], NodeType.TIMELORD) full_node_request = full_node_protocol.NewUnfinishedSubBlock(block.reward_chain_sub_block.get_hash()) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("new_unfinished_sub_block", full_node_request), - Delivery.BROADCAST_TO_OTHERS, - ) + msg = Message("new_unfinished_sub_block", full_node_request) + if peer is not None: + self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id) + else: + self.server.send_to_all([msg], NodeType.FULL_NODE) self._state_changed("sub_block") - - @api_request - async def new_signage_point_or_end_of_sub_slot( - self, new_sp: full_node_protocol.NewSignagePointOrEndOfSubSlot - ) -> OutboundMessageGenerator: - if ( - self.full_node_store.get_signage_point_by_index( - new_sp.challenge_hash, new_sp.index_from_challenge, new_sp.last_rc_infusion - ) - is not None - ): - return - - full_node_request = full_node_protocol.RequestSignagePointOrEndOfSubSlot( - new_sp.challenge_hash, new_sp.index_from_challenge, new_sp.last_rc_infusion - ) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("request_signage_point_or_end_of_sub_slot", full_node_request), - Delivery.RESPOND, - ) - - @api_request - async def request_signage_point_or_end_of_sub_slot( - self, request: full_node_protocol.RequestSignagePointOrEndOfSubSlot - ) -> OutboundMessageGenerator: - if request.index_from_challenge == 0: - sub_slot: Optional[Tuple[EndOfSubSlotBundle, int]] = self.full_node_store.get_sub_slot( - request.challenge_hash - ) - if sub_slot is not None: - yield OutboundMessage( - NodeType.FULL_NODE, - Message("respond_end_of_slot", full_node_protocol.RespondEndOfSubSlot(sub_slot[0])), - Delivery.RESPOND, - ) - else: - self.log.warning("") - else: - sp: Optional[SignagePoint] = self.full_node_store.get_signage_point_by_index( - request.challenge_hash, request.index_from_challenge, request.last_rc_infusion - ) - if sp is not None: - full_node_response = full_node_protocol.RespondSignagePoint( - request.index_from_challenge, - sp.cc_vdf, - sp.cc_proof, - sp.rc_vdf, - sp.rc_proof, - ) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("respond_signage_point", full_node_response), - Delivery.RESPOND, - ) - - @api_request - async def respond_signage_point(self, request: full_node_protocol.RespondSignagePoint) -> OutboundMessageGenerator: - peak = self.blockchain.get_peak() - next_sub_slot_iters = self.blockchain.get_next_slot_iters(peak.get_hash(), True) - - added = self.full_node_store.new_signage_point( - request.index_from_challenge, - self.blockchain.sub_blocks, - self.blockchain.get_peak(), - next_sub_slot_iters, - SignagePoint( - request.challenge_chain_vdf, - request.challenge_chain_proof, - request.reward_chain_vdf, - request.reward_chain_proof, - ), - ) - - if added: - broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot( - request.challenge_chain_vdf.challenge, - request.index_from_challenge, - request.reward_chain_vdf.challenge, - ) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("new_signage_point_or_end_of_sub_slot", broadcast), - Delivery.BROADCAST_TO_OTHERS, - ) - - @api_request - async def respond_end_of_sub_slot( - self, request: full_node_protocol.RespondEndOfSubSlot - ) -> OutboundMessageGenerator: - - added = self.full_node_store.new_finished_sub_slot( - request.end_of_slot_bundle, self.blockchain.sub_blocks, self.blockchain.get_peak() - ) - - if added: - broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot( - request.end_of_slot_bundle.challenge_chain.get_hash(), - uint8(0), - request.end_of_slot_bundle.reward_chain.end_of_slot_vdf.challenge, - ) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("new_signage_point_or_end_of_sub_slot", broadcast), - Delivery.BROADCAST_TO_OTHERS, - ) - - @api_request - async def request_mempool_transactions( - self, request: full_node_protocol.RequestMempoolTransactions - ) -> OutboundMessageGenerator: - received_filter = PyBIP158(bytearray(request.filter)) - - items: List[MempoolItem] = await self.mempool_manager.get_items_not_in_filter(received_filter) - - for item in items: - transaction = full_node_protocol.RespondTransaction(item.spend_bundle) - yield OutboundMessage( - NodeType.FULL_NODE, - Message("respond_transaction", transaction), - Delivery.RESPOND, - ) - - # FARMER PROTOCOL - @api_request - async def declare_proof_of_space(self, request: farmer_protocol.DeclareProofOfSpace) -> OutboundMessageGenerator: - """ - Creates a block body and header, with the proof of space, coinbase, and fee targets provided - by the farmer, and sends the hash of the header data back to the farmer. - """ - if request.pool_target is None or request.pool_signature is None: - raise ValueError("Adaptable pool protocol not yet available.") - - # Checks that the proof of space is a response to a recent challenge and valid SP - pos_sub_slot: Optional[Tuple[EndOfSubSlotBundle, int]] = self.full_node_store.get_sub_slot( - request.proof_of_space.challenge - ) - sp_vdfs: Optional[SignagePoint] = self.full_node_store.get_signage_point(request.challenge_chain_sp) - - if sp_vdfs is None or pos_sub_slot is None: - self.log.warning(f"Received proof of space for an unknown signage point: {request}") - return - - # Now we know that the proof of space has a signage point either: - # 1. In the previous sub-slot of the peak (overflow) - # 2. In the same sub-slot as the peak - # 3. In a future sub-slot that we already know of - - # Checks that the proof of space is valid - quality_string: Optional[bytes32] = request.proof_of_space.verify_and_get_quality_string( - self.constants, request.challenge_hash, request.challenge_chain_sp - ) - assert len(quality_string) == 32 - - # Grab best transactions from Mempool for given tip target - async with self.blockchain.lock: - peak: Optional[SubBlockRecord] = self.blockchain.get_peak() - if peak is None: - spend_bundle: Optional[SpendBundle] = None - else: - spend_bundle: Optional[SpendBundle] = await self.mempool_manager.create_bundle_from_mempool( - peak.header_hash - ) - if pos_sub_slot[0].challenge_chain.new_difficulty is not None: - difficulty = pos_sub_slot[0].challenge_chain.new_difficulty - sub_slot_iters = pos_sub_slot[0].challenge_chain.new_sub_slot_iters - else: - if peak is None or peak.height == 0: - difficulty = self.constants.DIFFICULTY_STARTING - sub_slot_iters = self.constants.SUB_SLOT_ITERS_STARTING - else: - difficulty = uint64(peak.weight - self.blockchain.sub_blocks[peak.prev_hash].weight) - sub_slot_iters = peak.sub_slot_iters - - required_iters: uint64 = calculate_iterations_quality( - quality_string, - request.proof_of_space.size, - difficulty, - request.challenge_chain_sp, - ) - sp_iters: uint64 = calculate_sp_iters(self.constants, sub_slot_iters, request.signage_point_index) - ip_iters: uint64 = calculate_ip_iters( - self.constants, sub_slot_iters, request.signage_point_index, required_iters - ) - total_iters_pos_slot: uint128 = pos_sub_slot[2] - - def get_plot_sig(to_sign, _) -> G2Element: - if to_sign == request.challenge_chain_sp: - return request.challenge_chain_sp_signature - if to_sign == request.reward_chain_sp: - return request.reward_chain_sp_signature - return G2Element.infinity() - - finished_sub_slots: List[EndOfSubSlotBundle] = self.full_node_store.get_finished_sub_slots( - peak, self.blockchain.sub_blocks, request.proof_of_space.challenge - ) - - unfinished_block: Optional[UnfinishedBlock] = create_unfinished_block( - self.constants, - total_iters_pos_slot, - sub_slot_iters, - request.signage_point_index, - sp_iters, - ip_iters, - request.proof_of_space, - pos_sub_slot[0].challenge_chain.get_hash(), - request.farmer_puzzle_hash, - request.pool_target, - get_plot_sig, - get_plot_sig, - sp_vdfs, - uint64(int(time.time())), - b"", - spend_bundle, - peak, - self.blockchain.sub_blocks, - finished_sub_slots, - ) - self.full_node_store.add_candidate_block(quality_string, unfinished_block) - - message = farmer_protocol.RequestSignedValues( - quality_string, - unfinished_block.foliage_sub_block.get_hash(), - unfinished_block.foliage_block.get_hash(), - ) - yield OutboundMessage(NodeType.FARMER, Message("request_signed_values", message), Delivery.RESPOND) - - @api_request - async def signed_values(self, farmer_request: farmer_protocol.SignedValues) -> OutboundMessageGenerator: - """ - Signature of header hash, by the harvester. This is enough to create an unfinished - block, which only needs a Proof of Time to be finished. If the signature is valid, - we call the unfinished_block routine. - """ - candidate: Optional[UnfinishedBlock] = self.full_node_store.get_candidate_block(farmer_request.quality_string) - if candidate is None: - self.log.warning(f"Quality string {farmer_request.quality_string} not found in database") - return - - fsb2 = dataclasses.replace( - candidate.foliage_sub_block, - foliage_sub_block_signature=farmer_request.foliage_sub_block_signature, - ) - fsb3 = dataclasses.replace(fsb2, foliage_block_signature=farmer_request.foliage_block_signature) - new_candidate = dataclasses.replace(candidate, foliage_sub_block=fsb3) - - # Propagate to ourselves (which validates and does further propagations) - request = full_node_protocol.RespondUnfinishedSubBlock(new_candidate) - - async for m in self.respond_unfinished_sub_block(request): - # Yield all new messages (propagation to peers) - yield m - - # TIMELORD PROTOCOL - @api_request - async def new_infusion_point_vdf(self, request: timelord_protocol.NewInfusionPointVDF) -> OutboundMessageGenerator: - # Lookup unfinished blocks - unfinished_block: Optional[UnfinishedBlock] = self.full_node_store.get_unfinished_block( - request.unfinished_reward_hash - ) - - if unfinished_block is None: - self.log.warning( - f"Do not have unfinished reward chain block {request.unfinished_reward_hash}, cannot finish." - ) - - prev_sb: Optional[SubBlockRecord] = None - if request.reward_chain_ip_vdf.challenge == self.constants.FIRST_RC_CHALLENGE: - # Genesis - assert unfinished_block.height == 0 - else: - # Find the prev block - curr: Optional[SubBlockRecord] = self.blockchain.get_peak() - if curr is None: - self.log.warning(f"Have no blocks in chain, so can not complete block {unfinished_block.height}") - return - num_sb_checked = 0 - while num_sb_checked < 10: - if curr.reward_infusion_new_challenge == request.reward_chain_ip_vdf.challenge: - # Found our prev block - prev_sb = curr - break - curr = self.blockchain.sub_blocks.get(curr.prev_hash, None) - if curr is None: - return - num_sb_checked += 1 - - # If not found, cache keyed on prev block - if prev_sb is None: - self.full_node_store.add_to_future_ip(request) - return - - sub_slot_iters, difficulty = get_sub_slot_iters_and_difficulty( - self.constants, unfinished_block, self.blockchain.height_to_hash, prev_sb, self.blockchain.sub_blocks - ) - overflow = is_overflow_sub_block(self.constants, unfinished_block.reward_chain_sub_block.signage_point_index) - if overflow: - finished_sub_slots = self.full_node_store.get_finished_sub_slots( - prev_sb, - self.blockchain.sub_blocks, - unfinished_block.reward_chain_sub_block.proof_of_space.challenge, - True, - ) - else: - finished_sub_slots = unfinished_block.finished_sub_slots - - block: FullBlock = unfinished_block_to_full_block( - unfinished_block, - request.challenge_chain_ip_vdf, - request.challenge_chain_ip_proof, - request.reward_chain_ip_vdf, - request.reward_chain_ip_proof, - request.infused_challenge_chain_ip_vdf, - request.infused_challenge_chain_ip_proof, - finished_sub_slots, - prev_sb, - difficulty, - ) - - async for msg in self.respond_sub_block(full_node_protocol.RespondSubBlock(block)): - yield msg - - @api_request - async def new_signage_point_vdf(self, request: timelord_protocol.NewSignagePointVDF) -> OutboundMessageGenerator: - full_node_message = full_node_protocol.RespondSignagePoint( - request.index_from_challenge, - request.challenge_chain_sp_vdf, - request.challenge_chain_sp_proof, - request.reward_chain_sp_vdf, - request.reward_chain_sp_proof, - ) - async for msg in self.respond_signage_point(full_node_message): - yield msg - - @api_request - async def new_end_of_sub_slot_vdf(self, request: timelord_protocol.NewEndOfSubSlotVDF) -> OutboundMessageGenerator: - # Calls our own internal message to handle the end of sub slot, and potentially broadcasts to other peers. - full_node_message = full_node_protocol.RespondEndOfSubSlot(request.end_of_sub_slot_bundle) - async for msg in self.respond_end_of_sub_slot(full_node_message): - yield msg - - # WALLET PROTOCOL - # @api_request - # async def send_transaction(self, tx: wallet_protocol.SendTransaction) -> OutboundMessageGenerator: - # # Ignore if syncing - # if self.sync_store.get_sync_mode(): - # status = MempoolInclusionStatus.FAILED - # error: Optional[Err] = Err.UNKNOWN - # else: - # async with self.blockchain.lock: - # cost, status, error = await self.mempool_manager.add_spendbundle(tx.transaction) - # if status == MempoolInclusionStatus.SUCCESS: - # self.log.info(f"Added transaction to mempool: {tx.transaction.name()}") - # # Only broadcast successful transactions, not pending ones. Otherwise it's a DOS - # # vector. - # fees = tx.transaction.fees() - # assert fees >= 0 - # assert cost is not None - # new_tx = full_node_protocol.NewTransaction( - # tx.transaction.name(), - # cost, - # uint64(tx.transaction.fees()), - # ) - # yield OutboundMessage( - # NodeType.FULL_NODE, - # Message("new_transaction", new_tx), - # Delivery.BROADCAST_TO_OTHERS, - # ) - # else: - # self.log.warning( - # f"Wasn't able to add transaction with id {tx.transaction.name()}, " - # f"status {status} error: {error}" - # ) - # - # error_name = error.name if error is not None else None - # if status == MempoolInclusionStatus.SUCCESS: - # response = wallet_protocol.TransactionAck(tx.transaction.name(), status, error_name) - # else: - # # If if failed/pending, but it previously succeeded (in mempool), this is idempotence, return SUCCESS - # if self.mempool_manager.get_spendbundle(tx.transaction.name()) is not None: - # response = wallet_protocol.TransactionAck(tx.transaction.name(), MempoolInclusionStatus.SUCCESS, None) - # else: - # response = wallet_protocol.TransactionAck(tx.transaction.name(), status, error_name) - # yield OutboundMessage(NodeType.WALLET, Message("transaction_ack", response), Delivery.RESPOND) - # - # @api_request - # async def request_header(self, request: wallet_protocol.RequestHeader) -> OutboundMessageGenerator: - # full_block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash) - # if full_block is not None: - # header_block: Optional[HeaderBlock] = full_block.get_header_block() - # if header_block is not None and header_block.height == request.height: - # response = wallet_protocol.RespondHeader(header_block, full_block.transactions_filter) - # yield OutboundMessage( - # NodeType.WALLET, - # Message("respond_header", response), - # Delivery.RESPOND, - # ) - # return - # reject = wallet_protocol.RejectHeaderRequest(request.height, request.header_hash) - # yield OutboundMessage( - # NodeType.WALLET, - # Message("reject_header_request", reject), - # Delivery.RESPOND, - # ) - # - # @api_request - # async def request_removals(self, request: wallet_protocol.RequestRemovals) -> OutboundMessageGenerator: - # block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash) - # if ( - # block is None - # or block.height != request.height - # or block.height not in self.blockchain.height_to_hash - # or self.blockchain.height_to_hash[block.height] != block.header_hash - # ): - # reject = wallet_protocol.RejectRemovalsRequest(request.height, request.header_hash) - # yield OutboundMessage( - # NodeType.WALLET, - # Message("reject_removals_request", reject), - # Delivery.RESPOND, - # ) - # return - # - # assert block is not None - # all_removals, _ = await block.tx_removals_and_additions() - # - # coins_map: List[Tuple[bytes32, Optional[Coin]]] = [] - # proofs_map: List[Tuple[bytes32, bytes]] = [] - # - # # If there are no transactions, respond with empty lists - # if block.transactions_generator is None: - # proofs: Optional[List] - # if request.coin_names is None: - # proofs = None - # else: - # proofs = [] - # response = wallet_protocol.RespondRemovals(block.height, block.header_hash, [], proofs) - # elif request.coin_names is None or len(request.coin_names) == 0: - # for removal in all_removals: - # cr = await self.coin_store.get_coin_record(removal) - # assert cr is not None - # coins_map.append((cr.coin.name(), cr.coin)) - # response = wallet_protocol.RespondRemovals(block.height, block.header_hash, coins_map, None) - # else: - # assert block.transactions_generator - # removal_merkle_set = MerkleSet() - # for coin_name in all_removals: - # removal_merkle_set.add_already_hashed(coin_name) - # assert removal_merkle_set.get_root() == block.header.data.removals_root - # for coin_name in request.coin_names: - # result, proof = removal_merkle_set.is_included_already_hashed(coin_name) - # proofs_map.append((coin_name, proof)) - # if coin_name in all_removals: - # cr = await self.coin_store.get_coin_record(coin_name) - # assert cr is not None - # coins_map.append((coin_name, cr.coin)) - # assert result - # else: - # coins_map.append((coin_name, None)) - # assert not result - # response = wallet_protocol.RespondRemovals(block.height, block.header_hash, coins_map, proofs_map) - # - # yield OutboundMessage( - # NodeType.WALLET, - # Message("respond_removals", response), - # Delivery.RESPOND, - # ) - # - # @api_request - # async def request_additions(self, request: wallet_protocol.RequestAdditions) -> OutboundMessageGenerator: - # block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash) - # if ( - # block is None - # or block.height != request.height - # or block.height not in self.blockchain.height_to_hash - # or self.blockchain.height_to_hash[block.height] != block.header_hash - # ): - # reject = wallet_protocol.RejectAdditionsRequest(request.height, request.header_hash) - # yield OutboundMessage( - # NodeType.WALLET, - # Message("reject_additions_request", reject), - # Delivery.RESPOND, - # ) - # return - # - # assert block is not None - # _, additions = await block.tx_removals_and_additions() - # puzzlehash_coins_map: Dict[bytes32, List[Coin]] = {} - # for coin in additions + [block.get_coinbase(), block.get_fees_coin()]: - # if coin.puzzle_hash in puzzlehash_coins_map: - # puzzlehash_coins_map[coin.puzzle_hash].append(coin) - # else: - # puzzlehash_coins_map[coin.puzzle_hash] = [coin] - # - # coins_map: List[Tuple[bytes32, List[Coin]]] = [] - # proofs_map: List[Tuple[bytes32, bytes, Optional[bytes]]] = [] - # - # if request.puzzle_hashes is None: - # for puzzle_hash, coins in puzzlehash_coins_map.items(): - # coins_map.append((puzzle_hash, coins)) - # response = wallet_protocol.RespondAdditions(block.height, block.header_hash, coins_map, None) - # else: - # # Create addition Merkle set - # addition_merkle_set = MerkleSet() - # # Addition Merkle set contains puzzlehash and hash of all coins with that puzzlehash - # for puzzle, coins in puzzlehash_coins_map.items(): - # addition_merkle_set.add_already_hashed(puzzle) - # addition_merkle_set.add_already_hashed(hash_coin_list(coins)) - # - # assert addition_merkle_set.get_root() == block.header.data.additions_root - # for puzzle_hash in request.puzzle_hashes: - # result, proof = addition_merkle_set.is_included_already_hashed(puzzle_hash) - # if puzzle_hash in puzzlehash_coins_map: - # coins_map.append((puzzle_hash, puzzlehash_coins_map[puzzle_hash])) - # hash_coin_str = hash_coin_list(puzzlehash_coins_map[puzzle_hash]) - # result_2, proof_2 = addition_merkle_set.is_included_already_hashed(hash_coin_str) - # assert result - # assert result_2 - # proofs_map.append((puzzle_hash, proof, proof_2)) - # else: - # coins_map.append((puzzle_hash, [])) - # assert not result - # proofs_map.append((puzzle_hash, proof, None)) - # response = wallet_protocol.RespondAdditions(block.height, block.header_hash, coins_map, proofs_map) - # - # yield OutboundMessage( - # NodeType.WALLET, - # Message("respond_additions", response), - # Delivery.RESPOND, - # ) - # - # @api_request - # async def request_generator(self, request: wallet_protocol.RequestGenerator) -> OutboundMessageGenerator: - # full_block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash) - # if full_block is not None: - # if full_block.transactions_generator is not None: - # wrapper = GeneratorResponse( - # full_block.height, - # full_block.header_hash, - # full_block.transactions_generator, - # ) - # response = wallet_protocol.RespondGenerator(wrapper) - # yield OutboundMessage( - # NodeType.WALLET, - # Message("respond_generator", response), - # Delivery.RESPOND, - # ) - # return - # - # reject = wallet_protocol.RejectGeneratorRequest(request.height, request.header_hash) - # yield OutboundMessage( - # NodeType.WALLET, - # Message("reject_generator_request", reject), - # Delivery.RESPOND, - # ) diff --git a/src/full_node/full_node_api.py b/src/full_node/full_node_api.py index ab4d0b0e1d0c..1533b3357d0a 100644 --- a/src/full_node/full_node_api.py +++ b/src/full_node/full_node_api.py @@ -1,16 +1,22 @@ import asyncio +import dataclasses import time -import src.server.ws_connection as ws -from typing import AsyncGenerator, Dict, List, Optional, Tuple, Callable -from chiabip158 import PyBIP158 -from chiapos import Verifier -from blspy import G2Element, AugSchemeMPL +import traceback -from src.consensus.block_rewards import calculate_base_fee, calculate_block_reward -from src.consensus.pot_iterations import calculate_iterations -from src.consensus.coinbase import create_coinbase_coin, create_fees_coin +import src.server.ws_connection as ws +from typing import AsyncGenerator, List, Optional, Tuple, Callable +from chiabip158 import PyBIP158 +from blspy import G2Element + +from src.consensus.block_creation import unfinished_block_to_full_block, create_unfinished_block +from src.consensus.blockchain import ReceiveBlockResult +from src.consensus.difficulty_adjustment import get_sub_slot_iters_and_difficulty +from src.consensus.pot_iterations import is_overflow_sub_block, calculate_ip_iters, \ + calculate_sp_iters, calculate_iterations_quality from src.full_node.full_node import FullNode +from src.full_node.signage_point import SignagePoint +from src.full_node.sub_block_record import SubBlockRecord from src.protocols import ( introducer_protocol, @@ -19,27 +25,19 @@ from src.protocols import ( timelord_protocol, wallet_protocol, ) -from src.protocols.wallet_protocol import GeneratorResponse -from src.server.outbound_message import Message, NodeType, OutboundMessage -from src.types.challenge import Challenge -from src.types.coin import Coin, hash_coin_list +from src.server.outbound_message import Message, NodeType, OutboundMessage, Delivery + +from src.types.end_of_slot_bundle import EndOfSubSlotBundle from src.types.full_block import FullBlock -from src.types.header import Header, HeaderData -from src.types.header_block import HeaderBlock + from src.types.mempool_inclusion_status import MempoolInclusionStatus from src.types.mempool_item import MempoolItem -from src.types.program import Program -from src.types.proof_of_space import ProofOfSpace -from src.types.proof_of_time import ProofOfTime from src.types.sized_bytes import bytes32 from src.types.spend_bundle import SpendBundle +from src.types.unfinished_block import UnfinishedBlock from src.util.api_decorators import api_request, peer_required -from src.full_node.bundle_tools import best_solution_program -from src.full_node.cost_calculator import calculate_cost_of_program -from src.util.errors import ConsensusError, Err -from src.util.hash import std_hash -from src.util.ints import uint32, uint64, uint128 -from src.util.merkle_set import MerkleSet +from src.util.errors import ConsensusError +from src.util.ints import uint32, uint64, uint128, uint8 from src.types.peer_info import PeerInfo OutboundMessageGenerator = AsyncGenerator[OutboundMessage, None] @@ -58,6 +56,10 @@ class FullNodeAPI: def server(self): return self.full_node.server + @property + def log(self): + return self.full_node.log + @peer_required @api_request async def request_peers( @@ -74,1014 +76,34 @@ class FullNodeAPI: async def respond_peers( self, request: introducer_protocol.RespondPeers, - peer: ws.WSChiaConnection, - ): - await self.full_node.full_node_peers.respond_peers( - request, peer.get_peer_info(), True - ) - # Pseudo-message to close the connection - if peer.connection_type is NodeType.INTRODUCER: - await peer.close() + peer: ws.WSChiaConnection + ) -> Optional[Message]: + await self.full_node.full_node_peers.respond_peers(request, peer.get_peer_info(), False) + await peer.close() + return None @api_request - async def new_tip(self, request: full_node_protocol.NewTip): + async def new_peak(self, request: full_node_protocol.NewPeak) -> Optional[Message]: """ - A peer notifies us that they have added a new tip to their blockchain. If we don't have it, + A peer notifies us that they have added a new peak to their blockchain. If we don't have it, we can ask for it. """ # Check if we have this block in the blockchain - if self.full_node.blockchain.contains_block(request.header_hash): + if self.full_node.blockchain.contains_sub_block(request.header_hash): return # TODO: potential optimization, don't request blocks that we have already sent out - # a "request_block" message for. + request_transactions: bool = ( + self.full_node.full_node_store.get_unfinished_block(request.unfinished_reward_block_hash) is None + ) message = Message( - "request_block", - full_node_protocol.RequestBlock(request.height, request.header_hash), + "request_sub_block", + full_node_protocol.RequestSubBlock(request.sub_block_height, request_transactions), ) return message @api_request - async def removing_tip(self, request: full_node_protocol.RemovingTip): - """ - A peer notifies us that they have removed a tip from their blockchain. - """ - self.full_node.log.info("removing tip not implemented") - - @api_request - async def request_transaction( - self, request: full_node_protocol.RequestTransaction - ) -> Optional[Message]: - """ Peer has requested a full transaction from us. """ - # Ignore if syncing - if self.full_node.sync_store.get_sync_mode(): - return None - - spend_bundle = self.full_node.mempool_manager.get_spendbundle( - request.transaction_id - ) - if spend_bundle is None: - reject = full_node_protocol.RejectTransactionRequest(request.transaction_id) - msg = Message("reject_transaction_request", reject) - return msg - - transaction = full_node_protocol.RespondTransaction(spend_bundle) - msg = Message("respond_transaction", transaction) - self.full_node.log.info( - f"sending transaction (tx_id: {spend_bundle.name()}) to peer" - ) - return msg - - @api_request - async def respond_transaction( - self, tx: full_node_protocol.RespondTransaction - ) -> Optional[Message]: - """ - Receives a full transaction from peer. - If tx is added to mempool, send tx_id to others. (new_transaction) - """ - # Ignore if syncing - if self.full_node.sync_store.get_sync_mode(): - return None - - async with self.full_node.blockchain.lock: - # Ignore if we have already added this transaction - if ( - self.full_node.mempool_manager.get_spendbundle(tx.transaction.name()) - is not None - ): - return None - cost, status, error = await self.full_node.mempool_manager.add_spendbundle( - tx.transaction - ) - if status == MempoolInclusionStatus.SUCCESS: - self.full_node.log.info( - f"Added transaction to mempool: {tx.transaction.name()}" - ) - fees = tx.transaction.fees() - assert fees >= 0 - assert cost is not None - new_tx = full_node_protocol.NewTransaction( - tx.transaction.name(), - cost, - uint64(tx.transaction.fees()), - ) - message = Message("new_transaction", new_tx) - if self.full_node.server is not None: - await self.full_node.server.send_to_all( - [message], NodeType.FULL_NODE - ) - else: - self.full_node.log.warning( - f"Wasn't able to add transaction with id {tx.transaction.name()}, {status} error: {error}" - ) - return None - return None - - @api_request - async def reject_transaction_request( - self, - reject: full_node_protocol.RejectTransactionRequest, - ) -> Optional[Message]: - """ - The peer rejects the request for a transaction. - """ - self.full_node.log.warning(f"Rejected request for transaction {reject}") - return None - - @api_request - async def new_proof_of_time( - self, - new_proof_of_time: full_node_protocol.NewProofOfTime, - ) -> Optional[Message]: - if new_proof_of_time.witness_type == 0: - # A honest sanitizer will always sanitize until the LCA block. - if new_proof_of_time.height >= self.full_node.blockchain.lca_block.height: - return None - # If we already have the compact PoT in a connected to header block, return - blocks: List[FullBlock] = await self.full_node.block_store.get_blocks_at( - [new_proof_of_time.height] - ) - if new_proof_of_time.height not in self.full_node.blockchain.height_to_hash: - self.full_node.log.error( - f"Height {new_proof_of_time.height} not found in height_to_hash." - ) - return None - header_hash = self.full_node.blockchain.height_to_hash[ - new_proof_of_time.height - ] - for block in blocks: - assert block.proof_of_time is not None - if ( - block.proof_of_time.witness_type == 0 - and block.header_hash == header_hash - ): - return None - else: - # If we don't have an unfinished block for this PoT, we don't care about it - if ( - await self.full_node.full_node_store.get_unfinished_block( - ( - new_proof_of_time.challenge_hash, - new_proof_of_time.number_of_iterations, - ) - ) - ) is None: - return None - - # If we already have the PoT in a finished block, return - blocks = await self.full_node.block_store.get_blocks_at( - [new_proof_of_time.height] - ) - for block in blocks: - if ( - block.proof_of_time is not None - and block.proof_of_time.challenge_hash - == new_proof_of_time.challenge_hash - and block.proof_of_time.number_of_iterations - == new_proof_of_time.number_of_iterations - ): - return None - - self.full_node.full_node_store.add_proof_of_time_heights( - ( - new_proof_of_time.challenge_hash, - new_proof_of_time.number_of_iterations, - ), - new_proof_of_time.height, - ) - message = Message( - "request_proof_of_time", - full_node_protocol.RequestProofOfTime( - new_proof_of_time.height, - new_proof_of_time.challenge_hash, - new_proof_of_time.number_of_iterations, - new_proof_of_time.witness_type, - ), - ) - return message - - @api_request - async def request_proof_of_time( - self, - request_proof_of_time: full_node_protocol.RequestProofOfTime, - ) -> Optional[Message]: - blocks: List[FullBlock] = await self.full_node.block_store.get_blocks_at( - [request_proof_of_time.height] - ) - for block in blocks: - if ( - block.proof_of_time is not None - and block.proof_of_time.challenge_hash - == request_proof_of_time.challenge_hash - and block.proof_of_time.number_of_iterations - == request_proof_of_time.number_of_iterations - and block.proof_of_time.witness_type - == request_proof_of_time.witness_type - ): - msg = Message( - "respond_proof_of_time", - full_node_protocol.RespondProofOfTime(block.proof_of_time), - ) - return msg - reject = Message( - "reject_proof_of_time_request", - full_node_protocol.RejectProofOfTimeRequest( - request_proof_of_time.challenge_hash, - request_proof_of_time.number_of_iterations, - ), - ) - return reject - - @api_request - async def respond_proof_of_time( - self, - respond_proof_of_time: full_node_protocol.RespondProofOfTime, - ) -> Optional[Message]: - """ - A proof of time, received by a peer full node. If we have the rest of the block, - we can complete it. Otherwise, we just verify and propagate the proof. - """ - processed = False - if respond_proof_of_time.proof.witness_type == 0: - request = timelord_protocol.ProofOfTimeFinished(respond_proof_of_time.proof) - - await self.proof_of_time_finished(request) - processed = True - - if ( - await self.full_node.full_node_store.get_unfinished_block( - ( - respond_proof_of_time.proof.challenge_hash, - respond_proof_of_time.proof.number_of_iterations, - ) - ) - ) is not None: - height: Optional[ - uint32 - ] = self.full_node.full_node_store.get_proof_of_time_heights( - ( - respond_proof_of_time.proof.challenge_hash, - respond_proof_of_time.proof.number_of_iterations, - ) - ) - if height is not None: - message = Message( - "new_proof_of_time", - full_node_protocol.NewProofOfTime( - height, - respond_proof_of_time.proof.challenge_hash, - respond_proof_of_time.proof.number_of_iterations, - respond_proof_of_time.proof.witness_type, - ), - ) - if self.full_node.server is not None: - await self.full_node.server.send_to_all( - [message], NodeType.FULL_NODE - ) - if not processed: - request = timelord_protocol.ProofOfTimeFinished( - respond_proof_of_time.proof - ) - await self.proof_of_time_finished(request) - return None - - @api_request - async def reject_proof_of_time_request( - self, - reject: full_node_protocol.RejectProofOfTimeRequest, - ) -> Optional[Message]: - self.full_node.log.warning(f"Rejected PoT Request {reject}") - return None - - async def _respond_compact_proof_of_time(self, proof: ProofOfTime): - """ - A proof of time, received by a peer full node. If we have the rest of the block, - we can complete it. Otherwise, we just verify and propagate the proof. - """ - height: Optional[uint32] = self.full_node.block_store.get_height_proof_of_time( - proof.challenge_hash, - proof.number_of_iterations, - ) - if height is None: - self.full_node.log.info("No block for compact proof of time.") - return - if not proof.is_valid(self.full_node.constants.DISCRIMINANT_SIZE_BITS): - self.full_node.log.error("Invalid compact proof of time.") - return None - - blocks: List[FullBlock] = await self.full_node.block_store.get_blocks_at( - [height] - ) - for block in blocks: - assert block.proof_of_time is not None - if ( - block.proof_of_time.witness_type != 0 - and block.proof_of_time.challenge_hash == proof.challenge_hash - and block.proof_of_time.number_of_iterations - == proof.number_of_iterations - ): - block_new = FullBlock( - block.proof_of_space, - proof, - block.header, - block.transactions_generator, - block.transactions_filter, - ) - if self.full_node.block_store.seen_compact_proof( - proof.challenge_hash, - proof.number_of_iterations, - ): - return None - await self.full_node.block_store.add_block(block_new) - self.full_node.log.info( - f"Stored compact block at height {block.height}." - ) - message = Message( - "new_proof_of_time", - full_node_protocol.NewProofOfTime( - height, - proof.challenge_hash, - proof.number_of_iterations, - proof.witness_type, - ), - ) - if self.full_node.server is not None: - await self.full_node.server.send_to_all( - [message], NodeType.FULL_NODE - ) - return None - - @api_request - async def new_unfinished_block( - self, - new_unfinished_block: full_node_protocol.NewUnfinishedBlock, - ) -> Optional[Message]: - if self.full_node.blockchain.contains_block( - new_unfinished_block.new_header_hash - ): - return None - if not self.full_node.blockchain.contains_block( - new_unfinished_block.previous_header_hash - ): - return None - prev_block: Optional[FullBlock] = await self.full_node.block_store.get_block( - new_unfinished_block.previous_header_hash - ) - if prev_block is not None: - challenge = self.full_node.blockchain.get_challenge(prev_block) - if challenge is not None: - if ( - await ( - self.full_node.full_node_store.get_unfinished_block( - ( - challenge.get_hash(), - new_unfinished_block.number_of_iterations, - ) - ) - ) - is not None - ): - return None - assert challenge is not None - - message = Message( - "request_unfinished_block", - full_node_protocol.RequestUnfinishedBlock( - new_unfinished_block.new_header_hash - ), - ) - return message - return None - - @api_request - async def request_unfinished_block( - self, - request_unfinished_block: full_node_protocol.RequestUnfinishedBlock, - ) -> Optional[Message]: - for _, block in ( - await self.full_node.full_node_store.get_unfinished_blocks() - ).items(): - if block.header_hash == request_unfinished_block.header_hash: - message = Message( - "respond_unfinished_block", - full_node_protocol.RespondUnfinishedBlock(block), - ) - return message - fetched: Optional[FullBlock] = await self.full_node.block_store.get_block( - request_unfinished_block.header_hash - ) - if fetched is not None: - message = Message( - "respond_unfinished_block", - full_node_protocol.RespondUnfinishedBlock(fetched), - ) - return message - - reject = Message( - "reject_unfinished_block_request", - full_node_protocol.RejectUnfinishedBlockRequest( - request_unfinished_block.header_hash - ), - ) - return reject - - # WALLET PROTOCOL - @api_request - async def send_transaction( - self, tx: wallet_protocol.SendTransaction - ) -> Optional[Message]: - # Ignore if syncing - if self.full_node.sync_store.get_sync_mode(): - status = MempoolInclusionStatus.FAILED - error: Optional[Err] = Err.UNKNOWN - else: - async with self.full_node.blockchain.lock: - ( - cost, - status, - error, - ) = await self.full_node.mempool_manager.add_spendbundle(tx.transaction) - if status == MempoolInclusionStatus.SUCCESS: - self.full_node.log.info( - f"Added transaction to mempool: {tx.transaction.name()}" - ) - # Only broadcast successful transactions, not pending ones. Otherwise it's a DOS - # vector. - fees = tx.transaction.fees() - assert fees >= 0 - assert cost is not None - new_tx = full_node_protocol.NewTransaction( - tx.transaction.name(), - cost, - uint64(tx.transaction.fees()), - ) - message = Message("new_transaction", new_tx) - if self.full_node.server is not None: - await self.full_node.server.send_to_all( - [message], NodeType.FULL_NODE - ) - else: - self.full_node.log.warning( - f"Wasn't able to add transaction with id {tx.transaction.name()}, " - f"status {status} error: {error}" - ) - - error_name = error.name if error is not None else None - if status == MempoolInclusionStatus.SUCCESS: - response = wallet_protocol.TransactionAck( - tx.transaction.name(), status, error_name - ) - else: - # If if failed/pending, but it previously succeeded (in mempool), this is idempotence, return SUCCESS - if ( - self.full_node.mempool_manager.get_spendbundle(tx.transaction.name()) - is not None - ): - response = wallet_protocol.TransactionAck( - tx.transaction.name(), MempoolInclusionStatus.SUCCESS, None - ) - else: - response = wallet_protocol.TransactionAck( - tx.transaction.name(), status, error_name - ) - message = Message("transaction_ack", response) - return message - - @api_request - async def request_all_proof_hashes( - self, request: wallet_protocol.RequestAllProofHashes - ) -> Optional[Message]: - proof_hashes_map = await self.full_node.block_store.get_proof_hashes() - curr = self.full_node.blockchain.lca_block - - hashes: List[Tuple[bytes32, Optional[uint64], Optional[uint64]]] = [] - while curr.height > 0: - difficulty_update: Optional[uint64] = None - iters_update: Optional[uint64] = None - if ( - curr.height % self.full_node.constants.DIFFICULTY_EPOCH - == self.full_node.constants.DIFFICULTY_DELAY - ): - difficulty_update = self.full_node.blockchain.get_next_difficulty( - self.full_node.blockchain.headers[curr.prev_header_hash] - ) - if (curr.height + 1) % self.full_node.constants.DIFFICULTY_EPOCH == 0: - iters_update = curr.data.total_iters - hashes.append( - (proof_hashes_map[curr.header_hash], difficulty_update, iters_update) - ) - curr = self.full_node.blockchain.headers[curr.prev_header_hash] - - hashes.append( - ( - proof_hashes_map[self.full_node.blockchain.genesis.header_hash], - uint64(self.full_node.blockchain.genesis.weight), - None, - ) - ) - response = wallet_protocol.RespondAllProofHashes(list(reversed(hashes))) - - message = Message("respond_all_proof_hashes", response) - return message - - @api_request - async def request_all_header_hashes_after( - self, - request: wallet_protocol.RequestAllHeaderHashesAfter, - ) -> Optional[Message]: - header_hash: Optional[bytes32] = self.full_node.blockchain.height_to_hash.get( - request.starting_height, None - ) - if header_hash is None: - reject = wallet_protocol.RejectAllHeaderHashesAfterRequest( - request.starting_height, request.previous_challenge_hash - ) - message = Message("reject_all_header_hashes_after_request", reject) - return message - block: Optional[FullBlock] = await self.full_node.block_store.get_block( - header_hash - ) - header_hash_again: Optional[ - bytes32 - ] = self.full_node.blockchain.height_to_hash.get(request.starting_height, None) - - if ( - block is None - or block.proof_of_space.challenge_hash != request.previous_challenge_hash - or header_hash_again != header_hash - ): - reject = wallet_protocol.RejectAllHeaderHashesAfterRequest( - request.starting_height, request.previous_challenge_hash - ) - message = Message("reject_all_header_hashes_after_request", reject) - return message - header_hashes: List[bytes32] = [] - for height in range( - request.starting_height, self.full_node.blockchain.lca_block.height + 1 - ): - header_hashes.append( - self.full_node.blockchain.height_to_hash[uint32(height)] - ) - response = wallet_protocol.RespondAllHeaderHashesAfter( - request.starting_height, request.previous_challenge_hash, header_hashes - ) - - message = Message("respond_all_header_hashes_after", response) - return message - - @api_request - async def request_header( - self, request: wallet_protocol.RequestHeader - ) -> Optional[Message]: - full_block: Optional[FullBlock] = await self.full_node.block_store.get_block( - request.header_hash - ) - if full_block is not None: - header_block: Optional[ - HeaderBlock - ] = self.full_node.blockchain.get_header_block(full_block) - if header_block is not None and header_block.height == request.height: - response = wallet_protocol.RespondHeader( - header_block, full_block.transactions_filter - ) - message = Message("respond_header", response) - return message - reject = wallet_protocol.RejectHeaderRequest( - request.height, request.header_hash - ) - - message = Message("reject_header_request", reject) - return message - - @api_request - async def request_removals( - self, request: wallet_protocol.RequestRemovals - ) -> Optional[Message]: - block: Optional[FullBlock] = await self.full_node.block_store.get_block( - request.header_hash - ) - if ( - block is None - or block.height != request.height - or block.height not in self.full_node.blockchain.height_to_hash - or self.full_node.blockchain.height_to_hash[block.height] - != block.header_hash - ): - reject = wallet_protocol.RejectRemovalsRequest( - request.height, request.header_hash - ) - message = Message("reject_removals_request", reject) - return message - - assert block is not None - all_removals, _ = await block.tx_removals_and_additions() - - coins_map: List[Tuple[bytes32, Optional[Coin]]] = [] - proofs_map: List[Tuple[bytes32, bytes]] = [] - - # If there are no transactions, respond with empty lists - if block.transactions_generator is None: - proofs: Optional[List] - if request.coin_names is None: - proofs = None - else: - proofs = [] - response = wallet_protocol.RespondRemovals( - block.height, block.header_hash, [], proofs - ) - elif request.coin_names is None or len(request.coin_names) == 0: - for removal in all_removals: - cr = await self.full_node.coin_store.get_coin_record(removal) - assert cr is not None - coins_map.append((cr.coin.name(), cr.coin)) - response = wallet_protocol.RespondRemovals( - block.height, block.header_hash, coins_map, None - ) - else: - assert block.transactions_generator - removal_merkle_set = MerkleSet() - for coin_name in all_removals: - removal_merkle_set.add_already_hashed(coin_name) - assert removal_merkle_set.get_root() == block.header.data.removals_root - for coin_name in request.coin_names: - result, proof = removal_merkle_set.is_included_already_hashed(coin_name) - proofs_map.append((coin_name, proof)) - if coin_name in all_removals: - cr = await self.full_node.coin_store.get_coin_record(coin_name) - assert cr is not None - coins_map.append((coin_name, cr.coin)) - assert result - else: - coins_map.append((coin_name, None)) - assert not result - response = wallet_protocol.RespondRemovals( - block.height, block.header_hash, coins_map, proofs_map - ) - - message = Message("respond_removals", response) - return message - - @api_request - async def request_additions( - self, request: wallet_protocol.RequestAdditions - ) -> Optional[Message]: - block: Optional[FullBlock] = await self.full_node.block_store.get_block( - request.header_hash - ) - if ( - block is None - or block.height != request.height - or block.height not in self.full_node.blockchain.height_to_hash - or self.full_node.blockchain.height_to_hash[block.height] - != block.header_hash - ): - reject = wallet_protocol.RejectAdditionsRequest( - request.height, request.header_hash - ) - message = Message("reject_additions_request", reject) - return message - - assert block is not None - _, additions = await block.tx_removals_and_additions() - puzzlehash_coins_map: Dict[bytes32, List[Coin]] = {} - for coin in additions + [block.get_coinbase(), block.get_fees_coin()]: - if coin.puzzle_hash in puzzlehash_coins_map: - puzzlehash_coins_map[coin.puzzle_hash].append(coin) - else: - puzzlehash_coins_map[coin.puzzle_hash] = [coin] - - coins_map: List[Tuple[bytes32, List[Coin]]] = [] - proofs_map: List[Tuple[bytes32, bytes, Optional[bytes]]] = [] - - if request.puzzle_hashes is None: - for puzzle_hash, coins in puzzlehash_coins_map.items(): - coins_map.append((puzzle_hash, coins)) - response = wallet_protocol.RespondAdditions( - block.height, block.header_hash, coins_map, None - ) - else: - # Create addition Merkle set - addition_merkle_set = MerkleSet() - # Addition Merkle set contains puzzlehash and hash of all coins with that puzzlehash - for puzzle, coins in puzzlehash_coins_map.items(): - addition_merkle_set.add_already_hashed(puzzle) - addition_merkle_set.add_already_hashed(hash_coin_list(coins)) - - assert addition_merkle_set.get_root() == block.header.data.additions_root - for puzzle_hash in request.puzzle_hashes: - result, proof = addition_merkle_set.is_included_already_hashed( - puzzle_hash - ) - if puzzle_hash in puzzlehash_coins_map: - coins_map.append((puzzle_hash, puzzlehash_coins_map[puzzle_hash])) - hash_coin_str = hash_coin_list(puzzlehash_coins_map[puzzle_hash]) - result_2, proof_2 = addition_merkle_set.is_included_already_hashed( - hash_coin_str - ) - assert result - assert result_2 - proofs_map.append((puzzle_hash, proof, proof_2)) - else: - coins_map.append((puzzle_hash, [])) - assert not result - proofs_map.append((puzzle_hash, proof, None)) - response = wallet_protocol.RespondAdditions( - block.height, block.header_hash, coins_map, proofs_map - ) - - message = Message("respond_additions", response) - return message - - @api_request - async def request_generator( - self, request: wallet_protocol.RequestGenerator - ) -> Optional[Message]: - full_block: Optional[FullBlock] = await self.full_node.block_store.get_block( - request.header_hash - ) - if full_block is not None: - if full_block.transactions_generator is not None: - wrapper = GeneratorResponse( - full_block.height, - full_block.header_hash, - full_block.transactions_generator, - ) - response = wallet_protocol.RespondGenerator(wrapper) - message = Message("respond_generator", response) - return message - - reject = wallet_protocol.RejectGeneratorRequest( - request.height, request.header_hash - ) - - message = Message("reject_generator_request", reject) - return message - - @api_request - async def respond_header_block( - self, request: full_node_protocol.RespondHeaderBlock - ) -> Optional[Message]: - """ - Receive header blocks from a peer. - """ - self.full_node.log.info(f"Received header block {request.header_block.height}.") - if self.full_node.sync_peers_handler is not None: - requests = await self.full_node.sync_peers_handler.new_block( - request.header_block - ) - if self.full_node.server is None: - return None - for req in requests: - msg = req.message - node_id = req.specific_peer_node_id - if node_id is not None: - await self.full_node.server.send_to_specific([msg], node_id) - else: - await self.full_node.server.send_to_all([msg], NodeType.FULL_NODE) - return None - - @peer_required - @api_request - async def reject_header_block_request( - self, - request: full_node_protocol.RejectHeaderBlockRequest, - peer: ws.WSChiaConnection, - ) -> Optional[Message]: - self.full_node.log.warning(f"Reject header block request, {request}") - if self.full_node.sync_store.get_sync_mode(): - await peer.close() - return None - - @api_request - async def request_header_hash( - self, request: farmer_protocol.RequestHeaderHash - ) -> Optional[Message]: - """ - Creates a block body and header, with the proof of space, coinbase, and fee targets provided - by the farmer, and sends the hash of the header data back to the farmer. - """ - plot_id: bytes32 = request.proof_of_space.get_plot_id() - - # Checks that the proof of space is valid - quality_string: bytes = Verifier().validate_proof( - plot_id, - request.proof_of_space.size, - request.challenge_hash, - bytes(request.proof_of_space.proof), - ) - assert len(quality_string) == 32 - - # Retrieves the correct tip for the challenge - tips: List[Header] = self.full_node.blockchain.get_current_tips() - tips_blocks: List[Optional[FullBlock]] = [ - await self.full_node.block_store.get_block(tip.header_hash) for tip in tips - ] - target_tip_block: Optional[FullBlock] = None - target_tip: Optional[Header] = None - for tip in tips_blocks: - assert tip is not None - tip_challenge: Optional[ - Challenge - ] = self.full_node.blockchain.get_challenge(tip) - assert tip_challenge is not None - if tip_challenge.get_hash() == request.challenge_hash: - target_tip_block = tip - target_tip = tip.header - if target_tip is None: - self.full_node.log.warning( - f"Challenge hash: {request.challenge_hash} not in one of three tips" - ) - return None - - assert target_tip is not None - # Grab best transactions from Mempool for given tip target - async with self.full_node.blockchain.lock: - spend_bundle: Optional[ - SpendBundle - ] = await self.full_node.mempool_manager.create_bundle_for_tip(target_tip) - spend_bundle_fees = 0 - aggregate_sig: G2Element = request.pool_target_signature - solution_program: Optional[Program] = None - - if spend_bundle: - solution_program = best_solution_program(spend_bundle) - spend_bundle_fees = spend_bundle.fees() - aggregate_sig = AugSchemeMPL.aggregate( - [spend_bundle.aggregated_signature, aggregate_sig] - ) - - base_fee_reward = calculate_base_fee(target_tip.height + 1) - full_fee_reward = uint64(int(base_fee_reward + spend_bundle_fees)) - - # Calculate the cost of transactions - cost = uint64(0) - if solution_program: - _, _, cost = calculate_cost_of_program( - solution_program, self.full_node.constants.CLVM_COST_RATIO_CONSTANT - ) - - extension_data: bytes32 = bytes32([0] * 32) - - # Creates a block with transactions, coinbase, and fees - # Creates the block header - prev_header_hash: bytes32 = target_tip.get_hash() - timestamp: uint64 = uint64(int(time.time())) - - # Create filter - byte_array_tx: List[bytes32] = [] - if spend_bundle: - additions: List[Coin] = spend_bundle.additions() - removals: List[Coin] = spend_bundle.removals() - for coin in additions: - byte_array_tx.append(bytearray(coin.puzzle_hash)) - for coin in removals: - byte_array_tx.append(bytearray(coin.name())) - - byte_array_tx.append(bytearray(request.farmer_rewards_puzzle_hash)) - byte_array_tx.append(bytearray(request.pool_target.puzzle_hash)) - - bip158: PyBIP158 = PyBIP158(byte_array_tx) - encoded_filter: bytes = bytes(bip158.GetEncoded()) - - proof_of_space_hash: bytes32 = request.proof_of_space.get_hash() - difficulty = self.full_node.blockchain.get_next_difficulty(target_tip) - - assert target_tip_block is not None - vdf_min_iters: uint64 = self.full_node.blockchain.get_next_min_iters( - target_tip_block - ) - - iterations_needed: uint64 = calculate_iterations( - request.proof_of_space, - difficulty, - vdf_min_iters, - self.full_node.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG, - ) - - removal_merkle_set = MerkleSet() - addition_merkle_set = MerkleSet() - - additions = [] - removals = [] - - if spend_bundle: - additions = spend_bundle.additions() - removals = spend_bundle.removals() - - # Create removal Merkle set - for coin in removals: - removal_merkle_set.add_already_hashed(coin.name()) - cb_reward = calculate_block_reward(target_tip.height + 1) - cb_coin = create_coinbase_coin( - target_tip.height + 1, request.pool_target.puzzle_hash, cb_reward - ) - fees_coin = create_fees_coin( - target_tip.height + 1, - request.farmer_rewards_puzzle_hash, - full_fee_reward, - ) - - # Create addition Merkle set - puzzlehash_coins_map: Dict[bytes32, List[Coin]] = {} - for coin in additions + [cb_coin, fees_coin]: - if coin.puzzle_hash in puzzlehash_coins_map: - puzzlehash_coins_map[coin.puzzle_hash].append(coin) - else: - puzzlehash_coins_map[coin.puzzle_hash] = [coin] - - # Addition Merkle set contains puzzlehash and hash of all coins with that puzzlehash - for puzzle, coins in puzzlehash_coins_map.items(): - addition_merkle_set.add_already_hashed(puzzle) - addition_merkle_set.add_already_hashed(hash_coin_list(coins)) - - additions_root = addition_merkle_set.get_root() - removal_root = removal_merkle_set.get_root() - - generator_hash = ( - solution_program.get_tree_hash() - if solution_program is not None - else bytes32([0] * 32) - ) - filter_hash = std_hash(encoded_filter) - - block_header_data: HeaderData = HeaderData( - uint32(target_tip.height + 1), - prev_header_hash, - timestamp, - filter_hash, - proof_of_space_hash, - uint128(target_tip.weight + difficulty), - uint64(target_tip.data.total_iters + iterations_needed), - additions_root, - removal_root, - request.farmer_rewards_puzzle_hash, - full_fee_reward, - request.pool_target, - aggregate_sig, - cost, - extension_data, - generator_hash, - ) - - block_header_data_hash: bytes32 = block_header_data.get_hash() - - # Stores this block so we can submit it to the blockchain after it's signed by harvester - self.full_node.full_node_store.add_candidate_block( - proof_of_space_hash, - solution_program, - encoded_filter, - block_header_data, - request.proof_of_space, - target_tip.height + 1, - ) - - message = farmer_protocol.HeaderHash( - proof_of_space_hash, block_header_data_hash - ) - - return Message("header_hash", message) - - @api_request - async def header_signature( - self, header_signature: farmer_protocol.HeaderSignature - ) -> Optional[Message]: - """ - Signature of header hash, by the harvester. This is enough to create an unfinished - block, which only needs a Proof of Time to be finished. If the signature is valid, - we call the unfinished_block routine. - """ - candidate: Optional[ - Tuple[Optional[Program], bytes, HeaderData, ProofOfSpace] - ] = self.full_node.full_node_store.get_candidate_block( - header_signature.pos_hash - ) - if candidate is None: - self.full_node.log.warning( - f"PoS hash {header_signature.pos_hash} not found in database" - ) - return None - # Verifies that we have the correct header and body stored - generator, filt, block_header_data, pos = candidate - - assert block_header_data.get_hash() == header_signature.header_hash - - block_header: Header = Header( - block_header_data, header_signature.header_signature - ) - unfinished_block_obj: FullBlock = FullBlock( - pos, None, block_header, generator, filt - ) - - # Propagate to ourselves (which validates and does further propagations) - request = full_node_protocol.RespondUnfinishedBlock(unfinished_block_obj) - msg = await self.respond_unfinished_block(request) - return msg - - @api_request - async def new_transaction( - self, transaction: full_node_protocol.NewTransaction - ) -> Optional[Message]: + async def new_transaction(self, transaction: full_node_protocol.NewTransaction) -> Optional[Message]: """ A peer notifies us of a new transaction. Requests a full transaction if we haven't seen it previously, and if the fees are enough. @@ -1093,291 +115,662 @@ class FullNodeAPI: if self.full_node.mempool_manager.seen(transaction.transaction_id): return None - if self.full_node.mempool_manager.is_fee_enough( - transaction.fees, transaction.cost - ): - requestTX = full_node_protocol.RequestTransaction( - transaction.transaction_id - ) - return Message("request_transaction", requestTX) + if self.full_node.mempool_manager.is_fee_enough(transaction.fees, transaction.cost): + request_tx = full_node_protocol.RequestTransaction(transaction.transaction_id) + msg = Message("request_transaction", request_tx) + return msg - return None - - # TIMELORD PROTOCOL @api_request - async def proof_of_time_finished( - self, request: timelord_protocol.ProofOfTimeFinished - ) -> Optional[Message]: - """ - A proof of time, received by a peer timelord. We can use this to complete a block, - and call the block routine (which handles propagation and verification of blocks). - """ - if request.proof.witness_type == 0: - await self._respond_compact_proof_of_time(request.proof) - - dict_key = ( - request.proof.challenge_hash, - request.proof.number_of_iterations, - ) - - unfinished_block_obj: Optional[ - FullBlock - ] = await self.full_node.full_node_store.get_unfinished_block(dict_key) - if not unfinished_block_obj: - if request.proof.witness_type > 0: - self.full_node.log.warning( - f"Received a proof of time that we cannot use to complete a block {dict_key}" - ) - return None - - new_full_block: FullBlock = FullBlock( - unfinished_block_obj.proof_of_space, - request.proof, - unfinished_block_obj.header, - unfinished_block_obj.transactions_generator, - unfinished_block_obj.transactions_filter, - ) - + async def request_transaction(self, request: full_node_protocol.RequestTransaction) -> Optional[Message]: + """ Peer has requested a full transaction from us. """ + # Ignore if syncing if self.full_node.sync_store.get_sync_mode(): - self.full_node.sync_store.add_potential_future_block(new_full_block) - else: - await self.respond_block(full_node_protocol.RespondBlock(new_full_block)) - return None + return + spend_bundle = self.full_node.mempool_manager.get_spendbundle(request.transaction_id) + if spend_bundle is None: + return - @api_request - async def request_block( - self, request_block: full_node_protocol.RequestBlock - ) -> Optional[Message]: - block: Optional[FullBlock] = await self.full_node.block_store.get_block( - request_block.header_hash - ) - if block is not None: - return Message("respond_block", full_node_protocol.RespondBlock(block)) + transaction = full_node_protocol.RespondTransaction(spend_bundle) - reject = Message( - "reject_block_request", - full_node_protocol.RejectBlockRequest( - request_block.height, request_block.header_hash - ), - ) - return reject - - @api_request - async def respond_block( - self, respond_block: full_node_protocol.RespondBlock - ) -> Optional[Message]: - await self.full_node._respond_block(respond_block) - return None + msg = Message("respond_transaction", transaction) + self.log.info(f"sending transaction (tx_id: {spend_bundle.name()}) to peer") + return msg @peer_required @api_request - async def reject_block_request( - self, reject: full_node_protocol.RejectBlockRequest, peer: ws.WSChiaConnection - ) -> Optional[Message]: - self.full_node.log.warning(f"Rejected block request {reject}") + async def respond_transaction(self, tx: full_node_protocol.RespondTransaction, peer: ws.WSChiaConnection) -> Optional[Message]: + """ + Receives a full transaction from peer. + If tx is added to mempool, send tx_id to others. (new_transaction) + """ + # Ignore if syncing if self.full_node.sync_store.get_sync_mode(): - await peer.close() - return None + return + + async with self.full_node.blockchain.lock: + # Ignore if we have already added this transaction + if self.full_node.mempool_manager.get_spendbundle(tx.transaction.name()) is not None: + return + cost, status, error = await self.full_node.mempool_manager.add_spendbundle(tx.transaction) + if status == MempoolInclusionStatus.SUCCESS: + self.log.info(f"Added transaction to mempool: {tx.transaction.name()}") + fees = tx.transaction.fees() + assert fees >= 0 + assert cost is not None + new_tx = full_node_protocol.NewTransaction( + tx.transaction.name(), + cost, + uint64(tx.transaction.fees()), + ) + message = Message("new_transaction", new_tx) + await self.server.send_to_all_except([message], NodeType.FULL_NODE, peer.peer_node_id) + else: + self.log.warning( + f"Was not able to add transaction with id {tx.transaction.name()}, {status} error: {error}" + ) + return + + @api_request + async def request_proof_of_weight(self, tx: full_node_protocol.RequestProofOfWeight) -> OutboundMessageGenerator: + # TODO(mariano/almog) + pass + + @api_request + async def respond_proof_of_weight(self, tx: full_node_protocol.RespondProofOfWeight) -> OutboundMessageGenerator: + # TODO(mariano/almog) + pass + + @api_request + async def request_sub_block(self, request_block: full_node_protocol.RequestSubBlock) -> Optional[Message]: + if request_block.height not in self.full_node.blockchain.height_to_hash: + return + block: Optional[FullBlock] = await self.full_node.block_store.get_full_block( + self.full_node.blockchain.height_to_hash[request_block.height] + ) + if block is not None: + if not request_block.include_transaction_block: + block = dataclasses.replace(block, transactions_generator=None) + msg = Message("respond_block", full_node_protocol.RespondSubBlock(block)) + return msg + return + + @api_request + async def respond_sub_block( + self, respond_sub_block: full_node_protocol.RespondSubBlock + ) -> Optional[Message]: + """ + Receive a full block from a peer full node (or ourselves). + """ + return await self.full_node.respond_sub_block(respond_sub_block) + + @api_request + async def new_unfinished_sub_block( + self, new_unfinished_sub_block: full_node_protocol.NewUnfinishedSubBlock + ) -> Optional[Message]: + if self.full_node.full_node_store.get_unfinished_block(new_unfinished_sub_block.unfinished_reward_hash) is not None: + return + + msg = Message( + "request_unfinished_sub_block", + full_node_protocol.RequestUnfinishedSubBlock(new_unfinished_sub_block.unfinished_reward_hash), + ) + return msg + + @api_request + async def request_unfinished_sub_block( + self, request_unfinished_sub_block: full_node_protocol.RequestUnfinishedSubBlock + ) -> Optional[Message]: + unfinished_block: Optional[UnfinishedBlock] = self.full_node.full_node_store.get_unfinished_block( + request_unfinished_sub_block.unfinished_reward_hash + ) + if unfinished_block is not None: + msg = Message( + "respond_unfinished_block", + full_node_protocol.RespondUnfinishedSubBlock(unfinished_block), + ) + return msg + + @peer_required + @api_request + async def respond_unfinished_sub_block( + self, respond_unfinished_sub_block: full_node_protocol.RespondUnfinishedSubBlock, peer: ws.WSChiaConnection + ) -> Optional[Message]: + return await self.full_node._respond_unfinished_sub_block(respond_unfinished_sub_block, peer) + + @api_request + async def new_signage_point_or_end_of_sub_slot( + self, new_sp: full_node_protocol.NewSignagePointOrEndOfSubSlot + ) -> Optional[Message]: + if ( + self.full_node.full_node_store.get_signage_point_by_index( + new_sp.challenge_hash, new_sp.index_from_challenge, new_sp.last_rc_infusion + ) + is not None + ): + return + + full_node_request = full_node_protocol.RequestSignagePointOrEndOfSubSlot( + new_sp.challenge_hash, new_sp.index_from_challenge, new_sp.last_rc_infusion + ) + + return Message("request_signage_point_or_end_of_sub_slot", full_node_request) + + @api_request + async def request_signage_point_or_end_of_sub_slot( + self, request: full_node_protocol.RequestSignagePointOrEndOfSubSlot + ) -> Optional[Message]: + if request.index_from_challenge == 0: + sub_slot: Optional[Tuple[EndOfSubSlotBundle, int]] = self.full_node.full_node_store.get_sub_slot( + request.challenge_hash + ) + if sub_slot is not None: + return Message("respond_end_of_slot", full_node_protocol.RespondEndOfSubSlot(sub_slot[0])) + else: + self.log.warning("") + else: + sp: Optional[SignagePoint] = self.full_node.full_node_store.get_signage_point_by_index( + request.challenge_hash, request.index_from_challenge, request.last_rc_infusion + ) + if sp is not None: + full_node_response = full_node_protocol.RespondSignagePoint( + request.index_from_challenge, + sp.cc_vdf, + sp.cc_proof, + sp.rc_vdf, + sp.rc_proof, + ) + return Message("respond_signage_point", full_node_response) + + @peer_required + @api_request + async def respond_signage_point(self, request: full_node_protocol.RespondSignagePoint, peer: ws.WSChiaConnection) -> Optional[Message]: + peak = self.full_node.blockchain.get_peak() + next_sub_slot_iters = self.full_node.blockchain.get_next_slot_iters(peak.get_hash(), True) + + added = self.full_node.full_node_store.new_signage_point( + request.index_from_challenge, + self.full_node.blockchain.sub_blocks, + self.full_node.blockchain.get_peak(), + next_sub_slot_iters, + SignagePoint( + request.challenge_chain_vdf, + request.challenge_chain_proof, + request.reward_chain_vdf, + request.reward_chain_proof, + ), + ) + + if added: + broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot( + request.challenge_chain_vdf.challenge_hash, + request.index_from_challenge, + request.reward_chain_vdf.challenge_hash, + ) + msg = Message("new_signage_point_or_end_of_sub_slot", broadcast) + self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id) + + return + + @peer_required + @api_request + async def respond_end_of_sub_slot( + self, request: full_node_protocol.RespondEndOfSubSlot, peer: ws.WSChiaConnection + ) -> Optional[Message]: + + added = self.full_node.full_node_store.new_finished_sub_slot( + request.end_of_slot_bundle, self.full_node.blockchain.sub_blocks, self.full_node.blockchain.get_peak() + ) + + if added: + broadcast = full_node_protocol.NewSignagePointOrEndOfSubSlot( + request.end_of_slot_bundle.challenge_chain.get_hash(), + uint8(0), + request.end_of_slot_bundle.reward_chain.end_of_slot_vdf.challenge_hash, + ) + msg = Message("new_signage_point_or_end_of_sub_slot", broadcast) + self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id) + + return @peer_required @api_request async def request_mempool_transactions( - self, - request: full_node_protocol.RequestMempoolTransactions, - peer: ws.WSChiaConnection, + self, request: full_node_protocol.RequestMempoolTransactions, peer: ws.WSChiaConnection ) -> Optional[Message]: received_filter = PyBIP158(bytearray(request.filter)) - items: List[ - MempoolItem - ] = await self.full_node.mempool_manager.get_items_not_in_filter( - received_filter - ) + items: List[MempoolItem] = await self.full_node.mempool_manager.get_items_not_in_filter(received_filter) for item in items: transaction = full_node_protocol.RespondTransaction(item.spend_bundle) - await peer.send_message(Message("respond_transaction", transaction)) - - return None + msg = Message("respond_transaction", transaction) + await peer.send_message(msg) + return + # FARMER PROTOCOL @api_request - async def respond_unfinished_block( - self, respond_unfinished_block: full_node_protocol.RespondUnfinishedBlock - ) -> Optional[Message]: + async def declare_proof_of_space(self, request: farmer_protocol.DeclareProofOfSpace) -> Optional[Message]: """ - We have received an unfinished block, either created by us, or from another peer. - We can validate it and if it's a good block, propagate it to other peers and - timelords. + Creates a block body and header, with the proof of space, coinbase, and fee targets provided + by the farmer, and sends the hash of the header data back to the farmer. """ - block = respond_unfinished_block.block - # Adds the unfinished block to seen, and check if it's seen before, to prevent - # processing it twice - if self.full_node.full_node_store.seen_unfinished_block(block.header_hash): - return None + if request.pool_target is None or request.pool_signature is None: + raise ValueError("Adaptable pool protocol not yet available.") - if not self.full_node.blockchain.is_child_of_head(block): - return None + # Checks that the proof of space is a response to a recent challenge and valid SP + pos_sub_slot: Optional[Tuple[EndOfSubSlotBundle, int]] = self.full_node.full_node_store.get_sub_slot( + request.proof_of_space.challenge_hash + ) + sp_vdfs: Optional[SignagePoint] = self.full_node.full_node_store.get_signage_point(request.challenge_chain_sp) - prev_full_block: Optional[ - FullBlock - ] = await self.full_node.block_store.get_block(block.prev_header_hash) + if sp_vdfs is None or pos_sub_slot is None: + self.log.warning(f"Received proof of space for an unknown signage point: {request}") + return - assert prev_full_block is not None + # Now we know that the proof of space has a signage point either: + # 1. In the previous sub-slot of the peak (overflow) + # 2. In the same sub-slot as the peak + # 3. In a future sub-slot that we already know of + + # Checks that the proof of space is valid + quality_string: Optional[bytes32] = request.proof_of_space.verify_and_get_quality_string( + self.full_node.constants, request.challenge_hash, request.challenge_chain_sp + ) + assert len(quality_string) == 32 + + # Grab best transactions from Mempool for given tip target async with self.full_node.blockchain.lock: - ( - error_code, - iterations_needed, - ) = await self.full_node.blockchain.validate_unfinished_block( - block, prev_full_block - ) - - if error_code is not None: - raise ConsensusError(error_code) - assert iterations_needed is not None - - challenge = self.full_node.blockchain.get_challenge(prev_full_block) - assert challenge is not None - challenge_hash = challenge.get_hash() - - if ( - await ( - self.full_node.full_node_store.get_unfinished_block( - (challenge_hash, iterations_needed) - ) - ) - is not None - ): - return None - - expected_time: uint64 = uint64( - int( - iterations_needed - / (self.full_node.full_node_store.get_proof_of_time_estimate_ips()) - ) - ) - - if expected_time > self.full_node.constants.PROPAGATION_DELAY_THRESHOLD: - self.full_node.log.info( - f"Block is slow, expected {expected_time} seconds, waiting" - ) - # If this block is slow, sleep to allow faster blocks to come out first - await asyncio.sleep(5) - - leader: Tuple[ - uint32, uint64 - ] = self.full_node.full_node_store.get_unfinished_block_leader() - if leader is None or block.height > leader[0]: - self.full_node.log.info( - f"This is the first unfinished block at height {block.height}, so propagate." - ) - # If this is the first block we see at this height, propagate - self.full_node.full_node_store.set_unfinished_block_leader( - (block.height, expected_time) - ) - elif block.height == leader[0]: - if ( - expected_time - > leader[1] + self.full_node.constants.PROPAGATION_THRESHOLD - ): - # If VDF is expected to finish X seconds later than the best, don't propagate - self.full_node.log.info( - f"VDF will finish too late {expected_time} seconds, so don't propagate" - ) - return None - elif expected_time < leader[1]: - self.full_node.log.info( - f"New best unfinished block at height {block.height}" - ) - # If this will be the first block to finalize, update our leader - self.full_node.full_node_store.set_unfinished_block_leader( - (leader[0], expected_time) + peak: Optional[SubBlockRecord] = self.full_node.blockchain.get_peak() + if peak is None: + spend_bundle: Optional[SpendBundle] = None + else: + spend_bundle: Optional[SpendBundle] = await self.full_node.mempool_manager.create_bundle_from_mempool( + peak.header_hash ) + if pos_sub_slot[0].challenge_chain.new_difficulty is not None: + difficulty = pos_sub_slot[0].challenge_chain.new_difficulty + sub_slot_iters = pos_sub_slot[0].challenge_chain.new_sub_slot_iters else: - # If we have seen an unfinished block at a greater or equal height, don't propagate - self.full_node.log.info( - "Unfinished block at old height, so don't propagate" - ) - return None + if peak is None or peak.height == 0: + difficulty = self.full_node.constants.DIFFICULTY_STARTING + sub_slot_iters = self.full_node.constants.SUB_SLOT_ITERS_STARTING + else: + difficulty = uint64(peak.weight - self.full_node.blockchain.sub_blocks[peak.prev_hash].weight) + sub_slot_iters = peak.sub_slot_iters - await self.full_node.full_node_store.add_unfinished_block( - (challenge_hash, iterations_needed), block + required_iters: uint64 = calculate_iterations_quality( + quality_string, + request.proof_of_space.size, + difficulty, + request.challenge_chain_sp, + ) + sp_iters: uint64 = calculate_sp_iters(self.full_node.constants, sub_slot_iters, request.signage_point_index) + ip_iters: uint64 = calculate_ip_iters( + self.full_node.constants, sub_slot_iters, request.signage_point_index, required_iters + ) + total_iters_pos_slot: uint128 = pos_sub_slot[2] + + def get_plot_sig(to_sign, _) -> G2Element: + if to_sign == request.challenge_chain_sp: + return request.challenge_chain_sp_signature + if to_sign == request.reward_chain_sp: + return request.reward_chain_sp_signature + return G2Element.infinity() + + finished_sub_slots: List[EndOfSubSlotBundle] = self.full_node.full_node_store.get_finished_sub_slots( + peak, self.full_node.blockchain.sub_blocks, request.proof_of_space.challenge_hash ) - timelord_request = timelord_protocol.ProofOfSpaceInfo( - challenge_hash, iterations_needed + unfinished_block: Optional[UnfinishedBlock] = create_unfinished_block( + self.full_node.constants, + total_iters_pos_slot, + request.signage_point_index, + sp_iters, + ip_iters, + request.proof_of_space, + pos_sub_slot[0].challenge_chain.get_hash(), + request.farmer_puzzle_hash, + request.pool_target, + get_plot_sig, + get_plot_sig, + sp_vdfs, + uint64(int(time.time())), + b"", + spend_bundle, + peak, + self.full_node.blockchain.sub_blocks, + finished_sub_slots, ) + self.full_node.full_node_store.add_candidate_block(quality_string, unfinished_block) - message = Message("proof_of_space_info", timelord_request) - if self.full_node.server is not None: - await self.full_node.server.send_to_all([message], NodeType.TIMELORD) - new_unfinished_block = full_node_protocol.NewUnfinishedBlock( - block.prev_header_hash, iterations_needed, block.header_hash + message = farmer_protocol.RequestSignedValues( + quality_string, + unfinished_block.foliage_sub_block.get_hash(), + unfinished_block.foliage_block.get_hash(), ) - - f_message = Message("new_unfinished_block", new_unfinished_block) - if self.full_node.server is not None: - await self.full_node.server.send_to_all([f_message], NodeType.FULL_NODE) - self.full_node._state_changed("block") - return None + return Message("request_signed_values", message) @api_request - async def reject_unfinished_block_request( - self, - reject: full_node_protocol.RejectUnfinishedBlockRequest, - ) -> Optional[Message]: - self.full_node.log.warning(f"Rejected unfinished block request {reject}") - return None - - @api_request - async def request_all_header_hashes( - self, request: full_node_protocol.RequestAllHeaderHashes - ) -> Optional[Message]: - try: - header_hashes = self.full_node.blockchain.get_header_hashes( - request.tip_header_hash - ) - message = Message( - "all_header_hashes", full_node_protocol.AllHeaderHashes(header_hashes) - ) - return message - except ValueError: - self.full_node.log.info("Do not have requested header hashes.") - return None - - @api_request - async def all_header_hashes( - self, - all_header_hashes: full_node_protocol.AllHeaderHashes, - ) -> Optional[Message]: - assert len(all_header_hashes.header_hashes) > 0 - self.full_node.sync_store.set_potential_hashes(all_header_hashes.header_hashes) - phr = self.full_node.sync_store.get_potential_hashes_received() - assert phr is not None - phr.set() - return None - - @api_request - async def request_header_block( - self, request: full_node_protocol.RequestHeaderBlock - ) -> Optional[Message]: + async def signed_values(self, farmer_request: farmer_protocol.SignedValues) -> Optional[Message]: """ - A peer requests a list of header blocks, by height. Used for syncing or light clients. + Signature of header hash, by the harvester. This is enough to create an unfinished + block, which only needs a Proof of Time to be finished. If the signature is valid, + we call the unfinished_block routine. """ - full_block: Optional[FullBlock] = await self.full_node.block_store.get_block( - request.header_hash - ) - if full_block is not None: - header_block: Optional[ - HeaderBlock - ] = self.full_node.blockchain.get_header_block(full_block) - if header_block is not None and header_block.height == request.height: - response = full_node_protocol.RespondHeaderBlock(header_block) + candidate: Optional[UnfinishedBlock] = self.full_node.full_node_store.get_candidate_block(farmer_request.quality_string) + if candidate is None: + self.log.warning(f"Quality string {farmer_request.quality_string} not found in database") + return - message = Message("respond_header_block", response) - return message - reject = full_node_protocol.RejectHeaderBlockRequest( - request.height, request.header_hash + fsb2 = dataclasses.replace( + candidate.foliage_sub_block, + foliage_sub_block_signature=farmer_request.foliage_sub_block_signature, + ) + fsb3 = dataclasses.replace(fsb2, foliage_block_signature=farmer_request.foliage_block_signature) + new_candidate = dataclasses.replace(candidate, foliage_sub_block=fsb3) + + # Propagate to ourselves (which validates and does further propagations) + request = full_node_protocol.RespondUnfinishedSubBlock(new_candidate) + + await self.full_node._respond_unfinished_sub_block(request, None) + + # TIMELORD PROTOCOL + @api_request + async def new_infusion_point_vdf(self, request: timelord_protocol.NewInfusionPointVDF) -> Optional[Message]: + # Lookup unfinished blocks + unfinished_block: Optional[UnfinishedBlock] = self.full_node.full_node_store.get_unfinished_block( + request.unfinished_reward_hash ) - message = Message("reject_header_block_request", reject) - return message + if unfinished_block is None: + self.log.warning( + f"Do not have unfinished reward chain block {request.unfinished_reward_hash}, cannot finish." + ) + + prev_sb: Optional[SubBlockRecord] = None + if request.reward_chain_ip_vdf.challenge_hash == self.full_node.constants.FIRST_RC_CHALLENGE: + # Genesis + assert unfinished_block.height == 0 + else: + # Find the prev block + curr: Optional[SubBlockRecord] = self.full_node.blockchain.get_peak() + if curr is None: + self.log.warning(f"Have no blocks in chain, so can not complete block {unfinished_block.height}") + return + num_sb_checked = 0 + while num_sb_checked < 10: + if curr.reward_infusion_new_challenge == request.reward_chain_ip_vdf.challenge_hash: + # Found our prev block + prev_sb = curr + break + curr = self.full_node.blockchain.sub_blocks.get(curr.prev_hash, None) + if curr is None: + return + num_sb_checked += 1 + + # If not found, cache keyed on prev block + if prev_sb is None: + self.full_node.full_node_store.add_to_future_ip(request) + return + + sub_slot_iters, difficulty = get_sub_slot_iters_and_difficulty( + self.full_node.constants, unfinished_block, self.full_node.blockchain.height_to_hash, prev_sb, self.full_node.blockchain.sub_blocks + ) + overflow = is_overflow_sub_block(self.full_node.constants, unfinished_block.reward_chain_sub_block.signage_point_index) + if overflow: + finished_sub_slots = self.full_node.full_node_store.get_finished_sub_slots( + prev_sb, + self.full_node.blockchain.sub_blocks, + unfinished_block.reward_chain_sub_block.proof_of_space.challenge_hash, + True, + ) + else: + finished_sub_slots = unfinished_block.finished_sub_slots + + block: FullBlock = unfinished_block_to_full_block( + unfinished_block, + request.challenge_chain_ip_vdf, + request.challenge_chain_ip_proof, + request.reward_chain_ip_vdf, + request.reward_chain_ip_proof, + request.infused_challenge_chain_ip_vdf, + request.infused_challenge_chain_ip_proof, + finished_sub_slots, + prev_sb, + difficulty, + ) + + await self.respond_sub_block(full_node_protocol.RespondSubBlock(block)) + + @peer_required + @api_request + async def new_signage_point_vdf(self, request: timelord_protocol.NewSignagePointVDF, peer: ws.WSChiaConnection) -> Optional[Message]: + full_node_message = full_node_protocol.RespondSignagePoint( + request.index_from_challenge, + request.challenge_chain_sp_vdf, + request.challenge_chain_sp_proof, + request.reward_chain_sp_vdf, + request.reward_chain_sp_proof, + ) + return await self.respond_signage_point(full_node_message, peer) + + @peer_required + @api_request + async def new_end_of_sub_slot_vdf(self, request: timelord_protocol.NewEndOfSubSlotVDF, peer: ws.WSChiaConnection) -> Optional[Message]: + # Calls our own internal message to handle the end of sub slot, and potentially broadcasts to other peers. + full_node_message = full_node_protocol.RespondEndOfSubSlot(request.end_of_sub_slot_bundle) + return await self.respond_end_of_sub_slot(full_node_message, peer) + + # WALLET PROTOCOL + # @api_request + # async def send_transaction(self, tx: wallet_protocol.SendTransaction) -> OutboundMessageGenerator: + # # Ignore if syncing + # if self.sync_store.get_sync_mode(): + # status = MempoolInclusionStatus.FAILED + # error: Optional[Err] = Err.UNKNOWN + # else: + # async with self.blockchain.lock: + # cost, status, error = await self.mempool_manager.add_spendbundle(tx.transaction) + # if status == MempoolInclusionStatus.SUCCESS: + # self.log.info(f"Added transaction to mempool: {tx.transaction.name()}") + # # Only broadcast successful transactions, not pending ones. Otherwise it's a DOS + # # vector. + # fees = tx.transaction.fees() + # assert fees >= 0 + # assert cost is not None + # new_tx = full_node_protocol.NewTransaction( + # tx.transaction.name(), + # cost, + # uint64(tx.transaction.fees()), + # ) + # yield OutboundMessage( + # NodeType.FULL_NODE, + # Message("new_transaction", new_tx), + # Delivery.BROADCAST_TO_OTHERS, + # ) + # else: + # self.log.warning( + # f"Wasn't able to add transaction with id {tx.transaction.name()}, " + # f"status {status} error: {error}" + # ) + # + # error_name = error.name if error is not None else None + # if status == MempoolInclusionStatus.SUCCESS: + # response = wallet_protocol.TransactionAck(tx.transaction.name(), status, error_name) + # else: + # # If if failed/pending, but it previously succeeded (in mempool), this is idempotence, return SUCCESS + # if self.mempool_manager.get_spendbundle(tx.transaction.name()) is not None: + # response = wallet_protocol.TransactionAck(tx.transaction.name(), MempoolInclusionStatus.SUCCESS, None) + # else: + # response = wallet_protocol.TransactionAck(tx.transaction.name(), status, error_name) + # yield OutboundMessage(NodeType.WALLET, Message("transaction_ack", response), Delivery.RESPOND) + # + # @api_request + # async def request_header(self, request: wallet_protocol.RequestHeader) -> OutboundMessageGenerator: + # full_block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash) + # if full_block is not None: + # header_block: Optional[HeaderBlock] = full_block.get_header_block() + # if header_block is not None and header_block.height == request.height: + # response = wallet_protocol.RespondHeader(header_block, full_block.transactions_filter) + # yield OutboundMessage( + # NodeType.WALLET, + # Message("respond_header", response), + # Delivery.RESPOND, + # ) + # return + # reject = wallet_protocol.RejectHeaderRequest(request.height, request.header_hash) + # yield OutboundMessage( + # NodeType.WALLET, + # Message("reject_header_request", reject), + # Delivery.RESPOND, + # ) + # + # @api_request + # async def request_removals(self, request: wallet_protocol.RequestRemovals) -> OutboundMessageGenerator: + # block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash) + # if ( + # block is None + # or block.height != request.height + # or block.height not in self.blockchain.height_to_hash + # or self.blockchain.height_to_hash[block.height] != block.header_hash + # ): + # reject = wallet_protocol.RejectRemovalsRequest(request.height, request.header_hash) + # yield OutboundMessage( + # NodeType.WALLET, + # Message("reject_removals_request", reject), + # Delivery.RESPOND, + # ) + # return + # + # assert block is not None + # all_removals, _ = await block.tx_removals_and_additions() + # + # coins_map: List[Tuple[bytes32, Optional[Coin]]] = [] + # proofs_map: List[Tuple[bytes32, bytes]] = [] + # + # # If there are no transactions, respond with empty lists + # if block.transactions_generator is None: + # proofs: Optional[List] + # if request.coin_names is None: + # proofs = None + # else: + # proofs = [] + # response = wallet_protocol.RespondRemovals(block.height, block.header_hash, [], proofs) + # elif request.coin_names is None or len(request.coin_names) == 0: + # for removal in all_removals: + # cr = await self.coin_store.get_coin_record(removal) + # assert cr is not None + # coins_map.append((cr.coin.name(), cr.coin)) + # response = wallet_protocol.RespondRemovals(block.height, block.header_hash, coins_map, None) + # else: + # assert block.transactions_generator + # removal_merkle_set = MerkleSet() + # for coin_name in all_removals: + # removal_merkle_set.add_already_hashed(coin_name) + # assert removal_merkle_set.get_root() == block.header.data.removals_root + # for coin_name in request.coin_names: + # result, proof = removal_merkle_set.is_included_already_hashed(coin_name) + # proofs_map.append((coin_name, proof)) + # if coin_name in all_removals: + # cr = await self.coin_store.get_coin_record(coin_name) + # assert cr is not None + # coins_map.append((coin_name, cr.coin)) + # assert result + # else: + # coins_map.append((coin_name, None)) + # assert not result + # response = wallet_protocol.RespondRemovals(block.height, block.header_hash, coins_map, proofs_map) + # + # yield OutboundMessage( + # NodeType.WALLET, + # Message("respond_removals", response), + # Delivery.RESPOND, + # ) + # + # @api_request + # async def request_additions(self, request: wallet_protocol.RequestAdditions) -> OutboundMessageGenerator: + # block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash) + # if ( + # block is None + # or block.height != request.height + # or block.height not in self.blockchain.height_to_hash + # or self.blockchain.height_to_hash[block.height] != block.header_hash + # ): + # reject = wallet_protocol.RejectAdditionsRequest(request.height, request.header_hash) + # yield OutboundMessage( + # NodeType.WALLET, + # Message("reject_additions_request", reject), + # Delivery.RESPOND, + # ) + # return + # + # assert block is not None + # _, additions = await block.tx_removals_and_additions() + # puzzlehash_coins_map: Dict[bytes32, List[Coin]] = {} + # for coin in additions + [block.get_coinbase(), block.get_fees_coin()]: + # if coin.puzzle_hash in puzzlehash_coins_map: + # puzzlehash_coins_map[coin.puzzle_hash].append(coin) + # else: + # puzzlehash_coins_map[coin.puzzle_hash] = [coin] + # + # coins_map: List[Tuple[bytes32, List[Coin]]] = [] + # proofs_map: List[Tuple[bytes32, bytes, Optional[bytes]]] = [] + # + # if request.puzzle_hashes is None: + # for puzzle_hash, coins in puzzlehash_coins_map.items(): + # coins_map.append((puzzle_hash, coins)) + # response = wallet_protocol.RespondAdditions(block.height, block.header_hash, coins_map, None) + # else: + # # Create addition Merkle set + # addition_merkle_set = MerkleSet() + # # Addition Merkle set contains puzzlehash and hash of all coins with that puzzlehash + # for puzzle, coins in puzzlehash_coins_map.items(): + # addition_merkle_set.add_already_hashed(puzzle) + # addition_merkle_set.add_already_hashed(hash_coin_list(coins)) + # + # assert addition_merkle_set.get_root() == block.header.data.additions_root + # for puzzle_hash in request.puzzle_hashes: + # result, proof = addition_merkle_set.is_included_already_hashed(puzzle_hash) + # if puzzle_hash in puzzlehash_coins_map: + # coins_map.append((puzzle_hash, puzzlehash_coins_map[puzzle_hash])) + # hash_coin_str = hash_coin_list(puzzlehash_coins_map[puzzle_hash]) + # result_2, proof_2 = addition_merkle_set.is_included_already_hashed(hash_coin_str) + # assert result + # assert result_2 + # proofs_map.append((puzzle_hash, proof, proof_2)) + # else: + # coins_map.append((puzzle_hash, [])) + # assert not result + # proofs_map.append((puzzle_hash, proof, None)) + # response = wallet_protocol.RespondAdditions(block.height, block.header_hash, coins_map, proofs_map) + # + # yield OutboundMessage( + # NodeType.WALLET, + # Message("respond_additions", response), + # Delivery.RESPOND, + # ) + # + # @api_request + # async def request_generator(self, request: wallet_protocol.RequestGenerator) -> OutboundMessageGenerator: + # full_block: Optional[FullBlock] = await self.block_store.get_block(request.header_hash) + # if full_block is not None: + # if full_block.transactions_generator is not None: + # wrapper = GeneratorResponse( + # full_block.height, + # full_block.header_hash, + # full_block.transactions_generator, + # ) + # response = wallet_protocol.RespondGenerator(wrapper) + # yield OutboundMessage( + # NodeType.WALLET, + # Message("respond_generator", response), + # Delivery.RESPOND, + # ) + # return + # + # reject = wallet_protocol.RejectGeneratorRequest(request.height, request.header_hash) + # yield OutboundMessage( + # NodeType.WALLET, + # Message("reject_generator_request", reject), + # Delivery.RESPOND, + # ) diff --git a/src/full_node/sync_peers_handler.py b/src/full_node/sync_peers_handler.py index dce196b95626..823a61539681 100644 --- a/src/full_node/sync_peers_handler.py +++ b/src/full_node/sync_peers_handler.py @@ -7,6 +7,7 @@ from src.consensus.blockchain import Blockchain from src.full_node.sync_store import SyncStore from src.protocols import full_node_protocol from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage +from src.server.server import ChiaServer from src.server.ws_connection import WSChiaConnection from src.types.full_block import FullBlock from src.types.header_block import HeaderBlock @@ -42,6 +43,7 @@ class SyncPeersHandler: fork_height: uint32, blockchain: Blockchain, peak_height: uint32, + server: ChiaServer ): self.sync_store = sync_store # Set of outbound requests for every full_node peer, and time sent @@ -61,7 +63,7 @@ class SyncPeersHandler: self.potential_blocks_received = self.sync_store.potential_blocks_received self.potential_blocks = self.sync_store.potential_blocks - + self.server = server # No blocks received yet for height in range(self.fully_validated_up_to + 1, peak_height + 1): self.potential_blocks_received[uint32(height)] = asyncio.Event() @@ -77,14 +79,33 @@ class SyncPeersHandler: # We have received all blocks return True - async def _add_to_request_sets(self) -> List[OutboundMessage]: + async def monitor_timeouts(self): + """ + If any of our requests have timed out, disconnects from the node that should + have responded. + """ + current_time = time.time() + remove_node_ids = [] + for node_id, outbound_set in self.current_outbound_sets.items(): + for _, time_requested in outbound_set.items(): + if current_time - float(time_requested) > self.BLOCK_RESPONSE_TIMEOUT: + remove_node_ids.append(node_id) + for rn_id in remove_node_ids: + if rn_id in self.current_outbound_sets: + log.warning(f"Timeout receiving block, closing connection with node {rn_id}") + self.current_outbound_sets.pop(rn_id, None) + if rn_id in self.server.all_connections: + con = self.server.all_connections[rn_id] + await con.close() + + async def add_to_request_sets(self): """ Refreshes the pointers of how far we validated and how far we downloaded. Then goes through all peers and sends requests to peers for the blocks we have not requested yet, or have requested to a peer that did not respond in time or disconnected. """ if not self.sync_store.get_sync_mode(): - return [] + return # fork fully validated MAX_GAP target (peak) # $$$$$X$$$$$$$$$$$$$$$X================----==---=--====---=--X-------> @@ -108,8 +129,8 @@ class SyncPeersHandler: to_send: List[uint32] = [] # Finds a block height for height in range( - self.fully_validated_up_to + 1, - min(self.fully_validated_up_to + self.MAX_GAP + 1, self.peak_height + 1), + self.fully_validated_up_to + 1, + min(self.fully_validated_up_to + self.MAX_GAP + 1, self.peak_height + 1), ): if len(to_send) == free_slots: # No more slots to send to any peers @@ -135,8 +156,6 @@ class SyncPeersHandler: outbound_sets_list = list(self.current_outbound_sets.items()) outbound_sets_list.sort(key=lambda x: len(x[1])) index = 0 - messages: List[Any] = [] - node_id = None for height in to_send: # Find a the next peer with an empty slot. There must be an empty slot: to_send # includes up to free_slots things, and current_outbound sets cannot change since there is @@ -148,22 +167,14 @@ class SyncPeersHandler: node_id, request_set = outbound_sets_list[index % len(outbound_sets_list)] request_set[uint32(height)] = uint64(int(time.time())) - request = full_node_protocol.RequestBlock( - height, self.header_hashes[height] - ) - msg = OutboundMessage( - NodeType.FULL_NODE, - Message("request_block", request), - Delivery.SPECIFIC, - node_id, - ) - messages.append(msg) + request = full_node_protocol.RequestSubBlock(height, True) + msg = Message("request_block", request) + await self.server.send_to_specific([msg], node_id) - return messages async def new_block( self, block: Union[FullBlock, HeaderBlock] - ) -> List[OutboundMessage]: + ): """ A new block was received from a peer. """ @@ -178,7 +189,7 @@ class SyncPeersHandler: # save block to DB self.potential_blocks[block.height] = block if not self.sync_store.get_sync_mode(): - return [] + return assert block.height in self.potential_blocks_received @@ -189,8 +200,7 @@ class SyncPeersHandler: request_set.pop(header_hash, None) # add to request sets - requests = await self._add_to_request_sets() - return requests + await self.add_to_request_sets() def new_node_connected(self, node_id: bytes32): """ diff --git a/src/harvester_api.py b/src/harvester_api.py index a49e40c561b8..17796e4e45c7 100644 --- a/src/harvester_api.py +++ b/src/harvester_api.py @@ -1,17 +1,22 @@ +import asyncio +import dataclasses +import time from pathlib import Path -from typing import Optional, Callable +from typing import Optional, Callable, List, Tuple from blspy import AugSchemeMPL, G2Element from chiapos import DiskProver +from src.consensus.pot_iterations import calculate_sp_interval_iters, calculate_iterations_quality from src.harvester import Harvester from src.plotting.plot_tools import PlotInfo from src.protocols import harvester_protocol from src.server.outbound_message import Message from src.server.ws_connection import WSChiaConnection from src.types.proof_of_space import ProofOfSpace +from src.types.sized_bytes import bytes32 from src.util.api_decorators import api_request, peer_required -from src.util.ints import uint8 +from src.util.ints import uint8, uint64 class HarvesterAPI: @@ -23,135 +28,162 @@ class HarvesterAPI: def _set_state_changed_callback(self, callback: Callable): self.harvester.state_changed_callback = callback - @peer_required @api_request - async def harvester_handshake( - self, - harvester_handshake: harvester_protocol.HarvesterHandshake, - peer: WSChiaConnection, - ): + async def harvester_handshake(self, harvester_handshake: harvester_protocol.HarvesterHandshake): """ Handshake between the harvester and farmer. The harvester receives the pool public keys, as well as the farmer pks, which must be put into the plots, before the plotting process begins. We cannot use any plots which have different keys in them. """ - self.harvester.farmer_public_keys = harvester_handshake.farmer_public_keys - self.harvester.pool_public_keys = harvester_handshake.pool_public_keys + self.farmer_public_keys = harvester_handshake.farmer_public_keys + self.pool_public_keys = harvester_handshake.pool_public_keys await self.harvester._refresh_plots() if len(self.harvester.provers) == 0: - self.harvester.log.warning( - "Not farming any plots on this harvester. Check your configuration." - ) + self.harvester.log.warning("Not farming any plots on this harvester. Check your configuration.") return - for new_challenge in self.harvester.cached_challenges: - async for msg in self.harvester._new_challenge(new_challenge): - await peer.send_message(msg) - - self.harvester.cached_challenges = [] self.harvester._state_changed("plots") @peer_required @api_request - async def new_challenge( - self, new_challenge: harvester_protocol.NewChallenge, peer: WSChiaConnection - ): - async for msg in self.harvester._new_challenge(new_challenge): - await peer.send_message(msg) - - @api_request - async def request_proof_of_space( - self, request: harvester_protocol.RequestProofOfSpace - ): + async def new_signage_point(self, new_challenge: harvester_protocol.NewSignagePoint, peer: WSChiaConnection): """ - The farmer requests a proof of space, for one of the plots. - We look up the correct plot based on the plot id and response number, lookup the proof, - and return it. + The harvester receives a new signage point from the farmer, this happens at the start of each slot. + The harvester does a few things: + 1. The harvester applies the plot filter for each of the plots, to select the proportion which are eligible + for this signage point and challenge. + 2. The harvester gets the qualities for each plot. This is approximately 7 reads per plot which qualifies. + Note that each plot may have 0, 1, 2, etc qualities for that challenge: but on average it will have 1. + 3. Checks the required_iters for each quality and the given signage point, to see which are eligible for + inclusion (required_iters < sp_interval_iters). + 4. Looks up the full proof of space in the plot for each quality, approximately 64 reads per quality + 5. Returns the proof of space to the farmer """ - response: Optional[harvester_protocol.RespondProofOfSpace] = None - challenge_hash = request.challenge_hash - filename = Path(request.plot_id).resolve() - index = request.response_number - proof_xs: bytes - plot_info = self.harvester.provers[filename] + if len(self.pool_public_keys) == 0 or len(self.farmer_public_keys) == 0: + # This means that we have not received the handshake yet + return - try: + start = time.time() + assert len(new_challenge.challenge_hash) == 32 + + # Refresh plots to see if there are any new ones + await self.harvester._refresh_plots() + + loop = asyncio.get_running_loop() + + def blocking_lookup(filename: Path, plot_info: PlotInfo, prover: DiskProver) -> List[ProofOfSpace]: + # Uses the DiskProver object to lookup qualities. This is a blocking call, + # so it should be run in a thread pool. try: - proof_xs = plot_info.prover.get_full_proof(challenge_hash, index) - except RuntimeError: - prover = DiskProver(str(filename)) - self.harvester.provers[filename] = PlotInfo( - prover, - plot_info.pool_public_key, - plot_info.farmer_public_key, - plot_info.plot_public_key, - plot_info.local_sk, - plot_info.file_size, - plot_info.time_modified, + sp_challenge_hash = ProofOfSpace.calculate_new_challenge_hash( + plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash ) - proof_xs = self.harvester.provers[filename].prover.get_full_proof( - challenge_hash, index + quality_strings = prover.get_qualities_for_challenge(sp_challenge_hash) + except Exception: + self.harvester.log.error("Error using prover object. Reinitializing prover object.") + self.harvester.provers[filename] = dataclasses.replace(plot_info, prover=DiskProver(str(filename))) + return [] + + responses: List[ProofOfSpace] = [] + if quality_strings is not None: + # Found proofs of space (on average 1 is expected per plot) + for index, quality_str in enumerate(quality_strings): + required_iters: uint64 = calculate_iterations_quality( + quality_str, prover.get_size(), new_challenge.difficulty, new_challenge.sp_hash + ) + sp_interval_iters = calculate_sp_interval_iters(self.harvester.constants, new_challenge.sub_slot_iters) + if required_iters < sp_interval_iters: + # Found a very good proof of space! will fetch the whole proof from disk, then send to farmer + try: + proof_xs = prover.get_full_proof(sp_challenge_hash, index) + except RuntimeError: + self.harvester.log.error(f"Exception fetching full proof for {filename}") + continue + + plot_public_key = ProofOfSpace.generate_plot_public_key( + plot_info.local_sk.get_g1(), plot_info.farmer_public_key + ) + responses.append( + ProofOfSpace( + sp_challenge_hash, + plot_info.pool_public_key, + None, + plot_public_key, + uint8(prover.get_size()), + proof_xs, + ) + ) + return responses + + async def lookup_challenge(filename: Path, prover: DiskProver) -> List[harvester_protocol.NewProofOfSpace]: + # Executes a DiskProverLookup in a thread pool, and returns responses + all_responses: List[harvester_protocol.NewProofOfSpace] = [] + proofs_of_space: List[ProofOfSpace] = await loop.run_in_executor( + self.harvester.executor, blocking_lookup, filename, prover + ) + for proof_of_space in proofs_of_space: + all_responses.append( + harvester_protocol.NewProofOfSpace( + new_challenge.challenge_hash, prover.get_id(), proof_of_space, new_challenge.signage_point_index + ) ) - except KeyError: - self.harvester.log.warning(f"KeyError plot {filename} does not exist.") + return all_responses - plot_info = self.harvester.provers[filename] - plot_public_key = ProofOfSpace.generate_plot_public_key( - plot_info.local_sk.get_g1(), plot_info.farmer_public_key - ) + awaitables = [] + for try_plot_filename, try_plot_info in self.harvester.provers.items(): + if try_plot_filename.exists(): + # Passes the plot filter (does not check sp filter yet though, since we have not reached sp) + # This is being executed at the beginning of the slot + if ProofOfSpace.passes_plot_filter( + self.harvester.constants, try_plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash + ): + awaitables.append(lookup_challenge(try_plot_filename, try_plot_info.prover)) - proof_of_space: ProofOfSpace = ProofOfSpace( - challenge_hash, - plot_info.pool_public_key, - plot_public_key, - uint8(self.harvester.provers[filename].prover.get_size()), - proof_xs, + # Concurrently executes all lookups on disk, to take advantage of multiple disk parallelism + total_proofs_found = 0 + for sublist_awaitable in asyncio.as_completed(awaitables): + for response in await sublist_awaitable: + total_proofs_found += 1 + msg = Message("challenge_response", response) + await peer.send_message(msg) + self.harvester.log.info( + f"{len(awaitables)} plots were eligible for farming {new_challenge.challenge_hash.hex()[:10]}..." + f" Found {total_proofs_found} proofs. Time: {time.time() - start}. " + f"Total {len(self.harvester.provers)} plots" ) - response = harvester_protocol.RespondProofOfSpace( - request.plot_id, - request.response_number, - proof_of_space, - ) - if response: - msg = Message("respond_proof_of_space", response) - return msg @api_request - async def request_signature(self, request: harvester_protocol.RequestSignature): + async def request_signatures(self, request: harvester_protocol.RequestSignatures): """ The farmer requests a signature on the header hash, for one of the proofs that we found. A signature is created on the header hash using the harvester private key. This can also be used for pooling. """ - plot_info = None try: - plot_info = self.harvester.provers[Path(request.plot_id).resolve()] + plot_info = self.harvester.provers[Path(request.plot_identifier).resolve()] except KeyError: - self.harvester.log.warning( - f"KeyError plot {request.plot_id} does not exist." - ) + self.harvester.log.warning(f"KeyError plot {request.plot_identifier} does not exist.") return local_sk = plot_info.local_sk - agg_pk = ProofOfSpace.generate_plot_public_key( - local_sk.get_g1(), plot_info.farmer_public_key - ) + agg_pk = ProofOfSpace.generate_plot_public_key(local_sk.get_g1(), plot_info.farmer_public_key) # This is only a partial signature. When combined with the farmer's half, it will # form a complete PrependSignature. - signature: G2Element = AugSchemeMPL.sign(local_sk, request.message, agg_pk) + message_signatures: List[Tuple[bytes32, G2Element]] = [] + for message in request.messages: + signature: G2Element = AugSchemeMPL.sign(local_sk, message, agg_pk) + message_signatures.append((message, signature)) - response: harvester_protocol.RespondSignature = ( - harvester_protocol.RespondSignature( - request.plot_id, - request.message, - local_sk.get_g1(), - plot_info.farmer_public_key, - signature, - ) + response: harvester_protocol.RespondSignatures = harvester_protocol.RespondSignatures( + request.plot_identifier, + request.sp_hash, + local_sk.get_g1(), + plot_info.farmer_public_key, + message_signatures, ) - msg = Message("respond_signature", response) + msg = Message("respond_signatures", response) return msg diff --git a/src/timelord_api.py b/src/timelord_api.py index 5a8e84953d6d..ea3bc70892f3 100644 --- a/src/timelord_api.py +++ b/src/timelord_api.py @@ -1,8 +1,10 @@ from typing import Callable from src.protocols import timelord_protocol -from src.timelord import Timelord +from src.timelord_new import Timelord +from src.timelord_new import IterationType, iters_from_sub_block from src.util.api_decorators import api_request +from src.util.ints import uint64 class TimelordAPI: @@ -14,103 +16,34 @@ class TimelordAPI: def _set_state_changed_callback(self, callback: Callable): pass - @api_request - async def challenge_start(self, challenge_start: timelord_protocol.ChallengeStart): - """ - The full node notifies the timelord node that a new challenge is active, and work - should be started on it. We add the challenge into the queue if it's worth it to have. - """ - async with self.timelord.lock: - if not self.timelord.sanitizer_mode: - if challenge_start.challenge_hash in self.timelord.seen_discriminants: - self.timelord.log.info( - f"Have already seen this challenge hash {challenge_start.challenge_hash}. Ignoring." - ) - return - if challenge_start.weight <= self.timelord.best_weight_three_proofs: - self.timelord.log.info( - "Not starting challenge, already three proofs at that weight" - ) - return - self.timelord.seen_discriminants.append(challenge_start.challenge_hash) - self.timelord.discriminant_queue.append( - (challenge_start.challenge_hash, challenge_start.weight) - ) - self.timelord.log.info("Appended to discriminant queue.") - else: - disc_dict = dict(self.timelord.discriminant_queue) - if challenge_start.challenge_hash in disc_dict: - self.timelord.log.info( - "Challenge already in discriminant queue. Ignoring." - ) - return - if challenge_start.challenge_hash in self.timelord.active_discriminants: - self.timelord.log.info("Challenge currently running. Ignoring.") - return - - self.timelord.discriminant_queue.append( - (challenge_start.challenge_hash, challenge_start.weight) - ) - if challenge_start.weight not in self.timelord.max_known_weights: - self.timelord.max_known_weights.append(challenge_start.weight) - self.timelord.max_known_weights.sort() - if len(self.timelord.max_known_weights) > 5: - self.timelord.max_known_weights = ( - self.timelord.max_known_weights[-5:] - ) + @property + def lock(self): + return self.timelord.lock @api_request - async def proof_of_space_info( - self, - proof_of_space_info: timelord_protocol.ProofOfSpaceInfo, - ): - """ - Notification from full node about a new proof of space for a challenge. If we already - have a process for this challenge, we should communicate to the process to tell it how - many iterations to run for. - """ - async with self.timelord.lock: - if not self.timelord.sanitizer_mode: - self.timelord.log.info( - f"proof_of_space_info {proof_of_space_info.challenge_hash} {proof_of_space_info.iterations_needed}" - ) - if ( - proof_of_space_info.challenge_hash - in self.timelord.done_discriminants - ): - self.timelord.log.info( - f"proof_of_space_info {proof_of_space_info.challenge_hash} already done, returning" - ) - return - else: - disc_dict = dict(self.timelord.discriminant_queue) - if proof_of_space_info.challenge_hash in disc_dict: - challenge_weight = disc_dict[proof_of_space_info.challenge_hash] - if challenge_weight >= min(self.timelord.max_known_weights): - self.timelord.log.info( - "Not storing iter, waiting for more block confirmations." - ) - return - else: - self.timelord.log.info("Not storing iter, challenge inactive.") - return - - if proof_of_space_info.challenge_hash not in self.timelord.pending_iters: - self.timelord.pending_iters[proof_of_space_info.challenge_hash] = [] - if proof_of_space_info.challenge_hash not in self.timelord.submitted_iters: - self.timelord.submitted_iters[proof_of_space_info.challenge_hash] = [] - + async def new_peak(self, new_peak: timelord_protocol.NewPeak): + async with self.lock: if ( - proof_of_space_info.iterations_needed - not in self.timelord.pending_iters[proof_of_space_info.challenge_hash] - and proof_of_space_info.iterations_needed - not in self.timelord.submitted_iters[proof_of_space_info.challenge_hash] + self.timelord.last_state is None + or self.timelord.last_state.get_weight() < new_peak.weight ): - self.timelord.log.info( - f"proof_of_space_info {proof_of_space_info.challenge_hash} adding " - f"{proof_of_space_info.iterations_needed} to " - f"{self.timelord.pending_iters[proof_of_space_info.challenge_hash]}" - ) - self.timelord.pending_iters[proof_of_space_info.challenge_hash].append( - proof_of_space_info.iterations_needed - ) + self.new_peak = new_peak + + @api_request + async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock): + async with self.lock: + if not self.timelord._accept_unfinished_block(new_unfinished_subblock): + return + sp_iters, ip_iters = iters_from_sub_block( + new_unfinished_subblock.reward_chain_sub_block, + self.timelord.last_state.get_ips(), + self.timelord.last_state.get_difficulty(), + ) + last_ip_iters = self.timelord.last_state.get_last_ip() + if sp_iters < ip_iters: + self.timelord.overflow_blocks.append(new_unfinished_subblock) + elif ip_iters > last_ip_iters: + self.timelord.unfinished_blocks.append(new_unfinished_subblock) + for chain in Chain: + self.timelord.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters)) + self.timelord.iteration_to_proof_type[ip_iters - self.timelord.last_ip_iters] = IterationType.INFUSION_POINT diff --git a/src/timelord_new.py b/src/timelord_new.py index 95e101d7c93a..476ab92e2d9a 100644 --- a/src/timelord_new.py +++ b/src/timelord_new.py @@ -225,34 +225,6 @@ class Timelord: def _set_server(self, server: ChiaServer): self.server = server - @api_request - async def new_peak(self, new_peak: timelord_protocol.NewPeak): - async with self.lock: - if ( - self.last_state is None - or self.last_state.get_weight() < new_peak.weight - ): - self.new_peak = new_peak - - @api_request - async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock): - async with self.lock: - if not self._accept_unfinished_block(new_unfinished_subblock): - return - sp_iters, ip_iters = iters_from_sub_block( - new_unfinished_subblock.reward_chain_sub_block, - self.last_state.get_ips(), - self.last_state.get_difficulty(), - ) - last_ip_iters = self.last_state.get_last_ip() - if sp_iters < ip_iters: - self.overflow_blocks.append(new_unfinished_subblock) - elif ip_iters > last_ip_iters: - self.unfinished_blocks.append(new_unfinished_subblock) - for chain in Chain: - self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters)) - self.iteration_to_proof_type[ip_iters - self.last_ip_iters] = IterationType.INFUSION_POINT - def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool: # Total unfinished block iters needs to exceed peak's iters. if self.last_state.get_total_iters() >= block.total_iters: