chia-blockchain/src/server/node_discovery.py


import asyncio
import time
import math
from pathlib import Path
import aiosqlite
import traceback
from random import Random
import src.server.ws_connection as ws
from src.server.server import ChiaServer
from src.types.peer_info import PeerInfo, TimestampedPeerInfo
from src.util.path import path_from_root, mkdir
from src.server.outbound_message import (
OutboundMessage,
Message,
NodeType,
)
from src.server.address_manager import ExtendedPeerInfo, AddressManager
from src.server.address_manager_store import AddressManagerStore
from src.protocols import (
introducer_protocol,
full_node_protocol,
)
from secrets import randbits
from src.util.hash import std_hash
from typing import Dict, Optional, AsyncGenerator
from src.util.ints import uint64
OutboundMessageGenerator = AsyncGenerator[OutboundMessage, None]
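# FullNodeDiscovery is the shared peer-discovery base class. It keeps the address
# manager (the new/tried peer tables) persisted in a local SQLite database, fills
# the outbound connection slots up to target_outbound_count, and falls back to the
# introducer when no peer addresses are known yet.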
class FullNodeDiscovery:
def __init__(
self,
server: ChiaServer,
root_path: Path,
target_outbound_count: int,
peer_db_path: str,
introducer_info: Optional[Dict],
peer_connect_interval: int,
log,
):
self.server: ChiaServer = server
self.message_queue: asyncio.Queue = asyncio.Queue()
self.is_closed = False
self.target_outbound_count = target_outbound_count
self.peer_db_path = path_from_root(root_path, peer_db_path)
if introducer_info is not None:
self.introducer_info: Optional[PeerInfo] = PeerInfo(
introducer_info["host"],
introducer_info["port"],
)
else:
self.introducer_info = None
self.peer_connect_interval = peer_connect_interval
self.log = log
self.relay_queue = None
self.address_manager = None
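# The address manager is persisted in a SQLite database at peer_db_path: an existing
# (non-empty) store is deserialized on startup, otherwise a fresh AddressManager is
# created. Received messages update peer timestamps via the server callback.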
async def initialize_address_manager(self):
mkdir(self.peer_db_path.parent)
self.connection = await aiosqlite.connect(self.peer_db_path)
self.address_manager_store = await AddressManagerStore.create(self.connection)
if not await self.address_manager_store.is_empty():
self.address_manager = await self.address_manager_store.deserialize()
else:
await self.address_manager_store.clear()
self.address_manager = AddressManager()
self.server.set_received_message_callback(self.update_peer_timestamp_on_message)
async def start_tasks(self):
random = Random()
self.connect_peers_task = asyncio.create_task(self._connect_to_peers(random))
self.serialize_task = asyncio.create_task(self._periodically_serialize(random))
self.cleanup_task = asyncio.create_task(self._periodically_cleanup())
async def _close_common(self):
self.is_closed = True
self.connect_peers_task.cancel()
self.serialize_task.cancel()
self.cleanup_task.cancel()
await self.connection.close()
def add_message(self, message, data):
self.message_queue.put_nowait((message, data))
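# When an inbound full-node peer connects, record it in the new table and queue it
# for relay; when an outbound full-node connection is established, immediately ask
# that peer for its known peers.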
async def on_connect(self, peer: ws.WSChiaConnection):
if (
peer.is_outbound is False
and peer.peer_server_port is not None
and peer.connection_type is NodeType.FULL_NODE
and self.server._local_type is NodeType.FULL_NODE
and self.address_manager is not None
):
timestamped_peer_info = TimestampedPeerInfo(
peer.peer_host,
peer.peer_server_port,
uint64(int(time.time())),
)
await self.address_manager.add_to_new_table([timestamped_peer_info], peer.get_peer_info(), 0)
if self.relay_queue is not None:
self.relay_queue.put_nowait((timestamped_peer_info, 1))
if (
peer.is_outbound
and peer.peer_server_port is not None
and peer.connection_type is NodeType.FULL_NODE
and self.server._local_type is NodeType.FULL_NODE
and self.address_manager is not None
):
msg = Message("request_peers", full_node_protocol.RequestPeers())
await peer.send_message(msg)
# Update the timestamp of an outbound connection each time we receive a message from it.
async def update_peer_timestamp_on_message(self, peer: ws.WSChiaConnection):
if (
peer.is_outbound
and peer.peer_server_port is not None
and peer.connection_type is NodeType.FULL_NODE
and self.server._local_type is NodeType.FULL_NODE
and self.address_manager is not None
):
await self.address_manager.connect(peer.get_peer_info())
def _num_needed_peers(self) -> int:
diff = self.target_outbound_count
outgoing = self.server.get_outgoing_connections()
diff -= len(outgoing)
return diff if diff >= 0 else 0
"""
Uses the Poisson distribution to determine the next time
when we'll initiate a feeler connection.
(https://en.wikipedia.org/wiki/Poisson_distribution)
"""
def _poisson_next_send(self, now, avg_interval_seconds, random):
return now + (
math.log(random.randrange(1 << 48) * -0.0000000000000035527136788 + 1) * avg_interval_seconds * -1000000.0
+ 0.5
)
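# Connect to the configured introducer and ask it for peers; this is only used from
# _connect_to_peers when the local address tables are empty.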
async def _introducer_client(self):
if self.introducer_info is None:
return
async def on_connect(peer: ws.WSChiaConnection):
msg = Message("request_peers", introducer_protocol.RequestPeers())
await peer.send_message(msg)
await self.server.start_client(self.introducer_info, on_connect)
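# Main outbound-connection loop: keep the number of outbound full-node connections
# at target_outbound_count, connecting to at most one peer per network group
# (feelers excepted), schedule occasional feeler connections via _poisson_next_send,
# and fall back to the introducer whenever the address tables are empty.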
async def _connect_to_peers(self, random):
next_feeler = self._poisson_next_send(time.time() * 1000 * 1000, 240, random)
empty_tables = False
local_peerinfo: Optional[PeerInfo] = await self.server.get_peer_info()
last_timestamp_local_info: uint64 = uint64(int(time.time()))
while not self.is_closed:
try:
# We don't know any addresses; connect to the introducer to get some.
size = await self.address_manager.size()
if size == 0 or empty_tables:
await self._introducer_client()
await asyncio.sleep(min(5, self.peer_connect_interval))
empty_tables = False
continue
# Only connect out to one peer per network group (/16 for IPv4).
groups = []
full_node_connected = self.server.get_full_node_connections()
connected = [c.get_peer_info() for c in full_node_connected]
connected = [c for c in connected if c is not None]
for conn in full_node_connected:
peer = conn.get_peer_info()
if peer is None:
continue
group = peer.get_group()
if group not in groups:
groups.append(group)
# Feeler Connections
#
# Design goals:
# * Increase the number of connectable addresses in the tried table.
#
# Method:
# * Choose a random address from the new table and attempt to connect to it; if the
# connection succeeds, the address is added to the tried table.
# * Start attempting feeler connections only after the node finishes making its
# outbound connections.
# * Only make a feeler connection once every few minutes.
is_feeler = False
has_collision = False
if self._num_needed_peers() == 0:
if time.time() * 1000 * 1000 > next_feeler:
next_feeler = self._poisson_next_send(time.time() * 1000 * 1000, 240, random)
is_feeler = True
await self.address_manager.resolve_tried_collisions()
tries = 0
now = time.time()
got_peer = False
addr: Optional[PeerInfo] = None
max_tries = 50
if len(groups) < 3:
max_tries = 10
elif len(groups) <= 5:
max_tries = 25
while not got_peer and not self.is_closed:
sleep_interval = 1 + len(groups) * 0.5
sleep_interval = min(sleep_interval, self.peer_connect_interval)
await asyncio.sleep(sleep_interval)
tries += 1
if tries > max_tries:
addr = None
empty_tables = True
break
info: Optional[ExtendedPeerInfo] = await self.address_manager.select_tried_collision()
if info is None:
info = await self.address_manager.select_peer(is_feeler)
else:
has_collision = True
if info is None:
if not is_feeler:
empty_tables = True
break
# Require outbound connections, other than feelers,
# to be to distinct network groups.
addr = info.peer_info
if has_collision:
break
if addr is not None and not addr.is_valid():
addr = None
continue
if not is_feeler and addr.get_group() in groups:
addr = None
continue
if addr in connected:
addr = None
continue
# Only consider very recently tried nodes (within the last hour) after 30 failed attempts.
if now - info.last_try < 3600 and tries < 30:
continue
if time.time() - last_timestamp_local_info > 1800 or local_peerinfo is None:
local_peerinfo = await self.server.get_peer_info()
last_timestamp_local_info = uint64(int(time.time()))
if local_peerinfo is not None and addr == local_peerinfo:
continue
got_peer = True
disconnect_after_handshake = is_feeler
if self._num_needed_peers() == 0:
disconnect_after_handshake = True
empty_tables = False
initiate_connection = self._num_needed_peers() > 0 or has_collision or is_feeler
connected = False
if addr is not None and initiate_connection:
try:
connected = await self.server.start_client(
addr,
is_feeler=disconnect_after_handshake,
on_connect=self.server.on_connect,
)
except Exception as e:
self.log.error(f"Exception in create outbound connections: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")
if self.server.is_duplicate_or_self_connection(addr):
# Mark it as a softer attempt, without counting the failures.
await self.address_manager.attempt(addr, False)
else:
if connected is True:
await self.address_manager.mark_good(addr)
await self.address_manager.connect(addr)
else:
await self.address_manager.attempt(addr, True)
sleep_interval = 1 + len(groups) * 0.5
sleep_interval = min(sleep_interval, self.peer_connect_interval)
await asyncio.sleep(sleep_interval)
except Exception as e:
self.log.error(f"Exception in create outbound connections: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")
async def _periodically_serialize(self, random: Random):
while not self.is_closed:
if self.address_manager is None:
await asyncio.sleep(10)
continue
serialize_interval = random.randint(15 * 60, 30 * 60)
await asyncio.sleep(serialize_interval)
async with self.address_manager.lock:
await self.address_manager_store.serialize(self.address_manager)
async def _periodically_cleanup(self):
while not self.is_closed:
# Remove entries whose timestamp is older than 14 days
# and that have a high number of failed connection attempts.
# Such peers have most likely left the network,
# so dropping them saves space in the peer tables.
cleanup_interval = 1800
max_timestamp_difference = 14 * 3600 * 24
max_consecutive_failures = 10
await asyncio.sleep(cleanup_interval)
# Perform the cleanup only if we have at least 3 connections.
full_node_connected = self.server.get_full_node_connections()
connected = [c.get_peer_info() for c in full_node_connected]
connected = [c for c in connected if c is not None]
if len(connected) >= 3:
async with self.address_manager.lock:
self.address_manager.cleanup(max_timestamp_difference, max_consecutive_failures)
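# Sanitize timestamps on incoming peer lists before storing them: implausible
# timestamps are replaced with a stale one, and peers learned from the introducer
# (is_full_node is False) are stored with timestamp 0 and without a source peer
# (the extra 2 * 60 * 60 argument for full-node sources appears to be a timestamp
# penalty, mirroring Bitcoin's AddrMan).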
async def _respond_peers_common(self, request, peer_src, is_full_node):
# Check if we got the peers from a full node or from the introducer.
peers_adjusted_timestamp = []
for peer in request.peer_list:
if peer.timestamp < 100000000 or peer.timestamp > time.time() + 10 * 60:
# Invalid timestamp: substitute a stale one (five days in the past).
current_peer = TimestampedPeerInfo(
peer.host,
peer.port,
uint64(int(time.time() - 5 * 24 * 60 * 60)),
)
else:
current_peer = peer
if not is_full_node:
current_peer = TimestampedPeerInfo(
peer.host,
peer.port,
uint64(0),
)
peers_adjusted_timestamp.append(current_peer)
if is_full_node:
await self.address_manager.add_to_new_table(peers_adjusted_timestamp, peer_src, 2 * 60 * 60)
else:
await self.address_manager.add_to_new_table(peers_adjusted_timestamp, None, 0)
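# FullNodePeers adds address gossip on top of FullNodeDiscovery: it answers
# request_peers, advertises its own address once a day, and relays newly announced
# addresses to a small, deterministically chosen subset of neighbours.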
class FullNodePeers(FullNodeDiscovery):
def __init__(
self,
server,
root_path,
max_inbound_count,
target_outbound_count,
peer_db_path,
introducer_info,
peer_connect_interval,
log,
):
super().__init__(
server,
root_path,
target_outbound_count,
peer_db_path,
introducer_info,
peer_connect_interval,
log,
)
self.relay_queue = asyncio.Queue()
self.lock = asyncio.Lock()
self.neighbour_known_peers = {}
self.key = randbits(256)
async def start(self):
await self.initialize_address_manager()
self.self_advertise_task = asyncio.create_task(self._periodically_self_advertise())
self.address_relay_task = asyncio.create_task(self._address_relay())
await self.start_tasks()
async def close(self):
await self._close_common()
self.self_advertise_task.cancel()
self.address_relay_task.cancel()
async def _periodically_self_advertise(self):
while not self.is_closed:
try:
await asyncio.sleep(24 * 3600)
# Clean up known nodes for neighbours every 24 hours.
async with self.lock:
for neighbour in list(self.neighbour_known_peers.keys()):
self.neighbour_known_peers[neighbour].clear()
# Self advertise every 24 hours.
peer = await self.server.get_peer_info()
if peer is None:
continue
timestamped_peer = [
TimestampedPeerInfo(
peer.host,
peer.port,
uint64(int(time.time())),
)
]
msg = Message(
"respond_peers",
full_node_protocol.RespondPeers(timestamped_peer),
)
await self.server.send_to_all([msg], NodeType.FULL_NODE)
except Exception as e:
self.log.error(f"Exception in self advertise: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")
async def add_peers_neighbour(self, peers, neighbour_info):
neighbour_data = (neighbour_info.host, neighbour_info.port)
async with self.lock:
for peer in peers:
if neighbour_data not in self.neighbour_known_peers:
self.neighbour_known_peers[neighbour_data] = set()
if peer.host not in self.neighbour_known_peers[neighbour_data]:
self.neighbour_known_peers[neighbour_data].add(peer.host)
async def request_peers(self, peer_info: PeerInfo):
try:
# Prevent a fingerprint attack: do not send peers to inbound connections.
# This asymmetric behavior for inbound and outbound connections was introduced
# to prevent a fingerprinting attack: an attacker can send specific fake addresses
# to users' AddrMan and later request them by sending getaddr messages.
# Making nodes which are behind NAT and can only make outgoing connections ignore
# the request_peers message mitigates the attack.
if self.address_manager is None:
return None
peers = await self.address_manager.get_peers()
await self.add_peers_neighbour(peers, peer_info)
msg = Message(
"respond_peers",
full_node_protocol.RespondPeers(peers),
)
return msg
except Exception as e:
self.log.error(f"Request peers exception: {e}")
async def respond_peers(self, request, peer_src, is_full_node):
await self._respond_peers_common(request, peer_src, is_full_node)
if is_full_node:
await self.add_peers_neighbour(request.peer_list, peer_src)
if len(request.peer_list) == 1 and self.relay_queue is not None:
peer = request.peer_list[0]
if peer.timestamp > time.time() - 60 * 10:
self.relay_queue.put_nowait((peer, 2))
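# Address relay, modelled on Bitcoin's scheme (see the wiki link below): for each
# queued address, rank the connected neighbours by a keyed hash of (relay key,
# neighbour key, current day) and forward the address to the num_peers lowest-ranked
# neighbours that have not already seen it.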
async def _address_relay(self):
while not self.is_closed:
try:
relay_peer, num_peers = await self.relay_queue.get()
relay_peer_info = PeerInfo(relay_peer.host, relay_peer.port)
if not relay_peer_info.is_valid():
continue
# https://en.bitcoin.it/wiki/Satoshi_Client_Node_Discovery#Address_Relay
connections = self.server.get_full_node_connections()
hashes = []
cur_day = int(time.time()) // (24 * 60 * 60)
for connection in connections:
peer_info = connection.get_peer_info()
if peer_info is None:
continue
cur_hash = int.from_bytes(
bytes(
std_hash(
self.key.to_bytes(32, byteorder="big")
+ peer_info.get_key()
+ cur_day.to_bytes(3, byteorder="big")
)
),
byteorder="big",
)
hashes.append((cur_hash, connection))
hashes.sort(key=lambda x: x[0])
for index, (_, connection) in enumerate(hashes):
if index >= num_peers:
break
peer_info = connection.get_peer_info()
pair = (peer_info.host, peer_info.port)
async with self.lock:
if pair in self.neighbour_known_peers and relay_peer.host in self.neighbour_known_peers[pair]:
continue
if pair not in self.neighbour_known_peers:
self.neighbour_known_peers[pair] = set()
self.neighbour_known_peers[pair].add(relay_peer.host)
if connection.peer_node_id is None:
continue
msg = Message(
"respond_peers",
full_node_protocol.RespondPeers([relay_peer]),
)
await connection.send_message(msg)
except Exception as e:
self.log.error(f"Exception in address relay: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")
class WalletPeers(FullNodeDiscovery):
def __init__(
self,
server,
root_path,
target_outbound_count,
peer_db_path,
introducer_info,
peer_connect_interval,
log,
):
super().__init__(
server,
root_path,
target_outbound_count,
peer_db_path,
introducer_info,
peer_connect_interval,
log,
)
async def start(self):
await self.initialize_address_manager()
await self.start_tasks()
async def ensure_is_closed(self):
if self.is_closed:
return
await self._close_common()
async def respond_peers(self, request, peer_src, is_full_node):
await self._respond_peers_common(request, peer_src, is_full_node)