2023-09-20 08:15:59 +03:00
|
|
|
import socket
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
|
|
def check_port(port: int) -> bool:
    """Return whether ``port`` can be bound on 127.0.0.1 for both TCP and UDP.

    A TCP socket and a UDP socket are each bound to ``("127.0.0.1", port)``.
    Both sockets are closed before the function returns, so the port is only
    probed, not held.

    Args:
        port: Port number to probe.

    Returns:
        ``True`` if both binds succeed, ``False`` if either bind raises
        ``OSError``.
    """
    # Each socket is created in a ``with`` header so the TCP socket is closed
    # even if creating the UDP socket raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp:
            try:
                tcp.bind(("127.0.0.1", port))
                udp.bind(("127.0.0.1", port))
            except OSError:
                return False
            return True
|
2023-09-20 08:15:59 +03:00
|
|
|
|
|
|
|
|
|
|
|
def check_port_range(port_range: range) -> bool:
    """Return ``True`` only if ``check_port`` accepts every port in ``port_range``.

    Probing stops at the first rejected port. An empty range yields ``True``.
    """
    for candidate in port_range:
        if not check_port(candidate):
            return False
    return True
|
2023-09-20 08:15:59 +03:00
|
|
|
|
|
|
|
|
|
|
|
class Ports:
    """Hands out consecutive runs of port numbers that currently look free.

    The allocation cursor lives on the class, so every instance continues
    from where the previous allocation stopped.
    """

    # First port number the next allocation attempt will consider.
    NEXT_PORT = 10000

    def allocate(self, num: int) -> int:
        """
        Find ``num`` consecutive free ports and return the first of them.

        The class-level cursor is advanced past every candidate run that is
        tried, whether or not ``check_port_range`` accepts it, so a run is
        never offered twice.

        Raises:
            OSError: if the cursor gets too close to 65535 for another run
                of ``num`` ports to be tried.
        """
        while True:
            first = Ports.NEXT_PORT
            end = first + num
            if end > 65535:
                break
            # Move the cursor before probing so a rejected run is skipped.
            Ports.NEXT_PORT = end
            if check_port_range(range(first, end)):
                return first

        msg = "cannot find enough free port"
        raise OSError(msg)
|
2023-09-20 08:15:59 +03:00
|
|
|
|
|
|
|
|
2023-11-29 14:22:04 +03:00
|
|
|
@pytest.fixture()
def ports() -> Ports:
    """Provide a ``Ports`` allocator to the requesting test."""
    allocator = Ports()
    return allocator
|