Separate out timelord api

This commit is contained in:
Mariano Sorgente 2020-12-02 12:01:47 +09:00 committed by Yostra
parent bbb3d28559
commit 6d8c8be087
4 changed files with 51 additions and 122 deletions

View File

@ -2,6 +2,7 @@ import pathlib
from typing import Dict
from src.consensus.constants import ConsensusConstants
from src.consensus.default_constants import DEFAULT_CONSTANTS
from src.timelord import Timelord
from src.server.outbound_message import NodeType
@ -13,20 +14,20 @@ from src.util.default_root import DEFAULT_ROOT_PATH
from src.server.start_service import run_service
# See: https://bugs.python.org/issue29288
u"".encode("idna")
"".encode("idna")
SERVICE_NAME = "timelord"
def service_kwargs_for_timelord(
root_path: pathlib.Path, config: Dict, discriminant_size_bits: int
root_path: pathlib.Path,
config: Dict,
constants: ConsensusConstants,
) -> Dict:
connect_peers = [
PeerInfo(config["full_node_peer"]["host"], config["full_node_peer"]["port"])
]
connect_peers = [PeerInfo(config["full_node_peer"]["host"], config["full_node_peer"]["port"])]
node = Timelord(config, discriminant_size_bits)
node = Timelord(config, constants)
peer_api = TimelordAPI(node)
kwargs = dict(
@ -45,9 +46,7 @@ def service_kwargs_for_timelord(
def main():
config = load_config_cli(DEFAULT_ROOT_PATH, "config.yaml", SERVICE_NAME)
kwargs = service_kwargs_for_timelord(
DEFAULT_ROOT_PATH, config, DEFAULT_CONSTANTS.DISCRIMINANT_SIZE_BITS
)
kwargs = service_kwargs_for_timelord(DEFAULT_ROOT_PATH, config, DEFAULT_CONSTANTS)
return run_service(**kwargs)

View File

@ -2,6 +2,7 @@ import asyncio
import io
import logging
import time
from asyncio import StreamReader, StreamWriter
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
@ -25,7 +26,6 @@ from src.types.reward_chain_sub_block import (
from src.types.sized_bytes import bytes32
from src.types.slots import ChallengeChainSubSlot, InfusedChallengeChainSubSlot, RewardChainSubSlot, SubSlotProofs
from src.types.vdf import VDFInfo, VDFProof
from src.util.api_decorators import api_request
from src.util.ints import uint64, uint8, uint128, int512
from src.types.sub_epoch_summary import SubEpochSummary
from chiapos import Verifier
@ -272,33 +272,7 @@ class Timelord:
def set_server(self, server: ChiaServer):
self.server = server
@api_request
async def new_peak(self, new_peak: timelord_protocol.NewPeak):
async with self.lock:
if self.last_state is None or self.last_state.get_weight() < new_peak.weight:
self.new_peak = new_peak
@api_request
async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock):
async with self.lock:
if not self._accept_unfinished_block(new_unfinished_subblock):
return
sp_iters, ip_iters = iters_from_sub_block(
self.constants,
new_unfinished_subblock.reward_chain_sub_block,
self.last_state.get_sub_slot_iters(),
self.last_state.get_difficulty(),
)
last_ip_iters = self.last_state.get_last_ip()
if sp_iters < ip_iters:
self.overflow_blocks.append(new_unfinished_subblock)
elif ip_iters > last_ip_iters:
self.unfinished_blocks.append(new_unfinished_subblock)
for chain in Chain:
self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters))
self.iteration_to_proof_type[ip_iters - last_ip_iters] = IterationType.INFUSION_POINT
def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool:
def accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool:
# Total unfinished block iters needs to exceed peak's iters.
if self.last_state.get_total_iters() >= block.total_iters:
return False
@ -355,7 +329,7 @@ class Timelord:
self.iters_submitted[chain] = []
self.iteration_to_proof_type = {}
for block in self.unfinished_blocks:
if not self._accept_unfinished_block(block):
if not self.accept_unfinished_block(block):
continue
block_sp_iters, block_ip_iters = iters_from_sub_block(
self.constants,
@ -664,7 +638,15 @@ class Timelord:
# Check for end of subslot, respawn chains and build EndOfSubslotBundle.
await self._check_for_end_of_subslot()
async def _do_process_communication(self, chain, challenge_hash, initial_form, ip, reader, writer):
async def _do_process_communication(
self,
chain: Chain,
challenge_hash: bytes32,
initial_form: ClassgroupElement,
ip: str,
reader: StreamReader,
writer: StreamWriter,
):
disc: int = create_discriminant(challenge_hash, self.constants.DISCRIMINANT_SIZE_BITS)
# Depending on the flags 'fast_algorithm' and 'sanitizer_mode',
# the timelord tells the vdf_client what to execute.
@ -714,8 +696,7 @@ class Timelord:
try:
msg = data.decode()
except Exception as e:
# log.error(f"Exception while decoding data {e}")
pass
log.error(f"Exception while decoding data {e}")
if msg == "STOP":
log.info(f"Stopped client running on ip {ip}.")
async with self.lock:
@ -767,8 +748,7 @@ class Timelord:
proof_bytes,
)
if not vdf_proof.is_valid(self.constants, vdf_info):
if not vdf_proof.is_valid(self.constants, initial_form, vdf_info):
log.error("Invalid proof of time!")
# continue
async with self.lock:
self.proofs_finished.append((chain, vdf_info, vdf_proof))

View File

