From 0934e3dc4e03283428b6f95b2b9b77b70431016e Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 13 Sep 2023 11:17:43 -0400 Subject: [PATCH] `core.server` `test_loop()` failures (#16207) --- tests/core/server/flood.py | 88 ++++++++++++++++----- tests/core/server/serve.py | 114 ++++++++++++++++++++------- tests/core/server/test_loop.py | 137 +++++++++++++++++++-------------- tests/util/misc.py | 19 ++++- 4 files changed, 253 insertions(+), 105 deletions(-) diff --git a/tests/core/server/flood.py b/tests/core/server/flood.py index fc4a3d8b0ab5..263b8166c441 100644 --- a/tests/core/server/flood.py +++ b/tests/core/server/flood.py @@ -1,36 +1,84 @@ from __future__ import annotations import asyncio +import itertools +import logging +import pathlib +import random +import sys import time +from tests.util.misc import create_logger + # TODO: CAMPid 0945094189459712842390t591 IP = "127.0.0.1" PORT = 8444 -NUM_CLIENTS = 5000 +NUM_CLIENTS = 500 + +total_open_connections = 0 -async def tcp_echo_client(counter: str) -> None: - while True: - t1 = time.monotonic() - writer = None - try: - print(f"Opened connection: {counter}") - reader, writer = await asyncio.open_connection(IP, PORT) - await asyncio.sleep(15) - # writer.close() - # await writer.wait_closed() - except Exception as e: - t2 = time.monotonic() - print(f"Closed connection {counter}: {e}. Time: {t2 - t1}") - pass - finally: - if writer is not None: - writer.close() - await writer.wait_closed() +async def tcp_echo_client(task_counter: str, logger: logging.Logger) -> None: + global total_open_connections + try: + for loop_counter in itertools.count(): + label = f"{task_counter:5}-{loop_counter:5}" + await asyncio.sleep(random.random()) + t1 = time.monotonic() + writer = None + try: + logger.info(f"Opening connection: {label}") + reader, writer = await asyncio.open_connection(IP, PORT) + total_open_connections += 1 + logger.info(f"Opened connection: {label} (total: {total_open_connections})") + assert writer is not None + await asyncio.sleep(1 + 4 * random.random()) + except asyncio.CancelledError as e: + t2 = time.monotonic() + logger.info(f"Cancelled connection: {label} - {e}. Time: {t2 - t1:.3f}") + break + except Exception as e: + t2 = time.monotonic() + logger.info(f"Closed connection: {label} - {e}. Time: {t2 - t1:.3f}") + finally: + logger.info(f"--- {label} a") + if writer is not None: + total_open_connections -= 1 + logger.info(f"--- {label} B (total: {total_open_connections})") + writer.close() + await writer.wait_closed() + finally: + logger.info(f"--- {task_counter:5} task finishing") async def main() -> None: - await asyncio.gather(*[tcp_echo_client("{}".format(i)) for i in range(0, NUM_CLIENTS)]) + shutdown_path = pathlib.Path(sys.argv[1]) + out_path = shutdown_path.with_suffix(".out") + + async def dun() -> None: + while shutdown_path.exists(): + await asyncio.sleep(0.25) + + task.cancel() + + file_task = asyncio.create_task(dun()) + + with out_path.open(mode="w") as file: + logger = create_logger(file=file) + + async def f() -> None: + await asyncio.gather( + *[tcp_echo_client(task_counter="{}".format(i), logger=logger) for i in range(0, NUM_CLIENTS)] + ) + + task = asyncio.create_task(f()) + try: + await task + except asyncio.CancelledError: + pass + finally: + logger.info("leaving flood") + await file_task asyncio.run(main()) diff --git a/tests/core/server/serve.py b/tests/core/server/serve.py index 8a7c1eb9b684..6f01fdf72ed9 100644 --- a/tests/core/server/serve.py +++ b/tests/core/server/serve.py @@ -3,13 +3,17 @@ from __future__ import annotations import asyncio import asyncio.events import asyncio.protocols +import dataclasses +import functools import logging.config +import pathlib import sys import threading -from typing import List, Optional +from typing import List, Optional, final, overload from chia.server.chia_policy import ChiaPolicy from chia.server.start_service import async_run +from tests.util.misc import create_logger if sys.platform == "win32": import _winapi @@ -17,10 +21,14 @@ if sys.platform == "win32": NULL = _winapi.NULL +@final +@dataclasses.dataclass class EchoServer(asyncio.Protocol): + logger: logging.Logger + def connection_made(self, transport: asyncio.BaseTransport) -> None: peername = transport.get_extra_info("peername") - print("connection from {}".format(peername)) + self.logger.info("connection from {}".format(peername)) self.transport = transport def data_received(self, data: bytes) -> None: @@ -33,43 +41,93 @@ class EchoServer(asyncio.Protocol): # self.transport.close() +@overload async def async_main( + *, + out_path: pathlib.Path, + shutdown_path: pathlib.Path, ip: str = "127.0.0.1", port: int = 8444, - thread_end_event: Optional[threading.Event] = None, port_holder: Optional[List[int]] = None, ) -> None: - loop = asyncio.get_event_loop() - server = await loop.create_server(EchoServer, ip, port) - if port_holder is not None: - [server_socket] = server.sockets - # TODO: review if this is general enough, such as for ipv6 - port_holder.append(server_socket.getsockname()[1]) - print("serving on {}".format(server.sockets[0].getsockname())) + ... - try: + +@overload +async def async_main( + *, + out_path: pathlib.Path, + thread_end_event: threading.Event, + ip: str = "127.0.0.1", + port: int = 8444, + port_holder: Optional[List[int]] = None, +) -> None: + ... + + +async def async_main( + *, + out_path: pathlib.Path, + shutdown_path: Optional[pathlib.Path] = None, + thread_end_event: Optional[threading.Event] = None, + ip: str = "127.0.0.1", + port: int = 8444, + port_holder: Optional[List[int]] = None, +) -> None: + with out_path.open(mode="w") as file: + logger = create_logger(file=file) + file_task: Optional[asyncio.Task[None]] = None if thread_end_event is None: - await asyncio.sleep(20) - else: - while not thread_end_event.is_set(): - await asyncio.sleep(0.1) - except KeyboardInterrupt: - print("exit") - finally: - print("closing server") - server.close() - await server.wait_closed() - print("server closed") - # await asyncio.sleep(5) + assert shutdown_path is not None + thread_end_event = threading.Event() + + async def dun() -> None: + while shutdown_path.exists(): + await asyncio.sleep(0.25) + + thread_end_event.set() + + file_task = asyncio.create_task(dun()) + + loop = asyncio.get_event_loop() + server = await loop.create_server(functools.partial(EchoServer, logger=logger), ip, port) + if port_holder is not None: + [server_socket] = server.sockets + # TODO: review if this is general enough, such as for ipv6 + port_holder.append(server_socket.getsockname()[1]) + logger.info("serving on {}".format(server.sockets[0].getsockname())) + + try: + try: + while not thread_end_event.is_set(): + await asyncio.sleep(0.1) + finally: + # the test checks explicitly for this + logger.info("exit: shutting down") + logger.info("exit: thread end event set") + except KeyboardInterrupt: + logger.info("exit: keyboard interrupt") + except asyncio.CancelledError: + logger.info("exit: cancelled") + finally: + logger.info("closing server") + server.close() + await server.wait_closed() + logger.info("server closed") + if file_task is not None: + await file_task def main(connection_limit: int = 25) -> None: asyncio.set_event_loop_policy(ChiaPolicy()) - logger = logging.getLogger() - logger.setLevel(level=logging.DEBUG) - stream_handler = logging.StreamHandler(stream=sys.stdout) - logger.addHandler(hdlr=stream_handler) - async_run(async_main(), connection_limit=connection_limit - 100) + shutdown_path = pathlib.Path(sys.argv[1]) + async_run( + async_main( + shutdown_path=shutdown_path, + out_path=shutdown_path.with_suffix(".out"), + ), + connection_limit=connection_limit - 100, + ) if __name__ == "__main__": diff --git a/tests/core/server/test_loop.py b/tests/core/server/test_loop.py index 0249e320c1d2..bf1d58659fce 100644 --- a/tests/core/server/test_loop.py +++ b/tests/core/server/test_loop.py @@ -2,12 +2,12 @@ from __future__ import annotations import asyncio import contextlib +import os import pathlib import random import subprocess import sys import threading -import time from dataclasses import dataclass, field from typing import AsyncIterator, List, Optional @@ -15,7 +15,9 @@ import anyio import pytest from chia.server import chia_policy +from chia.simulator.time_out_assert import adjusted_timeout from tests.core.server import serve +from tests.util.misc import create_logger here = pathlib.Path(__file__).parent @@ -26,8 +28,10 @@ NUM_CLIENTS = 500 @contextlib.asynccontextmanager -async def serve_in_thread(ip: str, port: int, connection_limit: int) -> AsyncIterator[ServeInThread]: - server = ServeInThread(ip=ip, requested_port=port, connection_limit=connection_limit) +async def serve_in_thread( + out_path: pathlib.Path, ip: str, port: int, connection_limit: int +) -> AsyncIterator[ServeInThread]: + server = ServeInThread(out_path=out_path, ip=ip, requested_port=port, connection_limit=connection_limit) server.start() # TODO: can we check when it has really started? just make a connection? await asyncio.sleep(1) @@ -87,6 +91,7 @@ class Client: class ServeInThread: ip: str requested_port: int + out_path: pathlib.Path connection_limit: int = 25 original_connection_limit: Optional[int] = None loop: Optional[asyncio.AbstractEventLoop] = None @@ -119,6 +124,7 @@ class ServeInThread: async def main(self) -> None: self.server_task = asyncio.create_task( serve.async_main( + out_path=self.out_path, ip=self.ip, port=self.requested_port, thread_end_event=self.thread_end_event, @@ -143,65 +149,86 @@ class ServeInThread: chia_policy.global_max_concurrent_connections = self.original_connection_limit -@pytest.mark.xfail( - condition=sys.platform == "win32", - reason="known failure, being worked on in https://github.com/Chia-Network/chia-blockchain/pull/16207", -) @pytest.mark.asyncio -async def test_loop() -> None: +async def test_loop(tmp_path: pathlib.Path) -> None: + logger = create_logger() + allowed_over_connections = 0 if sys.platform == "win32" else 100 - print(" ==== launching serve.py") - with subprocess.Popen( - [sys.executable, "-m", "tests.core.server.serve"], - encoding="utf-8", - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - ) as serving_process: - print(" ==== serve.py running") - time.sleep(5) - print(" ==== launching flood.py") - with subprocess.Popen( - [sys.executable, "-m", "tests.core.server.flood"], - encoding="utf-8", - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - ) as flooding_process: - print(" ==== flood.py running") - time.sleep(5) - print(" ==== killing flood.py") - flooding_process.kill() - print(" ==== flood.py done") + serve_file = tmp_path.joinpath("serve") + serve_file.touch() + flood_file = tmp_path.joinpath("flood") + flood_file.touch() - time.sleep(5) + logger.info(" ==== launching serve.py") + with subprocess.Popen( + [sys.executable, "-m", "tests.core.server.serve", os.fspath(serve_file)], + ): + logger.info(" ==== serve.py running") + + await asyncio.sleep(adjusted_timeout(5)) + + logger.info(" ==== launching flood.py") + with subprocess.Popen( + [sys.executable, "-m", "tests.core.server.flood", os.fspath(flood_file)], + ): + logger.info(" ==== flood.py running") + + await asyncio.sleep(adjusted_timeout(10)) + + logger.info(" ==== killing flood.py") + flood_file.unlink() + + flood_output = flood_file.with_suffix(".out").read_text() + logger.info(" ==== flood.py done") + + await asyncio.sleep(adjusted_timeout(5)) writer = None + post_connection_error: Optional[str] = None try: - with anyio.fail_after(delay=1): - print(" ==== attempting a single new connection") + logger.info(" ==== attempting a single new connection") + with anyio.fail_after(delay=adjusted_timeout(1)): reader, writer = await asyncio.open_connection(IP, PORT) - print(" ==== connection succeeded") - post_connection_succeeded = True - except (TimeoutError, ConnectionRefusedError): + logger.info(" ==== connection succeeded") + post_connection_succeeded = True + except (TimeoutError, ConnectionRefusedError) as e: + logger.info(" ==== connection failed") post_connection_succeeded = False + post_connection_error = f"{type(e).__name__}: {e}" finally: if writer is not None: writer.close() await writer.wait_closed() - print(" ==== killing serve.py") - # serving_process.send_signal(signal.CTRL_C_EVENT) - # serving_process.terminate() - output, _ = serving_process.communicate() - print(" ==== serve.py done") + logger.info(" ==== killing serve.py") - print("\n\n ==== output:") - print(output) + serve_file.unlink() + + serve_output = serve_file.with_suffix(".out").read_text() + + logger.info(" ==== serve.py done") + + logger.info(f"\n\n ==== serve output:\n{serve_output}") + logger.info(f"\n\n ==== flood output:\n{flood_output}") over = [] connection_limit = 25 accept_loop_count_over: List[int] = [] - for line in output.splitlines(): + server_output_lines = serve_output.splitlines() + found_shutdown = False + shutdown_lines: List[str] = [] + for line in server_output_lines: + if not found_shutdown: + if not line.casefold().endswith("shutting down"): + continue + + found_shutdown = True + shutdown_lines.append(line) + + assert len(shutdown_lines) > 0, "shutdown message is missing from log, unable to verify timing of connections" + + for line in server_output_lines: mark = "Total connections:" if mark in line: _, _, rest = line.partition(mark) @@ -209,20 +236,16 @@ async def test_loop() -> None: if count > connection_limit + allowed_over_connections: over.append(count) - # mark = "ChiaProactor._chia_accept_loop() entering count=" - # if mark in line: - # _, _, rest = line.partition(mark) - # count = int(rest) - # if count > 1: - # accept_loop_count_over.append(count) - assert over == [], over assert accept_loop_count_over == [], accept_loop_count_over - assert "Traceback" not in output - assert "paused accepting connections" in output - assert post_connection_succeeded + assert "Traceback" not in serve_output + assert "paused accepting connections" in serve_output + assert post_connection_succeeded, post_connection_error + assert all( + "new connection" not in line.casefold() for line in shutdown_lines + ), "new connection found during shut down" - print(" ==== all checks passed") + logger.info(" ==== all checks passed") @pytest.mark.parametrize( @@ -238,12 +261,14 @@ async def test_loop() -> None: ids=lambda cycles: f"{cycles} cycle{'s' if cycles != 1 else ''}", ) @pytest.mark.asyncio -async def test_limits_connections(repetition: int, cycles: int) -> None: +async def test_limits_connections(repetition: int, cycles: int, tmp_path: pathlib.Path) -> None: ip = "127.0.0.1" connection_limit = 10 connection_attempts = connection_limit + 10 - async with serve_in_thread(ip=ip, port=0, connection_limit=connection_limit) as server: + async with serve_in_thread( + out_path=tmp_path.joinpath("serve.out"), ip=ip, port=0, connection_limit=connection_limit + ) as server: for cycle in range(cycles): if cycle > 0: await asyncio.sleep(1) diff --git a/tests/util/misc.py b/tests/util/misc.py index a1a7392cc1fc..737819c55a40 100644 --- a/tests/util/misc.py +++ b/tests/util/misc.py @@ -5,16 +5,18 @@ import dataclasses import enum import functools import gc +import logging import math import os import subprocess +import sys from concurrent.futures import Future from inspect import getframeinfo, stack from statistics import mean from textwrap import dedent from time import thread_time from types import TracebackType -from typing import Any, Callable, Collection, Iterator, List, Optional, Type, Union +from typing import Any, Callable, Collection, Iterator, List, Optional, TextIO, Type, Union import pytest from chia_rs import Coin @@ -367,3 +369,18 @@ def coin_creation_args(hinted_coin: HintedCoin) -> List[Any]: else: memos = [] return [ConditionOpcode.CREATE_COIN, hinted_coin.coin.puzzle_hash, hinted_coin.coin.amount, memos] + + +def create_logger(file: TextIO = sys.stdout) -> logging.Logger: + logger = logging.getLogger() + logger.setLevel(level=logging.DEBUG) + stream_handler = logging.StreamHandler(stream=file) + log_date_format = "%Y-%m-%dT%H:%M:%S" + file_log_formatter = logging.Formatter( + fmt="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s", + datefmt=log_date_format, + ) + stream_handler.setFormatter(file_log_formatter) + logger.addHandler(hdlr=stream_handler) + + return logger