diff --git a/README.md b/README.md
index 1a553c1e4a08..cb97e5d8d855 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,7 @@
 Chia is a modern cryptocurrency built from scratch, designed to be efficient, decentralized, and secure. Here are some of the features and benefits:
 * [Proof of space and time](https://docs.google.com/document/d/1tmRIb7lgi4QfKkNaxuKOBHRmwbVlGL4f7EsBDr_5xZE/edit) based consensus which allows anyone to farm with commodity hardware
 * Very easy to use full node and farmer GUI and cli (thousands of nodes active on mainnet)
+* [Chia seeder](https://github.com/Chia-Network/chia-blockchain/wiki/Chia-Seeder-User-Guide), which maintains a list of reliable nodes within the Chia network via a built-in DNS server.
 * Simplified UTXO based transaction model, with small on-chain state
 * Lisp-style Turing-complete functional [programming language](https://chialisp.com/) for money related use cases
 * BLS keys and aggregate signatures (only one signature per block)
@@ -33,6 +34,7 @@
 TCP port 8444 access to your peer. These methods tend to be router make/model
 specific.
 
 Most users should only install harvesters, farmers, plotter, full nodes, and wallets.
+Setting up a seeder is best left to more advanced users.
 Building Timelords and VDFs is for sophisticated users, in most environments. Chia Network
 and additional volunteers are running sufficient Timelords for consensus.
diff --git a/chia/cmds/seeder.py b/chia/cmds/seeder.py
new file mode 100644
index 000000000000..15500e979e6d
--- /dev/null
+++ b/chia/cmds/seeder.py
@@ -0,0 +1,204 @@
+import os
+from pathlib import Path
+from typing import Dict
+
+import click
+
+import chia.cmds.configure as chia_configure
+from chia import __version__
+from chia.cmds.chia import monkey_patch_click
+from chia.cmds.init_funcs import init
+from chia.seeder.util.config import patch_default_seeder_config
+from chia.seeder.util.service_groups import all_groups, services_for_groups
+from chia.seeder.util.service import launch_service, kill_service
+from chia.util.config import load_config, save_config
+from chia.util.default_root import DEFAULT_ROOT_PATH
+
+CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
+
+
+@click.group(
+    help=f"\n  Manage the Chia Seeder ({__version__})\n",
+    epilog="Try 'chia seeder start crawler' or 'chia seeder start server'",
+    context_settings=CONTEXT_SETTINGS,
+)
+@click.option("--root-path", default=DEFAULT_ROOT_PATH, help="Config file root", type=click.Path(), show_default=True)
+@click.pass_context
+def cli(
+    ctx: click.Context,
+    root_path: str,
+) -> None:
+    from pathlib import Path
+
+    ctx.ensure_object(dict)
+    ctx.obj["root_path"] = Path(root_path)
+
+
+@cli.command("version", short_help="Show the Chia Seeder version")
+def version_cmd() -> None:
+    print(__version__)
+
+
+@click.command("init", short_help="Create or migrate the configuration")
+@click.pass_context
+def init_cmd(ctx: click.Context, **kwargs):
+    print("Calling Chia Seeder Init...")
+    init(None, ctx.obj["root_path"], True)
+    if os.environ.get("CHIA_ROOT", None) is not None:
+        print(f"Warning: your CHIA_ROOT is set to {os.environ['CHIA_ROOT']}.")
+    root_path = ctx.obj["root_path"]
+    print(f"Chia directory {root_path}")
+    if root_path.is_dir() and not Path(root_path / "config" / "config.yaml").exists():
+        # This is reached if CHIA_ROOT is set, but there is no config
+        # This really shouldn't happen, but if we don't have the base chia config, we can't continue
+        print("Config does not exist. Can't continue!")
+        return -1
+    patch_default_seeder_config(root_path)
+    return 0
+
+
+@click.command("start", short_help="Start service groups")
+@click.argument("group", type=click.Choice(all_groups()), nargs=-1, required=True)
+@click.pass_context
+def start_cmd(ctx: click.Context, group: str) -> None:
+    services = services_for_groups(group)
+
+    for service in services:
+        print(f"Starting {service}")
+        launch_service(ctx.obj["root_path"], service)
+
+
+@click.command("stop", short_help="Stop service groups")
+@click.argument("group", type=click.Choice(all_groups()), nargs=-1, required=True)
+@click.pass_context
+def stop_cmd(ctx: click.Context, group: str) -> None:
+    services = services_for_groups(group)
+
+    for service in services:
+        print(f"Stopping {service}")
+        kill_service(ctx.obj["root_path"], service)
+
+
+def configure(
+    root_path: Path,
+    testnet: str,
+    crawler_db_path: str,
+    minimum_version_count: int,
+    domain_name: str,
+    nameserver: str,
+):
+    # Run the parent config, in case anything there (testnet) needs to be run, THEN load the config for local changes
+    chia_configure.configure(root_path, "", "", "", "", "", "", "", "", testnet, "")
+
+    config: Dict = load_config(DEFAULT_ROOT_PATH, "config.yaml")
+    change_made = False
+    if testnet is not None:
+        if testnet == "true" or testnet == "t":
+            print("Updating Chia Seeder to testnet settings")
+            port = 58444
+            network = "testnet7"
+            bootstrap = ["testnet-node.chia.net"]
+
+            config["seeder"]["port"] = port
+            config["seeder"]["other_peers_port"] = port
+            config["seeder"]["selected_network"] = network
+            config["seeder"]["bootstrap_peers"] = bootstrap
+
+            change_made = True
+
+        elif testnet == "false" or testnet == "f":
+            print("Updating Chia Seeder to mainnet settings")
+            port = 8444
+            network = "mainnet"
+            bootstrap = ["node.chia.net"]
+
+            config["seeder"]["port"] = port
+            config["seeder"]["other_peers_port"] = port
+            config["seeder"]["selected_network"] = network
+            config["seeder"]["bootstrap_peers"] = bootstrap
+
+            change_made = True
+        else:
+            print("Please choose 'true' or 'false'")
+
+    if crawler_db_path is not None:
+        config["seeder"]["crawler_db_path"] = crawler_db_path
+        change_made = True
+
+    if minimum_version_count is not None:
+        config["seeder"]["minimum_version_count"] = minimum_version_count
+        change_made = True
+
+    if domain_name is not None:
+        config["seeder"]["domain_name"] = domain_name
+        change_made = True
+
+    if nameserver is not None:
+        config["seeder"]["nameserver"] = nameserver
+        change_made = True
+
+    if change_made:
+        print("Restart any running Chia Seeder services for changes to take effect")
+        save_config(root_path, "config.yaml", config)
+    return 0
+
+
+@click.command("configure", short_help="Modify configuration")
+@click.option(
+    "--testnet",
+    "-t",
+    help="configures for connection to testnet",
+    type=click.Choice(["true", "t", "false", "f"]),
+)
+@click.option(
+    "--crawler-db-path",
+    help="configures the path to the crawler database",
+    type=str,
+)
+@click.option(
+    "--minimum-version-count",
+    help="configures how many of a particular version must be seen to be reported in logs",
+    type=int,
+)
+@click.option(
+    "--domain-name",
+    help="configures the domain_name setting. Ex: `seeder.example.com.`",
+    type=str,
+)
+@click.option(
+    "--nameserver",
+    help="configures the nameserver setting. Ex: `example.com.`",
+    type=str,
+)
+@click.pass_context
+def configure_cmd(
+    ctx,
+    testnet,
+    crawler_db_path,
+    minimum_version_count,
+    domain_name,
+    nameserver,
+):
+    configure(
+        ctx.obj["root_path"],
+        testnet,
+        crawler_db_path,
+        minimum_version_count,
+        domain_name,
+        nameserver,
+    )
+
+
+cli.add_command(init_cmd)
+cli.add_command(start_cmd)
+cli.add_command(stop_cmd)
+cli.add_command(configure_cmd)
+
+
+def main() -> None:
+    monkey_patch_click()
+    cli()  # pylint: disable=no-value-for-parameter
+
+
+if __name__ == "__main__":
+    main()
diff --git a/chia/daemon/server.py b/chia/daemon/server.py
index a2250d8488d7..03aee1988815 100644
--- a/chia/daemon/server.py
+++ b/chia/daemon/server.py
@@ -100,6 +100,9 @@ if getattr(sys, "frozen", False):
         "chia_timelord": "start_timelord",
         "chia_timelord_launcher": "timelord_launcher",
         "chia_full_node_simulator": "start_simulator",
+        "chia_seeder": "chia_seeder",
+        "chia_seeder_crawler": "chia_seeder_crawler",
+        "chia_seeder_dns": "chia_seeder_dns",
     }
 
     def executable_for_service(service_name: str) -> str:
diff --git a/chia/seeder/__init__.py b/chia/seeder/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/chia/seeder/crawl_store.py b/chia/seeder/crawl_store.py
new file mode 100644
index 000000000000..a6eccec87f24
--- /dev/null
+++ b/chia/seeder/crawl_store.py
@@ -0,0 +1,360 @@
+import asyncio
+import dataclasses
+import ipaddress
+import logging
+import random
+import time
+from typing import List, Dict
+
+import aiosqlite
+
+from chia.seeder.peer_record import PeerRecord, PeerReliability
+
+log = logging.getLogger(__name__)
+
+
+class CrawlStore:
+    crawl_db: aiosqlite.Connection
+    last_timestamp: int
+    lock: asyncio.Lock
+
+    host_to_records: Dict
+    host_to_selected_time: Dict
+    host_to_reliability: Dict
+    banned_peers: int
+    ignored_peers: int
+    reliable_peers: int
+
+    @classmethod
+    async def create(cls, connection: aiosqlite.Connection):
+        self = cls()
+
+        self.crawl_db = connection
+        await self.crawl_db.execute(
+            (
+                "CREATE TABLE IF NOT EXISTS peer_records("
+                " peer_id text PRIMARY KEY,"
+                " ip_address text,"
+                " port bigint,"
+                " connected int,"
+                " last_try_timestamp bigint,"
+                " try_count bigint,"
+                " connected_timestamp bigint,"
+                " added_timestamp bigint,"
+                " best_timestamp bigint,"
+                " version text,"
+                " handshake_time text)"
+            )
+        )
+        await self.crawl_db.execute(
+            (
+                "CREATE TABLE IF NOT EXISTS peer_reliability("
+                " peer_id text PRIMARY KEY,"
+                " ignore_till int, ban_till int,"
+                " stat_2h_w real, stat_2h_c real, stat_2h_r real,"
+                " stat_8h_w real, stat_8h_c real, stat_8h_r real,"
+                " stat_1d_w real, stat_1d_c real, stat_1d_r real,"
+                " stat_1w_w real, stat_1w_c real, stat_1w_r real,"
+                " stat_1m_w real, stat_1m_c real, stat_1m_r real,"
+                " tries int, successes int)"
+            )
+        )
+
+        await self.crawl_db.execute(("CREATE TABLE IF NOT EXISTS good_peers(ip text)"))
+
+        await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS ip_address on peer_records(ip_address)")
+
+        await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS port on peer_records(port)")
+
+        await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS connected on peer_records(connected)")
+
+        await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS added_timestamp on peer_records(added_timestamp)")
+
+        await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS peer_id on peer_reliability(peer_id)")
+        await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS ignore_till on peer_reliability(ignore_till)")
+
+        await self.crawl_db.commit()
+        self.last_timestamp = 0
+        self.ignored_peers = 0
+        self.banned_peers = 0
+        self.reliable_peers = 0
+        self.host_to_selected_time = {}
+        await self.unload_from_db()
+        return self
+
+    def maybe_add_peer(self, peer_record: PeerRecord, peer_reliability: PeerReliability):
+        if peer_record.peer_id not in self.host_to_records:
+            self.host_to_records[peer_record.peer_id] = peer_record
+        if peer_reliability.peer_id not in self.host_to_reliability:
+            self.host_to_reliability[peer_reliability.peer_id] = peer_reliability
+
+    async def add_peer(self, peer_record: PeerRecord, peer_reliability: PeerReliability, save_db: bool = False):
+        if not save_db:
+            self.host_to_records[peer_record.peer_id] = peer_record
+            self.host_to_reliability[peer_reliability.peer_id] = peer_reliability
+            return
+
+        added_timestamp = int(time.time())
+        cursor = await self.crawl_db.execute(
+            "INSERT OR REPLACE INTO peer_records VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            (
+                peer_record.peer_id,
+                peer_record.ip_address,
+                peer_record.port,
+                int(peer_record.connected),
+                peer_record.last_try_timestamp,
+                peer_record.try_count,
+                peer_record.connected_timestamp,
+                added_timestamp,
+                peer_record.best_timestamp,
+                peer_record.version,
+                peer_record.handshake_time,
+            ),
+        )
+        await cursor.close()
+        cursor = await self.crawl_db.execute(
+            "INSERT OR REPLACE INTO peer_reliability"
+            " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            (
+                peer_reliability.peer_id,
+                peer_reliability.ignore_till,
+                peer_reliability.ban_till,
+                peer_reliability.stat_2h.weight,
+                peer_reliability.stat_2h.count,
+                peer_reliability.stat_2h.reliability,
+                peer_reliability.stat_8h.weight,
+                peer_reliability.stat_8h.count,
+                peer_reliability.stat_8h.reliability,
+                peer_reliability.stat_1d.weight,
+                peer_reliability.stat_1d.count,
+                peer_reliability.stat_1d.reliability,
+                peer_reliability.stat_1w.weight,
+                peer_reliability.stat_1w.count,
+                peer_reliability.stat_1w.reliability,
+                peer_reliability.stat_1m.weight,
+                peer_reliability.stat_1m.count,
+                peer_reliability.stat_1m.reliability,
+                peer_reliability.tries,
+                peer_reliability.successes,
+            ),
+        )
+        await cursor.close()
+
+    async def get_peer_reliability(self, peer_id: str) -> PeerReliability:
+        return self.host_to_reliability[peer_id]
+
+    async def peer_failed_to_connect(self, peer: PeerRecord):
+        now = int(time.time())
+        age_timestamp = int(max(peer.last_try_timestamp, peer.connected_timestamp))
+        if age_timestamp == 0:
+            age_timestamp = now - 1000
+        replaced = dataclasses.replace(peer, try_count=peer.try_count + 1, last_try_timestamp=now)
+        reliability = await self.get_peer_reliability(peer.peer_id)
+        if reliability is None:
+            reliability = PeerReliability(peer.peer_id)
+        reliability.update(False, now - age_timestamp)
+        await self.add_peer(replaced, reliability)
+
+    async def peer_connected(self, peer: PeerRecord):
+        now = int(time.time())
+        age_timestamp = int(max(peer.last_try_timestamp, peer.connected_timestamp))
+        if age_timestamp == 0:
+            age_timestamp = now - 1000
+        replaced = dataclasses.replace(peer, connected=True, connected_timestamp=now)
+        reliability = await self.get_peer_reliability(peer.peer_id)
+        if reliability is None:
+            reliability = PeerReliability(peer.peer_id)
+        reliability.update(True, now - age_timestamp)
+        await self.add_peer(replaced, reliability)
+
+    async def update_best_timestamp(self, host: str, timestamp):
+        if host not in self.host_to_records:
+            return
+        record = self.host_to_records[host]
+        replaced = dataclasses.replace(record, best_timestamp=timestamp)
+        if host not in self.host_to_reliability:
+            return
+        reliability = self.host_to_reliability[host]
+        await self.add_peer(replaced, reliability)
+
+    async def peer_connected_hostname(self, host: str, connected: bool = True):
+        if host not in self.host_to_records:
+            return
+        record = self.host_to_records[host]
+        if connected:
+            await self.peer_connected(record)
+        else:
+            await self.peer_failed_to_connect(record)
+
+    async def get_peers_to_crawl(self, min_batch_size, max_batch_size) -> List[PeerRecord]:
+        now = int(time.time())
+        records = []
+        records_v6 = []
+        counter = 0
+        self.ignored_peers = 0
+        self.banned_peers = 0
+        for peer_id in self.host_to_reliability:
+            add = False
+            counter += 1
+            reliability = self.host_to_reliability[peer_id]
+            if reliability.ignore_till < now and reliability.ban_till < now:
+                add = True
+            else:
+                if reliability.ban_till >= now:
+                    self.banned_peers += 1
+                elif reliability.ignore_till >= now:
+                    self.ignored_peers += 1
+            record = self.host_to_records[peer_id]
+            if record.last_try_timestamp == 0 and record.connected_timestamp == 0:
+                add = True
+            if peer_id in self.host_to_selected_time:
+                last_selected = self.host_to_selected_time[peer_id]
+                if time.time() - last_selected < 120:
+                    add = False
+            if add:
+                v6 = True
+                try:
+                    _ = ipaddress.IPv6Address(peer_id)
+                except ValueError:
+                    v6 = False
+                delta_time = 600 if v6 else 1000
+                if now - record.last_try_timestamp >= delta_time and now - record.connected_timestamp >= delta_time:
+                    if not v6:
+                        records.append(record)
+                    else:
+                        records_v6.append(record)
+
+        batch_size = max(min_batch_size, len(records) // 10)
+        batch_size = min(batch_size, max_batch_size)
+        if len(records) > batch_size:
+            random.shuffle(records)
+            records = records[:batch_size]
+        if len(records_v6) > batch_size:
+            random.shuffle(records_v6)
+            records_v6 = records_v6[:batch_size]
+        records += records_v6
+        for record in records:
+            self.host_to_selected_time[record.peer_id] = time.time()
+        return records
+
+    def get_ipv6_peers(self) -> int:
+        counter = 0
+        for peer_id in self.host_to_reliability:
+            v6 = True
+            try:
+                _ = ipaddress.IPv6Address(peer_id)
+            except ValueError:
+                v6 = False
+            if v6:
+                counter += 1
+        return counter
+
+    def get_total_records(self) -> int:
+        return len(self.host_to_records)
+
+    def get_ignored_peers(self) -> int:
+        return self.ignored_peers
+
+    def get_banned_peers(self) -> int:
+        return self.banned_peers
+
+    def get_reliable_peers(self) -> int:
+        return self.reliable_peers
+
+    async def load_to_db(self):
+        for peer_id in list(self.host_to_reliability.keys()):
+            if peer_id in self.host_to_reliability and peer_id in self.host_to_records:
+                reliability = self.host_to_reliability[peer_id]
+                record = self.host_to_records[peer_id]
+                await self.add_peer(record, reliability, True)
+        await self.crawl_db.commit()
+
+    async def unload_from_db(self):
+        self.host_to_records = {}
+        self.host_to_reliability = {}
+        cursor = await self.crawl_db.execute(
+            "SELECT * from peer_reliability",
+        )
+        rows = await cursor.fetchall()
+        await cursor.close()
+        for row in rows:
+            reliability = PeerReliability(
+                row[0],
+                row[1],
+                row[2],
+                row[3],
+                row[4],
+                row[5],
+                row[6],
+                row[7],
+                row[8],
+                row[9],
+                row[10],
+                row[11],
+                row[12],
+                row[13],
+                row[14],
+                row[15],
+                row[16],
+                row[17],
+                row[18],
+                row[19],
+            )
+            self.host_to_reliability[row[0]] = reliability
+        cursor = await self.crawl_db.execute(
+            "SELECT * from peer_records",
+        )
+        rows = await cursor.fetchall()
+        await cursor.close()
+        for row in rows:
+            peer = PeerRecord(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10])
+            self.host_to_records[row[0]] = peer
+
+    # Crawler -> DNS.
+    async def load_reliable_peers_to_db(self):
+        peers = []
+        for peer_id in self.host_to_reliability:
+            reliability = self.host_to_reliability[peer_id]
+            if reliability.is_reliable():
+                peers.append(peer_id)
+        self.reliable_peers = len(peers)
+        cursor = await self.crawl_db.execute(
+            "DELETE from good_peers",
+        )
+        await cursor.close()
+        for peer in peers:
+            cursor = await self.crawl_db.execute(
+                "INSERT OR REPLACE INTO good_peers VALUES(?)",
+                (peer,),
+            )
+            await cursor.close()
+        await self.crawl_db.commit()
+
+    def load_host_to_version(self):
+        versions = {}
+        handshake = {}
+
+        for host, record in self.host_to_records.items():
+            if host not in self.host_to_records:
+                continue
+            record = self.host_to_records[host]
+            if record.version == "undefined":
+                continue
+            versions[host] = record.version
+            handshake[host] = record.handshake_time
+
+        return (versions, handshake)
+
+    def load_best_peer_reliability(self):
+        best_timestamp = {}
+        for host, record in self.host_to_records.items():
+            best_timestamp[host] = record.best_timestamp
+        return best_timestamp
+
+    async def update_version(self, host, version, now):
+        record = self.host_to_records.get(host, None)
+        reliability = self.host_to_reliability.get(host, None)
+        if record is None or reliability is None:
+            return
+        record.update_version(version, now)
+        await self.add_peer(record, reliability)
diff --git a/chia/seeder/crawler.py b/chia/seeder/crawler.py
new file mode 100644
index 000000000000..352f03a434ff
--- /dev/null
+++ b/chia/seeder/crawler.py
@@ -0,0 +1,322 @@
+import asyncio
+import logging
+import time
+import traceback
+import ipaddress
+from pathlib import Path
+from typing import Any, Callable, Dict, List, Optional, Tuple
+
+import aiosqlite
+
+import chia.server.ws_connection as ws
+from chia.consensus.constants import ConsensusConstants
+from chia.full_node.coin_store import CoinStore
+from chia.protocols import full_node_protocol
+from chia.seeder.crawl_store import CrawlStore
+from chia.seeder.peer_record import PeerRecord, PeerReliability
+from chia.server.server import ChiaServer
+from chia.types.peer_info import PeerInfo
+from chia.util.path import mkdir, path_from_root
+from chia.util.ints import uint32, uint64
+
+log = logging.getLogger(__name__)
+
+
+class Crawler:
+    sync_store: Any
+    coin_store: CoinStore
+    connection: aiosqlite.Connection
+    config: Dict
+    server: Any
+    log: logging.Logger
+    constants: ConsensusConstants
+    _shut_down: bool
+    root_path: Path
+    peer_count: int
+    with_peak: set
+
+    def __init__(
+        self,
+        config: Dict,
+        root_path: Path,
+        consensus_constants: ConsensusConstants,
+        name: str = None,
+    ):
+        self.initialized = False
+        self.root_path = root_path
+        self.config = config
+        self.server = None
+        self._shut_down = False  # Set to true to close all infinite loops
+        self.constants = consensus_constants
+        self.state_changed_callback: Optional[Callable] = None
+        self.crawl_store = None
+        self.log = log
+        self.peer_count = 0
+        self.with_peak = set()
+        self.peers_retrieved: List[Any] = []
+        self.host_to_version: Dict[str, str] = {}
+        self.version_cache: List[Tuple[str, str]] = []
+        self.handshake_time: Dict[str, int] = {}
+        self.best_timestamp_per_peer: Dict[str, int] = {}
+        if "crawler_db_path" in config and config["crawler_db_path"] != "":
+            path = Path(config["crawler_db_path"])
+            self.db_path = path.resolve()
+        else:
+            db_path_replaced: str = "crawler.db"
+            self.db_path = path_from_root(root_path, db_path_replaced)
+        mkdir(self.db_path.parent)
+        self.bootstrap_peers = config["bootstrap_peers"]
+        self.minimum_height = config["minimum_height"]
+        self.other_peers_port = config["other_peers_port"]
+
+    def _set_state_changed_callback(self, callback: Callable):
+        self.state_changed_callback = callback
+
+    async def create_client(self, peer_info, on_connect):
+        return await self.server.start_client(peer_info, on_connect)
+
+    async def connect_task(self, peer):
+        async def peer_action(peer: ws.WSChiaConnection):
+            peer_info = peer.get_peer_info()
+            version = peer.get_version()
+            if peer_info is not None and version is not None:
+                self.version_cache.append((peer_info.host, version))
+            # Ask peer for peers
+            response = await peer.request_peers(full_node_protocol.RequestPeers(), timeout=3)
+            # Add peers to DB
+            if isinstance(response, full_node_protocol.RespondPeers):
+                self.peers_retrieved.append(response)
+            peer_info = peer.get_peer_info()
+            tries = 0
+            got_peak = False
+            while tries < 25:
+                tries += 1
+                if peer_info is None:
+                    break
+                if peer_info in self.with_peak:
+                    got_peak = True
+                    break
+                await asyncio.sleep(0.1)
+            if not got_peak and peer_info is not None and self.crawl_store is not None:
+                await self.crawl_store.peer_connected_hostname(peer_info.host, False)
+            await peer.close()
+
+        try:
+            connected = await self.create_client(PeerInfo(peer.ip_address, peer.port), peer_action)
+            if not connected:
+                await self.crawl_store.peer_failed_to_connect(peer)
+        except Exception as e:
+            self.log.info(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
+            await self.crawl_store.peer_failed_to_connect(peer)
+
+    async def _start(self):
+        self.task = asyncio.create_task(self.crawl())
+
+    async def crawl(self):
+        try:
+            self.connection = await aiosqlite.connect(self.db_path)
+            self.crawl_store = await CrawlStore.create(self.connection)
+            self.log.info("Started")
+            await self.crawl_store.load_to_db()
+            await self.crawl_store.load_reliable_peers_to_db()
+            t_start = time.time()
+            total_nodes = 0
+            self.seen_nodes = set()
+            tried_nodes = set()
+            for peer in self.bootstrap_peers:
+                new_peer = PeerRecord(
+                    peer,
+                    peer,
+                    self.other_peers_port,
+                    False,
+                    0,
+                    0,
+                    0,
+                    uint64(int(time.time())),
+                    uint64(0),
+                    "undefined",
+                    uint64(0),
+                )
+                new_peer_reliability = PeerReliability(peer)
+                self.crawl_store.maybe_add_peer(new_peer, new_peer_reliability)
+
+            self.host_to_version, self.handshake_time = self.crawl_store.load_host_to_version()
+            self.best_timestamp_per_peer = self.crawl_store.load_best_peer_reliability()
+            while True:
+                self.with_peak = set()
+                peers_to_crawl = await self.crawl_store.get_peers_to_crawl(25000, 250000)
+                tasks = set()
+                for peer in peers_to_crawl:
+                    if peer.port == self.other_peers_port:
+                        total_nodes += 1
+                        if peer.ip_address not in tried_nodes:
+                            tried_nodes.add(peer.ip_address)
+                        task = asyncio.create_task(self.connect_task(peer))
+                        tasks.add(task)
+                        if len(tasks) >= 250:
+                            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+                            tasks = set(filter(lambda t: not t.done(), tasks))
+
+                if len(tasks) > 0:
+                    await asyncio.wait(tasks, timeout=30)
+
+                for response in self.peers_retrieved:
+                    for response_peer in response.peer_list:
+                        if response_peer.host not in self.best_timestamp_per_peer:
+                            self.best_timestamp_per_peer[response_peer.host] = response_peer.timestamp
+                        self.best_timestamp_per_peer[response_peer.host] = max(
+                            self.best_timestamp_per_peer[response_peer.host], response_peer.timestamp
+                        )
+                        if (
+                            response_peer.host not in self.seen_nodes
+                            and response_peer.timestamp > time.time() - 5 * 24 * 3600
+                        ):
+                            self.seen_nodes.add(response_peer.host)
+                            new_peer = PeerRecord(
+                                response_peer.host,
+                                response_peer.host,
+                                uint32(response_peer.port),
+                                False,
+                                uint64(0),
+                                uint32(0),
+                                uint64(0),
+                                uint64(int(time.time())),
+                                uint64(response_peer.timestamp),
+                                "undefined",
+                                uint64(0),
+                            )
+                            new_peer_reliability = PeerReliability(response_peer.host)
+                            if self.crawl_store is not None:
+                                self.crawl_store.maybe_add_peer(new_peer, new_peer_reliability)
+                        await self.crawl_store.update_best_timestamp(
+                            response_peer.host,
+                            self.best_timestamp_per_peer[response_peer.host],
+                        )
+                for host, version in self.version_cache:
+                    self.handshake_time[host] = int(time.time())
+                    self.host_to_version[host] = version
+                    await self.crawl_store.update_version(host, version, int(time.time()))
+
+                to_remove = set()
+                now = int(time.time())
+                for host in self.host_to_version.keys():
+                    active = True
+                    if host not in self.handshake_time:
+                        active = False
+                    elif self.handshake_time[host] < now - 5 * 24 * 3600:
+                        active = False
+                    if not active:
+                        to_remove.add(host)
+
+                self.host_to_version = {
+                    host: version for host, version in self.host_to_version.items() if host not in to_remove
+                }
+                self.best_timestamp_per_peer = {
+                    host: timestamp
+                    for host, timestamp in self.best_timestamp_per_peer.items()
+                    if timestamp >= now - 5 * 24 * 3600
+                }
+                versions = {}
+                for host, version in self.host_to_version.items():
+                    if version not in versions:
+                        versions[version] = 0
+                    versions[version] += 1
+                self.version_cache = []
+                self.peers_retrieved = []
+
+                self.server.banned_peers = {}
+                if len(peers_to_crawl) == 0:
+                    continue
+                await self.crawl_store.load_to_db()
+                await self.crawl_store.load_reliable_peers_to_db()
+                total_records = self.crawl_store.get_total_records()
+                ipv6_count = self.crawl_store.get_ipv6_peers()
+                self.log.error("***")
+                self.log.error("Finished batch:")
+                self.log.error(f"Total IPs stored in DB: {total_records}.")
+                self.log.error(f"Total IPV6 addresses stored in DB: {ipv6_count}")
+                self.log.error(f"Total connections attempted since crawler started: {total_nodes}.")
+                self.log.error(f"Total unique nodes attempted since crawler started: {len(tried_nodes)}.")
+                t_now = time.time()
+                t_delta = int(t_now - t_start)
+                if t_delta > 0:
+                    self.log.error(f"Avg connections per second: {total_nodes // t_delta}.")
+                # Periodically print detailed stats.
+                reliable_peers = self.crawl_store.get_reliable_peers()
+                self.log.error(f"High quality reachable nodes, used by DNS introducer in replies: {reliable_peers}")
+                banned_peers = self.crawl_store.get_banned_peers()
+                ignored_peers = self.crawl_store.get_ignored_peers()
+                available_peers = len(self.host_to_version)
+                addresses_count = len(self.best_timestamp_per_peer)
+                total_records = self.crawl_store.get_total_records()
+                ipv6_addresses_count = 0
+                for host in self.best_timestamp_per_peer.keys():
+                    try:
+                        _ = ipaddress.IPv6Address(host)
+                        ipv6_addresses_count += 1
+                    except ValueError:
+                        continue
+                self.log.error(
+                    "IPv4 addresses gossiped with timestamp in the last 5 days with respond_peers messages: "
+                    f"{addresses_count - ipv6_addresses_count}."
+                )
+                self.log.error(
+                    "IPv6 addresses gossiped with timestamp in the last 5 days with respond_peers messages: "
+                    f"{ipv6_addresses_count}."
+                )
+                ipv6_available_peers = 0
+                for host in self.host_to_version.keys():
+                    try:
+                        _ = ipaddress.IPv6Address(host)
+                        ipv6_available_peers += 1
+                    except ValueError:
+                        continue
+                self.log.error(
+                    f"Total IPv4 nodes reachable in the last 5 days: {available_peers - ipv6_available_peers}."
+                )
+                self.log.error(f"Total IPv6 nodes reachable in the last 5 days: {ipv6_available_peers}.")
+                self.log.error("Version distribution among reachable in the last 5 days (at least 100 nodes):")
+                if "minimum_version_count" in self.config and self.config["minimum_version_count"] > 0:
+                    minimum_version_count = self.config["minimum_version_count"]
+                else:
+                    minimum_version_count = 100
+                for version, count in sorted(versions.items(), key=lambda kv: kv[1], reverse=True):
+                    if count >= minimum_version_count:
+                        self.log.error(f"Version: {version} - Count: {count}")
+                self.log.error(f"Banned addresses in the DB: {banned_peers}")
+                self.log.error(f"Temporary ignored addresses in the DB: {ignored_peers}")
+                self.log.error(
+                    "Peers to crawl from in the next batch (total IPs - ignored - banned): "
+                    f"{total_records - banned_peers - ignored_peers}"
+                )
+                self.log.error("***")
+        except Exception as e:
+            self.log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
+
+    def set_server(self, server: ChiaServer):
+        self.server = server
+
+    def _state_changed(self, change: str):
+        if self.state_changed_callback is not None:
+            self.state_changed_callback(change)
+
+    async def new_peak(self, request: full_node_protocol.NewPeak, peer: ws.WSChiaConnection):
+        try:
+            peer_info = peer.get_peer_info()
+            if peer_info is None:
+                return
+            if request.height >= self.minimum_height:
+                if self.crawl_store is not None:
+                    await self.crawl_store.peer_connected_hostname(peer_info.host, True)
+            self.with_peak.add(peer_info)
+        except Exception as e:
+            self.log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
+
+    async def on_connect(self, connection: ws.WSChiaConnection):
+        pass
+
+    def _close(self):
+        self._shut_down = True
+
+    async def _await_closed(self):
+        await self.connection.close()
diff --git a/chia/seeder/crawler_api.py b/chia/seeder/crawler_api.py
new file mode 100644
index 000000000000..7d9d1baf7e23
--- /dev/null
+++ b/chia/seeder/crawler_api.py
@@ -0,0 +1,128 @@
+from typing import Callable, Optional
+
+import chia.server.ws_connection as ws
+from chia.full_node.full_node import full_node_protocol, wallet_protocol
+from chia.seeder.crawler import Crawler
+from chia.server.outbound_message import Message
+from chia.util.api_decorators import api_request, peer_required
+
+
+class CrawlerAPI:
+    crawler: Crawler
+
+    def __init__(self, crawler):
+        self.crawler = crawler
+
+    def _set_state_changed_callback(self, callback: Callable):
+        pass
+
+    def __getattr__(self, attr_name: str):
+        async def invoke(*args, **kwargs):
+            pass
+
+        return invoke
+
+    @property
+    def server(self):
+        return self.crawler.server
+
+    @property
+    def log(self):
+        return self.crawler.log
+
+    @peer_required
+    @api_request
+    async def request_peers(self, _request: full_node_protocol.RequestPeers, peer: ws.WSChiaConnection):
+        pass
+
+    @peer_required
+    @api_request
+    async def respond_peers(
+        self, request: full_node_protocol.RespondPeers, peer: ws.WSChiaConnection
+    ) -> Optional[Message]:
+        pass
+
+    @peer_required
+    @api_request
+    async def new_peak(self, request: full_node_protocol.NewPeak, peer: ws.WSChiaConnection) -> Optional[Message]:
+        await self.crawler.new_peak(request, peer)
+        return None
+
+    @api_request
+    async def new_transaction(self, transaction: full_node_protocol.NewTransaction) -> Optional[Message]:
+        pass
+
+    @api_request
+    @peer_required
+    async def new_signage_point_or_end_of_sub_slot(
+        self, new_sp: full_node_protocol.NewSignagePointOrEndOfSubSlot, peer: ws.WSChiaConnection
+    ) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def new_unfinished_block(
+        self, new_unfinished_block: full_node_protocol.NewUnfinishedBlock
+    ) -> Optional[Message]:
+        pass
+
+    @peer_required
+    @api_request
+    async def new_compact_vdf(self, request: full_node_protocol.NewCompactVDF, peer: ws.WSChiaConnection):
+        pass
+
+    @api_request
+    async def request_transaction(self, request: full_node_protocol.RequestTransaction) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_proof_of_weight(self, request: full_node_protocol.RequestProofOfWeight) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_block(self, request: full_node_protocol.RequestBlock) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_blocks(self, request: full_node_protocol.RequestBlocks) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_unfinished_block(
+        self, request_unfinished_block: full_node_protocol.RequestUnfinishedBlock
+    ) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_signage_point_or_end_of_sub_slot(
+        self, request: full_node_protocol.RequestSignagePointOrEndOfSubSlot
+    ) -> Optional[Message]:
+        pass
+
+    @peer_required
+    @api_request
+    async def request_mempool_transactions(
+        self,
+        request: full_node_protocol.RequestMempoolTransactions,
+        peer: ws.WSChiaConnection,
+    ) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_block_header(self, request: wallet_protocol.RequestBlockHeader) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_additions(self, request: wallet_protocol.RequestAdditions) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_removals(self, request: wallet_protocol.RequestRemovals) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_puzzle_solution(self, request: wallet_protocol.RequestPuzzleSolution) -> Optional[Message]:
+        pass
+
+    @api_request
+    async def request_header_blocks(self, request: wallet_protocol.RequestHeaderBlocks) -> Optional[Message]:
+        pass
diff --git a/chia/seeder/dns_server.py b/chia/seeder/dns_server.py
new file mode 100644
index 000000000000..02c101c5e4cf
--- /dev/null
+++ b/chia/seeder/dns_server.py
@@ -0,0 +1,287 @@
+import asyncio
+import ipaddress
+import logging
+import random
+import signal
+import traceback
+from typing import Any, List
+
+import aiosqlite
+from dnslib import A, AAAA, SOA, NS, MX, CNAME, RR, DNSRecord, QTYPE, DNSHeader
+
+from chia.util.chia_logging import initialize_logging
+from chia.util.path import mkdir, path_from_root
+from chia.util.config import load_config
+from chia.util.default_root import DEFAULT_ROOT_PATH
+
+SERVICE_NAME = "seeder"
+log = logging.getLogger(__name__)
+
+# DNS snippet taken from: https://gist.github.com/pklaus/b5a7876d4d2cf7271873
+
+
+class DomainName(str):
+    def __getattr__(self, item):
+        return DomainName(item + "." + self)
+
+
+D = None
+ns = None
+IP = "127.0.0.1"
+TTL = None
+soa_record = None
+ns_records: List[Any] = []
+
+
+class EchoServerProtocol(asyncio.DatagramProtocol):
+    def __init__(self, callback):
+        self.data_queue = asyncio.Queue(loop=asyncio.get_event_loop())
+        self.callback = callback
+        asyncio.ensure_future(self.respond())
+
+    def connection_made(self, transport):
+        self.transport = transport
+
+    def datagram_received(self, data, addr):
+        asyncio.ensure_future(self.handler(data, addr), loop=asyncio.get_event_loop())
+
+    async def respond(self):
+        while True:
+            try:
+                resp, caller = await self.data_queue.get()
+                self.transport.sendto(resp, caller)
+            except Exception as e:
+                log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
+
+    async def handler(self, data, caller):
+        try:
+            data = await self.callback(data)
+            if data is None:
+                return
+            await self.data_queue.put((data, caller))
+        except Exception as e:
+            log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
+
+
+class DNSServer:
+    reliable_peers_v4: List[str]
+    reliable_peers_v6: List[str]
+    lock: asyncio.Lock
+    pointer: int
+    crawl_db: aiosqlite.Connection
+
+    def __init__(self):
+        self.reliable_peers_v4 = []
+        self.reliable_peers_v6 = []
+        self.lock = asyncio.Lock()
+        self.pointer_v4 = 0
+        self.pointer_v6 = 0
+        db_path_replaced: str = "crawler.db"
+        root_path = DEFAULT_ROOT_PATH
+        self.db_path = path_from_root(root_path, db_path_replaced)
+        mkdir(self.db_path.parent)
+
+    async def start(self):
+        # self.crawl_db = await aiosqlite.connect(self.db_path)
+        # Get a reference to the event loop as we plan to use
+        # low-level APIs.
+        loop = asyncio.get_running_loop()
+
+        # One protocol instance will be created to serve all
+        # client requests.
+        self.transport, self.protocol = await loop.create_datagram_endpoint(
+            lambda: EchoServerProtocol(self.dns_response), local_addr=("0.0.0.0", 53)
+        )
+        self.reliable_task = asyncio.create_task(self.periodically_get_reliable_peers())
+
+    async def periodically_get_reliable_peers(self):
+        sleep_interval = 0
+        while True:
+            sleep_interval = min(15, sleep_interval + 1)
+            await asyncio.sleep(sleep_interval * 60)
+            try:
+                # TODO: double check this. It shouldn't take this long to connect.
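+                # (The crawler commits large batches to this same SQLite file, so the
+                # connect below may have to wait out a long-held write lock; that is
+                # presumably why such a generous timeout is used here.)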
+                crawl_db = await aiosqlite.connect(self.db_path, timeout=600)
+                cursor = await crawl_db.execute(
+                    "SELECT * from good_peers",
+                )
+                new_reliable_peers = []
+                rows = await cursor.fetchall()
+                await cursor.close()
+                await crawl_db.close()
+                for row in rows:
+                    new_reliable_peers.append(row[0])
+                if len(new_reliable_peers) > 0:
+                    random.shuffle(new_reliable_peers)
+                async with self.lock:
+                    self.reliable_peers_v4 = []
+                    self.reliable_peers_v6 = []
+                    for peer in new_reliable_peers:
+                        ipv4 = True
+                        try:
+                            _ = ipaddress.IPv4Address(peer)
+                        except ValueError:
+                            ipv4 = False
+                        if ipv4:
+                            self.reliable_peers_v4.append(peer)
+                        else:
+                            try:
+                                _ = ipaddress.IPv6Address(peer)
+                            except ValueError:
+                                continue
+                            self.reliable_peers_v6.append(peer)
+                    self.pointer_v4 = 0
+                    self.pointer_v6 = 0
+                log.error(
+                    f"Number of reliable peers discovered in dns server:"
+                    f" IPv4 count - {len(self.reliable_peers_v4)}"
+                    f" IPv6 count - {len(self.reliable_peers_v6)}"
+                )
+            except Exception as e:
+                log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
+
+    async def get_peers_to_respond(self, ipv4_count, ipv6_count):
+        peers = []
+        async with self.lock:
+            # Append IPv4.
+            size = len(self.reliable_peers_v4)
+            if ipv4_count > 0 and size <= ipv4_count:
+                peers = self.reliable_peers_v4
+            elif ipv4_count > 0:
+                peers = [self.reliable_peers_v4[i % size] for i in range(self.pointer_v4, self.pointer_v4 + ipv4_count)]
+                self.pointer_v4 = (self.pointer_v4 + ipv4_count) % size
+            # Append IPv6.
+            size = len(self.reliable_peers_v6)
+            if ipv6_count > 0 and size <= ipv6_count:
+                peers = peers + self.reliable_peers_v6
+            elif ipv6_count > 0:
+                peers = peers + [
+                    self.reliable_peers_v6[i % size] for i in range(self.pointer_v6, self.pointer_v6 + ipv6_count)
+                ]
+                self.pointer_v6 = (self.pointer_v6 + ipv6_count) % size
+        return peers
+
+    async def dns_response(self, data):
+        try:
+            request = DNSRecord.parse(data)
+            IPs = [MX(D.mail), soa_record] + ns_records
+            ipv4_count = 0
+            ipv6_count = 0
+            if request.q.qtype == 1:
+                ipv4_count = 32
+            elif request.q.qtype == 28:
+                ipv6_count = 32
+            elif request.q.qtype == 255:
+                ipv4_count = 16
+                ipv6_count = 16
+            else:
+                ipv4_count = 32
+            peers = await self.get_peers_to_respond(ipv4_count, ipv6_count)
+            if len(peers) == 0:
+                return None
+            for peer in peers:
+                ipv4 = True
+                try:
+                    _ = ipaddress.IPv4Address(peer)
+                except ValueError:
+                    ipv4 = False
+                if ipv4:
+                    IPs.append(A(peer))
+                else:
+                    try:
+                        _ = ipaddress.IPv6Address(peer)
+                    except ValueError:
+                        continue
+                    IPs.append(AAAA(peer))
+            reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=len(IPs), ra=1), q=request.q)
+
+            records = {
+                D: IPs,
+                D.ns1: [A(IP)],  # MX and NS records must never point to a CNAME alias (RFC 2181 section 10.3)
+                D.ns2: [A(IP)],
+                D.mail: [A(IP)],
+                D.andrei: [CNAME(D)],
+            }
+
+            qname = request.q.qname
+            qn = str(qname)
+            qtype = request.q.qtype
+            qt = QTYPE[qtype]
+            if qn == D or qn.endswith("." + D):
+                for name, rrs in records.items():
+                    if name == qn:
+                        for rdata in rrs:
+                            rqt = rdata.__class__.__name__
+                            if qt in ["*", rqt] or (qt == "ANY" and (rqt == "A" or rqt == "AAAA")):
+                                reply.add_answer(
+                                    RR(rname=qname, rtype=getattr(QTYPE, rqt), rclass=1, ttl=TTL, rdata=rdata)
+                                )
+
+                for rdata in ns_records:
+                    reply.add_ar(RR(rname=D, rtype=QTYPE.NS, rclass=1, ttl=TTL, rdata=rdata))
+
+                reply.add_auth(RR(rname=D, rtype=QTYPE.SOA, rclass=1, ttl=TTL, rdata=soa_record))
+
+            return reply.pack()
+        except Exception as e:
+            log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
+
+
+async def serve_dns():
+    dns_server = DNSServer()
+    await dns_server.start()
+
+    # TODO: Make this cleaner?
+    while True:
+        await asyncio.sleep(3600)
+
+
+async def kill_processes():
+    # TODO: implement.
+    pass
+
+
+def main():
+    root_path = DEFAULT_ROOT_PATH
+    config = load_config(root_path, "config.yaml", SERVICE_NAME)
+    initialize_logging(SERVICE_NAME, config["logging"], root_path)
+    global D
+    global ns
+    global TTL
+    global soa_record
+    global ns_records
+    D = DomainName(config["domain_name"])
+    ns = DomainName(config["nameserver"])
+    TTL = config["ttl"]
+    soa_record = SOA(
+        mname=ns,  # primary name server
+        rname=config["soa"]["rname"],  # email of the domain administrator
+        times=(
+            config["soa"]["serial_number"],
+            config["soa"]["refresh"],
+            config["soa"]["retry"],
+            config["soa"]["expire"],
+            config["soa"]["minimum"],
+        ),
+    )
+    ns_records = [NS(ns)]
+
+    def signal_received():
+        asyncio.create_task(kill_processes())
+
+    loop = asyncio.get_event_loop()
+
+    try:
+        loop.add_signal_handler(signal.SIGINT, signal_received)
+        loop.add_signal_handler(signal.SIGTERM, signal_received)
+    except NotImplementedError:
+        log.info("signal handlers unsupported")
+
+    try:
+        loop.run_until_complete(serve_dns())
+    finally:
+        loop.close()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/chia/seeder/peer_record.py b/chia/seeder/peer_record.py
new file mode 100644
index 000000000000..cab9d57758ac
--- /dev/null
+++ b/chia/seeder/peer_record.py
@@ -0,0 +1,147 @@
+import math
+import time
+from dataclasses import dataclass
+
+from chia.util.ints import uint32, uint64
+from chia.util.streamable import Streamable, streamable
+
+
+@dataclass(frozen=True)
+@streamable
+class PeerRecord(Streamable):
+    peer_id: str
+    ip_address: str
+    port: uint32
+    connected: bool
+    last_try_timestamp: uint64
+    try_count: uint32
+    connected_timestamp: uint64
+    added_timestamp: uint64
+    best_timestamp: uint64
+    version: str
+    handshake_time: uint64
+
+    def update_version(self, version, now):
+        if version != "undefined":
+            object.__setattr__(self, "version", version)
+        object.__setattr__(self, "handshake_time", uint64(now))
+
+
+class PeerStat:
+    weight: float
+    count: float
+    reliability: float
+
+    def __init__(self, weight, count, reliability):
+        self.weight = weight
+        self.count = count
+        self.reliability = reliability
+
+    def update(self, is_reachable: bool, age: int, tau: int):
+        f = math.exp(-age / tau)
+        self.reliability = self.reliability * f + (1.0 - f if is_reachable else 0.0)
+        self.count = self.count * f + 1.0
+        self.weight = self.weight * f + 1.0 - f
+
+
+class PeerReliability:
+    peer_id: str
+    ignore_till: int
+    ban_till: int
+    stat_2h: PeerStat
+    stat_8h: PeerStat
+    stat_1d: PeerStat
+    stat_1w: PeerStat
+    stat_1m: PeerStat
+    tries: int
+    successes: int
+
+    def __init__(
+        self,
+        peer_id: str,
+        ignore_till: int = 0,
+        ban_till: int = 0,
+        stat_2h_weight: float = 0.0,
+        stat_2h_count: float = 0.0,
+        stat_2h_reliability: float = 0.0,
+        stat_8h_weight: float = 0.0,
+        stat_8h_count: float = 0.0,
+        stat_8h_reliability: float = 0.0,
+        stat_1d_weight: float = 0.0,
+        stat_1d_count: float = 0.0,
+        stat_1d_reliability: float = 0.0,
+        stat_1w_weight: float = 0.0,
+        stat_1w_count: float = 0.0,
+        stat_1w_reliability: float = 0.0,
+        stat_1m_weight: float = 0.0,
+        stat_1m_count: float = 0.0,
+        stat_1m_reliability: float = 0.0,
+        tries: int = 0,
+        successes: int = 0,
+    ):
+        self.peer_id = peer_id
+        self.ignore_till = ignore_till
+        self.ban_till = ban_till
+        self.stat_2h = PeerStat(stat_2h_weight, stat_2h_count, stat_2h_reliability)
+        self.stat_8h = PeerStat(stat_8h_weight, stat_8h_count, stat_8h_reliability)
+        self.stat_1d = PeerStat(stat_1d_weight, stat_1d_count, stat_1d_reliability)
+        self.stat_1w = PeerStat(stat_1w_weight, stat_1w_count, stat_1w_reliability)
+        self.stat_1m = PeerStat(stat_1m_weight, stat_1m_count, stat_1m_reliability)
+        self.tries = tries
+        self.successes = successes
+
+    def is_reliable(self) -> bool:
+        if self.tries > 0 and self.tries <= 3 and self.successes * 2 >= self.tries:
+            return True
+        if self.stat_2h.reliability > 0.85 and self.stat_2h.count > 2:
+            return True
+        if self.stat_8h.reliability > 0.7 and self.stat_8h.count > 4:
+            return True
+        if self.stat_1d.reliability > 0.55 and self.stat_1d.count > 8:
+            return True
+        if self.stat_1w.reliability > 0.45 and self.stat_1w.count > 16:
+            return True
+        if self.stat_1m.reliability > 0.35 and self.stat_1m.count > 32:
+            return True
+        return False
+
+    def get_ban_time(self) -> int:
+        if self.is_reliable():
+            return 0
+        if self.stat_1m.reliability - self.stat_1m.weight + 1 < 0.15 and self.stat_1m.count > 32:
+            return 30 * 86400
+        if self.stat_1w.reliability - self.stat_1w.weight + 1.0 < 0.10 and self.stat_1w.count > 16:
+            return 7 * 86400
+        if self.stat_1d.reliability - self.stat_1d.weight + 1.0 < 0.05 and self.stat_1d.count > 8:
+            return 86400
+        return 0
+
+    def get_ignore_time(self) -> int:
+        if self.is_reliable():
+            return 0
+        if self.stat_1m.reliability - self.stat_1m.weight + 1.0 < 0.20 and self.stat_1m.count > 2:
+            return 10 * 86400
+        if self.stat_1w.reliability - self.stat_1w.weight + 1.0 < 0.16 and self.stat_1w.count > 2:
+            return 3 * 86400
+        if self.stat_1d.reliability - self.stat_1d.weight + 1.0 < 0.12 and self.stat_1d.count > 2:
+            return 8 * 3600
+        if self.stat_8h.reliability - self.stat_8h.weight + 1.0 < 0.08 and self.stat_8h.count > 2:
+            return 2 * 3600
+        return 0
+
+    def update(self, is_reachable: bool, age: int):
+        self.stat_2h.update(is_reachable, age, 2 * 3600)
+        self.stat_8h.update(is_reachable, age, 8 * 3600)
+        self.stat_1d.update(is_reachable, age, 24 * 3600)
+        self.stat_1w.update(is_reachable, age, 7 * 24 * 3600)
+        self.stat_1m.update(is_reachable, age, 24 * 30 * 3600)
+        self.tries += 1
+        if is_reachable:
+            self.successes += 1
+        current_ignore_time = self.get_ignore_time()
+        now = int(time.time())
+        if current_ignore_time > 0 and (self.ignore_till == 0 or self.ignore_till < current_ignore_time + now):
+            self.ignore_till = current_ignore_time + now
+        current_ban_time = self.get_ban_time()
+        if current_ban_time > 0 and (self.ban_till == 0 or self.ban_till < current_ban_time + now):
+            self.ban_till = current_ban_time + now
diff --git a/chia/seeder/start_crawler.py b/chia/seeder/start_crawler.py
new file mode 100644
index 000000000000..e0edf194f387
--- /dev/null
+++ b/chia/seeder/start_crawler.py
@@ -0,0 +1,59 @@
+import logging
+import pathlib
+from multiprocessing import freeze_support
+from typing import Dict
+
+from chia.consensus.constants import ConsensusConstants
+from chia.consensus.default_constants import DEFAULT_CONSTANTS
+from chia.seeder.crawler import Crawler
+from chia.seeder.crawler_api import CrawlerAPI
+from chia.server.outbound_message import NodeType
+from chia.server.start_service import run_service
+from chia.util.config import load_config_cli
+from chia.util.default_root import DEFAULT_ROOT_PATH
+
+# See: https://bugs.python.org/issue29288
+"".encode("idna")
+
+SERVICE_NAME = "full_node"
+log = logging.getLogger(__name__)
+
+
+def service_kwargs_for_full_node_crawler(
+    root_path: pathlib.Path, config: Dict, consensus_constants: ConsensusConstants
+) -> Dict:
+    crawler = Crawler(
+        config,
+        root_path=root_path,
+        consensus_constants=consensus_constants,
+    )
+    api = CrawlerAPI(crawler)
+
+    network_id = config["selected_network"]
+    kwargs = dict(
+        root_path=root_path,
+        node=api.crawler,
+        peer_api=api,
+        node_type=NodeType.FULL_NODE,
+        advertised_port=config["port"],
+        service_name=SERVICE_NAME,
+        upnp_ports=[],
+        server_listen_ports=[config["port"]],
+        on_connect_callback=crawler.on_connect,
+        network_id=network_id,
+    )
+
+    return kwargs
+
+
+def main():
+    config = load_config_cli(DEFAULT_ROOT_PATH, "config.yaml", "seeder")
+    overrides = config["network_overrides"]["constants"][config["selected_network"]]
+    updated_constants = DEFAULT_CONSTANTS.replace_str_to_bytes(**overrides)
+    kwargs = service_kwargs_for_full_node_crawler(DEFAULT_ROOT_PATH, config, updated_constants)
+    return run_service(**kwargs)
+
+
+if __name__ == "__main__":
+    freeze_support()
+    main()
diff --git a/chia/seeder/util/__init__.py b/chia/seeder/util/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/chia/seeder/util/config.py b/chia/seeder/util/config.py
new file mode 100644
index 000000000000..187ff1d82a4c
--- /dev/null
+++ b/chia/seeder/util/config.py
@@ -0,0 +1,34 @@
+from pathlib import Path
+
+import pkg_resources
+
+from chia.util.config import load_config, save_config
+
+
+def patch_default_seeder_config(root_path: Path, filename="config.yaml") -> None:
+    """
+    Checks if the seeder: section exists in the config. If not, the default seeder settings are appended to the file
+    """
+
+    existing_config = load_config(root_path, "config.yaml")
+
+    if "seeder" in existing_config:
+        print("Chia Seeder section exists in config. No action required.")
+        return
+
+    print("Chia Seeder section does not exist in config. Patching...")
+    config = load_config(root_path, "config.yaml")
+    # The following ignores root_path when the second param is absolute, which this will be
+    seeder_config = load_config(root_path, pkg_resources.resource_filename("chia.util", "initial-config.yaml"))
+
+    # Patch in the values with anchors, since pyyaml tends to change
+    # the anchors to things like id001, etc
+    config["seeder"] = seeder_config["seeder"]
+    config["seeder"]["network_overrides"] = config["network_overrides"]
+    config["seeder"]["selected_network"] = config["selected_network"]
+    config["seeder"]["logging"] = config["logging"]
+
+    # When running as crawler, we default to a much lower client timeout
+    config["full_node"]["peer_connect_timeout"] = 2
+
+    save_config(root_path, "config.yaml", config)
diff --git a/chia/seeder/util/service.py b/chia/seeder/util/service.py
new file mode 100644
index 000000000000..bb5ad35668f5
--- /dev/null
+++ b/chia/seeder/util/service.py
@@ -0,0 +1,103 @@
+import os
+import signal
+import subprocess
+import sys
+
+from pathlib import Path
+from typing import Tuple
+
+from chia.daemon.server import pid_path_for_service
+from chia.util.path import mkdir
+
+
+def launch_service(root_path: Path, service_command) -> Tuple[subprocess.Popen, Path]:
+    """
+    Launch a child process.
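+
+    Sets CHIA_ROOT in the child's environment, resolves the service name to the
+    correct executable, and records the child's PID so kill_service can stop it later.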
+ """ + # set up CHIA_ROOT + # invoke correct script + # save away PID + + # we need to pass on the possibly altered CHIA_ROOT + os.environ["CHIA_ROOT"] = str(root_path) + + print(f"Launching service with CHIA_ROOT: {os.environ['CHIA_ROOT']}") + + # Insert proper e + service_array = service_command.split() + service_executable = executable_for_service(service_array[0]) + service_array[0] = service_executable + + startupinfo = None + if os.name == "nt": + startupinfo = subprocess.STARTUPINFO() # type: ignore + startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore + + # CREATE_NEW_PROCESS_GROUP allows graceful shutdown on windows, by CTRL_BREAK_EVENT signal + if sys.platform == "win32" or sys.platform == "cygwin": + creationflags = subprocess.CREATE_NEW_PROCESS_GROUP + else: + creationflags = 0 + environ_copy = os.environ.copy() + process = subprocess.Popen( + service_array, shell=False, startupinfo=startupinfo, creationflags=creationflags, env=environ_copy + ) + pid_path = pid_path_for_service(root_path, service_command) + try: + mkdir(pid_path.parent) + with open(pid_path, "w") as f: + f.write(f"{process.pid}\n") + except Exception: + pass + return process, pid_path + + +def kill_service(root_path: Path, service_name: str) -> bool: + pid_path = pid_path_for_service(root_path, service_name) + + try: + with open(pid_path) as f: + pid = int(f.read()) + + # @TODO SIGKILL seems necessary right now for the DNS server, but not the crawler (fix that) + # @TODO Ensure processes stop before renaming the files and returning + os.kill(pid, signal.SIGKILL) + print("sent SIGKILL to process") + except Exception: + pass + + try: + pid_path_killed = pid_path.with_suffix(".pid-killed") + if pid_path_killed.exists(): + pid_path_killed.unlink() + os.rename(pid_path, pid_path_killed) + except Exception: + pass + + return True + + +# determine if application is a script file or frozen exe +if getattr(sys, "frozen", False): + name_map = { + "chia_seeder": "chia_seeder", + "chia_seeder_crawler": "chia_seeder_crawler", + "chia_seeder_server": "chia_seeder_server", + } + + def executable_for_service(service_name: str) -> str: + application_path = os.path.dirname(sys.executable) + if sys.platform == "win32" or sys.platform == "cygwin": + executable = name_map[service_name] + path = f"{application_path}/{executable}.exe" + return path + else: + path = f"{application_path}/{name_map[service_name]}" + return path + + +else: + application_path = os.path.dirname(__file__) + + def executable_for_service(service_name: str) -> str: + return service_name diff --git a/chia/seeder/util/service_groups.py b/chia/seeder/util/service_groups.py new file mode 100644 index 000000000000..c42ee4b05e50 --- /dev/null +++ b/chia/seeder/util/service_groups.py @@ -0,0 +1,17 @@ +from typing import KeysView, Generator + +SERVICES_FOR_GROUP = { + "all": "chia_seeder_crawler chia_seeder_server".split(), + "crawler": "chia_seeder_crawler".split(), + "server": "chia_seeder_server".split(), +} + + +def all_groups() -> KeysView[str]: + return SERVICES_FOR_GROUP.keys() + + +def services_for_groups(groups) -> Generator[str, None, None]: + for group in groups: + for service in SERVICES_FOR_GROUP[group]: + yield service diff --git a/chia/server/server.py b/chia/server/server.py index 94f29c5e8029..6db6e590f0d8 100644 --- a/chia/server/server.py +++ b/chia/server/server.py @@ -187,13 +187,18 @@ class ChiaServer: Periodically checks for connections with no activity (have not sent us any data), and removes them, to allow room for other 
peers. """ + is_crawler = getattr(self.node, "crawl", None) while True: - await asyncio.sleep(600) + await asyncio.sleep(600 if is_crawler is None else 2) to_remove: List[WSChiaConnection] = [] for connection in self.all_connections.values(): if self._local_type == NodeType.FULL_NODE and connection.connection_type == NodeType.FULL_NODE: - if time.time() - connection.last_message_time > 1800: - to_remove.append(connection) + if is_crawler is not None: + if time.time() - connection.creation_time > 5: + to_remove.append(connection) + else: + if time.time() - connection.last_message_time > 1800: + to_remove.append(connection) for connection in to_remove: self.log.debug(f"Garbage collecting connection {connection.peer_host} due to inactivity") await connection.close() @@ -243,6 +248,9 @@ class ChiaServer: self.log.info(f"Started listening on port: {self._port}") async def incoming_connection(self, request): + if getattr(self.node, "crawl", None) is not None: + return + if request.remote in self.banned_peers and time.time() < self.banned_peers[request.remote]: self.log.warning(f"Peer {request.remote} is banned, refusing connection") return None diff --git a/chia/server/ws_connection.py b/chia/server/ws_connection.py index 6002a9483752..65af463d8a76 100644 --- a/chia/server/ws_connection.py +++ b/chia/server/ws_connection.py @@ -105,7 +105,7 @@ class WSChiaConnection: self.outbound_rate_limiter = RateLimiter(incoming=False, percentage_of_limit=outbound_rate_limit_percent) self.inbound_rate_limiter = RateLimiter(incoming=True, percentage_of_limit=inbound_rate_limit_percent) - # Used by crawler/dns introducer + # Used by the Chia Seeder. self.version = None async def perform_handshake(self, network_id: str, protocol_version: str, server_port: int, local_type: NodeType): @@ -187,7 +187,7 @@ class WSChiaConnection: async def close(self, ban_time: int = 0, ws_close_code: WSCloseCode = WSCloseCode.OK, error: Optional[Err] = None): """ - Closes the connection, and finally calls the close_callback on the server, so the connections gets removed + Closes the connection, and finally calls the close_callback on the server, so the connection gets removed from the global list. """ @@ -486,7 +486,7 @@ class WSChiaConnection: await asyncio.sleep(3) return None - # Used by crawler/dns introducer + # Used by the Chia Seeder. def get_version(self): return self.version diff --git a/chia/util/initial-config.yaml b/chia/util/initial-config.yaml index 5d337a3a97e9..6a8fc5798e96 100644 --- a/chia/util/initial-config.yaml +++ b/chia/util/initial-config.yaml @@ -115,6 +115,34 @@ logging: &logging log_syslog_host: "localhost" # Send logging messages to a remote or local Unix syslog log_syslog_port: 514 # UDP port of the remote or local Unix syslog +seeder: + # The fake full node used for crawling will run on this port. + port: 8444 + # Most full nodes on the network run on this port. (i.e. 8444 for mainnet, 58444 for testnet). + other_peers_port: 8444 + # Path to crawler DB. If empty, will use $CHIA_ROOT/crawler.db + crawler_db_path: "" + # Peers used for the initial run. + bootstrap_peers: + - "node.chia.net" + # Only consider nodes synced at least to this height. + minimum_height: 240000 + # How many of a particular version we need to see before reporting it in the logs + minimum_version_count: 100 + domain_name: "seeder.example.com." + nameserver: "example.com." + ttl: 300 + soa: + rname: "hostmaster.example.com." 
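+    # Standard SOA fields: serial_number versions the zone; refresh/retry/expire
+    # (in seconds) control secondary-server behavior; minimum is the
+    # negative-caching TTL (RFC 2308).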
+    serial_number: 1619105223
+    refresh: 10800
+    retry: 10800
+    expire: 604800
+    minimum: 1800
+  network_overrides: *network_overrides
+  selected_network: *selected_network
+  logging: *logging
+
 harvester:
   # The harvester server (if run) will run on this port
   port: 8448
@@ -383,7 +411,7 @@ introducer:
   host: *self_hostname
   port: 8445
   max_peers_to_send: 20
-  # The introducer will only return peers who it has seen in the last
+  # The introducer will only return peers it has seen in the last
   # recent_peer_threshold seconds
   recent_peer_threshold: 6000
   logging: *logging
diff --git a/chia/util/service_groups.py b/chia/util/service_groups.py
index 3facee460e27..459e56998cd9 100644
--- a/chia/util/service_groups.py
+++ b/chia/util/service_groups.py
@@ -1,7 +1,7 @@
 from typing import KeysView, Generator
 
 SERVICES_FOR_GROUP = {
-    "all": "chia_harvester chia_timelord_launcher chia_timelord chia_farmer chia_full_node chia_wallet".split(),
+    "all": ("chia_harvester chia_timelord_launcher chia_timelord " "chia_farmer chia_full_node chia_wallet").split(),
     "node": "chia_full_node".split(),
     "harvester": "chia_harvester".split(),
     "farmer": "chia_harvester chia_farmer chia_full_node chia_wallet".split(),
diff --git a/mozilla-ca b/mozilla-ca
index 9e242f32e0c0..b1b808ab9300 160000
--- a/mozilla-ca
+++ b/mozilla-ca
@@ -1 +1 @@
-Subproject commit 9e242f32e0c005f602d00c47c240d86c57e393a2
+Subproject commit b1b808ab930004fc6b4afc4b248dee0a136f3f00
diff --git a/setup.py b/setup.py
index d4c1723ef8f5..c173e8f73072 100644
--- a/setup.py
+++ b/setup.py
@@ -29,6 +29,7 @@ dependencies = [
     "click==7.1.2",  # For the CLI
     "dnspythonchia==2.2.0",  # Query DNS seeds
     "watchdog==2.1.6",  # Filesystem event watching - watches keyring.yaml
+    "dnslib==0.9.14",  # dns lib
 ]
 
 upnp_dependencies = [
@@ -86,6 +87,8 @@ kwargs = dict(
         "chia.pools",
         "chia.protocols",
         "chia.rpc",
+        "chia.seeder",
+        "chia.seeder.util",
         "chia.server",
         "chia.simulator",
         "chia.types.blockchain_format",
@@ -110,6 +113,9 @@ kwargs = dict(
         "chia_harvester = chia.server.start_harvester:main",
         "chia_farmer = chia.server.start_farmer:main",
         "chia_introducer = chia.server.start_introducer:main",
+        "chia_seeder = chia.cmds.seeder:main",
+        "chia_seeder_crawler = chia.seeder.start_crawler:main",
+        "chia_seeder_server = chia.seeder.dns_server:main",
        "chia_timelord = chia.server.start_timelord:main",
        "chia_timelord_launcher = chia.timelord.timelord_launcher:main",
        "chia_full_node_simulator = chia.simulator.start_simulator:main",
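
For reference, a client can fetch peers from a running seeder with an ordinary DNS query against the configured zone (A records for IPv4 peers, AAAA for IPv6). Below is a minimal client-side sketch using dnspython; "seeder.example.com." is the placeholder domain from the default config above, and the library choice is illustrative rather than part of this change. An equivalent manual check is `dig @<seeder-host> seeder.example.com A`.

    # Minimal sketch: resolve a seeder's zone to obtain peer addresses.
    # Assumes dnspython is installed and a seeder answers for the placeholder zone.
    import dns.resolver

    def fetch_peers(domain: str = "seeder.example.com.") -> list:
        peers = []
        for qtype in ("A", "AAAA"):  # IPv4 peers, then IPv6 peers
            try:
                answers = dns.resolver.resolve(domain, qtype)
            except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
                continue
            peers.extend(rdata.to_text() for rdata in answers)
        return peers

    print(fetch_peers())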