Timelord changes and harvester api

This commit is contained in:
Mariano Sorgente 2020-12-02 11:53:35 +09:00 committed by Yostra
parent 9d7961209b
commit bbb3d28559
7 changed files with 819 additions and 1504 deletions

View File

@ -3,27 +3,17 @@ import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, Optional, Tuple, List, Callable, Set
import time
import concurrent
import dataclasses
from blspy import G1Element
from chiapos import DiskProver
from src.consensus.constants import ConsensusConstants
from src.consensus.pot_iterations import calculate_iterations_quality, calculate_sp_interval_iters
from src.protocols import harvester_protocol
from src.server.outbound_message import Message
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
from src.util.api_decorators import api_request
from src.util.ints import uint8, uint64
from src.plotting.plot_tools import (
load_plots,
PlotInfo,
remove_plot_directory,
add_plot_directory,
get_plot_directories,
remove_plot_directory as remove_plot_directory_pt,
add_plot_directory as add_plot_directory_pt,
get_plot_directories as get_plot_directories_pt,
)
log = logging.getLogger(__name__)
@ -78,7 +68,7 @@ class Harvester:
if self.state_changed_callback is not None:
self.state_changed_callback(change)
def _get_plots(self) -> Tuple[List[Dict], List[str], List[str]]:
def get_plots(self) -> Tuple[List[Dict], List[str], List[str]]:
response_plots: List[Dict] = []
for path, plot_info in self.provers.items():
prover = plot_info.prover
@ -102,7 +92,7 @@ class Harvester:
[str(s) for s in self.no_key_filenames],
)
async def _refresh_plots(self):
async def refresh_plots(self):
locked: bool = self._refresh_lock.locked()
changed: bool = False
async with self._refresh_lock:
@ -119,7 +109,7 @@ class Harvester:
if changed:
self._state_changed("plots")
def _delete_plot(self, str_path: str):
def delete_plot(self, str_path: str):
path = Path(str_path).resolve()
if path in self.provers:
del self.provers[path]
@ -131,180 +121,17 @@ class Harvester:
self._state_changed("plots")
return True
async def _add_plot_directory(self, str_path: str) -> bool:
add_plot_directory(str_path, self.root_path)
await self._refresh_plots()
async def add_plot_directory(self, str_path: str) -> bool:
add_plot_directory_pt(str_path, self.root_path)
await self.refresh_plots()
return True
async def _get_plot_directories(self) -> List[str]:
return get_plot_directories(self.root_path)
async def get_plot_directories(self) -> List[str]:
return get_plot_directories_pt(self.root_path)
async def _remove_plot_directory(self, str_path: str) -> bool:
remove_plot_directory(str_path, self.root_path)
async def remove_plot_directory(self, str_path: str) -> bool:
remove_plot_directory_pt(str_path, self.root_path)
return True
def set_server(self, server):
self.server = server
@api_request
async def harvester_handshake(self, harvester_handshake: harvester_protocol.HarvesterHandshake):
"""
Handshake between the harvester and farmer. The harvester receives the pool public keys,
as well as the farmer pks, which must be put into the plots, before the plotting process begins.
We cannot use any plots which have different keys in them.
"""
self.farmer_public_keys = harvester_handshake.farmer_public_keys
self.pool_public_keys = harvester_handshake.pool_public_keys
await self._refresh_plots()
if len(self.provers) == 0:
log.warning("Not farming any plots on this harvester. Check your configuration.")
return
self._state_changed("plots")
@api_request
async def new_signage_point(self, new_challenge: harvester_protocol.NewSignagePoint):
"""
The harvester receives a new signage point from the farmer, this happens at the start of each slot.
The harvester does a few things:
1. The harvester applies the plot filter for each of the plots, to select the proportion which are eligible
for this signage point and challenge.
2. The harvester gets the qualities for each plot. This is approximately 7 reads per plot which qualifies.
Note that each plot may have 0, 1, 2, etc qualities for that challenge: but on average it will have 1.
3. Checks the required_iters for each quality and the given signage point, to see which are eligible for
inclusion (required_iters < sp_interval_iters).
4. Looks up the full proof of space in the plot for each quality, approximately 64 reads per quality
5. Returns the proof of space to the farmer
"""
if len(self.pool_public_keys) == 0 or len(self.farmer_public_keys) == 0:
# This means that we have not received the handshake yet
return
start = time.time()
assert len(new_challenge.challenge_hash) == 32
# Refresh plots to see if there are any new ones
await self._refresh_plots()
loop = asyncio.get_running_loop()
def blocking_lookup(filename: Path, plot_info: PlotInfo, prover: DiskProver) -> List[ProofOfSpace]:
# Uses the DiskProver object to lookup qualities. This is a blocking call,
# so it should be run in a thread pool.
try:
sp_challenge_hash = ProofOfSpace.calculate_pos_challenge(
plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash
)
quality_strings = prover.get_qualities_for_challenge(sp_challenge_hash)
except Exception:
log.error("Error using prover object. Reinitializing prover object.")
self.provers[filename] = dataclasses.replace(plot_info, prover=DiskProver(str(filename)))
return []
responses: List[ProofOfSpace] = []
if quality_strings is not None:
# Found proofs of space (on average 1 is expected per plot)
for index, quality_str in enumerate(quality_strings):
required_iters: uint64 = calculate_iterations_quality(
quality_str, prover.get_size(), new_challenge.difficulty, new_challenge.sp_hash
)
sp_interval_iters = calculate_sp_interval_iters(self.constants, new_challenge.sub_slot_iters)
if required_iters < sp_interval_iters:
# Found a very good proof of space! will fetch the whole proof from disk, then send to farmer
try:
proof_xs = prover.get_full_proof(sp_challenge_hash, index)
except RuntimeError:
log.error(f"Exception fetching full proof for {filename}")
continue
plot_public_key = ProofOfSpace.generate_plot_public_key(
plot_info.local_sk.get_g1(), plot_info.farmer_public_key
)
responses.append(
ProofOfSpace(
sp_challenge_hash,
plot_info.pool_public_key,
None,
plot_public_key,
uint8(prover.get_size()),
proof_xs,
)
)
return responses
async def lookup_challenge(filename: Path, prover: DiskProver) -> List[harvester_protocol.NewProofOfSpace]:
# Executes a DiskProverLookup in a thread pool, and returns responses
all_responses: List[harvester_protocol.NewProofOfSpace] = []
proofs_of_space: List[ProofOfSpace] = await loop.run_in_executor(
self.executor, blocking_lookup, filename, prover
)
for proof_of_space in proofs_of_space:
all_responses.append(
harvester_protocol.NewProofOfSpace(
new_challenge.challenge_hash, prover.get_id(), proof_of_space, new_challenge.signage_point_index
)
)
return all_responses
awaitables = []
for try_plot_filename, try_plot_info in self.provers.items():
if try_plot_filename.exists():
# Passes the plot filter (does not check sp filter yet though, since we have not reached sp)
# This is being executed at the beginning of the slot
if ProofOfSpace.passes_plot_filter(
self.constants, try_plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash
):
awaitables.append(lookup_challenge(try_plot_filename, try_plot_info.prover))
# Concurrently executes all lookups on disk, to take advantage of multiple disk parallelism
total_proofs_found = 0
for sublist_awaitable in asyncio.as_completed(awaitables):
for response in await sublist_awaitable:
total_proofs_found += 1
msg = Message("challenge_response", response)
yield msg
log.info(
f"{len(awaitables)} plots were eligible for farming {new_challenge.challenge_hash.hex()[:10]}..."
f" Found {total_proofs_found} proofs. Time: {time.time() - start}. "
f"Total {len(self.provers)} plots"
)
@api_request
async def request_signatures(self, request: harvester_protocol.RequestSignatures):
"""
The farmer requests a signature on the header hash, for one of the proofs that we found.
A signature is created on the header hash using the harvester private key. This can also
be used for pooling.
"""
try:
plot_info = self.provers[Path(request.plot_identifier).resolve()]
except KeyError:
log.warning(f"KeyError plot {request.plot_identifier} does not exist.")
return
local_sk = plot_info.local_sk
agg_pk = ProofOfSpace.generate_plot_public_key(local_sk.get_g1(), plot_info.farmer_public_key)
# This is only a partial signature. When combined with the farmer's half, it will
# form a complete PrependSignature.
message_signatures: List[Tuple[bytes32, G2Element]] = []
for message in request.messages:
signature: G2Element = AugSchemeMPL.sign(local_sk, message, agg_pk)
message_signatures.append((message, signature))
response: harvester_protocol.RespondSignatures = harvester_protocol.RespondSignatures(
request.plot_identifier,
request.sp_hash,
local_sk.get_g1(),
plot_info.farmer_public_key,
message_signatures,
)
yield OutboundMessage(
NodeType.FARMER,
Message("respond_signatures", response),
Delivery.RESPOND,
)

View File

@ -4,7 +4,7 @@ import time
from pathlib import Path
from typing import Optional, Callable, List, Tuple
from blspy import AugSchemeMPL, G2Element
from blspy import AugSchemeMPL, G2Element, G1Element
from chiapos import DiskProver
from src.consensus.pot_iterations import calculate_sp_interval_iters, calculate_iterations_quality
@ -24,6 +24,8 @@ class HarvesterAPI:
def __init__(self, harvester):
self.harvester = harvester
self.farmer_public_keys: List[G1Element] = []
self.pool_public_keys: List[G1Element] = []
def _set_state_changed_callback(self, callback: Callable):
self.harvester.state_changed_callback = callback
@ -38,7 +40,7 @@ class HarvesterAPI:
self.farmer_public_keys = harvester_handshake.farmer_public_keys
self.pool_public_keys = harvester_handshake.pool_public_keys
await self.harvester._refresh_plots()
await self.harvester.refresh_plots()
if len(self.harvester.provers) == 0:
self.harvester.log.warning("Not farming any plots on this harvester. Check your configuration.")
@ -69,7 +71,7 @@ class HarvesterAPI:
assert len(new_challenge.challenge_hash) == 32
# Refresh plots to see if there are any new ones
await self.harvester._refresh_plots()
await self.harvester.refresh_plots()
loop = asyncio.get_running_loop()
@ -93,7 +95,9 @@ class HarvesterAPI:
required_iters: uint64 = calculate_iterations_quality(
quality_str, prover.get_size(), new_challenge.difficulty, new_challenge.sp_hash
)
sp_interval_iters = calculate_sp_interval_iters(self.harvester.constants, new_challenge.sub_slot_iters)
sp_interval_iters = calculate_sp_interval_iters(
self.harvester.constants, new_challenge.sub_slot_iters
)
if required_iters < sp_interval_iters:
# Found a very good proof of space! will fetch the whole proof from disk, then send to farmer
try:
@ -137,7 +141,10 @@ class HarvesterAPI:
# Passes the plot filter (does not check sp filter yet though, since we have not reached sp)
# This is being executed at the beginning of the slot
if ProofOfSpace.passes_plot_filter(
self.harvester.constants, try_plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash
self.harvester.constants,
try_plot_info.prover.get_id(),
new_challenge.challenge_hash,
new_challenge.sp_hash,
):
awaitables.append(lookup_challenge(try_plot_filename, try_plot_info.prover))

