core.server test_loop() failures (#16207)

This commit is contained in:
Kyle Altendorf 2023-09-13 11:17:43 -04:00 committed by GitHub
parent bc83010c92
commit 0934e3dc4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 253 additions and 105 deletions

View File

@ -1,36 +1,84 @@
from __future__ import annotations
import asyncio
import itertools
import logging
import pathlib
import random
import sys
import time
from tests.util.misc import create_logger
# TODO: CAMPid 0945094189459712842390t591
IP = "127.0.0.1"
PORT = 8444
NUM_CLIENTS = 5000
NUM_CLIENTS = 500
total_open_connections = 0
async def tcp_echo_client(counter: str) -> None:
while True:
t1 = time.monotonic()
writer = None
try:
print(f"Opened connection: {counter}")
reader, writer = await asyncio.open_connection(IP, PORT)
await asyncio.sleep(15)
# writer.close()
# await writer.wait_closed()
except Exception as e:
t2 = time.monotonic()
print(f"Closed connection {counter}: {e}. Time: {t2 - t1}")
pass
finally:
if writer is not None:
writer.close()
await writer.wait_closed()
async def tcp_echo_client(task_counter: str, logger: logging.Logger) -> None:
global total_open_connections
try:
for loop_counter in itertools.count():
label = f"{task_counter:5}-{loop_counter:5}"
await asyncio.sleep(random.random())
t1 = time.monotonic()
writer = None
try:
logger.info(f"Opening connection: {label}")
reader, writer = await asyncio.open_connection(IP, PORT)
total_open_connections += 1
logger.info(f"Opened connection: {label} (total: {total_open_connections})")
assert writer is not None
await asyncio.sleep(1 + 4 * random.random())
except asyncio.CancelledError as e:
t2 = time.monotonic()
logger.info(f"Cancelled connection: {label} - {e}. Time: {t2 - t1:.3f}")
break
except Exception as e:
t2 = time.monotonic()
logger.info(f"Closed connection: {label} - {e}. Time: {t2 - t1:.3f}")
finally:
logger.info(f"--- {label} a")
if writer is not None:
total_open_connections -= 1
logger.info(f"--- {label} B (total: {total_open_connections})")
writer.close()
await writer.wait_closed()
finally:
logger.info(f"--- {task_counter:5} task finishing")
async def main() -> None:
await asyncio.gather(*[tcp_echo_client("{}".format(i)) for i in range(0, NUM_CLIENTS)])
shutdown_path = pathlib.Path(sys.argv[1])
out_path = shutdown_path.with_suffix(".out")
async def dun() -> None:
while shutdown_path.exists():
await asyncio.sleep(0.25)
task.cancel()
file_task = asyncio.create_task(dun())
with out_path.open(mode="w") as file:
logger = create_logger(file=file)
async def f() -> None:
await asyncio.gather(
*[tcp_echo_client(task_counter="{}".format(i), logger=logger) for i in range(0, NUM_CLIENTS)]
)
task = asyncio.create_task(f())
try:
await task
except asyncio.CancelledError:
pass
finally:
logger.info("leaving flood")
await file_task
asyncio.run(main())

View File

@ -3,13 +3,17 @@ from __future__ import annotations
import asyncio
import asyncio.events
import asyncio.protocols
import dataclasses
import functools
import logging.config
import pathlib
import sys
import threading
from typing import List, Optional
from typing import List, Optional, final, overload
from chia.server.chia_policy import ChiaPolicy
from chia.server.start_service import async_run
from tests.util.misc import create_logger
if sys.platform == "win32":
import _winapi
@ -17,10 +21,14 @@ if sys.platform == "win32":
NULL = _winapi.NULL
@final
@dataclasses.dataclass
class EchoServer(asyncio.Protocol):
logger: logging.Logger
def connection_made(self, transport: asyncio.BaseTransport) -> None:
peername = transport.get_extra_info("peername")
print("connection from {}".format(peername))
self.logger.info("connection from {}".format(peername))
self.transport = transport
def data_received(self, data: bytes) -> None:
@ -33,43 +41,93 @@ class EchoServer(asyncio.Protocol):
# self.transport.close()
@overload
async def async_main(
*,
out_path: pathlib.Path,
shutdown_path: pathlib.Path,
ip: str = "127.0.0.1",
port: int = 8444,
thread_end_event: Optional[threading.Event] = None,
port_holder: Optional[List[int]] = None,
) -> None:
loop = asyncio.get_event_loop()
server = await loop.create_server(EchoServer, ip, port)
if port_holder is not None:
[server_socket] = server.sockets
# TODO: review if this is general enough, such as for ipv6
port_holder.append(server_socket.getsockname()[1])
print("serving on {}".format(server.sockets[0].getsockname()))
...
try:
@overload
async def async_main(
*,
out_path: pathlib.Path,
thread_end_event: threading.Event,
ip: str = "127.0.0.1",
port: int = 8444,
port_holder: Optional[List[int]] = None,
) -> None:
...
async def async_main(
*,
out_path: pathlib.Path,
shutdown_path: Optional[pathlib.Path] = None,
thread_end_event: Optional[threading.Event] = None,
ip: str = "127.0.0.1",
port: int = 8444,
port_holder: Optional[List[int]] = None,
) -> None:
with out_path.open(mode="w") as file:
logger = create_logger(file=file)
file_task: Optional[asyncio.Task[None]] = None
if thread_end_event is None:
await asyncio.sleep(20)
else:
while not thread_end_event.is_set():
await asyncio.sleep(0.1)
except KeyboardInterrupt:
print("exit")
finally:
print("closing server")
server.close()
await server.wait_closed()
print("server closed")
# await asyncio.sleep(5)
assert shutdown_path is not None
thread_end_event = threading.Event()
async def dun() -> None:
while shutdown_path.exists():
await asyncio.sleep(0.25)
thread_end_event.set()
file_task = asyncio.create_task(dun())
loop = asyncio.get_event_loop()
server = await loop.create_server(functools.partial(EchoServer, logger=logger), ip, port)
if port_holder is not None:
[server_socket] = server.sockets
# TODO: review if this is general enough, such as for ipv6
port_holder.append(server_socket.getsockname()[1])
logger.info("serving on {}".format(server.sockets[0].getsockname()))
try:
try:
while not thread_end_event.is_set():
await asyncio.sleep(0.1)
finally:
# the test checks explicitly for this
logger.info("exit: shutting down")
logger.info("exit: thread end event set")
except KeyboardInterrupt:
logger.info("exit: keyboard interrupt")
except asyncio.CancelledError:
logger.info("exit: cancelled")
finally:
logger.info("closing server")
server.close()
await server.wait_closed()
logger.info("server closed")
if file_task is not None:
await file_task
def main(connection_limit: int = 25) -> None:
asyncio.set_event_loop_policy(ChiaPolicy())
logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)
stream_handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(hdlr=stream_handler)
async_run(async_main(), connection_limit=connection_limit - 100)
shutdown_path = pathlib.Path(sys.argv[1])
async_run(
async_main(
shutdown_path=shutdown_path,
out_path=shutdown_path.with_suffix(".out"),
),
connection_limit=connection_limit - 100,
)
if __name__ == "__main__":

