From 6d8c8be0879d81f592e56bef6cad16f831dcd5a7 Mon Sep 17 00:00:00 2001 From: Mariano Sorgente Date: Wed, 2 Dec 2020 12:01:47 +0900 Subject: [PATCH] Separate out timelord api --- src/server/start_timelord.py | 17 +++--- src/timelord.py | 48 +++++----------- src/timelord_api.py | 106 +++++++++-------------------------- tests/setup_nodes.py | 2 +- 4 files changed, 51 insertions(+), 122 deletions(-) diff --git a/src/server/start_timelord.py b/src/server/start_timelord.py index 29deaf9b8642..6d4b7b1de565 100644 --- a/src/server/start_timelord.py +++ b/src/server/start_timelord.py @@ -2,6 +2,7 @@ import pathlib from typing import Dict +from src.consensus.constants import ConsensusConstants from src.consensus.default_constants import DEFAULT_CONSTANTS from src.timelord import Timelord from src.server.outbound_message import NodeType @@ -13,20 +14,20 @@ from src.util.default_root import DEFAULT_ROOT_PATH from src.server.start_service import run_service # See: https://bugs.python.org/issue29288 -u"".encode("idna") +"".encode("idna") SERVICE_NAME = "timelord" def service_kwargs_for_timelord( - root_path: pathlib.Path, config: Dict, discriminant_size_bits: int + root_path: pathlib.Path, + config: Dict, + constants: ConsensusConstants, ) -> Dict: - connect_peers = [ - PeerInfo(config["full_node_peer"]["host"], config["full_node_peer"]["port"]) - ] + connect_peers = [PeerInfo(config["full_node_peer"]["host"], config["full_node_peer"]["port"])] - node = Timelord(config, discriminant_size_bits) + node = Timelord(config, constants) peer_api = TimelordAPI(node) kwargs = dict( @@ -45,9 +46,7 @@ def service_kwargs_for_timelord( def main(): config = load_config_cli(DEFAULT_ROOT_PATH, "config.yaml", SERVICE_NAME) - kwargs = service_kwargs_for_timelord( - DEFAULT_ROOT_PATH, config, DEFAULT_CONSTANTS.DISCRIMINANT_SIZE_BITS - ) + kwargs = service_kwargs_for_timelord(DEFAULT_ROOT_PATH, config, DEFAULT_CONSTANTS) return run_service(**kwargs) diff --git a/src/timelord.py b/src/timelord.py index de1c61481bc5..1bde7862e5da 100644 --- a/src/timelord.py +++ b/src/timelord.py @@ -2,6 +2,7 @@ import asyncio import io import logging import time +from asyncio import StreamReader, StreamWriter from enum import Enum from typing import Dict, List, Optional, Tuple, Union @@ -25,7 +26,6 @@ from src.types.reward_chain_sub_block import ( from src.types.sized_bytes import bytes32 from src.types.slots import ChallengeChainSubSlot, InfusedChallengeChainSubSlot, RewardChainSubSlot, SubSlotProofs from src.types.vdf import VDFInfo, VDFProof -from src.util.api_decorators import api_request from src.util.ints import uint64, uint8, uint128, int512 from src.types.sub_epoch_summary import SubEpochSummary from chiapos import Verifier @@ -272,33 +272,7 @@ class Timelord: def set_server(self, server: ChiaServer): self.server = server - @api_request - async def new_peak(self, new_peak: timelord_protocol.NewPeak): - async with self.lock: - if self.last_state is None or self.last_state.get_weight() < new_peak.weight: - self.new_peak = new_peak - - @api_request - async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock): - async with self.lock: - if not self._accept_unfinished_block(new_unfinished_subblock): - return - sp_iters, ip_iters = iters_from_sub_block( - self.constants, - new_unfinished_subblock.reward_chain_sub_block, - self.last_state.get_sub_slot_iters(), - self.last_state.get_difficulty(), - ) - last_ip_iters = self.last_state.get_last_ip() - if sp_iters < ip_iters: - self.overflow_blocks.append(new_unfinished_subblock) - elif ip_iters > last_ip_iters: - self.unfinished_blocks.append(new_unfinished_subblock) - for chain in Chain: - self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters)) - self.iteration_to_proof_type[ip_iters - last_ip_iters] = IterationType.INFUSION_POINT - - def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool: + def accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool: # Total unfinished block iters needs to exceed peak's iters. if self.last_state.get_total_iters() >= block.total_iters: return False @@ -355,7 +329,7 @@ class Timelord: self.iters_submitted[chain] = [] self.iteration_to_proof_type = {} for block in self.unfinished_blocks: - if not self._accept_unfinished_block(block): + if not self.accept_unfinished_block(block): continue block_sp_iters, block_ip_iters = iters_from_sub_block( self.constants, @@ -664,7 +638,15 @@ class Timelord: # Check for end of subslot, respawn chains and build EndOfSubslotBundle. await self._check_for_end_of_subslot() - async def _do_process_communication(self, chain, challenge_hash, initial_form, ip, reader, writer): + async def _do_process_communication( + self, + chain: Chain, + challenge_hash: bytes32, + initial_form: ClassgroupElement, + ip: str, + reader: StreamReader, + writer: StreamWriter, + ): disc: int = create_discriminant(challenge_hash, self.constants.DISCRIMINANT_SIZE_BITS) # Depending on the flags 'fast_algorithm' and 'sanitizer_mode', # the timelord tells the vdf_client what to execute. @@ -714,8 +696,7 @@ class Timelord: try: msg = data.decode() except Exception as e: - # log.error(f"Exception while decoding data {e}") - pass + log.error(f"Exception while decoding data {e}") if msg == "STOP": log.info(f"Stopped client running on ip {ip}.") async with self.lock: @@ -767,8 +748,7 @@ class Timelord: proof_bytes, ) - if not vdf_proof.is_valid(self.constants, vdf_info): + if not vdf_proof.is_valid(self.constants, initial_form, vdf_info): log.error("Invalid proof of time!") - # continue async with self.lock: self.proofs_finished.append((chain, vdf_info, vdf_proof)) diff --git a/src/timelord_api.py b/src/timelord_api.py index bc6ecee4e5ce..bd0f8d76791c 100644 --- a/src/timelord_api.py +++ b/src/timelord_api.py @@ -1,8 +1,9 @@ from typing import Callable from src.protocols import timelord_protocol -from src.timelord import Timelord - +from src.timelord import Timelord, iters_from_sub_block, Chain, IterationType +from src.util.api_decorators import api_request +from src.util.ints import uint64 class TimelordAPI: @@ -14,79 +15,28 @@ class TimelordAPI: def _set_state_changed_callback(self, callback: Callable): pass - # @property - # def log(self): - # return self.timelord.log - # - # @api_request - # async def challenge_start(self, challenge_start: timelord_protocol.ChallengeStart): - # """ - # The full node notifies the timelord node that a new challenge is active, and work - # should be started on it. We add the challenge into the queue if it's worth it to have. - # """ - # async with self.timelord.lock: - # if not self.timelord.sanitizer_mode: - # if challenge_start.challenge in self.timelord.seen_discriminants: - # self.log.info(f"Have already seen this challenge hash {challenge_start.challenge}. Ignoring.") - # return - # if challenge_start.weight <= self.timelord.best_weight_three_proofs: - # self.log.info("Not starting challenge, already three proofs at that weight") - # return - # self.timelord.seen_discriminants.append(challenge_start.challenge) - # self.timelord.discriminant_queue.append((challenge_start.challenge, challenge_start.weight)) - # self.log.info("Appended to discriminant queue.") - # else: - # disc_dict = dict(self.timelord.discriminant_queue) - # if challenge_start.challenge in disc_dict: - # self.timelord.log.info("Challenge already in discriminant queue. Ignoring.") - # return - # if challenge_start.challenge in self.timelord.active_discriminants: - # self.timelord.log.info("Challenge currently running. Ignoring.") - # return - # - # self.timelord.discriminant_queue.append((challenge_start.challenge, challenge_start.weight)) - # if challenge_start.weight not in self.timelord.max_known_weights: - # self.timelord.max_known_weights.append(challenge_start.weight) - # self.timelord.max_known_weights.sort() - # if len(self.timelord.max_known_weights) > 5: - # self.timelord.max_known_weights = self.timelord.max_known_weights[-5:] - # - # @api_request - # async def proof_of_space_info(self, proof_of_space_info: timelord_protocol.ProofOfSpaceInfo): - # """ - # Notification from full node about a new proof of space for a challenge. If we already - # have a process for this challenge, we should communicate to the process to tell it how - # many iterations to run for. - # """ - # async with self.timelord.lock: - # if not self.timelord.sanitizer_mode: - # self.timelord.log.info(f"proof_of_space_info {proof_of_space_info.challenge} {proof_of_space_info.iterations_needed}") - # if proof_of_space_info.challenge in self.timelord.done_discriminants: - # self.timelord.log.info(f"proof_of_space_info {proof_of_space_info.challenge} already done, returning") - # return - # else: - # disc_dict = dict(self.timelord.discriminant_queue) - # if proof_of_space_info.challenge in disc_dict: - # challenge_weight = disc_dict[proof_of_space_info.challenge] - # if challenge_weight >= min(self.timelord.max_known_weights): - # self.log.info("Not storing iter, waiting for more block confirmations.") - # return - # else: - # self.log.info("Not storing iter, challenge inactive.") - # return - # - # if proof_of_space_info.challenge not in self.timelord.pending_iters: - # self.timelord.pending_iters[proof_of_space_info.challenge] = [] - # if proof_of_space_info.challenge not in self.timelord.submitted_iters: - # self.timelord.submitted_iters[proof_of_space_info.challenge] = [] - # - # if ( - # proof_of_space_info.iterations_needed not in self.timelord.pending_iters[proof_of_space_info.challenge] - # and proof_of_space_info.iterations_needed not in self.timelord.submitted_iters[proof_of_space_info.challenge] - # ): - # self.timelord.log.info( - # f"proof_of_space_info {proof_of_space_info.challenge} adding " - # f"{proof_of_space_info.iterations_needed} to " - # f"{self.timelord.pending_iters[proof_of_space_info.challenge]}" - # ) - # self.timelord.pending_iters[proof_of_space_info.challenge].append(proof_of_space_info.iterations_needed) + @api_request + async def new_peak(self, new_peak: timelord_protocol.NewPeak): + async with self.timelord.lock: + if self.timelord.last_state is None or self.timelord.last_state.get_weight() < new_peak.weight: + self.timelord.new_peak = new_peak + + @api_request + async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock): + async with self.timelord.lock: + if not self.timelord.accept_unfinished_block(new_unfinished_subblock): + return + sp_iters, ip_iters = iters_from_sub_block( + self.timelord.constants, + new_unfinished_subblock.reward_chain_sub_block, + self.timelord.last_state.get_sub_slot_iters(), + self.timelord.last_state.get_difficulty(), + ) + last_ip_iters = self.timelord.last_state.get_last_ip() + if sp_iters < ip_iters: + self.timelord.overflow_blocks.append(new_unfinished_subblock) + elif ip_iters > last_ip_iters: + self.timelord.unfinished_blocks.append(new_unfinished_subblock) + for chain in Chain: + self.timelord.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters)) + self.timelord.iteration_to_proof_type[ip_iters - last_ip_iters] = IterationType.INFUSION_POINT diff --git a/tests/setup_nodes.py b/tests/setup_nodes.py index db9a1e38ff1a..281e34cc769c 100644 --- a/tests/setup_nodes.py +++ b/tests/setup_nodes.py @@ -244,7 +244,7 @@ async def setup_timelord(port, full_node_port, sanitizer, consensus_constants: C if sanitizer: config["vdf_server"]["port"] = 7999 - kwargs = service_kwargs_for_timelord(bt.root_path, config, consensus_constants.DISCRIMINANT_SIZE_BITS) + kwargs = service_kwargs_for_timelord(bt.root_path, config, consensus_constants) kwargs.update( parse_cli_args=False, connect_to_daemon=False,