@ -1,8 +1,9 @@
from typing import Callable
from src.protocols import timelord_protocol
from src.timelord import Timelord
from src.timelord import Timelord, iters_from_sub_block, Chain, IterationType
from src.util.api_decorators import api_request
from src.util.ints import uint64
class TimelordAPI:
@ -14,79 +15,28 @@ class TimelordAPI:
def _set_state_changed_callback(self, callback: Callable):
pass
# @property
# def log(self):
# return self.timelord.log
#
# @api_request
# async def challenge_start(self, challenge_start: timelord_protocol.ChallengeStart):
# """
# The full node notifies the timelord node that a new challenge is active, and work
# should be started on it. We add the challenge into the queue if it's worth it to have.
# """
# async with self.timelord.lock:
# if not self.timelord.sanitizer_mode:
# if challenge_start.challenge in self.timelord.seen_discriminants:
# self.log.info(f"Have already seen this challenge hash {challenge_start.challenge}. Ignoring.")
# return
# if challenge_start.weight <= self.timelord.best_weight_three_proofs:
# self.log.info("Not starting challenge, already three proofs at that weight")
# return
# self.timelord.seen_discriminants.append(challenge_start.challenge)
# self.timelord.discriminant_queue.append((challenge_start.challenge, challenge_start.weight))
# self.log.info("Appended to discriminant queue.")
# else:
# disc_dict = dict(self.timelord.discriminant_queue)
# if challenge_start.challenge in disc_dict:
# self.timelord.log.info("Challenge already in discriminant queue. Ignoring.")
# return
# if challenge_start.challenge in self.timelord.active_discriminants:
# self.timelord.log.info("Challenge currently running. Ignoring.")
# return
#
# self.timelord.discriminant_queue.append((challenge_start.challenge, challenge_start.weight))
# if challenge_start.weight not in self.timelord.max_known_weights:
# self.timelord.max_known_weights.append(challenge_start.weight)
# self.timelord.max_known_weights.sort()
# if len(self.timelord.max_known_weights) > 5:
# self.timelord.max_known_weights = self.timelord.max_known_weights[-5:]
#
# @api_request
# async def proof_of_space_info(self, proof_of_space_info: timelord_protocol.ProofOfSpaceInfo):
# """
# Notification from full node about a new proof of space for a challenge. If we already
# have a process for this challenge, we should communicate to the process to tell it how
# many iterations to run for.
# """
# async with self.timelord.lock:
# if not self.timelord.sanitizer_mode:
# self.timelord.log.info(f"proof_of_space_info {proof_of_space_info.challenge} {proof_of_space_info.iterations_needed}")
# if proof_of_space_info.challenge in self.timelord.done_discriminants:
# self.timelord.log.info(f"proof_of_space_info {proof_of_space_info.challenge} already done, returning")
# return
# else:
# disc_dict = dict(self.timelord.discriminant_queue)
# if proof_of_space_info.challenge in disc_dict:
# challenge_weight = disc_dict[proof_of_space_info.challenge]
# if challenge_weight >= min(self.timelord.max_known_weights):
# self.log.info("Not storing iter, waiting for more block confirmations.")
# return
# else:
# self.log.info("Not storing iter, challenge inactive.")
# return
#
# if proof_of_space_info.challenge not in self.timelord.pending_iters:
# self.timelord.pending_iters[proof_of_space_info.challenge] = []
# if proof_of_space_info.challenge not in self.timelord.submitted_iters:
# self.timelord.submitted_iters[proof_of_space_info.challenge] = []
#
# if (
# proof_of_space_info.iterations_needed not in self.timelord.pending_iters[proof_of_space_info.challenge]
# and proof_of_space_info.iterations_needed not in self.timelord.submitted_iters[proof_of_space_info.challenge]
# ):
# self.timelord.log.info(
# f"proof_of_space_info {proof_of_space_info.challenge} adding "
# f"{proof_of_space_info.iterations_needed} to "
# f"{self.timelord.pending_iters[proof_of_space_info.challenge]}"
# )
# self.timelord.pending_iters[proof_of_space_info.challenge].append(proof_of_space_info.iterations_needed)
@api_request
async def new_peak(self, new_peak: timelord_protocol.NewPeak):
async with self.timelord.lock:
if self.timelord.last_state is None or self.timelord.last_state.get_weight() < new_peak.weight:
self.timelord.new_peak = new_peak
@api_request
async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock):
async with self.timelord.lock:
if not self.timelord.accept_unfinished_block(new_unfinished_subblock):
return
sp_iters, ip_iters = iters_from_sub_block(
self.timelord.constants,
new_unfinished_subblock.reward_chain_sub_block,
self.timelord.last_state.get_sub_slot_iters(),
self.timelord.last_state.get_difficulty(),
)
last_ip_iters = self.timelord.last_state.get_last_ip()
if sp_iters < ip_iters:
self.timelord.overflow_blocks.append(new_unfinished_subblock)
elif ip_iters > last_ip_iters:
self.timelord.unfinished_blocks.append(new_unfinished_subblock)
for chain in Chain:
self.timelord.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters))
self.timelord.iteration_to_proof_type[ip_iters - last_ip_iters] = IterationType.INFUSION_POINT

View File

@ -244,7 +244,7 @@ async def setup_timelord(port, full_node_port, sanitizer, consensus_constants: C
if sanitizer:
config["vdf_server"]["port"] = 7999
kwargs = service_kwargs_for_timelord(bt.root_path, config, consensus_constants.DISCRIMINANT_SIZE_BITS)
kwargs = service_kwargs_for_timelord(bt.root_path, config, consensus_constants)
kwargs.update(
parse_cli_args=False,
connect_to_daemon=False,