Refactor daemon simulation test (#16081)

This commit is contained in:
Earle Lowe 2023-10-04 10:16:13 -07:00 committed by GitHub
parent 4b96f16b2c
commit 5ea14326c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 320 additions and 278 deletions

View File

@ -173,7 +173,7 @@ class WebSocketServer:
@asynccontextmanager
async def run(self) -> AsyncIterator[None]:
self.log.info("Starting Daemon Server")
self.log.info(f"Starting Daemon Server ({self.self_hostname}:{self.daemon_port})")
# Note: the minimum_version has been already set to TLSv1_2
# in ssl_context_for_server()

View File

@ -74,6 +74,9 @@ async def get_average_block_time(
assert newer_block.timestamp is not None and older_block.timestamp is not None
if newer_block.height == older_block.height: # small chain not long enough to have a block in between
return None
average_block_time = uint32(
(newer_block.timestamp - older_block.timestamp) / (newer_block.height - older_block.height)
)

View File

@ -2,8 +2,9 @@ from __future__ import annotations
import logging
from contextlib import AsyncExitStack, ExitStack, asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple
from typing import AsyncIterator, Dict, List, Optional, Tuple, Union
from chia.consensus.constants import ConsensusConstants
from chia.daemon.server import WebSocketServer
@ -13,6 +14,7 @@ from chia.full_node.full_node import FullNode
from chia.full_node.full_node_api import FullNodeAPI
from chia.harvester.harvester import Harvester
from chia.harvester.harvester_api import HarvesterAPI
from chia.introducer.introducer_api import IntroducerAPI
from chia.protocols.shared_protocol import Capability
from chia.server.server import ChiaServer
from chia.server.start_service import Service
@ -48,6 +50,18 @@ SimulatorsAndWalletsServices = Tuple[
]
@dataclass(frozen=True)
class FullSystem:
node_1: Union[Service[FullNode, FullNodeAPI], Service[FullNode, FullNodeSimulator]]
node_2: Union[Service[FullNode, FullNodeAPI], Service[FullNode, FullNodeSimulator]]
harvester: Harvester
farmer: Farmer
introducer: IntroducerAPI
timelord: Service[Timelord, TimelordAPI]
timelord_bluebox: Service[Timelord, TimelordAPI]
daemon: WebSocketServer
def cleanup_keyring(keyring: TempKeyring) -> None:
keyring.cleanup()
@ -124,7 +138,7 @@ async def setup_simulators_and_wallets(
*,
key_seed: Optional[bytes32] = None,
initial_num_public_keys: int = 5,
db_version: int = 1,
db_version: int = 2,
config_overrides: Optional[Dict[str, int]] = None,
disable_capabilities: Optional[List[Capability]] = None,
) -> AsyncIterator[Tuple[List[FullNodeSimulator], List[Tuple[WalletNode, ChiaServer]], BlockTools]]:
@ -164,7 +178,7 @@ async def setup_simulators_and_wallets_service(
*,
key_seed: Optional[bytes32] = None,
initial_num_public_keys: int = 5,
db_version: int = 1,
db_version: int = 2,
config_overrides: Optional[Dict[str, int]] = None,
disable_capabilities: Optional[List[Capability]] = None,
) -> AsyncIterator[
@ -299,44 +313,13 @@ async def setup_full_system(
shared_b_tools: BlockTools,
b_tools: Optional[BlockTools] = None,
b_tools_1: Optional[BlockTools] = None,
db_version: int = 1,
) -> AsyncIterator[
Tuple[Any, Any, Harvester, Farmer, Any, Service[Timelord, TimelordAPI], object, object, Any, ChiaServer]
]:
db_version: int = 2,
) -> AsyncIterator[FullSystem]:
with TempKeyring(populate=True) as keychain1, TempKeyring(populate=True) as keychain2:
async with setup_full_system_inner(
b_tools, b_tools_1, False, consensus_constants, db_version, keychain1, keychain2, shared_b_tools
) as (_, ret):
yield ret
@asynccontextmanager
async def setup_full_system_connect_to_deamon(
consensus_constants: ConsensusConstants,
shared_b_tools: BlockTools,
b_tools: Optional[BlockTools] = None,
b_tools_1: Optional[BlockTools] = None,
db_version: int = 1,
) -> AsyncIterator[
Tuple[
Any,
Any,
Harvester,
Farmer,
Any,
Service[Timelord, TimelordAPI],
object,
object,
Any,
ChiaServer,
Optional[WebSocketServer],
],
]:
with TempKeyring(populate=True) as keychain1, TempKeyring(populate=True) as keychain2:
async with setup_full_system_inner(
b_tools, b_tools_1, True, consensus_constants, db_version, keychain1, keychain2, shared_b_tools
) as (daemon_ws, ret):
yield ret + (daemon_ws,)
) as full_system:
yield full_system
@asynccontextmanager
@ -349,77 +332,94 @@ async def setup_full_system_inner(
keychain1: Keychain,
keychain2: Keychain,
shared_b_tools: BlockTools,
) -> AsyncIterator[
Tuple[
Optional[WebSocketServer],
Tuple[Any, Any, Harvester, Farmer, Any, Service[Timelord, TimelordAPI], object, object, Any, ChiaServer],
]
]:
) -> AsyncIterator[FullSystem]:
if b_tools is None:
b_tools = await create_block_tools_async(constants=consensus_constants, keychain=keychain1)
if b_tools_1 is None:
b_tools_1 = await create_block_tools_async(constants=consensus_constants, keychain=keychain2)
self_hostname = shared_b_tools.config["self_hostname"]
async with AsyncExitStack() as async_exit_stack:
vdf1_port = uint16(find_available_listen_port("vdf1"))
vdf2_port = uint16(find_available_listen_port("vdf2"))
await async_exit_stack.enter_async_context(
setup_vdf_clients(bt=b_tools, self_hostname=self_hostname, port=vdf1_port)
)
await async_exit_stack.enter_async_context(
setup_vdf_client(bt=shared_b_tools, self_hostname=self_hostname, port=vdf2_port)
)
daemon_ws = await async_exit_stack.enter_async_context(setup_daemon(btools=b_tools))
# Start the introducer first so we can find out the port, and use that for the nodes
introducer_service = await async_exit_stack.enter_async_context(setup_introducer(shared_b_tools, uint16(0)))
introducer = introducer_service._api
introducer_server = introducer_service._node.server
# Then start the full node so we can use the port for the farmer and timelord
nodes = [
await async_exit_stack.enter_async_context(
setup_full_node(
consensus_constants,
f"blockchain_test_{i}.db",
shared_b_tools.config["self_hostname"],
b_tools if i == 0 else b_tools_1,
introducer_server._port,
False,
10,
True,
connect_to_daemon=connect_to_daemon,
db_version=db_version,
)
node_1 = await async_exit_stack.enter_async_context(
setup_full_node(
consensus_constants,
"blockchain_test_1.db",
self_hostname=self_hostname,
local_bt=b_tools,
introducer_port=introducer_server._port,
simulator=False,
send_uncompact_interval=0,
sanitize_weight_proof_only=False,
connect_to_daemon=connect_to_daemon,
db_version=db_version,
)
for i in range(2)
]
node_apis = [fni._api for fni in nodes]
full_node_0_port = node_apis[0].full_node.server.get_port()
)
node_2 = await async_exit_stack.enter_async_context(
setup_full_node(
consensus_constants,
"blockchain_test_2.db",
self_hostname=self_hostname,
local_bt=b_tools_1,
introducer_port=introducer_server._port,
simulator=False,
send_uncompact_interval=10,
sanitize_weight_proof_only=True,
connect_to_daemon=False, # node 2 doesn't connect to the daemon
db_version=db_version,
)
)
farmer_service = await async_exit_stack.enter_async_context(
setup_farmer(
shared_b_tools,
shared_b_tools.root_path / "harvester",
shared_b_tools.config["self_hostname"],
consensus_constants,
full_node_0_port,
self_hostname=self_hostname,
consensus_constants=consensus_constants,
full_node_port=node_1._api.full_node.server.get_port(),
)
)
harvester_service = await async_exit_stack.enter_async_context(
setup_harvester(
shared_b_tools,
shared_b_tools.root_path / "harvester",
UnresolvedPeerInfo(shared_b_tools.config["self_hostname"], farmer_service._server.get_port()),
UnresolvedPeerInfo(self_hostname, farmer_service._server.get_port()),
consensus_constants,
)
)
harvester = harvester_service._node
vdf1_port = uint16(find_available_listen_port("vdf1"))
vdf2_port = uint16(find_available_listen_port("vdf2"))
timelord = await async_exit_stack.enter_async_context(
setup_timelord(
full_node_0_port,
False,
consensus_constants,
b_tools.config,
b_tools.root_path,
full_node_port=node_1._api.full_node.server.get_port(),
sanitizer=False,
consensus_constants=consensus_constants,
config=b_tools.config,
root_path=b_tools.root_path,
vdf_port=vdf1_port,
)
)
timelord_bluebox_service = await async_exit_stack.enter_async_context(
setup_timelord(
uint16(1000),
node_2._api.full_node.server.get_port(),
True,
consensus_constants,
b_tools_1.config,
@ -433,25 +433,15 @@ async def setup_full_system_inner(
return count
await time_out_assert_custom_interval(10, 3, num_connections, 1)
vdf_clients = await async_exit_stack.enter_async_context(
setup_vdf_clients(shared_b_tools, shared_b_tools.config["self_hostname"], vdf1_port)
full_system = FullSystem(
node_1=node_1,
node_2=node_2,
harvester=harvester,
farmer=farmer_service._node,
introducer=introducer,
timelord=timelord,
timelord_bluebox=timelord_bluebox_service,
daemon=daemon_ws,
)
vdf_bluebox_clients = await async_exit_stack.enter_async_context(
setup_vdf_client(shared_b_tools, shared_b_tools.config["self_hostname"], vdf2_port)
)
timelord_bluebox = timelord_bluebox_service._api
timelord_bluebox_server = timelord_bluebox_service._node.server
ret = (
node_apis[0],
node_apis[1],
harvester,
farmer_service._node,
introducer,
timelord,
vdf_clients,
vdf_bluebox_clients,
timelord_bluebox,
timelord_bluebox_server,
)
daemon_ws = await async_exit_stack.enter_async_context(setup_daemon(btools=b_tools))
yield daemon_ws, ret
yield full_system

View File

@ -9,7 +9,7 @@ import time
from contextlib import asynccontextmanager, contextmanager
from pathlib import Path
from types import FrameType
from typing import Any, AsyncGenerator, Dict, Iterator, List, Optional, Tuple, Union
from typing import Any, AsyncGenerator, AsyncIterator, Dict, Iterator, List, Optional, Tuple, Union
from chia.cmds.init_funcs import init
from chia.consensus.constants import ConsensusConstants
@ -42,7 +42,7 @@ from chia.simulator.start_simulator import create_full_node_simulator_service
from chia.ssl.create_ssl import create_all_ssl
from chia.timelord.timelord import Timelord
from chia.timelord.timelord_api import TimelordAPI
from chia.timelord.timelord_launcher import kill_processes, spawn_process
from chia.timelord.timelord_launcher import VDFClientProcessMgr, find_vdf_client, spawn_process
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.peer_info import UnresolvedPeerInfo
from chia.util.bech32m import encode_puzzle_hash
@ -457,10 +457,12 @@ async def setup_introducer(bt: BlockTools, port: int) -> AsyncGenerator[Service[
@asynccontextmanager
async def setup_vdf_client(bt: BlockTools, self_hostname: str, port: int) -> AsyncGenerator[asyncio.Task[Any], None]:
lock = asyncio.Lock()
async def setup_vdf_client(bt: BlockTools, self_hostname: str, port: int) -> AsyncIterator[None]:
find_vdf_client() # raises FileNotFoundError if not found
process_mgr = VDFClientProcessMgr()
vdf_task_1 = asyncio.create_task(
spawn_process(self_hostname, port, 1, lock, prefer_ipv6=bt.config.get("prefer_ipv6", False))
spawn_process(self_hostname, port, 1, process_mgr, prefer_ipv6=bt.config.get("prefer_ipv6", False)),
name="vdf_client_1",
)
async def stop(
@ -468,46 +470,58 @@ async def setup_vdf_client(bt: BlockTools, self_hostname: str, port: int) -> Asy
stack_frame: Optional[FrameType],
loop: asyncio.AbstractEventLoop,
) -> None:
await kill_processes(lock)
await process_mgr.kill_processes()
async with SignalHandlers.manage() as signal_handlers:
signal_handlers.setup_async_signal_handler(handler=stop)
try:
yield vdf_task_1
yield
finally:
await kill_processes(lock)
await process_mgr.kill_processes()
vdf_task_1.cancel()
try:
await vdf_task_1
except (Exception, asyncio.CancelledError):
pass
@asynccontextmanager
async def setup_vdf_clients(
bt: BlockTools, self_hostname: str, port: int
) -> AsyncGenerator[Tuple[asyncio.Task[Any], asyncio.Task[Any], asyncio.Task[Any]], None]:
lock = asyncio.Lock()
vdf_task_1 = asyncio.create_task(
spawn_process(self_hostname, port, 1, lock, prefer_ipv6=bt.config.get("prefer_ipv6", False))
)
vdf_task_2 = asyncio.create_task(
spawn_process(self_hostname, port, 2, lock, prefer_ipv6=bt.config.get("prefer_ipv6", False))
)
vdf_task_3 = asyncio.create_task(
spawn_process(self_hostname, port, 3, lock, prefer_ipv6=bt.config.get("prefer_ipv6", False))
)
async def setup_vdf_clients(bt: BlockTools, self_hostname: str, port: int) -> AsyncIterator[None]:
find_vdf_client() # raises FileNotFoundError if not found
process_mgr = VDFClientProcessMgr()
tasks = []
prefer_ipv6 = bt.config.get("prefer_ipv6", False)
for i in range(1, 4):
tasks.append(
asyncio.create_task(
spawn_process(
host=self_hostname, port=port, counter=i, process_mgr=process_mgr, prefer_ipv6=prefer_ipv6
),
name=f"vdf_client_{i}",
)
)
async def stop(
signal_: signal.Signals,
stack_frame: Optional[FrameType],
loop: asyncio.AbstractEventLoop,
) -> None:
await kill_processes(lock)
await process_mgr.kill_processes()
signal_handlers = SignalHandlers()
async with signal_handlers.manage():
signal_handlers.setup_async_signal_handler(handler=stop)
try:
yield vdf_task_1, vdf_task_2, vdf_task_3
yield
finally:
await kill_processes(lock)
await process_mgr.kill_processes()
for task in tasks:
task.cancel()
try:
await task
except (Exception, asyncio.CancelledError):
pass
@asynccontextmanager

View File

@ -5,9 +5,12 @@ import logging
import os
import pathlib
import signal
import sys
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from types import FrameType
from typing import Any, Dict, List, Optional
from typing import Any, AsyncIterator, Dict, List, Optional
import pkg_resources
@ -18,38 +21,69 @@ from chia.util.misc import SignalHandlers
from chia.util.network import resolve
from chia.util.setproctitle import setproctitle
active_processes: List = []
stopped = False
log = logging.getLogger(__name__)
async def kill_processes(lock: asyncio.Lock):
global stopped
global active_processes
async with lock:
stopped = True
for process in active_processes:
@dataclass
class VDFClientProcessMgr:
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
stopped: bool = False
active_processes: List[asyncio.subprocess.Process] = field(default_factory=list)
async def remove_process(self, proc: asyncio.subprocess.Process) -> None:
async with self.lock:
try:
process.kill()
except ProcessLookupError:
self.active_processes.remove(proc)
except ValueError:
pass
async def add_process(self, proc: asyncio.subprocess.Process) -> None:
async with self.lock:
self.active_processes.append(proc)
async def kill_processes(self) -> None:
async with self.lock:
self.stopped = True
for process in self.active_processes:
try:
process.kill()
await process.wait()
if sys.version_info < (3, 11, 1):
# hack to avoid `Event loop is closed` errors (fixed in python 3.11.1)
# https://github.com/python/cpython/issues/88050
process._transport.close() # type: ignore [attr-defined]
except (ProcessLookupError, AttributeError):
pass
self.active_processes.clear()
@asynccontextmanager
async def manage_proc(self, proc: asyncio.subprocess.Process) -> AsyncIterator[None]:
await self.add_process(proc)
try:
yield
finally:
await self.remove_process(proc)
def find_vdf_client() -> pathlib.Path:
p = pathlib.Path(pkg_resources.get_distribution("chiavdf").location) / "vdf_client"
if p.is_file():
return p
raise FileNotFoundError("can't find vdf_client binary")
raise FileNotFoundError("Cannot find vdf_client binary. Is Timelord installed? See install-timelord.sh")
async def spawn_process(host: str, port: int, counter: int, lock: asyncio.Lock, *, prefer_ipv6: bool):
global stopped
global active_processes
async def spawn_process(
host: str,
port: int,
counter: int,
process_mgr: VDFClientProcessMgr,
*,
prefer_ipv6: bool,
) -> None:
path_to_vdf_client = find_vdf_client()
first_10_seconds = True
start_time = time.time()
while not stopped:
while not process_mgr.stopped:
try:
dirname = path_to_vdf_client.parent
basename = path_to_vdf_client.name
@ -63,25 +97,22 @@ async def spawn_process(host: str, port: int, counter: int, lock: asyncio.Lock,
except Exception as e:
log.warning(f"Exception while spawning process {counter}: {(e)}")
continue
async with lock:
active_processes.append(proc)
stdout, stderr = await proc.communicate()
if stdout:
log.info(f"VDF client {counter}: {stdout.decode().rstrip()}")
if stderr:
if first_10_seconds:
if time.time() - start_time > 10:
first_10_seconds = False
else:
log.error(f"VDF client {counter}: {stderr.decode().rstrip()}")
log.info(f"Process number {counter} ended.")
async with lock:
if proc in active_processes:
active_processes.remove(proc)
async with process_mgr.manage_proc(proc):
stdout, stderr = await proc.communicate()
if stdout:
log.info(f"VDF client {counter}: {stdout.decode().rstrip()}")
if stderr:
if first_10_seconds:
if time.time() - start_time > 10:
first_10_seconds = False
else:
log.error(f"VDF client {counter}: {stderr.decode().rstrip()}")
await asyncio.sleep(0.1)
async def spawn_all_processes(config: Dict, net_config: Dict, lock: asyncio.Lock):
async def spawn_all_processes(config: Dict, net_config: Dict, process_mgr: VDFClientProcessMgr):
await asyncio.sleep(5)
hostname = net_config["self_hostname"] if "host" not in config else config["host"]
port = config["port"]
@ -90,27 +121,33 @@ async def spawn_all_processes(config: Dict, net_config: Dict, lock: asyncio.Lock
log.info("Process_count set to 0, stopping TLauncher.")
return
awaitables = [
spawn_process(hostname, port, i, lock, prefer_ipv6=net_config.get("prefer_ipv6", False))
spawn_process(
hostname,
port,
i,
process_mgr,
prefer_ipv6=net_config.get("prefer_ipv6", False),
)
for i in range(process_count)
]
await asyncio.gather(*awaitables)
async def async_main(config: Dict[str, Any], net_config: Dict[str, Any]) -> None:
lock = asyncio.Lock()
process_mgr = VDFClientProcessMgr()
async def stop(
signal_: signal.Signals,
stack_frame: Optional[FrameType],
loop: asyncio.AbstractEventLoop,
) -> None:
await kill_processes(lock)
await process_mgr.kill_processes()
async with SignalHandlers.manage() as signal_handlers:
signal_handlers.setup_async_signal_handler(handler=stop)
try:
await spawn_all_processes(config, net_config, lock)
await spawn_all_processes(config, net_config, process_mgr)
finally:
log.info("Launcher fully closed.")

View File

@ -41,7 +41,6 @@ from chia.simulator.full_node_simulator import FullNodeSimulator
from chia.simulator.setup_nodes import (
SimulatorsAndWallets,
setup_full_system,
setup_full_system_connect_to_deamon,
setup_n_nodes,
setup_simulators_and_wallets,
setup_simulators_and_wallets_service,
@ -695,29 +694,6 @@ async def farmer_three_harvester_not_started(
yield _
# TODO: Ideally, the db_version should be the (parameterized) db_version
# fixture, to test all versions of the database schema. This doesn't work
# because of a hack in shutting down the full node, which means you cannot run
# more than one simulations per process.
@pytest_asyncio.fixture(
scope="function",
params=[
pytest.param(
None, marks=pytest.mark.limit_consensus_modes(reason="This test only supports one running at a time.")
)
],
)
async def daemon_simulation(consensus_mode, bt, get_b_tools, get_b_tools_1):
async with setup_full_system_connect_to_deamon(
test_constants_modified,
bt,
b_tools=get_b_tools,
b_tools_1=get_b_tools_1,
db_version=1,
) as _:
yield _, get_b_tools, get_b_tools_1
@pytest_asyncio.fixture(scope="function")
async def get_daemon(bt):
async with setup_daemon(btools=bt) as _:
@ -982,9 +958,9 @@ def cost_logger_fixture() -> Iterator[CostLogger]:
@pytest_asyncio.fixture(scope="function")
async def simulation(bt):
async with setup_full_system(test_constants_modified, bt, db_version=1) as _:
yield _
async def simulation(bt, get_b_tools):
async with setup_full_system(test_constants_modified, bt, get_b_tools, db_version=2) as full_system:
yield full_system, get_b_tools
HarvesterFarmerEnvironment = Tuple[

View File

@ -1,5 +1,4 @@
from __future__ import annotations
job_timeout = 50
install_timelord = True
checkout_blocks_and_plots = True

View File

@ -1,8 +1,6 @@
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass, field, replace
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
@ -25,19 +23,16 @@ from chia.daemon.keychain_server import (
)
from chia.daemon.server import WebSocketServer, plotter_log_path, service_plotter
from chia.plotters.plotters import call_plotters
from chia.server.outbound_message import NodeType
from chia.simulator.block_tools import BlockTools
from chia.simulator.keyring import TempKeyring
from chia.simulator.time_out_assert import time_out_assert, time_out_assert_custom_interval
from chia.types.peer_info import PeerInfo
from chia.simulator.setup_services import setup_full_node
from chia.simulator.time_out_assert import time_out_assert_not_none
from chia.util.config import load_config
from chia.util.ints import uint16
from chia.util.json_util import dict_to_json_str
from chia.util.keychain import Keychain, KeyData, supports_os_passphrase_storage
from chia.util.keyring_wrapper import DEFAULT_PASSPHRASE_IF_NO_MASTER_PASSPHRASE, KeyringWrapper
from chia.util.ws_message import create_payload, create_payload_dict
from chia.wallet.derive_keys import master_sk_to_farmer_sk, master_sk_to_pool_sk
from tests.core.node_height import node_height_at_least
from tests.util.misc import Marks, datacases
chiapos_version = pkg_resources.get_distribution("chiapos").version
@ -313,7 +308,9 @@ def assert_response(
assert message["data"] == expected_response_data
def assert_response_success_only(response: aiohttp.http_websocket.WSMessage, request_id: Optional[str] = None) -> None:
def assert_response_success_only(
response: aiohttp.http_websocket.WSMessage, request_id: Optional[str] = None
) -> Dict[str, Any]:
# Expect: JSON response
assert response.type == aiohttp.WSMsgType.TEXT
message = json.loads(response.data.strip())
@ -321,6 +318,7 @@ def assert_response_success_only(response: aiohttp.http_websocket.WSMessage, req
if request_id is not None:
assert message["request_id"] == request_id
assert message["data"]["success"] is True
return message
def assert_running_services_response(response_dict: Dict[str, Any], expected_response_dict: Dict[str, Any]) -> None:
@ -412,69 +410,44 @@ async def daemon_client_with_config_and_keys(get_keychain_for_function, get_daem
@pytest.mark.asyncio
async def test_daemon_simulation(self_hostname, daemon_simulation):
deamon_and_nodes, get_b_tools, bt = daemon_simulation
node1, node2, _, _, _, _, _, _, _, _, daemon1 = deamon_and_nodes
server1 = node1.full_node.server
node2_port = node2.full_node.server.get_port()
await server1.start_client(PeerInfo(self_hostname, uint16(node2_port)))
async def test_daemon_passthru(get_daemon, bt):
ws_server = get_daemon
config = bt.config
daemon_port = config["daemon_port"]
async def num_connections():
count = len(node2.server.get_connections(NodeType.FULL_NODE))
return count
async with aiohttp.ClientSession() as client:
async with client.ws_connect(
f"wss://127.0.0.1:{daemon_port}",
autoclose=True,
autoping=True,
ssl_context=bt.get_daemon_ssl_context(),
max_msg_size=100 * 1024 * 1024,
) as ws:
service_name = "test_service_name"
data = {"service": service_name}
payload = create_payload("register_service", data, service_name, "daemon")
await ws.send_str(payload)
assert_response_success_only(await ws.receive())
await time_out_assert_custom_interval(60, 1, num_connections, 1)
async with setup_full_node(
consensus_constants=bt.constants,
db_name="sim-test.db",
self_hostname="localhost",
local_bt=bt,
simulator=False,
db_version=2,
connect_to_daemon=True,
) as _:
await time_out_assert_not_none(30, ws_server.connections.get, "chia_full_node")
await time_out_assert(1500, node_height_at_least, True, node2, 1)
payload = create_payload("get_blockchain_state", {}, service_name, "chia_full_node")
await ws.send_str(payload)
session = aiohttp.ClientSession()
log = logging.getLogger()
log.warning(f"Connecting to daemon on port {daemon1.daemon_port}")
ws = await session.ws_connect(
f"wss://127.0.0.1:{daemon1.daemon_port}",
autoclose=True,
autoping=True,
ssl_context=get_b_tools.get_daemon_ssl_context(),
max_msg_size=100 * 1024 * 1024,
)
service_name = "test_service_name"
data = {"service": service_name}
payload = create_payload("register_service", data, service_name, "daemon")
await ws.send_str(payload)
message_queue: asyncio.Queue = asyncio.Queue()
async def reader(ws, queue):
while True:
# ClientWebSocketReponse::receive() internally handles PING, PONG, and CLOSE messages
msg = await ws.receive()
if msg.type == aiohttp.WSMsgType.TEXT:
message = msg.data.strip()
message = json.loads(message)
await queue.put(message)
else:
if msg.type == aiohttp.WSMsgType.ERROR:
await ws.close()
elif msg.type == aiohttp.WSMsgType.CLOSED:
pass
break
read_handler = asyncio.create_task(reader(ws, message_queue))
data = {}
payload = create_payload("get_blockchain_state", data, service_name, "chia_full_node")
await ws.send_str(payload)
await asyncio.sleep(5)
blockchain_state_found = False
while not message_queue.empty():
message = await message_queue.get()
if message["command"] == "get_blockchain_state":
blockchain_state_found = True
await ws.close()
read_handler.cancel()
assert blockchain_state_found
response = await ws.receive()
message = assert_response_success_only(response)
assert message["command"] == "get_blockchain_state"
assert message["origin"] == "chia_full_node"
assert message["data"]["blockchain_state"]["genesis_challenge_initialized"] is True
@pytest.mark.parametrize(

View File

@ -1,30 +1,38 @@
from __future__ import annotations
import dataclasses
from typing import List, Tuple
import json
from typing import AsyncIterator, List, Tuple
import aiohttp
import pkg_resources
import pytest
import pytest_asyncio
from chia.cmds.units import units
from chia.consensus.block_record import BlockRecord
from chia.consensus.block_rewards import calculate_base_farmer_reward, calculate_pool_reward
from chia.daemon.server import WebSocketServer
from chia.full_node.full_node import FullNode
from chia.full_node.full_node_api import FullNodeAPI
from chia.server.outbound_message import NodeType
from chia.server.server import ChiaServer
from chia.server.start_service import Service
from chia.simulator.block_tools import BlockTools, create_block_tools_async, test_constants
from chia.simulator.full_node_simulator import FullNodeSimulator
from chia.simulator.keyring import TempKeyring
from chia.simulator.setup_nodes import SimulatorsAndWallets
from chia.simulator.setup_nodes import FullSystem, SimulatorsAndWallets
from chia.simulator.setup_services import setup_full_node
from chia.simulator.simulator_protocol import FarmNewBlockProtocol, GetAllCoinsProtocol, ReorgProtocol
from chia.simulator.time_out_assert import time_out_assert
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.peer_info import PeerInfo
from chia.util.ints import uint8, uint16, uint32, uint64
from chia.util.ws_message import create_payload
from chia.wallet.util.tx_config import DEFAULT_TX_CONFIG
from chia.wallet.wallet_node import WalletNode
from tests.core.node_height import node_height_at_least
chiapos_version = pkg_resources.get_distribution("chiapos").version
test_constants_modified = dataclasses.replace(
test_constants,
@ -46,7 +54,7 @@ test_constants_modified = dataclasses.replace(
# because of a hack in shutting down the full node, which means you cannot run
# more than one simulations per process.
@pytest_asyncio.fixture(scope="function")
async def extra_node(self_hostname):
async def extra_node(self_hostname) -> AsyncIterator[FullNodeAPI | FullNodeSimulator]:
with TempKeyring() as keychain:
b_tools = await create_block_tools_async(constants=test_constants_modified, keychain=keychain)
async with setup_full_node(
@ -54,7 +62,7 @@ async def extra_node(self_hostname):
"blockchain_test_3.db",
self_hostname,
b_tools,
db_version=1,
db_version=2,
) as service:
yield service._api
@ -62,12 +70,14 @@ async def extra_node(self_hostname):
class TestSimulation:
@pytest.mark.limit_consensus_modes(reason="This test only supports one running at a time.")
@pytest.mark.asyncio
async def test_simulation_1(self, simulation, extra_node, self_hostname):
node1, node2, _, _, _, _, _, _, _, sanitizer_server = simulation
server1: ChiaServer = node1.full_node.server
async def test_full_system(self, simulation, extra_node, self_hostname):
full_system: FullSystem
bt: BlockTools
full_system, bt = simulation
server1: ChiaServer = full_system.node_1._server
blocks_to_farm = 3 # farming 3 blocks is sufficient to test the system
node1_port: uint16 = server1.get_port()
node2_port: uint16 = node2.full_node.server.get_port()
node2_port: uint16 = full_system.node_2._server.get_port()
# Connect node 1 to node 2
connected: bool = await server1.start_client(PeerInfo(self_hostname, node2_port))
@ -75,7 +85,7 @@ class TestSimulation:
assert len(server1.get_connections(NodeType.FULL_NODE, outbound=True)) >= 1
# Connect node3 to node1 and node2 - checks come later
node3: Service[FullNode] = extra_node
node3: FullNodeAPI = extra_node
server3: ChiaServer = node3.full_node.server
connected = await server3.start_client(PeerInfo(self_hostname, node1_port))
assert connected, f"server3 was unable to connect to node1 on port {node1_port}"
@ -83,17 +93,18 @@ class TestSimulation:
assert connected, f"server3 was unable to connect to node2 on port {node2_port}"
assert len(server3.get_connections(NodeType.FULL_NODE, outbound=True)) >= 2
# wait up to 10 mins for node2 to sync the chain to height 7
await time_out_assert(600, node2.full_node.blockchain.get_peak_height, 7)
# wait up to 25 mins for node2 to sync the chain to blocks_to_farm height
await time_out_assert(1500, node_height_at_least, True, full_system.node_2._api, blocks_to_farm)
connected = await sanitizer_server.start_client(PeerInfo(self_hostname, node2_port))
assert connected, f"sanitizer_server was unable to connect to node2 on port {node2_port}"
async def has_compact(node1, node2):
peak_height_1 = node1.full_node.blockchain.get_peak_height()
headers_1 = await node1.full_node.blockchain.get_header_blocks_in_range(0, peak_height_1 - 6)
peak_height_2 = node2.full_node.blockchain.get_peak_height()
headers_2 = await node2.full_node.blockchain.get_header_blocks_in_range(0, peak_height_2 - 6)
async def has_compact(node1: FullNode, node2: FullNode) -> bool:
peak_height_1 = node1.blockchain.get_peak_height()
if peak_height_1 is None:
return False
headers_1 = await node1.blockchain.get_header_blocks_in_range(0, peak_height_1 - blocks_to_farm - 1)
peak_height_2 = node2.blockchain.get_peak_height()
if peak_height_2 is None:
return False
headers_2 = await node2.blockchain.get_header_blocks_in_range(0, peak_height_2 - blocks_to_farm - 1)
# Commented to speed up.
# cc_eos = [False, False]
# icc_eos = [False, False]
@ -127,14 +138,53 @@ class TestSimulation:
# )
return has_compact == [True, True]
await time_out_assert(600, has_compact, True, node1, node2)
await time_out_assert(600, has_compact, True, full_system.node_1._node, full_system.node_2._node)
# check node3 has synced to the proper height
peak_height: uint32 = max(
node1.full_node.blockchain.get_peak_height(), node2.full_node.blockchain.get_peak_height()
full_system.node_1._node.blockchain.get_peak_height(),
full_system.node_2._node.blockchain.get_peak_height(),
)
# wait up to 10 mins for node3 to sync
await time_out_assert(600, node3.full_node.blockchain.get_peak_height, peak_height)
await time_out_assert(600, node_height_at_least, True, node3, peak_height)
# Connect node_1 up to the daemon
full_system.node_1.rpc_server.connect_to_daemon(
self_hostname=self_hostname, daemon_port=full_system.daemon.daemon_port
)
async def verify_daemon_connection(daemon: WebSocketServer, service: str) -> bool:
return len(daemon.connections.get(service, set())) >= 1
await time_out_assert(60, verify_daemon_connection, True, full_system.daemon, "chia_full_node")
async with aiohttp.ClientSession() as session:
ws = await session.ws_connect(
f"wss://127.0.0.1:{full_system.daemon.daemon_port}",
autoclose=True,
autoping=True,
ssl_context=bt.get_daemon_ssl_context(),
max_msg_size=100 * 1024 * 1024,
)
service_name = "test_service_name"
payload = create_payload("register_service", {"service": service_name}, service_name, "daemon")
await ws.send_str(payload)
await ws.receive()
await time_out_assert(10, verify_daemon_connection, True, full_system.daemon, service_name)
blockchain_state_found = False
payload = create_payload("get_blockchain_state", {}, service_name, "chia_full_node")
await ws.send_str(payload)
msg = await ws.receive()
if msg.type == aiohttp.WSMsgType.TEXT:
message = msg.data.strip()
message = json.loads(message)
if message["command"] == "get_blockchain_state":
blockchain_state_found = True
await ws.close()
assert blockchain_state_found, "Could not get blockchain state from daemon and node"
@pytest.mark.asyncio
async def test_simulator_auto_farm_and_get_coins(