Remove websockets dependency & do some refactoring (#10611)

* remove old ws
This commit is contained in:
Jack Nelson 2022-04-05 13:19:09 -04:00 committed by GitHub
parent d87b8ac087
commit 9b7d7d2555
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 205 additions and 247 deletions

View File

@ -5,7 +5,7 @@ from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, Dict, Optional
import websockets
import aiohttp
from chia.util.config import load_config
from chia.util.json_util import dict_to_json_str
@ -13,31 +13,52 @@ from chia.util.ws_message import WsRpcMessage, create_payload_dict
class DaemonProxy:
def __init__(self, uri: str, ssl_context: Optional[ssl.SSLContext]):
def __init__(
self,
uri: str,
ssl_context: Optional[ssl.SSLContext],
max_message_size: Optional[int] = 50 * 1000 * 1000,
):
self._uri = uri
self._request_dict: Dict[str, asyncio.Event] = {}
self.response_dict: Dict[str, Any] = {}
self.ssl_context = ssl_context
self.client_session: Optional[aiohttp.ClientSession] = None
self.websocket: Optional[aiohttp.ClientWebSocketResponse] = None
self.max_message_size = max_message_size
def format_request(self, command: str, data: Dict[str, Any]) -> WsRpcMessage:
request = create_payload_dict(command, data, "client", "daemon")
return request
async def start(self):
self.websocket = await websockets.connect(self._uri, max_size=None, ssl=self.ssl_context)
try:
self.client_session = aiohttp.ClientSession()
self.websocket = await self.client_session.ws_connect(
self._uri,
autoclose=True,
autoping=True,
heartbeat=60,
ssl_context=self.ssl_context,
max_msg_size=self.max_message_size,
)
except Exception:
await self.close()
raise
async def listener():
while True:
try:
message = await self.websocket.recv()
except websockets.exceptions.ConnectionClosedOK:
return None
decoded = json.loads(message)
id = decoded["request_id"]
message = await self.websocket.receive()
if message.type == aiohttp.WSMsgType.TEXT:
decoded = json.loads(message.data)
request_id = decoded["request_id"]
if id in self._request_dict:
self.response_dict[id] = decoded
self._request_dict[id].set()
if request_id in self._request_dict:
self.response_dict[request_id] = decoded
self._request_dict[request_id].set()
else:
await self.close()
return None
asyncio.create_task(listener())
await asyncio.sleep(1)
@ -46,7 +67,9 @@ class DaemonProxy:
request_id = request["request_id"]
self._request_dict[request_id] = asyncio.Event()
string = dict_to_json_str(request)
asyncio.create_task(self.websocket.send(string))
if self.websocket is None:
raise Exception("Websocket is not connected")
asyncio.create_task(self.websocket.send_str(string))
async def timeout():
await asyncio.sleep(30)
@ -117,19 +140,24 @@ class DaemonProxy:
return response
async def close(self) -> None:
await self.websocket.close()
if self.websocket is not None:
await self.websocket.close()
if self.client_session is not None:
await self.client_session.close()
async def exit(self) -> WsRpcMessage:
request = self.format_request("exit", {})
return await self._get(request)
async def connect_to_daemon(self_hostname: str, daemon_port: int, ssl_context: ssl.SSLContext) -> DaemonProxy:
async def connect_to_daemon(
self_hostname: str, daemon_port: int, max_message_size: int, ssl_context: ssl.SSLContext
) -> DaemonProxy:
"""
Connect to the local daemon.
"""
client = DaemonProxy(f"wss://{self_hostname}:{daemon_port}", ssl_context)
client = DaemonProxy(f"wss://{self_hostname}:{daemon_port}", ssl_context, max_message_size)
await client.start()
return client
@ -143,12 +171,15 @@ async def connect_to_daemon_and_validate(root_path: Path, quiet: bool = False) -
try:
net_config = load_config(root_path, "config.yaml")
daemon_max_message_size = net_config.get("daemon_max_message_size", 50 * 1000 * 1000)
crt_path = root_path / net_config["daemon_ssl"]["private_crt"]
key_path = root_path / net_config["daemon_ssl"]["private_key"]
ca_crt_path = root_path / net_config["private_ssl_ca"]["crt"]
ca_key_path = root_path / net_config["private_ssl_ca"]["key"]
ssl_context = ssl_context_for_client(ca_crt_path, ca_key_path, crt_path, key_path)
connection = await connect_to_daemon(net_config["self_hostname"], net_config["daemon_port"], ssl_context)
connection = await connect_to_daemon(
net_config["self_hostname"], net_config["daemon_port"], daemon_max_message_size, ssl_context
)
r = await connection.ping()
if "value" in r["data"] and r["data"]["value"] == "pong":
@ -168,7 +199,6 @@ async def acquire_connection_to_daemon(root_path: Path, quiet: bool = False):
block exits scope, execution resumes in this function, wherein the connection is
closed.
"""
from chia.daemon.client import connect_to_daemon_and_validate
daemon: Optional[DaemonProxy] = None
try:

View File

@ -14,8 +14,7 @@ from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, TextIO, Tuple, cast
from websockets import ConnectionClosedOK, WebSocketException, WebSocketServerProtocol, serve
from chia import __version__
from chia.cmds.init_funcs import check_keys, chia_init
from chia.cmds.passphrase_funcs import default_passphrase, using_default_passphrase
from chia.daemon.keychain_server import KeychainServer, keychain_commands
@ -39,12 +38,12 @@ from chia.util.path import mkdir
from chia.util.service_groups import validate_service
from chia.util.setproctitle import setproctitle
from chia.util.ws_message import WsRpcMessage, create_payload, format_response
from chia import __version__
io_pool_exc = ThreadPoolExecutor()
try:
from aiohttp import ClientSession, web
from aiohttp import ClientSession, WSMsgType, web
from aiohttp.web_ws import WebSocketResponse
except ModuleNotFoundError:
print("Error: Make sure to run . ./activate from the project folder before starting Chia.")
quit()
@ -134,24 +133,24 @@ class WebSocketServer:
ca_key_path: Path,
crt_path: Path,
key_path: Path,
shutdown_event: asyncio.Event,
run_check_keys_on_unlock: bool = False,
):
self.root_path = root_path
self.log = log
self.services: Dict = dict()
self.plots_queue: List[Dict] = []
self.connections: Dict[str, List[WebSocketServerProtocol]] = dict() # service_name : [WebSocket]
self.remote_address_map: Dict[WebSocketServerProtocol, str] = dict() # socket: service_name
self.ping_job: Optional[asyncio.Task] = None
self.connections: Dict[str, List[WebSocketResponse]] = dict() # service_name : [WebSocket]
self.remote_address_map: Dict[WebSocketResponse, str] = dict() # socket: service_name
self.net_config = load_config(root_path, "config.yaml")
self.self_hostname = self.net_config["self_hostname"]
self.daemon_port = self.net_config["daemon_port"]
self.daemon_max_message_size = self.net_config.get("daemon_max_message_size", 50 * 1000 * 1000)
self.websocket_server = None
self.websocket_runner: Optional[web.AppRunner] = None
self.ssl_context = ssl_context_for_server(ca_crt_path, ca_key_path, crt_path, key_path, log=self.log)
self.shut_down = False
self.keychain_server = KeychainServer()
self.run_check_keys_on_unlock = run_check_keys_on_unlock
self.shutdown_event = shutdown_event
async def start(self):
self.log.info("Starting Daemon Server")
@ -184,16 +183,19 @@ class WebSocketServer:
except NotImplementedError:
self.log.info("Not implemented")
self.websocket_server = await serve(
self.safe_handle,
self.self_hostname,
self.daemon_port,
max_size=self.daemon_max_message_size,
ping_interval=500,
ping_timeout=300,
ssl=self.ssl_context,
app = web.Application(client_max_size=self.daemon_max_message_size)
app.add_routes([web.get("/", self.incoming_connection)])
self.websocket_runner = web.AppRunner(app, access_log=None, logger=self.log, keepalive_timeout=300)
await self.websocket_runner.setup()
site = web.TCPSite(
self.websocket_runner,
host=self.self_hostname,
port=self.daemon_port,
shutdown_timeout=3,
ssl_context=self.ssl_context,
)
self.log.info("Waiting Daemon WebSocketServer closure")
await site.start()
def cancel_task_safe(self, task: Optional[asyncio.Task]):
if task is not None:
@ -203,22 +205,28 @@ class WebSocketServer:
self.log.error(f"Error while canceling task.{e} {task}")
async def stop(self) -> Dict[str, Any]:
self.shut_down = True
self.cancel_task_safe(self.ping_job)
await self.exit()
if self.websocket_server is not None:
self.websocket_server.close()
jobs = []
for service_name in self.services.keys():
jobs.append(kill_service(self.root_path, self.services, service_name))
if jobs:
await asyncio.wait(jobs)
self.services.clear()
asyncio.create_task(self.exit())
return {"success": True}
async def safe_handle(self, websocket: WebSocketServerProtocol, path: str):
service_name = ""
try:
async for message in websocket:
async def incoming_connection(self, request):
ws: WebSocketResponse = web.WebSocketResponse(max_msg_size=self.daemon_max_message_size, heartbeat=30)
await ws.prepare(request)
while True:
msg = await ws.receive()
self.log.debug(f"Received message: {msg}")
if msg.type == WSMsgType.TEXT:
try:
decoded = json.loads(message)
decoded = json.loads(msg.data)
if "data" not in decoded:
decoded["data"] = {}
response, sockets_to_use = await self.handle_message(websocket, decoded)
response, sockets_to_use = await self.handle_message(ws, decoded)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Error while handling message: {tb}")
@ -228,28 +236,27 @@ class WebSocketServer:
if len(sockets_to_use) > 0:
for socket in sockets_to_use:
try:
await socket.send(response)
await socket.send_str(response)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Unexpected exception trying to send to websocket: {e} {tb}")
self.remove_connection(socket)
await socket.close()
except Exception as e:
tb = traceback.format_exc()
service_name = "Unknown"
if websocket in self.remote_address_map:
service_name = self.remote_address_map[websocket]
if isinstance(e, ConnectionClosedOK):
self.log.info(f"ConnectionClosedOk. Closing websocket with {service_name} {e}")
elif isinstance(e, WebSocketException):
self.log.info(f"Websocket exception. Closing websocket with {service_name} {e} {tb}")
break
else:
self.log.error(f"Unexpected exception in websocket: {e} {tb}")
finally:
self.remove_connection(websocket)
await websocket.close()
service_name = "Unknown"
if ws in self.remote_address_map:
service_name = self.remote_address_map[ws]
if msg.type == WSMsgType.CLOSE:
self.log.info(f"ConnectionClosed. Closing websocket with {service_name}")
elif msg.type == WSMsgType.ERROR:
self.log.info(f"Websocket exception. Closing websocket with {service_name}. {ws.exception()}")
def remove_connection(self, websocket: WebSocketServerProtocol):
self.remove_connection(ws)
await ws.close()
break
def remove_connection(self, websocket: WebSocketResponse):
service_name = None
if websocket in self.remote_address_map:
service_name = self.remote_address_map[websocket]
@ -263,31 +270,8 @@ class WebSocketServer:
after_removal.append(connection)
self.connections[service_name] = after_removal
async def ping_task(self) -> None:
restart = True
await asyncio.sleep(30)
for remote_address, service_name in self.remote_address_map.items():
if service_name in self.connections:
sockets = self.connections[service_name]
for socket in sockets:
if socket.remote_address[1] == remote_address:
try:
self.log.info(f"About to ping: {service_name}")
await socket.ping()
except asyncio.CancelledError:
self.log.info("Ping task received Cancel")
restart = False
break
except Exception as e:
self.log.info(f"Ping error: {e}")
self.log.warning("Ping failed, connection closed.")
self.remove_connection(socket)
await socket.close()
if restart is True:
self.ping_job = asyncio.create_task(self.ping_task())
async def handle_message(
self, websocket: WebSocketServerProtocol, message: WsRpcMessage
self, websocket: WebSocketResponse, message: WsRpcMessage
) -> Tuple[Optional[str], List[Any]]:
"""
This function gets called when new message is received via websocket.
@ -631,7 +615,7 @@ class WebSocketServer:
for websocket in websockets:
try:
await websocket.send(response)
await websocket.send_str(response)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Unexpected exception trying to send to websocket: {e} {tb}")
@ -690,7 +674,7 @@ class WebSocketServer:
for websocket in websockets:
try:
await websocket.send(response)
await websocket.send_str(response)
except Exception as e:
tb = traceback.format_exc()
self.log.error(f"Unexpected exception trying to send to websocket: {e} {tb}")
@ -1150,20 +1134,13 @@ class WebSocketServer:
return response
async def exit(self) -> Dict[str, Any]:
jobs = []
for k in self.services.keys():
jobs.append(kill_service(self.root_path, self.services, k))
if jobs:
await asyncio.wait(jobs)
self.services.clear()
async def exit(self) -> None:
if self.websocket_runner is not None:
await self.websocket_runner.cleanup()
self.shutdown_event.set()
log.info("chia daemon exiting")
response = {"success": True}
return response
async def register_service(self, websocket: WebSocketServerProtocol, request: Dict[str, Any]) -> Dict[str, Any]:
async def register_service(self, websocket: WebSocketResponse, request: Dict[str, Any]) -> Dict[str, Any]:
self.log.info(f"Register service {request}")
service = request["service"]
if service not in self.connections:
@ -1179,8 +1156,6 @@ class WebSocketServer:
}
else:
self.remote_address_map[websocket] = service
if self.ping_job is None:
self.ping_job = asyncio.create_task(self.ping_task())
self.log.info(f"registered for service {service}")
log.info(f"{response}")
return response
@ -1351,7 +1326,6 @@ async def kill_service(
if process is None:
return False
del services[service_name]
result = await kill_process(process, root_path, service_name, "", delay_before_kill)
return result
@ -1361,68 +1335,6 @@ def is_running(services: Dict[str, subprocess.Popen], service_name: str) -> bool
return process is not None and process.poll() is None
def create_server_for_daemon(root_path: Path):
routes = web.RouteTableDef()
services: Dict = dict()
@routes.get("/daemon/ping/")
async def ping(request: web.Request) -> web.Response:
return web.Response(text="pong")
@routes.get("/daemon/service/start/")
async def start_service(request: web.Request) -> web.Response:
service_name = request.query.get("service")
if service_name is None or not validate_service(service_name):
r = f"{service_name} unknown service"
return web.Response(text=str(r))
if is_running(services, service_name):
r = f"{service_name} already running"
return web.Response(text=str(r))
try:
process, pid_path = launch_service(root_path, service_name)
services[service_name] = process
r = f"{service_name} started"
except (subprocess.SubprocessError, IOError):
log.exception(f"problem starting {service_name}")
r = f"{service_name} start failed"
return web.Response(text=str(r))
@routes.get("/daemon/service/stop/")
async def stop_service(request: web.Request) -> web.Response:
service_name = request.query.get("service")
if service_name is None:
r = f"{service_name} unknown service"
return web.Response(text=str(r))
r = str(await kill_service(root_path, services, service_name))
return web.Response(text=str(r))
@routes.get("/daemon/service/is_running/")
async def is_running_handler(request: web.Request) -> web.Response:
service_name = request.query.get("service")
if service_name is None:
r = f"{service_name} unknown service"
return web.Response(text=str(r))
r = str(is_running(services, service_name))
return web.Response(text=str(r))
@routes.get("/daemon/exit/")
async def exit(request: web.Request):
jobs = []
for k in services.keys():
jobs.append(kill_service(root_path, services, k))
if jobs:
await asyncio.wait(jobs)
services.clear()
# we can't await `site.stop()` here because that will cause a deadlock, waiting for this
# request to exit
def singleton(lockfile: Path, text: str = "semaphore") -> Optional[TextIO]:
"""
Open a lockfile exclusively.
@ -1474,14 +1386,20 @@ async def async_run_daemon(root_path: Path, wait_for_unlock: bool = False) -> in
print("daemon: already launching")
return 2
shutdown_event = asyncio.Event()
# TODO: clean this up, ensuring lockfile isn't removed until the listen port is open
create_server_for_daemon(root_path)
ws_server = WebSocketServer(
root_path, ca_crt_path, ca_key_path, crt_path, key_path, run_check_keys_on_unlock=wait_for_unlock
root_path,
ca_crt_path,
ca_key_path,
crt_path,
key_path,
shutdown_event,
run_check_keys_on_unlock=wait_for_unlock,
)
await ws_server.start()
assert ws_server.websocket_server is not None
await ws_server.websocket_server.wait_closed()
await shutdown_event.wait()
log.info("Daemon WebSocketServer closed")
# sys.stdout.close()
return 0

View File

@ -217,6 +217,9 @@ class Farmer:
await self.cache_clear_task
if self.update_pool_state_task is not None:
await self.update_pool_state_task
if self.keychain_proxy is not None:
await self.keychain_proxy.close()
await asyncio.sleep(0.5) # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
self.started = False
def _set_state_changed_callback(self, callback: Callable):

View File

@ -5,7 +5,7 @@ import traceback
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
import aiohttp
from aiohttp import ClientConnectorError, ClientSession, ClientWebSocketResponse, WSMsgType, web
from chia.rpc.util import wrap_http_handler
from chia.server.outbound_message import NodeType
@ -17,6 +17,7 @@ from chia.util.json_util import dict_to_json_str
from chia.util.ws_message import create_payload, create_payload_dict, format_response, pong
log = logging.getLogger(__name__)
max_message_size = 50 * 1024 * 1024 # 50MB
class RpcServer:
@ -29,7 +30,8 @@ class RpcServer:
self.stop_cb: Callable = stop_cb
self.log = log
self.shut_down = False
self.websocket: Optional[aiohttp.ClientWebSocketResponse] = None
self.websocket: Optional[ClientWebSocketResponse] = None
self.client_session: Optional[ClientSession] = None
self.service_name = service_name
self.root_path = root_path
self.net_config = net_config
@ -45,6 +47,8 @@ class RpcServer:
self.shut_down = True
if self.websocket is not None:
await self.websocket.close()
if self.client_session is not None:
await self.client_session.close()
async def _state_changed(self, *args):
if self.websocket is None:
@ -168,7 +172,7 @@ class RpcServer:
async def close_connection(self, request: Dict):
node_id = hexstr_to_bytes(request["node_id"])
if self.rpc_api.service.server is None:
raise aiohttp.web.HTTPInternalServerError()
raise web.HTTPInternalServerError()
connections_to_close = [c for c in self.rpc_api.service.server.get_connections() if c.peer_node_id == node_id]
if len(connections_to_close) == 0:
raise ValueError(f"Connection with node_id {node_id.hex()} does not exist")
@ -243,52 +247,52 @@ class RpcServer:
while True:
msg = await ws.receive()
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.type == WSMsgType.TEXT:
message = msg.data.strip()
# self.log.info(f"received message: {message}")
await self.safe_handle(ws, message)
elif msg.type == aiohttp.WSMsgType.BINARY:
elif msg.type == WSMsgType.BINARY:
self.log.debug("Received binary data")
elif msg.type == aiohttp.WSMsgType.PING:
elif msg.type == WSMsgType.PING:
self.log.debug("Ping received")
await ws.pong()
elif msg.type == aiohttp.WSMsgType.PONG:
elif msg.type == WSMsgType.PONG:
self.log.debug("Pong received")
else:
if msg.type == aiohttp.WSMsgType.CLOSE:
if msg.type == WSMsgType.CLOSE:
self.log.debug("Closing RPC websocket")
await ws.close()
elif msg.type == aiohttp.WSMsgType.ERROR:
elif msg.type == WSMsgType.ERROR:
self.log.error("Error during receive %s" % ws.exception())
elif msg.type == aiohttp.WSMsgType.CLOSED:
elif msg.type == WSMsgType.CLOSED:
pass
break
await ws.close()
async def connect_to_daemon(self, self_hostname: str, daemon_port: uint16):
while True:
while not self.shut_down:
try:
if self.shut_down:
break
async with aiohttp.ClientSession() as session:
async with session.ws_connect(
f"wss://{self_hostname}:{daemon_port}",
autoclose=True,
autoping=True,
heartbeat=60,
ssl_context=self.ssl_context,
max_msg_size=100 * 1024 * 1024,
) as ws:
self.websocket = ws
await self.connection(ws)
self.websocket = None
except aiohttp.ClientConnectorError:
self.client_session = ClientSession()
self.websocket = await self.client_session.ws_connect(
f"wss://{self_hostname}:{daemon_port}",
autoclose=True,
autoping=True,
heartbeat=60,
ssl_context=self.ssl_context,
max_msg_size=max_message_size,
)
await self.connection(self.websocket)
except ClientConnectorError:
self.log.warning(f"Cannot connect to daemon at ws://{self_hostname}:{daemon_port}")
except Exception as e:
tb = traceback.format_exc()
self.log.warning(f"Exception: {tb} {type(e)}")
if self.websocket is not None:
await self.websocket.close()
if self.client_session is not None:
await self.client_session.close()
self.websocket = None
self.client_session = None
await asyncio.sleep(2)
@ -306,17 +310,15 @@ async def start_rpc_server(
Starts an HTTP server with the following RPC methods, to be used by local clients to
query the node.
"""
app = aiohttp.web.Application()
app = web.Application()
rpc_server = RpcServer(rpc_api, rpc_api.service_name, stop_cb, root_path, net_config)
rpc_server.rpc_api.service._set_state_changed_callback(rpc_server.state_changed)
app.add_routes(
[aiohttp.web.post(route, wrap_http_handler(func)) for (route, func) in rpc_server.get_routes().items()]
)
app.add_routes([web.post(route, wrap_http_handler(func)) for (route, func) in rpc_server.get_routes().items()])
if connect_to_daemon:
daemon_connection = asyncio.create_task(rpc_server.connect_to_daemon(self_hostname, daemon_port))
runner = aiohttp.web.AppRunner(app, access_log=None)
runner = web.AppRunner(app, access_log=None)
await runner.setup()
site = aiohttp.web.TCPSite(runner, self_hostname, int(rpc_port), ssl_context=rpc_server.ssl_context)
site = web.TCPSite(runner, self_hostname, int(rpc_port), ssl_context=rpc_server.ssl_context)
await site.start()
async def cleanup():

View File

@ -33,6 +33,8 @@ from chia.util.ints import uint16
from chia.util.network import is_in_network, is_localhost
from chia.util.ssl_check import verify_ssl_certs_and_keys
max_message_size = 50 * 1024 * 1024 # 50MB
def ssl_context_for_server(
ca_cert: Path,
@ -277,7 +279,7 @@ class ChiaServer:
if request.remote in self.banned_peers and time.time() < self.banned_peers[request.remote]:
self.log.warning(f"Peer {request.remote} is banned, refusing connection")
return None
ws = web.WebSocketResponse(max_msg_size=50 * 1024 * 1024)
ws = web.WebSocketResponse(max_msg_size=max_message_size)
await ws.prepare(request)
close_event = asyncio.Event()
cert_bytes = request.transport._ssl_protocol._extra["ssl_object"].getpeercert(True)
@ -422,7 +424,7 @@ class ChiaServer:
self.log.debug(f"Connecting: {url}, Peer info: {target_node}")
try:
ws = await session.ws_connect(
url, autoclose=True, autoping=True, heartbeat=60, ssl=ssl_context, max_msg_size=50 * 1024 * 1024
url, autoclose=True, autoping=True, heartbeat=60, ssl=ssl_context, max_msg_size=max_message_size
)
except ServerDisconnectedError:
self.log.debug(f"Server disconnected error connecting to {url}. Perhaps we are banned by the peer.")

View File

@ -6,33 +6,32 @@ import time
import traceback
from asyncio import CancelledError
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set, Tuple, Any, Iterator
from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple
from blspy import PrivateKey, AugSchemeMPL
from blspy import AugSchemeMPL, PrivateKey
from packaging.version import Version
from chia.consensus.block_record import BlockRecord
from chia.consensus.blockchain import ReceiveBlockResult
from chia.consensus.constants import ConsensusConstants
from chia.daemon.keychain_proxy import (
KeychainProxy,
KeychainProxyConnectionFailure,
KeyringIsEmpty,
connect_to_keychain_and_validate,
wrap_local_keychain,
KeychainProxy,
KeyringIsEmpty,
)
from chia.util.chunks import chunks
from chia.protocols import wallet_protocol
from chia.protocols.full_node_protocol import RequestProofOfWeight, RespondProofOfWeight
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.protocols.wallet_protocol import (
RespondToCoinUpdates,
CoinState,
RespondToPhUpdates,
RespondBlockHeader,
RequestSESInfo,
RespondSESInfo,
RequestHeaderBlocks,
RequestSESInfo,
RespondBlockHeader,
RespondSESInfo,
RespondToCoinUpdates,
RespondToPhUpdates,
)
from chia.server.node_discovery import WalletPeers
from chia.server.outbound_message import Message, NodeType, make_msg
@ -46,29 +45,30 @@ from chia.types.coin_spend import CoinSpend
from chia.types.header_block import HeaderBlock
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.peer_info import PeerInfo
from chia.types.weight_proof import WeightProof, SubEpochData
from chia.types.weight_proof import SubEpochData, WeightProof
from chia.util.byte_types import hexstr_to_bytes
from chia.util.chunks import chunks
from chia.util.config import WALLET_PEERS_PATH_KEY_DEPRECATED
from chia.util.default_root import STANDALONE_ROOT_PATH
from chia.util.ints import uint32, uint64
from chia.util.keychain import KeyringIsLocked, Keychain
from chia.util.keychain import Keychain, KeyringIsLocked
from chia.util.path import mkdir, path_from_root
from chia.wallet.util.new_peak_queue import NewPeakQueue, NewPeakQueueTypes, NewPeakItem
from chia.util.profiler import profile_task
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.util.new_peak_queue import NewPeakItem, NewPeakQueue, NewPeakQueueTypes
from chia.wallet.util.peer_request_cache import PeerRequestCache, can_use_peer_request_cache
from chia.wallet.util.wallet_sync_utils import (
request_and_validate_removals,
request_and_validate_additions,
fetch_last_tx_from_peer,
subscribe_to_phs,
subscribe_to_coin_updates,
last_change_height_cs,
fetch_header_blocks_in_range,
fetch_last_tx_from_peer,
last_change_height_cs,
request_and_validate_additions,
request_and_validate_removals,
subscribe_to_coin_updates,
subscribe_to_phs,
)
from chia.wallet.wallet_action import WalletAction
from chia.wallet.wallet_coin_record import WalletCoinRecord
from chia.wallet.wallet_state_manager import WalletStateManager
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.wallet_action import WalletAction
from chia.util.profiler import profile_task
class WalletNode:
@ -269,6 +269,9 @@ class WalletNode:
if self.wallet_state_manager is not None:
await self.wallet_state_manager._await_closed()
self.wallet_state_manager = None
if self.keychain_proxy is not None:
await self.keychain_proxy.close()
await asyncio.sleep(0.5) # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
self.logged_in = False
self.wallet_peers = None

View File

@ -27,7 +27,6 @@ dependencies = [
"PyYAML==5.4.1", # Used for config file format
"setproctitle==1.2.2", # Gives the chia processes readable names
"sortedcontainers==2.4.0", # For maintaining sorted mempools
"websockets==8.1.0", # For use in wallet RPC and electron UI
# TODO: when moving to click 8 remove the pinning of black noted below
"click==7.1.2", # For the CLI
"dnspythonchia==2.2.0", # Query DNS seeds

View File

@ -9,7 +9,9 @@ class TestDaemonRpc:
async def test_get_version_rpc(self, get_daemon, bt):
ws_server = get_daemon
config = bt.config
client = await connect_to_daemon(config["self_hostname"], config["daemon_port"], bt.get_daemon_ssl_context())
client = await connect_to_daemon(
config["self_hostname"], config["daemon_port"], 50 * 1000 * 1000, bt.get_daemon_ssl_context()
)
response = await client.get_version()
assert response["data"]["success"]

View File

@ -1,6 +1,5 @@
import logging
import asyncio
import logging
from secrets import token_bytes
from typing import Dict, List
@ -8,23 +7,23 @@ from chia.consensus.constants import ConsensusConstants
from chia.full_node.full_node_api import FullNodeAPI
from chia.server.start_service import Service
from chia.server.start_wallet import service_kwargs_for_wallet
from tests.block_tools import create_block_tools_async, test_constants, BlockTools
from tests.setup_services import (
setup_full_node,
setup_harvester,
setup_farmer,
setup_introducer,
setup_vdf_clients,
setup_timelord,
setup_vdf_client,
setup_daemon,
)
from tests.util.keyring import TempKeyring
from tests.util.socket import find_available_listen_port
from chia.util.hash import std_hash
from chia.util.ints import uint16, uint32
from chia.util.keychain import bytes_to_mnemonic
from tests.block_tools import BlockTools, create_block_tools_async, test_constants
from tests.setup_services import (
setup_daemon,
setup_farmer,
setup_full_node,
setup_harvester,
setup_introducer,
setup_timelord,
setup_vdf_client,
setup_vdf_clients,
)
from tests.time_out_assert import time_out_assert_custom_interval
from tests.util.keyring import TempKeyring
from tests.util.socket import find_available_listen_port
def cleanup_keyring(keyring: TempKeyring):

View File

@ -6,7 +6,7 @@ from secrets import token_bytes
from typing import AsyncGenerator, Optional
from chia.consensus.constants import ConsensusConstants
from chia.daemon.server import WebSocketServer, create_server_for_daemon, daemon_launch_lock_path, singleton
from chia.daemon.server import WebSocketServer, daemon_launch_lock_path, singleton
from chia.server.start_farmer import service_kwargs_for_farmer
from chia.server.start_full_node import service_kwargs_for_full_node
from chia.server.start_harvester import service_kwargs_for_harvester
@ -36,8 +36,8 @@ async def setup_daemon(btools: BlockTools) -> AsyncGenerator[WebSocketServer, No
ca_crt_path = root_path / config["private_ssl_ca"]["crt"]
ca_key_path = root_path / config["private_ssl_ca"]["key"]
assert lockfile is not None
create_server_for_daemon(btools.root_path)
ws_server = WebSocketServer(root_path, ca_crt_path, ca_key_path, crt_path, key_path)
shutdown_event = asyncio.Event()
ws_server = WebSocketServer(root_path, ca_crt_path, ca_key_path, crt_path, key_path, shutdown_event)
await ws_server.start()
yield ws_server