chia-blockchain/chia/farmer/farmer.py
2023-12-11 08:27:25 -06:00

838 lines
40 KiB
Python

from __future__ import annotations
import asyncio
import contextlib
import json
import logging
import time
import traceback
from math import floor
from pathlib import Path
from typing import TYPE_CHECKING, Any, AsyncIterator, ClassVar, Dict, List, Optional, Set, Tuple, Union, cast
import aiohttp
from chia_rs import AugSchemeMPL, G1Element, G2Element, PrivateKey
from chia.consensus.constants import ConsensusConstants
from chia.daemon.keychain_proxy import KeychainProxy, connect_to_keychain_and_validate, wrap_local_keychain
from chia.plot_sync.delta import Delta
from chia.plot_sync.receiver import Receiver
from chia.pools.pool_config import PoolWalletConfig, add_auth_key, load_pool_config
from chia.protocols import farmer_protocol, harvester_protocol
from chia.protocols.pool_protocol import (
AuthenticationPayload,
ErrorResponse,
GetFarmerResponse,
PoolErrorCode,
PostFarmerPayload,
PostFarmerRequest,
PutFarmerPayload,
PutFarmerRequest,
get_current_authentication_token,
)
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.rpc.rpc_server import StateChangedProtocol, default_get_connections
from chia.server.outbound_message import NodeType, make_msg
from chia.server.server import ChiaServer, ssl_context_for_root
from chia.server.ws_connection import WSChiaConnection
from chia.ssl.create_ssl import get_mozilla_ca_crt
from chia.types.blockchain_format.proof_of_space import ProofOfSpace
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.bech32m import decode_puzzle_hash
from chia.util.byte_types import hexstr_to_bytes
from chia.util.config import config_path_for_filename, load_config, lock_and_load_config, save_config
from chia.util.errors import KeychainProxyConnectionFailure
from chia.util.hash import std_hash
from chia.util.ints import uint8, uint16, uint32, uint64
from chia.util.keychain import Keychain
from chia.util.logging import TimedDuplicateFilter
from chia.wallet.derive_keys import (
find_authentication_sk,
find_owner_sk,
master_sk_to_farmer_sk,
master_sk_to_pool_sk,
match_address_to_sk,
)
from chia.wallet.puzzles.singleton_top_layer import SINGLETON_MOD
singleton_mod_hash = SINGLETON_MOD.get_tree_hash()
log = logging.getLogger(__name__)
UPDATE_POOL_INFO_INTERVAL: int = 3600
UPDATE_POOL_INFO_FAILURE_RETRY_INTERVAL: int = 120
UPDATE_POOL_FARMER_INFO_INTERVAL: int = 300
def strip_old_entries(pairs: List[Tuple[float, Any]], before: float) -> List[Tuple[float, Any]]:
for index, [timestamp, points] in enumerate(pairs):
if timestamp >= before:
if index == 0:
return pairs
if index > 0:
return pairs[index:]
return []
def increment_pool_stats(
pool_states: Dict[bytes32, Any],
p2_singleton_puzzlehash: bytes32,
name: str,
current_time: float,
count: int = 1,
value: Optional[Union[int, Dict[str, Any]]] = None,
) -> None:
if p2_singleton_puzzlehash not in pool_states:
return
pool_state = pool_states[p2_singleton_puzzlehash]
if f"{name}_since_start" in pool_state:
pool_state[f"{name}_since_start"] += count
if f"{name}_24h" in pool_state:
if value is None:
pool_state[f"{name}_24h"].append((uint32(current_time), pool_state["current_difficulty"]))
else:
pool_state[f"{name}_24h"].append((uint32(current_time), value))
# Age out old 24h information for every signage point regardless
# of any failures. Note that this still lets old data remain if
# the client isn't receiving signage points.
cutoff_24h = current_time - (24 * 60 * 60)
pool_state[f"{name}_24h"] = strip_old_entries(pairs=pool_state[f"{name}_24h"], before=cutoff_24h)
return
"""
HARVESTER PROTOCOL (FARMER <-> HARVESTER)
"""
class Farmer:
if TYPE_CHECKING:
from chia.rpc.rpc_server import RpcServiceProtocol
_protocol_check: ClassVar[RpcServiceProtocol] = cast("Farmer", None)
def __init__(
self,
root_path: Path,
farmer_config: Dict[str, Any],
pool_config: Dict[str, Any],
consensus_constants: ConsensusConstants,
local_keychain: Optional[Keychain] = None,
):
self.keychain_proxy: Optional[KeychainProxy] = None
self.local_keychain = local_keychain
self._root_path = root_path
self.config = farmer_config
self.pool_config = pool_config
# Keep track of all sps, keyed on challenge chain signage point hash
self.sps: Dict[bytes32, List[farmer_protocol.NewSignagePoint]] = {}
# Keep track of harvester plot identifier (str), target sp index, and PoSpace for each challenge
self.proofs_of_space: Dict[bytes32, List[Tuple[str, ProofOfSpace]]] = {}
# Quality string to plot identifier and challenge_hash, for use with harvester.RequestSignatures
self.quality_str_to_identifiers: Dict[bytes32, Tuple[str, bytes32, bytes32, bytes32]] = {}
# number of responses to each signage point
self.number_of_responses: Dict[bytes32, int] = {}
# A dictionary of keys to time added. These keys refer to keys in the above 4 dictionaries. This is used
# to periodically clear the memory
self.cache_add_time: Dict[bytes32, uint64] = {}
self.plot_sync_receivers: Dict[bytes32, Receiver] = {}
self.cache_clear_task: Optional[asyncio.Task[None]] = None
self.update_pool_state_task: Optional[asyncio.Task[None]] = None
self.constants = consensus_constants
self._shut_down = False
self.server: Any = None
self.state_changed_callback: Optional[StateChangedProtocol] = None
self.log = log
self.log.addFilter(TimedDuplicateFilter("No pool specific authentication_token_timeout.*", 60 * 10))
self.log.addFilter(TimedDuplicateFilter("No pool specific difficulty has been set.*", 60 * 10))
self.started = False
self.harvester_handshake_task: Optional[asyncio.Task[None]] = None
# From p2_singleton_puzzle_hash to pool state dict
self.pool_state: Dict[bytes32, Dict[str, Any]] = {}
# From p2_singleton to auth PrivateKey
self.authentication_keys: Dict[bytes32, PrivateKey] = {}
# Last time we updated pool_state based on the config file
self.last_config_access_time: float = 0
self.all_root_sks: List[PrivateKey] = []
# Use to find missing signage points. (new_signage_point, time)
self.prev_signage_point: Optional[Tuple[uint64, farmer_protocol.NewSignagePoint]] = None
@contextlib.asynccontextmanager
async def manage(self) -> AsyncIterator[None]:
async def start_task() -> None:
# `Farmer.setup_keys` returns `False` if there are no keys setup yet. In this case we just try until it
# succeeds or until we need to shut down.
while not self._shut_down:
if await self.setup_keys():
self.update_pool_state_task = asyncio.create_task(self._periodically_update_pool_state_task())
self.cache_clear_task = asyncio.create_task(self._periodically_clear_cache_and_refresh_task())
log.debug("start_task: initialized")
self.started = True
return
await asyncio.sleep(1)
asyncio.create_task(start_task())
try:
yield
finally:
self._shut_down = True
if self.cache_clear_task is not None:
await self.cache_clear_task
if self.update_pool_state_task is not None:
await self.update_pool_state_task
if self.keychain_proxy is not None:
proxy = self.keychain_proxy
self.keychain_proxy = None
await proxy.close()
await asyncio.sleep(0.5) # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
self.started = False
def get_connections(self, request_node_type: Optional[NodeType]) -> List[Dict[str, Any]]:
return default_get_connections(server=self.server, request_node_type=request_node_type)
async def ensure_keychain_proxy(self) -> KeychainProxy:
if self.keychain_proxy is None:
if self.local_keychain:
self.keychain_proxy = wrap_local_keychain(self.local_keychain, log=self.log)
else:
self.keychain_proxy = await connect_to_keychain_and_validate(self._root_path, self.log)
if not self.keychain_proxy:
raise KeychainProxyConnectionFailure()
return self.keychain_proxy
async def get_all_private_keys(self) -> List[Tuple[PrivateKey, bytes]]:
keychain_proxy = await self.ensure_keychain_proxy()
return await keychain_proxy.get_all_private_keys()
async def setup_keys(self) -> bool:
no_keys_error_str = "No keys exist. Please run 'chia keys generate' or open the UI."
try:
self.all_root_sks = [sk for sk, _ in await self.get_all_private_keys()]
except KeychainProxyConnectionFailure:
return False
self._private_keys = [master_sk_to_farmer_sk(sk) for sk in self.all_root_sks] + [
master_sk_to_pool_sk(sk) for sk in self.all_root_sks
]
if len(self.get_public_keys()) == 0:
log.warning(no_keys_error_str)
return False
config = load_config(self._root_path, "config.yaml")
if "xch_target_address" not in self.config:
self.config = config["farmer"]
if "xch_target_address" not in self.pool_config:
self.pool_config = config["pool"]
if "xch_target_address" not in self.config or "xch_target_address" not in self.pool_config:
log.debug("xch_target_address missing in the config")
return False
# This is the farmer configuration
self.farmer_target_encoded = self.config["xch_target_address"]
self.farmer_target = decode_puzzle_hash(self.farmer_target_encoded)
self.pool_public_keys = [G1Element.from_bytes(bytes.fromhex(pk)) for pk in self.config["pool_public_keys"]]
# This is the self pooling configuration, which is only used for original self-pooled plots
self.pool_target_encoded = self.pool_config["xch_target_address"]
self.pool_target = decode_puzzle_hash(self.pool_target_encoded)
self.pool_sks_map = {bytes(key.get_g1()): key for key in self.get_private_keys()}
assert len(self.farmer_target) == 32
assert len(self.pool_target) == 32
if len(self.pool_sks_map) == 0:
log.warning(no_keys_error_str)
return False
return True
def _set_state_changed_callback(self, callback: StateChangedProtocol) -> None:
self.state_changed_callback = callback
async def on_connect(self, peer: WSChiaConnection) -> None:
self.state_changed("add_connection", {})
async def handshake_task() -> None:
# Wait until the task in `Farmer._start` is done so that we have keys available for the handshake. Bail out
# early if we need to shut down or if the harvester is not longer connected.
while not self.started and not self._shut_down and peer in self.server.get_connections():
await asyncio.sleep(1)
if self._shut_down:
log.debug("handshake_task: shutdown")
self.harvester_handshake_task = None
return
if peer not in self.server.get_connections():
log.debug("handshake_task: disconnected")
self.harvester_handshake_task = None
return
# Sends a handshake to the harvester
handshake = harvester_protocol.HarvesterHandshake(
self.get_public_keys(),
self.pool_public_keys,
)
msg = make_msg(ProtocolMessageTypes.harvester_handshake, handshake)
await peer.send_message(msg)
self.harvester_handshake_task = None
if peer.connection_type is NodeType.HARVESTER:
self.plot_sync_receivers[peer.peer_node_id] = Receiver(peer, self.plot_sync_callback)
self.harvester_handshake_task = asyncio.create_task(handshake_task())
def set_server(self, server: ChiaServer) -> None:
self.server = server
def state_changed(self, change: str, data: Dict[str, Any]) -> None:
if self.state_changed_callback is not None:
self.state_changed_callback(change, data)
def handle_failed_pool_response(self, p2_singleton_puzzle_hash: bytes32, error_message: str) -> None:
self.log.error(error_message)
increment_pool_stats(
self.pool_state,
p2_singleton_puzzle_hash,
"pool_errors",
time.time(),
value=ErrorResponse(uint16(PoolErrorCode.REQUEST_FAILED.value), error_message).to_json_dict(),
)
async def on_disconnect(self, connection: WSChiaConnection) -> None:
self.log.info(f"peer disconnected {connection.get_peer_logging()}")
self.state_changed("close_connection", {})
if connection.connection_type is NodeType.HARVESTER:
del self.plot_sync_receivers[connection.peer_node_id]
self.state_changed("harvester_removed", {"node_id": connection.peer_node_id})
async def plot_sync_callback(self, peer_id: bytes32, delta: Optional[Delta]) -> None:
log.debug(f"plot_sync_callback: peer_id {peer_id}, delta {delta}")
receiver: Receiver = self.plot_sync_receivers[peer_id]
harvester_updated: bool = delta is not None and not delta.empty()
if receiver.initial_sync() or harvester_updated:
self.state_changed("harvester_update", receiver.to_dict(True))
async def _pool_get_pool_info(self, pool_config: PoolWalletConfig) -> Optional[Dict[str, Any]]:
try:
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.get(
f"{pool_config.pool_url}/pool_info", ssl=ssl_context_for_root(get_mozilla_ca_crt(), log=self.log)
) as resp:
if resp.ok:
response: Dict[str, Any] = json.loads(await resp.text())
self.log.info(f"GET /pool_info response: {response}")
return response
else:
self.handle_failed_pool_response(
pool_config.p2_singleton_puzzle_hash,
f"Error in GET /pool_info {pool_config.pool_url}, {resp.status}",
)
except Exception as e:
self.handle_failed_pool_response(
pool_config.p2_singleton_puzzle_hash, f"Exception in GET /pool_info {pool_config.pool_url}, {e}"
)
return None
async def _pool_get_farmer(
self, pool_config: PoolWalletConfig, authentication_token_timeout: uint8, authentication_sk: PrivateKey
) -> Optional[Dict[str, Any]]:
authentication_token = get_current_authentication_token(authentication_token_timeout)
message: bytes32 = std_hash(
AuthenticationPayload(
"get_farmer", pool_config.launcher_id, pool_config.target_puzzle_hash, authentication_token
)
)
signature: G2Element = AugSchemeMPL.sign(authentication_sk, message)
get_farmer_params = {
"launcher_id": pool_config.launcher_id.hex(),
"authentication_token": authentication_token,
"signature": bytes(signature).hex(),
}
try:
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.get(
f"{pool_config.pool_url}/farmer",
params=get_farmer_params,
ssl=ssl_context_for_root(get_mozilla_ca_crt(), log=self.log),
) as resp:
if resp.ok:
response: Dict[str, Any] = json.loads(await resp.text())
log_level = logging.INFO
if "error_code" in response:
log_level = logging.WARNING
increment_pool_stats(
self.pool_state,
pool_config.p2_singleton_puzzle_hash,
"pool_errors",
time.time(),
value=response,
)
self.log.log(log_level, f"GET /farmer response: {response}")
return response
else:
self.handle_failed_pool_response(
pool_config.p2_singleton_puzzle_hash,
f"Error in GET /farmer {pool_config.pool_url}, {resp.status}",
)
except Exception as e:
self.handle_failed_pool_response(
pool_config.p2_singleton_puzzle_hash, f"Exception in GET /farmer {pool_config.pool_url}, {e}"
)
return None
async def _pool_post_farmer(
self, pool_config: PoolWalletConfig, authentication_token_timeout: uint8, owner_sk: PrivateKey
) -> Optional[Dict[str, Any]]:
auth_sk: Optional[PrivateKey] = self.get_authentication_sk(pool_config)
assert auth_sk is not None
post_farmer_payload: PostFarmerPayload = PostFarmerPayload(
pool_config.launcher_id,
get_current_authentication_token(authentication_token_timeout),
auth_sk.get_g1(),
pool_config.payout_instructions,
None,
)
assert owner_sk.get_g1() == pool_config.owner_public_key
signature: G2Element = AugSchemeMPL.sign(owner_sk, post_farmer_payload.get_hash())
post_farmer_request = PostFarmerRequest(post_farmer_payload, signature)
self.log.debug(f"POST /farmer request {post_farmer_request}")
try:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{pool_config.pool_url}/farmer",
json=post_farmer_request.to_json_dict(),
ssl=ssl_context_for_root(get_mozilla_ca_crt(), log=self.log),
) as resp:
if resp.ok:
response: Dict[str, Any] = json.loads(await resp.text())
log_level = logging.INFO
if "error_code" in response:
log_level = logging.WARNING
increment_pool_stats(
self.pool_state,
pool_config.p2_singleton_puzzle_hash,
"pool_errors",
time.time(),
value=response,
)
self.log.log(log_level, f"POST /farmer response: {response}")
return response
else:
self.handle_failed_pool_response(
pool_config.p2_singleton_puzzle_hash,
f"Error in POST /farmer {pool_config.pool_url}, {resp.status}",
)
except Exception as e:
self.handle_failed_pool_response(
pool_config.p2_singleton_puzzle_hash, f"Exception in POST /farmer {pool_config.pool_url}, {e}"
)
return None
async def _pool_put_farmer(
self, pool_config: PoolWalletConfig, authentication_token_timeout: uint8, owner_sk: PrivateKey
) -> None:
auth_sk: Optional[PrivateKey] = self.get_authentication_sk(pool_config)
assert auth_sk is not None
put_farmer_payload: PutFarmerPayload = PutFarmerPayload(
pool_config.launcher_id,
get_current_authentication_token(authentication_token_timeout),
auth_sk.get_g1(),
pool_config.payout_instructions,
None,
)
assert owner_sk.get_g1() == pool_config.owner_public_key
signature: G2Element = AugSchemeMPL.sign(owner_sk, put_farmer_payload.get_hash())
put_farmer_request = PutFarmerRequest(put_farmer_payload, signature)
self.log.debug(f"PUT /farmer request {put_farmer_request}")
try:
async with aiohttp.ClientSession() as session:
async with session.put(
f"{pool_config.pool_url}/farmer",
json=put_farmer_request.to_json_dict(),
ssl=ssl_context_for_root(get_mozilla_ca_crt(), log=self.log),
) as resp:
if resp.ok:
response: Dict[str, Any] = json.loads(await resp.text())
log_level = logging.INFO
if "error_code" in response:
log_level = logging.WARNING
increment_pool_stats(
self.pool_state,
pool_config.p2_singleton_puzzle_hash,
"pool_errors",
time.time(),
value=response,
)
self.log.log(log_level, f"PUT /farmer response: {response}")
else:
self.handle_failed_pool_response(
pool_config.p2_singleton_puzzle_hash,
f"Error in PUT /farmer {pool_config.pool_url}, {resp.status}",
)
except Exception as e:
self.handle_failed_pool_response(
pool_config.p2_singleton_puzzle_hash, f"Exception in PUT /farmer {pool_config.pool_url}, {e}"
)
def get_authentication_sk(self, pool_config: PoolWalletConfig) -> Optional[PrivateKey]:
if pool_config.p2_singleton_puzzle_hash in self.authentication_keys:
return self.authentication_keys[pool_config.p2_singleton_puzzle_hash]
auth_sk: Optional[PrivateKey] = find_authentication_sk(self.all_root_sks, pool_config.owner_public_key)
if auth_sk is not None:
self.authentication_keys[pool_config.p2_singleton_puzzle_hash] = auth_sk
return auth_sk
async def update_pool_state(self) -> None:
config = load_config(self._root_path, "config.yaml")
pool_config_list: List[PoolWalletConfig] = load_pool_config(self._root_path)
for pool_config in pool_config_list:
p2_singleton_puzzle_hash = pool_config.p2_singleton_puzzle_hash
try:
authentication_sk: Optional[PrivateKey] = self.get_authentication_sk(pool_config)
if authentication_sk is None:
self.log.error(f"Could not find authentication sk for {p2_singleton_puzzle_hash}")
continue
add_auth_key(self._root_path, pool_config, authentication_sk.get_g1())
if p2_singleton_puzzle_hash not in self.pool_state:
self.pool_state[p2_singleton_puzzle_hash] = {
"p2_singleton_puzzle_hash": p2_singleton_puzzle_hash.hex(),
"points_found_since_start": 0,
"points_found_24h": [],
"points_acknowledged_since_start": 0,
"points_acknowledged_24h": [],
"next_farmer_update": 0,
"next_pool_info_update": 0,
"current_points": 0,
"current_difficulty": None,
"pool_errors_24h": [],
"valid_partials_since_start": 0,
"valid_partials_24h": [],
"invalid_partials_since_start": 0,
"invalid_partials_24h": [],
"insufficient_partials_since_start": 0,
"insufficient_partials_24h": [],
"stale_partials_since_start": 0,
"stale_partials_24h": [],
"missing_partials_since_start": 0,
"missing_partials_24h": [],
"authentication_token_timeout": None,
"plot_count": 0,
"pool_config": pool_config,
}
self.log.info(f"Added pool: {pool_config}")
else:
self.pool_state[p2_singleton_puzzle_hash]["pool_config"] = pool_config
pool_state = self.pool_state[p2_singleton_puzzle_hash]
# Skip state update when self pooling
if pool_config.pool_url == "":
continue
enforce_https = config["full_node"]["selected_network"] == "mainnet"
if enforce_https and not pool_config.pool_url.startswith("https://"):
self.log.error(f"Pool URLs must be HTTPS on mainnet {pool_config.pool_url}")
continue
# TODO: Improve error handling below, inform about unexpected failures
if time.time() >= pool_state["next_pool_info_update"]:
pool_state["next_pool_info_update"] = time.time() + UPDATE_POOL_INFO_INTERVAL
# Makes a GET request to the pool to get the updated information
pool_info = await self._pool_get_pool_info(pool_config)
if pool_info is not None and "error_code" not in pool_info:
pool_state["authentication_token_timeout"] = pool_info["authentication_token_timeout"]
# Only update the first time from GET /pool_info, gets updated from GET /farmer later
if pool_state["current_difficulty"] is None:
pool_state["current_difficulty"] = pool_info["minimum_difficulty"]
else:
pool_state["next_pool_info_update"] = time.time() + UPDATE_POOL_INFO_FAILURE_RETRY_INTERVAL
if time.time() >= pool_state["next_farmer_update"]:
pool_state["next_farmer_update"] = time.time() + UPDATE_POOL_FARMER_INFO_INTERVAL
authentication_token_timeout = pool_state["authentication_token_timeout"]
async def update_pool_farmer_info() -> Tuple[Optional[GetFarmerResponse], Optional[PoolErrorCode]]:
# Run a GET /farmer to see if the farmer is already known by the pool
response = await self._pool_get_farmer(
pool_config, authentication_token_timeout, authentication_sk
)
farmer_response: Optional[GetFarmerResponse] = None
error_code_response: Optional[PoolErrorCode] = None
if response is not None:
if "error_code" not in response:
farmer_response = GetFarmerResponse.from_json_dict(response)
if farmer_response is not None:
pool_state["current_difficulty"] = farmer_response.current_difficulty
pool_state["current_points"] = farmer_response.current_points
else:
try:
error_code_response = PoolErrorCode(response["error_code"])
except ValueError:
self.log.error(
f"Invalid error code received from the pool: {response['error_code']}"
)
return farmer_response, error_code_response
if authentication_token_timeout is not None:
farmer_info, error_code = await update_pool_farmer_info()
if error_code == PoolErrorCode.FARMER_NOT_KNOWN:
# Make the farmer known on the pool with a POST /farmer
owner_sk_and_index = find_owner_sk(self.all_root_sks, pool_config.owner_public_key)
assert owner_sk_and_index is not None
post_response = await self._pool_post_farmer(
pool_config, authentication_token_timeout, owner_sk_and_index[0]
)
if post_response is not None and "error_code" not in post_response:
self.log.info(
f"Welcome message from {pool_config.pool_url}: "
f"{post_response['welcome_message']}"
)
# Now we should be able to update the local farmer info
farmer_info, farmer_is_known = await update_pool_farmer_info()
if farmer_info is None and not farmer_is_known:
self.log.error("Failed to update farmer info after POST /farmer.")
# Update the farmer information on the pool if the payout instructions changed or if the
# signature is invalid (latter to make sure the pool has the correct authentication public key).
payout_instructions_update_required: bool = (
farmer_info is not None
and pool_config.payout_instructions.lower() != farmer_info.payout_instructions.lower()
)
if payout_instructions_update_required or error_code == PoolErrorCode.INVALID_SIGNATURE:
owner_sk_and_index = find_owner_sk(self.all_root_sks, pool_config.owner_public_key)
assert owner_sk_and_index is not None
await self._pool_put_farmer(
pool_config, authentication_token_timeout, owner_sk_and_index[0]
)
else:
self.log.warning(
f"No pool specific authentication_token_timeout has been set for {p2_singleton_puzzle_hash}"
f", check communication with the pool."
)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Exception in update_pool_state for {pool_config.pool_url}, {e} {tb}")
def get_public_keys(self) -> List[G1Element]:
return [child_sk.get_g1() for child_sk in self._private_keys]
def get_private_keys(self) -> List[PrivateKey]:
return self._private_keys
async def get_reward_targets(self, search_for_private_key: bool, max_ph_to_search: int = 500) -> Dict[str, Any]:
if search_for_private_key:
all_sks = await self.get_all_private_keys()
have_farmer_sk, have_pool_sk = False, False
search_addresses: List[bytes32] = [self.farmer_target, self.pool_target]
for sk, _ in all_sks:
found_addresses: Set[bytes32] = match_address_to_sk(sk, search_addresses, max_ph_to_search)
if not have_farmer_sk and self.farmer_target in found_addresses:
search_addresses.remove(self.farmer_target)
have_farmer_sk = True
if not have_pool_sk and self.pool_target in found_addresses:
search_addresses.remove(self.pool_target)
have_pool_sk = True
if have_farmer_sk and have_pool_sk:
break
return {
"farmer_target": self.farmer_target_encoded,
"pool_target": self.pool_target_encoded,
"have_farmer_sk": have_farmer_sk,
"have_pool_sk": have_pool_sk,
}
return {
"farmer_target": self.farmer_target_encoded,
"pool_target": self.pool_target_encoded,
}
def set_reward_targets(self, farmer_target_encoded: Optional[str], pool_target_encoded: Optional[str]) -> None:
with lock_and_load_config(self._root_path, "config.yaml") as config:
if farmer_target_encoded is not None:
self.farmer_target_encoded = farmer_target_encoded
self.farmer_target = decode_puzzle_hash(farmer_target_encoded)
config["farmer"]["xch_target_address"] = farmer_target_encoded
if pool_target_encoded is not None:
self.pool_target_encoded = pool_target_encoded
self.pool_target = decode_puzzle_hash(pool_target_encoded)
config["pool"]["xch_target_address"] = pool_target_encoded
save_config(self._root_path, "config.yaml", config)
async def set_payout_instructions(self, launcher_id: bytes32, payout_instructions: str) -> None:
for p2_singleton_puzzle_hash, pool_state_dict in self.pool_state.items():
if launcher_id == pool_state_dict["pool_config"].launcher_id:
with lock_and_load_config(self._root_path, "config.yaml") as config:
new_list = []
pool_list = config["pool"].get("pool_list", [])
if pool_list is not None:
for list_element in pool_list:
if hexstr_to_bytes(list_element["launcher_id"]) == bytes(launcher_id):
list_element["payout_instructions"] = payout_instructions
new_list.append(list_element)
config["pool"]["pool_list"] = new_list
save_config(self._root_path, "config.yaml", config)
# Force a GET /farmer which triggers the PUT /farmer if it detects the changed instructions
pool_state_dict["next_farmer_update"] = 0
return
self.log.warning(f"Launcher id: {launcher_id} not found")
async def generate_login_link(self, launcher_id: bytes32) -> Optional[str]:
for pool_state in self.pool_state.values():
pool_config: PoolWalletConfig = pool_state["pool_config"]
if pool_config.launcher_id == launcher_id:
authentication_sk: Optional[PrivateKey] = self.get_authentication_sk(pool_config)
if authentication_sk is None:
self.log.error(f"Could not find authentication sk for {pool_config.p2_singleton_puzzle_hash}")
continue
authentication_token_timeout = pool_state["authentication_token_timeout"]
if authentication_token_timeout is None:
self.log.error(
f"No pool specific authentication_token_timeout has been set for"
f"{pool_config.p2_singleton_puzzle_hash}, check communication with the pool."
)
return None
authentication_token = get_current_authentication_token(authentication_token_timeout)
message: bytes32 = std_hash(
AuthenticationPayload(
"get_login", pool_config.launcher_id, pool_config.target_puzzle_hash, authentication_token
)
)
signature: G2Element = AugSchemeMPL.sign(authentication_sk, message)
return (
pool_config.pool_url
+ f"/login?launcher_id={launcher_id.hex()}&authentication_token={authentication_token}"
f"&signature={bytes(signature).hex()}"
)
return None
async def get_harvesters(self, counts_only: bool = False) -> Dict[str, Any]:
harvesters: List[Dict[str, Any]] = []
for connection in self.server.get_connections(NodeType.HARVESTER):
self.log.debug(f"get_harvesters host: {connection.peer_info.host}, node_id: {connection.peer_node_id}")
receiver = self.plot_sync_receivers.get(connection.peer_node_id)
if receiver is not None:
harvesters.append(receiver.to_dict(counts_only))
else:
self.log.debug(
f"get_harvesters invalid peer: {connection.peer_info.host}, node_id: {connection.peer_node_id}"
)
return {"harvesters": harvesters}
def get_receiver(self, node_id: bytes32) -> Receiver:
receiver: Optional[Receiver] = self.plot_sync_receivers.get(node_id)
if receiver is None:
raise KeyError(f"Receiver missing for {node_id}")
return receiver
def check_missing_signage_points(
self, timestamp: uint64, new_signage_point: farmer_protocol.NewSignagePoint
) -> Optional[Tuple[uint64, uint32]]:
if self.prev_signage_point is None:
self.prev_signage_point = (timestamp, new_signage_point)
return None
prev_time, prev_sp = self.prev_signage_point
self.prev_signage_point = (timestamp, new_signage_point)
if prev_sp.challenge_hash == new_signage_point.challenge_hash:
missing_sps = new_signage_point.signage_point_index - prev_sp.signage_point_index - 1
if missing_sps > 0:
return timestamp, uint32(missing_sps)
return None
actual_sp_interval_seconds = float(timestamp - prev_time)
if actual_sp_interval_seconds <= 0:
return None
expected_sp_interval_seconds = self.constants.SUB_SLOT_TIME_TARGET / self.constants.NUM_SPS_SUB_SLOT
allowance = 1.6 # Should be chosen from the range (1 <= allowance < 2)
if actual_sp_interval_seconds < expected_sp_interval_seconds * allowance:
return None
skipped_sps = uint32(floor(actual_sp_interval_seconds / expected_sp_interval_seconds))
return timestamp, skipped_sps
async def _periodically_update_pool_state_task(self) -> None:
time_slept = 0
config_path: Path = config_path_for_filename(self._root_path, "config.yaml")
while not self._shut_down:
# Every time the config file changes, read it to check the pool state
stat_info = config_path.stat()
if stat_info.st_mtime > self.last_config_access_time:
# If we detect the config file changed, refresh private keys first just in case
self.all_root_sks = [sk for sk, _ in await self.get_all_private_keys()]
self.last_config_access_time = stat_info.st_mtime
await self.update_pool_state()
time_slept = 0
elif time_slept > 60:
await self.update_pool_state()
time_slept = 0
time_slept += 1
await asyncio.sleep(1)
async def _periodically_clear_cache_and_refresh_task(self) -> None:
time_slept = 0
refresh_slept = 0
while not self._shut_down:
try:
if time_slept > self.constants.SUB_SLOT_TIME_TARGET:
now = time.time()
removed_keys: List[bytes32] = []
for key, add_time in self.cache_add_time.items():
if now - float(add_time) > self.constants.SUB_SLOT_TIME_TARGET * 3:
self.sps.pop(key, None)
self.proofs_of_space.pop(key, None)
self.quality_str_to_identifiers.pop(key, None)
self.number_of_responses.pop(key, None)
removed_keys.append(key)
for key in removed_keys:
self.cache_add_time.pop(key, None)
time_slept = 0
log.debug(
f"Cleared farmer cache. Num sps: {len(self.sps)} {len(self.proofs_of_space)} "
f"{len(self.quality_str_to_identifiers)} {len(self.number_of_responses)}"
)
time_slept += 1
refresh_slept += 1
# Periodically refresh GUI to show the correct download/upload rate.
if refresh_slept >= 30:
self.state_changed("add_connection", {})
refresh_slept = 0
except Exception:
log.error(f"_periodically_clear_cache_and_refresh_task failed: {traceback.format_exc()}")
await asyncio.sleep(1)