View File

@ -22,14 +22,12 @@ class HarvesterRpcApi:
async def _state_changed(self, change: str) -> List[Dict]:
if change == "plots":
data = await self.get_plots({})
payload = create_payload(
"get_plots", data, self.service_name, "wallet_ui", string=False
)
payload = create_payload("get_plots", data, self.service_name, "wallet_ui", string=False)
return [payload]
return []
async def get_plots(self, request: Dict) -> Dict:
plots, failed_to_open, not_found = self.service._get_plots()
plots, failed_to_open, not_found = self.service.get_plots()
return {
"plots": plots,
"failed_to_open_filenames": failed_to_open,
@ -37,27 +35,27 @@ class HarvesterRpcApi:
}
async def refresh_plots(self, request: Dict) -> Dict:
await self.service._refresh_plots()
await self.service.refresh_plots()
return {}
async def delete_plot(self, request: Dict) -> Dict:
filename = request["filename"]
if self.service._delete_plot(filename):
if self.service.delete_plot(filename):
return {}
raise ValueError(f"Not able to delete file {filename}")
async def add_plot_directory(self, request: Dict) -> Dict:
dirname = request["dirname"]
if await self.service._add_plot_directory(dirname):
if await self.service.add_plot_directory(dirname):
return {}
raise ValueError(f"Did not add plot directory {dirname}")
async def get_plot_directories(self, request: Dict) -> Dict:
plot_dirs = await self.service._get_plot_directories()
plot_dirs = await self.service.get_plot_directories()
return {"directories": plot_dirs}
async def remove_plot_directory(self, request: Dict) -> Dict:
dirname = request["dirname"]
if await self.service._remove_plot_directory(dirname):
if await self.service.remove_plot_directory(dirname):
return {}
raise ValueError(f"Did not remove plot directory {dirname}")

View File

