Remove _start_bg_tasks from full node.

This commit is contained in:
Richard Kiss 2020-05-20 11:26:58 -07:00 committed by wjblanke
parent f162bf4a7e
commit 2a3e4d6eea
2 changed files with 57 additions and 32 deletions

View File

@ -40,7 +40,6 @@ from src.types.header import Header, HeaderData
from src.types.header_block import HeaderBlock
from src.types.mempool_inclusion_status import MempoolInclusionStatus
from src.types.mempool_item import MempoolItem
from src.types.peer_info import PeerInfo
from src.types.program import Program
from src.types.proof_of_space import ProofOfSpace
from src.types.sized_bytes import bytes32
@ -268,35 +267,6 @@ class FullNode:
)
return diff if diff >= 0 else 0
def _start_bg_tasks(self):
"""
Start a background task connecting periodically to the introducer and
requesting the peer list.
"""
introducer = self.config["introducer_peer"]
introducer_peerinfo = PeerInfo(introducer["host"], introducer["port"])
async def introducer_client():
async def on_connect() -> OutboundMessageGenerator:
msg = Message("request_peers", full_node_protocol.RequestPeers())
yield OutboundMessage(NodeType.INTRODUCER, msg, Delivery.RESPOND)
while not self._shut_down:
# If we are still connected to introducer, disconnect
for connection in self.global_connections.get_connections():
if connection.connection_type == NodeType.INTRODUCER:
self.global_connections.close(connection)
# The first time connecting to introducer, keep trying to connect
if self._num_needed_peers():
if not await self.server.start_client(
introducer_peerinfo, on_connect
):
await asyncio.sleep(5)
continue
await asyncio.sleep(self.config["introducer_connect_interval"])
self.introducer_task = asyncio.create_task(introducer_client())
def _close(self):
self._shut_down = True
self.blockchain.shut_down()

View File

@ -4,21 +4,66 @@ import logging.config
import signal
import miniupnpc
from typing import AsyncGenerator
try:
import uvloop
except ImportError:
uvloop = None
from src.full_node.full_node import FullNode
from src.protocols import full_node_protocol
from src.rpc.full_node_rpc_server import start_full_node_rpc_server
from src.server.server import ChiaServer
from src.server.connection import NodeType
from src.server.outbound_message import Delivery, Message, NodeType, OutboundMessage
from src.util.logging import initialize_logging
from src.util.config import load_config_cli, load_config
from src.util.default_root import DEFAULT_ROOT_PATH
from src.util.setproctitle import setproctitle
from multiprocessing import freeze_support
from src.types.peer_info import PeerInfo
OutboundMessageGenerator = AsyncGenerator[OutboundMessage, None]
def start_full_node_bg_task(
server,
peer_info,
global_connections,
introducer_connect_interval,
target_peer_count,
):
"""
Start a background task connecting periodically to the introducer and
requesting the peer list.
"""
def _num_needed_peers() -> int:
diff = target_peer_count - len(global_connections.get_full_node_connections())
return diff if diff >= 0 else 0
async def introducer_client():
async def on_connect() -> OutboundMessageGenerator:
msg = Message("request_peers", full_node_protocol.RequestPeers())
yield OutboundMessage(NodeType.INTRODUCER, msg, Delivery.RESPOND)
while True:
# If we are still connected to introducer, disconnect
for connection in global_connections.get_connections():
if connection.connection_type == NodeType.INTRODUCER:
global_connections.close(connection)
# The first time connecting to introducer, keep trying to connect
if _num_needed_peers():
if not await server.start_client(peer_info, on_connect):
await asyncio.sleep(5)
continue
await asyncio.sleep(introducer_connect_interval)
return asyncio.create_task(introducer_client())
async def async_main():
root_path = DEFAULT_ROOT_PATH
@ -87,7 +132,16 @@ async def async_main():
except NotImplementedError:
log.info("signal handlers unsupported")
full_node._start_bg_tasks()
introducer = config["introducer_peer"]
peer_info = PeerInfo(introducer["host"], introducer["port"])
bg_task = start_full_node_bg_task(
server,
peer_info,
server.global_connections,
config["introducer_connect_interval"],
config["target_peer_count"],
)
# Awaits for server and all connections to close
await server.await_closed()
@ -95,6 +149,7 @@ async def async_main():
# Stops the full node and closes DBs
await full_node._await_closed()
bg_task.cancel()
# Waits for the rpc server to close
if rpc_cleanup is not None: