From bbb3d285592b3d6858892d17459b934222aa2c78 Mon Sep 17 00:00:00 2001 From: Mariano Sorgente Date: Wed, 2 Dec 2020 11:53:35 +0900 Subject: [PATCH] Timelord changes and harvester api --- src/harvester.py | 199 +-------- src/harvester_api.py | 17 +- src/rpc/harvester_rpc_api.py | 16 +- src/server/server.py | 84 ++-- src/timelord.py | 767 +++++++++++++++++++++++++++++++++- src/timelord_new.py | 775 ----------------------------------- src/timelord_old.py | 465 --------------------- 7 files changed, 819 insertions(+), 1504 deletions(-) delete mode 100644 src/timelord_new.py delete mode 100644 src/timelord_old.py diff --git a/src/harvester.py b/src/harvester.py index 587571c7ba6d..66f92be30b73 100644 --- a/src/harvester.py +++ b/src/harvester.py @@ -3,27 +3,17 @@ import asyncio from concurrent.futures.thread import ThreadPoolExecutor from pathlib import Path from typing import Dict, Optional, Tuple, List, Callable, Set -import time import concurrent -import dataclasses from blspy import G1Element -from chiapos import DiskProver from src.consensus.constants import ConsensusConstants -from src.consensus.pot_iterations import calculate_iterations_quality, calculate_sp_interval_iters -from src.protocols import harvester_protocol -from src.server.outbound_message import Message -from src.types.proof_of_space import ProofOfSpace -from src.types.sized_bytes import bytes32 -from src.util.api_decorators import api_request -from src.util.ints import uint8, uint64 from src.plotting.plot_tools import ( load_plots, PlotInfo, - remove_plot_directory, - add_plot_directory, - get_plot_directories, + remove_plot_directory as remove_plot_directory_pt, + add_plot_directory as add_plot_directory_pt, + get_plot_directories as get_plot_directories_pt, ) log = logging.getLogger(__name__) @@ -78,7 +68,7 @@ class Harvester: if self.state_changed_callback is not None: self.state_changed_callback(change) - def _get_plots(self) -> Tuple[List[Dict], List[str], List[str]]: + def get_plots(self) -> Tuple[List[Dict], List[str], List[str]]: response_plots: List[Dict] = [] for path, plot_info in self.provers.items(): prover = plot_info.prover @@ -102,7 +92,7 @@ class Harvester: [str(s) for s in self.no_key_filenames], ) - async def _refresh_plots(self): + async def refresh_plots(self): locked: bool = self._refresh_lock.locked() changed: bool = False async with self._refresh_lock: @@ -119,7 +109,7 @@ class Harvester: if changed: self._state_changed("plots") - def _delete_plot(self, str_path: str): + def delete_plot(self, str_path: str): path = Path(str_path).resolve() if path in self.provers: del self.provers[path] @@ -131,180 +121,17 @@ class Harvester: self._state_changed("plots") return True - async def _add_plot_directory(self, str_path: str) -> bool: - add_plot_directory(str_path, self.root_path) - await self._refresh_plots() + async def add_plot_directory(self, str_path: str) -> bool: + add_plot_directory_pt(str_path, self.root_path) + await self.refresh_plots() return True - async def _get_plot_directories(self) -> List[str]: - return get_plot_directories(self.root_path) + async def get_plot_directories(self) -> List[str]: + return get_plot_directories_pt(self.root_path) - async def _remove_plot_directory(self, str_path: str) -> bool: - remove_plot_directory(str_path, self.root_path) + async def remove_plot_directory(self, str_path: str) -> bool: + remove_plot_directory_pt(str_path, self.root_path) return True def set_server(self, server): self.server = server - - @api_request - async def harvester_handshake(self, harvester_handshake: harvester_protocol.HarvesterHandshake): - """ - Handshake between the harvester and farmer. The harvester receives the pool public keys, - as well as the farmer pks, which must be put into the plots, before the plotting process begins. - We cannot use any plots which have different keys in them. - """ - self.farmer_public_keys = harvester_handshake.farmer_public_keys - self.pool_public_keys = harvester_handshake.pool_public_keys - - await self._refresh_plots() - - if len(self.provers) == 0: - log.warning("Not farming any plots on this harvester. Check your configuration.") - return - - self._state_changed("plots") - - @api_request - async def new_signage_point(self, new_challenge: harvester_protocol.NewSignagePoint): - """ - The harvester receives a new signage point from the farmer, this happens at the start of each slot. - The harvester does a few things: - 1. The harvester applies the plot filter for each of the plots, to select the proportion which are eligible - for this signage point and challenge. - 2. The harvester gets the qualities for each plot. This is approximately 7 reads per plot which qualifies. - Note that each plot may have 0, 1, 2, etc qualities for that challenge: but on average it will have 1. - 3. Checks the required_iters for each quality and the given signage point, to see which are eligible for - inclusion (required_iters < sp_interval_iters). - 4. Looks up the full proof of space in the plot for each quality, approximately 64 reads per quality - 5. Returns the proof of space to the farmer - """ - - if len(self.pool_public_keys) == 0 or len(self.farmer_public_keys) == 0: - # This means that we have not received the handshake yet - return - - start = time.time() - assert len(new_challenge.challenge_hash) == 32 - - # Refresh plots to see if there are any new ones - await self._refresh_plots() - - loop = asyncio.get_running_loop() - - def blocking_lookup(filename: Path, plot_info: PlotInfo, prover: DiskProver) -> List[ProofOfSpace]: - # Uses the DiskProver object to lookup qualities. This is a blocking call, - # so it should be run in a thread pool. - try: - sp_challenge_hash = ProofOfSpace.calculate_pos_challenge( - plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash - ) - quality_strings = prover.get_qualities_for_challenge(sp_challenge_hash) - except Exception: - log.error("Error using prover object. Reinitializing prover object.") - self.provers[filename] = dataclasses.replace(plot_info, prover=DiskProver(str(filename))) - return [] - - responses: List[ProofOfSpace] = [] - if quality_strings is not None: - # Found proofs of space (on average 1 is expected per plot) - for index, quality_str in enumerate(quality_strings): - required_iters: uint64 = calculate_iterations_quality( - quality_str, prover.get_size(), new_challenge.difficulty, new_challenge.sp_hash - ) - sp_interval_iters = calculate_sp_interval_iters(self.constants, new_challenge.sub_slot_iters) - if required_iters < sp_interval_iters: - # Found a very good proof of space! will fetch the whole proof from disk, then send to farmer - try: - proof_xs = prover.get_full_proof(sp_challenge_hash, index) - except RuntimeError: - log.error(f"Exception fetching full proof for {filename}") - continue - - plot_public_key = ProofOfSpace.generate_plot_public_key( - plot_info.local_sk.get_g1(), plot_info.farmer_public_key - ) - responses.append( - ProofOfSpace( - sp_challenge_hash, - plot_info.pool_public_key, - None, - plot_public_key, - uint8(prover.get_size()), - proof_xs, - ) - ) - return responses - - async def lookup_challenge(filename: Path, prover: DiskProver) -> List[harvester_protocol.NewProofOfSpace]: - # Executes a DiskProverLookup in a thread pool, and returns responses - all_responses: List[harvester_protocol.NewProofOfSpace] = [] - proofs_of_space: List[ProofOfSpace] = await loop.run_in_executor( - self.executor, blocking_lookup, filename, prover - ) - for proof_of_space in proofs_of_space: - all_responses.append( - harvester_protocol.NewProofOfSpace( - new_challenge.challenge_hash, prover.get_id(), proof_of_space, new_challenge.signage_point_index - ) - ) - return all_responses - - awaitables = [] - for try_plot_filename, try_plot_info in self.provers.items(): - if try_plot_filename.exists(): - # Passes the plot filter (does not check sp filter yet though, since we have not reached sp) - # This is being executed at the beginning of the slot - if ProofOfSpace.passes_plot_filter( - self.constants, try_plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash - ): - awaitables.append(lookup_challenge(try_plot_filename, try_plot_info.prover)) - - # Concurrently executes all lookups on disk, to take advantage of multiple disk parallelism - total_proofs_found = 0 - for sublist_awaitable in asyncio.as_completed(awaitables): - for response in await sublist_awaitable: - total_proofs_found += 1 - msg = Message("challenge_response", response) - yield msg - log.info( - f"{len(awaitables)} plots were eligible for farming {new_challenge.challenge_hash.hex()[:10]}..." - f" Found {total_proofs_found} proofs. Time: {time.time() - start}. " - f"Total {len(self.provers)} plots" - ) - - @api_request - async def request_signatures(self, request: harvester_protocol.RequestSignatures): - """ - The farmer requests a signature on the header hash, for one of the proofs that we found. - A signature is created on the header hash using the harvester private key. This can also - be used for pooling. - """ - try: - plot_info = self.provers[Path(request.plot_identifier).resolve()] - except KeyError: - log.warning(f"KeyError plot {request.plot_identifier} does not exist.") - return - - local_sk = plot_info.local_sk - agg_pk = ProofOfSpace.generate_plot_public_key(local_sk.get_g1(), plot_info.farmer_public_key) - - # This is only a partial signature. When combined with the farmer's half, it will - # form a complete PrependSignature. - message_signatures: List[Tuple[bytes32, G2Element]] = [] - for message in request.messages: - signature: G2Element = AugSchemeMPL.sign(local_sk, message, agg_pk) - message_signatures.append((message, signature)) - - response: harvester_protocol.RespondSignatures = harvester_protocol.RespondSignatures( - request.plot_identifier, - request.sp_hash, - local_sk.get_g1(), - plot_info.farmer_public_key, - message_signatures, - ) - - yield OutboundMessage( - NodeType.FARMER, - Message("respond_signatures", response), - Delivery.RESPOND, - ) diff --git a/src/harvester_api.py b/src/harvester_api.py index 17796e4e45c7..e7f22529ec27 100644 --- a/src/harvester_api.py +++ b/src/harvester_api.py @@ -4,7 +4,7 @@ import time from pathlib import Path from typing import Optional, Callable, List, Tuple -from blspy import AugSchemeMPL, G2Element +from blspy import AugSchemeMPL, G2Element, G1Element from chiapos import DiskProver from src.consensus.pot_iterations import calculate_sp_interval_iters, calculate_iterations_quality @@ -24,6 +24,8 @@ class HarvesterAPI: def __init__(self, harvester): self.harvester = harvester + self.farmer_public_keys: List[G1Element] = [] + self.pool_public_keys: List[G1Element] = [] def _set_state_changed_callback(self, callback: Callable): self.harvester.state_changed_callback = callback @@ -38,7 +40,7 @@ class HarvesterAPI: self.farmer_public_keys = harvester_handshake.farmer_public_keys self.pool_public_keys = harvester_handshake.pool_public_keys - await self.harvester._refresh_plots() + await self.harvester.refresh_plots() if len(self.harvester.provers) == 0: self.harvester.log.warning("Not farming any plots on this harvester. Check your configuration.") @@ -69,7 +71,7 @@ class HarvesterAPI: assert len(new_challenge.challenge_hash) == 32 # Refresh plots to see if there are any new ones - await self.harvester._refresh_plots() + await self.harvester.refresh_plots() loop = asyncio.get_running_loop() @@ -93,7 +95,9 @@ class HarvesterAPI: required_iters: uint64 = calculate_iterations_quality( quality_str, prover.get_size(), new_challenge.difficulty, new_challenge.sp_hash ) - sp_interval_iters = calculate_sp_interval_iters(self.harvester.constants, new_challenge.sub_slot_iters) + sp_interval_iters = calculate_sp_interval_iters( + self.harvester.constants, new_challenge.sub_slot_iters + ) if required_iters < sp_interval_iters: # Found a very good proof of space! will fetch the whole proof from disk, then send to farmer try: @@ -137,7 +141,10 @@ class HarvesterAPI: # Passes the plot filter (does not check sp filter yet though, since we have not reached sp) # This is being executed at the beginning of the slot if ProofOfSpace.passes_plot_filter( - self.harvester.constants, try_plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash + self.harvester.constants, + try_plot_info.prover.get_id(), + new_challenge.challenge_hash, + new_challenge.sp_hash, ): awaitables.append(lookup_challenge(try_plot_filename, try_plot_info.prover)) diff --git a/src/rpc/harvester_rpc_api.py b/src/rpc/harvester_rpc_api.py index 1ec748b5b19d..f7ebc0c4153f 100644 --- a/src/rpc/harvester_rpc_api.py +++ b/src/rpc/harvester_rpc_api.py @@ -22,14 +22,12 @@ class HarvesterRpcApi: async def _state_changed(self, change: str) -> List[Dict]: if change == "plots": data = await self.get_plots({}) - payload = create_payload( - "get_plots", data, self.service_name, "wallet_ui", string=False - ) + payload = create_payload("get_plots", data, self.service_name, "wallet_ui", string=False) return [payload] return [] async def get_plots(self, request: Dict) -> Dict: - plots, failed_to_open, not_found = self.service._get_plots() + plots, failed_to_open, not_found = self.service.get_plots() return { "plots": plots, "failed_to_open_filenames": failed_to_open, @@ -37,27 +35,27 @@ class HarvesterRpcApi: } async def refresh_plots(self, request: Dict) -> Dict: - await self.service._refresh_plots() + await self.service.refresh_plots() return {} async def delete_plot(self, request: Dict) -> Dict: filename = request["filename"] - if self.service._delete_plot(filename): + if self.service.delete_plot(filename): return {} raise ValueError(f"Not able to delete file {filename}") async def add_plot_directory(self, request: Dict) -> Dict: dirname = request["dirname"] - if await self.service._add_plot_directory(dirname): + if await self.service.add_plot_directory(dirname): return {} raise ValueError(f"Did not add plot directory {dirname}") async def get_plot_directories(self, request: Dict) -> Dict: - plot_dirs = await self.service._get_plot_directories() + plot_dirs = await self.service.get_plot_directories() return {"directories": plot_dirs} async def remove_plot_directory(self, request: Dict) -> Dict: dirname = request["dirname"] - if await self.service._remove_plot_directory(dirname): + if await self.service.remove_plot_directory(dirname): return {} raise ValueError(f"Did not remove plot directory {dirname}") diff --git a/src/server/server.py b/src/server/server.py index 142ada7d6c0f..b7782c85ba97 100644 --- a/src/server/server.py +++ b/src/server/server.py @@ -29,21 +29,15 @@ def ssl_context_for_server( private_cert_path: Path, private_key_path: Path, require_cert: bool = False ) -> Optional[ssl.SSLContext]: ssl_context = ssl._create_unverified_context(purpose=ssl.Purpose.CLIENT_AUTH) - ssl_context.load_cert_chain( - certfile=str(private_cert_path), keyfile=str(private_key_path) - ) + ssl_context.load_cert_chain(certfile=str(private_cert_path), keyfile=str(private_key_path)) ssl_context.load_verify_locations(str(private_cert_path)) ssl_context.verify_mode = ssl.CERT_REQUIRED if require_cert else ssl.CERT_NONE return ssl_context -def ssl_context_for_client( - private_cert_path: Path, private_key_path: Path, auth: bool -) -> Optional[ssl.SSLContext]: +def ssl_context_for_client(private_cert_path: Path, private_key_path: Path, auth: bool) -> Optional[ssl.SSLContext]: ssl_context = ssl._create_unverified_context(purpose=ssl.Purpose.SERVER_AUTH) - ssl_context.load_cert_chain( - certfile=str(private_cert_path), keyfile=str(private_key_path) - ) + ssl_context.load_cert_chain(certfile=str(private_cert_path), keyfile=str(private_key_path)) if auth: ssl_context.verify_mode = ssl.CERT_REQUIRED ssl_context.load_verify_locations(str(private_cert_path)) @@ -92,9 +86,7 @@ class ChiaServer: self.root_path = root_path self.config = config self.on_connect: Optional[Callable] = None - self.incoming_messages: Queue[ - Tuple[Payload, WSChiaConnection] - ] = asyncio.Queue() + self.incoming_messages: Queue[Tuple[Payload, WSChiaConnection]] = asyncio.Queue() self.shut_down_event = asyncio.Event() if self._local_type is NodeType.INTRODUCER: @@ -123,9 +115,7 @@ class ChiaServer: self.runner = web.AppRunner(self.app, access_log=None) await self.runner.setup() require_cert = self._local_type not in (NodeType.FULL_NODE, NodeType.INTRODUCER) - ssl_context = ssl_context_for_server( - self._private_cert_path, self._private_key_path, require_cert - ) + ssl_context = ssl_context_for_server(self._private_cert_path, self._private_key_path, require_cert) if self._local_type not in [NodeType.WALLET, NodeType.HARVESTER]: self.site = web.TCPSite( self.runner, @@ -164,10 +154,7 @@ class ChiaServer: assert handshake is True await self.connection_added(connection, self.on_connect) - if ( - self._local_type is NodeType.INTRODUCER - and connection.connection_type is NodeType.FULL_NODE - ): + if self._local_type is NodeType.INTRODUCER and connection.connection_type is NodeType.FULL_NODE: self.introducer_peers.add(connection.get_peer_info()) except Exception as e: error_stack = traceback.format_exc() @@ -196,18 +183,14 @@ class ChiaServer: Tries to connect to the target node, adding one connection into the pipeline, if successful. An on connect method can also be specified, and this will be saved into the instance variables. """ - ssl_context = ssl_context_for_client( - self._private_cert_path, self._private_key_path, auth - ) + ssl_context = ssl_context_for_client(self._private_cert_path, self._private_key_path, auth) session = None try: timeout = aiohttp.ClientTimeout(total=10) session = aiohttp.ClientSession(timeout=timeout) url = f"wss://{target_node.host}:{target_node.port}/ws" self.log.info(f"Connecting: {url}") - ws = await session.ws_connect( - url, autoclose=False, autoping=True, ssl=ssl_context - ) + ws = await session.ws_connect(url, autoclose=False, autoping=True, ssl=ssl_context) if ws is not None: connection = WSChiaConnection( self._local_type, @@ -232,12 +215,14 @@ class ChiaServer: await self.connection_added(connection, on_connect) self.log.info("Connected") return True + except aiohttp.client_exceptions.ClientConnectorError as e: + self.log.warning(f"{e}") except Exception as e: error_stack = traceback.format_exc() self.log.error(f"Exception: {e}") - if session is not None: - await session.close() self.log.error(f"Exception Stack: {error_stack}") + if session is not None: + await session.close() return False @@ -258,40 +243,24 @@ class ChiaServer: async def api_call(payload: Payload, connection: WSChiaConnection): try: full_message = payload.msg - connection.log.info( - f"<- {full_message.function} from peer {connection.peer_node_id}" - ) - if len( - full_message.function - ) == 0 or full_message.function.startswith("_"): + connection.log.info(f"<- {full_message.function} from peer {connection.peer_node_id}") + if len(full_message.function) == 0 or full_message.function.startswith("_"): # This prevents remote calling of private methods that start with "_" - self.log.error( - f"Non existing function: {full_message.function}" - ) - raise ProtocolError( - Err.INVALID_PROTOCOL_MESSAGE, [full_message.function] - ) + self.log.error(f"Non existing function: {full_message.function}") + raise ProtocolError(Err.INVALID_PROTOCOL_MESSAGE, [full_message.function]) f = getattr(self.api, full_message.function, None) if f is None: - self.log.error( - f"Non existing function: {full_message.function}" - ) - raise ProtocolError( - Err.INVALID_PROTOCOL_MESSAGE, [full_message.function] - ) + self.log.error(f"Non existing function: {full_message.function}") + raise ProtocolError(Err.INVALID_PROTOCOL_MESSAGE, [full_message.function]) if not hasattr(f, "api_function"): self.log.error("Peer trying to call non api function") - raise ProtocolError( - Err.INVALID_PROTOCOL_MESSAGE, [full_message.function] - ) + raise ProtocolError(Err.INVALID_PROTOCOL_MESSAGE, [full_message.function]) if hasattr(f, "peer_required"): - response: Optional[Message] = await f( - full_message.data, connection - ) + response: Optional[Message] = await f(full_message.data, connection) else: response = await f(full_message.data) @@ -302,9 +271,7 @@ class ChiaServer: except Exception as e: tb = traceback.format_exc() - connection.log.error( - f"Exception: {e}, closing connection {connection}. {tb}" - ) + connection.log.error(f"Exception: {e}, closing connection {connection}. {tb}") await connection.close() asyncio.create_task(api_call(payload_inc, connection_inc)) @@ -328,14 +295,9 @@ class ChiaServer: for message in messages: await connection.send_message(message) - async def send_to_all_except( - self, messages: List[Message], node_type: NodeType, exclude: bytes32 - ): + async def send_to_all_except(self, messages: List[Message], node_type: NodeType, exclude: bytes32): for _, connection in self.all_connections.items(): - if ( - connection.connection_type is node_type - and connection.peer_node_id != exclude - ): + if connection.connection_type is node_type and connection.peer_node_id != exclude: for message in messages: await connection.send_message(message) diff --git a/src/timelord.py b/src/timelord.py index 1a22e478bb05..de1c61481bc5 100644 --- a/src/timelord.py +++ b/src/timelord.py @@ -2,12 +2,773 @@ import asyncio import io import logging import time -import socket -from typing import Dict, List, Optional, Tuple +from enum import Enum +from typing import Dict, List, Optional, Tuple, Union + +from chiavdf import create_discriminant + +from src.consensus.constants import ConsensusConstants +from src.consensus.pot_iterations import ( + calculate_iterations_quality, + calculate_sp_iters, + calculate_ip_iters, +) +from src.protocols import timelord_protocol +from src.server.outbound_message import NodeType, Message +from src.server.server import ChiaServer +from src.types.classgroup import ClassgroupElement +from src.types.end_of_slot_bundle import EndOfSubSlotBundle +from src.types.reward_chain_sub_block import ( + RewardChainSubBlock, + RewardChainSubBlockUnfinished, +) +from src.types.sized_bytes import bytes32 +from src.types.slots import ChallengeChainSubSlot, InfusedChallengeChainSubSlot, RewardChainSubSlot, SubSlotProofs +from src.types.vdf import VDFInfo, VDFProof +from src.util.api_decorators import api_request +from src.util.ints import uint64, uint8, uint128, int512 +from src.types.sub_epoch_summary import SubEpochSummary +from chiapos import Verifier +from src.types.slots import ChallengeBlockInfo log = logging.getLogger(__name__) +def iters_from_sub_block( + constants, + reward_chain_sub_block: Union[RewardChainSubBlock, RewardChainSubBlockUnfinished], + sub_slot_iters: uint64, + difficulty: uint64, +) -> Tuple[uint64, uint64]: + v = Verifier() + pos = reward_chain_sub_block.proof_of_space + plot_id: bytes32 = pos.get_plot_id() + quality = v.validate_proof(plot_id, pos.size, pos.challenge_hash, bytes(pos.proof)) + + if reward_chain_sub_block.challenge_chain_sp_vdf is None: + assert reward_chain_sub_block.signage_point_index == 0 + cc_sp: bytes32 = reward_chain_sub_block.proof_of_space.challenge_hash + else: + cc_sp: bytes32 = reward_chain_sub_block.challenge_chain_sp_vdf.get_hash() + required_iters = calculate_iterations_quality( + quality, + reward_chain_sub_block.proof_of_space.size, + difficulty, + cc_sp, + ) + return ( + calculate_sp_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index), + calculate_ip_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index, required_iters), + ) + + +class EndOfSubSlotData: + eos_bundle: EndOfSubSlotBundle + sub_slot_iters: uint64 + new_difficulty: uint64 + deficit: uint8 + + def __init__(self, eos_bundle, sub_slot_iters, new_difficulty, deficit): + self.eos_bundle = eos_bundle + self.sub_slot_iters = sub_slot_iters + self.new_difficulty = new_difficulty + self.deficit = deficit + + +class Chain(Enum): + CHALLENGE_CHAIN = 1 + REWARD_CHAIN = 2 + INFUSED_CHALLENGE_CHAIN = 3 + + +class IterationType(Enum): + SIGNAGE_POINT = 1 + INFUSION_POINT = 2 + END_OF_SUBSLOT = 3 + + +class LastState: + def __init__(self, constants: ConsensusConstants): + self.peak: Optional[timelord_protocol.NewPeak] = None + self.subslot_end: Optional[EndOfSubSlotData] = None + self.last_ip: uint64 = uint64(0) + self.deficit: uint8 = uint8(0) + self.sub_epoch_summary: Optional[SubEpochSummary] = None + self.constants: ConsensusConstants = constants + self.last_weight: uint128 = uint128(0) + self.total_iters: uint128 = uint128(0) + self.last_peak_challenge: Optional[bytes32] = None + + def set_state(self, state): + if isinstance(state, timelord_protocol.NewPeak): + self.peak = state + self.subslot_end = None + _, self.last_ip = iters_from_sub_block( + self.constants, + state.reward_chain_sub_block, + state.sub_slot_iters, + state.difficulty, + ) + self.deficit = state.deficit + self.sub_epoch_summary = state.sub_epoch_summary + self.last_weight = state.reward_chain_sub_block.weight + self.total_iters = state.reward_chain_sub_block.total_iters + self.last_peak_challenge = state.reward_chain_sub_block.get_hash() + if isinstance(state, EndOfSubSlotData): + self.peak = None + self.subslot_end = state + self.last_ip = 0 + self.deficit = state.deficit + + def is_empty(self) -> bool: + return self.peak is None and self.subslot_end is None + + def get_sub_slot_iters(self) -> uint64: + if self.peak is not None: + return self.peak.sub_slot_iters + return self.subslot_end.sub_slot_iters + + def get_weight(self) -> uint128: + return self.last_weight + + def get_total_iters(self) -> uint128: + return self.total_iters + + def get_last_peak_challenge(self) -> Optional[bytes32]: + return self.last_peak_challenge + + def get_difficulty(self) -> uint64: + if self.peak is not None: + return self.peak.difficulty + return self.subslot_end.new_difficulty + + def get_last_ip(self) -> uint64: + return self.last_ip + + def get_deficit(self) -> uint8: + if self.peak is not None: + return self.peak.deficit + return self.subslot_end.deficit + + def get_sub_epoch_summary(self) -> Optional[SubEpochSummary]: + return self.sub_epoch_summary + + def get_challenge(self, chain: Chain) -> Optional[bytes32]: + if self.peak is not None: + sub_block = self.peak.reward_chain_sub_block + if chain == Chain.CHALLENGE_CHAIN: + return sub_block.challenge_chain_ip_vdf.challenge_hash + if chain == Chain.REWARD_CHAIN: + return sub_block.get_hash() + if chain == Chain.INFUSED_CHALLENGE_CHAIN: + if sub_block.infused_challenge_chain_ip_vdf is not None: + return sub_block.infused_challenge_chain_ip_vdf.challenge_hash + if self.peak.deficit == 4: + return ChallengeBlockInfo( + sub_block.proof_of_space, + sub_block.challenge_chain_sp_vdf, + sub_block.challenge_chain_sp_signature, + sub_block.challenge_chain_ip_vdf, + ).get_hash() + return None + if self.subslot_end is not None: + if chain == Chain.CHALLENGE_CHAIN: + return self.subslot_end.eos_bundle.challenge_chain.get_hash() + if chain == Chain.REWARD_CHAIN: + return self.subslot_end.eos_bundle.reward_chain.get_hash() + if chain == Chain.INFUSED_CHALLENGE_CHAIN: + if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK: + return self.subslot_end.eos_bundle.infused_challenge_chain.get_hash() + else: + return None + return None + + def get_initial_form(self, chain: Chain) -> Optional[ClassgroupElement]: + if self.peak is not None: + sub_block = self.peak.reward_chain_sub_block + if chain == Chain.CHALLENGE_CHAIN: + return sub_block.challenge_chain_ip_vdf.output + if chain == Chain.REWARD_CHAIN: + return ClassgroupElement.get_default_element() + if chain == Chain.INFUSED_CHALLENGE_CHAIN: + if sub_block.infused_challenge_chain_ip_vdf is not None: + return sub_block.infused_challenge_chain_ip_vdf.output + elif self.peak.deficit == 4: + return ClassgroupElement.get_default_element() + else: + return None + if self.subslot_end is not None: + if chain == Chain.CHALLENGE_CHAIN or chain == Chain.REWARD_CHAIN: + return ClassgroupElement.get_default_element() + if chain == Chain.INFUSED_CHALLENGE_CHAIN: + if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK: + return ClassgroupElement.get_default_element() + else: + return None + return None + + class Timelord: - def __init__(self, config: Dict, discriminant_size_bits: int): + def __init__(self, config: Dict, constants: ConsensusConstants): + self.config = config + self.constants = constants + self._shut_down = False + self.free_clients: List[Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = [] + self.lock: asyncio.Lock = asyncio.Lock() + self.potential_free_clients: List = [] + self.ip_whitelist = self.config["vdf_clients"]["ip"] + self.server: Optional[ChiaServer] = None + self.chain_type_to_stream: Dict[Chain, Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = {} + self.chain_start_time: Dict = {} + # Chains that currently don't have a vdf_client. + self.unspawned_chains: List[Chain] = [ + Chain.CHALLENGE_CHAIN, + Chain.REWARD_CHAIN, + Chain.INFUSED_CHALLENGE_CHAIN, + ] + # Chains that currently accept iterations. + self.allows_iters: List[Chain] = [] + # Last peak received, None if it's already processed. + self.new_peak: Optional[timelord_protocol.NewPeak] = None + # Last end of subslot bundle, None if we built a peak on top of it. + self.new_subslot_end: Optional[EndOfSubSlotData] = None + # Last state received. Can either be a new peak or a new EndOfSubslotBundle. + self.last_state: LastState = LastState(self.constants) + # Unfinished block info, iters adjusted to the last peak. + self.unfinished_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = [] + # Signage points iters, adjusted to the last peak. + self.signage_point_iters: List[uint64] = [] + # For each chain, send those info when the process spawns. + self.iters_to_submit: Dict[Chain, List[uint64]] = {} + self.iters_submitted: Dict[Chain, List[uint64]] = {} + # For each iteration submitted, know if it's a signage point, an infusion point or an end of slot. + self.iteration_to_proof_type: Dict[uint64, IterationType] = {} + # List of proofs finished. + self.proofs_finished: List[Tuple[Chain, VDFInfo, VDFProof]] = [] + # Data to send at vdf_client initialization. + self.finished_sp = 0 + self.overflow_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = [] + self.main_loop = None + self.vdf_server = None + self._shut_down = False + + async def _start(self): + log.info("Starting timelord.") + self.main_loop = asyncio.create_task(self._manage_chains()) + + self.vdf_server = await asyncio.start_server( + self._handle_client, + self.config["vdf_server"]["host"], + self.config["vdf_server"]["port"], + ) + log.info("Started timelord.") + + def _close(self): + self._shut_down = True + + async def _await_closed(self): pass + + def set_server(self, server: ChiaServer): + self.server = server + + @api_request + async def new_peak(self, new_peak: timelord_protocol.NewPeak): + async with self.lock: + if self.last_state is None or self.last_state.get_weight() < new_peak.weight: + self.new_peak = new_peak + + @api_request + async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock): + async with self.lock: + if not self._accept_unfinished_block(new_unfinished_subblock): + return + sp_iters, ip_iters = iters_from_sub_block( + self.constants, + new_unfinished_subblock.reward_chain_sub_block, + self.last_state.get_sub_slot_iters(), + self.last_state.get_difficulty(), + ) + last_ip_iters = self.last_state.get_last_ip() + if sp_iters < ip_iters: + self.overflow_blocks.append(new_unfinished_subblock) + elif ip_iters > last_ip_iters: + self.unfinished_blocks.append(new_unfinished_subblock) + for chain in Chain: + self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters)) + self.iteration_to_proof_type[ip_iters - last_ip_iters] = IterationType.INFUSION_POINT + + def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool: + # Total unfinished block iters needs to exceed peak's iters. + if self.last_state.get_total_iters() >= block.total_iters: + return False + # The peak hash of the rc-sub-block must match + # the signage point rc VDF challenge hash of the unfinished sub-block. + if ( + block.reward_chain_sp_vdf is not None + and self.last_state.get_last_peak_challenge() is not None + and self.last_state.get_last_peak_challenge() != block.reward_chain_sp_vdf.challenge_hash + ): + return False + return True + + async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + async with self.lock: + client_ip = writer.get_extra_info("peername")[0] + log.info(f"New timelord connection from client: {client_ip}.") + if client_ip in self.ip_whitelist: + self.free_clients.append((client_ip, reader, writer)) + log.info(f"Added new VDF client {client_ip}.") + for ip, end_time in list(self.potential_free_clients): + if ip == client_ip: + self.potential_free_clients.remove((ip, end_time)) + break + + async def _stop_chain(self, chain: Chain): + stop_ip, _, stop_writer = self.chain_type_to_stream[chain] + self.potential_free_clients.append((stop_ip, time.time())) + stop_writer.write(b"010") + await stop_writer.drain() + if chain in self.allows_iters: + self.allows_iters.remove(chain) + self.unspawned_chains.append(chain) + + async def _reset_chains(self): + # First, stop all chains. + ip_iters = self.last_state.get_last_ip() + sub_slot_iters = self.last_state.get_sub_slot_iters() + difficulty = self.last_state.get_difficulty() + for chain in self.chain_type_to_stream.keys(): + await self._stop_chain(chain) + # Adjust all signage points iterations to the peak. + iters_per_signage = uint64(sub_slot_iters // self.constants.NUM_SPS_SUB_SLOT) + self.signage_point_iters = [ + k * iters_per_signage - ip_iters + for k in range(1, self.constants.NUM_SPS_SUB_SLOT + 1) + if k * iters_per_signage - ip_iters > 0 and k * iters_per_signage < sub_slot_iters + ] + # Adjust all unfinished blocks iterations to the peak. + new_unfinished_blocks = [] + self.proofs_finished = [] + for chain in Chain: + self.iters_to_submit[chain] = [] + self.iters_submitted[chain] = [] + self.iteration_to_proof_type = {} + for block in self.unfinished_blocks: + if not self._accept_unfinished_block(block): + continue + block_sp_iters, block_ip_iters = iters_from_sub_block( + self.constants, + block, + sub_slot_iters, + difficulty, + ) + new_block_iters = block_ip_iters - ip_iters + if new_block_iters > 0: + new_unfinished_blocks.append(block) + for chain in Chain: + self.iters_to_submit[chain].append(new_block_iters) + self.iteration_to_proof_type[new_block_iters] = IterationType.INFUSION_POINT + # Remove all unfinished blocks that have already passed. + self.unfinished_blocks = new_unfinished_blocks + # Signage points. + if len(self.signage_point_iters) > 0: + count_signage = 0 + for signage in self.signage_point_iters: + for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]: + self.iters_to_submit[chain].append(signage) + self.iteration_to_proof_type[signage] = IterationType.SIGNAGE_POINT + count_signage += 1 + if count_signage == 3: + break + # TODO: handle the special case when infusion point is the end of subslot. + left_subslot_iters = sub_slot_iters - ip_iters + log.info(f"Left subslot iters: {left_subslot_iters}.") + for chain in Chain: + self.iters_to_submit[chain].append(left_subslot_iters) + self.iteration_to_proof_type[left_subslot_iters] = IterationType.END_OF_SUBSLOT + + async def _handle_new_peak(self): + self.last_state.set_state(self.new_peak) + self.new_peak = None + await self._reset_chains() + + async def _handle_subslot_end(self): + self.finished_sp = 0 + self.last_state.set_state(self.new_subslot_end) + self.new_subslot_end = None + await self._reset_chains() + + async def _map_chains_with_vdf_clients(self): + while not self._shut_down: + picked_chain = None + async with self.lock: + if len(self.free_clients) == 0: + break + ip, reader, writer = self.free_clients[0] + for chain_type in self.unspawned_chains: + challenge_hash = self.last_state.get_challenge(chain_type) + initial_form = self.last_state.get_initial_form(chain_type) + if challenge_hash is not None and initial_form is not None: + picked_chain = chain_type + break + if picked_chain is None: + break + picked_chain = self.unspawned_chains[0] + self.chain_type_to_stream[picked_chain] = (ip, reader, writer) + self.free_clients = self.free_clients[1:] + self.unspawned_chains = self.unspawned_chains[1:] + self.chain_start_time[picked_chain] = time.time() + + log.info(f"Mapping free vdf_client with chain: {picked_chain}.") + asyncio.create_task( + self._do_process_communication(picked_chain, challenge_hash, initial_form, ip, reader, writer) + ) + + async def _submit_iterations(self): + for chain in Chain: + if chain in self.allows_iters: + _, _, writer = self.chain_type_to_stream[chain] + for iteration in self.iters_to_submit[chain]: + if iteration in self.iters_submitted[chain]: + continue + prefix = str(len(str(iteration))) + if len(str(iteration)) < 10: + prefix = "0" + prefix + iter_str = prefix + str(iteration) + writer.write(iter_str.encode()) + await writer.drain() + self.iters_submitted[chain].append(iteration) + + def _clear_proof_list(self, iters: uint64): + return [ + (chain, info, proof) for chain, info, proof in self.proofs_finished if info.number_of_iterations != iters + ] + + async def _check_for_new_sp(self): + signage_iters = [ + iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.SIGNAGE_POINT + ] + if len(signage_iters) == 0: + return + for signage_iter in signage_iters: + proofs_with_iter = [ + (chain, info, proof) + for chain, info, proof in self.proofs_finished + if info.number_of_iterations == signage_iter + ] + # Wait for both cc and rc to have the signage point. + if len(proofs_with_iter) == 2: + cc_info: Optional[VDFInfo] = None + cc_proof: Optional[VDFProof] = None + rc_info: Optional[VDFInfo] = None + rc_proof: Optional[VDFProof] = None + for chain, info, proof in proofs_with_iter: + if chain == Chain.CHALLENGE_CHAIN: + cc_info = info + cc_proof = proof + if chain == Chain.REWARD_CHAIN: + rc_info = info + rc_proof = proof + if cc_info is None or cc_proof is None or rc_info is None or rc_proof is None: + log.error(f"Insufficient signage point data {signage_iter}") + continue + response = timelord_protocol.NewSignagePointVDF( + uint8(self.finished_sp), + cc_info, + cc_proof, + rc_info, + rc_proof, + ) + if self.server is not None: + msg = Message("new_signage_point_vdf", response) + await self.server.send_to_all([msg], NodeType.FULL_NODE) + # Cleanup the signage point from memory. + self.signage_point_iters.remove(signage_iter) + self.finished_sp += 1 + self.proofs_finished = self._clear_proof_list(signage_iter) + # Send the next 3 signage point to the chains. + next_iters_count = 0 + for next_sp in self.signage_point_iters: + for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]: + if next_sp not in self.iters_submitted[chain] and next_sp not in self.iters_to_submit[chain]: + self.iters_to_submit[chain].append(next_sp) + self.iteration_to_proof_type[next_sp] = IterationType.SIGNAGE_POINT + next_iters_count += 1 + if next_iters_count == 3: + break + + async def _check_for_new_ip(self): + infusion_iters = [ + iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.INFUSION_POINT + ] + for iteration in infusion_iters: + proofs_with_iter = [ + (chain, info, proof) + for chain, info, proof in self.proofs_finished + if info.number_of_iterations == iteration + ] + if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None: + chain_count = 3 + else: + chain_count = 2 + if len(proofs_with_iter) == chain_count: + block = None + for unfinished_block in self.unfinished_blocks: + _, ip_iters = iters_from_sub_block( + self.constants, + unfinished_block.reward_chain_sub_block, + self.last_state.get_sub_slot_iters(), + self.last_state.get_difficulty(), + ) + if ip_iters - self.last_state.get_last_ip() == iteration: + block = unfinished_block + break + if block is not None: + self.unfinished_blocks.remove(block) + challenge_hash = block.reward_chain_sub_block.get_hash() + icc_info: Optional[VDFInfo] = None + icc_proof: Optional[VDFProof] = None + cc_info: Optional[VDFInfo] = None + cc_proof: Optional[VDFProof] = None + rc_info: Optional[VDFInfo] = None + rc_proof: Optional[VDFProof] = None + for chain, info, proof in proofs_with_iter: + if chain == Chain.CHALLENGE_CHAIN: + cc_info = info + cc_proof = proof + if chain == Chain.REWARD_CHAIN: + rc_info = info + rc_proof = proof + if chain == Chain.INFUSED_CHALLENGE_CHAIN: + icc_info = info + icc_proof = proof + if cc_info is None or cc_proof is None or rc_info is None or rc_proof is None: + log.error( + f"Insufficient VDF proofs for infusion point ch: {challenge_hash} iterations:{iteration}" + ) + response = timelord_protocol.NewInfusionPointVDF( + challenge_hash, + cc_info, + cc_proof, + rc_info, + rc_proof, + icc_info, + icc_proof, + ) + msg = Message("new_infusion_point_vdf", response) + await self.server.send_to_all([msg], NodeType.FULL_NODE) + for iteration in infusion_iters: + self.proofs_finished = self._clear_proof_list(iteration) + + async def _check_for_end_of_subslot(self): + left_subslot_iters = [ + iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.END_OF_SUBSLOT + ] + if len(left_subslot_iters) == 0: + return + chains_finished = [ + (chain, info, proof) + for chain, info, proof in self.proofs_finished + if info.number_of_iterations == left_subslot_iters[0] + ] + if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None: + chain_count = 3 + else: + chain_count = 2 + if len(chains_finished) == chain_count: + icc_ip_vdf: Optional[VDFInfo] = None + icc_ip_proof: Optional[VDFProof] = None + cc_vdf: Optional[VDFInfo] = None + cc_proof: Optional[VDFProof] = None + rc_vdf: Optional[VDFInfo] = None + rc_proof: Optional[VDFProof] = None + for chain, info, proof in chains_finished: + if chain == Chain.CHALLENGE_CHAIN: + cc_vdf = info + cc_proof = proof + if chain == Chain.REWARD_CHAIN: + rc_vdf = info + rc_proof = proof + if chain == Chain.INFUSED_CHALLENGE_CHAIN: + icc_ip_vdf = info + icc_ip_proof = proof + assert cc_proof is not None and rc_proof is not None and cc_vdf is not None and rc_vdf is not None + log.info("Collected end of subslot vdfs.") + + icc_sub_slot: Optional[InfusedChallengeChainSubSlot] = ( + None if icc_ip_vdf is None else InfusedChallengeChainSubSlot(icc_ip_vdf) + ) + icc_sub_slot_hash = icc_sub_slot.get_hash() if self.last_state.get_deficit() == 0 else None + if self.last_state.get_sub_epoch_summary() is not None: + ses_hash = self.last_state.get_sub_epoch_summary().get_hash() + new_sub_slot_iters = self.last_state.get_sub_epoch_summary().new_sub_slot_iters + new_difficulty = self.last_state.get_sub_epoch_summary().new_difficulty + else: + ses_hash = None + new_sub_slot_iters = self.last_state.get_sub_slot_iters() + new_difficulty = self.last_state.get_difficulty() + cc_sub_slot = ChallengeChainSubSlot(cc_vdf, icc_sub_slot_hash, ses_hash, new_sub_slot_iters, new_difficulty) + eos_deficit: uint8 = ( + self.last_state.get_deficit() + if self.last_state.get_deficit() > 0 + else self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK + ) + rc_sub_slot = RewardChainSubSlot( + rc_vdf, + cc_sub_slot.get_hash(), + icc_sub_slot.get_hash() if icc_sub_slot is not None else None, + eos_deficit, + ) + eos_bundle = EndOfSubSlotBundle( + cc_sub_slot, + icc_sub_slot, + rc_sub_slot, + SubSlotProofs(cc_proof, icc_ip_proof, rc_proof), + ) + if self.server is not None: + msg = Message("end_of_sub_slot_bundle", timelord_protocol.NewEndOfSubSlotVDF(eos_bundle)) + await self.server.send_to_all([msg], NodeType.FULL_NODE) + log.info("Built end of subslot bundle.") + self.unfinished_blocks = self.overflow_blocks + self.overflow_blocks = [] + self.new_subslot_end = EndOfSubSlotData( + eos_bundle, + new_sub_slot_iters, + new_difficulty, + eos_deficit, + ) + + async def _manage_chains(self): + while not self._shut_down: + await asyncio.sleep(0.1) + # Didn't get any useful data, continue. + async with self.lock: + if self.last_state.is_empty() and self.new_peak is None: + continue + # Map free vdf_clients to unspawned chains. + await self._map_chains_with_vdf_clients() + async with self.lock: + # We've got a new peak, process it. + if self.new_peak is not None: + await self._handle_new_peak() + # A subslot ended, process it. + if self.new_subslot_end is not None: + await self._handle_subslot_end() + # Submit pending iterations. + await self._submit_iterations() + # Check for new signage point and broadcast it if present. + await self._check_for_new_sp() + # Check for new infusion point and broadcast it if present. + await self._check_for_new_ip() + # Check for end of subslot, respawn chains and build EndOfSubslotBundle. + await self._check_for_end_of_subslot() + + async def _do_process_communication(self, chain, challenge_hash, initial_form, ip, reader, writer): + disc: int = create_discriminant(challenge_hash, self.constants.DISCRIMINANT_SIZE_BITS) + # Depending on the flags 'fast_algorithm' and 'sanitizer_mode', + # the timelord tells the vdf_client what to execute. + if self.config["fast_algorithm"]: + # Run n-wesolowski (fast) algorithm. + writer.write(b"N") + else: + # Run two-wesolowski (slow) algorithm. + writer.write(b"N") + await writer.drain() + + prefix = str(len(str(disc))) + if len(prefix) == 1: + prefix = "00" + prefix + if len(prefix) == 2: + prefix = "0" + prefix + writer.write((prefix + str(disc)).encode()) + await writer.drain() + + # Send (a, b) from 'initial_form'. + for num in [initial_form.a, initial_form.b]: + prefix = len(str(num)) + prefix_len = len(str(prefix)) + writer.write((str(prefix_len) + str(prefix) + str(num)).encode()) + await writer.drain() + try: + ok = await reader.readexactly(2) + except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e: + log.warning(f"{type(e)} {e}") + return + + if ok.decode() != "OK": + return + + log.info("Got handshake with VDF client.") + async with self.lock: + self.allows_iters.append(chain) + # Listen to the client until "STOP" is received. + while True: + try: + data = await reader.readexactly(4) + except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e: + log.warning(f"{type(e)} {e}") + break + + msg = "" + try: + msg = data.decode() + except Exception as e: + # log.error(f"Exception while decoding data {e}") + pass + if msg == "STOP": + log.info(f"Stopped client running on ip {ip}.") + async with self.lock: + writer.write(b"ACK") + await writer.drain() + break + else: + try: + # This must be a proof, 4 bytes is length prefix + length = int.from_bytes(data, "big") + proof = await reader.readexactly(length) + stdout_bytes_io: io.BytesIO = io.BytesIO(bytes.fromhex(proof.decode())) + except ( + asyncio.IncompleteReadError, + ConnectionResetError, + Exception, + ) as e: + log.warning(f"{type(e)} {e}") + break + + iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True)) + + y_size_bytes = stdout_bytes_io.read(8) + y_size = uint64(int.from_bytes(y_size_bytes, "big", signed=True)) + + y_bytes = stdout_bytes_io.read(y_size) + witness_type = uint8(int.from_bytes(stdout_bytes_io.read(1), "big", signed=True)) + proof_bytes: bytes = stdout_bytes_io.read() + + # Verifies our own proof just in case + a = int.from_bytes(y_bytes[:129], "big", signed=True) + b = int.from_bytes(y_bytes[129:], "big", signed=True) + output = ClassgroupElement(int512(a), int512(b)) + time_taken = time.time() - self.chain_start_time[chain] + ips = int(iterations_needed / time_taken * 10) / 10 + log.info( + f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}" + f" iters." + f"Estimated IPS: {ips}. Chain: {chain}" + ) + + vdf_info: VDFInfo = VDFInfo( + challenge_hash, + iterations_needed, + output, + ) + vdf_proof: VDFProof = VDFProof( + witness_type, + proof_bytes, + ) + + if not vdf_proof.is_valid(self.constants, vdf_info): + log.error("Invalid proof of time!") + # continue + async with self.lock: + self.proofs_finished.append((chain, vdf_info, vdf_proof)) diff --git a/src/timelord_new.py b/src/timelord_new.py deleted file mode 100644 index 8f96647a6b34..000000000000 --- a/src/timelord_new.py +++ /dev/null @@ -1,775 +0,0 @@ -import asyncio -import io -import logging -import time -from enum import Enum -from typing import Dict, List, Optional, Tuple, Union - -from chiavdf import create_discriminant - -from src.consensus.constants import ConsensusConstants -from src.consensus.default_constants import DEFAULT_CONSTANTS -from src.consensus.pot_iterations import ( - calculate_iterations_quality, - calculate_sp_iters, - calculate_ip_iters, -) -from blspy import G2Element -from src.protocols import timelord_protocol -from src.server.outbound_message import OutboundMessage, NodeType, Message, Delivery -from src.server.server import ChiaServer -from src.types.classgroup import ClassgroupElement -from src.types.end_of_slot_bundle import EndOfSubSlotBundle -from src.types.reward_chain_sub_block import ( - RewardChainSubBlock, - RewardChainSubBlockUnfinished, -) -from src.types.sized_bytes import bytes32 -from src.types.slots import ChallengeChainSubSlot, InfusedChallengeChainSubSlot, RewardChainSubSlot, SubSlotProofs -from src.types.vdf import VDFInfo, VDFProof -from src.util.api_decorators import api_request -from src.util.ints import uint64, uint8, uint128, int512 -from src.types.sub_epoch_summary import SubEpochSummary -from src.util.block_tools import BlockTools -from chiapos import Verifier -from src.types.slots import ChallengeBlockInfo - -log = logging.getLogger(__name__) - - -def iters_from_sub_block( - constants, - reward_chain_sub_block: Union[RewardChainSubBlock, RewardChainSubBlockUnfinished], - sub_slot_iters: uint64, - difficulty: uint64, -) -> Tuple[uint64, uint64]: - v = Verifier() - pos = reward_chain_sub_block.proof_of_space - plot_id: bytes32 = pos.get_plot_id() - quality = v.validate_proof(plot_id, pos.size, pos.challenge_hash, bytes(pos.proof)) - - if reward_chain_sub_block.challenge_chain_sp_vdf is None: - assert reward_chain_sub_block.signage_point_index == 0 - cc_sp: bytes32 = reward_chain_sub_block.proof_of_space.challenge_hash - else: - cc_sp: bytes32 = reward_chain_sub_block.challenge_chain_sp_vdf.get_hash() - required_iters = calculate_iterations_quality( - quality, - reward_chain_sub_block.proof_of_space.size, - difficulty, - cc_sp, - ) - return ( - calculate_sp_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index), - calculate_ip_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index, required_iters), - ) - -class EndOfSubSlotData: - eos_bundle: EndOfSubSlotBundle - sub_slot_iters: uint64 - new_difficulty: uint64 - deficit: uint8 - - def __init__(self, eos_bundle, sub_slot_iters, new_difficulty, deficit): - self.eos_bundle = eos_bundle - self.sub_slot_iters = sub_slot_iters - self.new_difficulty = new_difficulty - self.deficit = deficit - -class Chain(Enum): - CHALLENGE_CHAIN = 1 - REWARD_CHAIN = 2 - INFUSED_CHALLENGE_CHAIN = 3 - -class IterationType(Enum): - SIGNAGE_POINT = 1 - INFUSION_POINT = 2 - END_OF_SUBSLOT = 3 - -class LastState: - def __init__(self, constants): - self.peak: Optional[timelord_protocol.NewPeak] = None - self.subslot_end: Optional[EndOfSubSlotData] = None - self.last_ip: uint64 = 0 - self.deficit: uint8 = 0 - self.sub_epoch_summary: Optional[SubEpochSummary] = None - self.constants = constants - self.last_weight = 0 - self.total_iters = 0 - self.last_peak_challenge: Optional[bytes32] = None - - def set_state(self, state): - if isinstance(state, timelord_protocol.NewPeak): - self.peak = state - self.subslot_end = None - _, self.last_ip = iters_from_sub_block( - self.constants, - state.reward_chain_sub_block, - state.sub_slot_iters, - state.difficulty, - ) - self.deficit = state.deficit - self.sub_epoch_summary = state.sub_epoch_summary - self.last_weight = state.reward_chain_sub_block.weight - self.total_iters = state.reward_chain_sub_block.total_iters - self.last_peak_challenge = state.reward_chain_sub_block.get_hash() - if isinstance(state, EndOfSubSlotData): - self.peak = None - self.subslot_end = state - self.last_ip = 0 - self.deficit = state.deficit - - def is_empty(self) -> bool: - return self.peak is None and self.subslot_end is None - - def get_sub_slot_iters(self) -> uint64: - if self.peak is not None: - return self.peak.sub_slot_iters - return self.subslot_end.sub_slot_iters - - def get_weight(self) -> uint64: - return self.last_weight - - def get_total_iters(self) -> uint128: - return self.total_iters - - def get_last_peak_challenge(self) -> Optional[bytes32]: - return self.last_peak_challenge - - def get_difficulty(self) -> uint64: - if self.peak is not None: - return self.peak.difficulty - return self.subslot_end.new_difficulty - - def get_last_ip(self) -> uint64: - return self.last_ip - - def get_deficit(self) -> uint8: - if self.peak is not None: - return self.peak.deficit - return self.subslot_end.deficit - - def get_sub_epoch_summary(self) -> Optional[SubEpochSummary]: - return self.sub_epoch_summary - - def get_challenge(self, chain: Chain) -> Optional[bytes32]: - if self.peak is not None: - sub_block = self.peak.reward_chain_sub_block - if chain == Chain.CHALLENGE_CHAIN: - return sub_block.challenge_chain_ip_vdf.challenge_hash - if chain == Chain.REWARD_CHAIN: - return sub_block.get_hash() - if chain == Chain.INFUSED_CHALLENGE_CHAIN: - if sub_block.infused_challenge_chain_ip_vdf is not None: - return sub_block.infused_challenge_chain_ip_vdf.challenge_hash - if self.peak.deficit == 4: - return ChallengeBlockInfo( - sub_block.proof_of_space, - sub_block.challenge_chain_sp_vdf, - sub_block.challenge_chain_sp_signature, - sub_block.challenge_chain_ip_vdf, - ).get_hash() - return None - if self.subslot_end is not None: - if chain == Chain.CHALLENGE_CHAIN: - return self.subslot_end.eos_bundle.challenge_chain.get_hash() - if chain == Chain.REWARD_CHAIN: - return self.subslot_end.eos_bundle.reward_chain.get_hash() - if chain == Chain.INFUSED_CHALLENGE_CHAIN: - if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK: - return self.subslot_end.eos_bundle.infused_challenge_chain.get_hash() - else: - return None - return None - - def get_initial_form(self, chain: Chain) -> Optional[ClassgroupElement]: - if self.peak is not None: - sub_block = self.peak.reward_chain_sub_block - if chain == Chain.CHALLENGE_CHAIN: - return sub_block.challenge_chain_ip_vdf.output - if chain == Chain.REWARD_CHAIN: - return ClassgroupElement.get_default_element() - if chain == Chain.INFUSED_CHALLENGE_CHAIN: - if sub_block.infused_challenge_chain_ip_vdf is not None: - return sub_block.infused_challenge_chain_ip_vdf.output - elif self.peak.deficit == 4: - return ClassgroupElement.get_default_element() - else: - return None - if self.subslot_end is not None: - if chain == Chain.CHALLENGE_CHAIN or chain == Chain.REWARD_CHAIN: - return ClassgroupElement.get_default_element() - if chain == Chain.INFUSED_CHALLENGE_CHAIN: - if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK: - return ClassgroupElement.get_default_element() - else: - return None - return None - -class Timelord: - def __init__(self, config: Dict, constants: ConsensusConstants): - self.config = config - self.constants = constants - self._is_stopped = False - self.free_clients: List[Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = [] - self.lock: asyncio.Lock = asyncio.Lock() - self.potential_free_clients: List = [] - self.ip_whitelist = self.config["vdf_clients"]["ip"] - self.server: Optional[ChiaServer] = None - self.chain_type_to_stream: Dict[Chain, Tuple[ip, asyncio.StreamReader, asyncio.StreamWriter]] = {} - self.chain_start_time: Dict = {} - # Chains that currently don't have a vdf_client. - self.unspawned_chains: List[Chain] = [ - Chain.CHALLENGE_CHAIN, - Chain.REWARD_CHAIN, - Chain.INFUSED_CHALLENGE_CHAIN, - ] - # Chains that currently accept iterations. - self.allows_iters: List[Chain] = [] - # Last peak received, None if it's already processed. - self.new_peak: Optional[timelord_protocol.NewPeak] = None - # Last end of subslot bundle, None if we built a peak on top of it. - self.new_subslot_end: Optional[EndOfSubSlotData] = None - # Last state received. Can either be a new peak or a new EndOfSubslotBundle. - self.last_state: LastState = LastState(self.constants) - # Unfinished block info, iters adjusted to the last peak. - self.unfinished_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = [] - # Signage points iters, adjusted to the last peak. - self.signage_point_iters: List[uint64] = [] - # For each chain, send those info when the process spawns. - self.iters_to_submit: Dict[str, List[uint64]] = {} - self.iters_submitted: Dict[str, List[uint64]] = {} - # For each iteration submitted, know if it's a signage point, an infusion point or an end of slot. - self.iteration_to_proof_type: Dict[uint64, IterationType] = {} - # List of proofs finished. - self.proofs_finished: List[Tuple[Chain, VDFInfo, VDFProof]] = [] - # Data to send at vdf_client initialization. - self.finished_sp = 0 - self.overflow_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = [] - - async def start(self): - log.info("Starting timelord.") - self.main_loop = asyncio.create_task(self._manage_chains()) - - self.vdf_server = await asyncio.start_server( - self._handle_client, - self.config["vdf_server"]["host"], - self.config["vdf_server"]["port"], - ) - log.info("Started timelord.") - - def set_server(self, server: ChiaServer): - self.server = server - - @api_request - async def new_peak(self, new_peak: timelord_protocol.NewPeak): - async with self.lock: - if ( - self.last_state is None - or self.last_state.get_weight() < new_peak.weight - ): - self.new_peak = new_peak - - @api_request - async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock): - async with self.lock: - if not self._accept_unfinished_block(new_unfinished_subblock): - return - sp_iters, ip_iters = iters_from_sub_block( - new_unfinished_subblock.reward_chain_sub_block, - self.last_state.get_sub_slot_iters(), - self.last_state.get_difficulty(), - ) - last_ip_iters = self.last_state.get_last_ip() - if sp_iters < ip_iters: - self.overflow_blocks.append(new_unfinished_subblock) - elif ip_iters > last_ip_iters: - self.unfinished_blocks.append(new_unfinished_subblock) - for chain in Chain: - self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters)) - self.iteration_to_proof_type[ip_iters - self.last_ip_iters] = IterationType.INFUSION_POINT - - def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool: - # Total unfinished block iters needs to exceed peak's iters. - if self.last_state.get_total_iters() >= block.total_iters: - return False - # The peak hash of the rc-sub-block must match - # the signage point rc VDF challenge hash of the unfinished sub-block. - if ( - block.reward_chain_sp_vdf is not None - and self.last_state.get_last_peak_challenge() is not None - and self.last_state.get_last_peak_challenge() - != block.reward_chain_sp_vdf.challenge_hash - ): - return False - return True - - async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - async with self.lock: - client_ip = writer.get_extra_info("peername")[0] - log.info(f"New timelord connection from client: {client_ip}.") - if client_ip in self.ip_whitelist: - self.free_clients.append((client_ip, reader, writer)) - log.info(f"Added new VDF client {client_ip}.") - for ip, end_time in list(self.potential_free_clients): - if ip == client_ip: - self.potential_free_clients.remove((ip, end_time)) - break - - async def _stop_chain(self, chain: Chain): - stop_ip, _, stop_writer = self.chain_type_to_stream[chain] - self.potential_free_clients.append((stop_ip, time.time())) - stop_writer.write(b"010") - await stop_writer.drain() - if chain in self.allows_iters: - self.allows_iters.remove(chain) - self.unspawned_chains.append(chain) - - async def _reset_chains(self): - # First, stop all chains. - ip_iters = self.last_state.get_last_ip() - sub_slot_iters = self.last_state.get_sub_slot_iters() - difficulty = self.last_state.get_difficulty() - for chain in self.chain_type_to_stream.keys(): - await self._stop_chain(chain) - # Adjust all signage points iterations to the peak. - iters_per_signage = uint64(sub_slot_iters // self.constants.NUM_SPS_SUB_SLOT) - self.signage_point_iters = [ - k * iters_per_signage - ip_iters - for k in range(1, self.constants.NUM_SPS_SUB_SLOT + 1) - if k * iters_per_signage - ip_iters > 0 and k * iters_per_signage < sub_slot_iters - ] - # Adjust all unfinished blocks iterations to the peak. - new_unfinished_blocks = [] - self.proofs_finished = [] - for chain in Chain: - self.iters_to_submit[chain] = [] - self.iters_submitted[chain] = [] - self.iteration_to_proof_type = {} - for block in self.unfinished_blocks: - if not self._accept_unfinished_block(block): - continue - block_sp_iters, block_ip_iters = iters_from_sub_block( - self.constants, - blocks, - sub_slot_iters, - difficulty, - ) - new_block_iters = block_ip_iters - ip_iters - if new_block_iters > 0: - new_unfinished_blocks.append(block) - for chain in Chain: - self.iters_to_submit[chain].append(new_block_iters) - self.iteration_to_proof_type[new_block_iters] = IterationType.INFUSION_POINT - # Remove all unfinished blocks that have already passed. - self.unfinished_blocks = new_unfinished_blocks - # Signage points. - if len(self.signage_point_iters) > 0: - count_signage = 0 - for signage in self.signage_point_iters: - for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]: - self.iters_to_submit[chain].append(signage) - self.iteration_to_proof_type[signage] = IterationType.SIGNAGE_POINT - count_signage += 1 - if count_signage == 3: - break - # TODO: handle the special case when infusion point is the end of subslot. - left_subslot_iters = sub_slot_iters - ip_iters - log.info(f"Left subslot iters: {left_subslot_iters}.") - for chain in Chain: - self.iters_to_submit[chain].append(left_subslot_iters) - self.iteration_to_proof_type[left_subslot_iters] = IterationType.END_OF_SUBSLOT - - async def _handle_new_peak(self): - self.last_state.set_state(self.new_peak) - self.new_peak = None - await self._reset_chains() - - async def _handle_subslot_end(self): - self.finished_sp = 0 - self.last_state.set_state(self.new_subslot_end) - self.new_subslot_end = None - await self._reset_chains() - - async def _map_chains_with_vdf_clients(self): - while not self._is_stopped: - picked_chain = None - async with self.lock: - if len(self.free_clients) == 0: - break - ip, reader, writer = self.free_clients[0] - for chain_type in self.unspawned_chains: - challenge_hash = self.last_state.get_challenge(chain_type) - initial_form = self.last_state.get_initial_form(chain_type) - if challenge_hash is not None and initial_form is not None: - picked_chain = chain_type - break - if picked_chain is None: - break - picked_chain = self.unspawned_chains[0] - self.chain_type_to_stream[picked_chain] = (ip, reader, writer) - self.free_clients = self.free_clients[1:] - self.unspawned_chains = self.unspawned_chains[1:] - self.chain_start_time[picked_chain] = time.time() - - log.info(f"Mapping free vdf_client with chain: {picked_chain}.") - asyncio.create_task( - self._do_process_communication(picked_chain, challenge_hash, initial_form, ip, reader, writer) - ) - - async def _submit_iterations(self): - for chain in Chain: - if chain in self.allows_iters: - _, _, writer = self.chain_type_to_stream[chain] - for iteration in self.iters_to_submit[chain]: - if iteration in self.iters_submitted[chain]: - continue - prefix = str(len(str(iteration))) - if len(str(iteration)) < 10: - prefix = "0" + prefix - iter_str = prefix + str(iteration) - writer.write(iter_str.encode()) - await writer.drain() - self.iters_submitted[chain].append(iteration) - - def _clear_proof_list(self, iter): - return [ - (chain, info, proof) for chain, info, proof in self.proofs_finished if info.number_of_iterations != iter - ] - - async def _check_for_new_sp(self): - signage_iters = [ - iteration for iteration, t in self.iteration_to_proof_type.items() - if t == IterationType.SIGNAGE_POINT - ] - if len(signage_iters) == 0: - return - for signage_iter in signage_iters: - proofs_with_iter = [ - (chain, info, proof) - for chain, info, proof in self.proofs_finished - if info.number_of_iterations == signage_iter - ] - # Wait for both cc and rc to have the signage point. - if len(proofs_with_iter) == 2: - for chain, info, proof in proofs_with_iter: - if chain == Chain.CHALLENGE_CHAIN: - cc_info = info - cc_proof = proof - if chain == Chain.REWARD_CHAIN: - rc_info = info - rc_proof = proof - response = timelord_protocol.NewSignagePointVDF( - self.finished_sp, - cc_info, - cc_proof, - rc_info, - rc_proof, - ) - if self.server is not None: - self.server.push_message( - OutboundMessage( - NodeType.FULL_NODE, - Message("new_signage_point_vdf", response), - Delivery.BROADCAST, - ) - ) - # Cleanup the signage point from memory. - self.signage_point_iters.remove(signage_iter) - self.finished_sp += 1 - self.proofs_finished = self._clear_proof_list(signage_iter) - # Send the next 3 signage point to the chains. - next_iters_count = 0 - for next_sp in self.signage_point_iters: - for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]: - if ( - next_sp not in self.iters_submitted[chain] - and next_sp not in self.iters_to_submit[chain] - ): - self.iters_to_submit[chain].append(next_sp) - self.iteration_to_proof_type[next_sp] = IterationType.SIGNAGE_POINT - next_iters_count += 1 - if next_iters_count == 3: - break - - async def _check_for_new_ip(self): - infusion_iters = [ - iteration for iteration, t in self.iteration_to_proof_type.items() - if t == IterationType.INFUSION_POINT - ] - for iteration in infusion_iters: - proofs_with_iter = [ - (chain, info, proof) - for chain, info, proof in self.proofs_finished - if info.number_of_iterations == iteration - ] - if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None: - chain_count = 3 - else: - chain_count = 2 - if len(proofs_with_iter) == chain_count: - block = None - for unfinished_block in self.unfinished_blocks: - _, ip_iters = iters_from_sub_block( - self.constants, - unfinished_block, - self.last_state.get_sub_slot_iters(), - self.last_state.get_difficulty(), - ) - if ip_iters - self.last_state.get_last_ip() == iteration: - block = unfinished_block - break - if block is not None: - self.unfinished_blocks.remove(block) - challenge_hash = block.reward_chain_sub_block.get_hash() - icc_info = None - icc_proof = None - for chain, info, proof in proofs_with_iter: - if chain == Chain.CHALLENGE_CHAIN: - cc_info = info - cc_proof = proof - if chain == Chain.REWARD_CHAIN: - rc_info = info - rc_proof = proof - if chain == Chain.INFUSED_CHALLENGE_CHAIN: - icc_info = info - icc_proof = proof - response = timelord_protocol.NewInfusionPointVDF( - challenge_hash, - cc_info, - cc_proof, - rc_info, - rc_proof, - icc_info, - icc_proof, - ) - self.server.push_message( - OutboundMessage( - NodeType.FULL_NODE, - Message("new_infusion_point_vdf", response), - Delivery.BROADCAST, - ) - ) - for iteration in infusion_iters: - self.proofs_finished = self._clear_proof_list(iteration) - - async def _check_for_end_of_subslot(self): - left_subslot_iters = [ - iteration for iteration, t in self.iteration_to_proof_type.items() - if t == IterationType.END_OF_SUBSLOT - ] - if len(left_subslot_iters) == 0: - return - chains_finished = [ - (chain, info, proof) - for chain, info, proof in self.proofs_finished - if info.number_of_iterations == left_subslot_iters[0] - ] - if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None: - chain_count = 3 - else: - chain_count = 2 - if len(chains_finished) == chain_count: - icc_ip_vdf: Optional[VDFInfo] = None - icc_ip_proof: Optional[VDFProof] = None - cc_vdf: Optional[VDFInfo] = None - cc_proof: Optional[VDFProof] = None - rc_vdf: Optional[VDFInfo] = None - rc_proof: Optional[VDFProof] = None - for chain, info, proof in chains_finished: - if chain == Chain.CHALLENGE_CHAIN: - cc_vdf = info - cc_proof = proof - if chain == Chain.REWARD_CHAIN: - rc_vdf = info - rc_proof = proof - if chain == Chain.INFUSED_CHALLENGE_CHAIN: - icc_ip_vdf = info - icc_ip_proof = proof - assert cc_proof is not None and rc_proof is not None and cc_vdf is not None and rc_vdf is not None - log.info("Collected end of subslot vdfs.") - - icc_sub_slot: Optional[InfusedChallengeChainSubSlot] = ( - None if icc_ip_vdf is None - else InfusedChallengeChainSubSlot(icc_ip_vdf) - ) - icc_sub_slot_hash = icc_sub_slot.get_hash() if self.last_state.get_deficit() == 0 else None - if self.last_state.get_sub_epoch_summary() is not None: - ses_hash = self.last_state.get_sub_epoch_summary().get_hash() - new_sub_slot_iters = self.last_state.get_sub_epoch_summary().new_sub_slot_iters - new_difficulty = self.last_state.get_sub_epoch_summary().new_difficulty - else: - ses_hash = None - new_sub_slot_iters = self.last_state.get_sub_slot_iters() - new_difficulty = self.last_state.get_difficulty() - cc_sub_slot = ChallengeChainSubSlot(cc_vdf, icc_sub_slot_hash, ses_hash, new_sub_slot_iters, new_difficulty) - eos_deficit: uint8 = ( - self.last_state.get_deficit() - if self.last_state.get_deficit() > 0 - else self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK - ) - rc_sub_slot = RewardChainSubSlot( - rc_vdf, - cc_sub_slot.get_hash(), - icc_sub_slot.get_hash() if icc_sub_slot is not None else None, - eos_deficit, - ) - eos_bundle = EndOfSubSlotBundle( - cc_sub_slot, - icc_sub_slot, - rc_sub_slot, - SubSlotProofs(cc_proof, icc_ip_proof, rc_proof), - ) - if self.server is not None: - self.server.push_message( - OutboundMessage( - NodeType.FULL_NODE, - Message("end_of_sub_slot_bundle", timelord_protocol.NewEndOfSubSlotVDF(eos_bundle)), - Delivery.BROADCAST, - ) - ) - log.info("Built end of subslot bundle.") - self.unfinished_blocks = self.overflow_blocks - self.overflow_blocks = [] - self.new_subslot_end = EndOfSubSlotData( - eos_bundle, - new_sub_slot_iters, - new_difficulty, - eos_deficit, - ) - - async def _manage_chains(self): - while not self._is_stopped: - await asyncio.sleep(0.1) - # Didn't get any useful data, continue. - async with self.lock: - if self.last_state.is_empty() and self.new_peak is None: - continue - # Map free vdf_clients to unspawned chains. - await self._map_chains_with_vdf_clients() - async with self.lock: - # We've got a new peak, process it. - if self.new_peak is not None: - await self._handle_new_peak() - # A subslot ended, process it. - if self.new_subslot_end is not None: - await self._handle_subslot_end() - # Submit pending iterations. - await self._submit_iterations() - # Check for new signage point and broadcast it if present. - await self._check_for_new_sp() - # Check for new infusion point and broadcast it if present. - await self._check_for_new_ip() - # Check for end of subslot, respawn chains and build EndOfSubslotBundle. - await self._check_for_end_of_subslot() - - async def _do_process_communication(self, chain, challenge_hash, initial_form, ip, reader, writer): - disc: int = create_discriminant(challenge_hash, self.constants.DISCRIMINANT_SIZE_BITS) - # Depending on the flags 'fast_algorithm' and 'sanitizer_mode', - # the timelord tells the vdf_client what to execute. - if self.config["fast_algorithm"]: - # Run n-wesolowski (fast) algorithm. - writer.write(b"N") - else: - # Run two-wesolowski (slow) algorithm. - writer.write(b"N") - await writer.drain() - - prefix = str(len(str(disc))) - if len(prefix) == 1: - prefix = "00" + prefix - if len(prefix) == 2: - prefix = "0" + prefix - writer.write((prefix + str(disc)).encode()) - await writer.drain() - - # Send (a, b) from 'initial_form'. - for num in [initial_form.a, initial_form.b]: - prefix = len(str(num)) - prefix_len = len(str(prefix)) - writer.write((str(prefix_len) + str(prefix) + str(num)).encode()) - await writer.drain() - try: - ok = await reader.readexactly(2) - except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e: - log.warning(f"{type(e)} {e}") - return - - if ok.decode() != "OK": - return - - log.info("Got handshake with VDF client.") - async with self.lock: - self.allows_iters.append(chain) - # Listen to the client until "STOP" is received. - while True: - try: - data = await reader.readexactly(4) - except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e: - log.warning(f"{type(e)} {e}") - break - - msg = "" - try: - msg = data.decode() - except Exception as e: - # log.error(f"Exception while decoding data {e}") - pass - if msg == "STOP": - log.info(f"Stopped client running on ip {ip}.") - async with self.lock: - writer.write(b"ACK") - await writer.drain() - break - else: - try: - # This must be a proof, 4 bytes is length prefix - length = int.from_bytes(data, "big") - proof = await reader.readexactly(length) - stdout_bytes_io: io.BytesIO = io.BytesIO(bytes.fromhex(proof.decode())) - except ( - asyncio.IncompleteReadError, - ConnectionResetError, - Exception, - ) as e: - log.warning(f"{type(e)} {e}") - break - - iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True)) - - y_size_bytes = stdout_bytes_io.read(8) - y_size = uint64(int.from_bytes(y_size_bytes, "big", signed=True)) - - y_bytes = stdout_bytes_io.read(y_size) - witness_type = uint8(int.from_bytes(stdout_bytes_io.read(1), "big", signed=True)) - proof_bytes: bytes = stdout_bytes_io.read() - - # Verifies our own proof just in case - a = int.from_bytes(y_bytes[:129], "big", signed=True) - b = int.from_bytes(y_bytes[129:], "big", signed=True) - output = ClassgroupElement(int512(a), int512(b)) - time_taken = time.time() - self.chain_start_time[chain] - ips = int(iterations_needed / time_taken * 10) / 10 - log.info( - f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}" f" iters." - f"Estimated IPS: {ips}. Chain: {chain}" - ) - - vdf_info: VDFInfo = VDFInfo( - challenge_hash, - initial_form, - iterations_needed, - output, - ) - vdf_proof: VDFProof = VDFProof( - witness_type, - proof_bytes, - ) - - if not vdf_proof.is_valid(self.constants, vdf_info): - log.error("Invalid proof of time!") - # continue - async with self.lock: - self.proofs_finished.append( - (chain, vdf_info, vdf_proof) - ) diff --git a/src/timelord_old.py b/src/timelord_old.py deleted file mode 100644 index e2a873cda5d1..000000000000 --- a/src/timelord_old.py +++ /dev/null @@ -1,465 +0,0 @@ -import asyncio -import io -import logging -import time -import socket -from typing import Dict, List, Optional, Tuple - - -from chiavdf import create_discriminant -from src.protocols import timelord_protocol -from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage -from src.server.server import ChiaServer -from src.types.classgroup import ClassgroupElement -from src.types.vdf import ProofOfTime -from src.types.sized_bytes import bytes32 -from src.util.api_decorators import api_request -from src.util.ints import uint8, uint64, int512, uint128 - -log = logging.getLogger(__name__) - - -class Timelord: - def __init__(self, config: Dict, discriminant_size_bits: int): - self.discriminant_size_bits = discriminant_size_bits - self.config: Dict = config - self.ips_estimate = { - socket.gethostbyname(k): v - for k, v in list( - zip( - self.config["vdf_clients"]["ip"], - self.config["vdf_clients"]["ips_estimate"], - ) - ) - } - self.log = log - self.lock: asyncio.Lock = asyncio.Lock() - self.active_discriminants: Dict[bytes32, Tuple[asyncio.StreamWriter, uint64, str]] = {} - self.best_weight_three_proofs: int = -1 - self.active_discriminants_start_time: Dict = {} - self.pending_iters: Dict = {} - self.submitted_iters: Dict = {} - self.done_discriminants: List[bytes32] = [] - self.proofs_to_write: List[OutboundMessage] = [] - self.seen_discriminants: List[bytes32] = [] - self.proof_count: Dict = {} - self.avg_ips: Dict = {} - self.discriminant_queue: List[Tuple[bytes32, uint128]] = [] - self.max_connection_time = self.config["max_connection_time"] - self.potential_free_clients: List = [] - self.free_clients: List[Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = [] - self.server: Optional[ChiaServer] = None - self.vdf_server = None - self._is_shutdown = False - self.sanitizer_mode = self.config["sanitizer_mode"] - self.last_time_seen_discriminant: Dict = {} - self.max_known_weights: List[uint128] = [] - - def set_server(self, server: ChiaServer): - self.server = server - - async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - async with self.lock: - client_ip = writer.get_extra_info("peername")[0] - log.info(f"New timelord connection from client: {client_ip}.") - if client_ip in self.ips_estimate.keys(): - self.free_clients.append((client_ip, reader, writer)) - log.info(f"Added new VDF client {client_ip}.") - for ip, end_time in list(self.potential_free_clients): - if ip == client_ip: - self.potential_free_clients.remove((ip, end_time)) - break - - async def _start(self): - if self.sanitizer_mode: - log.info("Starting timelord in sanitizer mode") - self.disc_queue = asyncio.create_task(self._manage_discriminant_queue_sanitizer()) - else: - log.info("Starting timelord in normal mode") - self.disc_queue = asyncio.create_task(self._manage_discriminant_queue()) - - self.vdf_server = await asyncio.start_server( - self._handle_client, - self.config["vdf_server"]["host"], - self.config["vdf_server"]["port"], - ) - - def _close(self): - self._is_shutdown = True - assert self.vdf_server is not None - self.vdf_server.close() - - async def _await_closed(self): - assert self.disc_queue is not None - await self.disc_queue - - async def _stop_worst_process(self, worst_weight_active): - # This is already inside a lock, no need to lock again. - log.info(f"Stopping one process at weight {worst_weight_active}") - stop_writer: Optional[asyncio.StreamWriter] = None - stop_discriminant: Optional[bytes32] = None - - low_weights = {k: v for k, v in self.active_discriminants.items() if v[1] == worst_weight_active} - no_iters = { - k: v for k, v in low_weights.items() if k not in self.pending_iters or len(self.pending_iters[k]) == 0 - } - - # If we have process(es) with no iters, stop the one that started the latest - if len(no_iters) > 0: - latest_start_time = max([self.active_discriminants_start_time[k] for k, _ in no_iters.items()]) - stop_discriminant, stop_writer = next( - (k, v[0]) for k, v in no_iters.items() if self.active_discriminants_start_time[k] == latest_start_time - ) - else: - # Otherwise, pick the one that finishes one proof the latest. - best_iter = {k: min(self.pending_iters[k]) for k, _ in low_weights.items()} - time_taken = {k: time.time() - self.active_discriminants_start_time[k] for k, _ in low_weights.items()} - - client_ip = [v[2] for _, v in low_weights.items()] - # ips maps an IP to the expected iterations per second of it. - ips = {} - for ip in client_ip: - if ip in self.avg_ips: - current_ips, _ = self.avg_ips[ip] - ips[ip] = current_ips - else: - ips[ip] = self.ips_estimate[ip] - - expected_finish = { - k: max(0, (best_iter[k] - time_taken[k] * ips[v[2]]) / ips[v[2]]) for k, v in low_weights.items() - } - worst_finish = max([v for v in expected_finish.values()]) - log.info(f"Worst finish time: {worst_finish}s") - stop_discriminant, stop_writer = next( - (k, v[0]) for k, v in low_weights.items() if expected_finish[k] == worst_finish - ) - assert stop_writer is not None - _, _, stop_ip = self.active_discriminants[stop_discriminant] - self.potential_free_clients.append((stop_ip, time.time())) - stop_writer.write(b"010") - await stop_writer.drain() - del self.active_discriminants[stop_discriminant] - del self.active_discriminants_start_time[stop_discriminant] - self.done_discriminants.append(stop_discriminant) - - async def _update_avg_ips(self, challenge_hash, iterations_needed, ip): - async with self.lock: - if challenge_hash in self.active_discriminants: - time_taken = time.time() - self.active_discriminants_start_time[challenge_hash] - ips = int(iterations_needed / time_taken * 10) / 10 - log.info( - f"Finished PoT, chall:{challenge_hash[:10].hex()}.." - f" {iterations_needed} iters. {int(time_taken*1000)/1000}s, {ips} ips" - ) - if ip not in self.avg_ips: - self.avg_ips[ip] = (ips, 1) - else: - prev_avg_ips, trials = self.avg_ips[ip] - new_avg_ips = int((prev_avg_ips * trials + ips) / (trials + 1)) - self.avg_ips[ip] = (new_avg_ips, trials + 1) - log.info(f"New estimate: {new_avg_ips}") - if iterations_needed in self.pending_iters[challenge_hash]: - self.pending_iters[challenge_hash].remove(iterations_needed) - else: - log.warning("Finished PoT for an unknown iteration.") - else: - log.info( - f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}" - f" iters. But challenge not active anymore" - ) - - async def _update_proofs_count(self, challenge_weight): - async with self.lock: - if challenge_weight not in self.proof_count: - self.proof_count[challenge_weight] = 1 - else: - self.proof_count[challenge_weight] += 1 - if self.proof_count[challenge_weight] >= 3: - log.info("Cleaning up clients.") - self.best_weight_three_proofs = max(self.best_weight_three_proofs, challenge_weight) - for active_disc in list(self.active_discriminants): - current_writer, current_weight, ip = self.active_discriminants[active_disc] - if current_weight <= challenge_weight: - log.info(f"Active weight cleanup: {current_weight}") - log.info(f"Cleanup weight: {challenge_weight}") - self.potential_free_clients.append((ip, time.time())) - current_writer.write(b"010") - await current_writer.drain() - del self.active_discriminants[active_disc] - del self.active_discriminants_start_time[active_disc] - self.done_discriminants.append(active_disc) - - async def _send_iterations(self, challenge_hash, writer): - alive_discriminant = True - while alive_discriminant: - async with self.lock: - if (challenge_hash in self.active_discriminants) and (challenge_hash in self.pending_iters): - if challenge_hash not in self.submitted_iters: - self.submitted_iters[challenge_hash] = [] - for iter in sorted(self.pending_iters[challenge_hash]): - if iter in self.submitted_iters[challenge_hash]: - continue - self.submitted_iters[challenge_hash].append(iter) - if len(str(iter)) < 10: - iter_size = "0" + str(len(str(iter))) - else: - iter_size = str(len(str(iter))) - writer.write((iter_size + str(iter)).encode()) - await writer.drain() - log.info(f"New iteration submitted: {iter}") - # Sanitizer will always work with only one iteration. - if self.sanitizer_mode: - alive_discriminant = False - await asyncio.sleep(1) - async with self.lock: - if challenge_hash in self.done_discriminants: - alive_discriminant = False - - async def _do_process_communication(self, challenge_hash, challenge_weight, ip, reader, writer): - disc: int = create_discriminant(challenge_hash, self.discriminant_size_bits) - # Depending on the flags 'fast_algorithm' and 'sanitizer_mode', - # the timelord tells the vdf_client what to execute. - if not self.sanitizer_mode: - if self.config["fast_algorithm"]: - # Run n-wesolowski (fast) algorithm. - writer.write(b"N") - else: - # Run two-wesolowski (slow) algorithm. - writer.write(b"T") - else: - # Create compact proofs of time. - writer.write(b"S") - await writer.drain() - - prefix = str(len(str(disc))) - if len(prefix) == 1: - prefix = "00" + prefix - writer.write((prefix + str(disc)).encode()) - await writer.drain() - - try: - ok = await reader.readexactly(2) - except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e: - log.warning(f"{type(e)} {e}") - async with self.lock: - if challenge_hash not in self.done_discriminants: - self.done_discriminants.append(challenge_hash) - if self.sanitizer_mode: - if challenge_hash in self.pending_iters: - del self.pending_iters[challenge_hash] - if challenge_hash in self.submitted_iters: - del self.submitted_iters[challenge_hash] - return - - if ok.decode() != "OK": - return - - log.info("Got handshake with VDF client.") - - async with self.lock: - self.active_discriminants[challenge_hash] = (writer, challenge_weight, ip) - self.active_discriminants_start_time[challenge_hash] = time.time() - - asyncio.create_task(self._send_iterations(challenge_hash, writer)) - - # Listen to the client until "STOP" is received. - while True: - try: - data = await reader.readexactly(4) - except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e: - log.warning(f"{type(e)} {e}") - async with self.lock: - if challenge_hash in self.active_discriminants: - del self.active_discriminants[challenge_hash] - if challenge_hash in self.active_discriminants_start_time: - del self.active_discriminants_start_time[challenge_hash] - if challenge_hash not in self.done_discriminants: - self.done_discriminants.append(challenge_hash) - if self.sanitizer_mode: - if challenge_hash in self.pending_iters: - del self.pending_iters[challenge_hash] - if challenge_hash in self.submitted_iters: - del self.submitted_iters[challenge_hash] - break - - msg = "" - try: - msg = data.decode() - except Exception as e: - log.error(f"Exception while decoding data {e}") - - if msg == "STOP": - log.info(f"Stopped client running on ip {ip}.") - async with self.lock: - writer.write(b"ACK") - await writer.drain() - break - else: - try: - # This must be a proof, 4bytes is length prefix - length = int.from_bytes(data, "big") - proof = await reader.readexactly(length) - stdout_bytes_io: io.BytesIO = io.BytesIO(bytes.fromhex(proof.decode())) - except ( - asyncio.IncompleteReadError, - ConnectionResetError, - Exception, - ) as e: - log.warning(f"{type(e)} {e}") - async with self.lock: - if challenge_hash in self.active_discriminants: - del self.active_discriminants[challenge_hash] - if challenge_hash in self.active_discriminants_start_time: - del self.active_discriminants_start_time[challenge_hash] - if challenge_hash not in self.done_discriminants: - self.done_discriminants.append(challenge_hash) - if self.sanitizer_mode: - if challenge_hash in self.pending_iters: - del self.pending_iters[challenge_hash] - if challenge_hash in self.submitted_iters: - del self.submitted_iters[challenge_hash] - break - - iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True)) - - y_size_bytes = stdout_bytes_io.read(8) - y_size = uint64(int.from_bytes(y_size_bytes, "big", signed=True)) - - y_bytes = stdout_bytes_io.read(y_size) - witness_type = uint8(int.from_bytes(stdout_bytes_io.read(1), "big", signed=True)) - proof_bytes: bytes = stdout_bytes_io.read() - - # Verifies our own proof just in case - a = int.from_bytes(y_bytes[:129], "big", signed=True) - b = int.from_bytes(y_bytes[129:], "big", signed=True) - - output = ClassgroupElement(int512(a), int512(b)) - - proof_of_time = ProofOfTime( - challenge_hash, - iterations_needed, - output, - witness_type, - proof_bytes, - ) - - if not proof_of_time.is_valid(self.discriminant_size_bits): - log.error("Invalid proof of time") - - response = timelord_protocol.ProofOfTimeFinished(proof_of_time) - - await self._update_avg_ips(challenge_hash, iterations_needed, ip) - - async with self.lock: - self.proofs_to_write.append( - OutboundMessage( - NodeType.FULL_NODE, - Message("proof_of_time_finished", response), - Delivery.BROADCAST, - ) - ) - - if not self.sanitizer_mode: - await self._update_proofs_count(challenge_weight) - else: - async with self.lock: - writer.write(b"010") - await writer.drain() - try: - del self.active_discriminants[challenge_hash] - del self.active_discriminants_start_time[challenge_hash] - del self.pending_iters[challenge_hash] - del self.submitted_iters[challenge_hash] - except KeyError: - log.error("Discriminant stopped abnormally.") - - async def _manage_discriminant_queue(self): - while not self._is_shutdown: - async with self.lock: - if len(self.discriminant_queue) > 0: - max_weight = max([h for _, h in self.discriminant_queue]) - if max_weight <= self.best_weight_three_proofs: - self.done_discriminants.extend([d for d, _ in self.discriminant_queue]) - self.discriminant_queue.clear() - else: - max_weight_disc = [d for d, h in self.discriminant_queue if h == max_weight] - with_iters = [ - d for d in max_weight_disc if d in self.pending_iters and len(self.pending_iters[d]) != 0 - ] - if len(with_iters) == 0: - disc = max_weight_disc[0] - else: - min_iter = min([min(self.pending_iters[d]) for d in with_iters]) - disc = next(d for d in with_iters if min(self.pending_iters[d]) == min_iter) - if len(self.free_clients) != 0: - ip, sr, sw = self.free_clients[0] - self.free_clients = self.free_clients[1:] - self.discriminant_queue.remove((disc, max_weight)) - asyncio.create_task(self._do_process_communication(disc, max_weight, ip, sr, sw)) - else: - self.potential_free_clients = [ - (ip, end_time) - for ip, end_time in self.potential_free_clients - if time.time() < end_time + self.max_connection_time - ] - if len(self.potential_free_clients) == 0 and len(self.active_discriminants) > 0: - worst_weight_active = min( - [ - h - for ( - _, - h, - _, - ) in self.active_discriminants.values() - ] - ) - if max_weight > worst_weight_active: - await self._stop_worst_process(worst_weight_active) - elif max_weight == worst_weight_active: - if disc in self.pending_iters and len(self.pending_iters[disc]) != 0: - if any( - (k not in self.pending_iters or len(self.pending_iters[k]) == 0) - for k, v in self.active_discriminants.items() - if v[1] == worst_weight_active - ): - log.info("Stopped process without iters for one with iters.") - await self._stop_worst_process(worst_weight_active) - - if len(self.proofs_to_write) > 0: - for msg in self.proofs_to_write: - self.server.push_message(msg) - self.proofs_to_write.clear() - await asyncio.sleep(0.5) - - async with self.lock: - for ( - stop_discriminant, - (stop_writer, _, _), - ) in self.active_discriminants.items(): - stop_writer.write(b"010") - await stop_writer.drain() - self.done_discriminants.append(stop_discriminant) - self.active_discriminants.clear() - self.active_discriminants_start_time.clear() - - async def _manage_discriminant_queue_sanitizer(self): - while not self._is_shutdown: - async with self.lock: - if len(self.discriminant_queue) > 0: - with_iters = [ - (d, w) - for d, w in self.discriminant_queue - if d in self.pending_iters and len(self.pending_iters[d]) != 0 - ] - if len(with_iters) > 0 and len(self.free_clients) > 0: - disc, weight = with_iters[0] - log.info(f"Creating compact weso proof: weight {weight}.") - ip, sr, sw = self.free_clients[0] - self.free_clients = self.free_clients[1:] - self.discriminant_queue.remove((disc, weight)) - asyncio.create_task(self._do_process_communication(disc, weight, ip, sr, sw)) - if len(self.proofs_to_write) > 0: - for msg in self.proofs_to_write: - self.server.push_message(msg) - self.proofs_to_write.clear() - await asyncio.sleep(3)