server/tests-py: Never allocate the same port twice.

We seem to be getting flakes where we try to use the same port for two different servers. This happens because, in certain cases, we cannot simply allocate the port dynamically at bind time but have to decide on it in advance, which leads to a race condition.

We resolve this by keeping track of the ports we allocate when using this method, making sure we never allocate them twice. We also make sure we allocate from a different pool of ports to the usual dynamic port pool (typically above port 32768, and often above port 49152).

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8903
GitOrigin-RevId: 375a23867591a4566493dddbc550c58cf88ea392
This commit is contained in:
Samir Talwar 2023-04-25 14:49:31 +02:00 committed by hasura-bot
parent 15ce4818b2
commit e24bcf2a39
10 changed files with 82 additions and 37 deletions

View File

@ -102,13 +102,13 @@ def pytest_addoption(parser):
#By default,
#1) Set test grouping to by class (--dist=loadfile)
#1) Set test grouping to by class (--dist=loadscope)
#2) Set default parallelism to one
def pytest_cmdline_preparse(config, args):
worker = os.environ.get('PYTEST_XDIST_WORKER')
if 'xdist' in sys.modules and not worker: # pytest-xdist plugin
num = 1
args[:] = ['--dist=loadfile', f'-n{num}'] + args
args[:] = ['--dist=loadscope', f'-n{num}'] + args
def pytest_configure(config):
# Pytest has removed the global pytest.config
@ -315,8 +315,8 @@ def hge_bin(request: pytest.FixtureRequest) -> Optional[str]:
return request.config.getoption('--hge-bin') # type: ignore
@pytest.fixture(scope='class')
def hge_port() -> int:
return fixtures.hge.hge_port()
def hge_port(worker_id: str) -> int:
return fixtures.hge.hge_port(worker_id)
@pytest.fixture(scope='class')
def hge_url(request: pytest.FixtureRequest, hge_bin: Optional[str], hge_port: int) -> str:

View File

