rebase clean

Yostra 2020-11-26 01:22:13 -05:00
parent b2363e9bfb
commit 1823f84293
8 changed files with 1143 additions and 2706 deletions

View File

@@ -1,24 +1,19 @@
import asyncio
import logging
from typing import Dict, List, Optional, Callable, Set, Tuple
from typing import Dict, List, Optional, Callable, Tuple
from blspy import G1Element
from src.server.server import ChiaServer
from src.server.ws_connection import WSChiaConnection
from src.util.keychain import Keychain
from src.consensus.constants import ConsensusConstants
from src.consensus.pot_iterations import (
calculate_iterations_quality,
calculate_sp_interval_iters,
)
from src.protocols import farmer_protocol, harvester_protocol
from src.server.outbound_message import Message, NodeType
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
from src.types.pool_target import PoolTarget
from src.util.api_decorators import api_request
from src.util.ints import uint32, uint64
from src.util.ints import uint64
from src.wallet.derive_keys import master_sk_to_farmer_sk, master_sk_to_pool_sk
from src.util.chech32 import decode_puzzle_hash
@@ -58,10 +53,9 @@ class Farmer:
self.cache_clear_task: asyncio.Task
self.constants = consensus_constants
self._shut_down = False
self.server: Optional[ChiaServer] = None
self.server = None
self.keychain = keychain
self.state_changed_callback: Optional[Callable] = None
self.log = log
if len(self._get_public_keys()) == 0:
error_str = "No keys exist. Please run 'chia keys generate' or open the UI."
@@ -94,25 +88,17 @@ class Farmer:
async def _on_connect(self, peer: WSChiaConnection):
# Sends a handshake to the harvester
msg = harvester_protocol.HarvesterHandshake(
self._get_public_keys(),
self.pool_public_keys,
)
if peer.connection_type is NodeType.HARVESTER:
h_msg = harvester_protocol.HarvesterHandshake(
self._get_public_keys(), self.pool_public_keys
)
msg = Message("harvester_handshake", h_msg)
msg = Message("harvester_handshake", msg)
await peer.send_message(msg)
if self.current_weight in self.challenges:
for posf in self.challenges[self.current_weight]:
message = harvester_protocol.NewChallenge(posf.challenge_hash)
msg = Message("new_challenge", message)
await peer.send_message(msg)
def _set_server(self, server):
self.server = server
def _set_state_changed_callback(self, callback: Callable):
self.state_changed_callback = callback
def _state_changed(self, change: str):
if self.state_changed_callback is not None:
self.state_changed_callback(change)
@@ -122,31 +108,21 @@ class Farmer:
def _get_private_keys(self):
all_sks = self.keychain.get_all_private_keys()
return [master_sk_to_farmer_sk(sk) for sk, _ in all_sks] + [
master_sk_to_pool_sk(sk) for sk, _ in all_sks
]
return [master_sk_to_farmer_sk(sk) for sk, _ in all_sks] + [master_sk_to_pool_sk(sk) for sk, _ in all_sks]
async def _get_required_iters(
self, challenge_hash: bytes32, quality_string: bytes32, plot_size: uint8
):
weight: uint128 = self.challenge_to_weight[challenge_hash]
difficulty: uint64 = uint64(0)
for posf in self.challenges[weight]:
if posf.challenge_hash == challenge_hash:
difficulty = posf.difficulty
if difficulty == 0:
raise RuntimeError("Did not find challenge")
estimate_min = (
self.proof_of_time_estimate_ips
* self.constants.BLOCK_TIME_TARGET
/ self.constants.MIN_ITERS_PROPORTION
)
estimate_min = uint64(int(estimate_min))
number_iters: uint64 = calculate_iterations_quality(
quality_string,
plot_size,
difficulty,
estimate_min,
)
return number_iters
async def _periodically_clear_cache_task(self):
time_slept: uint64 = uint64(0)
while not self._shut_down:
if time_slept > self.constants.SUB_SLOT_TIME_TARGET * 10:
removed_keys: List[bytes32] = []
for key, add_time in self.cache_add_time.items():
self.sps.pop(key, None)
self.proofs_of_space.pop(key, None)
self.quality_str_to_identifiers.pop(key, None)
self.number_of_responses.pop(key, None)
removed_keys.append(key)
for key in removed_keys:
self.cache_add_time.pop(key, None)
time_slept = uint64(0)
time_slept += 1
await asyncio.sleep(1)
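Taken on its own, the clearing loop above is a simple tick-counter cache sweep: sleep in one-second ticks, and once the accumulated ticks exceed the target window, drop the stale entries from every cache keyed by the same ids. A minimal standalone sketch of the same pattern (the names and the single cache dict are illustrative, not the Farmer's actual fields):

import asyncio
import time
from typing import Dict

async def periodically_clear_cache(cache: Dict[bytes, float], max_age_secs: int, shutdown: asyncio.Event) -> None:
    time_slept = 0
    while not shutdown.is_set():
        if time_slept > max_age_secs:
            now = time.time()
            # Collect first, then pop, to avoid mutating the dict while iterating
            stale = [k for k, added in cache.items() if now - added > max_age_secs]
            for key in stale:
                cache.pop(key, None)
            time_slept = 0
        time_slept += 1
        await asyncio.sleep(1)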

View File

@@ -2,7 +2,7 @@ from typing import Optional, Callable
from blspy import AugSchemeMPL, G2Element
from src.consensus.pot_iterations import calculate_iterations_quality
from src.consensus.pot_iterations import calculate_iterations_quality, calculate_sp_interval_iters
from src.farmer import Farmer
from src.protocols import harvester_protocol, farmer_protocol
from src.server.outbound_message import Message, NodeType
@@ -23,278 +23,221 @@ class FarmerAPI:
self.farmer.state_changed_callback = callback
@api_request
async def challenge_response(
self,
challenge_response: harvester_protocol.ChallengeResponse,
) -> Optional[Message]:
async def new_proof_of_space(self, new_proof_of_space: harvester_protocol.NewProofOfSpace):
"""
This is a response from the harvester, for a NewChallenge. Here we check if the proof
of space is sufficiently good, and if so, we ask for the whole proof.
"""
height: uint32 = self.farmer.challenge_to_height[
challenge_response.challenge_hash
]
number_iters = await self.farmer._get_required_iters(
challenge_response.challenge_hash,
challenge_response.quality_string,
challenge_response.plot_size,
)
if height < 1000: # As the difficulty adjusts, don't fetch all qualities
if (
challenge_response.challenge_hash
not in self.farmer.challenge_to_best_iters
):
self.farmer.challenge_to_best_iters[
challenge_response.challenge_hash
] = number_iters
elif (
number_iters
< self.farmer.challenge_to_best_iters[challenge_response.challenge_hash]
):
self.farmer.challenge_to_best_iters[
challenge_response.challenge_hash
] = number_iters
else:
return None
if new_proof_of_space.proof.challenge_hash not in self.farmer.number_of_responses:
self.farmer.number_of_responses[new_proof_of_space.proof.challenge_hash] = 0
estimate_secs: float = number_iters / self.farmer.proof_of_time_estimate_ips
if challenge_response.challenge_hash not in self.farmer.challenge_to_estimates:
self.farmer.challenge_to_estimates[challenge_response.challenge_hash] = []
self.farmer.challenge_to_estimates[challenge_response.challenge_hash].append(
estimate_secs
)
self.farmer.log.info(
f"Estimate: {estimate_secs}, rate: {self.farmer.proof_of_time_estimate_ips}"
)
if (
estimate_secs < self.farmer.config["pool_share_threshold"]
or estimate_secs < self.farmer.config["propagate_threshold"]
):
request = harvester_protocol.RequestProofOfSpace(
challenge_response.challenge_hash,
challenge_response.plot_id,
challenge_response.response_number,
if self.farmer.number_of_responses[new_proof_of_space.proof.challenge_hash] >= 5:
self.farmer.log.warning(
f"Surpassed 5 PoSpace for one SP, no longer submitting PoSpace for signage point "
f"{new_proof_of_space.proof.challenge_hash}"
)
return
self.farmer._state_changed("challenge")
msg = Message("request_proof_of_space", request)
return msg
return None
if new_proof_of_space.proof.challenge_hash not in self.farmer.sps:
self.farmer.log.warning(
f"Received response for challenge that we do not have {new_proof_of_space.proof.challenge_hash}"
)
return
@api_request
async def respond_proof_of_space(
self, response: harvester_protocol.RespondProofOfSpace
) -> Optional[Message]:
"""
This is a response from the harvester with a proof of space. We check its validity,
and request a pool partial, a header signature, or both, if the proof is good enough.
"""
sp = self.farmer.sps[new_proof_of_space.proof.challenge_hash]
challenge_hash: bytes32 = response.proof.challenge_hash
challenge_weight: uint128 = self.farmer.challenge_to_weight[challenge_hash]
difficulty: uint64 = uint64(0)
for posf in self.farmer.challenges[challenge_weight]:
if posf.challenge_hash == challenge_hash:
difficulty = posf.difficulty
if difficulty == 0:
raise RuntimeError("Did not find challenge")
computed_quality_string = response.proof.verify_and_get_quality_string(
self.farmer.constants.NUMBER_ZERO_BITS_CHALLENGE_SIG
computed_quality_string = new_proof_of_space.proof.verify_and_get_quality_string(
self.farmer.constants, new_proof_of_space.challenge_hash, new_proof_of_space.proof.challenge_hash
)
if computed_quality_string is None:
raise RuntimeError("Invalid proof of space")
self.farmer.log.error(f"Invalid proof of space {new_proof_of_space.proof}")
return
self.farmer.harvester_responses_proofs[
(response.proof.challenge_hash, response.plot_id, response.response_number)
] = response.proof
self.farmer.harvester_responses_proof_hash_to_info[
response.proof.get_hash()
] = (
response.proof.challenge_hash,
response.plot_id,
response.response_number,
)
self.farmer.number_of_responses[new_proof_of_space.proof.challenge_hash] += 1
estimate_min = (
self.farmer.proof_of_time_estimate_ips
* self.farmer.constants.BLOCK_TIME_TARGET
/ self.farmer.constants.MIN_ITERS_PROPORTION
)
estimate_min = uint64(int(estimate_min))
number_iters: uint64 = calculate_iterations_quality(
required_iters: uint64 = calculate_iterations_quality(
computed_quality_string,
response.proof.size,
difficulty,
estimate_min,
new_proof_of_space.proof.size,
sp.difficulty,
new_proof_of_space.proof.challenge_hash,
)
estimate_secs: float = number_iters / self.farmer.proof_of_time_estimate_ips
# Double check that the iters are good
assert required_iters < calculate_sp_interval_iters(sp.slot_iterations, sp.sub_slot_iters)
if estimate_secs < self.farmer.config["pool_share_threshold"]:
# TODO: implement pooling
pass
if estimate_secs < self.farmer.config["propagate_threshold"]:
pool_pk = bytes(response.proof.pool_public_key)
if pool_pk not in self.farmer.pool_sks_map:
self.farmer.log.error(
f"Don't have the private key for the pool key used by harvester: {pool_pk.hex()}"
self.farmer._state_changed("proof")
# Proceed to get the signatures for this PoSpace
request = harvester_protocol.RequestSignatures(
new_proof_of_space.plot_identifier,
new_proof_of_space.proof.challenge_hash,
[sp.challenge_chain_sp, sp.reward_chain_sp],
)
if new_proof_of_space.proof.challenge_hash not in self.farmer.proofs_of_space:
self.farmer.proofs_of_space[new_proof_of_space.proof.challenge_hash] = [
(
new_proof_of_space.plot_identifier,
new_proof_of_space.proof,
)
]
else:
self.farmer.proofs_of_space[new_proof_of_space.proof.challenge_hash].append(
(
new_proof_of_space.plot_identifier,
new_proof_of_space.proof,
)
return None
pool_target: PoolTarget = PoolTarget(self.farmer.pool_target, uint32(0))
pool_target_signature: G2Element = AugSchemeMPL.sign(
self.farmer.pool_sks_map[pool_pk], bytes(pool_target)
)
self.farmer.quality_str_to_identifiers[computed_quality_string] = (
new_proof_of_space.plot_identifier,
new_proof_of_space.proof.challenge_hash,
)
request2 = farmer_protocol.RequestHeaderHash(
challenge_hash,
response.proof,
pool_target,
pool_target_signature,
self.farmer.wallet_target,
)
msg = Message("request_header_hash", request2)
assert self.farmer.server is not None
await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE)
return None
return None
msg = Message("request_signatures", request)
return msg
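Stripped of the protocol types, the gate that new_proof_of_space applies before requesting signatures is: cap the number of proofs forwarded per signage point, and only then record the proof. A hedged sketch of that bookkeeping (plain dicts and a cap constant stand in for the Farmer's state):

from typing import Dict, List

MAX_POSPACE_PER_SP = 5  # the cap enforced by the handler above

def accept_proof(sp_hash: bytes, proof: bytes,
                 number_of_responses: Dict[bytes, int],
                 proofs_of_space: Dict[bytes, List[bytes]]) -> bool:
    # Refuse once enough proofs have already been forwarded for this signage point
    count = number_of_responses.get(sp_hash, 0)
    if count >= MAX_POSPACE_PER_SP:
        return False
    number_of_responses[sp_hash] = count + 1
    proofs_of_space.setdefault(sp_hash, []).append(proof)
    return True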
@api_request
async def respond_signature(self, response: harvester_protocol.RespondSignature):
async def respond_signatures(self, response: harvester_protocol.RespondSignatures):
"""
Receives a signature on a block header hash, which is required for submitting
a block to the blockchain.
There are two cases: receiving signatures for sps, or receiving signatures for the block.
"""
header_hash = response.message
proof_of_space: bytes32 = self.farmer.header_hash_to_pos[header_hash]
validates: bool = False
for sk in self.farmer._get_private_keys():
pk = sk.get_g1()
if pk == response.farmer_pk:
agg_pk = ProofOfSpace.generate_plot_public_key(response.local_pk, pk)
assert agg_pk == proof_of_space.plot_public_key
farmer_share = AugSchemeMPL.sign(sk, header_hash, agg_pk)
agg_sig = AugSchemeMPL.aggregate(
[response.message_signature, farmer_share]
)
validates = AugSchemeMPL.verify(agg_pk, header_hash, agg_sig)
if response.sp_hash not in self.farmer.sps:
self.farmer.log.warning(f"Do not have challenge hash {response.challenge_hash}")
return
is_sp_signatures: bool = False
sp = self.farmer.sps[response.sp_hash]
if response.sp_hash == response.message_signatures[0]:
assert sp.reward_chain_sp == response.message_signatures[1]
is_sp_signatures = True
if validates:
break
assert validates
pospace = None
for plot_identifier, _, candidate_pospace in self.farmer.proofs_of_space[response.sp_hash]:
if plot_identifier == response.plot_identifier:
pospace = candidate_pospace
assert pospace is not None
pos_hash: bytes32 = proof_of_space.get_hash()
if is_sp_signatures:
(
challenge_chain_sp,
challenge_chain_sp_harv_sig,
) = response.message_signatures[0]
reward_chain_sp, reward_chain_sp_harv_sig = response.message_signatures[1]
for sk in self.farmer._get_private_keys():
pk = sk.get_g1()
if pk == response.farmer_pk:
agg_pk = ProofOfSpace.generate_plot_public_key(response.local_pk, pk)
assert agg_pk == pospace.plot_public_key
farmer_share_cc_sp = AugSchemeMPL.sign(sk, challenge_chain_sp, agg_pk)
agg_sig_cc_sp = AugSchemeMPL.aggregate([challenge_chain_sp_harv_sig, farmer_share_cc_sp])
assert AugSchemeMPL.verify(agg_pk, challenge_chain_sp, agg_sig_cc_sp)
request = farmer_protocol.HeaderSignature(pos_hash, header_hash, agg_sig)
msg = Message("header_signature", request)
assert self.farmer.server is not None
await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE)
computed_quality_string = pospace.verify_and_get_quality_string(
self.farmer.constants, sp.challenge_hash, challenge_chain_sp
)
# This means it passes the sp filter
if computed_quality_string is not None:
farmer_share_rc_sp = AugSchemeMPL.sign(sk, reward_chain_sp, agg_pk)
agg_sig_rc_sp = AugSchemeMPL.aggregate([reward_chain_sp_harv_sig, farmer_share_rc_sp])
assert AugSchemeMPL.verify(agg_pk, reward_chain_sp, agg_sig_rc_sp)
pool_pk = bytes(pospace.pool_public_key)
if pool_pk not in self.farmer.pool_sks_map:
self.farmer.log.error(f"Don't have the private key for the pool key used by harvester: {pool_pk.hex()}")
return
pool_target: PoolTarget = PoolTarget(self.farmer.pool_target, uint32(0))
pool_target_signature: G2Element = AugSchemeMPL.sign(
self.farmer.pool_sks_map[pool_pk], bytes(pool_target)
)
request = farmer_protocol.DeclareProofOfSpace(
response.challenge_hash,
challenge_chain_sp,
sp.signage_point_index,
reward_chain_sp,
pospace,
agg_sig_cc_sp,
agg_sig_rc_sp,
self.farmer.wallet_target,
pool_target,
pool_target_signature,
)
msg = Message("declare_proof_of_space", request)
await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE)
return
else:
self.farmer.log.warning(f"Have invalid PoSpace {pospace}")
else:
# This is a response with block signatures
for sk in self.farmer._get_private_keys():
(
foliage_sub_block_hash,
foliage_sub_block_sig_harvester,
) = response.message_signatures[0]
(
foliage_block_hash,
foliage_block_sig_harvester,
) = response.message_signatures[1]
pk = sk.get_g1()
if pk == response.farmer_pk:
computed_quality_string = pospace.verify_and_get_quality_string(self.farmer.constants, None, None)
agg_pk = ProofOfSpace.generate_plot_public_key(response.local_pk, pk)
assert agg_pk == pospace.plot_public_key
foliage_sub_block_sig_farmer = AugSchemeMPL.sign(sk, foliage_sub_block_hash, agg_pk)
foliage_block_sig_farmer = AugSchemeMPL.sign(sk, foliage_block_hash, agg_pk)
foliage_sub_block_agg_sig = AugSchemeMPL.aggregate(
[foliage_sub_block_sig_harvester, foliage_sub_block_sig_farmer]
)
foliage_block_agg_sig = AugSchemeMPL.aggregate(
[foliage_block_sig_harvester, foliage_block_sig_farmer]
)
assert AugSchemeMPL.verify(agg_pk, foliage_sub_block_hash, foliage_sub_block_agg_sig)
assert AugSchemeMPL.verify(agg_pk, foliage_block_hash, foliage_block_agg_sig)
request = farmer_protocol.SignedValues(
computed_quality_string,
foliage_sub_block_agg_sig,
foliage_block_agg_sig,
)
msg = Message("signed_values", request)
await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE)
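Both branches above use the same two-party BLS pattern: harvester and farmer each sign the message with their own key while prepending the shared plot key, and the aggregate verifies under that single key. A self-contained sketch of just the cryptographic step (keys generated from fixed seeds purely for illustration; this assumes the three-argument AugSchemeMPL.sign used throughout the handlers above, and that the plot public key is the sum of the two public keys, as generate_plot_public_key computes here):

from blspy import AugSchemeMPL, G1Element, G2Element, PrivateKey

sk_harvester: PrivateKey = AugSchemeMPL.key_gen(bytes([1] * 32))
sk_farmer: PrivateKey = AugSchemeMPL.key_gen(bytes([2] * 32))
# Plays the role of ProofOfSpace.generate_plot_public_key(local_pk, farmer_pk)
agg_pk: G1Element = sk_harvester.get_g1() + sk_farmer.get_g1()
message = b"signage point or foliage hash"
# Each half is signed with the aggregate key prepended
harvester_share: G2Element = AugSchemeMPL.sign(sk_harvester, message, agg_pk)
farmer_share: G2Element = AugSchemeMPL.sign(sk_farmer, message, agg_pk)
agg_sig: G2Element = AugSchemeMPL.aggregate([harvester_share, farmer_share])
# The combined signature verifies under the single aggregate public key
assert AugSchemeMPL.verify(agg_pk, message, agg_sig)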
"""
FARMER PROTOCOL (FARMER <-> FULL NODE)
"""
@api_request
async def header_hash(self, response: farmer_protocol.HeaderHash):
"""
Full node responds with the hash of the created header
"""
header_hash: bytes32 = response.header_hash
async def new_signage_point(self, new_signage_point: farmer_protocol.NewSignagePoint):
message = harvester_protocol.NewSignagePoint(
new_signage_point.challenge_hash,
new_signage_point.difficulty,
new_signage_point.sub_slot_iters,
new_signage_point.signage_point_index,
new_signage_point.challenge_chain_sp,
)
(
challenge_hash,
plot_id,
response_number,
) = self.farmer.harvester_responses_proof_hash_to_info[response.pos_hash]
pos = self.farmer.harvester_responses_proofs[
challenge_hash, plot_id, response_number
]
self.farmer.header_hash_to_pos[header_hash] = pos
# TODO: only send to the harvester who made the proof of space, not all harvesters
request = harvester_protocol.RequestSignature(plot_id, header_hash)
msg = Message("request_signature", request)
assert self.farmer.server is not None
msg = Message("new_signage_point", message)
await self.farmer.server.send_to_all([msg], NodeType.HARVESTER)
self.farmer.sps[new_signage_point.challenge_chain_sp] = new_signage_point
self.farmer._state_changed("signage_point")
@api_request
async def proof_of_space_finalized(
self,
proof_of_space_finalized: farmer_protocol.ProofOfSpaceFinalized,
):
"""
Full node notifies farmer that a proof of space has been completed. It gets added to the
challenges list at that weight, and the weight is updated if necessary.
"""
get_proofs: bool = False
if (
proof_of_space_finalized.weight >= self.farmer.current_weight
and proof_of_space_finalized.challenge_hash
not in self.farmer.seen_challenges
):
# Only get proofs for new challenges, at a current or new weight
get_proofs = True
if proof_of_space_finalized.weight > self.farmer.current_weight:
self.farmer.current_weight = proof_of_space_finalized.weight
async def request_signed_values(self, full_node_request: farmer_protocol.RequestSignedValues):
if full_node_request.quality_string not in self.farmer.quality_str_to_identifiers:
self.farmer.log.error(f"Do not have quality string {full_node_request.quality_string}")
return
self.farmer.log.info(
f"\tCurrent weight set to {self.farmer.current_weight}"
)
self.farmer.seen_challenges.add(proof_of_space_finalized.challenge_hash)
if proof_of_space_finalized.weight not in self.farmer.challenges:
self.farmer.challenges[proof_of_space_finalized.weight] = [
proof_of_space_finalized
]
else:
self.farmer.challenges[proof_of_space_finalized.weight].append(
proof_of_space_finalized
)
self.farmer.challenge_to_weight[
proof_of_space_finalized.challenge_hash
] = proof_of_space_finalized.weight
self.farmer.challenge_to_height[
proof_of_space_finalized.challenge_hash
] = proof_of_space_finalized.height
plot_identifier, challenge_hash = self.farmer.quality_str_to_identifiers[full_node_request.quality_string]
request = harvester_protocol.RequestSignatures(
plot_identifier,
challenge_hash,
[
full_node_request.foliage_sub_block_hash,
full_node_request.foliage_block_hash,
],
)
if get_proofs:
message = harvester_protocol.NewChallenge(
proof_of_space_finalized.challenge_hash
)
msg = Message("new_challenge", message)
assert self.farmer.server is not None
await self.farmer.server.send_to_all([msg], NodeType.HARVESTER)
# This allows the collection of estimates from the harvesters
self.farmer._state_changed("challenge")
@api_request
async def proof_of_space_arrived(
self,
proof_of_space_arrived: farmer_protocol.ProofOfSpaceArrived,
) -> Optional[Message]:
"""
Full node notifies the farmer that a new proof of space was created. The farmer can use this
information to decide whether to propagate a proof.
"""
if proof_of_space_arrived.weight not in self.farmer.unfinished_challenges:
self.farmer.unfinished_challenges[proof_of_space_arrived.weight] = []
else:
self.farmer.unfinished_challenges[proof_of_space_arrived.weight].append(
proof_of_space_arrived.quality_string
)
return None
@api_request
async def proof_of_time_rate(
self,
proof_of_time_rate: farmer_protocol.ProofOfTimeRate,
) -> Optional[Message]:
"""
Updates our internal estimate of the iterations per second for the fastest proof of time
in the network.
"""
self.farmer.proof_of_time_estimate_ips = proof_of_time_rate.pot_estimate_ips
return None
msg = Message("request_signatures", request)
await self.farmer.server.send_to_all([msg], NodeType.HARVESTER)

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -7,6 +7,7 @@ from src.consensus.blockchain import Blockchain
from src.full_node.sync_store import SyncStore
from src.protocols import full_node_protocol
from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage
from src.server.server import ChiaServer
from src.server.ws_connection import WSChiaConnection
from src.types.full_block import FullBlock
from src.types.header_block import HeaderBlock
@@ -42,6 +43,7 @@ class SyncPeersHandler:
fork_height: uint32,
blockchain: Blockchain,
peak_height: uint32,
server: ChiaServer
):
self.sync_store = sync_store
# Set of outbound requests for every full_node peer, and time sent
@@ -61,7 +63,7 @@ class SyncPeersHandler:
self.potential_blocks_received = self.sync_store.potential_blocks_received
self.potential_blocks = self.sync_store.potential_blocks
self.server = server
# No blocks received yet
for height in range(self.fully_validated_up_to + 1, peak_height + 1):
self.potential_blocks_received[uint32(height)] = asyncio.Event()
@@ -77,14 +79,33 @@
# We have received all blocks
return True
async def _add_to_request_sets(self) -> List[OutboundMessage]:
async def monitor_timeouts(self):
"""
If any of our requests have timed out, disconnects from the node that should
have responded.
"""
current_time = time.time()
remove_node_ids = []
for node_id, outbound_set in self.current_outbound_sets.items():
for _, time_requested in outbound_set.items():
if current_time - float(time_requested) > self.BLOCK_RESPONSE_TIMEOUT:
remove_node_ids.append(node_id)
for rn_id in remove_node_ids:
if rn_id in self.current_outbound_sets:
log.warning(f"Timeout receiving block, closing connection with node {rn_id}")
self.current_outbound_sets.pop(rn_id, None)
if rn_id in self.server.all_connections:
con = self.server.all_connections[rn_id]
await con.close()
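Reduced to its data, monitor_timeouts is a timestamp scan over the per-peer outbound request sets. A sketch with a plain dict standing in for the connection registry (the timeout value here is an assumption in place of the class constant):

import time
from typing import Dict, List

BLOCK_RESPONSE_TIMEOUT_SECS = 60  # assumed value for the class constant above

def find_timed_out_peers(outbound_sets: Dict[bytes, Dict[int, float]]) -> List[bytes]:
    # A peer is stale if any of its outstanding block requests is too old
    now = time.time()
    return [
        node_id
        for node_id, request_set in outbound_sets.items()
        if any(now - t > BLOCK_RESPONSE_TIMEOUT_SECS for t in request_set.values())
    ]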
async def add_to_request_sets(self):
"""
Refreshes the pointers of how far we validated and how far we downloaded. Then goes through
all peers and sends requests to peers for the blocks we have not requested yet, or have
requested to a peer that did not respond in time or disconnected.
"""
if not self.sync_store.get_sync_mode():
return []
return
# fork fully validated MAX_GAP target (peak)
# $$$$$X$$$$$$$$$$$$$$$X================----==---=--====---=--X------->
@@ -108,8 +129,8 @@
to_send: List[uint32] = []
# Finds a block height
for height in range(
self.fully_validated_up_to + 1,
min(self.fully_validated_up_to + self.MAX_GAP + 1, self.peak_height + 1),
self.fully_validated_up_to + 1,
min(self.fully_validated_up_to + self.MAX_GAP + 1, self.peak_height + 1),
):
if len(to_send) == free_slots:
# No more slots to send to any peers
@@ -135,8 +156,6 @@
outbound_sets_list = list(self.current_outbound_sets.items())
outbound_sets_list.sort(key=lambda x: len(x[1]))
index = 0
messages: List[Any] = []
node_id = None
for height in to_send:
# Find the next peer with an empty slot. There must be an empty slot: to_send
# includes up to free_slots things, and current_outbound sets cannot change since there is
@@ -148,22 +167,14 @@
node_id, request_set = outbound_sets_list[index % len(outbound_sets_list)]
request_set[uint32(height)] = uint64(int(time.time()))
request = full_node_protocol.RequestBlock(
height, self.header_hashes[height]
)
msg = OutboundMessage(
NodeType.FULL_NODE,
Message("request_block", request),
Delivery.SPECIFIC,
node_id,
)
messages.append(msg)
request = full_node_protocol.RequestSubBlock(height, True)
msg = Message("request_block", request)
await self.server.send_to_specific([msg], node_id)
return messages
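The dispatch loop above load-balances by sorting peers by the size of their outstanding request sets and dealing heights out from the least-loaded peer. A simplified sketch of that assignment (the real loop additionally respects per-peer free-slot limits):

import time
from typing import Dict, List, Tuple

def assign_heights(
    heights: List[int],
    outbound_sets: Dict[bytes, Dict[int, float]],
) -> List[Tuple[bytes, int]]:
    if not outbound_sets:
        return []
    # Least-loaded peers first, then deal heights out round-robin
    peers = sorted(outbound_sets.items(), key=lambda kv: len(kv[1]))
    assignments: List[Tuple[bytes, int]] = []
    for i, height in enumerate(heights):
        node_id, request_set = peers[i % len(peers)]
        request_set[height] = time.time()  # record when the request was sent
        assignments.append((node_id, height))
    return assignments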
async def new_block(
self, block: Union[FullBlock, HeaderBlock]
) -> List[OutboundMessage]:
):
"""
A new block was received from a peer.
"""
@@ -178,7 +189,7 @@
# save block to DB
self.potential_blocks[block.height] = block
if not self.sync_store.get_sync_mode():
return []
return
assert block.height in self.potential_blocks_received
@@ -189,8 +200,7 @@
request_set.pop(header_hash, None)
# add to request sets
requests = await self._add_to_request_sets()
return requests
await self.add_to_request_sets()
def new_node_connected(self, node_id: bytes32):
"""

View File

@@ -1,17 +1,22 @@
import asyncio
import dataclasses
import time
from pathlib import Path
from typing import Optional, Callable
from typing import Optional, Callable, List, Tuple
from blspy import AugSchemeMPL, G2Element
from chiapos import DiskProver
from src.consensus.pot_iterations import calculate_sp_interval_iters, calculate_iterations_quality
from src.harvester import Harvester
from src.plotting.plot_tools import PlotInfo
from src.protocols import harvester_protocol
from src.server.outbound_message import Message
from src.server.ws_connection import WSChiaConnection
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
from src.util.api_decorators import api_request, peer_required
from src.util.ints import uint8
from src.util.ints import uint8, uint64
class HarvesterAPI:
@@ -23,135 +28,162 @@ class HarvesterAPI:
def _set_state_changed_callback(self, callback: Callable):
self.harvester.state_changed_callback = callback
@peer_required
@api_request
async def harvester_handshake(
self,
harvester_handshake: harvester_protocol.HarvesterHandshake,
peer: WSChiaConnection,
):
async def harvester_handshake(self, harvester_handshake: harvester_protocol.HarvesterHandshake):
"""
Handshake between the harvester and farmer. The harvester receives the pool public keys,
as well as the farmer pks, which must be put into the plots before the plotting process begins.
We cannot use any plots which have different keys in them.
"""
self.harvester.farmer_public_keys = harvester_handshake.farmer_public_keys
self.harvester.pool_public_keys = harvester_handshake.pool_public_keys
self.farmer_public_keys = harvester_handshake.farmer_public_keys
self.pool_public_keys = harvester_handshake.pool_public_keys
await self.harvester._refresh_plots()
if len(self.harvester.provers) == 0:
self.harvester.log.warning(
"Not farming any plots on this harvester. Check your configuration."
)
self.harvester.log.warning("Not farming any plots on this harvester. Check your configuration.")
return
for new_challenge in self.harvester.cached_challenges:
async for msg in self.harvester._new_challenge(new_challenge):
await peer.send_message(msg)
self.harvester.cached_challenges = []
self.harvester._state_changed("plots")
@peer_required
@api_request
async def new_challenge(
self, new_challenge: harvester_protocol.NewChallenge, peer: WSChiaConnection
):
async for msg in self.harvester._new_challenge(new_challenge):
await peer.send_message(msg)
@api_request
async def request_proof_of_space(
self, request: harvester_protocol.RequestProofOfSpace
):
async def new_signage_point(self, new_challenge: harvester_protocol.NewSignagePoint, peer: WSChiaConnection):
"""
The farmer requests a proof of space, for one of the plots.
We look up the correct plot based on the plot id and response number, look up the proof,
and return it.
The harvester receives a new signage point from the farmer; this happens at the start of each slot.
The harvester does a few things:
1. The harvester applies the plot filter for each of the plots, to select the proportion which are eligible
for this signage point and challenge.
2. The harvester gets the qualities for each plot. This is approximately 7 reads per plot which qualifies.
Note that each plot may have 0, 1, 2, etc. qualities for that challenge, but on average it will have 1.
3. Checks the required_iters for each quality and the given signage point, to see which are eligible for
inclusion (required_iters < sp_interval_iters).
4. Looks up the full proof of space in the plot for each quality, approximately 64 reads per quality
5. Returns the proof of space to the farmer (condensed in the sketch after this handler)
"""
response: Optional[harvester_protocol.RespondProofOfSpace] = None
challenge_hash = request.challenge_hash
filename = Path(request.plot_id).resolve()
index = request.response_number
proof_xs: bytes
plot_info = self.harvester.provers[filename]
if len(self.pool_public_keys) == 0 or len(self.farmer_public_keys) == 0:
# This means that we have not received the handshake yet
return
try:
start = time.time()
assert len(new_challenge.challenge_hash) == 32
# Refresh plots to see if there are any new ones
await self.harvester._refresh_plots()
loop = asyncio.get_running_loop()
def blocking_lookup(filename: Path, plot_info: PlotInfo, prover: DiskProver) -> List[ProofOfSpace]:
# Uses the DiskProver object to look up qualities. This is a blocking call,
# so it should be run in a thread pool.
try:
proof_xs = plot_info.prover.get_full_proof(challenge_hash, index)
except RuntimeError:
prover = DiskProver(str(filename))
self.harvester.provers[filename] = PlotInfo(
prover,
plot_info.pool_public_key,
plot_info.farmer_public_key,
plot_info.plot_public_key,
plot_info.local_sk,
plot_info.file_size,
plot_info.time_modified,
sp_challenge_hash = ProofOfSpace.calculate_new_challenge_hash(
plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash
)
proof_xs = self.harvester.provers[filename].prover.get_full_proof(
challenge_hash, index
quality_strings = prover.get_qualities_for_challenge(sp_challenge_hash)
except Exception:
self.harvester.log.error("Error using prover object. Reinitializing prover object.")
self.harvester.provers[filename] = dataclasses.replace(plot_info, prover=DiskProver(str(filename)))
return []
responses: List[ProofOfSpace] = []
if quality_strings is not None:
# Found proofs of space (on average 1 is expected per plot)
for index, quality_str in enumerate(quality_strings):
required_iters: uint64 = calculate_iterations_quality(
quality_str, prover.get_size(), new_challenge.difficulty, new_challenge.sp_hash
)
sp_interval_iters = calculate_sp_interval_iters(self.harvester.constants, new_challenge.sub_slot_iters)
if required_iters < sp_interval_iters:
# Found a very good proof of space! We will fetch the whole proof from disk, then send it to the farmer
try:
proof_xs = prover.get_full_proof(sp_challenge_hash, index)
except RuntimeError:
self.harvester.log.error(f"Exception fetching full proof for {filename}")
continue
plot_public_key = ProofOfSpace.generate_plot_public_key(
plot_info.local_sk.get_g1(), plot_info.farmer_public_key
)
responses.append(
ProofOfSpace(
sp_challenge_hash,
plot_info.pool_public_key,
None,
plot_public_key,
uint8(prover.get_size()),
proof_xs,
)
)
return responses
async def lookup_challenge(filename: Path, prover: DiskProver) -> List[harvester_protocol.NewProofOfSpace]:
# Executes a DiskProver lookup in a thread pool, and returns the responses
all_responses: List[harvester_protocol.NewProofOfSpace] = []
proofs_of_space: List[ProofOfSpace] = await loop.run_in_executor(
self.harvester.executor, blocking_lookup, filename, prover
)
for proof_of_space in proofs_of_space:
all_responses.append(
harvester_protocol.NewProofOfSpace(
new_challenge.challenge_hash, prover.get_id(), proof_of_space, new_challenge.signage_point_index
)
)
except KeyError:
self.harvester.log.warning(f"KeyError plot {filename} does not exist.")
return all_responses
plot_info = self.harvester.provers[filename]
plot_public_key = ProofOfSpace.generate_plot_public_key(
plot_info.local_sk.get_g1(), plot_info.farmer_public_key
)
awaitables = []
for try_plot_filename, try_plot_info in self.harvester.provers.items():
if try_plot_filename.exists():
# Passes the plot filter (does not check sp filter yet though, since we have not reached sp)
# This is being executed at the beginning of the slot
if ProofOfSpace.passes_plot_filter(
self.harvester.constants, try_plot_info.prover.get_id(), new_challenge.challenge_hash, new_challenge.sp_hash
):
awaitables.append(lookup_challenge(try_plot_filename, try_plot_info.prover))
proof_of_space: ProofOfSpace = ProofOfSpace(
challenge_hash,
plot_info.pool_public_key,
plot_public_key,
uint8(self.harvester.provers[filename].prover.get_size()),
proof_xs,
# Concurrently executes all lookups on disk, to take advantage of parallelism across multiple disks
total_proofs_found = 0
for sublist_awaitable in asyncio.as_completed(awaitables):
for response in await sublist_awaitable:
total_proofs_found += 1
msg = Message("challenge_response", response)
await peer.send_message(msg)
self.harvester.log.info(
f"{len(awaitables)} plots were eligible for farming {new_challenge.challenge_hash.hex()[:10]}..."
f" Found {total_proofs_found} proofs. Time: {time.time() - start}. "
f"Total {len(self.harvester.provers)} plots"
)
response = harvester_protocol.RespondProofOfSpace(
request.plot_id,
request.response_number,
proof_of_space,
)
if response:
msg = Message("respond_proof_of_space", response)
return msg
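Condensed, the pipeline that new_signage_point runs per signage point is: apply the plot filter, read qualities, keep only qualities whose required iterations fit inside the signage-point interval, and fetch full proofs for those. A sketch with the consensus and disk helpers injected as hypothetical callables:

from typing import Callable, Iterable, List, Tuple

def eligible_proofs(
    plots: Iterable[str],
    passes_filter: Callable[[str], bool],         # step 1: plot filter
    qualities_for: Callable[[str], List[bytes]],  # step 2: ~7 reads per plot
    required_iters: Callable[[bytes], int],       # step 3: quality -> iterations
    sp_interval_iters: int,
    full_proof: Callable[[str, int], bytes],      # step 4: ~64 reads per quality
) -> List[Tuple[str, bytes]]:
    found: List[Tuple[str, bytes]] = []
    for plot in plots:
        if not passes_filter(plot):
            continue
        for index, quality in enumerate(qualities_for(plot)):
            if required_iters(quality) < sp_interval_iters:
                found.append((plot, full_proof(plot, index)))
    return found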
@api_request
async def request_signature(self, request: harvester_protocol.RequestSignature):
async def request_signatures(self, request: harvester_protocol.RequestSignatures):
"""
The farmer requests signatures on one or more messages, for one of the proofs that we found.
A signature is created on each message using the harvester private key. This can also
be used for pooling.
"""
plot_info = None
try:
plot_info = self.harvester.provers[Path(request.plot_id).resolve()]
plot_info = self.harvester.provers[Path(request.plot_identifier).resolve()]
except KeyError:
self.harvester.log.warning(
f"KeyError plot {request.plot_id} does not exist."
)
self.harvester.log.warning(f"KeyError plot {request.plot_identifier} does not exist.")
return
local_sk = plot_info.local_sk
agg_pk = ProofOfSpace.generate_plot_public_key(
local_sk.get_g1(), plot_info.farmer_public_key
)
agg_pk = ProofOfSpace.generate_plot_public_key(local_sk.get_g1(), plot_info.farmer_public_key)
# This is only a partial signature. When combined with the farmer's half, it will
# form a complete PrependSignature.
signature: G2Element = AugSchemeMPL.sign(local_sk, request.message, agg_pk)
message_signatures: List[Tuple[bytes32, G2Element]] = []
for message in request.messages:
signature: G2Element = AugSchemeMPL.sign(local_sk, message, agg_pk)
message_signatures.append((message, signature))
response: harvester_protocol.RespondSignature = (
harvester_protocol.RespondSignature(
request.plot_id,
request.message,
local_sk.get_g1(),
plot_info.farmer_public_key,
signature,
)
response: harvester_protocol.RespondSignatures = harvester_protocol.RespondSignatures(
request.plot_identifier,
request.sp_hash,
local_sk.get_g1(),
plot_info.farmer_public_key,
message_signatures,
)
msg = Message("respond_signature", response)
msg = Message("respond_signatures", response)
return msg

View File

@@ -1,8 +1,10 @@
from typing import Callable
from src.protocols import timelord_protocol
from src.timelord import Timelord
from src.timelord_new import Timelord
from src.timelord_new import IterationType, iters_from_sub_block
from src.util.api_decorators import api_request
from src.util.ints import uint64
class TimelordAPI:
@@ -14,103 +16,34 @@ class TimelordAPI:
def _set_state_changed_callback(self, callback: Callable):
pass
@api_request
async def challenge_start(self, challenge_start: timelord_protocol.ChallengeStart):
"""
The full node notifies the timelord node that a new challenge is active, and work
should be started on it. We add the challenge to the queue if it is worth having.
"""
async with self.timelord.lock:
if not self.timelord.sanitizer_mode:
if challenge_start.challenge_hash in self.timelord.seen_discriminants:
self.timelord.log.info(
f"Have already seen this challenge hash {challenge_start.challenge_hash}. Ignoring."
)
return
if challenge_start.weight <= self.timelord.best_weight_three_proofs:
self.timelord.log.info(
"Not starting challenge, already three proofs at that weight"
)
return
self.timelord.seen_discriminants.append(challenge_start.challenge_hash)
self.timelord.discriminant_queue.append(
(challenge_start.challenge_hash, challenge_start.weight)
)
self.timelord.log.info("Appended to discriminant queue.")
else:
disc_dict = dict(self.timelord.discriminant_queue)
if challenge_start.challenge_hash in disc_dict:
self.timelord.log.info(
"Challenge already in discriminant queue. Ignoring."
)
return
if challenge_start.challenge_hash in self.timelord.active_discriminants:
self.timelord.log.info("Challenge currently running. Ignoring.")
return
self.timelord.discriminant_queue.append(
(challenge_start.challenge_hash, challenge_start.weight)
)
if challenge_start.weight not in self.timelord.max_known_weights:
self.timelord.max_known_weights.append(challenge_start.weight)
self.timelord.max_known_weights.sort()
if len(self.timelord.max_known_weights) > 5:
self.timelord.max_known_weights = (
self.timelord.max_known_weights[-5:]
)
@property
def lock(self):
return self.timelord.lock
@api_request
async def proof_of_space_info(
self,
proof_of_space_info: timelord_protocol.ProofOfSpaceInfo,
):
"""
Notification from full node about a new proof of space for a challenge. If we already
have a process for this challenge, we communicate with that process to tell it how
many iterations to run for.
"""
async with self.timelord.lock:
if not self.timelord.sanitizer_mode:
self.timelord.log.info(
f"proof_of_space_info {proof_of_space_info.challenge_hash} {proof_of_space_info.iterations_needed}"
)
if (
proof_of_space_info.challenge_hash
in self.timelord.done_discriminants
):
self.timelord.log.info(
f"proof_of_space_info {proof_of_space_info.challenge_hash} already done, returning"
)
return
else:
disc_dict = dict(self.timelord.discriminant_queue)
if proof_of_space_info.challenge_hash in disc_dict:
challenge_weight = disc_dict[proof_of_space_info.challenge_hash]
if challenge_weight >= min(self.timelord.max_known_weights):
self.timelord.log.info(
"Not storing iter, waiting for more block confirmations."
)
return
else:
self.timelord.log.info("Not storing iter, challenge inactive.")
return
if proof_of_space_info.challenge_hash not in self.timelord.pending_iters:
self.timelord.pending_iters[proof_of_space_info.challenge_hash] = []
if proof_of_space_info.challenge_hash not in self.timelord.submitted_iters:
self.timelord.submitted_iters[proof_of_space_info.challenge_hash] = []
async def new_peak(self, new_peak: timelord_protocol.NewPeak):
async with self.lock:
if (
proof_of_space_info.iterations_needed
not in self.timelord.pending_iters[proof_of_space_info.challenge_hash]
and proof_of_space_info.iterations_needed
not in self.timelord.submitted_iters[proof_of_space_info.challenge_hash]
self.timelord.last_state is None
or self.timelord.last_state.get_weight() < new_peak.weight
):
self.timelord.log.info(
f"proof_of_space_info {proof_of_space_info.challenge_hash} adding "
f"{proof_of_space_info.iterations_needed} to "
f"{self.timelord.pending_iters[proof_of_space_info.challenge_hash]}"
)
self.timelord.pending_iters[proof_of_space_info.challenge_hash].append(
proof_of_space_info.iterations_needed
)
self.new_peak = new_peak
@api_request
async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock):
async with self.lock:
if not self.timelord._accept_unfinished_block(new_unfinished_subblock):
return
sp_iters, ip_iters = iters_from_sub_block(
new_unfinished_subblock.reward_chain_sub_block,
self.timelord.last_state.get_ips(),
self.timelord.last_state.get_difficulty(),
)
last_ip_iters = self.timelord.last_state.get_last_ip()
if sp_iters < ip_iters:
self.timelord.overflow_blocks.append(new_unfinished_subblock)
elif ip_iters > last_ip_iters:
self.timelord.unfinished_blocks.append(new_unfinished_subblock)
for chain in Chain:
self.timelord.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters))
self.timelord.iteration_to_proof_type[ip_iters - self.timelord.last_ip_iters] = IterationType.INFUSION_POINT
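The branching in new_unfinished_subblock reduces to a three-way classification on iteration counts. A hedged sketch mirroring the branch structure above (plain ints stand in for uint64):

def classify_unfinished_block(sp_iters: int, ip_iters: int, last_ip_iters: int) -> str:
    # Mirrors new_unfinished_subblock: one branch buffers the block on the
    # overflow list, the other tracks it only if its infusion point lies
    # beyond the last infusion point seen.
    if sp_iters < ip_iters:
        return "overflow"
    if ip_iters > last_ip_iters:
        return "unfinished"
    return "ignore"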

View File

@@ -225,34 +225,6 @@ class Timelord:
def _set_server(self, server: ChiaServer):
self.server = server
@api_request
async def new_peak(self, new_peak: timelord_protocol.NewPeak):
async with self.lock:
if (
self.last_state is None
or self.last_state.get_weight() < new_peak.weight
):
self.new_peak = new_peak
@api_request
async def new_unfinished_subblock(self, new_unfinished_subblock: timelord_protocol.NewUnfinishedSubBlock):
async with self.lock:
if not self._accept_unfinished_block(new_unfinished_subblock):
return
sp_iters, ip_iters = iters_from_sub_block(
new_unfinished_subblock.reward_chain_sub_block,
self.last_state.get_ips(),
self.last_state.get_difficulty(),
)
last_ip_iters = self.last_state.get_last_ip()
if sp_iters < ip_iters:
self.overflow_blocks.append(new_unfinished_subblock)
elif ip_iters > last_ip_iters:
self.unfinished_blocks.append(new_unfinished_subblock)
for chain in Chain:
self.iters_to_submit[chain].append(uint64(ip_iters - last_ip_iters))
self.iteration_to_proof_type[ip_iters - self.last_ip_iters] = IterationType.INFUSION_POINT
def _accept_unfinished_block(self, block: timelord_protocol.NewUnfinishedSubBlock) -> bool:
# The unfinished block's total iters need to exceed the peak's iters.
if self.last_state.get_total_iters() >= block.total_iters: