Async connection close handlers (#17017)

This commit is contained in:
Rigidity 2023-12-11 09:27:25 -05:00 committed by GitHub
parent 84169a881d
commit 41047bddc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 11 additions and 9 deletions

View File

@ -313,7 +313,7 @@ class Farmer:
value=ErrorResponse(uint16(PoolErrorCode.REQUEST_FAILED.value), error_message).to_json_dict(), value=ErrorResponse(uint16(PoolErrorCode.REQUEST_FAILED.value), error_message).to_json_dict(),
) )
def on_disconnect(self, connection: WSChiaConnection) -> None: async def on_disconnect(self, connection: WSChiaConnection) -> None:
self.log.info(f"peer disconnected {connection.get_peer_logging()}") self.log.info(f"peer disconnected {connection.get_peer_logging()}")
self.state_changed("close_connection", {}) self.state_changed("close_connection", {})
if connection.connection_type is NodeType.HARVESTER: if connection.connection_type is NodeType.HARVESTER:

View File

@ -895,7 +895,7 @@ class FullNode:
elif connection.connection_type is NodeType.TIMELORD: elif connection.connection_type is NodeType.TIMELORD:
await self.send_peak_to_timelords() await self.send_peak_to_timelords()
def on_disconnect(self, connection: WSChiaConnection) -> None: async def on_disconnect(self, connection: WSChiaConnection) -> None:
self.log.info(f"peer disconnected {connection.get_peer_logging()}") self.log.info(f"peer disconnected {connection.get_peer_logging()}")
self._state_changed("close_connection") self._state_changed("close_connection")
self._state_changed("sync_mode") self._state_changed("sync_mode")

View File

@ -173,7 +173,7 @@ class Harvester:
if event == PlotRefreshEvents.done: if event == PlotRefreshEvents.done:
self.plot_sync_sender.sync_done(update_result.removed, update_result.duration) self.plot_sync_sender.sync_done(update_result.removed, update_result.duration)
def on_disconnect(self, connection: WSChiaConnection) -> None: async def on_disconnect(self, connection: WSChiaConnection) -> None:
self.log.info(f"peer disconnected {connection.get_peer_logging()}") self.log.info(f"peer disconnected {connection.get_peer_logging()}")
self.state_changed("close_connection") self.state_changed("close_connection")
self.plot_sync_sender.stop() self.plot_sync_sender.stop()

View File

@ -522,7 +522,9 @@ class ChiaServer:
return False return False
def connection_closed(self, connection: WSChiaConnection, ban_time: int, closed_connection: bool = False) -> None: async def connection_closed(
self, connection: WSChiaConnection, ban_time: int, closed_connection: bool = False
) -> None:
# closed_connection is true if the callback is being called with a connection that was previously closed # closed_connection is true if the callback is being called with a connection that was previously closed
# in this case we still want to do the banning logic and remove the conection from the list # in this case we still want to do the banning logic and remove the conection from the list
# but the other cleanup should already have been done so we skip that # but the other cleanup should already have been done so we skip that
@ -555,7 +557,7 @@ class ChiaServer:
connection.cancel_tasks() connection.cancel_tasks()
on_disconnect = getattr(self.node, "on_disconnect", None) on_disconnect = getattr(self.node, "on_disconnect", None)
if on_disconnect is not None: if on_disconnect is not None:
on_disconnect(connection) await on_disconnect(connection)
async def validate_broadcast_message_type(self, messages: List[Message], node_type: NodeType) -> None: async def validate_broadcast_message_type(self, messages: List[Message], node_type: NodeType) -> None:
for message in messages: for message in messages:

View File

@ -52,7 +52,7 @@ def create_default_last_message_time_dict() -> Dict[ProtocolMessageTypes, float]
class ConnectionClosedCallbackProtocol(Protocol): class ConnectionClosedCallbackProtocol(Protocol):
def __call__( async def __call__(
self, self,
connection: WSChiaConnection, connection: WSChiaConnection,
ban_time: int, ban_time: int,
@ -277,7 +277,7 @@ class WSChiaConnection:
with log_exceptions(self.log, consume=True): with log_exceptions(self.log, consume=True):
self.log.debug(f"Closing already closed connection for {self.peer_info.host}") self.log.debug(f"Closing already closed connection for {self.peer_info.host}")
if self.close_callback is not None: if self.close_callback is not None:
self.close_callback(self, ban_time, closed_connection=True) await self.close_callback(self, ban_time, closed_connection=True)
self._close_event.set() self._close_event.set()
return None return None
self.closed = True self.closed = True
@ -307,7 +307,7 @@ class WSChiaConnection:
finally: finally:
with log_exceptions(self.log, consume=True): with log_exceptions(self.log, consume=True):
if self.close_callback is not None: if self.close_callback is not None:
self.close_callback(self, ban_time, closed_connection=False) await self.close_callback(self, ban_time, closed_connection=False)
self._close_event.set() self._close_event.set()
async def wait_until_closed(self) -> None: async def wait_until_closed(self) -> None:

View File

@ -691,7 +691,7 @@ class WalletNode:
) )
asyncio.create_task(self.wallet_peers.start()) asyncio.create_task(self.wallet_peers.start())
def on_disconnect(self, peer: WSChiaConnection) -> None: async def on_disconnect(self, peer: WSChiaConnection) -> None:
if self.is_trusted(peer): if self.is_trusted(peer):
self.local_node_synced = False self.local_node_synced = False
self.initialize_wallet_peers() self.initialize_wallet_peers()