View File

@ -2,12 +2,12 @@ from __future__ import annotations
import asyncio
import contextlib
import os
import pathlib
import random
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, field
from typing import AsyncIterator, List, Optional
@ -15,7 +15,9 @@ import anyio
import pytest
from chia.server import chia_policy
from chia.simulator.time_out_assert import adjusted_timeout
from tests.core.server import serve
from tests.util.misc import create_logger
here = pathlib.Path(__file__).parent
@ -26,8 +28,10 @@ NUM_CLIENTS = 500
@contextlib.asynccontextmanager
async def serve_in_thread(ip: str, port: int, connection_limit: int) -> AsyncIterator[ServeInThread]:
server = ServeInThread(ip=ip, requested_port=port, connection_limit=connection_limit)
async def serve_in_thread(
out_path: pathlib.Path, ip: str, port: int, connection_limit: int
) -> AsyncIterator[ServeInThread]:
server = ServeInThread(out_path=out_path, ip=ip, requested_port=port, connection_limit=connection_limit)
server.start()
# TODO: can we check when it has really started? just make a connection?
await asyncio.sleep(1)
@ -87,6 +91,7 @@ class Client:
class ServeInThread:
ip: str
requested_port: int
out_path: pathlib.Path
connection_limit: int = 25
original_connection_limit: Optional[int] = None
loop: Optional[asyncio.AbstractEventLoop] = None
@ -119,6 +124,7 @@ class ServeInThread:
async def main(self) -> None:
self.server_task = asyncio.create_task(
serve.async_main(
out_path=self.out_path,
ip=self.ip,
port=self.requested_port,
thread_end_event=self.thread_end_event,
@ -143,65 +149,86 @@ class ServeInThread:
chia_policy.global_max_concurrent_connections = self.original_connection_limit
@pytest.mark.xfail(
condition=sys.platform == "win32",
reason="known failure, being worked on in https://github.com/Chia-Network/chia-blockchain/pull/16207",
)
@pytest.mark.asyncio
async def test_loop() -> None:
async def test_loop(tmp_path: pathlib.Path) -> None:
logger = create_logger()
allowed_over_connections = 0 if sys.platform == "win32" else 100
print(" ==== launching serve.py")
with subprocess.Popen(
[sys.executable, "-m", "tests.core.server.serve"],
encoding="utf-8",
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
) as serving_process:
print(" ==== serve.py running")
time.sleep(5)
print(" ==== launching flood.py")
with subprocess.Popen(
[sys.executable, "-m", "tests.core.server.flood"],
encoding="utf-8",
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
) as flooding_process:
print(" ==== flood.py running")
time.sleep(5)
print(" ==== killing flood.py")
flooding_process.kill()
print(" ==== flood.py done")
serve_file = tmp_path.joinpath("serve")
serve_file.touch()
flood_file = tmp_path.joinpath("flood")
flood_file.touch()
time.sleep(5)
logger.info(" ==== launching serve.py")
with subprocess.Popen(
[sys.executable, "-m", "tests.core.server.serve", os.fspath(serve_file)],
):
logger.info(" ==== serve.py running")
await asyncio.sleep(adjusted_timeout(5))
logger.info(" ==== launching flood.py")
with subprocess.Popen(
[sys.executable, "-m", "tests.core.server.flood", os.fspath(flood_file)],
):
logger.info(" ==== flood.py running")
await asyncio.sleep(adjusted_timeout(10))
logger.info(" ==== killing flood.py")
flood_file.unlink()
flood_output = flood_file.with_suffix(".out").read_text()
logger.info(" ==== flood.py done")
await asyncio.sleep(adjusted_timeout(5))
writer = None
post_connection_error: Optional[str] = None
try:
with anyio.fail_after(delay=1):
print(" ==== attempting a single new connection")
logger.info(" ==== attempting a single new connection")
with anyio.fail_after(delay=adjusted_timeout(1)):
reader, writer = await asyncio.open_connection(IP, PORT)
print(" ==== connection succeeded")
post_connection_succeeded = True
except (TimeoutError, ConnectionRefusedError):
logger.info(" ==== connection succeeded")
post_connection_succeeded = True
except (TimeoutError, ConnectionRefusedError) as e:
logger.info(" ==== connection failed")
post_connection_succeeded = False
post_connection_error = f"{type(e).__name__}: {e}"
finally:
if writer is not None:
writer.close()
await writer.wait_closed()
print(" ==== killing serve.py")
# serving_process.send_signal(signal.CTRL_C_EVENT)
# serving_process.terminate()
output, _ = serving_process.communicate()
print(" ==== serve.py done")
logger.info(" ==== killing serve.py")
print("\n\n ==== output:")
print(output)
serve_file.unlink()
serve_output = serve_file.with_suffix(".out").read_text()
logger.info(" ==== serve.py done")
logger.info(f"\n\n ==== serve output:\n{serve_output}")
logger.info(f"\n\n ==== flood output:\n{flood_output}")
over = []
connection_limit = 25
accept_loop_count_over: List[int] = []
for line in output.splitlines():
server_output_lines = serve_output.splitlines()
found_shutdown = False
shutdown_lines: List[str] = []
for line in server_output_lines:
if not found_shutdown:
if not line.casefold().endswith("shutting down"):
continue
found_shutdown = True
shutdown_lines.append(line)
assert len(shutdown_lines) > 0, "shutdown message is missing from log, unable to verify timing of connections"
for line in server_output_lines:
mark = "Total connections:"
if mark in line:
_, _, rest = line.partition(mark)
@ -209,20 +236,16 @@ async def test_loop() -> None:
if count > connection_limit + allowed_over_connections:
over.append(count)
# mark = "ChiaProactor._chia_accept_loop() entering count="
# if mark in line:
# _, _, rest = line.partition(mark)
# count = int(rest)
# if count > 1:
# accept_loop_count_over.append(count)
assert over == [], over
assert accept_loop_count_over == [], accept_loop_count_over
assert "Traceback" not in output
assert "paused accepting connections" in output
assert post_connection_succeeded
assert "Traceback" not in serve_output
assert "paused accepting connections" in serve_output
assert post_connection_succeeded, post_connection_error
assert all(
"new connection" not in line.casefold() for line in shutdown_lines
), "new connection found during shut down"
print(" ==== all checks passed")
logger.info(" ==== all checks passed")
@pytest.mark.parametrize(
@ -238,12 +261,14 @@ async def test_loop() -> None:
ids=lambda cycles: f"{cycles} cycle{'s' if cycles != 1 else ''}",
)
@pytest.mark.asyncio
async def test_limits_connections(repetition: int, cycles: int) -> None:
async def test_limits_connections(repetition: int, cycles: int, tmp_path: pathlib.Path) -> None:
ip = "127.0.0.1"
connection_limit = 10
connection_attempts = connection_limit + 10
async with serve_in_thread(ip=ip, port=0, connection_limit=connection_limit) as server:
async with serve_in_thread(
out_path=tmp_path.joinpath("serve.out"), ip=ip, port=0, connection_limit=connection_limit
) as server:
for cycle in range(cycles):
if cycle > 0:
await asyncio.sleep(1)

View File

@ -5,16 +5,18 @@ import dataclasses
import enum
import functools
import gc
import logging
import math
import os
import subprocess
import sys
from concurrent.futures import Future
from inspect import getframeinfo, stack
from statistics import mean
from textwrap import dedent
from time import thread_time
from types import TracebackType
from typing import Any, Callable, Collection, Iterator, List, Optional, Type, Union
from typing import Any, Callable, Collection, Iterator, List, Optional, TextIO, Type, Union
import pytest
from chia_rs import Coin
@ -367,3 +369,18 @@ def coin_creation_args(hinted_coin: HintedCoin) -> List[Any]:
else:
memos = []
return [ConditionOpcode.CREATE_COIN, hinted_coin.coin.puzzle_hash, hinted_coin.coin.amount, memos]
def create_logger(file: TextIO = sys.stdout) -> logging.Logger:
logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)
stream_handler = logging.StreamHandler(stream=file)
log_date_format = "%Y-%m-%dT%H:%M:%S"
file_log_formatter = logging.Formatter(
fmt="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s",
datefmt=log_date_format,
)
stream_handler.setFormatter(file_log_formatter)
logger.addHandler(hdlr=stream_handler)
return logger