@ -29,21 +29,15 @@ def ssl_context_for_server(
private_cert_path: Path, private_key_path: Path, require_cert: bool = False
) -> Optional[ssl.SSLContext]:
ssl_context = ssl._create_unverified_context(purpose=ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(
certfile=str(private_cert_path), keyfile=str(private_key_path)
)
ssl_context.load_cert_chain(certfile=str(private_cert_path), keyfile=str(private_key_path))
ssl_context.load_verify_locations(str(private_cert_path))
ssl_context.verify_mode = ssl.CERT_REQUIRED if require_cert else ssl.CERT_NONE
return ssl_context
def ssl_context_for_client(
private_cert_path: Path, private_key_path: Path, auth: bool
) -> Optional[ssl.SSLContext]:
def ssl_context_for_client(private_cert_path: Path, private_key_path: Path, auth: bool) -> Optional[ssl.SSLContext]:
ssl_context = ssl._create_unverified_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_context.load_cert_chain(
certfile=str(private_cert_path), keyfile=str(private_key_path)
)
ssl_context.load_cert_chain(certfile=str(private_cert_path), keyfile=str(private_key_path))
if auth:
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_verify_locations(str(private_cert_path))
@ -92,9 +86,7 @@ class ChiaServer:
self.root_path = root_path
self.config = config
self.on_connect: Optional[Callable] = None
self.incoming_messages: Queue[
Tuple[Payload, WSChiaConnection]
] = asyncio.Queue()
self.incoming_messages: Queue[Tuple[Payload, WSChiaConnection]] = asyncio.Queue()
self.shut_down_event = asyncio.Event()
if self._local_type is NodeType.INTRODUCER:
@ -123,9 +115,7 @@ class ChiaServer:
self.runner = web.AppRunner(self.app, access_log=None)
await self.runner.setup()
require_cert = self._local_type not in (NodeType.FULL_NODE, NodeType.INTRODUCER)
ssl_context = ssl_context_for_server(
self._private_cert_path, self._private_key_path, require_cert
)
ssl_context = ssl_context_for_server(self._private_cert_path, self._private_key_path, require_cert)
if self._local_type not in [NodeType.WALLET, NodeType.HARVESTER]:
self.site = web.TCPSite(
self.runner,
@ -164,10 +154,7 @@ class ChiaServer:
assert handshake is True
await self.connection_added(connection, self.on_connect)
if (
self._local_type is NodeType.INTRODUCER
and connection.connection_type is NodeType.FULL_NODE
):
if self._local_type is NodeType.INTRODUCER and connection.connection_type is NodeType.FULL_NODE:
self.introducer_peers.add(connection.get_peer_info())
except Exception as e:
error_stack = traceback.format_exc()
@ -196,18 +183,14 @@ class ChiaServer:
Tries to connect to the target node, adding one connection into the pipeline, if successful.
An on connect method can also be specified, and this will be saved into the instance variables.
"""
ssl_context = ssl_context_for_client(
self._private_cert_path, self._private_key_path, auth
)
ssl_context = ssl_context_for_client(self._private_cert_path, self._private_key_path, auth)
session = None
try:
timeout = aiohttp.ClientTimeout(total=10)
session = aiohttp.ClientSession(timeout=timeout)
url = f"wss://{target_node.host}:{target_node.port}/ws"
self.log.info(f"Connecting: {url}")
ws = await session.ws_connect(
url, autoclose=False, autoping=True, ssl=ssl_context
)
ws = await session.ws_connect(url, autoclose=False, autoping=True, ssl=ssl_context)
if ws is not None:
connection = WSChiaConnection(
self._local_type,
@ -232,12 +215,14 @@ class ChiaServer:
await self.connection_added(connection, on_connect)
self.log.info("Connected")
return True
except aiohttp.client_exceptions.ClientConnectorError as e:
self.log.warning(f"{e}")
except Exception as e:
error_stack = traceback.format_exc()
self.log.error(f"Exception: {e}")
if session is not None:
await session.close()
self.log.error(f"Exception Stack: {error_stack}")
if session is not None:
await session.close()
return False
@ -258,40 +243,24 @@ class ChiaServer:
async def api_call(payload: Payload, connection: WSChiaConnection):
try:
full_message = payload.msg
connection.log.info(
f"<- {full_message.function} from peer {connection.peer_node_id}"
)
if len(
full_message.function
) == 0 or full_message.function.startswith("_"):
connection.log.info(f"<- {full_message.function} from peer {connection.peer_node_id}")
if len(full_message.function) == 0 or full_message.function.startswith("_"):
# This prevents remote calling of private methods that start with "_"
self.log.error(
f"Non existing function: {full_message.function}"
)
raise ProtocolError(
Err.INVALID_PROTOCOL_MESSAGE, [full_message.function]
)
self.log.error(f"Non existing function: {full_message.function}")
raise ProtocolError(Err.INVALID_PROTOCOL_MESSAGE, [full_message.function])
f = getattr(self.api, full_message.function, None)
if f is None:
self.log.error(
f"Non existing function: {full_message.function}"
)
raise ProtocolError(
Err.INVALID_PROTOCOL_MESSAGE, [full_message.function]
)
self.log.error(f"Non existing function: {full_message.function}")
raise ProtocolError(Err.INVALID_PROTOCOL_MESSAGE, [full_message.function])
if not hasattr(f, "api_function"):
self.log.error("Peer trying to call non api function")
raise ProtocolError(
Err.INVALID_PROTOCOL_MESSAGE, [full_message.function]
)
raise ProtocolError(Err.INVALID_PROTOCOL_MESSAGE, [full_message.function])
if hasattr(f, "peer_required"):
response: Optional[Message] = await f(
full_message.data, connection
)
response: Optional[Message] = await f(full_message.data, connection)
else:
response = await f(full_message.data)
@ -302,9 +271,7 @@ class ChiaServer:
except Exception as e:
tb = traceback.format_exc()
connection.log.error(
f"Exception: {e}, closing connection {connection}. {tb}"
)
connection.log.error(f"Exception: {e}, closing connection {connection}. {tb}")
await connection.close()
asyncio.create_task(api_call(payload_inc, connection_inc))
@ -328,14 +295,9 @@ class ChiaServer:
for message in messages:
await connection.send_message(message)
async def send_to_all_except(
self, messages: List[Message], node_type: NodeType, exclude: bytes32
):
async def send_to_all_except(self, messages: List[Message], node_type: NodeType, exclude: bytes32):
for _, connection in self.all_connections.items():
if (
connection.connection_type is node_type
and connection.peer_node_id != exclude
):
if connection.connection_type is node_type and connection.peer_node_id != exclude:
for message in messages:
await connection.send_message(message)

View File

@ -2,12 +2,773 @@ import asyncio
import io
import logging
import time
import socket
from typing import Dict, List, Optional, Tuple
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
from chiavdf import create_discriminant
from src.consensus.constants import ConsensusConstants
from src.consensus.pot_iterations import (
calculate_iterations_quality,
calculate_sp_iters,
calculate_ip_iters,
)
from src.protocols import timelord_protocol
from src.server.outbound_message import NodeType, Message
from src.server.server import ChiaServer
from src.types.classgroup import ClassgroupElement
from src.types.end_of_slot_bundle import EndOfSubSlotBundle
from src.types.reward_chain_sub_block import (
RewardChainSubBlock,
RewardChainSubBlockUnfinished,
)
from src.types.sized_bytes import bytes32
from src.types.slots import ChallengeChainSubSlot, InfusedChallengeChainSubSlot, RewardChainSubSlot, SubSlotProofs
from src.types.vdf import VDFInfo, VDFProof
from src.util.api_decorators import api_request
from src.util.ints import uint64, uint8, uint128, int512
from src.types.sub_epoch_summary import SubEpochSummary
from chiapos import Verifier
from src.types.slots import ChallengeBlockInfo
log = logging.getLogger(__name__)
def iters_from_sub_block(
constants,
reward_chain_sub_block: Union[RewardChainSubBlock, RewardChainSubBlockUnfinished],
sub_slot_iters: uint64,
difficulty: uint64,
) -> Tuple[uint64, uint64]:
v = Verifier()
pos = reward_chain_sub_block.proof_of_space
plot_id: bytes32 = pos.get_plot_id()
quality = v.validate_proof(plot_id, pos.size, pos.challenge_hash, bytes(pos.proof))
if reward_chain_sub_block.challenge_chain_sp_vdf is None:
assert reward_chain_sub_block.signage_point_index == 0
cc_sp: bytes32 = reward_chain_sub_block.proof_of_space.challenge_hash
else:
cc_sp: bytes32 = reward_chain_sub_block.challenge_chain_sp_vdf.get_hash()
required_iters = calculate_iterations_quality(
quality,
reward_chain_sub_block.proof_of_space.size,
difficulty,
cc_sp,
)
return (
calculate_sp_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index),
calculate_ip_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index, required_iters),
)
class EndOfSubSlotData:
eos_bundle: EndOfSubSlotBundle
sub_slot_iters: uint64
new_difficulty: uint64
deficit: uint8
def __init__(self, eos_bundle, sub_slot_iters, new_difficulty, deficit):
self.eos_bundle = eos_bundle
self.sub_slot_iters = sub_slot_iters
self.new_difficulty = new_difficulty
self.deficit = deficit
class Chain(Enum):
CHALLENGE_CHAIN = 1
REWARD_CHAIN = 2
INFUSED_CHALLENGE_CHAIN = 3
class IterationType(Enum):
SIGNAGE_POINT = 1
INFUSION_POINT = 2
END_OF_SUBSLOT = 3
class LastState:
def __init__(self, constants: ConsensusConstants):
self.peak: Optional[timelord_protocol.NewPeak] = None
self.subslot_end: Optional[EndOfSubSlotData] = None
self.last_ip: uint64 = uint64(0)
self.deficit: uint8 = uint8(0)
self.sub_epoch_summary: Optional[SubEpochSummary] = None
self.constants: ConsensusConstants = constants
self.last_weight: uint128 = uint128(0)
self.total_iters: uint128 = uint128(0)
self.last_peak_challenge: Optional[bytes32] = None
def set_state(self, state):
if isinstance(state, timelord_protocol.NewPeak):
self.peak = state
self.subslot_end = None
_, self.last_ip = iters_from_sub_block(
self.constants,
state.reward_chain_sub_block,
state.sub_slot_iters,
state.difficulty,
)
self.deficit = state.deficit
self.sub_epoch_summary = state.sub_epoch_summary
self.last_weight = state.reward_chain_sub_block.weight
self.total_iters = state.reward_chain_sub_block.total_iters
self.last_peak_challenge = state.reward_chain_sub_block.get_hash()
if isinstance(state, EndOfSubSlotData):
self.peak = None
self.subslot_end = state
self.last_ip = 0
self.deficit = state.deficit
def is_empty(self) -> bool:
return self.peak is None and self.subslot_end is None
def get_sub_slot_iters(self) -> uint64:
if self.peak is not None:
return self.peak.sub_slot_iters
return self.subslot_end.sub_slot_iters
def get_weight(self) -> uint128:
return self.last_weight
def get_total_iters(self) -> uint128:
return self.total_iters
def get_last_peak_challenge(self) -> Optional[bytes32]:
return self.last_peak_challenge
def get_difficulty(self) -> uint64:
if self.peak is not None:
return self.peak.difficulty
return self.subslot_end.new_difficulty
def get_last_ip(self) -> uint64:
return self.last_ip
def get_deficit(self) -> uint8:
if self.peak is not None:
return self.peak.deficit
return self.subslot_end.deficit
def get_sub_epoch_summary(self) -> Optional[SubEpochSummary]:
return self.sub_epoch_summary
def get_challenge(self, chain: Chain) -> Optional[bytes32]:
if self.peak is not None:
sub_block = self.peak.reward_chain_sub_block
if chain == Chain.CHALLENGE_CHAIN:
return sub_block.challenge_chain_ip_vdf.challenge_hash
if chain == Chain.REWARD_CHAIN:
return sub_block.get_hash()
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
if sub_block.infused_challenge_chain_ip_vdf is not None:
return sub_block.infused_challenge_chain_ip_vdf.challenge_hash
if self.peak.deficit == 4:
return ChallengeBlockInfo(
sub_block.proof_of_space,
sub_block.challenge_chain_sp_vdf,
sub_block.challenge_chain_sp_signature,
sub_block.challenge_chain_ip_vdf,
).get_hash()
return None
if self.subslot_end is not None:
if chain == Chain.CHALLENGE_CHAIN:
return self.subslot_end.eos_bundle.challenge_chain.get_hash()
if chain == Chain.REWARD_CHAIN:
return self.subslot_end.eos_bundle.reward_chain.get_hash()
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK:
return self.subslot_end.eos_bundle.infused_challenge_chain.get_hash()
else:
return None
return None
def get_initial_form(self, chain: Chain) -> Optional[ClassgroupElement]:
if self.peak is not None:
sub_block = self.peak.reward_chain_sub_block
if chain == Chain.CHALLENGE_CHAIN:
return sub_block.challenge_chain_ip_vdf.output
if chain == Chain.REWARD_CHAIN:
return ClassgroupElement.get_default_element()
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
if sub_block.infused_challenge_chain_ip_vdf is not None:
return sub_block.infused_challenge_chain_ip_vdf.output
elif self.peak.deficit == 4:
return ClassgroupElement.get_default_element()
else:
return None
if self.subslot_end is not None:
if chain == Chain.CHALLENGE_CHAIN or chain == Chain.REWARD_CHAIN:
return ClassgroupElement.get_default_element()
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK:
return ClassgroupElement.get_default_element()
else:
return None
return None
class Timelord:
def __init__(self, config: Dict, discriminant_size_bits: int):
def __init__(self, config: Dict, constants: ConsensusConstants):
self.config = config
self.constants = constants
self._shut_down = False
self.free_clients: List[Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = []
self.lock: asyncio.Lock = asyncio.Lock()
self.potential_free_clients: List = []
self.ip_whitelist = self.config["vdf_clients"]["ip"]
self.server: Optional[ChiaServer] = None
self.chain_type_to_stream: Dict[Chain, Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = {}
self.chain_start_time: Dict = {}
# Chains that currently don't have a vdf_client.
self.unspawned_chains: List[Chain] = [
Chain.CHALLENGE_CHAIN,
Chain.REWARD_CHAIN,
Chain.INFUSED_CHALLENGE_CHAIN,
]
# Chains that currently accept iterations.
self.allows_iters: List[Chain] = []
# Last peak received, None if it's already processed.
self.new_peak: Optional[timelord_protocol.NewPeak] = None
# Last end of subslot bundle, None if we built a peak on top of it.
self.new_subslot_end: Optional[EndOfSubSlotData] = None
# Last state received. Can either be a new peak or a new EndOfSubslotBundle.
self.last_state: LastState = LastState(self.constants)
# Unfinished block info, iters adjusted to the last peak.
self.unfinished_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = []
# Signage points iters, adjusted to the last peak.
self.signage_point_iters: List[uint64] = []
# For each chain, send those info when the process spawns.
self.iters_to_submit: Dict[Chain, List[uint64]] = {}
self.iters_submitted: Dict[Chain, List[uint64]] = {}
# For each iteration submitted, know if it's a signage point, an infusion point or an end of slot.
self.iteration_to_proof_type: Dict[uint64, IterationType] = {}
# List of proofs finished.
self.proofs_finished: List[Tuple[Chain, VDFInfo, VDFProof]] = []
# Data to send at vdf_client initialization.
self.finished_sp = 0
self.overflow_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = []
self.main_loop = None
self.vdf_server = None
self._shut_down = False
async def _start(self):
log.info("Starting timelord.")
self.main_loop = asyncio.create_task(self._manage_chains())
self.vdf_server = await asyncio.start_server(
self._handle_client,
self.config["vdf_server"]["host"],
self.config["vdf_server"]["port"],
)
log.info("Started timelord.")
def _close(self):
self._shut_down = True
async def _await_closed(self):
pass
def set_server(self, server: ChiaServer):
self.server = server
@api_request
async def new_peak(self, new_peak: timelord_protocol.NewPeak):
async with self.lock:
if self.last_state is None or self.last_state.get_weight() < new_peak.weight:
self.new_peak = new_peak
@api_request
async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock):
async with self.lock:
if not self._accept_unfinished_block(new_unfinished_subblock):
return
sp_iters, ip_iters = iters_from_sub_block(
self.constants,
new_unfinished_subblock.reward_chain_sub_block,
self.last_state.get_sub_slot_iters(),
self.last_state.get_difficulty(),
)
last_ip_iters = self.last_state.get_last_ip()
if sp_iters < ip_iters:
self.overflow_blocks.append(new_unfinished_subblock)
elif ip_iters > last_ip_iters:
self.unfinished_blocks.append(new_unfinished_subblock)
for chain in Chain:
self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters))
self.iteration_to_proof_type[ip_iters - last_ip_iters] = IterationType.INFUSION_POINT
def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool:
# Total unfinished block iters needs to exceed peak's iters.
if self.last_state.get_total_iters() >= block.total_iters:
return False
# The peak hash of the rc-sub-block must match
# the signage point rc VDF challenge hash of the unfinished sub-block.
if (
block.reward_chain_sp_vdf is not None
and self.last_state.get_last_peak_challenge() is not None
and self.last_state.get_last_peak_challenge() != block.reward_chain_sp_vdf.challenge_hash
):
return False
return True
async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
async with self.lock:
client_ip = writer.get_extra_info("peername")[0]
log.info(f"New timelord connection from client: {client_ip}.")
if client_ip in self.ip_whitelist:
self.free_clients.append((client_ip, reader, writer))
log.info(f"Added new VDF client {client_ip}.")
for ip, end_time in list(self.potential_free_clients):
if ip == client_ip:
self.potential_free_clients.remove((ip, end_time))
break
async def _stop_chain(self, chain: Chain):
stop_ip, _, stop_writer = self.chain_type_to_stream[chain]
self.potential_free_clients.append((stop_ip, time.time()))
stop_writer.write(b"010")
await stop_writer.drain()
if chain in self.allows_iters:
self.allows_iters.remove(chain)
self.unspawned_chains.append(chain)
async def _reset_chains(self):
# First, stop all chains.
ip_iters = self.last_state.get_last_ip()
sub_slot_iters = self.last_state.get_sub_slot_iters()
difficulty = self.last_state.get_difficulty()
for chain in self.chain_type_to_stream.keys():
await self._stop_chain(chain)
# Adjust all signage points iterations to the peak.
iters_per_signage = uint64(sub_slot_iters // self.constants.NUM_SPS_SUB_SLOT)
self.signage_point_iters = [
k * iters_per_signage - ip_iters
for k in range(1, self.constants.NUM_SPS_SUB_SLOT + 1)
if k * iters_per_signage - ip_iters > 0 and k * iters_per_signage < sub_slot_iters
]
# Adjust all unfinished blocks iterations to the peak.
new_unfinished_blocks = []
self.proofs_finished = []
for chain in Chain:
self.iters_to_submit[chain] = []
self.iters_submitted[chain] = []
self.iteration_to_proof_type = {}
for block in self.unfinished_blocks:
if not self._accept_unfinished_block(block):
continue
block_sp_iters, block_ip_iters = iters_from_sub_block(
self.constants,
block,
sub_slot_iters,
difficulty,
)
new_block_iters = block_ip_iters - ip_iters
if new_block_iters > 0:
new_unfinished_blocks.append(block)
for chain in Chain:
self.iters_to_submit[chain].append(new_block_iters)
self.iteration_to_proof_type[new_block_iters] = IterationType.INFUSION_POINT
# Remove all unfinished blocks that have already passed.
self.unfinished_blocks = new_unfinished_blocks
# Signage points.
if len(self.signage_point_iters) > 0:
count_signage = 0
for signage in self.signage_point_iters:
for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]:
self.iters_to_submit[chain].append(signage)
self.iteration_to_proof_type[signage] = IterationType.SIGNAGE_POINT
count_signage += 1
if count_signage == 3:
break
# TODO: handle the special case when infusion point is the end of subslot.
left_subslot_iters = sub_slot_iters - ip_iters
log.info(f"Left subslot iters: {left_subslot_iters}.")
for chain in Chain:
self.iters_to_submit[chain].append(left_subslot_iters)
self.iteration_to_proof_type[left_subslot_iters] = IterationType.END_OF_SUBSLOT
async def _handle_new_peak(self):
self.last_state.set_state(self.new_peak)
self.new_peak = None
await self._reset_chains()
async def _handle_subslot_end(self):
self.finished_sp = 0
self.last_state.set_state(self.new_subslot_end)
self.new_subslot_end = None
await self._reset_chains()
async def _map_chains_with_vdf_clients(self):
while not self._shut_down:
picked_chain = None
async with self.lock:
if len(self.free_clients) == 0:
break
ip, reader, writer = self.free_clients[0]
for chain_type in self.unspawned_chains:
challenge_hash = self.last_state.get_challenge(chain_type)
initial_form = self.last_state.get_initial_form(chain_type)
if challenge_hash is not None and initial_form is not None:
picked_chain = chain_type
break
if picked_chain is None:
break
picked_chain = self.unspawned_chains[0]
self.chain_type_to_stream[picked_chain] = (ip, reader, writer)
self.free_clients = self.free_clients[1:]
self.unspawned_chains = self.unspawned_chains[1:]
self.chain_start_time[picked_chain] = time.time()
log.info(f"Mapping free vdf_client with chain: {picked_chain}.")
asyncio.create_task(
self._do_process_communication(picked_chain, challenge_hash, initial_form, ip, reader, writer)
)
async def _submit_iterations(self):
for chain in Chain:
if chain in self.allows_iters:
_, _, writer = self.chain_type_to_stream[chain]
for iteration in self.iters_to_submit[chain]:
if iteration in self.iters_submitted[chain]:
continue
prefix = str(len(str(iteration)))
if len(str(iteration)) < 10:
prefix = "0" + prefix
iter_str = prefix + str(iteration)
writer.write(iter_str.encode())
await writer.drain()
self.iters_submitted[chain].append(iteration)
def _clear_proof_list(self, iters: uint64):
return [
(chain, info, proof) for chain, info, proof in self.proofs_finished if info.number_of_iterations != iters
]
async def _check_for_new_sp(self):
signage_iters = [
iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.SIGNAGE_POINT
]
if len(signage_iters) == 0:
return
for signage_iter in signage_iters:
proofs_with_iter = [
(chain, info, proof)
for chain, info, proof in self.proofs_finished
if info.number_of_iterations == signage_iter
]
# Wait for both cc and rc to have the signage point.
if len(proofs_with_iter) == 2:
cc_info: Optional[VDFInfo] = None
cc_proof: Optional[VDFProof] = None
rc_info: Optional[VDFInfo] = None
rc_proof: Optional[VDFProof] = None
for chain, info, proof in proofs_with_iter:
if chain == Chain.CHALLENGE_CHAIN:
cc_info = info
cc_proof = proof
if chain == Chain.REWARD_CHAIN:
rc_info = info
rc_proof = proof
if cc_info is None or cc_proof is None or rc_info is None or rc_proof is None:
log.error(f"Insufficient signage point data {signage_iter}")
continue
response = timelord_protocol.NewSignagePointVDF(
uint8(self.finished_sp),
cc_info,
cc_proof,
rc_info,
rc_proof,
)
if self.server is not None:
msg = Message("new_signage_point_vdf", response)
await self.server.send_to_all([msg], NodeType.FULL_NODE)
# Cleanup the signage point from memory.
self.signage_point_iters.remove(signage_iter)
self.finished_sp += 1
self.proofs_finished = self._clear_proof_list(signage_iter)
# Send the next 3 signage point to the chains.
next_iters_count = 0
for next_sp in self.signage_point_iters:
for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]:
if next_sp not in self.iters_submitted[chain] and next_sp not in self.iters_to_submit[chain]:
self.iters_to_submit[chain].append(next_sp)
self.iteration_to_proof_type[next_sp] = IterationType.SIGNAGE_POINT
next_iters_count += 1
if next_iters_count == 3:
break
async def _check_for_new_ip(self):
infusion_iters = [
iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.INFUSION_POINT
]
for iteration in infusion_iters:
proofs_with_iter = [
(chain, info, proof)
for chain, info, proof in self.proofs_finished
if info.number_of_iterations == iteration
]
if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None:
chain_count = 3
else:
chain_count = 2
if len(proofs_with_iter) == chain_count:
block = None
for unfinished_block in self.unfinished_blocks:
_, ip_iters = iters_from_sub_block(
self.constants,
unfinished_block.reward_chain_sub_block,
self.last_state.get_sub_slot_iters(),
self.last_state.get_difficulty(),
)
if ip_iters - self.last_state.get_last_ip() == iteration:
block = unfinished_block
break
if block is not None:
self.unfinished_blocks.remove(block)
challenge_hash = block.reward_chain_sub_block.get_hash()
icc_info: Optional[VDFInfo] = None
icc_proof: Optional[VDFProof] = None
cc_info: Optional[VDFInfo] = None
cc_proof: Optional[VDFProof] = None
rc_info: Optional[VDFInfo] = None
rc_proof: Optional[VDFProof] = None
for chain, info, proof in proofs_with_iter:
if chain == Chain.CHALLENGE_CHAIN:
cc_info = info
cc_proof = proof
if chain == Chain.REWARD_CHAIN:
rc_info = info
rc_proof = proof
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
icc_info = info
icc_proof = proof
if cc_info is None or cc_proof is None or rc_info is None or rc_proof is None:
log.error(
f"Insufficient VDF proofs for infusion point ch: {challenge_hash} iterations:{iteration}"
)
response = timelord_protocol.NewInfusionPointVDF(
challenge_hash,
cc_info,
cc_proof,
rc_info,
rc_proof,
icc_info,
icc_proof,
)
msg = Message("new_infusion_point_vdf", response)
await self.server.send_to_all([msg], NodeType.FULL_NODE)
for iteration in infusion_iters:
self.proofs_finished = self._clear_proof_list(iteration)
async def _check_for_end_of_subslot(self):
left_subslot_iters = [
iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.END_OF_SUBSLOT
]
if len(left_subslot_iters) == 0:
return
chains_finished = [
(chain, info, proof)
for chain, info, proof in self.proofs_finished
if info.number_of_iterations == left_subslot_iters[0]
]
if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None:
chain_count = 3
else:
chain_count = 2
if len(chains_finished) == chain_count:
icc_ip_vdf: Optional[VDFInfo] = None
icc_ip_proof: Optional[VDFProof] = None
cc_vdf: Optional[VDFInfo] = None
cc_proof: Optional[VDFProof] = None
rc_vdf: Optional[VDFInfo] = None
rc_proof: Optional[VDFProof] = None
for chain, info, proof in chains_finished:
if chain == Chain.CHALLENGE_CHAIN:
cc_vdf = info
cc_proof = proof
if chain == Chain.REWARD_CHAIN:
rc_vdf = info
rc_proof = proof
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
icc_ip_vdf = info
icc_ip_proof = proof
assert cc_proof is not None and rc_proof is not None and cc_vdf is not None and rc_vdf is not None
log.info("Collected end of subslot vdfs.")
icc_sub_slot: Optional[InfusedChallengeChainSubSlot] = (
None if icc_ip_vdf is None else InfusedChallengeChainSubSlot(icc_ip_vdf)
)
icc_sub_slot_hash = icc_sub_slot.get_hash() if self.last_state.get_deficit() == 0 else None
if self.last_state.get_sub_epoch_summary() is not None:
ses_hash = self.last_state.get_sub_epoch_summary().get_hash()
new_sub_slot_iters = self.last_state.get_sub_epoch_summary().new_sub_slot_iters
new_difficulty = self.last_state.get_sub_epoch_summary().new_difficulty
else:
ses_hash = None
new_sub_slot_iters = self.last_state.get_sub_slot_iters()
new_difficulty = self.last_state.get_difficulty()
cc_sub_slot = ChallengeChainSubSlot(cc_vdf, icc_sub_slot_hash, ses_hash, new_sub_slot_iters, new_difficulty)
eos_deficit: uint8 = (
self.last_state.get_deficit()
if self.last_state.get_deficit() > 0
else self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK
)
rc_sub_slot = RewardChainSubSlot(
rc_vdf,
cc_sub_slot.get_hash(),
icc_sub_slot.get_hash() if icc_sub_slot is not None else None,
eos_deficit,
)
eos_bundle = EndOfSubSlotBundle(
cc_sub_slot,
icc_sub_slot,
rc_sub_slot,
SubSlotProofs(cc_proof, icc_ip_proof, rc_proof),
)
if self.server is not None:
msg = Message("end_of_sub_slot_bundle", timelord_protocol.NewEndOfSubSlotVDF(eos_bundle))
await self.server.send_to_all([msg], NodeType.FULL_NODE)
log.info("Built end of subslot bundle.")
self.unfinished_blocks = self.overflow_blocks
self.overflow_blocks = []
self.new_subslot_end = EndOfSubSlotData(
eos_bundle,
new_sub_slot_iters,
new_difficulty,
eos_deficit,
)
async def _manage_chains(self):
while not self._shut_down:
await asyncio.sleep(0.1)
# Didn't get any useful data, continue.
async with self.lock:
if self.last_state.is_empty() and self.new_peak is None:
continue
# Map free vdf_clients to unspawned chains.
await self._map_chains_with_vdf_clients()
async with self.lock:
# We've got a new peak, process it.
if self.new_peak is not None:
await self._handle_new_peak()
# A subslot ended, process it.
if self.new_subslot_end is not None:
await self._handle_subslot_end()
# Submit pending iterations.
await self._submit_iterations()
# Check for new signage point and broadcast it if present.
await self._check_for_new_sp()
# Check for new infusion point and broadcast it if present.
await self._check_for_new_ip()
# Check for end of subslot, respawn chains and build EndOfSubslotBundle.
await self._check_for_end_of_subslot()
async def _do_process_communication(self, chain, challenge_hash, initial_form, ip, reader, writer):
disc: int = create_discriminant(challenge_hash, self.constants.DISCRIMINANT_SIZE_BITS)
# Depending on the flags 'fast_algorithm' and 'sanitizer_mode',
# the timelord tells the vdf_client what to execute.
if self.config["fast_algorithm"]:
# Run n-wesolowski (fast) algorithm.
writer.write(b"N")
else:
# Run two-wesolowski (slow) algorithm.
writer.write(b"N")
await writer.drain()
prefix = str(len(str(disc)))
if len(prefix) == 1:
prefix = "00" + prefix
if len(prefix) == 2:
prefix = "0" + prefix
writer.write((prefix + str(disc)).encode())
await writer.drain()
# Send (a, b) from 'initial_form'.
for num in [initial_form.a, initial_form.b]:
prefix = len(str(num))
prefix_len = len(str(prefix))
writer.write((str(prefix_len) + str(prefix) + str(num)).encode())
await writer.drain()
try:
ok = await reader.readexactly(2)
except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
log.warning(f"{type(e)} {e}")
return
if ok.decode() != "OK":
return
log.info("Got handshake with VDF client.")
async with self.lock:
self.allows_iters.append(chain)
# Listen to the client until "STOP" is received.
while True:
try:
data = await reader.readexactly(4)
except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
log.warning(f"{type(e)} {e}")
break
msg = ""
try:
msg = data.decode()
except Exception as e:
# log.error(f"Exception while decoding data {e}")
pass
if msg == "STOP":
log.info(f"Stopped client running on ip {ip}.")
async with self.lock:
writer.write(b"ACK")
await writer.drain()
break
else:
try:
# This must be a proof, 4 bytes is length prefix
length = int.from_bytes(data, "big")
proof = await reader.readexactly(length)
stdout_bytes_io: io.BytesIO = io.BytesIO(bytes.fromhex(proof.decode()))
except (
asyncio.IncompleteReadError,
ConnectionResetError,
Exception,
) as e:
log.warning(f"{type(e)} {e}")
break
iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True))
y_size_bytes = stdout_bytes_io.read(8)
y_size = uint64(int.from_bytes(y_size_bytes, "big", signed=True))
y_bytes = stdout_bytes_io.read(y_size)
witness_type = uint8(int.from_bytes(stdout_bytes_io.read(1), "big", signed=True))
proof_bytes: bytes = stdout_bytes_io.read()
# Verifies our own proof just in case
a = int.from_bytes(y_bytes[:129], "big", signed=True)
b = int.from_bytes(y_bytes[129:], "big", signed=True)
output = ClassgroupElement(int512(a), int512(b))
time_taken = time.time() - self.chain_start_time[chain]
ips = int(iterations_needed / time_taken * 10) / 10
log.info(
f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}"
f" iters."
f"Estimated IPS: {ips}. Chain: {chain}"
)
vdf_info: VDFInfo = VDFInfo(
challenge_hash,
iterations_needed,
output,
)
vdf_proof: VDFProof = VDFProof(
witness_type,
proof_bytes,
)
if not vdf_proof.is_valid(self.constants, vdf_info):
log.error("Invalid proof of time!")
# continue
async with self.lock:
self.proofs_finished.append((chain, vdf_info, vdf_proof))

View File

@ -1,775 +0,0 @@
import asyncio
import io
import logging
import time
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
from chiavdf import create_discriminant
from src.consensus.constants import ConsensusConstants
from src.consensus.default_constants import DEFAULT_CONSTANTS
from src.consensus.pot_iterations import (
calculate_iterations_quality,
calculate_sp_iters,
calculate_ip_iters,
)
from blspy import G2Element
from src.protocols import timelord_protocol
from src.server.outbound_message import OutboundMessage, NodeType, Message, Delivery
from src.server.server import ChiaServer
from src.types.classgroup import ClassgroupElement
from src.types.end_of_slot_bundle import EndOfSubSlotBundle
from src.types.reward_chain_sub_block import (
RewardChainSubBlock,
RewardChainSubBlockUnfinished,
)
from src.types.sized_bytes import bytes32
from src.types.slots import ChallengeChainSubSlot, InfusedChallengeChainSubSlot, RewardChainSubSlot, SubSlotProofs
from src.types.vdf import VDFInfo, VDFProof
from src.util.api_decorators import api_request
from src.util.ints import uint64, uint8, uint128, int512
from src.types.sub_epoch_summary import SubEpochSummary
from src.util.block_tools import BlockTools
from chiapos import Verifier
from src.types.slots import ChallengeBlockInfo
log = logging.getLogger(__name__)
def iters_from_sub_block(
constants,
reward_chain_sub_block: Union[RewardChainSubBlock, RewardChainSubBlockUnfinished],
sub_slot_iters: uint64,
difficulty: uint64,
) -> Tuple[uint64, uint64]:
v = Verifier()
pos = reward_chain_sub_block.proof_of_space
plot_id: bytes32 = pos.get_plot_id()
quality = v.validate_proof(plot_id, pos.size, pos.challenge_hash, bytes(pos.proof))
if reward_chain_sub_block.challenge_chain_sp_vdf is None:
assert reward_chain_sub_block.signage_point_index == 0
cc_sp: bytes32 = reward_chain_sub_block.proof_of_space.challenge_hash
else:
cc_sp: bytes32 = reward_chain_sub_block.challenge_chain_sp_vdf.get_hash()
required_iters = calculate_iterations_quality(
quality,
reward_chain_sub_block.proof_of_space.size,
difficulty,
cc_sp,
)
return (
calculate_sp_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index),
calculate_ip_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index, required_iters),
)
class EndOfSubSlotData:
eos_bundle: EndOfSubSlotBundle
sub_slot_iters: uint64
new_difficulty: uint64
deficit: uint8
def __init__(self, eos_bundle, sub_slot_iters, new_difficulty, deficit):
self.eos_bundle = eos_bundle
self.sub_slot_iters = sub_slot_iters
self.new_difficulty = new_difficulty
self.deficit = deficit
class Chain(Enum):
CHALLENGE_CHAIN = 1
REWARD_CHAIN = 2
INFUSED_CHALLENGE_CHAIN = 3
class IterationType(Enum):
SIGNAGE_POINT = 1
INFUSION_POINT = 2
END_OF_SUBSLOT = 3
class LastState:
def __init__(self, constants):
self.peak: Optional[timelord_protocol.NewPeak] = None
self.subslot_end: Optional[EndOfSubSlotData] = None
self.last_ip: uint64 = 0
self.deficit: uint8 = 0
self.sub_epoch_summary: Optional[SubEpochSummary] = None
self.constants = constants
self.last_weight = 0
self.total_iters = 0
self.last_peak_challenge: Optional[bytes32] = None
def set_state(self, state):
if isinstance(state, timelord_protocol.NewPeak):
self.peak = state
self.subslot_end = None
_, self.last_ip = iters_from_sub_block(
self.constants,
state.reward_chain_sub_block,
state.sub_slot_iters,
state.difficulty,
)
self.deficit = state.deficit
self.sub_epoch_summary = state.sub_epoch_summary
self.last_weight = state.reward_chain_sub_block.weight
self.total_iters = state.reward_chain_sub_block.total_iters
self.last_peak_challenge = state.reward_chain_sub_block.get_hash()
if isinstance(state, EndOfSubSlotData):
self.peak = None
self.subslot_end = state
self.last_ip = 0
self.deficit = state.deficit
def is_empty(self) -> bool:
return self.peak is None and self.subslot_end is None
def get_sub_slot_iters(self) -> uint64:
if self.peak is not None:
return self.peak.sub_slot_iters
return self.subslot_end.sub_slot_iters
def get_weight(self) -> uint64:
return self.last_weight
def get_total_iters(self) -> uint128:
return self.total_iters
def get_last_peak_challenge(self) -> Optional[bytes32]:
return self.last_peak_challenge
def get_difficulty(self) -> uint64:
if self.peak is not None:
return self.peak.difficulty
return self.subslot_end.new_difficulty
def get_last_ip(self) -> uint64:
return self.last_ip
def get_deficit(self) -> uint8:
if self.peak is not None:
return self.peak.deficit
return self.subslot_end.deficit
def get_sub_epoch_summary(self) -> Optional[SubEpochSummary]:
return self.sub_epoch_summary
def get_challenge(self, chain: Chain) -> Optional[bytes32]:
if self.peak is not None:
sub_block = self.peak.reward_chain_sub_block
if chain == Chain.CHALLENGE_CHAIN:
return sub_block.challenge_chain_ip_vdf.challenge_hash
if chain == Chain.REWARD_CHAIN:
return sub_block.get_hash()
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
if sub_block.infused_challenge_chain_ip_vdf is not None:
return sub_block.infused_challenge_chain_ip_vdf.challenge_hash
if self.peak.deficit == 4:
return ChallengeBlockInfo(
sub_block.proof_of_space,
sub_block.challenge_chain_sp_vdf,
sub_block.challenge_chain_sp_signature,
sub_block.challenge_chain_ip_vdf,
).get_hash()
return None
if self.subslot_end is not None:
if chain == Chain.CHALLENGE_CHAIN:
return self.subslot_end.eos_bundle.challenge_chain.get_hash()
if chain == Chain.REWARD_CHAIN:
return self.subslot_end.eos_bundle.reward_chain.get_hash()
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK:
return self.subslot_end.eos_bundle.infused_challenge_chain.get_hash()
else:
return None
return None
def get_initial_form(self, chain: Chain) -> Optional[ClassgroupElement]:
if self.peak is not None:
sub_block = self.peak.reward_chain_sub_block
if chain == Chain.CHALLENGE_CHAIN:
return sub_block.challenge_chain_ip_vdf.output
if chain == Chain.REWARD_CHAIN:
return ClassgroupElement.get_default_element()
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
if sub_block.infused_challenge_chain_ip_vdf is not None:
return sub_block.infused_challenge_chain_ip_vdf.output
elif self.peak.deficit == 4:
return ClassgroupElement.get_default_element()
else:
return None
if self.subslot_end is not None:
if chain == Chain.CHALLENGE_CHAIN or chain == Chain.REWARD_CHAIN:
return ClassgroupElement.get_default_element()
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK:
return ClassgroupElement.get_default_element()
else:
return None
return None
class Timelord:
def __init__(self, config: Dict, constants: ConsensusConstants):
self.config = config
self.constants = constants
self._is_stopped = False
self.free_clients: List[Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = []
self.lock: asyncio.Lock = asyncio.Lock()
self.potential_free_clients: List = []
self.ip_whitelist = self.config["vdf_clients"]["ip"]
self.server: Optional[ChiaServer] = None
self.chain_type_to_stream: Dict[Chain, Tuple[ip, asyncio.StreamReader, asyncio.StreamWriter]] = {}
self.chain_start_time: Dict = {}
# Chains that currently don't have a vdf_client.
self.unspawned_chains: List[Chain] = [
Chain.CHALLENGE_CHAIN,
Chain.REWARD_CHAIN,
Chain.INFUSED_CHALLENGE_CHAIN,
]
# Chains that currently accept iterations.
self.allows_iters: List[Chain] = []
# Last peak received, None if it's already processed.
self.new_peak: Optional[timelord_protocol.NewPeak] = None
# Last end of subslot bundle, None if we built a peak on top of it.
self.new_subslot_end: Optional[EndOfSubSlotData] = None
# Last state received. Can either be a new peak or a new EndOfSubslotBundle.
self.last_state: LastState = LastState(self.constants)
# Unfinished block info, iters adjusted to the last peak.
self.unfinished_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = []
# Signage points iters, adjusted to the last peak.
self.signage_point_iters: List[uint64] = []
# For each chain, send those info when the process spawns.
self.iters_to_submit: Dict[str, List[uint64]] = {}
self.iters_submitted: Dict[str, List[uint64]] = {}
# For each iteration submitted, know if it's a signage point, an infusion point or an end of slot.
self.iteration_to_proof_type: Dict[uint64, IterationType] = {}
# List of proofs finished.
self.proofs_finished: List[Tuple[Chain, VDFInfo, VDFProof]] = []
# Data to send at vdf_client initialization.
self.finished_sp = 0
self.overflow_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = []
async def start(self):
log.info("Starting timelord.")
self.main_loop = asyncio.create_task(self._manage_chains())
self.vdf_server = await asyncio.start_server(
self._handle_client,
self.config["vdf_server"]["host"],
self.config["vdf_server"]["port"],
)
log.info("Started timelord.")
def set_server(self, server: ChiaServer):
self.server = server
@api_request
async def new_peak(self, new_peak: timelord_protocol.NewPeak):
async with self.lock:
if (
self.last_state is None
or self.last_state.get_weight() < new_peak.weight
):
self.new_peak = new_peak
@api_request
async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock):
async with self.lock:
if not self._accept_unfinished_block(new_unfinished_subblock):
return
sp_iters, ip_iters = iters_from_sub_block(
new_unfinished_subblock.reward_chain_sub_block,
self.last_state.get_sub_slot_iters(),
self.last_state.get_difficulty(),
)
last_ip_iters = self.last_state.get_last_ip()
if sp_iters < ip_iters:
self.overflow_blocks.append(new_unfinished_subblock)
elif ip_iters > last_ip_iters:
self.unfinished_blocks.append(new_unfinished_subblock)
for chain in Chain:
self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters))
self.iteration_to_proof_type[ip_iters - self.last_ip_iters] = IterationType.INFUSION_POINT
def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool:
# Total unfinished block iters needs to exceed peak's iters.
if self.last_state.get_total_iters() >= block.total_iters:
return False
# The peak hash of the rc-sub-block must match
# the signage point rc VDF challenge hash of the unfinished sub-block.
if (
block.reward_chain_sp_vdf is not None
and self.last_state.get_last_peak_challenge() is not None
and self.last_state.get_last_peak_challenge()
!= block.reward_chain_sp_vdf.challenge_hash
):
return False
return True
async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
async with self.lock:
client_ip = writer.get_extra_info("peername")[0]
log.info(f"New timelord connection from client: {client_ip}.")
if client_ip in self.ip_whitelist:
self.free_clients.append((client_ip, reader, writer))
log.info(f"Added new VDF client {client_ip}.")
for ip, end_time in list(self.potential_free_clients):
if ip == client_ip:
self.potential_free_clients.remove((ip, end_time))
break
async def _stop_chain(self, chain: Chain):
stop_ip, _, stop_writer = self.chain_type_to_stream[chain]
self.potential_free_clients.append((stop_ip, time.time()))
stop_writer.write(b"010")
await stop_writer.drain()
if chain in self.allows_iters:
self.allows_iters.remove(chain)
self.unspawned_chains.append(chain)
async def _reset_chains(self):
# First, stop all chains.
ip_iters = self.last_state.get_last_ip()
sub_slot_iters = self.last_state.get_sub_slot_iters()
difficulty = self.last_state.get_difficulty()
for chain in self.chain_type_to_stream.keys():
await self._stop_chain(chain)
# Adjust all signage points iterations to the peak.
iters_per_signage = uint64(sub_slot_iters // self.constants.NUM_SPS_SUB_SLOT)
self.signage_point_iters = [
k * iters_per_signage - ip_iters
for k in range(1, self.constants.NUM_SPS_SUB_SLOT + 1)
if k * iters_per_signage - ip_iters > 0 and k * iters_per_signage < sub_slot_iters
]
# Adjust all unfinished blocks iterations to the peak.
new_unfinished_blocks = []
self.proofs_finished = []
for chain in Chain:
self.iters_to_submit[chain] = []
self.iters_submitted[chain] = []
self.iteration_to_proof_type = {}
for block in self.unfinished_blocks:
if not self._accept_unfinished_block(block):
continue
block_sp_iters, block_ip_iters = iters_from_sub_block(
self.constants,
blocks,
sub_slot_iters,
difficulty,
)
new_block_iters = block_ip_iters - ip_iters
if new_block_iters > 0:
new_unfinished_blocks.append(block)
for chain in Chain:
self.iters_to_submit[chain].append(new_block_iters)
self.iteration_to_proof_type[new_block_iters] = IterationType.INFUSION_POINT
# Remove all unfinished blocks that have already passed.
self.unfinished_blocks = new_unfinished_blocks
# Signage points.
if len(self.signage_point_iters) > 0:
count_signage = 0
for signage in self.signage_point_iters:
for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]:
self.iters_to_submit[chain].append(signage)
self.iteration_to_proof_type[signage] = IterationType.SIGNAGE_POINT
count_signage += 1
if count_signage == 3:
break
# TODO: handle the special case when infusion point is the end of subslot.
left_subslot_iters = sub_slot_iters - ip_iters
log.info(f"Left subslot iters: {left_subslot_iters}.")
for chain in Chain:
self.iters_to_submit[chain].append(left_subslot_iters)
self.iteration_to_proof_type[left_subslot_iters] = IterationType.END_OF_SUBSLOT
async def _handle_new_peak(self):
self.last_state.set_state(self.new_peak)
self.new_peak = None
await self._reset_chains()
async def _handle_subslot_end(self):
self.finished_sp = 0
self.last_state.set_state(self.new_subslot_end)
self.new_subslot_end = None
await self._reset_chains()
async def _map_chains_with_vdf_clients(self):
while not self._is_stopped:
picked_chain = None
async with self.lock:
if len(self.free_clients) == 0:
break
ip, reader, writer = self.free_clients[0]
for chain_type in self.unspawned_chains:
challenge_hash = self.last_state.get_challenge(chain_type)
initial_form = self.last_state.get_initial_form(chain_type)
if challenge_hash is not None and initial_form is not None:
picked_chain = chain_type
break
if picked_chain is None:
break
picked_chain = self.unspawned_chains[0]
self.chain_type_to_stream[picked_chain] = (ip, reader, writer)
self.free_clients = self.free_clients[1:]
self.unspawned_chains = self.unspawned_chains[1:]
self.chain_start_time[picked_chain] = time.time()
log.info(f"Mapping free vdf_client with chain: {picked_chain}.")
asyncio.create_task(
self._do_process_communication(picked_chain, challenge_hash, initial_form, ip, reader, writer)
)
async def _submit_iterations(self):
for chain in Chain:
if chain in self.allows_iters:
_, _, writer = self.chain_type_to_stream[chain]
for iteration in self.iters_to_submit[chain]:
if iteration in self.iters_submitted[chain]:
continue
prefix = str(len(str(iteration)))
if len(str(iteration)) < 10:
prefix = "0" + prefix
iter_str = prefix + str(iteration)
writer.write(iter_str.encode())
await writer.drain()
self.iters_submitted[chain].append(iteration)
def _clear_proof_list(self, iter):
return [
(chain, info, proof) for chain, info, proof in self.proofs_finished if info.number_of_iterations != iter
]
async def _check_for_new_sp(self):
signage_iters = [
iteration for iteration, t in self.iteration_to_proof_type.items()
if t == IterationType.SIGNAGE_POINT
]
if len(signage_iters) == 0:
return
for signage_iter in signage_iters:
proofs_with_iter = [
(chain, info, proof)
for chain, info, proof in self.proofs_finished
if info.number_of_iterations == signage_iter
]
# Wait for both cc and rc to have the signage point.
if len(proofs_with_iter) == 2:
for chain, info, proof in proofs_with_iter:
if chain == Chain.CHALLENGE_CHAIN:
cc_info = info
cc_proof = proof
if chain == Chain.REWARD_CHAIN:
rc_info = info
rc_proof = proof
response = timelord_protocol.NewSignagePointVDF(
self.finished_sp,
cc_info,
cc_proof,
rc_info,
rc_proof,
)
if self.server is not None:
self.server.push_message(
OutboundMessage(
NodeType.FULL_NODE,
Message("new_signage_point_vdf", response),
Delivery.BROADCAST,
)
)
# Cleanup the signage point from memory.
self.signage_point_iters.remove(signage_iter)
self.finished_sp += 1
self.proofs_finished = self._clear_proof_list(signage_iter)
# Send the next 3 signage point to the chains.
next_iters_count = 0
for next_sp in self.signage_point_iters:
for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]:
if (
next_sp not in self.iters_submitted[chain]
and next_sp not in self.iters_to_submit[chain]
):
self.iters_to_submit[chain].append(next_sp)
self.iteration_to_proof_type[next_sp] = IterationType.SIGNAGE_POINT
next_iters_count += 1
if next_iters_count == 3:
break
async def _check_for_new_ip(self):
infusion_iters = [
iteration for iteration, t in self.iteration_to_proof_type.items()
if t == IterationType.INFUSION_POINT
]
for iteration in infusion_iters:
proofs_with_iter = [
(chain, info, proof)
for chain, info, proof in self.proofs_finished
if info.number_of_iterations == iteration
]
if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None:
chain_count = 3
else:
chain_count = 2
if len(proofs_with_iter) == chain_count:
block = None
for unfinished_block in self.unfinished_blocks:
_, ip_iters = iters_from_sub_block(
self.constants,
unfinished_block,
self.last_state.get_sub_slot_iters(),
self.last_state.get_difficulty(),
)
if ip_iters - self.last_state.get_last_ip() == iteration:
block = unfinished_block
break
if block is not None:
self.unfinished_blocks.remove(block)
challenge_hash = block.reward_chain_sub_block.get_hash()
icc_info = None
icc_proof = None
for chain, info, proof in proofs_with_iter:
if chain == Chain.CHALLENGE_CHAIN:
cc_info = info
cc_proof = proof
if chain == Chain.REWARD_CHAIN:
rc_info = info
rc_proof = proof
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
icc_info = info
icc_proof = proof
response = timelord_protocol.NewInfusionPointVDF(
challenge_hash,
cc_info,
cc_proof,
rc_info,
rc_proof,
icc_info,
icc_proof,
)
self.server.push_message(
OutboundMessage(
NodeType.FULL_NODE,
Message("new_infusion_point_vdf", response),
Delivery.BROADCAST,
)
)
for iteration in infusion_iters:
self.proofs_finished = self._clear_proof_list(iteration)
async def _check_for_end_of_subslot(self):
left_subslot_iters = [
iteration for iteration, t in self.iteration_to_proof_type.items()
if t == IterationType.END_OF_SUBSLOT
]
if len(left_subslot_iters) == 0:
return
chains_finished = [
(chain, info, proof)
for chain, info, proof in self.proofs_finished
if info.number_of_iterations == left_subslot_iters[0]
]
if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None:
chain_count = 3
else:
chain_count = 2
if len(chains_finished) == chain_count:
icc_ip_vdf: Optional[VDFInfo] = None
icc_ip_proof: Optional[VDFProof] = None
cc_vdf: Optional[VDFInfo] = None
cc_proof: Optional[VDFProof] = None
rc_vdf: Optional[VDFInfo] = None
rc_proof: Optional[VDFProof] = None
for chain, info, proof in chains_finished:
if chain == Chain.CHALLENGE_CHAIN:
cc_vdf = info
cc_proof = proof
if chain == Chain.REWARD_CHAIN:
rc_vdf = info
rc_proof = proof
if chain == Chain.INFUSED_CHALLENGE_CHAIN:
icc_ip_vdf = info
icc_ip_proof = proof
assert cc_proof is not None and rc_proof is not None and cc_vdf is not None and rc_vdf is not None
log.info("Collected end of subslot vdfs.")
icc_sub_slot: Optional[InfusedChallengeChainSubSlot] = (
None if icc_ip_vdf is None
else InfusedChallengeChainSubSlot(icc_ip_vdf)
)
icc_sub_slot_hash = icc_sub_slot.get_hash() if self.last_state.get_deficit() == 0 else None
if self.last_state.get_sub_epoch_summary() is not None:
ses_hash = self.last_state.get_sub_epoch_summary().get_hash()
new_sub_slot_iters = self.last_state.get_sub_epoch_summary().new_sub_slot_iters
new_difficulty = self.last_state.get_sub_epoch_summary().new_difficulty
else:
ses_hash = None
new_sub_slot_iters = self.last_state.get_sub_slot_iters()
new_difficulty = self.last_state.get_difficulty()
cc_sub_slot = ChallengeChainSubSlot(cc_vdf, icc_sub_slot_hash, ses_hash, new_sub_slot_iters, new_difficulty)
eos_deficit: uint8 = (
self.last_state.get_deficit()
if self.last_state.get_deficit() > 0
else self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK
)
rc_sub_slot = RewardChainSubSlot(
rc_vdf,
cc_sub_slot.get_hash(),
icc_sub_slot.get_hash() if icc_sub_slot is not None else None,
eos_deficit,
)
eos_bundle = EndOfSubSlotBundle(
cc_sub_slot,
icc_sub_slot,
rc_sub_slot,
SubSlotProofs(cc_proof, icc_ip_proof, rc_proof),
)
if self.server is not None:
self.server.push_message(
OutboundMessage(
NodeType.FULL_NODE,
Message("end_of_sub_slot_bundle", timelord_protocol.NewEndOfSubSlotVDF(eos_bundle)),
Delivery.BROADCAST,
)
)
log.info("Built end of subslot bundle.")
self.unfinished_blocks = self.overflow_blocks
self.overflow_blocks = []
self.new_subslot_end = EndOfSubSlotData(
eos_bundle,
new_sub_slot_iters,
new_difficulty,
eos_deficit,
)
async def _manage_chains(self):
while not self._is_stopped:
await asyncio.sleep(0.1)
# Didn't get any useful data, continue.
async with self.lock:
if self.last_state.is_empty() and self.new_peak is None:
continue
# Map free vdf_clients to unspawned chains.
await self._map_chains_with_vdf_clients()
async with self.lock:
# We've got a new peak, process it.
if self.new_peak is not None:
await self._handle_new_peak()
# A subslot ended, process it.
if self.new_subslot_end is not None:
await self._handle_subslot_end()
# Submit pending iterations.
await self._submit_iterations()
# Check for new signage point and broadcast it if present.
await self._check_for_new_sp()
# Check for new infusion point and broadcast it if present.
await self._check_for_new_ip()
# Check for end of subslot, respawn chains and build EndOfSubslotBundle.
await self._check_for_end_of_subslot()
async def _do_process_communication(self, chain, challenge_hash, initial_form, ip, reader, writer):
disc: int = create_discriminant(challenge_hash, self.constants.DISCRIMINANT_SIZE_BITS)
# Depending on the flags 'fast_algorithm' and 'sanitizer_mode',
# the timelord tells the vdf_client what to execute.
if self.config["fast_algorithm"]:
# Run n-wesolowski (fast) algorithm.
writer.write(b"N")
else:
# Run two-wesolowski (slow) algorithm.
writer.write(b"N")
await writer.drain()
prefix = str(len(str(disc)))
if len(prefix) == 1:
prefix = "00" + prefix
if len(prefix) == 2:
prefix = "0" + prefix
writer.write((prefix + str(disc)).encode())
await writer.drain()
# Send (a, b) from 'initial_form'.
for num in [initial_form.a, initial_form.b]:
prefix = len(str(num))
prefix_len = len(str(prefix))
writer.write((str(prefix_len) + str(prefix) + str(num)).encode())
await writer.drain()
try:
ok = await reader.readexactly(2)
except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
log.warning(f"{type(e)} {e}")
return
if ok.decode() != "OK":
return
log.info("Got handshake with VDF client.")
async with self.lock:
self.allows_iters.append(chain)
# Listen to the client until "STOP" is received.
while True:
try:
data = await reader.readexactly(4)
except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
log.warning(f"{type(e)} {e}")
break
msg = ""
try:
msg = data.decode()
except Exception as e:
# log.error(f"Exception while decoding data {e}")
pass
if msg == "STOP":
log.info(f"Stopped client running on ip {ip}.")
async with self.lock:
writer.write(b"ACK")
await writer.drain()
break
else:
try:
# This must be a proof, 4 bytes is length prefix
length = int.from_bytes(data, "big")
proof = await reader.readexactly(length)
stdout_bytes_io: io.BytesIO = io.BytesIO(bytes.fromhex(proof.decode()))
except (
asyncio.IncompleteReadError,
ConnectionResetError,
Exception,
) as e:
log.warning(f"{type(e)} {e}")
break
iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True))
y_size_bytes = stdout_bytes_io.read(8)
y_size = uint64(int.from_bytes(y_size_bytes, "big", signed=True))
y_bytes = stdout_bytes_io.read(y_size)
witness_type = uint8(int.from_bytes(stdout_bytes_io.read(1), "big", signed=True))
proof_bytes: bytes = stdout_bytes_io.read()
# Verifies our own proof just in case
a = int.from_bytes(y_bytes[:129], "big", signed=True)
b = int.from_bytes(y_bytes[129:], "big", signed=True)
output = ClassgroupElement(int512(a), int512(b))
time_taken = time.time() - self.chain_start_time[chain]
ips = int(iterations_needed / time_taken * 10) / 10
log.info(
f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}" f" iters."
f"Estimated IPS: {ips}. Chain: {chain}"
)
vdf_info: VDFInfo = VDFInfo(
challenge_hash,
initial_form,
iterations_needed,
output,
)
vdf_proof: VDFProof = VDFProof(
witness_type,
proof_bytes,
)
if not vdf_proof.is_valid(self.constants, vdf_info):
log.error("Invalid proof of time!")
# continue
async with self.lock:
self.proofs_finished.append(
(chain, vdf_info, vdf_proof)
)

View File

@ -1,465 +0,0 @@
import asyncio
import io
import logging
import time
import socket
from typing import Dict, List, Optional, Tuple
from chiavdf import create_discriminant
from src.protocols import timelord_protocol
from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage
from src.server.server import ChiaServer
from src.types.classgroup import ClassgroupElement
from src.types.vdf import ProofOfTime
from src.types.sized_bytes import bytes32
from src.util.api_decorators import api_request
from src.util.ints import uint8, uint64, int512, uint128
log = logging.getLogger(__name__)
class Timelord:
def __init__(self, config: Dict, discriminant_size_bits: int):
self.discriminant_size_bits = discriminant_size_bits
self.config: Dict = config
self.ips_estimate = {
socket.gethostbyname(k): v
for k, v in list(
zip(
self.config["vdf_clients"]["ip"],
self.config["vdf_clients"]["ips_estimate"],
)
)
}
self.log = log
self.lock: asyncio.Lock = asyncio.Lock()
self.active_discriminants: Dict[bytes32, Tuple[asyncio.StreamWriter, uint64, str]] = {}
self.best_weight_three_proofs: int = -1
self.active_discriminants_start_time: Dict = {}
self.pending_iters: Dict = {}
self.submitted_iters: Dict = {}
self.done_discriminants: List[bytes32] = []
self.proofs_to_write: List[OutboundMessage] = []
self.seen_discriminants: List[bytes32] = []
self.proof_count: Dict = {}
self.avg_ips: Dict = {}
self.discriminant_queue: List[Tuple[bytes32, uint128]] = []
self.max_connection_time = self.config["max_connection_time"]
self.potential_free_clients: List = []
self.free_clients: List[Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = []
self.server: Optional[ChiaServer] = None
self.vdf_server = None
self._is_shutdown = False
self.sanitizer_mode = self.config["sanitizer_mode"]
self.last_time_seen_discriminant: Dict = {}
self.max_known_weights: List[uint128] = []
def set_server(self, server: ChiaServer):
self.server = server
async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
async with self.lock:
client_ip = writer.get_extra_info("peername")[0]
log.info(f"New timelord connection from client: {client_ip}.")
if client_ip in self.ips_estimate.keys():
self.free_clients.append((client_ip, reader, writer))
log.info(f"Added new VDF client {client_ip}.")
for ip, end_time in list(self.potential_free_clients):
if ip == client_ip:
self.potential_free_clients.remove((ip, end_time))
break
async def _start(self):
if self.sanitizer_mode:
log.info("Starting timelord in sanitizer mode")
self.disc_queue = asyncio.create_task(self._manage_discriminant_queue_sanitizer())
else:
log.info("Starting timelord in normal mode")
self.disc_queue = asyncio.create_task(self._manage_discriminant_queue())
self.vdf_server = await asyncio.start_server(
self._handle_client,
self.config["vdf_server"]["host"],
self.config["vdf_server"]["port"],
)
def _close(self):
self._is_shutdown = True
assert self.vdf_server is not None
self.vdf_server.close()
async def _await_closed(self):
assert self.disc_queue is not None
await self.disc_queue
async def _stop_worst_process(self, worst_weight_active):
# This is already inside a lock, no need to lock again.
log.info(f"Stopping one process at weight {worst_weight_active}")
stop_writer: Optional[asyncio.StreamWriter] = None
stop_discriminant: Optional[bytes32] = None
low_weights = {k: v for k, v in self.active_discriminants.items() if v[1] == worst_weight_active}
no_iters = {
k: v for k, v in low_weights.items() if k not in self.pending_iters or len(self.pending_iters[k]) == 0
}
# If we have process(es) with no iters, stop the one that started the latest
if len(no_iters) > 0:
latest_start_time = max([self.active_discriminants_start_time[k] for k, _ in no_iters.items()])
stop_discriminant, stop_writer = next(
(k, v[0]) for k, v in no_iters.items() if self.active_discriminants_start_time[k] == latest_start_time
)
else:
# Otherwise, pick the one that finishes one proof the latest.
best_iter = {k: min(self.pending_iters[k]) for k, _ in low_weights.items()}
time_taken = {k: time.time() - self.active_discriminants_start_time[k] for k, _ in low_weights.items()}
client_ip = [v[2] for _, v in low_weights.items()]
# ips maps an IP to the expected iterations per second of it.
ips = {}
for ip in client_ip:
if ip in self.avg_ips:
current_ips, _ = self.avg_ips[ip]
ips[ip] = current_ips
else:
ips[ip] = self.ips_estimate[ip]
expected_finish = {
k: max(0, (best_iter[k] - time_taken[k] * ips[v[2]]) / ips[v[2]]) for k, v in low_weights.items()
}
worst_finish = max([v for v in expected_finish.values()])
log.info(f"Worst finish time: {worst_finish}s")
stop_discriminant, stop_writer = next(
(k, v[0]) for k, v in low_weights.items() if expected_finish[k] == worst_finish
)
assert stop_writer is not None
_, _, stop_ip = self.active_discriminants[stop_discriminant]
self.potential_free_clients.append((stop_ip, time.time()))
stop_writer.write(b"010")
await stop_writer.drain()
del self.active_discriminants[stop_discriminant]
del self.active_discriminants_start_time[stop_discriminant]
self.done_discriminants.append(stop_discriminant)
async def _update_avg_ips(self, challenge_hash, iterations_needed, ip):
async with self.lock:
if challenge_hash in self.active_discriminants:
time_taken = time.time() - self.active_discriminants_start_time[challenge_hash]
ips = int(iterations_needed / time_taken * 10) / 10
log.info(
f"Finished PoT, chall:{challenge_hash[:10].hex()}.."
f" {iterations_needed} iters. {int(time_taken*1000)/1000}s, {ips} ips"
)
if ip not in self.avg_ips:
self.avg_ips[ip] = (ips, 1)
else:
prev_avg_ips, trials = self.avg_ips[ip]
new_avg_ips = int((prev_avg_ips * trials + ips) / (trials + 1))
self.avg_ips[ip] = (new_avg_ips, trials + 1)
log.info(f"New estimate: {new_avg_ips}")
if iterations_needed in self.pending_iters[challenge_hash]:
self.pending_iters[challenge_hash].remove(iterations_needed)
else:
log.warning("Finished PoT for an unknown iteration.")
else:
log.info(
f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}"
f" iters. But challenge not active anymore"
)
async def _update_proofs_count(self, challenge_weight):
async with self.lock:
if challenge_weight not in self.proof_count:
self.proof_count[challenge_weight] = 1
else:
self.proof_count[challenge_weight] += 1
if self.proof_count[challenge_weight] >= 3:
log.info("Cleaning up clients.")
self.best_weight_three_proofs = max(self.best_weight_three_proofs, challenge_weight)
for active_disc in list(self.active_discriminants):
current_writer, current_weight, ip = self.active_discriminants[active_disc]
if current_weight <= challenge_weight:
log.info(f"Active weight cleanup: {current_weight}")
log.info(f"Cleanup weight: {challenge_weight}")
self.potential_free_clients.append((ip, time.time()))
current_writer.write(b"010")
await current_writer.drain()
del self.active_discriminants[active_disc]
del self.active_discriminants_start_time[active_disc]
self.done_discriminants.append(active_disc)
async def _send_iterations(self, challenge_hash, writer):
alive_discriminant = True
while alive_discriminant:
async with self.lock:
if (challenge_hash in self.active_discriminants) and (challenge_hash in self.pending_iters):
if challenge_hash not in self.submitted_iters:
self.submitted_iters[challenge_hash] = []
for iter in sorted(self.pending_iters[challenge_hash]):
if iter in self.submitted_iters[challenge_hash]:
continue
self.submitted_iters[challenge_hash].append(iter)
if len(str(iter)) < 10:
iter_size = "0" + str(len(str(iter)))
else:
iter_size = str(len(str(iter)))
writer.write((iter_size + str(iter)).encode())
await writer.drain()
log.info(f"New iteration submitted: {iter}")
# Sanitizer will always work with only one iteration.
if self.sanitizer_mode:
alive_discriminant = False
await asyncio.sleep(1)
async with self.lock:
if challenge_hash in self.done_discriminants:
alive_discriminant = False
async def _do_process_communication(self, challenge_hash, challenge_weight, ip, reader, writer):
disc: int = create_discriminant(challenge_hash, self.discriminant_size_bits)
# Depending on the flags 'fast_algorithm' and 'sanitizer_mode',
# the timelord tells the vdf_client what to execute.
if not self.sanitizer_mode:
if self.config["fast_algorithm"]:
# Run n-wesolowski (fast) algorithm.
writer.write(b"N")
else:
# Run two-wesolowski (slow) algorithm.
writer.write(b"T")
else:
# Create compact proofs of time.
writer.write(b"S")
await writer.drain()
prefix = str(len(str(disc)))
if len(prefix) == 1:
prefix = "00" + prefix
writer.write((prefix + str(disc)).encode())
await writer.drain()
try:
ok = await reader.readexactly(2)
except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
log.warning(f"{type(e)} {e}")
async with self.lock:
if challenge_hash not in self.done_discriminants:
self.done_discriminants.append(challenge_hash)
if self.sanitizer_mode:
if challenge_hash in self.pending_iters:
del self.pending_iters[challenge_hash]
if challenge_hash in self.submitted_iters:
del self.submitted_iters[challenge_hash]
return
if ok.decode() != "OK":
return
log.info("Got handshake with VDF client.")
async with self.lock:
self.active_discriminants[challenge_hash] = (writer, challenge_weight, ip)
self.active_discriminants_start_time[challenge_hash] = time.time()
asyncio.create_task(self._send_iterations(challenge_hash, writer))
# Listen to the client until "STOP" is received.
while True:
try:
data = await reader.readexactly(4)
except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
log.warning(f"{type(e)} {e}")
async with self.lock:
if challenge_hash in self.active_discriminants:
del self.active_discriminants[challenge_hash]
if challenge_hash in self.active_discriminants_start_time:
del self.active_discriminants_start_time[challenge_hash]
if challenge_hash not in self.done_discriminants:
self.done_discriminants.append(challenge_hash)
if self.sanitizer_mode:
if challenge_hash in self.pending_iters:
del self.pending_iters[challenge_hash]
if challenge_hash in self.submitted_iters:
del self.submitted_iters[challenge_hash]
break
msg = ""
try:
msg = data.decode()
except Exception as e:
log.error(f"Exception while decoding data {e}")
if msg == "STOP":
log.info(f"Stopped client running on ip {ip}.")
async with self.lock:
writer.write(b"ACK")
await writer.drain()
break
else:
try:
# This must be a proof, 4bytes is length prefix
length = int.from_bytes(data, "big")
proof = await reader.readexactly(length)
stdout_bytes_io: io.BytesIO = io.BytesIO(bytes.fromhex(proof.decode()))
except (
asyncio.IncompleteReadError,
ConnectionResetError,
Exception,
) as e:
log.warning(f"{type(e)} {e}")
async with self.lock:
if challenge_hash in self.active_discriminants:
del self.active_discriminants[challenge_hash]
if challenge_hash in self.active_discriminants_start_time:
del self.active_discriminants_start_time[challenge_hash]
if challenge_hash not in self.done_discriminants:
self.done_discriminants.append(challenge_hash)
if self.sanitizer_mode:
if challenge_hash in self.pending_iters:
del self.pending_iters[challenge_hash]
if challenge_hash in self.submitted_iters:
del self.submitted_iters[challenge_hash]
break
iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True))
y_size_bytes = stdout_bytes_io.read(8)
y_size = uint64(int.from_bytes(y_size_bytes, "big", signed=True))
y_bytes = stdout_bytes_io.read(y_size)
witness_type = uint8(int.from_bytes(stdout_bytes_io.read(1), "big", signed=True))
proof_bytes: bytes = stdout_bytes_io.read()
# Verifies our own proof just in case
a = int.from_bytes(y_bytes[:129], "big", signed=True)
b = int.from_bytes(y_bytes[129:], "big", signed=True)
output = ClassgroupElement(int512(a), int512(b))
proof_of_time = ProofOfTime(
challenge_hash,
iterations_needed,
output,
witness_type,
proof_bytes,
)
if not proof_of_time.is_valid(self.discriminant_size_bits):
log.error("Invalid proof of time")
response = timelord_protocol.ProofOfTimeFinished(proof_of_time)
await self._update_avg_ips(challenge_hash, iterations_needed, ip)
async with self.lock:
self.proofs_to_write.append(
OutboundMessage(
NodeType.FULL_NODE,
Message("proof_of_time_finished", response),
Delivery.BROADCAST,
)
)
if not self.sanitizer_mode:
await self._update_proofs_count(challenge_weight)
else:
async with self.lock:
writer.write(b"010")
await writer.drain()
try:
del self.active_discriminants[challenge_hash]
del self.active_discriminants_start_time[challenge_hash]
del self.pending_iters[challenge_hash]
del self.submitted_iters[challenge_hash]
except KeyError:
log.error("Discriminant stopped abnormally.")
async def _manage_discriminant_queue(self):
while not self._is_shutdown:
async with self.lock:
if len(self.discriminant_queue) > 0:
max_weight = max([h for _, h in self.discriminant_queue])
if max_weight <= self.best_weight_three_proofs:
self.done_discriminants.extend([d for d, _ in self.discriminant_queue])
self.discriminant_queue.clear()
else:
max_weight_disc = [d for d, h in self.discriminant_queue if h == max_weight]
with_iters = [
d for d in max_weight_disc if d in self.pending_iters and len(self.pending_iters[d]) != 0
]
if len(with_iters) == 0:
disc = max_weight_disc[0]
else:
min_iter = min([min(self.pending_iters[d]) for d in with_iters])
disc = next(d for d in with_iters if min(self.pending_iters[d]) == min_iter)
if len(self.free_clients) != 0:
ip, sr, sw = self.free_clients[0]
self.free_clients = self.free_clients[1:]
self.discriminant_queue.remove((disc, max_weight))
asyncio.create_task(self._do_process_communication(disc, max_weight, ip, sr, sw))
else:
self.potential_free_clients = [
(ip, end_time)
for ip, end_time in self.potential_free_clients
if time.time() < end_time + self.max_connection_time
]
if len(self.potential_free_clients) == 0 and len(self.active_discriminants) > 0:
worst_weight_active = min(
[
h
for (
_,
h,
_,
) in self.active_discriminants.values()
]
)
if max_weight > worst_weight_active:
await self._stop_worst_process(worst_weight_active)
elif max_weight == worst_weight_active:
if disc in self.pending_iters and len(self.pending_iters[disc]) != 0:
if any(
(k not in self.pending_iters or len(self.pending_iters[k]) == 0)
for k, v in self.active_discriminants.items()
if v[1] == worst_weight_active
):
log.info("Stopped process without iters for one with iters.")
await self._stop_worst_process(worst_weight_active)
if len(self.proofs_to_write) > 0:
for msg in self.proofs_to_write:
self.server.push_message(msg)
self.proofs_to_write.clear()
await asyncio.sleep(0.5)
async with self.lock:
for (
stop_discriminant,
(stop_writer, _, _),
) in self.active_discriminants.items():
stop_writer.write(b"010")
await stop_writer.drain()
self.done_discriminants.append(stop_discriminant)
self.active_discriminants.clear()
self.active_discriminants_start_time.clear()
async def _manage_discriminant_queue_sanitizer(self):
while not self._is_shutdown:
async with self.lock:
if len(self.discriminant_queue) > 0:
with_iters = [
(d, w)
for d, w in self.discriminant_queue
if d in self.pending_iters and len(self.pending_iters[d]) != 0
]
if len(with_iters) > 0 and len(self.free_clients) > 0:
disc, weight = with_iters[0]
log.info(f"Creating compact weso proof: weight {weight}.")
ip, sr, sw = self.free_clients[0]
self.free_clients = self.free_clients[1:]
self.discriminant_queue.remove((disc, weight))
asyncio.create_task(self._do_process_communication(disc, weight, ip, sr, sw))
if len(self.proofs_to_write) > 0:
for msg in self.proofs_to_write:
self.server.push_message(msg)
self.proofs_to_write.clear()
await asyncio.sleep(3)