Speedup connections 2. (#5421)

* Decrease sleeps.

* Wait 30 minutes before retrying address.

* Address comments.

* Address comments.

* Sleep in node discovery if we don't need connections.

* Rename sleeps.

* Lint.

* Refactor sleeps.

* Don't query introducers at startup.
This commit is contained in:
Florin Chirica 2021-05-20 20:38:35 +03:00 committed by GitHub
parent 15d4938a8c
commit 4ad8d9dcef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 30 deletions

View File

@ -24,7 +24,7 @@ from chia.util.path import mkdir, path_from_root
MAX_PEERS_RECEIVED_PER_REQUEST = 1000
MAX_TOTAL_PEERS_RECEIVED = 3000
MAX_CONCURRENT_OUTBOUND_CONNECTIONS = 100
MAX_CONCURRENT_OUTBOUND_CONNECTIONS = 70
class FullNodeDiscovery:
@ -145,10 +145,9 @@ class FullNodeDiscovery:
await self.address_manager.connect(peer_info)
def _num_needed_peers(self) -> int:
diff = self.target_outbound_count
outgoing = self.server.get_outgoing_connections()
diff -= len(outgoing)
return diff if diff >= 0 else 0
target = self.target_outbound_count
outgoing = len(self.server.get_full_node_outgoing_connections())
return max(0, target - outgoing)
"""
Uses the Poisson distribution to determine the next time
@ -236,7 +235,7 @@ class FullNodeDiscovery:
# We don't know any address, connect to the introducer to get some.
size = await self.address_manager.size()
if size == 0 or retry_introducers or introducer_attempts == 0:
if size == 0 or retry_introducers:
try:
await asyncio.sleep(introducer_backoff)
except asyncio.CancelledError:
@ -269,8 +268,8 @@ class FullNodeDiscovery:
introducer_backoff = 1
# Only connect out to one peer per network group (/16 for IPv4).
groups = []
full_node_connected = self.server.get_full_node_connections()
groups = set()
full_node_connected = self.server.get_full_node_outgoing_connections()
connected = [c.get_peer_info() for c in full_node_connected]
connected = [c for c in connected if c is not None]
for conn in full_node_connected:
@ -278,8 +277,7 @@ class FullNodeDiscovery:
if peer is None:
continue
group = peer.get_group()
if group not in groups:
groups.append(group)
groups.add(group)
# Feeler Connections
#
@ -310,14 +308,11 @@ class FullNodeDiscovery:
max_tries = 10
elif len(groups) <= 5:
max_tries = 25
select_peer_interval = max(0.1, len(groups) * 0.25)
while not got_peer and not self.is_closed:
sleep_interval = 1 + len(groups) * 0.5
sleep_interval = min(sleep_interval, self.peer_connect_interval)
# Special case: try to find our first peer much quicker.
if len(groups) == 0:
sleep_interval = 0.1
self.log.debug(f"Address manager query count: {tries}. Query limit: {max_tries}")
try:
await asyncio.sleep(sleep_interval)
await asyncio.sleep(select_peer_interval)
except asyncio.CancelledError:
return None
tries += 1
@ -348,10 +343,8 @@ class FullNodeDiscovery:
if addr in connected:
addr = None
continue
# only consider very recently tried nodes after 30 failed attempts
# attempt a node once per 30 minutes if we lack connections to increase the chance
# to try all the peer table.
if now - info.last_try < 1800 and tries < 30:
# attempt a node once per 30 minutes.
if now - info.last_try < 1800:
continue
if time.time() - last_timestamp_local_info > 1800 or local_peerinfo is None:
local_peerinfo = await self.server.get_peer_info()
@ -359,23 +352,28 @@ class FullNodeDiscovery:
if local_peerinfo is not None and addr == local_peerinfo:
continue
got_peer = True
self.log.debug(f"Addrman selected address: {addr}.")
disconnect_after_handshake = is_feeler
if self._num_needed_peers() == 0:
extra_peers_needed = self._num_needed_peers()
if extra_peers_needed == 0:
disconnect_after_handshake = True
retry_introducers = False
initiate_connection = self._num_needed_peers() > 0 or has_collision or is_feeler
sleep_interval = 1 + len(groups) * 0.5
sleep_interval = min(sleep_interval, self.peer_connect_interval)
# Special case: try to find our first peer much quicker.
if len(groups) == 0:
sleep_interval = 0.1
self.log.debug(f"Num peers needed: {extra_peers_needed}")
initiate_connection = extra_peers_needed > 0 or has_collision or is_feeler
connect_peer_interval = max(0.25, len(groups) * 0.5)
if not initiate_connection:
connect_peer_interval += 15
connect_peer_interval = min(connect_peer_interval, self.peer_connect_interval)
if addr is not None and initiate_connection:
while len(self.pending_outbound_connections) >= MAX_CONCURRENT_OUTBOUND_CONNECTIONS:
self.log.debug(f"Max concurrent outbound connections reached. Retrying in {sleep_interval}s.")
await asyncio.sleep(sleep_interval)
self.log.debug(
f"Max concurrent outbound connections reached. Retrying in {connect_peer_interval}s."
)
await asyncio.sleep(connect_peer_interval)
self.log.debug(f"Creating connection task with {addr}.")
asyncio.create_task(self.start_client_async(addr, disconnect_after_handshake))
await asyncio.sleep(sleep_interval)
await asyncio.sleep(connect_peer_interval)
except Exception as e:
self.log.error(f"Exception in create outbound connections: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")

View File

@ -606,6 +606,14 @@ class ChiaServer:
return result
def get_full_node_outgoing_connections(self) -> List[WSChiaConnection]:
result = []
connections = self.get_full_node_connections()
for connection in connections:
if connection.is_outbound:
result.append(connection)
return result
def get_full_node_connections(self) -> List[WSChiaConnection]:
return list(self.connection_by_type[NodeType.FULL_NODE].values())