@ -22,8 +22,8 @@ _PASS_THROUGH_ENV_VARS = set([
])
def hge_port() -> int:
return ports.find_free_port()
def hge_port(worker_id: str) -> int:
return ports.find_free_port(worker_id)
def hge_server(

View File

@ -1,19 +1,64 @@
import contextlib
import re
import socket
import time
from typing import Optional
def find_free_port() -> int:
_WORKER_ID_PATTERN = re.compile('^gw(\\d+)$')
"""
A set of the ports reserved by this file.
It is never cleared; we simply expect to not run out before the tests finish.
"""
_allocated_ports: set[int] = set()
def find_free_port(worker_id: str) -> int:
"""
Finds a free port.
Finds a free port in the range allocated to the given worker.
There is no lock placed on the port, so something else could claim the port
between this function finding a port and returning.
between this function finding a port and it being used for its intended
purpose. To mitigate this:
1. we do not use the usual dynamic port range (above 32768), and
2. we never return the same port twice from this function.
We use the worker ID to construct the port range because we do not share
the set of allocated ports between workers. This means we need to ensure
that the ranges do not overlap.
Note that the worker ID should be provided by the `worker_id` fixture.
More details can be found here:
https://pytest-xdist.readthedocs.io/en/latest/how-to.html#identifying-the-worker-process-during-a-test
The worker ID is string in the form "gw<N>", where N is the worker number.
For example, if you have 4 workers, they will be called "gw0", "gw1",
"gw2", and "gw3". We parse the number back out of the string in order to
construct the port range.
"""
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(('', 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]
# Use a different port range per xdist worker. The range is 1000 ports,
# starting at port 10000. So worker ID 7, for example, will use the ports
# 17000 (inclusive) to 18000 (exclusive).
match = _WORKER_ID_PATTERN.match(worker_id)
if not match:
raise Exception(f'Invalid worker ID: {worker_id!r}')
worker_number = int(match.group(1))
port_range = port_range = range((worker_number + 10) * 1000, (worker_number + 11) * 1000)
for port in port_range:
if port not in _allocated_ports:
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
try:
s.bind(('', port))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_allocated_ports.add(port)
return port
except OSError:
# try the next one
pass
raise Exception('No available port found.')
def is_port_in_use(port: int) -> bool:
"""

View File

@ -5,14 +5,14 @@ from typing import Optional
import ports
class NodeGraphQL:
def __init__(self, cmd: list[str], env: dict[str, str] = {}, port: Optional[int] = None):
self.cmd = cmd
def __init__(self, worker_id: str, script: str, env: dict[str, str] = {}, port: Optional[int] = None):
self.script = script
self.env = env
self.port = port if port else ports.find_free_port()
self.port = port if port else ports.find_free_port(worker_id)
self.proc: Optional[subprocess.Popen[bytes]] = None
def start(self):
self.proc = subprocess.Popen(self.cmd, env={**os.environ, **self.env, 'PORT': str(self.port)})
self.proc = subprocess.Popen(['node', self.script], env={**os.environ, **self.env, 'PORT': str(self.port)})
try:
ports.wait_for_port(self.port, timeout = 30)
except:

View File

@ -13,9 +13,9 @@ TODO:- Test Actions metadata
@pytest.fixture(scope='class')
@pytest.mark.early
def graphql_service(hge_fixture_env: dict[str, str]):
def graphql_service(worker_id: str, hge_fixture_env: dict[str, str]):
(_, port) = extract_server_address_from('GRAPHQL_SERVICE_HANDLER')
server = NodeGraphQL(['node', 'remote_schemas/nodejs/actions_remote_join_schema.js'], port=port)
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/actions_remote_join_schema.js', port=port)
server.start()
print(f'{graphql_service.__name__} server started on {server.url}')
hge_fixture_env['GRAPHQL_SERVICE_HANDLER'] = server.url

View File

@ -22,8 +22,8 @@ class TestApolloFederation:
return 'queries/apollo_federation'
@pytest.fixture
def federated_server_with_hge_only(self, hge_url: str, hge_key: str):
server = NodeGraphQL(["node", "remote_schemas/nodejs/apollo_federated_server_with_hge_only.js"], env={
def federated_server_with_hge_only(self, worker_id: str, hge_url: str, hge_key: str):
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/apollo_federated_server_with_hge_only.js', env={
'HGE_URL': hge_url,
'HASURA_GRAPHQL_ADMIN_SECRET': hge_key,
})
@ -32,8 +32,8 @@ class TestApolloFederation:
server.stop()
@pytest.fixture
def server_1(self, hge_url: str):
server = NodeGraphQL(["node", "remote_schemas/nodejs/apollo_server_1.js"], env={
def server_1(self, worker_id: str, hge_url: str):
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/apollo_server_1.js', env={
'HGE_URL': hge_url,
})
server.start()
@ -41,8 +41,8 @@ class TestApolloFederation:
server.stop()
@pytest.fixture
def federated_server_with_hge_and_server1(self, hge_url: str, hge_key: str, server_1):
server = NodeGraphQL(["node", "remote_schemas/nodejs/apollo_federated_server_with_hge_and_server1.js"], env={
def federated_server_with_hge_and_server1(self, worker_id: str, hge_url: str, hge_key: str, server_1):
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/apollo_federated_server_with_hge_and_server1.js', env={
'HGE_URL': hge_url,
'OTHER_URL': server_1.url,
'HASURA_GRAPHQL_ADMIN_SECRET': hge_key,

View File

@ -9,9 +9,9 @@ yaml=YAML(typ='safe', pure=True)
@pytest.fixture(scope='class')
@pytest.mark.early
def graphql_service(hge_fixture_env: dict[str, str]):
def graphql_service(worker_id: str, hge_fixture_env: dict[str, str]):
(_, port) = extract_server_address_from('GRAPHQL_SERVICE_HANDLER')
server = NodeGraphQL(['node', 'remote_schemas/nodejs/index.js'], port=port)
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/index.js', port=port)
server.start()
print(f'{graphql_service.__name__} server started on {server.url}')
hge_fixture_env['GRAPHQL_SERVICE_HANDLER'] = server.url

View File

@ -8,9 +8,9 @@ from validate import check_query_f
@pytest.fixture(scope='class')
@pytest.mark.early
def graphql_service(hge_fixture_env: dict[str, str]):
def graphql_service(worker_id: str, hge_fixture_env: dict[str, str]):
(_, port) = extract_server_address_from('GRAPHQL_SERVICE_HANDLER')
server = NodeGraphQL(['node', 'remote_schemas/nodejs/index.js'], port=port)
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/index.js', port=port)
server.start()
print(f'{graphql_service.__name__} server started on {server.url}')
hge_fixture_env['GRAPHQL_SERVICE_HANDLER'] = server.url

View File

@ -14,9 +14,9 @@ pytestmark = [
@pytest.fixture(scope='class')
@pytest.mark.early
def graphql_service_1(hge_fixture_env: dict[str, str]):
def graphql_service_1(worker_id: str, hge_fixture_env: dict[str, str]):
(_, port) = extract_server_address_from('GRAPHQL_SERVICE_1')
server = NodeGraphQL(['node', 'remote_schemas/nodejs/remote_schema_perms.js'], port=port)
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/remote_schema_perms.js', port=port)
server.start()
print(f'{graphql_service_1.__name__} server started on {server.url}')
hge_fixture_env['GRAPHQL_SERVICE_1'] = server.url
@ -25,9 +25,9 @@ def graphql_service_1(hge_fixture_env: dict[str, str]):
@pytest.fixture(scope='class')
@pytest.mark.early
def graphql_service_2(hge_fixture_env: dict[str, str]):
def graphql_service_2(worker_id: str, hge_fixture_env: dict[str, str]):
(_, port) = extract_server_address_from('GRAPHQL_SERVICE_2')
server = NodeGraphQL(['node', 'remote_schemas/nodejs/secondary_remote_schema_perms.js'], port=port)
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/secondary_remote_schema_perms.js', port=port)
server.start()
print(f'{graphql_service_2.__name__} server started on {server.url}')
hge_fixture_env['GRAPHQL_SERVICE_2'] = server.url
@ -36,9 +36,9 @@ def graphql_service_2(hge_fixture_env: dict[str, str]):
@pytest.fixture(scope='class')
@pytest.mark.early
def graphql_service_3(hge_fixture_env: dict[str, str]):
def graphql_service_3(worker_id: str, hge_fixture_env: dict[str, str]):
(_, port) = extract_server_address_from('GRAPHQL_SERVICE_3')
server = NodeGraphQL(['node', 'remote_schemas/nodejs/secondary_remote_schema_perms_error.js'], port=port)
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/secondary_remote_schema_perms_error.js', port=port)
server.start()
print(f'{graphql_service_3.__name__} server started on {server.url}')
hge_fixture_env['GRAPHQL_SERVICE_3'] = server.url

View File

@ -11,9 +11,9 @@ pytestmark = [
@pytest.fixture(scope='class')
@pytest.mark.early
def graphql_service(hge_fixture_env: dict[str, str]):
def graphql_service(worker_id: str, hge_fixture_env: dict[str, str]):
(_, port) = extract_server_address_from('GRAPHQL_SERVICE_1')
server = NodeGraphQL(["node", "remote_schemas/nodejs/remote_schema_perms.js"], port=port)
server = NodeGraphQL(worker_id, 'remote_schemas/nodejs/remote_schema_perms.js', port=port)
server.start()
print(f'{graphql_service.__name__} server started on {server.url}')
hge_fixture_env['GRAPHQL_SERVICE_1'] = server.url