Add support for multi node farmers (#15444)

* Extract `respond_signatures` logic

* Add `call_api_of_specific` method to `ChiaServer`

* Fix typehint for test fixture `farmer_one_harvester`

* Make test helper `add_dummy_connection` work with any node type

* Add `reply_types` to `request_signatures` of harvester

* Fix duplicate SP processing with multiple nodes

* Handle state in `request_signed_values`

* Add set typehint

* Add comment re `_process_respond_signatures()` usage

* Fix imports

* Fix wording to not confuse with SignedValues message
This commit is contained in:
Felix Brucker 2023-07-28 21:48:32 +02:00 committed by GitHub
parent de2ab88883
commit c5ca0978f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 347 additions and 165 deletions

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import json
import logging
import time
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
import aiohttp
from blspy import AugSchemeMPL, G2Element, PrivateKey
@ -13,6 +13,7 @@ from chia.consensus.pot_iterations import calculate_iterations_quality, calculat
from chia.farmer.farmer import Farmer, increment_pool_stats, strip_old_entries
from chia.harvester.harvester_api import HarvesterAPI
from chia.protocols import farmer_protocol, harvester_protocol
from chia.protocols.farmer_protocol import DeclareProofOfSpace, SignedValues
from chia.protocols.harvester_protocol import (
PlotSyncDone,
PlotSyncPathList,
@ -27,7 +28,7 @@ from chia.protocols.pool_protocol import (
get_current_authentication_token,
)
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.server.outbound_message import NodeType, make_msg
from chia.server.outbound_message import Message, NodeType, make_msg
from chia.server.server import ssl_context_for_root
from chia.server.ws_connection import WSChiaConnection
from chia.ssl.create_ssl import get_mozilla_ca_crt
@ -412,8 +413,176 @@ class FarmerAPI:
@api_request()
async def respond_signatures(self, response: harvester_protocol.RespondSignatures) -> None:
request = self._process_respond_signatures(response)
if request is None:
return None
message: Message | None = None
if isinstance(request, DeclareProofOfSpace):
self.farmer.state_changed("proof", {"proof": request, "passed_filter": True})
message = make_msg(ProtocolMessageTypes.declare_proof_of_space, request)
if isinstance(request, SignedValues):
message = make_msg(ProtocolMessageTypes.signed_values, request)
await self.farmer.server.send_to_all([message], NodeType.FULL_NODE)
"""
FARMER PROTOCOL (FARMER <-> FULL NODE)
"""
@api_request()
async def new_signage_point(self, new_signage_point: farmer_protocol.NewSignagePoint) -> None:
if new_signage_point.challenge_chain_sp not in self.farmer.sps:
self.farmer.sps[new_signage_point.challenge_chain_sp] = []
if new_signage_point in self.farmer.sps[new_signage_point.challenge_chain_sp]:
self.farmer.log.debug(f"Duplicate signage point {new_signage_point.signage_point_index}")
return
# Mark this SP as known, so we do not process it multiple times
self.farmer.sps[new_signage_point.challenge_chain_sp].append(new_signage_point)
try:
pool_difficulties: List[PoolDifficulty] = []
for p2_singleton_puzzle_hash, pool_dict in self.farmer.pool_state.items():
if pool_dict["pool_config"].pool_url == "":
# Self pooling
continue
if pool_dict["current_difficulty"] is None:
self.farmer.log.warning(
f"No pool specific difficulty has been set for {p2_singleton_puzzle_hash}, "
f"check communication with the pool, skipping this signage point, pool: "
f"{pool_dict['pool_config'].pool_url} "
)
continue
pool_difficulties.append(
PoolDifficulty(
pool_dict["current_difficulty"],
self.farmer.constants.POOL_SUB_SLOT_ITERS,
p2_singleton_puzzle_hash,
)
)
message = harvester_protocol.NewSignagePointHarvester(
new_signage_point.challenge_hash,
new_signage_point.difficulty,
new_signage_point.sub_slot_iters,
new_signage_point.signage_point_index,
new_signage_point.challenge_chain_sp,
pool_difficulties,
uint8(calculate_prefix_bits(self.farmer.constants, new_signage_point.peak_height)),
)
msg = make_msg(ProtocolMessageTypes.new_signage_point_harvester, message)
await self.farmer.server.send_to_all([msg], NodeType.HARVESTER)
except Exception as exception:
# Remove here, as we want to reprocess the SP should it be sent again
self.farmer.sps[new_signage_point.challenge_chain_sp].remove(new_signage_point)
raise exception
finally:
# Age out old 24h information for every signage point regardless
# of any failures. Note that this still lets old data remain if
# the client isn't receiving signage points.
cutoff_24h = time.time() - (24 * 60 * 60)
for p2_singleton_puzzle_hash, pool_dict in self.farmer.pool_state.items():
for key in ["points_found_24h", "points_acknowledged_24h"]:
if key not in pool_dict:
continue
pool_dict[key] = strip_old_entries(pairs=pool_dict[key], before=cutoff_24h)
now = uint64(int(time.time()))
self.farmer.cache_add_time[new_signage_point.challenge_chain_sp] = now
missing_signage_points = self.farmer.check_missing_signage_points(now, new_signage_point)
self.farmer.state_changed(
"new_signage_point",
{"sp_hash": new_signage_point.challenge_chain_sp, "missing_signage_points": missing_signage_points},
)
@api_request()
async def request_signed_values(self, full_node_request: farmer_protocol.RequestSignedValues) -> Optional[Message]:
if full_node_request.quality_string not in self.farmer.quality_str_to_identifiers:
self.farmer.log.error(f"Do not have quality string {full_node_request.quality_string}")
return None
(plot_identifier, challenge_hash, sp_hash, node_id) = self.farmer.quality_str_to_identifiers[
full_node_request.quality_string
]
request = harvester_protocol.RequestSignatures(
plot_identifier,
challenge_hash,
sp_hash,
[full_node_request.foliage_block_data_hash, full_node_request.foliage_transaction_block_hash],
)
response = await self.farmer.server.call_api_of_specific(HarvesterAPI.request_signatures, request, node_id)
if response is None or not isinstance(response, harvester_protocol.RespondSignatures):
self.farmer.log.error(f"Invalid response from harvester {node_id} for request_signatures: {response}")
return None
# Use the same processing as for unsolicited respond signature requests
signed_values = self._process_respond_signatures(response)
if signed_values is None:
return None
assert isinstance(signed_values, SignedValues)
return make_msg(ProtocolMessageTypes.signed_values, signed_values)
@api_request(peer_required=True)
async def farming_info(self, request: farmer_protocol.FarmingInfo, peer: WSChiaConnection) -> None:
self.farmer.state_changed(
"new_farming_info",
{
"farming_info": {
"challenge_hash": request.challenge_hash,
"signage_point": request.sp_hash,
"passed_filter": request.passed,
"proofs": request.proofs,
"total_plots": request.total_plots,
"timestamp": request.timestamp,
"node_id": peer.peer_node_id,
"lookup_time": request.lookup_time,
}
},
)
@api_request(peer_required=True)
async def respond_plots(self, _: harvester_protocol.RespondPlots, peer: WSChiaConnection) -> None:
self.farmer.log.warning(f"Respond plots came too late from: {peer.get_peer_logging()}")
@api_request(peer_required=True)
async def plot_sync_start(self, message: PlotSyncStart, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].sync_started(message)
@api_request(peer_required=True)
async def plot_sync_loaded(self, message: PlotSyncPlotList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_loaded(message)
@api_request(peer_required=True)
async def plot_sync_removed(self, message: PlotSyncPathList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_removed(message)
@api_request(peer_required=True)
async def plot_sync_invalid(self, message: PlotSyncPathList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_invalid(message)
@api_request(peer_required=True)
async def plot_sync_keys_missing(self, message: PlotSyncPathList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_keys_missing(message)
@api_request(peer_required=True)
async def plot_sync_duplicates(self, message: PlotSyncPathList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_duplicates(message)
@api_request(peer_required=True)
async def plot_sync_done(self, message: PlotSyncDone, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].sync_done(message)
def _process_respond_signatures(
self, response: harvester_protocol.RespondSignatures
) -> Optional[Union[DeclareProofOfSpace, SignedValues]]:
"""
There are two cases: receiving signatures for sps, or receiving signatures for the block.
Processing the responded signatures happens when receiving an unsolicited request for an SP or when receiving
the signature response for a block from a harvester.
"""
if response.sp_hash not in self.farmer.sps:
self.farmer.log.warning(f"Do not have challenge hash {response.challenge_hash}")
@ -495,7 +664,7 @@ class FarmerAPI:
pool_target = None
pool_target_signature = None
request = farmer_protocol.DeclareProofOfSpace(
return farmer_protocol.DeclareProofOfSpace(
response.challenge_hash,
challenge_chain_sp,
signage_point_index,
@ -507,11 +676,6 @@ class FarmerAPI:
pool_target,
pool_target_signature,
)
self.farmer.state_changed("proof", {"proof": request, "passed_filter": True})
msg = make_msg(ProtocolMessageTypes.declare_proof_of_space, request)
await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE)
return None
else:
# This is a response with block signatures
for sk in self.farmer.get_private_keys():
@ -553,150 +717,10 @@ class FarmerAPI:
assert AugSchemeMPL.verify(agg_pk, foliage_block_data_hash, foliage_agg_sig)
assert AugSchemeMPL.verify(agg_pk, foliage_transaction_block_hash, foliage_block_agg_sig)
request_to_nodes = farmer_protocol.SignedValues(
return farmer_protocol.SignedValues(
computed_quality_string,
foliage_agg_sig,
foliage_block_agg_sig,
)
msg = make_msg(ProtocolMessageTypes.signed_values, request_to_nodes)
await self.farmer.server.send_to_all([msg], NodeType.FULL_NODE)
"""
FARMER PROTOCOL (FARMER <-> FULL NODE)
"""
@api_request()
async def new_signage_point(self, new_signage_point: farmer_protocol.NewSignagePoint) -> None:
try:
pool_difficulties: List[PoolDifficulty] = []
for p2_singleton_puzzle_hash, pool_dict in self.farmer.pool_state.items():
if pool_dict["pool_config"].pool_url == "":
# Self pooling
continue
if pool_dict["current_difficulty"] is None:
self.farmer.log.warning(
f"No pool specific difficulty has been set for {p2_singleton_puzzle_hash}, "
f"check communication with the pool, skipping this signage point, pool: "
f"{pool_dict['pool_config'].pool_url} "
)
continue
pool_difficulties.append(
PoolDifficulty(
pool_dict["current_difficulty"],
self.farmer.constants.POOL_SUB_SLOT_ITERS,
p2_singleton_puzzle_hash,
)
)
message = harvester_protocol.NewSignagePointHarvester(
new_signage_point.challenge_hash,
new_signage_point.difficulty,
new_signage_point.sub_slot_iters,
new_signage_point.signage_point_index,
new_signage_point.challenge_chain_sp,
pool_difficulties,
uint8(calculate_prefix_bits(self.farmer.constants, new_signage_point.peak_height)),
)
msg = make_msg(ProtocolMessageTypes.new_signage_point_harvester, message)
await self.farmer.server.send_to_all([msg], NodeType.HARVESTER)
if new_signage_point.challenge_chain_sp not in self.farmer.sps:
self.farmer.sps[new_signage_point.challenge_chain_sp] = []
finally:
# Age out old 24h information for every signage point regardless
# of any failures. Note that this still lets old data remain if
# the client isn't receiving signage points.
cutoff_24h = time.time() - (24 * 60 * 60)
for p2_singleton_puzzle_hash, pool_dict in self.farmer.pool_state.items():
for key in [
"points_found_24h",
"points_acknowledged_24h",
"pool_errors_24h",
]:
if key not in pool_dict:
continue
pool_dict[key] = strip_old_entries(pairs=pool_dict[key], before=cutoff_24h)
if new_signage_point in self.farmer.sps[new_signage_point.challenge_chain_sp]:
self.farmer.log.debug(f"Duplicate signage point {new_signage_point.signage_point_index}")
return
now = uint64(int(time.time()))
self.farmer.sps[new_signage_point.challenge_chain_sp].append(new_signage_point)
self.farmer.cache_add_time[new_signage_point.challenge_chain_sp] = now
missing_signage_points = self.farmer.check_missing_signage_points(now, new_signage_point)
self.farmer.state_changed(
"new_signage_point",
{"sp_hash": new_signage_point.challenge_chain_sp, "missing_signage_points": missing_signage_points},
)
@api_request()
async def request_signed_values(self, full_node_request: farmer_protocol.RequestSignedValues) -> None:
if full_node_request.quality_string not in self.farmer.quality_str_to_identifiers:
self.farmer.log.error(f"Do not have quality string {full_node_request.quality_string}")
return None
(plot_identifier, challenge_hash, sp_hash, node_id) = self.farmer.quality_str_to_identifiers[
full_node_request.quality_string
]
request = harvester_protocol.RequestSignatures(
plot_identifier,
challenge_hash,
sp_hash,
[full_node_request.foliage_block_data_hash, full_node_request.foliage_transaction_block_hash],
)
msg = make_msg(ProtocolMessageTypes.request_signatures, request)
await self.farmer.server.send_to_specific([msg], node_id)
@api_request(peer_required=True)
async def farming_info(self, request: farmer_protocol.FarmingInfo, peer: WSChiaConnection) -> None:
self.farmer.state_changed(
"new_farming_info",
{
"farming_info": {
"challenge_hash": request.challenge_hash,
"signage_point": request.sp_hash,
"passed_filter": request.passed,
"proofs": request.proofs,
"total_plots": request.total_plots,
"timestamp": request.timestamp,
"node_id": peer.peer_node_id,
"lookup_time": request.lookup_time,
}
},
)
@api_request(peer_required=True)
async def respond_plots(self, _: harvester_protocol.RespondPlots, peer: WSChiaConnection) -> None:
self.farmer.log.warning(f"Respond plots came too late from: {peer.get_peer_logging()}")
@api_request(peer_required=True)
async def plot_sync_start(self, message: PlotSyncStart, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].sync_started(message)
@api_request(peer_required=True)
async def plot_sync_loaded(self, message: PlotSyncPlotList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_loaded(message)
@api_request(peer_required=True)
async def plot_sync_removed(self, message: PlotSyncPathList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_removed(message)
@api_request(peer_required=True)
async def plot_sync_invalid(self, message: PlotSyncPathList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_invalid(message)
@api_request(peer_required=True)
async def plot_sync_keys_missing(self, message: PlotSyncPathList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_keys_missing(message)
@api_request(peer_required=True)
async def plot_sync_duplicates(self, message: PlotSyncPathList, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].process_duplicates(message)
@api_request(peer_required=True)
async def plot_sync_done(self, message: PlotSyncDone, peer: WSChiaConnection) -> None:
await self.farmer.plot_sync_receivers[peer.peer_node_id].sync_done(message)
return None

View File

@ -269,7 +269,7 @@ class HarvesterAPI:
},
)
@api_request()
@api_request(reply_types=[ProtocolMessageTypes.respond_signatures])
async def request_signatures(self, request: harvester_protocol.RequestSignatures) -> Optional[Message]:
"""
The farmer requests a signature on the header hash, for one of the proofs that we found.

View File

@ -8,7 +8,7 @@ import traceback
from dataclasses import dataclass, field
from ipaddress import IPv4Network, IPv6Network, ip_network
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union, cast
from aiohttp import (
ClientResponseError,
@ -39,6 +39,7 @@ from chia.util.errors import Err, ProtocolError
from chia.util.ints import uint16
from chia.util.network import WebServer, is_in_network, is_localhost, is_trusted_peer
from chia.util.ssl_check import verify_ssl_certs_and_keys
from chia.util.streamable import Streamable
max_message_size = 50 * 1024 * 1024 # 50MB
@ -598,6 +599,15 @@ class ChiaServer:
for message in messages:
await connection.send_message(message)
async def call_api_of_specific(
self, request_method: Callable[..., Awaitable[Optional[Message]]], message_data: Streamable, node_id: bytes32
) -> Optional[Any]:
if node_id in self.all_connections:
connection = self.all_connections[node_id]
return await connection.call_api(request_method, message_data)
return None
def get_connections(
self, node_type: Optional[NodeType] = None, *, outbound: Optional[bool] = None
) -> List[WSChiaConnection]:

View File

@ -53,7 +53,6 @@ from chia.util.ints import uint16, uint64
from chia.util.keychain import Keychain
from chia.util.task_timing import main as task_instrumentation_main
from chia.util.task_timing import start_task_instrumentation, stop_task_instrumentation
from chia.wallet.wallet import Wallet
from chia.wallet.wallet_node import WalletNode
from chia.wallet.wallet_node_api import WalletNodeAPI
from tests.core.data_layer.util import ChiaRoot
@ -664,8 +663,11 @@ async def farmer_one_harvester_simulator_wallet(
yield harvester_services[0], farmer_service, nodes[0], wallets[0], bt
FarmerOneHarvester = Tuple[List[Service[Harvester, HarvesterAPI]], Service[Farmer, FarmerAPI], BlockTools]
@pytest_asyncio.fixture(scope="function")
async def farmer_one_harvester(tmp_path: Path, get_b_tools: BlockTools) -> AsyncIterator[Tuple[List[Service], Service]]:
async def farmer_one_harvester(tmp_path: Path, get_b_tools: BlockTools) -> AsyncIterator[FarmerOneHarvester]:
async for _ in setup_farmer_multi_harvester(get_b_tools, 1, tmp_path, get_b_tools.constants, start_services=True):
yield _

View File

@ -2,7 +2,8 @@ from __future__ import annotations
import asyncio
import logging
from typing import Tuple
from pathlib import Path
from typing import Set, Tuple
import aiohttp
from cryptography import x509
@ -12,7 +13,7 @@ from cryptography.hazmat.primitives import hashes, serialization
from chia.protocols.shared_protocol import capabilities, protocol_version
from chia.server.outbound_message import NodeType
from chia.server.server import ChiaServer, ssl_context_for_client
from chia.server.ssl_context import chia_ssl_ca_paths
from chia.server.ssl_context import chia_ssl_ca_paths, private_ssl_ca_paths
from chia.server.ws_connection import WSChiaConnection
from chia.simulator.time_out_assert import adjusted_timeout, time_out_assert
from chia.ssl.create_ssl import generate_ca_signed_cert
@ -39,16 +40,34 @@ async def disconnect_all_and_reconnect(server: ChiaServer, reconnect_to: ChiaSer
async def add_dummy_connection(
server: ChiaServer, self_hostname: str, dummy_port: int, type: NodeType = NodeType.FULL_NODE
) -> Tuple[asyncio.Queue, bytes32]:
wsc, peer_id = await add_dummy_connection_wsc(server, self_hostname, dummy_port, type)
return wsc.incoming_queue, peer_id
async def add_dummy_connection_wsc(
server: ChiaServer, self_hostname: str, dummy_port: int, type: NodeType = NodeType.FULL_NODE
) -> Tuple[WSChiaConnection, bytes32]:
timeout = aiohttp.ClientTimeout(total=10)
session = aiohttp.ClientSession(timeout=timeout)
config = load_config(server.root_path, "config.yaml")
chia_ca_crt_path, chia_ca_key_path = chia_ssl_ca_paths(server.root_path, config)
ca_crt_path: Path
ca_key_path: Path
authenticated_client_types: Set[NodeType] = {NodeType.HARVESTER}
if type in authenticated_client_types:
private_ca_crt_path, private_ca_key_path = private_ssl_ca_paths(server.root_path, config)
ca_crt_path = private_ca_crt_path
ca_key_path = private_ca_key_path
else:
chia_ca_crt_path, chia_ca_key_path = chia_ssl_ca_paths(server.root_path, config)
ca_crt_path = chia_ca_crt_path
ca_key_path = chia_ca_key_path
dummy_crt_path = server.root_path / "dummy.crt"
dummy_key_path = server.root_path / "dummy.key"
generate_ca_signed_cert(
chia_ca_crt_path.read_bytes(), chia_ca_key_path.read_bytes(), dummy_crt_path, dummy_key_path
)
ssl_context = ssl_context_for_client(chia_ca_crt_path, chia_ca_key_path, dummy_crt_path, dummy_key_path)
generate_ca_signed_cert(ca_crt_path.read_bytes(), ca_key_path.read_bytes(), dummy_crt_path, dummy_key_path)
ssl_context = ssl_context_for_client(ca_crt_path, ca_key_path, dummy_crt_path, dummy_key_path)
pem_cert = x509.load_pem_x509_certificate(dummy_crt_path.read_bytes(), default_backend())
der_cert = x509.load_der_x509_certificate(pem_cert.public_bytes(serialization.Encoding.DER), default_backend())
peer_id = bytes32(der_cert.fingerprint(hashes.SHA256()))
@ -68,10 +87,10 @@ async def add_dummy_connection(
30,
local_capabilities_for_handshake=capabilities,
)
await wsc.perform_handshake(server._network_id, protocol_version, dummy_port, NodeType.FULL_NODE)
await wsc.perform_handshake(server._network_id, protocol_version, dummy_port, type)
if wsc.incoming_message_task is not None:
wsc.incoming_message_task.cancel()
return wsc.incoming_queue, peer_id
return wsc, peer_id
async def connect_and_get_peer(server_1: ChiaServer, server_2: ChiaServer, self_hostname: str) -> WSChiaConnection:

View File

View File

@ -0,0 +1,3 @@
from __future__ import annotations
checkout_blocks_and_plots = True

View File

@ -0,0 +1,96 @@
from __future__ import annotations
from asyncio import Task, create_task, gather, sleep
from typing import Any, Coroutine, Optional, TypeVar
import pytest
from chia.farmer.farmer_api import FarmerAPI
from chia.protocols import farmer_protocol
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.server.outbound_message import Message, NodeType
from chia.util.hash import std_hash
from chia.util.ints import uint8, uint32, uint64
from tests.conftest import FarmerOneHarvester
from tests.connection_utils import add_dummy_connection, add_dummy_connection_wsc
from tests.util.network_protocol_data import new_signage_point, request_signed_values, respond_signatures, signed_values
T = TypeVar("T")
async def begin_task(coro: Coroutine[Any, Any, T]) -> Task[T]:
"""Awaitable function that adds a coroutine to the event loop and sets it running."""
task = create_task(coro)
await sleep(0)
return task
@pytest.mark.asyncio
async def test_farmer_ignores_concurrent_duplicate_signage_points(
farmer_one_harvester: FarmerOneHarvester, self_hostname: str
) -> None:
_, farmer_service, _ = farmer_one_harvester
farmer_api: FarmerAPI = farmer_service._api
farmer_server = farmer_service._server
incoming_queue, peer_id = await add_dummy_connection(farmer_server, self_hostname, 12312, NodeType.HARVESTER)
# Consume the handshake
response = (await incoming_queue.get()).type
assert ProtocolMessageTypes(response).name == "harvester_handshake"
sp = farmer_protocol.NewSignagePoint(
std_hash(b"1"), std_hash(b"2"), std_hash(b"3"), uint64(1), uint64(1000000), uint8(2), uint32(1)
)
await gather(
farmer_api.new_signage_point(sp),
farmer_api.new_signage_point(sp),
farmer_api.new_signage_point(sp),
)
# Wait a bit for the queue to fill
await sleep(1)
assert incoming_queue.qsize() == 1
response = (await incoming_queue.get()).type
assert ProtocolMessageTypes(response).name == "new_signage_point_harvester"
@pytest.mark.asyncio
async def test_farmer_responds_with_signed_values(farmer_one_harvester: FarmerOneHarvester, self_hostname: str) -> None:
_, farmer_service, _ = farmer_one_harvester
farmer_api: FarmerAPI = farmer_service._api
farmer_server = farmer_service._server
dummy_wsc, peer_id = await add_dummy_connection_wsc(farmer_server, self_hostname, 12312, NodeType.HARVESTER)
incoming_queue = dummy_wsc.incoming_queue
# Consume the handshake
response = (await incoming_queue.get()).type
assert ProtocolMessageTypes(response).name == "harvester_handshake"
# Mark our dummy harvester as the harvester which found a proof
farmer_service._node.quality_str_to_identifiers[request_signed_values.quality_string] = (
"plot_1",
new_signage_point.challenge_hash,
new_signage_point.challenge_chain_sp,
peer_id,
)
setattr(farmer_api, "_process_respond_signatures", lambda res: signed_values)
signed_values_task: Task[Optional[Message]] = await begin_task(
farmer_api.request_signed_values(request_signed_values)
)
# Wait a bit for the dummy harvester to receive the signature request and respond with a dummy signature
await sleep(1)
assert incoming_queue.qsize() == 1
request_signatures_message = await incoming_queue.get()
assert ProtocolMessageTypes(request_signatures_message.type).name == "request_signatures"
await dummy_wsc.outgoing_queue.put(
Message(
uint8(ProtocolMessageTypes.respond_signatures.value),
request_signatures_message.id,
bytes(respond_signatures),
)
)
signed_values_message = await signed_values_task
assert signed_values_message is not None
assert ProtocolMessageTypes(signed_values_message.type).name == "signed_values"
assert signed_values_message.data == bytes(signed_values)

View File

@ -9,7 +9,7 @@ from packaging.version import Version
from chia.cmds.init_funcs import chia_full_version_str
from chia.full_node.full_node_api import FullNodeAPI
from chia.protocols.full_node_protocol import RequestTransaction
from chia.protocols.full_node_protocol import RejectBlock, RequestBlock, RequestTransaction
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.protocols.shared_protocol import Error, protocol_version
from chia.protocols.wallet_protocol import RejectHeaderRequest
@ -172,3 +172,31 @@ async def test_error_receive(
await wallet_connection.outgoing_queue.put(message)
await time_out_assert(10, error_log_found, True, full_node_connection)
await time_out_assert(10, error_log_found, True, wallet_connection)
@pytest.mark.asyncio
async def test_call_api_of_specific(
two_nodes: Tuple[FullNodeAPI, FullNodeAPI, ChiaServer, ChiaServer, BlockTools], self_hostname: str
) -> None:
_, _, server_1, server_2, _ = two_nodes
assert await server_1.start_client(PeerInfo(self_hostname, uint16(server_2._port)), None)
message = await server_1.call_api_of_specific(
FullNodeAPI.request_block, RequestBlock(uint32(42), False), server_2.node_id
)
assert message is not None
assert isinstance(message, RejectBlock)
@pytest.mark.asyncio
async def test_call_api_of_specific_for_missing_peer(
two_nodes: Tuple[FullNodeAPI, FullNodeAPI, ChiaServer, ChiaServer, BlockTools]
) -> None:
_, _, server_1, server_2, _ = two_nodes
message = await server_1.call_api_of_specific(
FullNodeAPI.request_block, RequestBlock(uint32(42), False), server_2.node_id
)
assert message is None