mirror of https://github.com/Chia-Network/chia-blockchain.git (synced 2024-11-11 01:28:17 +03:00)

Timelord changes and harvester api

parent 9d7961209b
commit bbb3d28559

src/harvester.py (199 changed lines)

@@ -3,27 +3,17 @@ import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, Optional, Tuple, List, Callable, Set
import time
import concurrent
import dataclasses

from blspy import G1Element

from chiapos import DiskProver
from src.consensus.constants import ConsensusConstants
from src.consensus.pot_iterations import calculate_iterations_quality, calculate_sp_interval_iters
from src.protocols import harvester_protocol
from src.server.outbound_message import Message
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
from src.util.api_decorators import api_request
from src.util.ints import uint8, uint64
from src.plotting.plot_tools import (
    load_plots,
    PlotInfo,
    remove_plot_directory,
    add_plot_directory,
    get_plot_directories,
    remove_plot_directory as remove_plot_directory_pt,
    add_plot_directory as add_plot_directory_pt,
    get_plot_directories as get_plot_directories_pt,
)

log = logging.getLogger(__name__)
@@ -78,7 +68,7 @@ class Harvester:
        if self.state_changed_callback is not None:
            self.state_changed_callback(change)

    def _get_plots(self) -> Tuple[List[Dict], List[str], List[str]]:
    def get_plots(self) -> Tuple[List[Dict], List[str], List[str]]:
        response_plots: List[Dict] = []
        for path, plot_info in self.provers.items():
            prover = plot_info.prover
@@ -102,7 +92,7 @@ class Harvester:
            [str(s) for s in self.no_key_filenames],
        )

    async def _refresh_plots(self):
    async def refresh_plots(self):
        locked: bool = self._refresh_lock.locked()
        changed: bool = False
        async with self._refresh_lock:
@@ -119,7 +109,7 @@ class Harvester:
        if changed:
            self._state_changed("plots")

    def _delete_plot(self, str_path: str):
    def delete_plot(self, str_path: str):
        path = Path(str_path).resolve()
        if path in self.provers:
            del self.provers[path]
@@ -131,180 +121,17 @@ class Harvester:
        self._state_changed("plots")
        return True

    async def _add_plot_directory(self, str_path: str) -> bool:
        add_plot_directory(str_path, self.root_path)
        await self._refresh_plots()
    async def add_plot_directory(self, str_path: str) -> bool:
        add_plot_directory_pt(str_path, self.root_path)
        await self.refresh_plots()
        return True

    async def _get_plot_directories(self) -> List[str]:
        return get_plot_directories(self.root_path)
    async def get_plot_directories(self) -> List[str]:
        return get_plot_directories_pt(self.root_path)

    async def _remove_plot_directory(self, str_path: str) -> bool:
        remove_plot_directory(str_path, self.root_path)
    async def remove_plot_directory(self, str_path: str) -> bool:
        remove_plot_directory_pt(str_path, self.root_path)
        return True

    def set_server(self, server):
        self.server = server

    @api_request
    async def harvester_handshake(self, harvester_handshake: harvester_protocol.HarvesterHandshake):
        """
        Handshake between the harvester and the farmer. The harvester receives the pool public keys,
        as well as the farmer public keys, which must be put into the plots before the plotting process begins.
        We cannot use any plots which have different keys in them.
        """
        self.farmer_public_keys = harvester_handshake.farmer_public_keys
        self.pool_public_keys = harvester_handshake.pool_public_keys

        await self._refresh_plots()

        if len(self.provers) == 0:
            log.warning("Not farming any plots on this harvester. Check your configuration.")
            return

        self._state_changed("plots")

    @api_request
    async def new_signage_point(self, new_challenge: harvester_protocol.NewSignagePoint):
        """
        The harvester receives a new signage point from the farmer; this happens at the start of each slot.
        The harvester does a few things:
        1. The harvester applies the plot filter for each of the plots, to select the proportion which are eligible
        for this signage point and challenge.
        2. The harvester gets the qualities for each plot. This is approximately 7 reads per plot which qualifies.
        Note that each plot may have 0, 1, 2, etc. qualities for that challenge, but on average it will have 1.
        3. Checks the required_iters for each quality and the given signage point, to see which are eligible for
        inclusion (required_iters < sp_interval_iters).
        4. Looks up the full proof of space in the plot for each quality, approximately 64 reads per quality.
        5. Returns the proof of space to the farmer.
        """

        if len(self.pool_public_keys) == 0 or len(self.farmer_public_keys) == 0:
            # This means that we have not received the handshake yet
            return

        start = time.time()
        assert len(new_challenge.challenge_hash) == 32

        # Refresh plots to see if there are any new ones
        await self._refresh_plots()

        loop = asyncio.get_running_loop()

        def blocking_lookup(filename: Path, plot_info: PlotInfo, prover: DiskProver) -> List[ProofOfSpace]:
            # Uses the DiskProver object to lookup qualities. This is a blocking call,
            # so it should be run in a thread pool.
            try:
                sp_challenge_hash = ProofOfSpace.calculate_pos_challenge(
                    plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash
                )
                quality_strings = prover.get_qualities_for_challenge(sp_challenge_hash)
            except Exception:
                log.error("Error using prover object. Reinitializing prover object.")
                self.provers[filename] = dataclasses.replace(plot_info, prover=DiskProver(str(filename)))
                return []

            responses: List[ProofOfSpace] = []
            if quality_strings is not None:
                # Found proofs of space (on average 1 is expected per plot)
                for index, quality_str in enumerate(quality_strings):
                    required_iters: uint64 = calculate_iterations_quality(
                        quality_str, prover.get_size(), new_challenge.difficulty, new_challenge.sp_hash
                    )
                    sp_interval_iters = calculate_sp_interval_iters(self.constants, new_challenge.sub_slot_iters)
                    if required_iters < sp_interval_iters:
                        # Found a very good proof of space! Will fetch the whole proof from disk, then send it to the farmer
                        try:
                            proof_xs = prover.get_full_proof(sp_challenge_hash, index)
                        except RuntimeError:
                            log.error(f"Exception fetching full proof for {filename}")
                            continue

                        plot_public_key = ProofOfSpace.generate_plot_public_key(
                            plot_info.local_sk.get_g1(), plot_info.farmer_public_key
                        )
                        responses.append(
                            ProofOfSpace(
                                sp_challenge_hash,
                                plot_info.pool_public_key,
                                None,
                                plot_public_key,
                                uint8(prover.get_size()),
                                proof_xs,
                            )
                        )
            return responses

        async def lookup_challenge(filename: Path, prover: DiskProver) -> List[harvester_protocol.NewProofOfSpace]:
            # Executes a DiskProverLookup in a thread pool, and returns responses
            all_responses: List[harvester_protocol.NewProofOfSpace] = []
            proofs_of_space: List[ProofOfSpace] = await loop.run_in_executor(
                self.executor, blocking_lookup, filename, prover
            )
            for proof_of_space in proofs_of_space:
                all_responses.append(
                    harvester_protocol.NewProofOfSpace(
                        new_challenge.challenge_hash, prover.get_id(), proof_of_space, new_challenge.signage_point_index
                    )
                )
            return all_responses

        awaitables = []
        for try_plot_filename, try_plot_info in self.provers.items():
            if try_plot_filename.exists():
                # Passes the plot filter (does not check sp filter yet though, since we have not reached sp)
                # This is being executed at the beginning of the slot
                if ProofOfSpace.passes_plot_filter(
                    self.constants, try_plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash
                ):
                    awaitables.append(lookup_challenge(try_plot_filename, try_plot_info.prover))

        # Concurrently executes all lookups on disk, to take advantage of multiple-disk parallelism
        total_proofs_found = 0
        for sublist_awaitable in asyncio.as_completed(awaitables):
            for response in await sublist_awaitable:
                total_proofs_found += 1
                msg = Message("challenge_response", response)
                yield msg
        log.info(
            f"{len(awaitables)} plots were eligible for farming {new_challenge.challenge_hash.hex()[:10]}..."
            f" Found {total_proofs_found} proofs. Time: {time.time() - start}. "
            f"Total {len(self.provers)} plots"
        )

    @api_request
    async def request_signatures(self, request: harvester_protocol.RequestSignatures):
        """
        The farmer requests a signature on the header hash for one of the proofs that we found.
        A signature is created on the header hash using the harvester private key. This can also
        be used for pooling.
        """
        try:
            plot_info = self.provers[Path(request.plot_identifier).resolve()]
        except KeyError:
            log.warning(f"KeyError plot {request.plot_identifier} does not exist.")
            return

        local_sk = plot_info.local_sk
        agg_pk = ProofOfSpace.generate_plot_public_key(local_sk.get_g1(), plot_info.farmer_public_key)

        # This is only a partial signature. When combined with the farmer's half, it will
        # form a complete PrependSignature.
        message_signatures: List[Tuple[bytes32, G2Element]] = []
        for message in request.messages:
            signature: G2Element = AugSchemeMPL.sign(local_sk, message, agg_pk)
            message_signatures.append((message, signature))

        response: harvester_protocol.RespondSignatures = harvester_protocol.RespondSignatures(
            request.plot_identifier,
            request.sp_hash,
            local_sk.get_g1(),
            plot_info.farmer_public_key,
            message_signatures,
        )

        yield OutboundMessage(
            NodeType.FARMER,
            Message("respond_signatures", response),
            Delivery.RESPOND,
        )
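
A minimal sketch (not part of the commit) of the eligibility rule described in the new_signage_point docstring above, using the same calculate_iterations_quality helper from the diff; eligible_qualities and its parameters are hypothetical names for illustration:

def eligible_qualities(quality_strings, prover_size, difficulty, sp_hash, sp_interval_iters):
    # Keep only the qualities whose required iterations fit inside one signage-point
    # interval; only these are worth the roughly 64 disk reads for a full proof.
    return [
        (index, quality)
        for index, quality in enumerate(quality_strings)
        if calculate_iterations_quality(quality, prover_size, difficulty, sp_hash) < sp_interval_iters
    ]
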
@@ -4,7 +4,7 @@ import time
from pathlib import Path
from typing import Optional, Callable, List, Tuple

from blspy import AugSchemeMPL, G2Element
from blspy import AugSchemeMPL, G2Element, G1Element
from chiapos import DiskProver

from src.consensus.pot_iterations import calculate_sp_interval_iters, calculate_iterations_quality
@@ -24,6 +24,8 @@ class HarvesterAPI:

    def __init__(self, harvester):
        self.harvester = harvester
        self.farmer_public_keys: List[G1Element] = []
        self.pool_public_keys: List[G1Element] = []

    def _set_state_changed_callback(self, callback: Callable):
        self.harvester.state_changed_callback = callback
@@ -38,7 +40,7 @@ class HarvesterAPI:
        self.farmer_public_keys = harvester_handshake.farmer_public_keys
        self.pool_public_keys = harvester_handshake.pool_public_keys

        await self.harvester._refresh_plots()
        await self.harvester.refresh_plots()

        if len(self.harvester.provers) == 0:
            self.harvester.log.warning("Not farming any plots on this harvester. Check your configuration.")
@@ -69,7 +71,7 @@ class HarvesterAPI:
        assert len(new_challenge.challenge_hash) == 32

        # Refresh plots to see if there are any new ones
        await self.harvester._refresh_plots()
        await self.harvester.refresh_plots()

        loop = asyncio.get_running_loop()

@@ -93,7 +95,9 @@ class HarvesterAPI:
                    required_iters: uint64 = calculate_iterations_quality(
                        quality_str, prover.get_size(), new_challenge.difficulty, new_challenge.sp_hash
                    )
                    sp_interval_iters = calculate_sp_interval_iters(self.harvester.constants, new_challenge.sub_slot_iters)
                    sp_interval_iters = calculate_sp_interval_iters(
                        self.harvester.constants, new_challenge.sub_slot_iters
                    )
                    if required_iters < sp_interval_iters:
                        # Found a very good proof of space! Will fetch the whole proof from disk, then send it to the farmer
                        try:
@@ -137,7 +141,10 @@ class HarvesterAPI:
                # Passes the plot filter (does not check sp filter yet though, since we have not reached sp)
                # This is being executed at the beginning of the slot
                if ProofOfSpace.passes_plot_filter(
                    self.harvester.constants, try_plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash
                    self.harvester.constants,
                    try_plot_info.prover.get_id(),
                    new_challenge.challenge_hash,
                    new_challenge.sp_hash,
                ):
                    awaitables.append(lookup_challenge(try_plot_filename, try_plot_info.prover))

@@ -22,14 +22,12 @@ class HarvesterRpcApi:
    async def _state_changed(self, change: str) -> List[Dict]:
        if change == "plots":
            data = await self.get_plots({})
            payload = create_payload(
                "get_plots", data, self.service_name, "wallet_ui", string=False
            )
            payload = create_payload("get_plots", data, self.service_name, "wallet_ui", string=False)
            return [payload]
        return []

    async def get_plots(self, request: Dict) -> Dict:
        plots, failed_to_open, not_found = self.service._get_plots()
        plots, failed_to_open, not_found = self.service.get_plots()
        return {
            "plots": plots,
            "failed_to_open_filenames": failed_to_open,
@@ -37,27 +35,27 @@ class HarvesterRpcApi:
        }

    async def refresh_plots(self, request: Dict) -> Dict:
        await self.service._refresh_plots()
        await self.service.refresh_plots()
        return {}

    async def delete_plot(self, request: Dict) -> Dict:
        filename = request["filename"]
        if self.service._delete_plot(filename):
        if self.service.delete_plot(filename):
            return {}
        raise ValueError(f"Not able to delete file {filename}")

    async def add_plot_directory(self, request: Dict) -> Dict:
        dirname = request["dirname"]
        if await self.service._add_plot_directory(dirname):
        if await self.service.add_plot_directory(dirname):
            return {}
        raise ValueError(f"Did not add plot directory {dirname}")

    async def get_plot_directories(self, request: Dict) -> Dict:
        plot_dirs = await self.service._get_plot_directories()
        plot_dirs = await self.service.get_plot_directories()
        return {"directories": plot_dirs}

    async def remove_plot_directory(self, request: Dict) -> Dict:
        dirname = request["dirname"]
        if await self.service._remove_plot_directory(dirname):
        if await self.service.remove_plot_directory(dirname):
            return {}
        raise ValueError(f"Did not remove plot directory {dirname}")
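
Illustrative request and response shapes for the renamed RPC handlers above (values are hypothetical, not part of the commit); each handler takes a plain dict and returns a dict:

# await rpc_api.add_plot_directory({"dirname": "/plots"})            -> {}
# await rpc_api.delete_plot({"filename": "/plots/some-plot.plot"})   -> {} (or raises ValueError)
# await rpc_api.get_plots({})  -> {"plots": [...], "failed_to_open_filenames": [...], ...}
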
@@ -29,21 +29,15 @@ def ssl_context_for_server(
    private_cert_path: Path, private_key_path: Path, require_cert: bool = False
) -> Optional[ssl.SSLContext]:
    ssl_context = ssl._create_unverified_context(purpose=ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(
        certfile=str(private_cert_path), keyfile=str(private_key_path)
    )
    ssl_context.load_cert_chain(certfile=str(private_cert_path), keyfile=str(private_key_path))
    ssl_context.load_verify_locations(str(private_cert_path))
    ssl_context.verify_mode = ssl.CERT_REQUIRED if require_cert else ssl.CERT_NONE
    return ssl_context


def ssl_context_for_client(
    private_cert_path: Path, private_key_path: Path, auth: bool
) -> Optional[ssl.SSLContext]:
def ssl_context_for_client(private_cert_path: Path, private_key_path: Path, auth: bool) -> Optional[ssl.SSLContext]:
    ssl_context = ssl._create_unverified_context(purpose=ssl.Purpose.SERVER_AUTH)
    ssl_context.load_cert_chain(
        certfile=str(private_cert_path), keyfile=str(private_key_path)
    )
    ssl_context.load_cert_chain(certfile=str(private_cert_path), keyfile=str(private_key_path))
    if auth:
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        ssl_context.load_verify_locations(str(private_cert_path))
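
A minimal usage sketch (not part of the commit) of the single-line ssl_context_for_client helper above; the certificate and key paths are hypothetical, and start_client below uses it the same way before opening its websocket:

# ssl_context = ssl_context_for_client(Path("private.crt"), Path("private.key"), auth=False)
# ws = await session.ws_connect(url, autoclose=False, autoping=True, ssl=ssl_context)
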
@@ -92,9 +86,7 @@ class ChiaServer:
        self.root_path = root_path
        self.config = config
        self.on_connect: Optional[Callable] = None
        self.incoming_messages: Queue[
            Tuple[Payload, WSChiaConnection]
        ] = asyncio.Queue()
        self.incoming_messages: Queue[Tuple[Payload, WSChiaConnection]] = asyncio.Queue()
        self.shut_down_event = asyncio.Event()

        if self._local_type is NodeType.INTRODUCER:
@@ -123,9 +115,7 @@ class ChiaServer:
        self.runner = web.AppRunner(self.app, access_log=None)
        await self.runner.setup()
        require_cert = self._local_type not in (NodeType.FULL_NODE, NodeType.INTRODUCER)
        ssl_context = ssl_context_for_server(
            self._private_cert_path, self._private_key_path, require_cert
        )
        ssl_context = ssl_context_for_server(self._private_cert_path, self._private_key_path, require_cert)
        if self._local_type not in [NodeType.WALLET, NodeType.HARVESTER]:
            self.site = web.TCPSite(
                self.runner,
@@ -164,10 +154,7 @@ class ChiaServer:

            assert handshake is True
            await self.connection_added(connection, self.on_connect)
            if (
                self._local_type is NodeType.INTRODUCER
                and connection.connection_type is NodeType.FULL_NODE
            ):
            if self._local_type is NodeType.INTRODUCER and connection.connection_type is NodeType.FULL_NODE:
                self.introducer_peers.add(connection.get_peer_info())
        except Exception as e:
            error_stack = traceback.format_exc()
@@ -196,18 +183,14 @@ class ChiaServer:
        Tries to connect to the target node, adding one connection into the pipeline, if successful.
        An on connect method can also be specified, and this will be saved into the instance variables.
        """
        ssl_context = ssl_context_for_client(
            self._private_cert_path, self._private_key_path, auth
        )
        ssl_context = ssl_context_for_client(self._private_cert_path, self._private_key_path, auth)
        session = None
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            session = aiohttp.ClientSession(timeout=timeout)
            url = f"wss://{target_node.host}:{target_node.port}/ws"
            self.log.info(f"Connecting: {url}")
            ws = await session.ws_connect(
                url, autoclose=False, autoping=True, ssl=ssl_context
            )
            ws = await session.ws_connect(url, autoclose=False, autoping=True, ssl=ssl_context)
            if ws is not None:
                connection = WSChiaConnection(
                    self._local_type,
@@ -232,12 +215,14 @@ class ChiaServer:
                await self.connection_added(connection, on_connect)
                self.log.info("Connected")
                return True
        except aiohttp.client_exceptions.ClientConnectorError as e:
            self.log.warning(f"{e}")
        except Exception as e:
            error_stack = traceback.format_exc()
            self.log.error(f"Exception: {e}")
            if session is not None:
                await session.close()
            self.log.error(f"Exception Stack: {error_stack}")
        if session is not None:
            await session.close()

        return False

@@ -258,40 +243,24 @@ class ChiaServer:
        async def api_call(payload: Payload, connection: WSChiaConnection):
            try:
                full_message = payload.msg
                connection.log.info(
                    f"<- {full_message.function} from peer {connection.peer_node_id}"
                )
                if len(
                    full_message.function
                ) == 0 or full_message.function.startswith("_"):
                connection.log.info(f"<- {full_message.function} from peer {connection.peer_node_id}")
                if len(full_message.function) == 0 or full_message.function.startswith("_"):
                    # This prevents remote calling of private methods that start with "_"
                    self.log.error(
                        f"Non existing function: {full_message.function}"
                    )
                    raise ProtocolError(
                        Err.INVALID_PROTOCOL_MESSAGE, [full_message.function]
                    )
                    self.log.error(f"Non existing function: {full_message.function}")
                    raise ProtocolError(Err.INVALID_PROTOCOL_MESSAGE, [full_message.function])

                f = getattr(self.api, full_message.function, None)

                if f is None:
                    self.log.error(
                        f"Non existing function: {full_message.function}"
                    )
                    raise ProtocolError(
                        Err.INVALID_PROTOCOL_MESSAGE, [full_message.function]
                    )
                    self.log.error(f"Non existing function: {full_message.function}")
                    raise ProtocolError(Err.INVALID_PROTOCOL_MESSAGE, [full_message.function])

                if not hasattr(f, "api_function"):
                    self.log.error("Peer trying to call non api function")
                    raise ProtocolError(
                        Err.INVALID_PROTOCOL_MESSAGE, [full_message.function]
                    )
                    raise ProtocolError(Err.INVALID_PROTOCOL_MESSAGE, [full_message.function])

                if hasattr(f, "peer_required"):
                    response: Optional[Message] = await f(
                        full_message.data, connection
                    )
                    response: Optional[Message] = await f(full_message.data, connection)
                else:
                    response = await f(full_message.data)

@@ -302,9 +271,7 @@ class ChiaServer:

            except Exception as e:
                tb = traceback.format_exc()
                connection.log.error(
                    f"Exception: {e}, closing connection {connection}. {tb}"
                )
                connection.log.error(f"Exception: {e}, closing connection {connection}. {tb}")
                await connection.close()

        asyncio.create_task(api_call(payload_inc, connection_inc))
@@ -328,14 +295,9 @@ class ChiaServer:
            for message in messages:
                await connection.send_message(message)

    async def send_to_all_except(
        self, messages: List[Message], node_type: NodeType, exclude: bytes32
    ):
    async def send_to_all_except(self, messages: List[Message], node_type: NodeType, exclude: bytes32):
        for _, connection in self.all_connections.items():
            if (
                connection.connection_type is node_type
                and connection.peer_node_id != exclude
            ):
            if connection.connection_type is node_type and connection.peer_node_id != exclude:
                for message in messages:
                    await connection.send_message(message)

src/timelord.py (767 changed lines)
@@ -2,12 +2,773 @@ import asyncio
import io
import logging
import time
import socket
from typing import Dict, List, Optional, Tuple
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union

from chiavdf import create_discriminant

from src.consensus.constants import ConsensusConstants
from src.consensus.pot_iterations import (
    calculate_iterations_quality,
    calculate_sp_iters,
    calculate_ip_iters,
)
from src.protocols import timelord_protocol
from src.server.outbound_message import NodeType, Message
from src.server.server import ChiaServer
from src.types.classgroup import ClassgroupElement
from src.types.end_of_slot_bundle import EndOfSubSlotBundle
from src.types.reward_chain_sub_block import (
    RewardChainSubBlock,
    RewardChainSubBlockUnfinished,
)
from src.types.sized_bytes import bytes32
from src.types.slots import ChallengeChainSubSlot, InfusedChallengeChainSubSlot, RewardChainSubSlot, SubSlotProofs
from src.types.vdf import VDFInfo, VDFProof
from src.util.api_decorators import api_request
from src.util.ints import uint64, uint8, uint128, int512
from src.types.sub_epoch_summary import SubEpochSummary
from chiapos import Verifier
from src.types.slots import ChallengeBlockInfo

log = logging.getLogger(__name__)


def iters_from_sub_block(
    constants,
    reward_chain_sub_block: Union[RewardChainSubBlock, RewardChainSubBlockUnfinished],
    sub_slot_iters: uint64,
    difficulty: uint64,
) -> Tuple[uint64, uint64]:
    v = Verifier()
    pos = reward_chain_sub_block.proof_of_space
    plot_id: bytes32 = pos.get_plot_id()
    quality = v.validate_proof(plot_id, pos.size, pos.challenge_hash, bytes(pos.proof))

    if reward_chain_sub_block.challenge_chain_sp_vdf is None:
        assert reward_chain_sub_block.signage_point_index == 0
        cc_sp: bytes32 = reward_chain_sub_block.proof_of_space.challenge_hash
    else:
        cc_sp: bytes32 = reward_chain_sub_block.challenge_chain_sp_vdf.get_hash()
    required_iters = calculate_iterations_quality(
        quality,
        reward_chain_sub_block.proof_of_space.size,
        difficulty,
        cc_sp,
    )
    return (
        calculate_sp_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index),
        calculate_ip_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index, required_iters),
    )


class EndOfSubSlotData:
    eos_bundle: EndOfSubSlotBundle
    sub_slot_iters: uint64
    new_difficulty: uint64
    deficit: uint8

    def __init__(self, eos_bundle, sub_slot_iters, new_difficulty, deficit):
        self.eos_bundle = eos_bundle
        self.sub_slot_iters = sub_slot_iters
        self.new_difficulty = new_difficulty
        self.deficit = deficit


class Chain(Enum):
    CHALLENGE_CHAIN = 1
    REWARD_CHAIN = 2
    INFUSED_CHALLENGE_CHAIN = 3


class IterationType(Enum):
    SIGNAGE_POINT = 1
    INFUSION_POINT = 2
    END_OF_SUBSLOT = 3


class LastState:
    def __init__(self, constants: ConsensusConstants):
        self.peak: Optional[timelord_protocol.NewPeak] = None
        self.subslot_end: Optional[EndOfSubSlotData] = None
        self.last_ip: uint64 = uint64(0)
        self.deficit: uint8 = uint8(0)
        self.sub_epoch_summary: Optional[SubEpochSummary] = None
        self.constants: ConsensusConstants = constants
        self.last_weight: uint128 = uint128(0)
        self.total_iters: uint128 = uint128(0)
        self.last_peak_challenge: Optional[bytes32] = None

    def set_state(self, state):
        if isinstance(state, timelord_protocol.NewPeak):
            self.peak = state
            self.subslot_end = None
            _, self.last_ip = iters_from_sub_block(
                self.constants,
                state.reward_chain_sub_block,
                state.sub_slot_iters,
                state.difficulty,
            )
            self.deficit = state.deficit
            self.sub_epoch_summary = state.sub_epoch_summary
            self.last_weight = state.reward_chain_sub_block.weight
            self.total_iters = state.reward_chain_sub_block.total_iters
            self.last_peak_challenge = state.reward_chain_sub_block.get_hash()
        if isinstance(state, EndOfSubSlotData):
            self.peak = None
            self.subslot_end = state
            self.last_ip = 0
            self.deficit = state.deficit

    def is_empty(self) -> bool:
        return self.peak is None and self.subslot_end is None

    def get_sub_slot_iters(self) -> uint64:
        if self.peak is not None:
            return self.peak.sub_slot_iters
        return self.subslot_end.sub_slot_iters

    def get_weight(self) -> uint128:
        return self.last_weight

    def get_total_iters(self) -> uint128:
        return self.total_iters

    def get_last_peak_challenge(self) -> Optional[bytes32]:
        return self.last_peak_challenge

    def get_difficulty(self) -> uint64:
        if self.peak is not None:
            return self.peak.difficulty
        return self.subslot_end.new_difficulty

    def get_last_ip(self) -> uint64:
        return self.last_ip

    def get_deficit(self) -> uint8:
        if self.peak is not None:
            return self.peak.deficit
        return self.subslot_end.deficit

    def get_sub_epoch_summary(self) -> Optional[SubEpochSummary]:
        return self.sub_epoch_summary

    def get_challenge(self, chain: Chain) -> Optional[bytes32]:
        if self.peak is not None:
            sub_block = self.peak.reward_chain_sub_block
            if chain == Chain.CHALLENGE_CHAIN:
                return sub_block.challenge_chain_ip_vdf.challenge_hash
            if chain == Chain.REWARD_CHAIN:
                return sub_block.get_hash()
            if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                if sub_block.infused_challenge_chain_ip_vdf is not None:
                    return sub_block.infused_challenge_chain_ip_vdf.challenge_hash
                if self.peak.deficit == 4:
                    return ChallengeBlockInfo(
                        sub_block.proof_of_space,
                        sub_block.challenge_chain_sp_vdf,
                        sub_block.challenge_chain_sp_signature,
                        sub_block.challenge_chain_ip_vdf,
                    ).get_hash()
                return None
        if self.subslot_end is not None:
            if chain == Chain.CHALLENGE_CHAIN:
                return self.subslot_end.eos_bundle.challenge_chain.get_hash()
            if chain == Chain.REWARD_CHAIN:
                return self.subslot_end.eos_bundle.reward_chain.get_hash()
            if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK:
                    return self.subslot_end.eos_bundle.infused_challenge_chain.get_hash()
                else:
                    return None
        return None

    def get_initial_form(self, chain: Chain) -> Optional[ClassgroupElement]:
        if self.peak is not None:
            sub_block = self.peak.reward_chain_sub_block
            if chain == Chain.CHALLENGE_CHAIN:
                return sub_block.challenge_chain_ip_vdf.output
            if chain == Chain.REWARD_CHAIN:
                return ClassgroupElement.get_default_element()
            if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                if sub_block.infused_challenge_chain_ip_vdf is not None:
                    return sub_block.infused_challenge_chain_ip_vdf.output
                elif self.peak.deficit == 4:
                    return ClassgroupElement.get_default_element()
                else:
                    return None
        if self.subslot_end is not None:
            if chain == Chain.CHALLENGE_CHAIN or chain == Chain.REWARD_CHAIN:
                return ClassgroupElement.get_default_element()
            if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK:
                    return ClassgroupElement.get_default_element()
                else:
                    return None
        return None


class Timelord:
    def __init__(self, config: Dict, discriminant_size_bits: int):
    def __init__(self, config: Dict, constants: ConsensusConstants):
        self.config = config
        self.constants = constants
        self._shut_down = False
        self.free_clients: List[Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = []
        self.lock: asyncio.Lock = asyncio.Lock()
        self.potential_free_clients: List = []
        self.ip_whitelist = self.config["vdf_clients"]["ip"]
        self.server: Optional[ChiaServer] = None
        self.chain_type_to_stream: Dict[Chain, Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = {}
        self.chain_start_time: Dict = {}
        # Chains that currently don't have a vdf_client.
        self.unspawned_chains: List[Chain] = [
            Chain.CHALLENGE_CHAIN,
            Chain.REWARD_CHAIN,
            Chain.INFUSED_CHALLENGE_CHAIN,
        ]
        # Chains that currently accept iterations.
        self.allows_iters: List[Chain] = []
        # Last peak received, None if it's already processed.
        self.new_peak: Optional[timelord_protocol.NewPeak] = None
        # Last end of subslot bundle, None if we built a peak on top of it.
        self.new_subslot_end: Optional[EndOfSubSlotData] = None
        # Last state received. Can either be a new peak or a new EndOfSubslotBundle.
        self.last_state: LastState = LastState(self.constants)
        # Unfinished block info, iters adjusted to the last peak.
        self.unfinished_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = []
        # Signage point iters, adjusted to the last peak.
        self.signage_point_iters: List[uint64] = []
        # For each chain, the iterations to send when its vdf_client process spawns.
        self.iters_to_submit: Dict[Chain, List[uint64]] = {}
        self.iters_submitted: Dict[Chain, List[uint64]] = {}
        # For each iteration submitted, whether it's a signage point, an infusion point or an end of slot.
        self.iteration_to_proof_type: Dict[uint64, IterationType] = {}
        # List of proofs finished.
        self.proofs_finished: List[Tuple[Chain, VDFInfo, VDFProof]] = []
        # Data to send at vdf_client initialization.
        self.finished_sp = 0
        self.overflow_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = []
        self.main_loop = None
        self.vdf_server = None
        self._shut_down = False

    async def _start(self):
        log.info("Starting timelord.")
        self.main_loop = asyncio.create_task(self._manage_chains())

        self.vdf_server = await asyncio.start_server(
            self._handle_client,
            self.config["vdf_server"]["host"],
            self.config["vdf_server"]["port"],
        )
        log.info("Started timelord.")

    def _close(self):
        self._shut_down = True

    async def _await_closed(self):
        pass

    def set_server(self, server: ChiaServer):
        self.server = server

    @api_request
    async def new_peak(self, new_peak: timelord_protocol.NewPeak):
        async with self.lock:
            if self.last_state is None or self.last_state.get_weight() < new_peak.weight:
                self.new_peak = new_peak

    @api_request
    async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock):
        async with self.lock:
            if not self._accept_unfinished_block(new_unfinished_subblock):
                return
            sp_iters, ip_iters = iters_from_sub_block(
                self.constants,
                new_unfinished_subblock.reward_chain_sub_block,
                self.last_state.get_sub_slot_iters(),
                self.last_state.get_difficulty(),
            )
            last_ip_iters = self.last_state.get_last_ip()
            if sp_iters < ip_iters:
                self.overflow_blocks.append(new_unfinished_subblock)
            elif ip_iters > last_ip_iters:
                self.unfinished_blocks.append(new_unfinished_subblock)
                for chain in Chain:
                    self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters))
                self.iteration_to_proof_type[ip_iters - last_ip_iters] = IterationType.INFUSION_POINT

    def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool:
        # Total unfinished block iters needs to exceed peak's iters.
        if self.last_state.get_total_iters() >= block.total_iters:
            return False
        # The peak hash of the rc-sub-block must match
        # the signage point rc VDF challenge hash of the unfinished sub-block.
        if (
            block.reward_chain_sp_vdf is not None
            and self.last_state.get_last_peak_challenge() is not None
            and self.last_state.get_last_peak_challenge() != block.reward_chain_sp_vdf.challenge_hash
        ):
            return False
        return True

    async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        async with self.lock:
            client_ip = writer.get_extra_info("peername")[0]
            log.info(f"New timelord connection from client: {client_ip}.")
            if client_ip in self.ip_whitelist:
                self.free_clients.append((client_ip, reader, writer))
                log.info(f"Added new VDF client {client_ip}.")
                for ip, end_time in list(self.potential_free_clients):
                    if ip == client_ip:
                        self.potential_free_clients.remove((ip, end_time))
                        break

    async def _stop_chain(self, chain: Chain):
        stop_ip, _, stop_writer = self.chain_type_to_stream[chain]
        self.potential_free_clients.append((stop_ip, time.time()))
        stop_writer.write(b"010")
        await stop_writer.drain()
        if chain in self.allows_iters:
            self.allows_iters.remove(chain)
        self.unspawned_chains.append(chain)

    async def _reset_chains(self):
        # First, stop all chains.
        ip_iters = self.last_state.get_last_ip()
        sub_slot_iters = self.last_state.get_sub_slot_iters()
        difficulty = self.last_state.get_difficulty()
        for chain in self.chain_type_to_stream.keys():
            await self._stop_chain(chain)
        # Adjust all signage points iterations to the peak.
        iters_per_signage = uint64(sub_slot_iters // self.constants.NUM_SPS_SUB_SLOT)
        self.signage_point_iters = [
            k * iters_per_signage - ip_iters
            for k in range(1, self.constants.NUM_SPS_SUB_SLOT + 1)
            if k * iters_per_signage - ip_iters > 0 and k * iters_per_signage < sub_slot_iters
        ]
        # Adjust all unfinished blocks iterations to the peak.
        new_unfinished_blocks = []
        self.proofs_finished = []
        for chain in Chain:
            self.iters_to_submit[chain] = []
            self.iters_submitted[chain] = []
        self.iteration_to_proof_type = {}
        for block in self.unfinished_blocks:
            if not self._accept_unfinished_block(block):
                continue
            block_sp_iters, block_ip_iters = iters_from_sub_block(
                self.constants,
                block,
                sub_slot_iters,
                difficulty,
            )
            new_block_iters = block_ip_iters - ip_iters
            if new_block_iters > 0:
                new_unfinished_blocks.append(block)
                for chain in Chain:
                    self.iters_to_submit[chain].append(new_block_iters)
                self.iteration_to_proof_type[new_block_iters] = IterationType.INFUSION_POINT
        # Remove all unfinished blocks that have already passed.
        self.unfinished_blocks = new_unfinished_blocks
        # Signage points.
        if len(self.signage_point_iters) > 0:
            count_signage = 0
            for signage in self.signage_point_iters:
                for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]:
                    self.iters_to_submit[chain].append(signage)
                self.iteration_to_proof_type[signage] = IterationType.SIGNAGE_POINT
                count_signage += 1
                if count_signage == 3:
                    break
        # TODO: handle the special case when infusion point is the end of subslot.
        left_subslot_iters = sub_slot_iters - ip_iters
        log.info(f"Left subslot iters: {left_subslot_iters}.")
        for chain in Chain:
            self.iters_to_submit[chain].append(left_subslot_iters)
        self.iteration_to_proof_type[left_subslot_iters] = IterationType.END_OF_SUBSLOT

    async def _handle_new_peak(self):
        self.last_state.set_state(self.new_peak)
        self.new_peak = None
        await self._reset_chains()

    async def _handle_subslot_end(self):
        self.finished_sp = 0
        self.last_state.set_state(self.new_subslot_end)
        self.new_subslot_end = None
        await self._reset_chains()

    async def _map_chains_with_vdf_clients(self):
        while not self._shut_down:
            picked_chain = None
            async with self.lock:
                if len(self.free_clients) == 0:
                    break
                ip, reader, writer = self.free_clients[0]
                for chain_type in self.unspawned_chains:
                    challenge_hash = self.last_state.get_challenge(chain_type)
                    initial_form = self.last_state.get_initial_form(chain_type)
                    if challenge_hash is not None and initial_form is not None:
                        picked_chain = chain_type
                        break
                if picked_chain is None:
                    break
                picked_chain = self.unspawned_chains[0]
                self.chain_type_to_stream[picked_chain] = (ip, reader, writer)
                self.free_clients = self.free_clients[1:]
                self.unspawned_chains = self.unspawned_chains[1:]
                self.chain_start_time[picked_chain] = time.time()

            log.info(f"Mapping free vdf_client with chain: {picked_chain}.")
            asyncio.create_task(
                self._do_process_communication(picked_chain, challenge_hash, initial_form, ip, reader, writer)
            )

    async def _submit_iterations(self):
        for chain in Chain:
            if chain in self.allows_iters:
                _, _, writer = self.chain_type_to_stream[chain]
                for iteration in self.iters_to_submit[chain]:
                    if iteration in self.iters_submitted[chain]:
                        continue
                    prefix = str(len(str(iteration)))
                    if len(str(iteration)) < 10:
                        prefix = "0" + prefix
                    iter_str = prefix + str(iteration)
                    writer.write(iter_str.encode())
                    await writer.drain()
                    self.iters_submitted[chain].append(iteration)
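
    # Note added for clarity (not part of the original commit): _submit_iterations above frames
    # each iteration count as its ASCII digits preceded by a two-character, zero-padded decimal
    # length, e.g. 123456 is written to the vdf_client as "06123456" and 25 as "0225".
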
    def _clear_proof_list(self, iters: uint64):
        return [
            (chain, info, proof) for chain, info, proof in self.proofs_finished if info.number_of_iterations != iters
        ]

    async def _check_for_new_sp(self):
        signage_iters = [
            iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.SIGNAGE_POINT
        ]
        if len(signage_iters) == 0:
            return
        for signage_iter in signage_iters:
            proofs_with_iter = [
                (chain, info, proof)
                for chain, info, proof in self.proofs_finished
                if info.number_of_iterations == signage_iter
            ]
            # Wait for both cc and rc to have the signage point.
            if len(proofs_with_iter) == 2:
                cc_info: Optional[VDFInfo] = None
                cc_proof: Optional[VDFProof] = None
                rc_info: Optional[VDFInfo] = None
                rc_proof: Optional[VDFProof] = None
                for chain, info, proof in proofs_with_iter:
                    if chain == Chain.CHALLENGE_CHAIN:
                        cc_info = info
                        cc_proof = proof
                    if chain == Chain.REWARD_CHAIN:
                        rc_info = info
                        rc_proof = proof
                if cc_info is None or cc_proof is None or rc_info is None or rc_proof is None:
                    log.error(f"Insufficient signage point data {signage_iter}")
                    continue
                response = timelord_protocol.NewSignagePointVDF(
                    uint8(self.finished_sp),
                    cc_info,
                    cc_proof,
                    rc_info,
                    rc_proof,
                )
                if self.server is not None:
                    msg = Message("new_signage_point_vdf", response)
                    await self.server.send_to_all([msg], NodeType.FULL_NODE)
                # Cleanup the signage point from memory.
                self.signage_point_iters.remove(signage_iter)
                self.finished_sp += 1
                self.proofs_finished = self._clear_proof_list(signage_iter)
                # Send the next 3 signage points to the chains.
                next_iters_count = 0
                for next_sp in self.signage_point_iters:
                    for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]:
                        if next_sp not in self.iters_submitted[chain] and next_sp not in self.iters_to_submit[chain]:
                            self.iters_to_submit[chain].append(next_sp)
                    self.iteration_to_proof_type[next_sp] = IterationType.SIGNAGE_POINT
                    next_iters_count += 1
                    if next_iters_count == 3:
                        break

    async def _check_for_new_ip(self):
        infusion_iters = [
            iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.INFUSION_POINT
        ]
        for iteration in infusion_iters:
            proofs_with_iter = [
                (chain, info, proof)
                for chain, info, proof in self.proofs_finished
                if info.number_of_iterations == iteration
            ]
            if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None:
                chain_count = 3
            else:
                chain_count = 2
            if len(proofs_with_iter) == chain_count:
                block = None
                for unfinished_block in self.unfinished_blocks:
                    _, ip_iters = iters_from_sub_block(
                        self.constants,
                        unfinished_block.reward_chain_sub_block,
                        self.last_state.get_sub_slot_iters(),
                        self.last_state.get_difficulty(),
                    )
                    if ip_iters - self.last_state.get_last_ip() == iteration:
                        block = unfinished_block
                        break
                if block is not None:
                    self.unfinished_blocks.remove(block)
                    challenge_hash = block.reward_chain_sub_block.get_hash()
                    icc_info: Optional[VDFInfo] = None
                    icc_proof: Optional[VDFProof] = None
                    cc_info: Optional[VDFInfo] = None
                    cc_proof: Optional[VDFProof] = None
                    rc_info: Optional[VDFInfo] = None
                    rc_proof: Optional[VDFProof] = None
                    for chain, info, proof in proofs_with_iter:
                        if chain == Chain.CHALLENGE_CHAIN:
                            cc_info = info
                            cc_proof = proof
                        if chain == Chain.REWARD_CHAIN:
                            rc_info = info
                            rc_proof = proof
                        if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                            icc_info = info
                            icc_proof = proof
                    if cc_info is None or cc_proof is None or rc_info is None or rc_proof is None:
                        log.error(
                            f"Insufficient VDF proofs for infusion point ch: {challenge_hash} iterations:{iteration}"
                        )
                    response = timelord_protocol.NewInfusionPointVDF(
                        challenge_hash,
                        cc_info,
                        cc_proof,
                        rc_info,
                        rc_proof,
                        icc_info,
                        icc_proof,
                    )
                    msg = Message("new_infusion_point_vdf", response)
                    await self.server.send_to_all([msg], NodeType.FULL_NODE)
        for iteration in infusion_iters:
            self.proofs_finished = self._clear_proof_list(iteration)
    async def _check_for_end_of_subslot(self):
        left_subslot_iters = [
            iteration for iteration, t in self.iteration_to_proof_type.items() if t == IterationType.END_OF_SUBSLOT
        ]
        if len(left_subslot_iters) == 0:
            return
        chains_finished = [
            (chain, info, proof)
            for chain, info, proof in self.proofs_finished
            if info.number_of_iterations == left_subslot_iters[0]
        ]
        if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None:
            chain_count = 3
        else:
            chain_count = 2
        if len(chains_finished) == chain_count:
            icc_ip_vdf: Optional[VDFInfo] = None
            icc_ip_proof: Optional[VDFProof] = None
            cc_vdf: Optional[VDFInfo] = None
            cc_proof: Optional[VDFProof] = None
            rc_vdf: Optional[VDFInfo] = None
            rc_proof: Optional[VDFProof] = None
            for chain, info, proof in chains_finished:
                if chain == Chain.CHALLENGE_CHAIN:
                    cc_vdf = info
                    cc_proof = proof
                if chain == Chain.REWARD_CHAIN:
                    rc_vdf = info
                    rc_proof = proof
                if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                    icc_ip_vdf = info
                    icc_ip_proof = proof
            assert cc_proof is not None and rc_proof is not None and cc_vdf is not None and rc_vdf is not None
            log.info("Collected end of subslot vdfs.")

            icc_sub_slot: Optional[InfusedChallengeChainSubSlot] = (
                None if icc_ip_vdf is None else InfusedChallengeChainSubSlot(icc_ip_vdf)
            )
            icc_sub_slot_hash = icc_sub_slot.get_hash() if self.last_state.get_deficit() == 0 else None
            if self.last_state.get_sub_epoch_summary() is not None:
                ses_hash = self.last_state.get_sub_epoch_summary().get_hash()
                new_sub_slot_iters = self.last_state.get_sub_epoch_summary().new_sub_slot_iters
                new_difficulty = self.last_state.get_sub_epoch_summary().new_difficulty
            else:
                ses_hash = None
                new_sub_slot_iters = self.last_state.get_sub_slot_iters()
                new_difficulty = self.last_state.get_difficulty()
            cc_sub_slot = ChallengeChainSubSlot(cc_vdf, icc_sub_slot_hash, ses_hash, new_sub_slot_iters, new_difficulty)
            eos_deficit: uint8 = (
                self.last_state.get_deficit()
                if self.last_state.get_deficit() > 0
                else self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK
            )
            rc_sub_slot = RewardChainSubSlot(
                rc_vdf,
                cc_sub_slot.get_hash(),
                icc_sub_slot.get_hash() if icc_sub_slot is not None else None,
                eos_deficit,
            )
            eos_bundle = EndOfSubSlotBundle(
                cc_sub_slot,
                icc_sub_slot,
                rc_sub_slot,
                SubSlotProofs(cc_proof, icc_ip_proof, rc_proof),
            )
            if self.server is not None:
                msg = Message("end_of_sub_slot_bundle", timelord_protocol.NewEndOfSubSlotVDF(eos_bundle))
                await self.server.send_to_all([msg], NodeType.FULL_NODE)
            log.info("Built end of subslot bundle.")
            self.unfinished_blocks = self.overflow_blocks
            self.overflow_blocks = []
            self.new_subslot_end = EndOfSubSlotData(
                eos_bundle,
                new_sub_slot_iters,
                new_difficulty,
                eos_deficit,
            )

    async def _manage_chains(self):
        while not self._shut_down:
            await asyncio.sleep(0.1)
            # Didn't get any useful data, continue.
            async with self.lock:
                if self.last_state.is_empty() and self.new_peak is None:
                    continue
            # Map free vdf_clients to unspawned chains.
            await self._map_chains_with_vdf_clients()
            async with self.lock:
                # We've got a new peak, process it.
                if self.new_peak is not None:
                    await self._handle_new_peak()
                # A subslot ended, process it.
                if self.new_subslot_end is not None:
                    await self._handle_subslot_end()
                # Submit pending iterations.
                await self._submit_iterations()
                # Check for new signage point and broadcast it if present.
                await self._check_for_new_sp()
                # Check for new infusion point and broadcast it if present.
                await self._check_for_new_ip()
                # Check for end of subslot, respawn chains and build EndOfSubslotBundle.
                await self._check_for_end_of_subslot()

    async def _do_process_communication(self, chain, challenge_hash, initial_form, ip, reader, writer):
        disc: int = create_discriminant(challenge_hash, self.constants.DISCRIMINANT_SIZE_BITS)
        # Depending on the flags 'fast_algorithm' and 'sanitizer_mode',
        # the timelord tells the vdf_client what to execute.
        if self.config["fast_algorithm"]:
            # Run n-wesolowski (fast) algorithm.
            writer.write(b"N")
        else:
            # Run two-wesolowski (slow) algorithm.
            writer.write(b"N")
        await writer.drain()

        prefix = str(len(str(disc)))
        if len(prefix) == 1:
            prefix = "00" + prefix
        if len(prefix) == 2:
            prefix = "0" + prefix
        writer.write((prefix + str(disc)).encode())
        await writer.drain()

        # Send (a, b) from 'initial_form'.
        for num in [initial_form.a, initial_form.b]:
            prefix = len(str(num))
            prefix_len = len(str(prefix))
            writer.write((str(prefix_len) + str(prefix) + str(num)).encode())
            await writer.drain()
        try:
            ok = await reader.readexactly(2)
        except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
            log.warning(f"{type(e)} {e}")
            return

        if ok.decode() != "OK":
            return

        log.info("Got handshake with VDF client.")
        async with self.lock:
            self.allows_iters.append(chain)
        # Listen to the client until "STOP" is received.
        while True:
            try:
                data = await reader.readexactly(4)
            except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
                log.warning(f"{type(e)} {e}")
                break

            msg = ""
            try:
                msg = data.decode()
            except Exception as e:
                # log.error(f"Exception while decoding data {e}")
                pass
            if msg == "STOP":
                log.info(f"Stopped client running on ip {ip}.")
                async with self.lock:
                    writer.write(b"ACK")
                    await writer.drain()
                break
            else:
                try:
                    # This must be a proof, 4 bytes is length prefix
                    length = int.from_bytes(data, "big")
                    proof = await reader.readexactly(length)
                    stdout_bytes_io: io.BytesIO = io.BytesIO(bytes.fromhex(proof.decode()))
                except (
                    asyncio.IncompleteReadError,
                    ConnectionResetError,
                    Exception,
                ) as e:
                    log.warning(f"{type(e)} {e}")
                    break

                iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True))

                y_size_bytes = stdout_bytes_io.read(8)
                y_size = uint64(int.from_bytes(y_size_bytes, "big", signed=True))

                y_bytes = stdout_bytes_io.read(y_size)
                witness_type = uint8(int.from_bytes(stdout_bytes_io.read(1), "big", signed=True))
                proof_bytes: bytes = stdout_bytes_io.read()

                # Verifies our own proof just in case
                a = int.from_bytes(y_bytes[:129], "big", signed=True)
                b = int.from_bytes(y_bytes[129:], "big", signed=True)
                output = ClassgroupElement(int512(a), int512(b))
                time_taken = time.time() - self.chain_start_time[chain]
                ips = int(iterations_needed / time_taken * 10) / 10
                log.info(
                    f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}"
                    f" iters."
                    f"Estimated IPS: {ips}. Chain: {chain}"
                )

                vdf_info: VDFInfo = VDFInfo(
                    challenge_hash,
                    iterations_needed,
                    output,
                )
                vdf_proof: VDFProof = VDFProof(
                    witness_type,
                    proof_bytes,
                )

                if not vdf_proof.is_valid(self.constants, vdf_info):
                    log.error("Invalid proof of time!")
                    # continue
                async with self.lock:
                    self.proofs_finished.append((chain, vdf_info, vdf_proof))
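
A minimal sketch (not part of the commit) of the framing _do_process_communication uses when sending the discriminant to a vdf_client; encode_discriminant is a hypothetical helper that mirrors the three-character, zero-padded length prefix built above:

def encode_discriminant(disc: int) -> bytes:
    # Decimal digits of the discriminant, preceded by their count padded to three characters,
    # matching the manual "00"/"0" prefix padding in the method above.
    digits = str(disc)
    return (str(len(digits)).zfill(3) + digits).encode()
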
@ -1,775 +0,0 @@
|
||||
import asyncio
|
||||
import io
|
||||
import logging
|
||||
import time
|
||||
from enum import Enum
|
||||
from typing import Dict, List, Optional, Tuple, Union
|
||||
|
||||
from chiavdf import create_discriminant
|
||||
|
||||
from src.consensus.constants import ConsensusConstants
|
||||
from src.consensus.default_constants import DEFAULT_CONSTANTS
|
||||
from src.consensus.pot_iterations import (
|
||||
calculate_iterations_quality,
|
||||
calculate_sp_iters,
|
||||
calculate_ip_iters,
|
||||
)
|
||||
from blspy import G2Element
|
||||
from src.protocols import timelord_protocol
|
||||
from src.server.outbound_message import OutboundMessage, NodeType, Message, Delivery
|
||||
from src.server.server import ChiaServer
|
||||
from src.types.classgroup import ClassgroupElement
|
||||
from src.types.end_of_slot_bundle import EndOfSubSlotBundle
|
||||
from src.types.reward_chain_sub_block import (
|
||||
RewardChainSubBlock,
|
||||
RewardChainSubBlockUnfinished,
|
||||
)
|
||||
from src.types.sized_bytes import bytes32
|
||||
from src.types.slots import ChallengeChainSubSlot, InfusedChallengeChainSubSlot, RewardChainSubSlot, SubSlotProofs
|
||||
from src.types.vdf import VDFInfo, VDFProof
|
||||
from src.util.api_decorators import api_request
|
||||
from src.util.ints import uint64, uint8, uint128, int512
|
||||
from src.types.sub_epoch_summary import SubEpochSummary
|
||||
from src.util.block_tools import BlockTools
|
||||
from chiapos import Verifier
|
||||
from src.types.slots import ChallengeBlockInfo
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def iters_from_sub_block(
    constants,
    reward_chain_sub_block: Union[RewardChainSubBlock, RewardChainSubBlockUnfinished],
    sub_slot_iters: uint64,
    difficulty: uint64,
) -> Tuple[uint64, uint64]:
    v = Verifier()
    pos = reward_chain_sub_block.proof_of_space
    plot_id: bytes32 = pos.get_plot_id()
    quality = v.validate_proof(plot_id, pos.size, pos.challenge_hash, bytes(pos.proof))

    if reward_chain_sub_block.challenge_chain_sp_vdf is None:
        assert reward_chain_sub_block.signage_point_index == 0
        cc_sp: bytes32 = reward_chain_sub_block.proof_of_space.challenge_hash
    else:
        cc_sp: bytes32 = reward_chain_sub_block.challenge_chain_sp_vdf.get_hash()
    required_iters = calculate_iterations_quality(
        quality,
        reward_chain_sub_block.proof_of_space.size,
        difficulty,
        cc_sp,
    )
    return (
        calculate_sp_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index),
        calculate_ip_iters(constants, sub_slot_iters, reward_chain_sub_block.signage_point_index, required_iters),
    )

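# Hypothetical usage sketch (invented names, not part of the diff): a caller that
# already knows the peak's infusion-point iterations could derive the timings of an
# incoming unfinished sub-block like this.
#
#   sp_iters, ip_iters = iters_from_sub_block(
#       constants_example,
#       unfinished.reward_chain_sub_block,
#       sub_slot_iters,
#       difficulty,
#   )
#   iters_until_infusion = ip_iters - peak_ip_iters
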
class EndOfSubSlotData:
    eos_bundle: EndOfSubSlotBundle
    sub_slot_iters: uint64
    new_difficulty: uint64
    deficit: uint8

    def __init__(self, eos_bundle, sub_slot_iters, new_difficulty, deficit):
        self.eos_bundle = eos_bundle
        self.sub_slot_iters = sub_slot_iters
        self.new_difficulty = new_difficulty
        self.deficit = deficit


class Chain(Enum):
    CHALLENGE_CHAIN = 1
    REWARD_CHAIN = 2
    INFUSED_CHALLENGE_CHAIN = 3


class IterationType(Enum):
    SIGNAGE_POINT = 1
    INFUSION_POINT = 2
    END_OF_SUBSLOT = 3


class LastState:
    def __init__(self, constants):
        self.peak: Optional[timelord_protocol.NewPeak] = None
        self.subslot_end: Optional[EndOfSubSlotData] = None
        self.last_ip: uint64 = 0
        self.deficit: uint8 = 0
        self.sub_epoch_summary: Optional[SubEpochSummary] = None
        self.constants = constants
        self.last_weight = 0
        self.total_iters = 0
        self.last_peak_challenge: Optional[bytes32] = None

    def set_state(self, state):
        if isinstance(state, timelord_protocol.NewPeak):
            self.peak = state
            self.subslot_end = None
            _, self.last_ip = iters_from_sub_block(
                self.constants,
                state.reward_chain_sub_block,
                state.sub_slot_iters,
                state.difficulty,
            )
            self.deficit = state.deficit
            self.sub_epoch_summary = state.sub_epoch_summary
            self.last_weight = state.reward_chain_sub_block.weight
            self.total_iters = state.reward_chain_sub_block.total_iters
            self.last_peak_challenge = state.reward_chain_sub_block.get_hash()
        if isinstance(state, EndOfSubSlotData):
            self.peak = None
            self.subslot_end = state
            self.last_ip = 0
            self.deficit = state.deficit

    def is_empty(self) -> bool:
        return self.peak is None and self.subslot_end is None

    def get_sub_slot_iters(self) -> uint64:
        if self.peak is not None:
            return self.peak.sub_slot_iters
        return self.subslot_end.sub_slot_iters

    def get_weight(self) -> uint64:
        return self.last_weight

    def get_total_iters(self) -> uint128:
        return self.total_iters

    def get_last_peak_challenge(self) -> Optional[bytes32]:
        return self.last_peak_challenge

    def get_difficulty(self) -> uint64:
        if self.peak is not None:
            return self.peak.difficulty
        return self.subslot_end.new_difficulty

    def get_last_ip(self) -> uint64:
        return self.last_ip

    def get_deficit(self) -> uint8:
        if self.peak is not None:
            return self.peak.deficit
        return self.subslot_end.deficit

    def get_sub_epoch_summary(self) -> Optional[SubEpochSummary]:
        return self.sub_epoch_summary

    def get_challenge(self, chain: Chain) -> Optional[bytes32]:
        if self.peak is not None:
            sub_block = self.peak.reward_chain_sub_block
            if chain == Chain.CHALLENGE_CHAIN:
                return sub_block.challenge_chain_ip_vdf.challenge_hash
            if chain == Chain.REWARD_CHAIN:
                return sub_block.get_hash()
            if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                if sub_block.infused_challenge_chain_ip_vdf is not None:
                    return sub_block.infused_challenge_chain_ip_vdf.challenge_hash
                if self.peak.deficit == 4:
                    return ChallengeBlockInfo(
                        sub_block.proof_of_space,
                        sub_block.challenge_chain_sp_vdf,
                        sub_block.challenge_chain_sp_signature,
                        sub_block.challenge_chain_ip_vdf,
                    ).get_hash()
                return None
        if self.subslot_end is not None:
            if chain == Chain.CHALLENGE_CHAIN:
                return self.subslot_end.eos_bundle.challenge_chain.get_hash()
            if chain == Chain.REWARD_CHAIN:
                return self.subslot_end.eos_bundle.reward_chain.get_hash()
            if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK:
                    return self.subslot_end.eos_bundle.infused_challenge_chain.get_hash()
                else:
                    return None
        return None

    def get_initial_form(self, chain: Chain) -> Optional[ClassgroupElement]:
        if self.peak is not None:
            sub_block = self.peak.reward_chain_sub_block
            if chain == Chain.CHALLENGE_CHAIN:
                return sub_block.challenge_chain_ip_vdf.output
            if chain == Chain.REWARD_CHAIN:
                return ClassgroupElement.get_default_element()
            if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                if sub_block.infused_challenge_chain_ip_vdf is not None:
                    return sub_block.infused_challenge_chain_ip_vdf.output
                elif self.peak.deficit == 4:
                    return ClassgroupElement.get_default_element()
                else:
                    return None
        if self.subslot_end is not None:
            if chain == Chain.CHALLENGE_CHAIN or chain == Chain.REWARD_CHAIN:
                return ClassgroupElement.get_default_element()
            if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                if self.subslot_end.deficit < self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK:
                    return ClassgroupElement.get_default_element()
                else:
                    return None
        return None

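# Hypothetical illustration of how LastState is driven (variable names are invented):
# set_state() only ever receives a timelord_protocol.NewPeak or an EndOfSubSlotData,
# and the getters then answer "what should each VDF chain work on next".
#
#   state = LastState(constants_example)
#   state.set_state(new_peak_message)      # a peak clears any pending subslot end
#   challenge = state.get_challenge(Chain.REWARD_CHAIN)
#   start_form = state.get_initial_form(Chain.REWARD_CHAIN)
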
class Timelord:
    def __init__(self, config: Dict, constants: ConsensusConstants):
        self.config = config
        self.constants = constants
        self._is_stopped = False
        self.free_clients: List[Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = []
        self.lock: asyncio.Lock = asyncio.Lock()
        self.potential_free_clients: List = []
        self.ip_whitelist = self.config["vdf_clients"]["ip"]
        self.server: Optional[ChiaServer] = None
        self.chain_type_to_stream: Dict[Chain, Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = {}
        self.chain_start_time: Dict = {}
        # Chains that currently don't have a vdf_client.
        self.unspawned_chains: List[Chain] = [
            Chain.CHALLENGE_CHAIN,
            Chain.REWARD_CHAIN,
            Chain.INFUSED_CHALLENGE_CHAIN,
        ]
        # Chains that currently accept iterations.
        self.allows_iters: List[Chain] = []
        # Last peak received, None if it's already processed.
        self.new_peak: Optional[timelord_protocol.NewPeak] = None
        # Last end of subslot bundle, None if we built a peak on top of it.
        self.new_subslot_end: Optional[EndOfSubSlotData] = None
        # Last state received. Can either be a new peak or a new EndOfSubslotBundle.
        self.last_state: LastState = LastState(self.constants)
        # Unfinished block info, iters adjusted to the last peak.
        self.unfinished_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = []
        # Signage point iters, adjusted to the last peak.
        self.signage_point_iters: List[uint64] = []
        # For each chain, the iterations to submit when its vdf_client process spawns.
        self.iters_to_submit: Dict[Chain, List[uint64]] = {}
        self.iters_submitted: Dict[Chain, List[uint64]] = {}
        # For each iteration submitted, know if it's a signage point, an infusion point or an end of slot.
        self.iteration_to_proof_type: Dict[uint64, IterationType] = {}
        # List of proofs finished.
        self.proofs_finished: List[Tuple[Chain, VDFInfo, VDFProof]] = []
        # Data to send at vdf_client initialization.
        self.finished_sp = 0
        self.overflow_blocks: List[timelord_protocol.NewUnfinishedSubBlock] = []

    async def start(self):
        log.info("Starting timelord.")
        self.main_loop = asyncio.create_task(self._manage_chains())

        self.vdf_server = await asyncio.start_server(
            self._handle_client,
            self.config["vdf_server"]["host"],
            self.config["vdf_server"]["port"],
        )
        log.info("Started timelord.")

    def set_server(self, server: ChiaServer):
        self.server = server

    @api_request
    async def new_peak(self, new_peak: timelord_protocol.NewPeak):
        async with self.lock:
            if (
                self.last_state is None
                or self.last_state.get_weight() < new_peak.weight
            ):
                self.new_peak = new_peak

    @api_request
    async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock):
        async with self.lock:
            if not self._accept_unfinished_block(new_unfinished_subblock):
                return
            sp_iters, ip_iters = iters_from_sub_block(
                self.constants,
                new_unfinished_subblock.reward_chain_sub_block,
                self.last_state.get_sub_slot_iters(),
                self.last_state.get_difficulty(),
            )
            last_ip_iters = self.last_state.get_last_ip()
            if sp_iters < ip_iters:
                self.overflow_blocks.append(new_unfinished_subblock)
            elif ip_iters > last_ip_iters:
                self.unfinished_blocks.append(new_unfinished_subblock)
                for chain in Chain:
                    self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters))
                self.iteration_to_proof_type[ip_iters - last_ip_iters] = IterationType.INFUSION_POINT

    def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool:
        # Total unfinished block iters needs to exceed peak's iters.
        if self.last_state.get_total_iters() >= block.total_iters:
            return False
        # The peak hash of the rc-sub-block must match
        # the signage point rc VDF challenge hash of the unfinished sub-block.
        if (
            block.reward_chain_sp_vdf is not None
            and self.last_state.get_last_peak_challenge() is not None
            and self.last_state.get_last_peak_challenge()
            != block.reward_chain_sp_vdf.challenge_hash
        ):
            return False
        return True

    async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        async with self.lock:
            client_ip = writer.get_extra_info("peername")[0]
            log.info(f"New timelord connection from client: {client_ip}.")
            if client_ip in self.ip_whitelist:
                self.free_clients.append((client_ip, reader, writer))
                log.info(f"Added new VDF client {client_ip}.")
                for ip, end_time in list(self.potential_free_clients):
                    if ip == client_ip:
                        self.potential_free_clients.remove((ip, end_time))
                        break

    async def _stop_chain(self, chain: Chain):
        stop_ip, _, stop_writer = self.chain_type_to_stream[chain]
        self.potential_free_clients.append((stop_ip, time.time()))
        stop_writer.write(b"010")
        await stop_writer.drain()
        if chain in self.allows_iters:
            self.allows_iters.remove(chain)
        self.unspawned_chains.append(chain)

    async def _reset_chains(self):
        # First, stop all chains.
        ip_iters = self.last_state.get_last_ip()
        sub_slot_iters = self.last_state.get_sub_slot_iters()
        difficulty = self.last_state.get_difficulty()
        for chain in self.chain_type_to_stream.keys():
            await self._stop_chain(chain)
        # Adjust all signage point iterations to the peak.
        iters_per_signage = uint64(sub_slot_iters // self.constants.NUM_SPS_SUB_SLOT)
        self.signage_point_iters = [
            k * iters_per_signage - ip_iters
            for k in range(1, self.constants.NUM_SPS_SUB_SLOT + 1)
            if k * iters_per_signage - ip_iters > 0 and k * iters_per_signage < sub_slot_iters
        ]
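        # Worked example (illustrative numbers only, not consensus values): with
        # NUM_SPS_SUB_SLOT = 64 and sub_slot_iters = 100_000_000,
        # iters_per_signage = 1_562_500; if ip_iters = 5_000_000, the first entry
        # kept above is 4 * 1_562_500 - 5_000_000 = 1_250_000.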
        # Adjust all unfinished blocks iterations to the peak.
        new_unfinished_blocks = []
        self.proofs_finished = []
        for chain in Chain:
            self.iters_to_submit[chain] = []
            self.iters_submitted[chain] = []
        self.iteration_to_proof_type = {}
        for block in self.unfinished_blocks:
            if not self._accept_unfinished_block(block):
                continue
            block_sp_iters, block_ip_iters = iters_from_sub_block(
                self.constants,
                block,
                sub_slot_iters,
                difficulty,
            )
            new_block_iters = block_ip_iters - ip_iters
            if new_block_iters > 0:
                new_unfinished_blocks.append(block)
                for chain in Chain:
                    self.iters_to_submit[chain].append(new_block_iters)
                self.iteration_to_proof_type[new_block_iters] = IterationType.INFUSION_POINT
        # Remove all unfinished blocks that have already passed.
        self.unfinished_blocks = new_unfinished_blocks
        # Signage points.
        if len(self.signage_point_iters) > 0:
            count_signage = 0
            for signage in self.signage_point_iters:
                for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]:
                    self.iters_to_submit[chain].append(signage)
                self.iteration_to_proof_type[signage] = IterationType.SIGNAGE_POINT
                count_signage += 1
                if count_signage == 3:
                    break
        # TODO: handle the special case when infusion point is the end of subslot.
        left_subslot_iters = sub_slot_iters - ip_iters
        log.info(f"Left subslot iters: {left_subslot_iters}.")
        for chain in Chain:
            self.iters_to_submit[chain].append(left_subslot_iters)
        self.iteration_to_proof_type[left_subslot_iters] = IterationType.END_OF_SUBSLOT

    async def _handle_new_peak(self):
        self.last_state.set_state(self.new_peak)
        self.new_peak = None
        await self._reset_chains()

    async def _handle_subslot_end(self):
        self.finished_sp = 0
        self.last_state.set_state(self.new_subslot_end)
        self.new_subslot_end = None
        await self._reset_chains()

    async def _map_chains_with_vdf_clients(self):
        while not self._is_stopped:
            picked_chain = None
            async with self.lock:
                if len(self.free_clients) == 0:
                    break
                ip, reader, writer = self.free_clients[0]
                for chain_type in self.unspawned_chains:
                    challenge_hash = self.last_state.get_challenge(chain_type)
                    initial_form = self.last_state.get_initial_form(chain_type)
                    if challenge_hash is not None and initial_form is not None:
                        picked_chain = chain_type
                        break
                if picked_chain is None:
                    break
                picked_chain = self.unspawned_chains[0]
                self.chain_type_to_stream[picked_chain] = (ip, reader, writer)
                self.free_clients = self.free_clients[1:]
                self.unspawned_chains = self.unspawned_chains[1:]
                self.chain_start_time[picked_chain] = time.time()

            log.info(f"Mapping free vdf_client with chain: {picked_chain}.")
            asyncio.create_task(
                self._do_process_communication(picked_chain, challenge_hash, initial_form, ip, reader, writer)
            )

    async def _submit_iterations(self):
        for chain in Chain:
            if chain in self.allows_iters:
                _, _, writer = self.chain_type_to_stream[chain]
                for iteration in self.iters_to_submit[chain]:
                    if iteration in self.iters_submitted[chain]:
                        continue
                    prefix = str(len(str(iteration)))
                    if len(str(iteration)) < 10:
                        prefix = "0" + prefix
                    iter_str = prefix + str(iteration)
                    writer.write(iter_str.encode())
                    await writer.drain()
                    self.iters_submitted[chain].append(iteration)
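
    # Wire-format note (derived from _submit_iterations above; the helper name is
    # invented): an iteration count is sent as the zero-padded two-digit length of
    # its decimal representation, followed by the digits themselves.
    #
    #   def _encode_iterations_example(iteration: int) -> bytes:
    #       digits = str(iteration)
    #       return (str(len(digits)).zfill(2) + digits).encode()
    #
    #   _encode_iterations_example(1_562_500)  # -> b"071562500"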
    def _clear_proof_list(self, iter):
        return [
            (chain, info, proof) for chain, info, proof in self.proofs_finished if info.number_of_iterations != iter
        ]

    async def _check_for_new_sp(self):
        signage_iters = [
            iteration for iteration, t in self.iteration_to_proof_type.items()
            if t == IterationType.SIGNAGE_POINT
        ]
        if len(signage_iters) == 0:
            return
        for signage_iter in signage_iters:
            proofs_with_iter = [
                (chain, info, proof)
                for chain, info, proof in self.proofs_finished
                if info.number_of_iterations == signage_iter
            ]
            # Wait for both cc and rc to have the signage point.
            if len(proofs_with_iter) == 2:
                for chain, info, proof in proofs_with_iter:
                    if chain == Chain.CHALLENGE_CHAIN:
                        cc_info = info
                        cc_proof = proof
                    if chain == Chain.REWARD_CHAIN:
                        rc_info = info
                        rc_proof = proof
                response = timelord_protocol.NewSignagePointVDF(
                    self.finished_sp,
                    cc_info,
                    cc_proof,
                    rc_info,
                    rc_proof,
                )
                if self.server is not None:
                    self.server.push_message(
                        OutboundMessage(
                            NodeType.FULL_NODE,
                            Message("new_signage_point_vdf", response),
                            Delivery.BROADCAST,
                        )
                    )
                # Cleanup the signage point from memory.
                self.signage_point_iters.remove(signage_iter)
                self.finished_sp += 1
                self.proofs_finished = self._clear_proof_list(signage_iter)
                # Send the next 3 signage points to the chains.
                next_iters_count = 0
                for next_sp in self.signage_point_iters:
                    for chain in [Chain.CHALLENGE_CHAIN, Chain.REWARD_CHAIN]:
                        if (
                            next_sp not in self.iters_submitted[chain]
                            and next_sp not in self.iters_to_submit[chain]
                        ):
                            self.iters_to_submit[chain].append(next_sp)
                    self.iteration_to_proof_type[next_sp] = IterationType.SIGNAGE_POINT
                    next_iters_count += 1
                    if next_iters_count == 3:
                        break

    async def _check_for_new_ip(self):
        infusion_iters = [
            iteration for iteration, t in self.iteration_to_proof_type.items()
            if t == IterationType.INFUSION_POINT
        ]
        for iteration in infusion_iters:
            proofs_with_iter = [
                (chain, info, proof)
                for chain, info, proof in self.proofs_finished
                if info.number_of_iterations == iteration
            ]
            if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None:
                chain_count = 3
            else:
                chain_count = 2
            if len(proofs_with_iter) == chain_count:
                block = None
                for unfinished_block in self.unfinished_blocks:
                    _, ip_iters = iters_from_sub_block(
                        self.constants,
                        unfinished_block,
                        self.last_state.get_sub_slot_iters(),
                        self.last_state.get_difficulty(),
                    )
                    if ip_iters - self.last_state.get_last_ip() == iteration:
                        block = unfinished_block
                        break
                if block is not None:
                    self.unfinished_blocks.remove(block)
                    challenge_hash = block.reward_chain_sub_block.get_hash()
                    icc_info = None
                    icc_proof = None
                    for chain, info, proof in proofs_with_iter:
                        if chain == Chain.CHALLENGE_CHAIN:
                            cc_info = info
                            cc_proof = proof
                        if chain == Chain.REWARD_CHAIN:
                            rc_info = info
                            rc_proof = proof
                        if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                            icc_info = info
                            icc_proof = proof
                    response = timelord_protocol.NewInfusionPointVDF(
                        challenge_hash,
                        cc_info,
                        cc_proof,
                        rc_info,
                        rc_proof,
                        icc_info,
                        icc_proof,
                    )
                    self.server.push_message(
                        OutboundMessage(
                            NodeType.FULL_NODE,
                            Message("new_infusion_point_vdf", response),
                            Delivery.BROADCAST,
                        )
                    )
        for iteration in infusion_iters:
            self.proofs_finished = self._clear_proof_list(iteration)

    async def _check_for_end_of_subslot(self):
        left_subslot_iters = [
            iteration for iteration, t in self.iteration_to_proof_type.items()
            if t == IterationType.END_OF_SUBSLOT
        ]
        if len(left_subslot_iters) == 0:
            return
        chains_finished = [
            (chain, info, proof)
            for chain, info, proof in self.proofs_finished
            if info.number_of_iterations == left_subslot_iters[0]
        ]
        if self.last_state.get_challenge(Chain.INFUSED_CHALLENGE_CHAIN) is not None:
            chain_count = 3
        else:
            chain_count = 2
        if len(chains_finished) == chain_count:
            icc_ip_vdf: Optional[VDFInfo] = None
            icc_ip_proof: Optional[VDFProof] = None
            cc_vdf: Optional[VDFInfo] = None
            cc_proof: Optional[VDFProof] = None
            rc_vdf: Optional[VDFInfo] = None
            rc_proof: Optional[VDFProof] = None
            for chain, info, proof in chains_finished:
                if chain == Chain.CHALLENGE_CHAIN:
                    cc_vdf = info
                    cc_proof = proof
                if chain == Chain.REWARD_CHAIN:
                    rc_vdf = info
                    rc_proof = proof
                if chain == Chain.INFUSED_CHALLENGE_CHAIN:
                    icc_ip_vdf = info
                    icc_ip_proof = proof
            assert cc_proof is not None and rc_proof is not None and cc_vdf is not None and rc_vdf is not None
            log.info("Collected end of subslot vdfs.")

            icc_sub_slot: Optional[InfusedChallengeChainSubSlot] = (
                None if icc_ip_vdf is None
                else InfusedChallengeChainSubSlot(icc_ip_vdf)
            )
            icc_sub_slot_hash = icc_sub_slot.get_hash() if self.last_state.get_deficit() == 0 else None
            if self.last_state.get_sub_epoch_summary() is not None:
                ses_hash = self.last_state.get_sub_epoch_summary().get_hash()
                new_sub_slot_iters = self.last_state.get_sub_epoch_summary().new_sub_slot_iters
                new_difficulty = self.last_state.get_sub_epoch_summary().new_difficulty
            else:
                ses_hash = None
                new_sub_slot_iters = self.last_state.get_sub_slot_iters()
                new_difficulty = self.last_state.get_difficulty()
            cc_sub_slot = ChallengeChainSubSlot(cc_vdf, icc_sub_slot_hash, ses_hash, new_sub_slot_iters, new_difficulty)
            eos_deficit: uint8 = (
                self.last_state.get_deficit()
                if self.last_state.get_deficit() > 0
                else self.constants.MIN_SUB_BLOCKS_PER_CHALLENGE_BLOCK
            )
            rc_sub_slot = RewardChainSubSlot(
                rc_vdf,
                cc_sub_slot.get_hash(),
                icc_sub_slot.get_hash() if icc_sub_slot is not None else None,
                eos_deficit,
            )
            eos_bundle = EndOfSubSlotBundle(
                cc_sub_slot,
                icc_sub_slot,
                rc_sub_slot,
                SubSlotProofs(cc_proof, icc_ip_proof, rc_proof),
            )
            if self.server is not None:
                self.server.push_message(
                    OutboundMessage(
                        NodeType.FULL_NODE,
                        Message("end_of_sub_slot_bundle", timelord_protocol.NewEndOfSubSlotVDF(eos_bundle)),
                        Delivery.BROADCAST,
                    )
                )
            log.info("Built end of subslot bundle.")
            self.unfinished_blocks = self.overflow_blocks
            self.overflow_blocks = []
            self.new_subslot_end = EndOfSubSlotData(
                eos_bundle,
                new_sub_slot_iters,
                new_difficulty,
                eos_deficit,
            )

    async def _manage_chains(self):
        while not self._is_stopped:
            await asyncio.sleep(0.1)
            # Didn't get any useful data, continue.
            async with self.lock:
                if self.last_state.is_empty() and self.new_peak is None:
                    continue
            # Map free vdf_clients to unspawned chains.
            await self._map_chains_with_vdf_clients()
            async with self.lock:
                # We've got a new peak, process it.
                if self.new_peak is not None:
                    await self._handle_new_peak()
                # A subslot ended, process it.
                if self.new_subslot_end is not None:
                    await self._handle_subslot_end()
                # Submit pending iterations.
                await self._submit_iterations()
                # Check for new signage point and broadcast it if present.
                await self._check_for_new_sp()
                # Check for new infusion point and broadcast it if present.
                await self._check_for_new_ip()
                # Check for end of subslot, respawn chains and build EndOfSubslotBundle.
                await self._check_for_end_of_subslot()

    async def _do_process_communication(self, chain, challenge_hash, initial_form, ip, reader, writer):
        disc: int = create_discriminant(challenge_hash, self.constants.DISCRIMINANT_SIZE_BITS)
        # Depending on the flags 'fast_algorithm' and 'sanitizer_mode',
        # the timelord tells the vdf_client what to execute.
        if self.config["fast_algorithm"]:
            # Run n-wesolowski (fast) algorithm.
            writer.write(b"N")
        else:
            # Run two-wesolowski (slow) algorithm.
            writer.write(b"T")
        await writer.drain()

        prefix = str(len(str(disc)))
        if len(prefix) == 1:
            prefix = "00" + prefix
        if len(prefix) == 2:
            prefix = "0" + prefix
        writer.write((prefix + str(disc)).encode())
        await writer.drain()

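        # Framing note (derived from the writes above; example length only): the
        # discriminant goes over the wire as a zero-padded three-digit length of its
        # decimal string followed by the digits, e.g. b"309" + 309 digits for a
        # typical 1024-bit discriminant.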
        # Send (a, b) from 'initial_form'.
        for num in [initial_form.a, initial_form.b]:
            prefix = len(str(num))
            prefix_len = len(str(prefix))
            writer.write((str(prefix_len) + str(prefix) + str(num)).encode())
            await writer.drain()
        try:
            ok = await reader.readexactly(2)
        except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
            log.warning(f"{type(e)} {e}")
            return

        if ok.decode() != "OK":
            return

        log.info("Got handshake with VDF client.")
        async with self.lock:
            self.allows_iters.append(chain)
        # Listen to the client until "STOP" is received.
        while True:
            try:
                data = await reader.readexactly(4)
            except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
                log.warning(f"{type(e)} {e}")
                break

            msg = ""
            try:
                msg = data.decode()
            except Exception as e:
                # log.error(f"Exception while decoding data {e}")
                pass
            if msg == "STOP":
                log.info(f"Stopped client running on ip {ip}.")
                async with self.lock:
                    writer.write(b"ACK")
                    await writer.drain()
                break
            else:
                try:
                    # This must be a proof, 4 bytes is length prefix
                    length = int.from_bytes(data, "big")
                    proof = await reader.readexactly(length)
                    stdout_bytes_io: io.BytesIO = io.BytesIO(bytes.fromhex(proof.decode()))
                except (
                    asyncio.IncompleteReadError,
                    ConnectionResetError,
                    Exception,
                ) as e:
                    log.warning(f"{type(e)} {e}")
                    break

                iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True))

                y_size_bytes = stdout_bytes_io.read(8)
                y_size = uint64(int.from_bytes(y_size_bytes, "big", signed=True))

                y_bytes = stdout_bytes_io.read(y_size)
                witness_type = uint8(int.from_bytes(stdout_bytes_io.read(1), "big", signed=True))
                proof_bytes: bytes = stdout_bytes_io.read()

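                # Payload layout sketch (as implied by the reads above, not an
                # external spec): 8 bytes of iterations, 8 bytes of y_size, y_size
                # bytes of classgroup output (a, b), 1 byte of witness type, and the
                # remaining bytes are the Wesolowski proof.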
                # Verifies our own proof just in case
                a = int.from_bytes(y_bytes[:129], "big", signed=True)
                b = int.from_bytes(y_bytes[129:], "big", signed=True)
                output = ClassgroupElement(int512(a), int512(b))
                time_taken = time.time() - self.chain_start_time[chain]
                ips = int(iterations_needed / time_taken * 10) / 10
                log.info(
                    f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}" f" iters."
                    f"Estimated IPS: {ips}. Chain: {chain}"
                )

                vdf_info: VDFInfo = VDFInfo(
                    challenge_hash,
                    initial_form,
                    iterations_needed,
                    output,
                )
                vdf_proof: VDFProof = VDFProof(
                    witness_type,
                    proof_bytes,
                )

                if not vdf_proof.is_valid(self.constants, vdf_info):
                    log.error("Invalid proof of time!")
                    # continue
                async with self.lock:
                    self.proofs_finished.append(
                        (chain, vdf_info, vdf_proof)
                    )
@ -1,465 +0,0 @@
import asyncio
import io
import logging
import time
import socket
from typing import Dict, List, Optional, Tuple


from chiavdf import create_discriminant
from src.protocols import timelord_protocol
from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage
from src.server.server import ChiaServer
from src.types.classgroup import ClassgroupElement
from src.types.vdf import ProofOfTime
from src.types.sized_bytes import bytes32
from src.util.api_decorators import api_request
from src.util.ints import uint8, uint64, int512, uint128

log = logging.getLogger(__name__)


class Timelord:
    def __init__(self, config: Dict, discriminant_size_bits: int):
        self.discriminant_size_bits = discriminant_size_bits
        self.config: Dict = config
        self.ips_estimate = {
            socket.gethostbyname(k): v
            for k, v in list(
                zip(
                    self.config["vdf_clients"]["ip"],
                    self.config["vdf_clients"]["ips_estimate"],
                )
            )
        }
        self.log = log
        self.lock: asyncio.Lock = asyncio.Lock()
        self.active_discriminants: Dict[bytes32, Tuple[asyncio.StreamWriter, uint64, str]] = {}
        self.best_weight_three_proofs: int = -1
        self.active_discriminants_start_time: Dict = {}
        self.pending_iters: Dict = {}
        self.submitted_iters: Dict = {}
        self.done_discriminants: List[bytes32] = []
        self.proofs_to_write: List[OutboundMessage] = []
        self.seen_discriminants: List[bytes32] = []
        self.proof_count: Dict = {}
        self.avg_ips: Dict = {}
        self.discriminant_queue: List[Tuple[bytes32, uint128]] = []
        self.max_connection_time = self.config["max_connection_time"]
        self.potential_free_clients: List = []
        self.free_clients: List[Tuple[str, asyncio.StreamReader, asyncio.StreamWriter]] = []
        self.server: Optional[ChiaServer] = None
        self.vdf_server = None
        self._is_shutdown = False
        self.sanitizer_mode = self.config["sanitizer_mode"]
        self.last_time_seen_discriminant: Dict = {}
        self.max_known_weights: List[uint128] = []

    def set_server(self, server: ChiaServer):
        self.server = server

    async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        async with self.lock:
            client_ip = writer.get_extra_info("peername")[0]
            log.info(f"New timelord connection from client: {client_ip}.")
            if client_ip in self.ips_estimate.keys():
                self.free_clients.append((client_ip, reader, writer))
                log.info(f"Added new VDF client {client_ip}.")
                for ip, end_time in list(self.potential_free_clients):
                    if ip == client_ip:
                        self.potential_free_clients.remove((ip, end_time))
                        break

    async def _start(self):
        if self.sanitizer_mode:
            log.info("Starting timelord in sanitizer mode")
            self.disc_queue = asyncio.create_task(self._manage_discriminant_queue_sanitizer())
        else:
            log.info("Starting timelord in normal mode")
            self.disc_queue = asyncio.create_task(self._manage_discriminant_queue())

        self.vdf_server = await asyncio.start_server(
            self._handle_client,
            self.config["vdf_server"]["host"],
            self.config["vdf_server"]["port"],
        )

    def _close(self):
        self._is_shutdown = True
        assert self.vdf_server is not None
        self.vdf_server.close()

    async def _await_closed(self):
        assert self.disc_queue is not None
        await self.disc_queue

    async def _stop_worst_process(self, worst_weight_active):
        # This is already inside a lock, no need to lock again.
        log.info(f"Stopping one process at weight {worst_weight_active}")
        stop_writer: Optional[asyncio.StreamWriter] = None
        stop_discriminant: Optional[bytes32] = None

        low_weights = {k: v for k, v in self.active_discriminants.items() if v[1] == worst_weight_active}
        no_iters = {
            k: v for k, v in low_weights.items() if k not in self.pending_iters or len(self.pending_iters[k]) == 0
        }

        # If we have process(es) with no iters, stop the one that started the latest
        if len(no_iters) > 0:
            latest_start_time = max([self.active_discriminants_start_time[k] for k, _ in no_iters.items()])
            stop_discriminant, stop_writer = next(
                (k, v[0]) for k, v in no_iters.items() if self.active_discriminants_start_time[k] == latest_start_time
            )
        else:
            # Otherwise, pick the one that finishes one proof the latest.
            best_iter = {k: min(self.pending_iters[k]) for k, _ in low_weights.items()}
            time_taken = {k: time.time() - self.active_discriminants_start_time[k] for k, _ in low_weights.items()}

            client_ip = [v[2] for _, v in low_weights.items()]
            # ips maps an IP to the expected iterations per second of it.
            ips = {}
            for ip in client_ip:
                if ip in self.avg_ips:
                    current_ips, _ = self.avg_ips[ip]
                    ips[ip] = current_ips
                else:
                    ips[ip] = self.ips_estimate[ip]

            expected_finish = {
                k: max(0, (best_iter[k] - time_taken[k] * ips[v[2]]) / ips[v[2]]) for k, v in low_weights.items()
            }
            worst_finish = max([v for v in expected_finish.values()])
            log.info(f"Worst finish time: {worst_finish}s")
            stop_discriminant, stop_writer = next(
                (k, v[0]) for k, v in low_weights.items() if expected_finish[k] == worst_finish
            )
        assert stop_writer is not None
        _, _, stop_ip = self.active_discriminants[stop_discriminant]
        self.potential_free_clients.append((stop_ip, time.time()))
        stop_writer.write(b"010")
        await stop_writer.drain()
        del self.active_discriminants[stop_discriminant]
        del self.active_discriminants_start_time[stop_discriminant]
        self.done_discriminants.append(stop_discriminant)

    async def _update_avg_ips(self, challenge_hash, iterations_needed, ip):
        async with self.lock:
            if challenge_hash in self.active_discriminants:
                time_taken = time.time() - self.active_discriminants_start_time[challenge_hash]
                ips = int(iterations_needed / time_taken * 10) / 10
                log.info(
                    f"Finished PoT, chall:{challenge_hash[:10].hex()}.."
                    f" {iterations_needed} iters. {int(time_taken*1000)/1000}s, {ips} ips"
                )
                if ip not in self.avg_ips:
                    self.avg_ips[ip] = (ips, 1)
                else:
                    prev_avg_ips, trials = self.avg_ips[ip]
                    new_avg_ips = int((prev_avg_ips * trials + ips) / (trials + 1))
                    self.avg_ips[ip] = (new_avg_ips, trials + 1)
                    log.info(f"New estimate: {new_avg_ips}")
                if iterations_needed in self.pending_iters[challenge_hash]:
                    self.pending_iters[challenge_hash].remove(iterations_needed)
                else:
                    log.warning("Finished PoT for an unknown iteration.")
            else:
                log.info(
                    f"Finished PoT chall:{challenge_hash[:10].hex()}.. {iterations_needed}"
                    f" iters. But challenge not active anymore"
                )
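
    # Worked example of the running average above (invented numbers): with a stored
    # estimate of (prev_avg_ips=100000, trials=4) and a new measurement ips=110000,
    #   new_avg_ips = int((100000 * 4 + 110000) / 5) = 102000,
    # and the entry becomes (102000, 5).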
    async def _update_proofs_count(self, challenge_weight):
        async with self.lock:
            if challenge_weight not in self.proof_count:
                self.proof_count[challenge_weight] = 1
            else:
                self.proof_count[challenge_weight] += 1
            if self.proof_count[challenge_weight] >= 3:
                log.info("Cleaning up clients.")
                self.best_weight_three_proofs = max(self.best_weight_three_proofs, challenge_weight)
                for active_disc in list(self.active_discriminants):
                    current_writer, current_weight, ip = self.active_discriminants[active_disc]
                    if current_weight <= challenge_weight:
                        log.info(f"Active weight cleanup: {current_weight}")
                        log.info(f"Cleanup weight: {challenge_weight}")
                        self.potential_free_clients.append((ip, time.time()))
                        current_writer.write(b"010")
                        await current_writer.drain()
                        del self.active_discriminants[active_disc]
                        del self.active_discriminants_start_time[active_disc]
                        self.done_discriminants.append(active_disc)

    async def _send_iterations(self, challenge_hash, writer):
        alive_discriminant = True
        while alive_discriminant:
            async with self.lock:
                if (challenge_hash in self.active_discriminants) and (challenge_hash in self.pending_iters):
                    if challenge_hash not in self.submitted_iters:
                        self.submitted_iters[challenge_hash] = []
                    for iter in sorted(self.pending_iters[challenge_hash]):
                        if iter in self.submitted_iters[challenge_hash]:
                            continue
                        self.submitted_iters[challenge_hash].append(iter)
                        if len(str(iter)) < 10:
                            iter_size = "0" + str(len(str(iter)))
                        else:
                            iter_size = str(len(str(iter)))
                        writer.write((iter_size + str(iter)).encode())
                        await writer.drain()
                        log.info(f"New iteration submitted: {iter}")
                        # Sanitizer will always work with only one iteration.
                        if self.sanitizer_mode:
                            alive_discriminant = False
            await asyncio.sleep(1)
            async with self.lock:
                if challenge_hash in self.done_discriminants:
                    alive_discriminant = False

    async def _do_process_communication(self, challenge_hash, challenge_weight, ip, reader, writer):
        disc: int = create_discriminant(challenge_hash, self.discriminant_size_bits)
        # Depending on the flags 'fast_algorithm' and 'sanitizer_mode',
        # the timelord tells the vdf_client what to execute.
        if not self.sanitizer_mode:
            if self.config["fast_algorithm"]:
                # Run n-wesolowski (fast) algorithm.
                writer.write(b"N")
            else:
                # Run two-wesolowski (slow) algorithm.
                writer.write(b"T")
        else:
            # Create compact proofs of time.
            writer.write(b"S")
        await writer.drain()

        prefix = str(len(str(disc)))
        if len(prefix) == 1:
            prefix = "00" + prefix
        writer.write((prefix + str(disc)).encode())
        await writer.drain()

        try:
            ok = await reader.readexactly(2)
        except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
            log.warning(f"{type(e)} {e}")
            async with self.lock:
                if challenge_hash not in self.done_discriminants:
                    self.done_discriminants.append(challenge_hash)
                if self.sanitizer_mode:
                    if challenge_hash in self.pending_iters:
                        del self.pending_iters[challenge_hash]
                    if challenge_hash in self.submitted_iters:
                        del self.submitted_iters[challenge_hash]
            return

        if ok.decode() != "OK":
            return

        log.info("Got handshake with VDF client.")

        async with self.lock:
            self.active_discriminants[challenge_hash] = (writer, challenge_weight, ip)
            self.active_discriminants_start_time[challenge_hash] = time.time()

        asyncio.create_task(self._send_iterations(challenge_hash, writer))

        # Listen to the client until "STOP" is received.
        while True:
            try:
                data = await reader.readexactly(4)
            except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
                log.warning(f"{type(e)} {e}")
                async with self.lock:
                    if challenge_hash in self.active_discriminants:
                        del self.active_discriminants[challenge_hash]
                    if challenge_hash in self.active_discriminants_start_time:
                        del self.active_discriminants_start_time[challenge_hash]
                    if challenge_hash not in self.done_discriminants:
                        self.done_discriminants.append(challenge_hash)
                    if self.sanitizer_mode:
                        if challenge_hash in self.pending_iters:
                            del self.pending_iters[challenge_hash]
                        if challenge_hash in self.submitted_iters:
                            del self.submitted_iters[challenge_hash]
                break

            msg = ""
            try:
                msg = data.decode()
            except Exception as e:
                log.error(f"Exception while decoding data {e}")

            if msg == "STOP":
                log.info(f"Stopped client running on ip {ip}.")
                async with self.lock:
                    writer.write(b"ACK")
                    await writer.drain()
                break
            else:
                try:
                    # This must be a proof, 4 bytes is length prefix
                    length = int.from_bytes(data, "big")
                    proof = await reader.readexactly(length)
                    stdout_bytes_io: io.BytesIO = io.BytesIO(bytes.fromhex(proof.decode()))
                except (
                    asyncio.IncompleteReadError,
                    ConnectionResetError,
                    Exception,
                ) as e:
                    log.warning(f"{type(e)} {e}")
                    async with self.lock:
                        if challenge_hash in self.active_discriminants:
                            del self.active_discriminants[challenge_hash]
                        if challenge_hash in self.active_discriminants_start_time:
                            del self.active_discriminants_start_time[challenge_hash]
                        if challenge_hash not in self.done_discriminants:
                            self.done_discriminants.append(challenge_hash)
                        if self.sanitizer_mode:
                            if challenge_hash in self.pending_iters:
                                del self.pending_iters[challenge_hash]
                            if challenge_hash in self.submitted_iters:
                                del self.submitted_iters[challenge_hash]
                    break

                iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True))

                y_size_bytes = stdout_bytes_io.read(8)
                y_size = uint64(int.from_bytes(y_size_bytes, "big", signed=True))

                y_bytes = stdout_bytes_io.read(y_size)
                witness_type = uint8(int.from_bytes(stdout_bytes_io.read(1), "big", signed=True))
                proof_bytes: bytes = stdout_bytes_io.read()

                # Verifies our own proof just in case
                a = int.from_bytes(y_bytes[:129], "big", signed=True)
                b = int.from_bytes(y_bytes[129:], "big", signed=True)

                output = ClassgroupElement(int512(a), int512(b))

                proof_of_time = ProofOfTime(
                    challenge_hash,
                    iterations_needed,
                    output,
                    witness_type,
                    proof_bytes,
                )

                if not proof_of_time.is_valid(self.discriminant_size_bits):
                    log.error("Invalid proof of time")

                response = timelord_protocol.ProofOfTimeFinished(proof_of_time)

                await self._update_avg_ips(challenge_hash, iterations_needed, ip)

                async with self.lock:
                    self.proofs_to_write.append(
                        OutboundMessage(
                            NodeType.FULL_NODE,
                            Message("proof_of_time_finished", response),
                            Delivery.BROADCAST,
                        )
                    )

                if not self.sanitizer_mode:
                    await self._update_proofs_count(challenge_weight)
                else:
                    async with self.lock:
                        writer.write(b"010")
                        await writer.drain()
                        try:
                            del self.active_discriminants[challenge_hash]
                            del self.active_discriminants_start_time[challenge_hash]
                            del self.pending_iters[challenge_hash]
                            del self.submitted_iters[challenge_hash]
                        except KeyError:
                            log.error("Discriminant stopped abnormally.")

    async def _manage_discriminant_queue(self):
        while not self._is_shutdown:
            async with self.lock:
                if len(self.discriminant_queue) > 0:
                    max_weight = max([h for _, h in self.discriminant_queue])
                    if max_weight <= self.best_weight_three_proofs:
                        self.done_discriminants.extend([d for d, _ in self.discriminant_queue])
                        self.discriminant_queue.clear()
                    else:
                        max_weight_disc = [d for d, h in self.discriminant_queue if h == max_weight]
                        with_iters = [
                            d for d in max_weight_disc if d in self.pending_iters and len(self.pending_iters[d]) != 0
                        ]
                        if len(with_iters) == 0:
                            disc = max_weight_disc[0]
                        else:
                            min_iter = min([min(self.pending_iters[d]) for d in with_iters])
                            disc = next(d for d in with_iters if min(self.pending_iters[d]) == min_iter)
                        if len(self.free_clients) != 0:
                            ip, sr, sw = self.free_clients[0]
                            self.free_clients = self.free_clients[1:]
                            self.discriminant_queue.remove((disc, max_weight))
                            asyncio.create_task(self._do_process_communication(disc, max_weight, ip, sr, sw))
                        else:
                            self.potential_free_clients = [
                                (ip, end_time)
                                for ip, end_time in self.potential_free_clients
                                if time.time() < end_time + self.max_connection_time
                            ]
                            if len(self.potential_free_clients) == 0 and len(self.active_discriminants) > 0:
                                worst_weight_active = min(
                                    [h for (_, h, _) in self.active_discriminants.values()]
                                )
                                if max_weight > worst_weight_active:
                                    await self._stop_worst_process(worst_weight_active)
                                elif max_weight == worst_weight_active:
                                    if disc in self.pending_iters and len(self.pending_iters[disc]) != 0:
                                        if any(
                                            (k not in self.pending_iters or len(self.pending_iters[k]) == 0)
                                            for k, v in self.active_discriminants.items()
                                            if v[1] == worst_weight_active
                                        ):
                                            log.info("Stopped process without iters for one with iters.")
                                            await self._stop_worst_process(worst_weight_active)

                if len(self.proofs_to_write) > 0:
                    for msg in self.proofs_to_write:
                        self.server.push_message(msg)
                    self.proofs_to_write.clear()
            await asyncio.sleep(0.5)

        async with self.lock:
            for stop_discriminant, (stop_writer, _, _) in self.active_discriminants.items():
                stop_writer.write(b"010")
                await stop_writer.drain()
                self.done_discriminants.append(stop_discriminant)
            self.active_discriminants.clear()
            self.active_discriminants_start_time.clear()

    async def _manage_discriminant_queue_sanitizer(self):
        while not self._is_shutdown:
            async with self.lock:
                if len(self.discriminant_queue) > 0:
                    with_iters = [
                        (d, w)
                        for d, w in self.discriminant_queue
                        if d in self.pending_iters and len(self.pending_iters[d]) != 0
                    ]
                    if len(with_iters) > 0 and len(self.free_clients) > 0:
                        disc, weight = with_iters[0]
                        log.info(f"Creating compact weso proof: weight {weight}.")
                        ip, sr, sw = self.free_clients[0]
                        self.free_clients = self.free_clients[1:]
                        self.discriminant_queue.remove((disc, weight))
                        asyncio.create_task(self._do_process_communication(disc, weight, ip, sr, sw))
                if len(self.proofs_to_write) > 0:
                    for msg in self.proofs_to_write:
                        self.server.push_message(msg)
                    self.proofs_to_write.clear()
            await asyncio.sleep(3)