server|tests: Delay the harvester -> farmer connection in some tests (#12862)

* Introduce `Service.add_peer`

* Delay the harvester/farmer connection in some tests to avoid flakiness
This commit is contained in:
dustinface 2022-08-10 11:57:34 +02:00 committed by GitHub
parent 2a51d7deaf
commit 7c9fb654a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 48 additions and 31 deletions

View File

@ -24,11 +24,11 @@ def create_harvester_service(
root_path: pathlib.Path,
config: Dict,
consensus_constants: ConsensusConstants,
farmer_peer: Optional[PeerInfo],
connect_to_daemon: bool = True,
) -> Service:
service_config = config[SERVICE_NAME]
connect_peers = [PeerInfo(service_config["farmer_peer"]["host"], service_config["farmer_peer"]["port"])]
overrides = service_config["network_overrides"]["constants"][service_config["selected_network"]]
updated_constants = consensus_constants.replace_str_to_bytes(**overrides)
@ -47,7 +47,7 @@ def create_harvester_service(
advertised_port=service_config["port"],
service_name=SERVICE_NAME,
server_listen_ports=[service_config["port"]],
connect_peers=connect_peers,
connect_peers=[] if farmer_peer is None else [farmer_peer],
auth_connect_peers=True,
network_id=network_id,
rpc_info=rpc_info,
@ -65,7 +65,8 @@ async def async_main() -> int:
logging_config=service_config["logging"],
root_path=DEFAULT_ROOT_PATH,
)
service = create_harvester_service(DEFAULT_ROOT_PATH, config, DEFAULT_CONSTANTS)
farmer_peer = PeerInfo(service_config["farmer_peer"]["host"], service_config["farmer_peer"]["port"])
service = create_harvester_service(DEFAULT_ROOT_PATH, config, DEFAULT_CONSTANTS, farmer_peer)
await service.setup_process_global_state()
await service.run()

View File

@ -38,6 +38,10 @@ T = TypeVar("T")
RpcInfo = Tuple[type, int]
class ServiceException(Exception):
pass
class Service:
def __init__(
self,
@ -114,7 +118,6 @@ class Service:
else:
self._log.warning(f"No set_server method for {service_name}")
self._connect_peers = connect_peers
self._auth_connect_peers = auth_connect_peers
self._upnp_ports = upnp_ports
self._server_listen_ports = server_listen_ports
@ -127,7 +130,7 @@ class Service:
self._on_connect_callback = on_connect_callback
self._advertised_port = advertised_port
self._reconnect_tasks: List[asyncio.Task] = []
self._reconnect_tasks: Dict[PeerInfo, Optional[asyncio.Task]] = {peer: None for peer in connect_peers}
self.upnp: Optional[UPnP] = None
async def start(self, **kwargs) -> None:
@ -155,10 +158,9 @@ class Service:
await self._server.start_server(self._on_connect_callback)
self._advertised_port = self._server.get_port()
self._reconnect_tasks = [
start_reconnect_task(self._server, _, self._log, self._auth_connect_peers, self.config.get("prefer_ipv6"))
for _ in self._connect_peers
]
for peer in self._reconnect_tasks.keys():
self.add_peer(peer)
self._log.info(f"Started {self._service_name} service on network_id: {self._network_id}")
self._rpc_close_task = None
@ -185,6 +187,14 @@ class Service:
self._log.error(f"{self._service_name}: already running")
raise ValueError(f"{self._service_name}: already running") from e
def add_peer(self, peer: PeerInfo) -> None:
if self._reconnect_tasks.get(peer) is not None:
raise ServiceException(f"Peer {peer} already added")
self._reconnect_tasks[peer] = start_reconnect_task(
self._server, peer, self._log, self._auth_connect_peers, self.config.get("prefer_ipv6")
)
async def setup_process_global_state(self) -> None:
# Being async forces this to be run from within an active event loop as is
# needed for the signal handler setup.
@ -231,8 +241,10 @@ class Service:
self.upnp.release(port)
self._log.info("Cancelling reconnect task")
for _ in self._reconnect_tasks:
_.cancel()
for _ in self._reconnect_tasks.values():
if _ is not None:
_.cancel()
self._reconnect_tasks.clear()
self._log.info("Closing connections")
self._server.close_all()
self._node._close()

View File

@ -4,6 +4,7 @@ import pytest
from chia.farmer.farmer import Farmer
from chia.simulator.time_out_assert import time_out_assert
from chia.types.peer_info import PeerInfo
from chia.util.keychain import generate_mnemonic
@ -57,6 +58,7 @@ async def test_harvester_handshake(farmer_one_harvester_not_started):
# Start both services and wait a bit
await farmer_service.start()
await harvester_service.start()
harvester_service.add_peer(PeerInfo(str(farmer_service.self_hostname), farmer_service._server.get_port()))
# Handshake task should be started but the handshake should not be done
await time_out_assert(5, handshake_task_active, True)
assert not await handshake_done()
@ -72,6 +74,7 @@ async def test_harvester_handshake(farmer_one_harvester_not_started):
assert len(harvester.plot_manager.farmer_public_keys) == 0
# Re-start the harvester and make sure the handshake task gets started but the handshake still doesn't go through
await harvester_service.start()
harvester_service.add_peer(PeerInfo(str(farmer_service.self_hostname), farmer_service._server.get_port()))
await time_out_assert(5, handshake_task_active, True)
assert not await handshake_done()
# Stop the farmer and make sure the handshake_task doesn't block the shutdown

View File

@ -301,7 +301,9 @@ async def environment(
harvester_services, farmer_service, bt = farmer_two_harvester_not_started
farmer: Farmer = farmer_service._node
await farmer_service.start()
harvesters: List[Harvester] = [await start_harvester_service(service) for service in harvester_services]
harvesters: List[Harvester] = [
await start_harvester_service(service, farmer_service) for service in harvester_services
]
for harvester in harvesters:
# Remove default plot directory for this tests
with lock_and_load_config(harvester.root_path, "config.yaml") as config:
@ -501,7 +503,7 @@ async def test_harvester_restart(environment: Environment) -> None:
assert not env.harvesters[0].plot_manager._refreshing_enabled
assert not env.harvesters[0].plot_manager.needs_refresh()
# Start the harvester, wait for the handshake and make sure the receiver comes back
await env.harvester_services[0].start()
await start_harvester_service(env.harvester_services[0], env.farmer_service)
await time_out_assert(5, env.handshake_done, True, 0)
assert len(env.farmer.plot_sync_receivers) == 2
# Remove the duplicates dir to avoid conflicts with the original plots

View File

@ -240,7 +240,9 @@ async def create_test_runner(
await farmer_service.start()
farmer: Farmer = farmer_service._node
assert len(farmer.plot_sync_receivers) == 0
harvesters: List[Harvester] = [await start_harvester_service(service) for service in harvester_services]
harvesters: List[Harvester] = [
await start_harvester_service(service, farmer_service) for service in harvester_services
]
for receiver in farmer.plot_sync_receivers.values():
receiver.simulate_error = 0 # type: ignore[attr-defined]
receiver.message_counter = 0 # type: ignore[attr-defined]

View File

@ -10,6 +10,7 @@ from chia.server.start_service import Service
from chia.server.ws_connection import Message, NodeType
from chia.simulator.time_out_assert import time_out_assert
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.peer_info import PeerInfo
from chia.util.ints import uint64
@ -33,11 +34,12 @@ def plot_sync_identifier(current_sync_id: uint64, message_id: uint64) -> PlotSyn
return PlotSyncIdentifier(uint64(int(time.time())), current_sync_id, message_id)
async def start_harvester_service(harvester_service: Service) -> Harvester:
async def start_harvester_service(harvester_service: Service, farmer_service: Service) -> Harvester:
# Set the `last_refresh_time` of the plot manager to avoid initial plot loading
harvester: Harvester = harvester_service._node
harvester.plot_manager.last_refresh_time = time.time()
await harvester_service.start()
harvester_service.add_peer(PeerInfo(str(farmer_service.self_hostname), farmer_service._server.get_port()))
harvester.plot_manager.stop_refreshing()
assert harvester.plot_sync_sender._sync_id == 0

View File

@ -7,6 +7,7 @@ from chia.consensus.constants import ConsensusConstants
from chia.full_node.full_node_api import FullNodeAPI
from chia.protocols.shared_protocol import Capability
from chia.server.start_service import Service
from chia.types.peer_info import PeerInfo
from chia.util.hash import std_hash
from chia.util.ints import uint16, uint32
from chia.simulator.block_tools import BlockTools, create_block_tools_async, test_constants
@ -226,24 +227,21 @@ async def setup_farmer_multi_harvester(
start_services: bool,
) -> AsyncIterator[Tuple[List[Service], Service, BlockTools]]:
if start_services:
farmer_port = uint16(0)
else:
# If we don't start the services, we won't be able to get the farmer port, which the harvester needs
farmer_port = uint16(find_available_listen_port("farmer_server"))
node_iterators = [
setup_farmer(
block_tools,
temp_dir / "farmer",
block_tools.config["self_hostname"],
consensus_constants,
port=farmer_port,
port=uint16(0),
start_service=start_services,
)
]
farmer_service = await node_iterators[0].__anext__()
farmer_port = farmer_service._server._port
if start_services:
farmer_peer = PeerInfo(block_tools.config["self_hostname"], farmer_service._server._port)
else:
farmer_peer = None
for i in range(0, harvester_count):
root_path: Path = temp_dir / f"harvester_{i}"
@ -251,8 +249,7 @@ async def setup_farmer_multi_harvester(
setup_harvester(
block_tools,
root_path,
block_tools.config["self_hostname"],
farmer_port,
farmer_peer,
consensus_constants,
start_service=start_services,
)
@ -329,8 +326,7 @@ async def setup_full_system(
harvester_iter = setup_harvester(
shared_b_tools,
shared_b_tools.root_path / "harvester",
shared_b_tools.config["self_hostname"],
farmer_service._server.get_port(),
PeerInfo(shared_b_tools.config["self_hostname"], farmer_service._server.get_port()),
consensus_constants,
)

View File

@ -19,6 +19,7 @@ from chia.server.start_wallet import create_wallet_service
from chia.simulator.block_tools import BlockTools
from chia.simulator.start_simulator import create_full_node_simulator_service
from chia.timelord.timelord_launcher import kill_processes, spawn_process
from chia.types.peer_info import PeerInfo
from chia.util.bech32m import encode_puzzle_hash
from chia.util.config import lock_and_load_config, save_config
from chia.util.ints import uint16
@ -222,8 +223,7 @@ async def setup_wallet_node(
async def setup_harvester(
b_tools: BlockTools,
root_path: Path,
self_hostname: str,
farmer_port: uint16,
farmer_peer: Optional[PeerInfo],
consensus_constants: ConsensusConstants,
start_service: bool = True,
):
@ -235,14 +235,13 @@ async def setup_harvester(
config["harvester"]["selected_network"] = "testnet0"
config["harvester"]["port"] = 0
config["harvester"]["rpc_port"] = 0
config["harvester"]["farmer_peer"]["host"] = self_hostname
config["harvester"]["farmer_peer"]["port"] = int(farmer_port)
config["harvester"]["plot_directories"] = [str(b_tools.plot_dir.resolve())]
save_config(root_path, "config.yaml", config)
service = create_harvester_service(
root_path,
config,
consensus_constants,
farmer_peer=farmer_peer,
connect_to_daemon=False,
)