Speedup outbound connections. (#5124)

* Initial commit.

* Fix typo.

* Retry each node once every 30 mins.

* Flake8.

* mypy.

* Bound number of concurrent outbound connections.

* Blank line.
This commit is contained in:
Florin Chirica 2021-05-16 09:07:39 +03:00 committed by GitHub
parent 39ef69643b
commit 7012ca9e07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -5,7 +5,7 @@ import traceback
from pathlib import Path
from random import Random
from secrets import randbits
from typing import Dict, Optional, List
from typing import Dict, Optional, List, Set
import aiosqlite
@ -24,6 +24,7 @@ from chia.util.path import mkdir, path_from_root
MAX_PEERS_RECEIVED_PER_REQUEST = 1000
MAX_TOTAL_PEERS_RECEIVED = 3000
MAX_CONCURRENT_OUTBOUND_CONNECTIONS = 100
class FullNodeDiscovery:
@ -63,6 +64,7 @@ class FullNodeDiscovery:
self.cleanup_task: Optional[asyncio.Task] = None
self.initial_wait: int = 0
self.resolver = dns.asyncresolver.Resolver()
self.pending_outbound_connections: Set = set()
async def initialize_address_manager(self) -> None:
mkdir(self.peer_db_path.parent)
@ -189,6 +191,34 @@ class FullNodeDiscovery:
except Exception as e:
self.log.error(f"Exception while querying DNS server: {e}")
async def start_client_async(self, addr: PeerInfo, is_feeler: bool) -> None:
try:
if self.address_manager is None:
return
if addr.host in self.pending_outbound_connections:
return
self.pending_outbound_connections.add(addr.host)
client_connected = await self.server.start_client(
addr,
on_connect=self.server.on_connect,
is_feeler=is_feeler,
)
if self.server.is_duplicate_or_self_connection(addr):
# Mark it as a softer attempt, without counting the failures.
await self.address_manager.attempt(addr, False)
else:
if client_connected is True:
await self.address_manager.mark_good(addr)
await self.address_manager.connect(addr)
else:
await self.address_manager.attempt(addr, True)
self.pending_outbound_connections.remove(addr.host)
except Exception as e:
if addr.host in self.pending_outbound_connections:
self.pending_outbound_connections.remove(addr.host)
self.log.error(f"Exception in create outbound connections: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")
async def _connect_to_peers(self, random) -> None:
next_feeler = self._poisson_next_send(time.time() * 1000 * 1000, 240, random)
retry_introducers = False
@ -319,9 +349,9 @@ class FullNodeDiscovery:
addr = None
continue
# only consider very recently tried nodes after 30 failed attempts
# attempt a node once per 2 hours if we lack connections to increase the chance
# attempt a node once per 30 minutes if we lack connections to increase the chance
# to try all the peer table.
if now - info.last_try < 2 * 3600 and tries < 30:
if now - info.last_try < 1800 and tries < 30:
continue
if time.time() - last_timestamp_local_info > 1800 or local_peerinfo is None:
local_peerinfo = await self.server.get_peer_info()
@ -335,33 +365,16 @@ class FullNodeDiscovery:
disconnect_after_handshake = True
retry_introducers = False
initiate_connection = self._num_needed_peers() > 0 or has_collision or is_feeler
client_connected = False
if addr is not None and initiate_connection:
try:
client_connected = await self.server.start_client(
addr,
is_feeler=disconnect_after_handshake,
on_connect=self.server.on_connect,
)
except Exception as e:
self.log.error(f"Exception in create outbound connections: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")
if self.server.is_duplicate_or_self_connection(addr):
# Mark it as a softer attempt, without counting the failures.
await self.address_manager.attempt(addr, False)
else:
if client_connected is True:
await self.address_manager.mark_good(addr)
await self.address_manager.connect(addr)
else:
await self.address_manager.attempt(addr, True)
sleep_interval = 1 + len(groups) * 0.5
sleep_interval = min(sleep_interval, self.peer_connect_interval)
# Special case: try to find our first peer much quicker.
if len(groups) == 0:
sleep_interval = 0.1
if addr is not None and initiate_connection:
while len(self.pending_outbound_connections) >= MAX_CONCURRENT_OUTBOUND_CONNECTIONS:
self.log.debug(f"Max concurrent outbound connections reached. Retrying in {sleep_interval}s.")
await asyncio.sleep(sleep_interval)
asyncio.create_task(self.start_client_async(addr, disconnect_after_handshake))
await asyncio.sleep(sleep_interval)
except Exception as e:
self.log.error(f"Exception in create outbound connections: {e}")