chia-blockchain/chia/server/address_manager.py
Kyle Altendorf 3b084a165b
configure isort to add the future annotations import (#13327)
* configure isort to add the future annotations import

* apply the new isort setting

* remove type ignores for new mypy (#13539)

https://pypi.org/project/mypy/0.981/

* another
2022-09-30 03:40:22 -05:00

661 lines
26 KiB
Python

from __future__ import annotations
import logging
import math
import time
from asyncio import Lock
from random import choice, randrange
from secrets import randbits
from typing import Dict, List, Optional, Set, Tuple
from chia.types.peer_info import PeerInfo, TimestampedPeerInfo
from chia.util.hash import std_hash
from chia.util.ints import uint16, uint64
TRIED_BUCKETS_PER_GROUP = 8
NEW_BUCKETS_PER_SOURCE_GROUP = 64
TRIED_BUCKET_COUNT = 256
NEW_BUCKET_COUNT = 1024
BUCKET_SIZE = 64
TRIED_COLLISION_SIZE = 10
NEW_BUCKETS_PER_ADDRESS = 8
LOG_TRIED_BUCKET_COUNT = 3
LOG_NEW_BUCKET_COUNT = 10
LOG_BUCKET_SIZE = 6
HORIZON_DAYS = 30
MAX_RETRIES = 3
MIN_FAIL_DAYS = 7
MAX_FAILURES = 10
log = logging.getLogger(__name__)
# This is a Python port from 'CAddrInfo' class from Bitcoin core code.
class ExtendedPeerInfo:
def __init__(
self,
addr: TimestampedPeerInfo,
src_peer: Optional[PeerInfo],
):
self.peer_info: PeerInfo = PeerInfo(
addr.host,
addr.port,
)
self.timestamp: int = addr.timestamp
self.src: Optional[PeerInfo] = src_peer
if src_peer is None:
self.src = self.peer_info
self.random_pos: Optional[int] = None
self.is_tried: bool = False
self.ref_count: int = 0
self.last_success: int = 0
self.last_try: int = 0
self.num_attempts: int = 0
self.last_count_attempt: int = 0
def to_string(self) -> str:
assert self.src is not None
out = (
self.peer_info.host
+ " "
+ str(int(self.peer_info.port))
+ " "
+ str(int(self.timestamp))
+ " "
+ self.src.host
+ " "
+ str(int(self.src.port))
)
return out
@classmethod
def from_string(cls, peer_str: str):
blobs = peer_str.split(" ")
assert len(blobs) == 5
peer_info = TimestampedPeerInfo(blobs[0], uint16(int(blobs[1])), uint64(int(blobs[2])))
src_peer = PeerInfo(blobs[3], uint16(int(blobs[4])))
return cls(peer_info, src_peer)
def get_tried_bucket(self, key: int) -> int:
hash1 = int.from_bytes(
bytes(std_hash(key.to_bytes(32, byteorder="big") + self.peer_info.get_key())[:8]),
byteorder="big",
)
hash1 = hash1 % TRIED_BUCKETS_PER_GROUP
hash2 = int.from_bytes(
bytes(std_hash(key.to_bytes(32, byteorder="big") + self.peer_info.get_group() + bytes([hash1]))[:8]),
byteorder="big",
)
return hash2 % TRIED_BUCKET_COUNT
def get_new_bucket(self, key: int, src_peer: Optional[PeerInfo] = None) -> int:
if src_peer is None:
src_peer = self.src
assert src_peer is not None
hash1 = int.from_bytes(
bytes(std_hash(key.to_bytes(32, byteorder="big") + self.peer_info.get_group() + src_peer.get_group())[:8]),
byteorder="big",
)
hash1 = hash1 % NEW_BUCKETS_PER_SOURCE_GROUP
hash2 = int.from_bytes(
bytes(std_hash(key.to_bytes(32, byteorder="big") + src_peer.get_group() + bytes([hash1]))[:8]),
byteorder="big",
)
return hash2 % NEW_BUCKET_COUNT
def get_bucket_position(self, key: int, is_new: bool, nBucket: int) -> int:
ch = "N" if is_new else "K"
hash1 = int.from_bytes(
bytes(
std_hash(
key.to_bytes(32, byteorder="big")
+ ch.encode()
+ nBucket.to_bytes(3, byteorder="big")
+ self.peer_info.get_key()
)[:8]
),
byteorder="big",
)
return hash1 % BUCKET_SIZE
def is_terrible(self, now: Optional[int] = None) -> bool:
if now is None:
now = int(math.floor(time.time()))
# never remove things tried in the last minute
if self.last_try > 0 and self.last_try >= now - 60:
return False
# came in a flying DeLorean
if self.timestamp > now + 10 * 60:
return True
# not seen in recent history
if self.timestamp == 0 or now - self.timestamp > HORIZON_DAYS * 24 * 60 * 60:
return True
# tried N times and never a success
if self.last_success == 0 and self.num_attempts >= MAX_RETRIES:
return True
# N successive failures in the last week
if now - self.last_success > MIN_FAIL_DAYS * 24 * 60 * 60 and self.num_attempts >= MAX_FAILURES:
return True
return False
def get_selection_chance(self, now: Optional[int] = None):
if now is None:
now = int(math.floor(time.time()))
chance = 1.0
since_last_try = max(now - self.last_try, 0)
# deprioritize very recent attempts away
if since_last_try < 60 * 10:
chance *= 0.01
# deprioritize 66% after each failed attempt,
# but at most 1/28th to avoid the search taking forever or overly penalizing outages.
chance *= pow(0.66, min(self.num_attempts, 8))
return chance
# This is a Python port from 'CAddrMan' class from Bitcoin core code.
class AddressManager:
id_count: int
key: int
random_pos: List[int]
tried_matrix: List[List[int]]
new_matrix: List[List[int]]
tried_count: int
new_count: int
map_addr: Dict[str, int]
map_info: Dict[int, ExtendedPeerInfo]
last_good: int
tried_collisions: List[int]
used_new_matrix_positions: Set[Tuple[int, int]]
used_tried_matrix_positions: Set[Tuple[int, int]]
allow_private_subnets: bool
def __init__(self) -> None:
self.clear()
self.lock: Lock = Lock()
def clear(self) -> None:
self.id_count = 0
self.key = randbits(256)
self.random_pos = []
self.tried_matrix = [[-1 for x in range(BUCKET_SIZE)] for y in range(TRIED_BUCKET_COUNT)]
self.new_matrix = [[-1 for x in range(BUCKET_SIZE)] for y in range(NEW_BUCKET_COUNT)]
self.tried_count = 0
self.new_count = 0
self.map_addr = {}
self.map_info = {}
self.last_good = 1
self.tried_collisions = []
self.used_new_matrix_positions = set()
self.used_tried_matrix_positions = set()
self.allow_private_subnets = False
def make_private_subnets_valid(self) -> None:
self.allow_private_subnets = True
# Use only this method for modifying new matrix.
def _set_new_matrix(self, row: int, col: int, value: int) -> None:
self.new_matrix[row][col] = value
if value == -1:
if (row, col) in self.used_new_matrix_positions:
self.used_new_matrix_positions.remove((row, col))
else:
if (row, col) not in self.used_new_matrix_positions:
self.used_new_matrix_positions.add((row, col))
# Use only this method for modifying tried matrix.
def _set_tried_matrix(self, row: int, col: int, value: int) -> None:
self.tried_matrix[row][col] = value
if value == -1:
if (row, col) in self.used_tried_matrix_positions:
self.used_tried_matrix_positions.remove((row, col))
else:
if (row, col) not in self.used_tried_matrix_positions:
self.used_tried_matrix_positions.add((row, col))
def load_used_table_positions(self) -> None:
self.used_new_matrix_positions = set()
self.used_tried_matrix_positions = set()
for bucket in range(NEW_BUCKET_COUNT):
for pos in range(BUCKET_SIZE):
if self.new_matrix[bucket][pos] != -1:
self.used_new_matrix_positions.add((bucket, pos))
for bucket in range(TRIED_BUCKET_COUNT):
for pos in range(BUCKET_SIZE):
if self.tried_matrix[bucket][pos] != -1:
self.used_tried_matrix_positions.add((bucket, pos))
def create_(self, addr: TimestampedPeerInfo, addr_src: Optional[PeerInfo]) -> Tuple[ExtendedPeerInfo, int]:
self.id_count += 1
node_id = self.id_count
self.map_info[node_id] = ExtendedPeerInfo(addr, addr_src)
self.map_addr[addr.host] = node_id
self.map_info[node_id].random_pos = len(self.random_pos)
self.random_pos.append(node_id)
return (self.map_info[node_id], node_id)
def find_(self, addr: PeerInfo) -> Tuple[Optional[ExtendedPeerInfo], Optional[int]]:
if addr.host not in self.map_addr:
return (None, None)
node_id = self.map_addr[addr.host]
if node_id not in self.map_info:
return (None, node_id)
return (self.map_info[node_id], node_id)
def swap_random_(self, rand_pos_1: int, rand_pos_2: int) -> None:
if rand_pos_1 == rand_pos_2:
return None
assert rand_pos_1 < len(self.random_pos) and rand_pos_2 < len(self.random_pos)
node_id_1 = self.random_pos[rand_pos_1]
node_id_2 = self.random_pos[rand_pos_2]
self.map_info[node_id_1].random_pos = rand_pos_2
self.map_info[node_id_2].random_pos = rand_pos_1
self.random_pos[rand_pos_1] = node_id_2
self.random_pos[rand_pos_2] = node_id_1
def make_tried_(self, info: ExtendedPeerInfo, node_id: int) -> None:
for bucket in range(NEW_BUCKET_COUNT):
pos = info.get_bucket_position(self.key, True, bucket)
if self.new_matrix[bucket][pos] == node_id:
self._set_new_matrix(bucket, pos, -1)
info.ref_count -= 1
assert info.ref_count == 0
self.new_count -= 1
cur_bucket = info.get_tried_bucket(self.key)
cur_bucket_pos = info.get_bucket_position(self.key, False, cur_bucket)
if self.tried_matrix[cur_bucket][cur_bucket_pos] != -1:
# Evict the old node from the tried table.
node_id_evict = self.tried_matrix[cur_bucket][cur_bucket_pos]
assert node_id_evict in self.map_info
old_info = self.map_info[node_id_evict]
old_info.is_tried = False
self._set_tried_matrix(cur_bucket, cur_bucket_pos, -1)
self.tried_count -= 1
# Find its position into new table.
new_bucket = old_info.get_new_bucket(self.key)
new_bucket_pos = old_info.get_bucket_position(self.key, True, new_bucket)
self.clear_new_(new_bucket, new_bucket_pos)
old_info.ref_count = 1
self._set_new_matrix(new_bucket, new_bucket_pos, node_id_evict)
self.new_count += 1
self._set_tried_matrix(cur_bucket, cur_bucket_pos, node_id)
self.tried_count += 1
info.is_tried = True
def clear_new_(self, bucket: int, pos: int) -> None:
if self.new_matrix[bucket][pos] != -1:
delete_id = self.new_matrix[bucket][pos]
delete_info = self.map_info[delete_id]
assert delete_info.ref_count > 0
delete_info.ref_count -= 1
self._set_new_matrix(bucket, pos, -1)
if delete_info.ref_count == 0:
self.delete_new_entry_(delete_id)
def mark_good_(self, addr: PeerInfo, test_before_evict: bool, timestamp: int) -> None:
self.last_good = timestamp
(info, node_id) = self.find_(addr)
if not addr.is_valid(self.allow_private_subnets):
return None
if info is None:
return None
if node_id is None:
return None
if not (info.peer_info.host == addr.host and info.peer_info.port == addr.port):
return None
# update info
info.last_success = timestamp
info.last_try = timestamp
info.num_attempts = 0
# timestamp is not updated here, to avoid leaking information about
# currently-connected peers.
# if it is already in the tried set, don't do anything else
if info.is_tried:
return None
# find a bucket it is in now
bucket_rand = randrange(NEW_BUCKET_COUNT)
new_bucket = -1
for n in range(NEW_BUCKET_COUNT):
cur_new_bucket = (n + bucket_rand) % NEW_BUCKET_COUNT
cur_new_bucket_pos = info.get_bucket_position(self.key, True, cur_new_bucket)
if self.new_matrix[cur_new_bucket][cur_new_bucket_pos] == node_id:
new_bucket = cur_new_bucket
break
# if no bucket is found, something bad happened;
if new_bucket == -1:
return None
# NOTE(Florin): Double check this. It's not used anywhere else.
# which tried bucket to move the entry to
tried_bucket = info.get_tried_bucket(self.key)
tried_bucket_pos = info.get_bucket_position(self.key, False, tried_bucket)
# Will moving this address into tried evict another entry?
if test_before_evict and self.tried_matrix[tried_bucket][tried_bucket_pos] != -1:
if len(self.tried_collisions) < TRIED_COLLISION_SIZE:
if node_id not in self.tried_collisions:
self.tried_collisions.append(node_id)
else:
self.make_tried_(info, node_id)
def delete_new_entry_(self, node_id: int) -> None:
info = self.map_info[node_id]
if info is None or info.random_pos is None:
return None
self.swap_random_(info.random_pos, len(self.random_pos) - 1)
self.random_pos = self.random_pos[:-1]
del self.map_addr[info.peer_info.host]
del self.map_info[node_id]
self.new_count -= 1
def add_to_new_table_(self, addr: TimestampedPeerInfo, source: Optional[PeerInfo], penalty: int) -> bool:
is_unique = False
peer_info = PeerInfo(
addr.host,
addr.port,
)
if not peer_info.is_valid(self.allow_private_subnets):
return False
(info, node_id) = self.find_(peer_info)
if info is not None and info.peer_info.host == addr.host and info.peer_info.port == addr.port:
penalty = 0
if info is not None:
# periodically update timestamp
currently_online = time.time() - addr.timestamp < 24 * 60 * 60
update_interval = 60 * 60 if currently_online else 24 * 60 * 60
if addr.timestamp > 0 and (
info.timestamp > 0 or info.timestamp < addr.timestamp - update_interval - penalty
):
info.timestamp = max(0, addr.timestamp - penalty)
# do not update if no new information is present
if addr.timestamp == 0 or (info.timestamp > 0 and addr.timestamp <= info.timestamp):
return False
# do not update if the entry was already in the "tried" table
if info.is_tried:
return False
# do not update if the max reference count is reached
if info.ref_count == NEW_BUCKETS_PER_ADDRESS:
return False
# stochastic test: previous ref_count == N: 2^N times harder to increase it
factor = 1 << info.ref_count
if factor > 1 and randrange(factor) != 0:
return False
else:
(info, node_id) = self.create_(addr, source)
info.timestamp = max(0, info.timestamp - penalty)
self.new_count += 1
is_unique = True
new_bucket = info.get_new_bucket(self.key, source)
new_bucket_pos = info.get_bucket_position(self.key, True, new_bucket)
if self.new_matrix[new_bucket][new_bucket_pos] != node_id:
add_to_new = self.new_matrix[new_bucket][new_bucket_pos] == -1
if not add_to_new:
info_existing = self.map_info[self.new_matrix[new_bucket][new_bucket_pos]]
if info_existing.is_terrible() or (info_existing.ref_count > 1 and info.ref_count == 0):
add_to_new = True
if add_to_new:
self.clear_new_(new_bucket, new_bucket_pos)
info.ref_count += 1
if node_id is not None:
self._set_new_matrix(new_bucket, new_bucket_pos, node_id)
else:
if info.ref_count == 0:
if node_id is not None:
self.delete_new_entry_(node_id)
return is_unique
def attempt_(self, addr: PeerInfo, count_failures: bool, timestamp: int) -> None:
info, _ = self.find_(addr)
if info is None:
return None
if not (info.peer_info.host == addr.host and info.peer_info.port == addr.port):
return None
info.last_try = timestamp
if count_failures and info.last_count_attempt < self.last_good:
info.last_count_attempt = timestamp
info.num_attempts += 1
def select_peer_(self, new_only: bool) -> Optional[ExtendedPeerInfo]:
if len(self.random_pos) == 0:
return None
if new_only and self.new_count == 0:
return None
# Use a 50% chance for choosing between tried and new table entries.
if not new_only and self.tried_count > 0 and (self.new_count == 0 or randrange(2) == 0):
chance = 1.0
start = time.time()
cached_tried_matrix_positions: List[Tuple[int, int]] = []
if len(self.used_tried_matrix_positions) < math.sqrt(TRIED_BUCKET_COUNT * BUCKET_SIZE):
cached_tried_matrix_positions = list(self.used_tried_matrix_positions)
while True:
if len(self.used_tried_matrix_positions) < math.sqrt(TRIED_BUCKET_COUNT * BUCKET_SIZE):
if len(self.used_tried_matrix_positions) == 0:
log.error(f"Empty tried table, but tried_count shows {self.tried_count}.")
return None
# The table is sparse, randomly pick from positions list.
index = randrange(len(cached_tried_matrix_positions))
tried_bucket, tried_bucket_pos = cached_tried_matrix_positions[index]
else:
# The table is dense, randomly trying positions is faster than loading positions list.
tried_bucket = randrange(TRIED_BUCKET_COUNT)
tried_bucket_pos = randrange(BUCKET_SIZE)
while self.tried_matrix[tried_bucket][tried_bucket_pos] == -1:
tried_bucket = (tried_bucket + randbits(LOG_TRIED_BUCKET_COUNT)) % TRIED_BUCKET_COUNT
tried_bucket_pos = (tried_bucket_pos + randbits(LOG_BUCKET_SIZE)) % BUCKET_SIZE
node_id = self.tried_matrix[tried_bucket][tried_bucket_pos]
assert node_id != -1
info = self.map_info[node_id]
if randbits(30) < (chance * info.get_selection_chance() * (1 << 30)):
end = time.time()
log.debug(f"address_manager.select_peer took {(end - start):.2e} seconds in tried table.")
return info
chance *= 1.2
else:
chance = 1.0
start = time.time()
cached_new_matrix_positions: List[Tuple[int, int]] = []
if len(self.used_new_matrix_positions) < math.sqrt(NEW_BUCKET_COUNT * BUCKET_SIZE):
cached_new_matrix_positions = list(self.used_new_matrix_positions)
while True:
if len(self.used_new_matrix_positions) < math.sqrt(NEW_BUCKET_COUNT * BUCKET_SIZE):
if len(self.used_new_matrix_positions) == 0:
log.error(f"Empty new table, but new_count shows {self.new_count}.")
return None
index = randrange(len(cached_new_matrix_positions))
new_bucket, new_bucket_pos = cached_new_matrix_positions[index]
else:
new_bucket = randrange(NEW_BUCKET_COUNT)
new_bucket_pos = randrange(BUCKET_SIZE)
while self.new_matrix[new_bucket][new_bucket_pos] == -1:
new_bucket = (new_bucket + randbits(LOG_NEW_BUCKET_COUNT)) % NEW_BUCKET_COUNT
new_bucket_pos = (new_bucket_pos + randbits(LOG_BUCKET_SIZE)) % BUCKET_SIZE
node_id = self.new_matrix[new_bucket][new_bucket_pos]
assert node_id != -1
info = self.map_info[node_id]
if randbits(30) < chance * info.get_selection_chance() * (1 << 30):
end = time.time()
log.debug(f"address_manager.select_peer took {(end - start):.2e} seconds in new table.")
return info
chance *= 1.2
def resolve_tried_collisions_(self) -> None:
for node_id in self.tried_collisions[:]:
resolved = False
if node_id not in self.map_info:
resolved = True
else:
info = self.map_info[node_id]
peer = info.peer_info
tried_bucket = info.get_tried_bucket(self.key)
tried_bucket_pos = info.get_bucket_position(self.key, False, tried_bucket)
if self.tried_matrix[tried_bucket][tried_bucket_pos] != -1:
old_id = self.tried_matrix[tried_bucket][tried_bucket_pos]
old_info = self.map_info[old_id]
if time.time() - old_info.last_success < 4 * 60 * 60:
resolved = True
elif time.time() - old_info.last_try < 4 * 60 * 60:
if time.time() - old_info.last_try > 60:
self.mark_good_(peer, False, math.floor(time.time()))
resolved = True
elif time.time() - info.last_success > 40 * 60:
self.mark_good_(peer, False, math.floor(time.time()))
resolved = True
else:
self.mark_good_(peer, False, math.floor(time.time()))
resolved = True
if resolved:
self.tried_collisions.remove(node_id)
def select_tried_collision_(self) -> Optional[ExtendedPeerInfo]:
if len(self.tried_collisions) == 0:
return None
new_id = choice(self.tried_collisions)
if new_id not in self.map_info:
self.tried_collisions.remove(new_id)
return None
new_info = self.map_info[new_id]
tried_bucket = new_info.get_tried_bucket(self.key)
tried_bucket_pos = new_info.get_bucket_position(self.key, False, tried_bucket)
old_id = self.tried_matrix[tried_bucket][tried_bucket_pos]
return self.map_info[old_id]
def get_peers_(self) -> List[TimestampedPeerInfo]:
addr: List[TimestampedPeerInfo] = []
num_nodes = math.ceil(23 * len(self.random_pos) / 100)
if num_nodes > 1000:
num_nodes = 1000
for n in range(len(self.random_pos)):
if len(addr) >= num_nodes:
return addr
rand_pos = randrange(len(self.random_pos) - n) + n
self.swap_random_(n, rand_pos)
info = self.map_info[self.random_pos[n]]
if not info.peer_info.is_valid(self.allow_private_subnets):
continue
if not info.is_terrible():
cur_peer_info = TimestampedPeerInfo(
info.peer_info.host,
uint16(info.peer_info.port),
uint64(info.timestamp),
)
addr.append(cur_peer_info)
return addr
def cleanup(self, max_timestamp_difference: int, max_consecutive_failures: int):
now = int(math.floor(time.time()))
for bucket in range(NEW_BUCKET_COUNT):
for pos in range(BUCKET_SIZE):
if self.new_matrix[bucket][pos] != -1:
node_id = self.new_matrix[bucket][pos]
cur_info = self.map_info[node_id]
if (
cur_info.timestamp < now - max_timestamp_difference
and cur_info.num_attempts >= max_consecutive_failures
):
self.clear_new_(bucket, pos)
def connect_(self, addr: PeerInfo, timestamp: int):
info, _ = self.find_(addr)
if info is None:
return None
# check whether we are talking about the exact same peer
if not (info.peer_info.host == addr.host and info.peer_info.port == addr.port):
return None
update_interval = 20 * 60
if timestamp - info.timestamp > update_interval:
info.timestamp = timestamp
async def size(self) -> int:
async with self.lock:
return len(self.random_pos)
async def add_to_new_table(
self,
addresses: List[TimestampedPeerInfo],
source: Optional[PeerInfo] = None,
penalty: int = 0,
) -> bool:
is_added = False
async with self.lock:
for addr in addresses:
cur_peer_added = self.add_to_new_table_(addr, source, penalty)
is_added = is_added or cur_peer_added
return is_added
# Mark an entry as accessible.
async def mark_good(
self,
addr: PeerInfo,
test_before_evict: bool = True,
timestamp: int = -1,
):
if timestamp == -1:
timestamp = math.floor(time.time())
async with self.lock:
self.mark_good_(addr, test_before_evict, timestamp)
# Mark an entry as connection attempted to.
async def attempt(
self,
addr: PeerInfo,
count_failures: bool,
timestamp: int = -1,
):
if timestamp == -1:
timestamp = math.floor(time.time())
async with self.lock:
self.attempt_(addr, count_failures, timestamp)
# See if any to-be-evicted tried table entries have been tested and if so resolve the collisions.
async def resolve_tried_collisions(self):
async with self.lock:
self.resolve_tried_collisions_()
# Randomly select an address in tried that another address is attempting to evict.
async def select_tried_collision(self) -> Optional[ExtendedPeerInfo]:
async with self.lock:
return self.select_tried_collision_()
# Choose an address to connect to.
async def select_peer(self, new_only: bool = False) -> Optional[ExtendedPeerInfo]:
async with self.lock:
return self.select_peer_(new_only)
# Return a bunch of addresses, selected at random.
async def get_peers(self) -> List[TimestampedPeerInfo]:
async with self.lock:
return self.get_peers_()
async def connect(self, addr: PeerInfo, timestamp: int = -1):
if timestamp == -1:
timestamp = math.floor(time.time())
async with self.lock:
return self.connect_(addr, timestamp)