chia-blockchain/chia/seeder/crawl_store.py

383 lines
15 KiB
Python

from __future__ import annotations
import ipaddress
import logging
import random
import time
from dataclasses import dataclass, field, replace
from typing import Dict, List
import aiosqlite
from chia.seeder.peer_record import PeerRecord, PeerReliability
from chia.util.ints import uint32, uint64
log = logging.getLogger(__name__)
@dataclass
class CrawlStore:
crawl_db: aiosqlite.Connection
host_to_records: Dict[str, PeerRecord] = field(default_factory=dict) # peer_id: PeerRecord
host_to_selected_time: Dict[str, float] = field(default_factory=dict) # peer_id: timestamp (as a float)
host_to_reliability: Dict[str, PeerReliability] = field(default_factory=dict) # peer_id: PeerReliability
banned_peers: int = 0
ignored_peers: int = 0
reliable_peers: int = 0
@classmethod
async def create(cls, connection: aiosqlite.Connection) -> CrawlStore:
self = cls(connection)
await self.crawl_db.execute(
"CREATE TABLE IF NOT EXISTS peer_records("
" peer_id text PRIMARY KEY,"
" ip_address text,"
" port bigint,"
" connected int,"
" last_try_timestamp bigint,"
" try_count bigint,"
" connected_timestamp bigint,"
" added_timestamp bigint,"
" best_timestamp bigint,"
" version text,"
" handshake_time text"
" tls_version text)"
)
await self.crawl_db.execute(
"CREATE TABLE IF NOT EXISTS peer_reliability("
" peer_id text PRIMARY KEY,"
" ignore_till int, ban_till int,"
" stat_2h_w real, stat_2h_c real, stat_2h_r real,"
" stat_8h_w real, stat_8h_c real, stat_8h_r real,"
" stat_1d_w real, stat_1d_c real, stat_1d_r real,"
" stat_1w_w real, stat_1w_c real, stat_1w_r real,"
" stat_1m_w real, stat_1m_c real, stat_1m_r real,"
" tries int, successes int)"
)
try:
await self.crawl_db.execute("ALTER TABLE peer_records ADD COLUMN tls_version text")
except aiosqlite.OperationalError:
pass # ignore what is likely Duplicate column error
await self.crawl_db.execute("CREATE TABLE IF NOT EXISTS good_peers(ip text)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS ip_address on peer_records(ip_address)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS port on peer_records(port)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS connected on peer_records(connected)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS added_timestamp on peer_records(added_timestamp)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS peer_id on peer_reliability(peer_id)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS ignore_till on peer_reliability(ignore_till)")
await self.crawl_db.commit()
await self.unload_from_db()
return self
def maybe_add_peer(self, peer_record: PeerRecord, peer_reliability: PeerReliability) -> None:
if peer_record.peer_id not in self.host_to_records:
self.host_to_records[peer_record.peer_id] = peer_record
if peer_reliability.peer_id not in self.host_to_reliability:
self.host_to_reliability[peer_reliability.peer_id] = peer_reliability
async def add_peer(self, peer_record: PeerRecord, peer_reliability: PeerReliability, save_db: bool = False) -> None:
if not save_db:
self.host_to_records[peer_record.peer_id] = peer_record
self.host_to_reliability[peer_reliability.peer_id] = peer_reliability
return
added_timestamp = int(time.time())
cursor = await self.crawl_db.execute(
"INSERT OR REPLACE INTO peer_records VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
peer_record.peer_id,
peer_record.ip_address,
peer_record.port,
int(peer_record.connected),
peer_record.last_try_timestamp,
peer_record.try_count,
peer_record.connected_timestamp,
added_timestamp,
peer_record.best_timestamp,
peer_record.version,
peer_record.handshake_time,
peer_record.tls_version,
),
)
await cursor.close()
cursor = await self.crawl_db.execute(
"INSERT OR REPLACE INTO peer_reliability"
" VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
peer_reliability.peer_id,
peer_reliability.ignore_till,
peer_reliability.ban_till,
peer_reliability.stat_2h.weight,
peer_reliability.stat_2h.count,
peer_reliability.stat_2h.reliability,
peer_reliability.stat_8h.weight,
peer_reliability.stat_8h.count,
peer_reliability.stat_8h.reliability,
peer_reliability.stat_1d.weight,
peer_reliability.stat_1d.count,
peer_reliability.stat_1d.reliability,
peer_reliability.stat_1w.weight,
peer_reliability.stat_1w.count,
peer_reliability.stat_1w.reliability,
peer_reliability.stat_1m.weight,
peer_reliability.stat_1m.count,
peer_reliability.stat_1m.reliability,
peer_reliability.tries,
peer_reliability.successes,
),
)
await cursor.close()
async def get_peer_reliability(self, peer_id: str) -> PeerReliability:
return self.host_to_reliability[peer_id]
async def peer_failed_to_connect(self, peer: PeerRecord) -> None:
now = uint64(time.time())
age_timestamp = int(max(peer.last_try_timestamp, peer.connected_timestamp))
if age_timestamp == 0:
age_timestamp = now - 1000
replaced = replace(peer, try_count=uint32(peer.try_count + 1), last_try_timestamp=now)
reliability = await self.get_peer_reliability(peer.peer_id)
if reliability is None:
reliability = PeerReliability(peer.peer_id)
reliability.update(False, now - age_timestamp)
await self.add_peer(replaced, reliability)
async def peer_connected(self, peer: PeerRecord, tls_version: str) -> None:
now = uint64(time.time())
age_timestamp = int(max(peer.last_try_timestamp, peer.connected_timestamp))
if age_timestamp == 0:
age_timestamp = now - 1000
replaced = replace(peer, connected=True, connected_timestamp=now, tls_version=tls_version)
reliability = await self.get_peer_reliability(peer.peer_id)
if reliability is None:
reliability = PeerReliability(peer.peer_id)
reliability.update(True, now - age_timestamp)
await self.add_peer(replaced, reliability)
async def update_best_timestamp(self, host: str, timestamp: uint64) -> None:
if host not in self.host_to_records:
return
record = self.host_to_records[host]
replaced = replace(record, best_timestamp=timestamp)
if host not in self.host_to_reliability:
return
reliability = self.host_to_reliability[host]
await self.add_peer(replaced, reliability)
async def peer_connected_hostname(self, host: str, connected: bool = True, tls_version: str = "unknown") -> None:
if host not in self.host_to_records:
return
record = self.host_to_records[host]
if connected:
await self.peer_connected(record, tls_version)
else:
await self.peer_failed_to_connect(record)
async def get_peers_to_crawl(self, min_batch_size: int, max_batch_size: int) -> List[PeerRecord]:
now = int(time.time())
records = []
records_v6 = []
counter = 0
self.ignored_peers = 0
self.banned_peers = 0
for peer_id in self.host_to_reliability:
add = False
counter += 1
reliability = self.host_to_reliability[peer_id]
if reliability.ignore_till < now and reliability.ban_till < now:
add = True
else:
if reliability.ban_till >= now:
self.banned_peers += 1
elif reliability.ignore_till >= now:
self.ignored_peers += 1
record = self.host_to_records[peer_id]
if record.last_try_timestamp == 0 and record.connected_timestamp == 0:
add = True
if peer_id in self.host_to_selected_time:
last_selected = self.host_to_selected_time[peer_id]
if time.time() - last_selected < 120:
add = False
if add:
v6 = True
try:
_ = ipaddress.IPv6Address(peer_id)
except ValueError:
v6 = False
delta_time = 600 if v6 else 1000
if now - record.last_try_timestamp >= delta_time and now - record.connected_timestamp >= delta_time:
if not v6:
records.append(record)
else:
records_v6.append(record)
batch_size = max(min_batch_size, len(records) // 10)
batch_size = min(batch_size, max_batch_size)
if len(records) > batch_size:
random.shuffle(records)
records = records[:batch_size]
if len(records_v6) > batch_size:
random.shuffle(records_v6)
records_v6 = records_v6[:batch_size]
records += records_v6
for record in records:
self.host_to_selected_time[record.peer_id] = time.time()
return records
def get_ipv6_peers(self) -> int:
counter = 0
for peer_id in self.host_to_reliability:
v6 = True
try:
_ = ipaddress.IPv6Address(peer_id)
except ValueError:
v6 = False
if v6:
counter += 1
return counter
def get_total_records(self) -> int:
return len(self.host_to_records)
def get_ignored_peers(self) -> int:
return self.ignored_peers
def get_banned_peers(self) -> int:
return self.banned_peers
def get_reliable_peers(self) -> int:
return self.reliable_peers
async def load_to_db(self) -> None:
log.info("Saving peers to DB...")
for peer_id in list(self.host_to_reliability.keys()):
if peer_id in self.host_to_reliability and peer_id in self.host_to_records:
reliability = self.host_to_reliability[peer_id]
record = self.host_to_records[peer_id]
await self.add_peer(record, reliability, True)
await self.crawl_db.commit()
log.info(" - Done saving peers to DB")
async def unload_from_db(self) -> None:
self.host_to_records = {}
self.host_to_reliability = {}
log.info("Loading peer reliability records...")
cursor = await self.crawl_db.execute(
"SELECT * from peer_reliability",
)
rows = await cursor.fetchall()
await cursor.close()
for row in rows:
reliability = PeerReliability(
row[0],
row[1],
row[2],
row[3],
row[4],
row[5],
row[6],
row[7],
row[8],
row[9],
row[10],
row[11],
row[12],
row[13],
row[14],
row[15],
row[16],
row[17],
row[18],
row[19],
)
self.host_to_reliability[reliability.peer_id] = reliability
log.info(" - Done loading peer reliability records...")
log.info("Loading peer records...")
cursor = await self.crawl_db.execute(
"SELECT * from peer_records",
)
rows = await cursor.fetchall()
await cursor.close()
for row in rows:
peer = PeerRecord(
row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11]
)
self.host_to_records[peer.peer_id] = peer
log.info(" - Done loading peer records...")
# Crawler -> DNS.
async def load_reliable_peers_to_db(self) -> None:
peers = []
for peer_id, reliability in self.host_to_reliability.items():
if reliability.is_reliable():
peers.append(peer_id)
self.reliable_peers = len(peers)
log.info("Deleting old good_peers from DB...")
cursor = await self.crawl_db.execute(
"DELETE from good_peers",
)
await cursor.close()
log.info(" - Done deleting old good_peers...")
log.info("Saving new good_peers to DB...")
for peer_id in peers:
cursor = await self.crawl_db.execute(
"INSERT OR REPLACE INTO good_peers VALUES(?)",
(peer_id,),
)
await cursor.close()
await self.crawl_db.commit()
log.info(" - Done saving new good_peers to DB...")
def load_host_to_version(self) -> tuple[dict[str, str], dict[str, uint64]]:
versions = {}
handshake = {}
for host, record in self.host_to_records.items():
if host not in self.host_to_records:
continue
record = self.host_to_records[host]
if record.version == "undefined":
continue
if record.handshake_time < time.time() - 5 * 24 * 3600:
continue
versions[host] = record.version
handshake[host] = record.handshake_time
return versions, handshake
def load_best_peer_reliability(self) -> dict[str, uint64]:
best_timestamp = {}
for host, record in self.host_to_records.items():
if record.best_timestamp > time.time() - 5 * 24 * 3600:
best_timestamp[host] = record.best_timestamp
return best_timestamp
async def update_version(self, host: str, version: str, timestamp_now: uint64) -> None:
record = self.host_to_records.get(host, None)
reliability = self.host_to_reliability.get(host, None)
if record is None or reliability is None:
return
record.update_version(version, timestamp_now)
await self.add_peer(record, reliability)
async def get_good_peers(self) -> list[str]: # This is for the DNS server
cursor = await self.crawl_db.execute(
"SELECT * from good_peers",
)
rows = await cursor.fetchall()
await cursor.close()
result = [row[0] for row in rows]
if len(result) > 0:
random.shuffle(result) # mix up the peers
return result