chia-blockchain/chia/server/start_service.py

290 lines
10 KiB
Python
Raw Normal View History

import asyncio
import functools
import os
import logging
import logging.config
import signal
from sys import platform
from typing import Any, Callable, List, Optional, Tuple
2021-06-24 21:10:46 +03:00
from chia.daemon.server import singleton, service_launch_lock_path
from chia.server.ssl_context import chia_ssl_ca_paths, private_ssl_ca_paths
try:
import uvloop
except ImportError:
uvloop = None
from chia.rpc.rpc_server import start_rpc_server
from chia.server.outbound_message import NodeType
from chia.server.server import ChiaServer
from chia.server.upnp import UPnP
from chia.types.peer_info import PeerInfo
from chia.util.chia_logging import initialize_logging
from chia.util.config import load_config, load_config_cli
from chia.util.setproctitle import setproctitle
from chia.util.ints import uint16
from .reconnect_task import start_reconnect_task
# PID of the process that installed the signal handlers; it is written by
# `Service._enable_signals`. The signal handler compares it with `os.getpid()`
# to detect whether it is running in the main process or not, because signals
# need to be ignored in the sub processes.
main_pid: Optional[int] = None
class Service:
    """Lifecycle wrapper around a node object and its `ChiaServer`.

    A `Service` loads the configuration, builds the `ChiaServer` for the given
    node, and then drives start-up (`start`), shutdown (`stop`) and the final
    wait for everything to close (`wait_closed`). Optionally it also maps UPnP
    ports, starts one reconnect task per configured peer and runs an RPC
    server.
    """

    def __init__(
        self,
        root_path,
        node: Any,
        peer_api: Any,
        node_type: NodeType,
        advertised_port: int,
        service_name: str,
        network_id: str,
        *,
        upnp_ports: Optional[List[int]] = None,
        server_listen_ports: Optional[List[int]] = None,
        connect_peers: Optional[List[PeerInfo]] = None,
        auth_connect_peers: bool = True,
        on_connect_callback: Optional[Callable] = None,
        rpc_info: Optional[Tuple[type, int]] = None,
        parse_cli_args=True,
        connect_to_daemon=True,
        running_new_process=True,
        service_name_prefix="",
        max_request_body_size: Optional[int] = None,
    ) -> None:
        """Load the configuration and construct the underlying `ChiaServer`.

        Args:
            root_path: Root directory passed to the config loaders, the SSL
                path helpers, logging initialization and the RPC server.
            node: The node object. `_start`, `_close`, `_await_closed` and
                (if present) `set_server` are called on it.
            peer_api: API object handed to `ChiaServer`.
            node_type: Type of this node. `NodeType.WALLET` gets special
                rate-limit handling.
            advertised_port: Port handed to `ChiaServer` and reported in the
                final "fully closed" log message.
            service_name: Name used for the logger, the config section, the
                process title and the launch lock.
            network_id: Network identifier handed to `ChiaServer`.
            upnp_ports: Ports remapped through UPnP on start and released on
                stop. Defaults to no ports.
            server_listen_ports: Stored on the instance; not otherwise used by
                this class. Defaults to no ports.
            connect_peers: Peers for which a reconnect task is started.
                Defaults to no peers.
            auth_connect_peers: Forwarded to `start_reconnect_task`.
            on_connect_callback: Forwarded to `ChiaServer.start_server`.
            rpc_info: Optional `(rpc_api_factory, rpc_port)` pair. When set, an
                RPC server is started with `rpc_api_factory(node)`.
            parse_cli_args: When true the service config is loaded with
                `load_config_cli`, otherwise with `load_config`.
            connect_to_daemon: Forwarded to `start_rpc_server`.
            running_new_process: When true the process title is set, logging
                is initialized and signal handlers are installed on start.
            service_name_prefix: Inserted before `service_name` in the process
                title.
            max_request_body_size: Forwarded to `start_rpc_server`.
        """
        self.root_path = root_path
        self.config = load_config(root_path, "config.yaml")
        ping_interval = self.config.get("ping_interval")
        self.self_hostname = self.config.get("self_hostname")
        self.daemon_port = self.config.get("daemon_port")
        assert ping_interval is not None
        self._connect_to_daemon = connect_to_daemon
        self._node_type = node_type
        self._service_name = service_name
        self._rpc_task: Optional[asyncio.Task] = None
        self._rpc_close_task: Optional[asyncio.Task] = None
        self._network_id: str = network_id
        self.max_request_body_size = max_request_body_size
        self._running_new_process = running_new_process

        # when we start this service as a component of an existing process,
        # don't change its proctitle
        if running_new_process:
            proctitle_name = f"chia_{service_name_prefix}{service_name}"
            setproctitle(proctitle_name)

        self._log = logging.getLogger(service_name)

        if parse_cli_args:
            service_config = load_config_cli(root_path, "config.yaml", service_name)
        else:
            service_config = load_config(root_path, "config.yaml", service_name)

        # only initialize logging once per process
        if running_new_process:
            initialize_logging(service_name, service_config["logging"], root_path)

        self._rpc_info = rpc_info
        private_ca_crt, private_ca_key = private_ssl_ca_paths(root_path, self.config)
        chia_ca_crt, chia_ca_key = chia_ssl_ca_paths(root_path, self.config)

        inbound_rlp = self.config.get("inbound_rate_limit_percent")
        outbound_rlp = self.config.get("outbound_rate_limit_percent")
        if node_type == NodeType.WALLET:
            # The wallet may override the inbound limit in its own config
            # section and always uses a fixed outbound limit.
            inbound_rlp = service_config.get("inbound_rate_limit_percent", inbound_rlp)
            outbound_rlp = 60

        assert inbound_rlp and outbound_rlp

        self._server = ChiaServer(
            advertised_port,
            node,
            peer_api,
            node_type,
            ping_interval,
            network_id,
            inbound_rlp,
            outbound_rlp,
            root_path,
            service_config,
            (private_ca_crt, private_ca_key),
            (chia_ca_crt, chia_ca_key),
            name=f"{service_name}_server",
        )

        # Hand the server to the node when it supports it.
        set_server = getattr(node, "set_server", None)
        if set_server:
            set_server(self._server)
        else:
            self._log.warning(f"No set_server method for {service_name}")

        # `None` defaults are replaced with a fresh list per instance so that
        # instances never share one mutable default object.
        self._connect_peers: List[PeerInfo] = connect_peers if connect_peers is not None else []
        self._auth_connect_peers = auth_connect_peers
        self._upnp_ports: List[int] = upnp_ports if upnp_ports is not None else []
        self._server_listen_ports: List[int] = server_listen_ports if server_listen_ports is not None else []

        self._api = peer_api
        self._node = node
        self._did_start = False
        self._is_stopping = asyncio.Event()
        self._stopped_by_rpc = False

        self._on_connect_callback = on_connect_callback
        self._advertised_port = advertised_port
        self._reconnect_tasks: List[asyncio.Task] = []
        self.upnp: Optional[UPnP] = None

    async def start(self, **kwargs) -> None:
        """Start the node, the server, the reconnect tasks and the RPC server.

        Calling `start` again while the service is already started is a no-op.

        Args:
            **kwargs: Forwarded unchanged to the node's `_start`.
        """
        # `kwargs` exists only because the wallet accepts parameters to
        # `_start`.
        # TODO: move those parameters to `__init__`
        if self._did_start:
            return None

        assert self.self_hostname is not None
        assert self.daemon_port is not None

        self._did_start = True

        if self._running_new_process:
            self._enable_signals()

        await self._node._start(**kwargs)
        self._node._shut_down = False

        # The UPnP helper is created lazily, only when there is a port to map.
        for port in self._upnp_ports:
            if self.upnp is None:
                self.upnp = UPnP()

            self.upnp.remap(port)

        await self._server.start_server(self._on_connect_callback)

        prefer_ipv6 = self.config.get("prefer_ipv6")
        self._reconnect_tasks = [
            start_reconnect_task(self._server, peer, self._log, self._auth_connect_peers, prefer_ipv6)
            for peer in self._connect_peers
        ]
        self._log.info(f"Started {self._service_name} service on network_id: {self._network_id}")

        self._rpc_close_task = None
        if self._rpc_info:
            rpc_api, rpc_port = self._rpc_info
            self._rpc_task = asyncio.create_task(
                start_rpc_server(
                    rpc_api(self._node),
                    self.self_hostname,
                    self.daemon_port,
                    uint16(rpc_port),
                    self.stop,
                    self.root_path,
                    self.config,
                    self._connect_to_daemon,
                    max_request_body_size=self.max_request_body_size,
                )
            )

    async def run(self) -> None:
        """Acquire the launch lock, start the service and wait until it closes.

        Raises:
            ValueError: If `singleton` returns `None` for this service's launch
                lock path, which is reported as the service already running.
        """
        # NOTE(review): the returned handle is kept in a local for the whole
        # run; presumably that is what keeps the lock held -- confirm against
        # `singleton`.
        lockfile = singleton(service_launch_lock_path(self.root_path, self._service_name))
        if lockfile is None:
            self._log.error(f"{self._service_name}: already running")
            raise ValueError(f"{self._service_name}: already running")
        await self.start()
        await self.wait_closed()

    def _enable_signals(self) -> None:
        """Record the main PID and route termination signals to `_accept_signal`."""
        global main_pid
        main_pid = os.getpid()
        if platform == "win32" or platform == "cygwin":
            # pylint: disable=E1101
            signal.signal(signal.SIGBREAK, self._accept_signal)  # type: ignore
            signal.signal(signal.SIGINT, self._accept_signal)
            signal.signal(signal.SIGTERM, self._accept_signal)
        else:
            # Loop-level handlers are invoked without arguments, so the signal
            # number is bound up front.
            loop = asyncio.get_running_loop()
            loop.add_signal_handler(
                signal.SIGINT,
                functools.partial(self._accept_signal, signal_number=signal.SIGINT),
            )
            loop.add_signal_handler(
                signal.SIGTERM,
                functools.partial(self._accept_signal, signal_number=signal.SIGTERM),
            )

    def _accept_signal(self, signal_number: int, stack_frame=None):
        """Log the received signal and stop the service if in the main process.

        Args:
            signal_number: Number of the signal that was received.
            stack_frame: Unused; accepted so the method also fits the
                two-argument `signal.signal` handler signature.
        """
        self._log.info(f"got signal {signal_number}")

        # we only handle signals in the main process. In the ProcessPoolExecutor
        # processes, we have to ignore them. We'll shut them down gracefully
        # from the main process
        global main_pid
        if os.getpid() != main_pid:
            return
        self.stop()

    def stop(self) -> None:
        """Begin shutting the service down; does nothing if already stopping.

        This only initiates the shutdown. Use `wait_closed` to wait for it to
        finish.
        """
        if self._is_stopping.is_set():
            return
        self._is_stopping.set()

        # start with UPnP, since this can take a while, we want it to happen
        # in the background while shutting down everything else
        if self.upnp is not None:
            for port in self._upnp_ports:
                self.upnp.release(port)

        self._log.info("Cancelling reconnect task")
        for task in self._reconnect_tasks:
            task.cancel()
        self._log.info("Closing connections")
        self._server.close_all()
        self._node._close()
        self._node._shut_down = True

        self._log.info("Calling service stop callback")

        if self._rpc_task is not None:
            self._log.info("Closing RPC server")

            async def close_rpc_server() -> None:
                # The RPC task's result is itself a callable; awaiting the
                # result of calling it performs the actual close.
                if self._rpc_task:
                    await (await self._rpc_task)()

            self._rpc_close_task = asyncio.create_task(close_rpc_server())

    async def wait_closed(self) -> None:
        """Wait until `stop` was called and every component has closed.

        Afterwards the started and stopping flags are reset.
        """
        await self._is_stopping.wait()

        self._log.info("Waiting for socket to be closed (if opened)")

        self._log.info("Waiting for ChiaServer to be closed")
        await self._server.await_closed()

        if self._rpc_close_task:
            self._log.info("Waiting for RPC server")
            await self._rpc_close_task
            self._log.info("Closed RPC server")

        self._log.info("Waiting for service _await_closed callback")
        await self._node._await_closed()

        if self.upnp is not None:
            # this is a blocking call, waiting for the UPnP thread to exit
            self.upnp.shutdown()

        self._did_start = False
        self._is_stopping.clear()
        self._log.info(f"Service {self._service_name} at port {self._advertised_port} fully closed")
async def async_run_service(*args, **kwargs) -> None:
    """Build a `Service` from the given arguments and run it to completion.

    All positional and keyword arguments are forwarded unchanged to the
    `Service` constructor.
    """
    return await Service(*args, **kwargs).run()
def run_service(*args, **kwargs) -> None:
    """Synchronous entry point: run a service inside a new asyncio event loop.

    When the optional `uvloop` module was importable, it is installed before
    the loop is created. All arguments are forwarded to `async_run_service`.
    """
    uvloop_available = uvloop is not None
    if uvloop_available:
        uvloop.install()
    service_coroutine = async_run_service(*args, **kwargs)
    return asyncio.run(service_coroutine)