Chia Seeder (#8991)

* initial hack

* crawler

* add pytz

* Checkpoint.

* Catch some bugs.

* Localhost dig working.

* Checkpoint: return only high quality nodes.

* Statistics.

* Try improving finding reliable nodes.

* Bug.

* Move db to memory.

* Timestamp in the last 5 days.

* Increase crawl parameters, 180+ connections per sec.

* Bug.

* Optimize for DNS traffic.

* Prepare for hosting.

* Minimum height.

* Typo.

* Try catch everything.

* dnslib.

* Add db, format code.

* nits.

* No connections for the dns server.

* Rename src -> chia

* Fix some issues with v1.1

* Crawler task pool.

* Optimize closing connections.

* Split crawler and dns server.

* Install instructions.

* Catch startup bug.

* Try a big timeout for lock acquire.

* lint.

* Lint.

* Initial commit extended stats.

* Simplify code.

* Config.

* Correct stats.

* Be more restrictive in crawling.

* Attempt to fix stats bug.

* Add other peers port to config.

* Update README for the config.

* Simplify crawl task.

* Fix bug on restarts.

* Prevent log spamming.

* More spam prevention.

* Fix bug.

* Ipv6 (#1)

* Enable ipv6.

* Fix bug.

* Use numeric codes for QTYPE.

* ANY working.

* More spam prevention.

* Try to improve IPv6 selection.

* Log IPv6 available.

* Try to crawl more aggressively for v6.

* rename dns.py to crawler_dns.py so it doesn't conflict with imported package names

* Remove pytz package off dependencies

* Tidy-up ws_connection.py

* Fix spelling

* Reinstate chia-blockchain readme, with additional lines pertaining to the DNS introducer & crawler

* More detailed info in the README wrt Chia Seeder

* Nit

* More memetic naming of Chia Seeder

* Nit

* Add entry points

* Add entry in packages

* Patch some methods on the upstream server

* Update peer record fields

* Standard library imports first

* Crawler API check

* Reconcile crawl store

* Account for crawler_db_path in config

* Await crawl store load DB and load reliable peers

* Updates to crawler

* Rename to dns_server

* Crawler-specific overrides for the chia server

* Edit comment

* Undo changes to ChiaServer in view of crawler-specific overrides introduced in previous commit

* Nit

* Update service groups

* Expand name maps, mostly

* Fix the init config

* Remove unused import

* total_records unused at this stage

* Remove ios_reliable in peer_reliability table

* Remove row[20] entry

* Split overly long line

* Fix

* Type hint for ns_records

* Reconcile mismatch btw type int and uint64

* Type annotations in crawler

* Check whether crawl store is set

* Remove upnp_list

* Lint

* Chia Seeder CLI

* Lint

* Two white spaces

* 3rd party package import

* Cleaner way to handle overrides for ChiaServer method

* Address linter warnings

* Rename

* Nits

* Fix

* Change port #

* Most chia_seeder commands up and running

* Rename

* Progress of sorts

* Fix

* Improve legibility

* Fix naming

* Fix setup.py

* Lint

* None -> ''

* Remove whitespace

* Rename

* Log ipv6 better. (#9227)

* Log ipv6 better.

* Lint.

* -

* Undo GUI changes

* Another attempt

* GUI changes

Co-authored-by: Yostra <straya@chia.net>
Co-authored-by: Florin Chirica <fchirica96@gmail.com>
Co-authored-by: Chris Marslender <chrismarslender@gmail.com>
Gregory 2021-11-27 18:30:25 -08:00 committed by GitHub
parent a80b7df38c
commit 025c45f0b5
20 changed files with 1717 additions and 9 deletions

README.md

@@ -11,6 +11,7 @@
Chia is a modern cryptocurrency built from scratch, designed to be efficient, decentralized, and secure. Here are some of the features and benefits:
* [Proof of space and time](https://docs.google.com/document/d/1tmRIb7lgi4QfKkNaxuKOBHRmwbVlGL4f7EsBDr_5xZE/edit) based consensus which allows anyone to farm with commodity hardware
* Very easy to use full node and farmer GUI and cli (thousands of nodes active on mainnet)
* [Chia seeder](https://github.com/Chia-Network/chia-blockchain/wiki/Chia-Seeder-User-Guide), which maintains a list of reliable nodes within the Chia network via a built-in DNS server.
* Simplified UTXO based transaction model, with small on-chain state
* Lisp-style Turing-complete functional [programming language](https://chialisp.com/) for money related use cases
* BLS keys and aggregate signatures (only one signature per block)
@@ -33,6 +34,7 @@ TCP port 8444 access to your peer.
These methods tend to be router make/model specific.
Most users should only install harvesters, farmers, plotter, full nodes, and wallets.
Setting up a seeder is best left to more advanced users.
Building Timelords and VDFs is for sophisticated users, in most environments.
Chia Network and additional volunteers are running sufficient Timelords
for consensus.
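In practice the seeder answers ordinary DNS queries with the addresses of reliable full nodes, so any resolver library can consume it. A minimal sketch using dnspython (the dnspythonchia fork already pinned in setup.py), querying the placeholder domain_name from the new config section; substitute a real deployment's domain:

import dns.resolver  # dnspythonchia fork, pinned in setup.py

def get_full_node_ips(zone: str = "seeder.example.com"):
    # Placeholder zone from initial-config.yaml, not a live seeder.
    ips = []
    for rdtype in ("A", "AAAA"):  # IPv4 and IPv6 peers respectively
        try:
            answer = dns.resolver.resolve(zone, rdtype)
        except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN):
            continue
        ips.extend(rdata.address for rdata in answer)
    return ips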

chia/cmds/seeder.py (new file, 204 lines)

@@ -0,0 +1,204 @@
import os
from pathlib import Path
from typing import Dict
import click
import chia.cmds.configure as chia_configure
from chia import __version__
from chia.cmds.chia import monkey_patch_click
from chia.cmds.init_funcs import init
from chia.seeder.util.config import patch_default_seeder_config
from chia.seeder.util.service_groups import all_groups, services_for_groups
from chia.seeder.util.service import launch_service, kill_service
from chia.util.config import load_config, save_config
from chia.util.default_root import DEFAULT_ROOT_PATH
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
@click.group(
help=f"\n Manage the Chia Seeder ({__version__})\n",
epilog="Try 'chia seeder start crawler' or 'chia seeder start server'",
context_settings=CONTEXT_SETTINGS,
)
@click.option("--root-path", default=DEFAULT_ROOT_PATH, help="Config file root", type=click.Path(), show_default=True)
@click.pass_context
def cli(
ctx: click.Context,
root_path: str,
) -> None:
from pathlib import Path
ctx.ensure_object(dict)
ctx.obj["root_path"] = Path(root_path)
@cli.command("version", short_help="Show the Chia Seeder version")
def version_cmd() -> None:
print(__version__)
@click.command("init", short_help="Create or migrate the configuration")
@click.pass_context
def init_cmd(ctx: click.Context, **kwargs):
print("Calling Chia Seeder Init...")
init(None, ctx.obj["root_path"], True)
if os.environ.get("CHIA_ROOT", None) is not None:
print(f"warning, your CHIA_ROOT is set to {os.environ['CHIA_ROOT']}.")
root_path = ctx.obj["root_path"]
print(f"Chia directory {root_path}")
if root_path.is_dir() and not Path(root_path / "config" / "config.yaml").exists():
# This is reached if CHIA_ROOT is set, but there is no config
# This really shouldn't happen, but if we don't have the base chia config, we can't continue
print("Config does not exist. Can't continue!")
return -1
patch_default_seeder_config(root_path)
return 0
@click.command("start", short_help="Start service groups")
@click.argument("group", type=click.Choice(all_groups()), nargs=-1, required=True)
@click.pass_context
def start_cmd(ctx: click.Context, group: str) -> None:
services = services_for_groups(group)
for service in services:
print(f"Starting {service}")
launch_service(ctx.obj["root_path"], service)
@click.command("stop", short_help="Stop service groups")
@click.argument("group", type=click.Choice(all_groups()), nargs=-1, required=True)
@click.pass_context
def stop_cmd(ctx: click.Context, group: str) -> None:
services = services_for_groups(group)
for service in services:
print(f"Stopping {service}")
kill_service(ctx.obj["root_path"], service)
def configure(
root_path: Path,
testnet: str,
crawler_db_path: str,
minimum_version_count: int,
domain_name: str,
nameserver: str,
):
# Run the parent config, in case anything there (testnet) needs to be run, THEN load the config for local changes
chia_configure.configure(root_path, "", "", "", "", "", "", "", "", testnet, "")
config: Dict = load_config(DEFAULT_ROOT_PATH, "config.yaml")
change_made = False
if testnet is not None:
if testnet == "true" or testnet == "t":
print("Updating Chia Seeder to testnet settings")
port = 58444
network = "testnet7"
bootstrap = ["testnet-node.chia.net"]
config["seeder"]["port"] = port
config["seeder"]["other_peers_port"] = port
config["seeder"]["selected_network"] = network
config["seeder"]["bootstrap_peers"] = bootstrap
change_made = True
elif testnet == "false" or testnet == "f":
print("Updating Chia Seeder to mainnet settings")
port = 8444
network = "mainnet"
bootstrap = ["node.chia.net"]
config["seeder"]["port"] = port
config["seeder"]["other_peers_port"] = port
config["seeder"]["selected_network"] = network
config["seeder"]["bootstrap_peers"] = bootstrap
change_made = True
else:
print("Please choose True or False")
if crawler_db_path is not None:
config["seeder"]["crawler_db_path"] = crawler_db_path
change_made = True
if minimum_version_count is not None:
config["seeder"]["minimum_version_count"] = minimum_version_count
change_made = True
if domain_name is not None:
config["seeder"]["domain_name"] = domain_name
change_made = True
if nameserver is not None:
config["seeder"]["nameserver"] = nameserver
change_made = True
if change_made:
print("Restart any running Chia Seeder services for changes to take effect")
save_config(root_path, "config.yaml", config)
return 0
@click.command("configure", short_help="Modify configuration")
@click.option(
"--testnet",
"-t",
help="configures for connection to testnet",
type=click.Choice(["true", "t", "false", "f"]),
)
@click.option(
"--crawler-db-path",
help="configures for path to the crawler database",
type=str,
)
@click.option(
"--minimum-version-count",
help="configures how many of a particular version must be seen to be reported in logs",
type=int,
)
@click.option(
"--domain-name",
help="configures the domain_name setting. Ex: `seeder.example.com.`",
type=str,
)
@click.option(
"--nameserver",
help="configures the nameserver setting. Ex: `example.com.`",
type=str,
)
@click.pass_context
def configure_cmd(
ctx,
testnet,
crawler_db_path,
minimum_version_count,
domain_name,
nameserver,
):
configure(
ctx.obj["root_path"],
testnet,
crawler_db_path,
minimum_version_count,
domain_name,
nameserver,
)
cli.add_command(init_cmd)
cli.add_command(start_cmd)
cli.add_command(stop_cmd)
cli.add_command(configure_cmd)
def main() -> None:
monkey_patch_click()
cli() # pylint: disable=no-value-for-parameter
if __name__ == "__main__":
main()
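Since these are plain click commands, the group can also be exercised in-process with click's bundled test runner instead of spawning the chia_seeder script; a small sketch:

from click.testing import CliRunner  # ships with the pinned click package
from chia.cmds.seeder import cli

runner = CliRunner()
result = runner.invoke(cli, ["version"])
print(result.output.strip())  # the Chia Seeder version string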

chia/daemon/server.py

@@ -100,6 +100,9 @@ if getattr(sys, "frozen", False):
"chia_timelord": "start_timelord",
"chia_timelord_launcher": "timelord_launcher",
"chia_full_node_simulator": "start_simulator",
"chia_seeder": "chia_seeder",
"chia_seeder_crawler": "chia_seeder_crawler",
"chia_seeder_dns": "chia_seeder_dns",
}
def executable_for_service(service_name: str) -> str:

chia/seeder/__init__.py (new empty file)

chia/seeder/crawl_store.py (new file, 360 lines)

@@ -0,0 +1,360 @@
import asyncio
import dataclasses
import ipaddress
import logging
import random
import time
from typing import List, Dict
import aiosqlite
from chia.seeder.peer_record import PeerRecord, PeerReliability
log = logging.getLogger(__name__)
class CrawlStore:
crawl_db: aiosqlite.Connection
last_timestamp: int
lock: asyncio.Lock
host_to_records: Dict
host_to_selected_time: Dict
host_to_reliability: Dict
banned_peers: int
ignored_peers: int
reliable_peers: int
@classmethod
async def create(cls, connection: aiosqlite.Connection):
self = cls()
self.crawl_db = connection
await self.crawl_db.execute(
(
"CREATE TABLE IF NOT EXISTS peer_records("
" peer_id text PRIMARY KEY,"
" ip_address text,"
" port bigint,"
" connected int,"
" last_try_timestamp bigint,"
" try_count bigint,"
" connected_timestamp bigint,"
" added_timestamp bigint,"
" best_timestamp bigint,"
" version text,"
" handshake_time text)"
)
)
await self.crawl_db.execute(
(
"CREATE TABLE IF NOT EXISTS peer_reliability("
" peer_id text PRIMARY KEY,"
" ignore_till int, ban_till int,"
" stat_2h_w real, stat_2h_c real, stat_2h_r real,"
" stat_8h_w real, stat_8h_c real, stat_8h_r real,"
" stat_1d_w real, stat_1d_c real, stat_1d_r real,"
" stat_1w_w real, stat_1w_c real, stat_1w_r real,"
" stat_1m_w real, stat_1m_c real, stat_1m_r real,"
" tries int, successes int)"
)
)
await self.crawl_db.execute(("CREATE TABLE IF NOT EXISTS good_peers(ip text)"))
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS ip_address on peer_records(ip_address)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS port on peer_records(port)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS connected on peer_records(connected)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS added_timestamp on peer_records(added_timestamp)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS peer_id on peer_reliability(peer_id)")
await self.crawl_db.execute("CREATE INDEX IF NOT EXISTS ignore_till on peer_reliability(ignore_till)")
await self.crawl_db.commit()
self.last_timestamp = 0
self.ignored_peers = 0
self.banned_peers = 0
self.reliable_peers = 0
self.host_to_selected_time = {}
await self.unload_from_db()
return self
def maybe_add_peer(self, peer_record: PeerRecord, peer_reliability: PeerReliability):
if peer_record.peer_id not in self.host_to_records:
self.host_to_records[peer_record.peer_id] = peer_record
if peer_reliability.peer_id not in self.host_to_reliability:
self.host_to_reliability[peer_reliability.peer_id] = peer_reliability
async def add_peer(self, peer_record: PeerRecord, peer_reliability: PeerReliability, save_db: bool = False):
if not save_db:
self.host_to_records[peer_record.peer_id] = peer_record
self.host_to_reliability[peer_reliability.peer_id] = peer_reliability
return
added_timestamp = int(time.time())
cursor = await self.crawl_db.execute(
"INSERT OR REPLACE INTO peer_records VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
peer_record.peer_id,
peer_record.ip_address,
peer_record.port,
int(peer_record.connected),
peer_record.last_try_timestamp,
peer_record.try_count,
peer_record.connected_timestamp,
added_timestamp,
peer_record.best_timestamp,
peer_record.version,
peer_record.handshake_time,
),
)
await cursor.close()
cursor = await self.crawl_db.execute(
"INSERT OR REPLACE INTO peer_reliability"
" VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
peer_reliability.peer_id,
peer_reliability.ignore_till,
peer_reliability.ban_till,
peer_reliability.stat_2h.weight,
peer_reliability.stat_2h.count,
peer_reliability.stat_2h.reliability,
peer_reliability.stat_8h.weight,
peer_reliability.stat_8h.count,
peer_reliability.stat_8h.reliability,
peer_reliability.stat_1d.weight,
peer_reliability.stat_1d.count,
peer_reliability.stat_1d.reliability,
peer_reliability.stat_1w.weight,
peer_reliability.stat_1w.count,
peer_reliability.stat_1w.reliability,
peer_reliability.stat_1m.weight,
peer_reliability.stat_1m.count,
peer_reliability.stat_1m.reliability,
peer_reliability.tries,
peer_reliability.successes,
),
)
await cursor.close()
async def get_peer_reliability(self, peer_id: str) -> PeerReliability:
return self.host_to_reliability[peer_id]
async def peer_failed_to_connect(self, peer: PeerRecord):
now = int(time.time())
age_timestamp = int(max(peer.last_try_timestamp, peer.connected_timestamp))
if age_timestamp == 0:
age_timestamp = now - 1000
replaced = dataclasses.replace(peer, try_count=peer.try_count + 1, last_try_timestamp=now)
reliability = await self.get_peer_reliability(peer.peer_id)
if reliability is None:
reliability = PeerReliability(peer.peer_id)
reliability.update(False, now - age_timestamp)
await self.add_peer(replaced, reliability)
async def peer_connected(self, peer: PeerRecord):
now = int(time.time())
age_timestamp = int(max(peer.last_try_timestamp, peer.connected_timestamp))
if age_timestamp == 0:
age_timestamp = now - 1000
replaced = dataclasses.replace(peer, connected=True, connected_timestamp=now)
reliability = await self.get_peer_reliability(peer.peer_id)
if reliability is None:
reliability = PeerReliability(peer.peer_id)
reliability.update(True, now - age_timestamp)
await self.add_peer(replaced, reliability)
async def update_best_timestamp(self, host: str, timestamp):
if host not in self.host_to_records:
return
record = self.host_to_records[host]
replaced = dataclasses.replace(record, best_timestamp=timestamp)
if host not in self.host_to_reliability:
return
reliability = self.host_to_reliability[host]
await self.add_peer(replaced, reliability)
async def peer_connected_hostname(self, host: str, connected: bool = True):
if host not in self.host_to_records:
return
record = self.host_to_records[host]
if connected:
await self.peer_connected(record)
else:
await self.peer_failed_to_connect(record)
async def get_peers_to_crawl(self, min_batch_size, max_batch_size) -> List[PeerRecord]:
now = int(time.time())
records = []
records_v6 = []
counter = 0
self.ignored_peers = 0
self.banned_peers = 0
for peer_id in self.host_to_reliability:
add = False
counter += 1
reliability = self.host_to_reliability[peer_id]
if reliability.ignore_till < now and reliability.ban_till < now:
add = True
else:
if reliability.ban_till >= now:
self.banned_peers += 1
elif reliability.ignore_till >= now:
self.ignored_peers += 1
record = self.host_to_records[peer_id]
if record.last_try_timestamp == 0 and record.connected_timestamp == 0:
add = True
if peer_id in self.host_to_selected_time:
last_selected = self.host_to_selected_time[peer_id]
if time.time() - last_selected < 120:
add = False
if add:
v6 = True
try:
_ = ipaddress.IPv6Address(peer_id)
except ValueError:
v6 = False
delta_time = 600 if v6 else 1000
if now - record.last_try_timestamp >= delta_time and now - record.connected_timestamp >= delta_time:
if not v6:
records.append(record)
else:
records_v6.append(record)
batch_size = max(min_batch_size, len(records) // 10)
batch_size = min(batch_size, max_batch_size)
if len(records) > batch_size:
random.shuffle(records)
records = records[:batch_size]
if len(records_v6) > batch_size:
random.shuffle(records_v6)
records_v6 = records_v6[:batch_size]
records += records_v6
for record in records:
self.host_to_selected_time[record.peer_id] = time.time()
return records
def get_ipv6_peers(self) -> int:
counter = 0
for peer_id in self.host_to_reliability:
v6 = True
try:
_ = ipaddress.IPv6Address(peer_id)
except ValueError:
v6 = False
if v6:
counter += 1
return counter
def get_total_records(self) -> int:
return len(self.host_to_records)
def get_ignored_peers(self) -> int:
return self.ignored_peers
def get_banned_peers(self) -> int:
return self.banned_peers
def get_reliable_peers(self) -> int:
return self.reliable_peers
async def load_to_db(self):
for peer_id in list(self.host_to_reliability.keys()):
if peer_id in self.host_to_reliability and peer_id in self.host_to_records:
reliability = self.host_to_reliability[peer_id]
record = self.host_to_records[peer_id]
await self.add_peer(record, reliability, True)
await self.crawl_db.commit()
async def unload_from_db(self):
self.host_to_records = {}
self.host_to_reliability = {}
cursor = await self.crawl_db.execute(
"SELECT * from peer_reliability",
)
rows = await cursor.fetchall()
await cursor.close()
for row in rows:
reliability = PeerReliability(
row[0],
row[1],
row[2],
row[3],
row[4],
row[5],
row[6],
row[7],
row[8],
row[9],
row[10],
row[11],
row[12],
row[13],
row[14],
row[15],
row[16],
row[17],
row[18],
row[19],
)
self.host_to_reliability[row[0]] = reliability
cursor = await self.crawl_db.execute(
"SELECT * from peer_records",
)
rows = await cursor.fetchall()
await cursor.close()
for row in rows:
peer = PeerRecord(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10])
self.host_to_records[row[0]] = peer
# Crawler -> DNS.
async def load_reliable_peers_to_db(self):
peers = []
for peer_id in self.host_to_reliability:
reliability = self.host_to_reliability[peer_id]
if reliability.is_reliable():
peers.append(peer_id)
self.reliable_peers = len(peers)
cursor = await self.crawl_db.execute(
"DELETE from good_peers",
)
await cursor.close()
for peer in peers:
cursor = await self.crawl_db.execute(
"INSERT OR REPLACE INTO good_peers VALUES(?)",
(peer,),
)
await cursor.close()
await self.crawl_db.commit()
def load_host_to_version(self):
versions = {}
handshake = {}
for host, record in self.host_to_records.items():
if host not in self.host_to_records:
continue
record = self.host_to_records[host]
if record.version == "undefined":
continue
versions[host] = record.version
handshake[host] = record.handshake_time
return (versions, handshake)
def load_best_peer_reliability(self):
best_timestamp = {}
for host, record in self.host_to_records.items():
best_timestamp[host] = record.best_timestamp
return best_timestamp
async def update_version(self, host, version, now):
record = self.host_to_records.get(host, None)
reliability = self.host_to_reliability.get(host, None)
if record is None or reliability is None:
return
record.update_version(version, now)
await self.add_peer(record, reliability)

chia/seeder/crawler.py (new file, 322 lines)

@@ -0,0 +1,322 @@
import asyncio
import logging
import time
import traceback
import ipaddress
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
import aiosqlite
import chia.server.ws_connection as ws
from chia.consensus.constants import ConsensusConstants
from chia.full_node.coin_store import CoinStore
from chia.protocols import full_node_protocol
from chia.seeder.crawl_store import CrawlStore
from chia.seeder.peer_record import PeerRecord, PeerReliability
from chia.server.server import ChiaServer
from chia.types.peer_info import PeerInfo
from chia.util.path import mkdir, path_from_root
from chia.util.ints import uint32, uint64
log = logging.getLogger(__name__)
class Crawler:
sync_store: Any
coin_store: CoinStore
connection: aiosqlite.Connection
config: Dict
server: Any
log: logging.Logger
constants: ConsensusConstants
_shut_down: bool
root_path: Path
peer_count: int
with_peak: set
def __init__(
self,
config: Dict,
root_path: Path,
consensus_constants: ConsensusConstants,
name: str = None,
):
self.initialized = False
self.root_path = root_path
self.config = config
self.server = None
self._shut_down = False # Set to true to close all infinite loops
self.constants = consensus_constants
self.state_changed_callback: Optional[Callable] = None
self.crawl_store = None
self.log = log
self.peer_count = 0
self.with_peak = set()
self.peers_retrieved: List[Any] = []
self.host_to_version: Dict[str, str] = {}
self.version_cache: List[Tuple[str, str]] = []
self.handshake_time: Dict[str, int] = {}
self.best_timestamp_per_peer: Dict[str, int] = {}
if "crawler_db_path" in config and config["crawler_db_path"] != "":
path = Path(config["crawler_db_path"])
self.db_path = path.resolve()
else:
db_path_replaced: str = "crawler.db"
self.db_path = path_from_root(root_path, db_path_replaced)
mkdir(self.db_path.parent)
self.bootstrap_peers = config["bootstrap_peers"]
self.minimum_height = config["minimum_height"]
self.other_peers_port = config["other_peers_port"]
def _set_state_changed_callback(self, callback: Callable):
self.state_changed_callback = callback
async def create_client(self, peer_info, on_connect):
return await self.server.start_client(peer_info, on_connect)
async def connect_task(self, peer):
async def peer_action(peer: ws.WSChiaConnection):
peer_info = peer.get_peer_info()
version = peer.get_version()
if peer_info is not None and version is not None:
self.version_cache.append((peer_info.host, version))
# Ask peer for peers
response = await peer.request_peers(full_node_protocol.RequestPeers(), timeout=3)
# Add peers to DB
if isinstance(response, full_node_protocol.RespondPeers):
self.peers_retrieved.append(response)
peer_info = peer.get_peer_info()
tries = 0
got_peak = False
while tries < 25:
tries += 1
if peer_info is None:
break
if peer_info in self.with_peak:
got_peak = True
break
await asyncio.sleep(0.1)
if not got_peak and peer_info is not None and self.crawl_store is not None:
await self.crawl_store.peer_connected_hostname(peer_info.host, False)
await peer.close()
try:
connected = await self.create_client(PeerInfo(peer.ip_address, peer.port), peer_action)
if not connected:
await self.crawl_store.peer_failed_to_connect(peer)
except Exception as e:
self.log.info(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
await self.crawl_store.peer_failed_to_connect(peer)
async def _start(self):
self.task = asyncio.create_task(self.crawl())
async def crawl(self):
try:
self.connection = await aiosqlite.connect(self.db_path)
self.crawl_store = await CrawlStore.create(self.connection)
self.log.info("Started")
await self.crawl_store.load_to_db()
await self.crawl_store.load_reliable_peers_to_db()
t_start = time.time()
total_nodes = 0
self.seen_nodes = set()
tried_nodes = set()
for peer in self.bootstrap_peers:
new_peer = PeerRecord(
peer,
peer,
self.other_peers_port,
False,
0,
0,
0,
uint64(int(time.time())),
uint64(0),
"undefined",
uint64(0),
)
new_peer_reliability = PeerReliability(peer)
self.crawl_store.maybe_add_peer(new_peer, new_peer_reliability)
self.host_to_version, self.handshake_time = self.crawl_store.load_host_to_version()
self.best_timestamp_per_peer = self.crawl_store.load_best_peer_reliability()
while True:
self.with_peak = set()
peers_to_crawl = await self.crawl_store.get_peers_to_crawl(25000, 250000)
tasks = set()
for peer in peers_to_crawl:
if peer.port == self.other_peers_port:
total_nodes += 1
if peer.ip_address not in tried_nodes:
tried_nodes.add(peer.ip_address)
task = asyncio.create_task(self.connect_task(peer))
tasks.add(task)
if len(tasks) >= 250:
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
tasks = set(filter(lambda t: not t.done(), tasks))
if len(tasks) > 0:
await asyncio.wait(tasks, timeout=30)
for response in self.peers_retrieved:
for response_peer in response.peer_list:
if response_peer.host not in self.best_timestamp_per_peer:
self.best_timestamp_per_peer[response_peer.host] = response_peer.timestamp
self.best_timestamp_per_peer[response_peer.host] = max(
self.best_timestamp_per_peer[response_peer.host], response_peer.timestamp
)
if (
response_peer.host not in self.seen_nodes
and response_peer.timestamp > time.time() - 5 * 24 * 3600
):
self.seen_nodes.add(response_peer.host)
new_peer = PeerRecord(
response_peer.host,
response_peer.host,
uint32(response_peer.port),
False,
uint64(0),
uint32(0),
uint64(0),
uint64(int(time.time())),
uint64(response_peer.timestamp),
"undefined",
uint64(0),
)
new_peer_reliability = PeerReliability(response_peer.host)
if self.crawl_store is not None:
self.crawl_store.maybe_add_peer(new_peer, new_peer_reliability)
await self.crawl_store.update_best_timestamp(
response_peer.host,
self.best_timestamp_per_peer[response_peer.host],
)
for host, version in self.version_cache:
self.handshake_time[host] = int(time.time())
self.host_to_version[host] = version
await self.crawl_store.update_version(host, version, int(time.time()))
to_remove = set()
now = int(time.time())
for host in self.host_to_version.keys():
active = True
if host not in self.handshake_time:
active = False
elif self.handshake_time[host] < now - 5 * 24 * 3600:
active = False
if not active:
to_remove.add(host)
self.host_to_version = {
host: version for host, version in self.host_to_version.items() if host not in to_remove
}
self.best_timestamp_per_peer = {
host: timestamp
for host, timestamp in self.best_timestamp_per_peer.items()
if timestamp >= now - 5 * 24 * 3600
}
versions = {}
for host, version in self.host_to_version.items():
if version not in versions:
versions[version] = 0
versions[version] += 1
self.version_cache = []
self.peers_retrieved = []
self.server.banned_peers = {}
if len(peers_to_crawl) == 0:
continue
await self.crawl_store.load_to_db()
await self.crawl_store.load_reliable_peers_to_db()
total_records = self.crawl_store.get_total_records()
ipv6_count = self.crawl_store.get_ipv6_peers()
self.log.error("***")
self.log.error("Finished batch:")
self.log.error(f"Total IPs stored in DB: {total_records}.")
self.log.error(f"Total IPV6 addresses stored in DB: {ipv6_count}")
self.log.error(f"Total connections attempted since crawler started: {total_nodes}.")
self.log.error(f"Total unique nodes attempted since crawler started: {len(tried_nodes)}.")
t_now = time.time()
t_delta = int(t_now - t_start)
if t_delta > 0:
self.log.error(f"Avg connections per second: {total_nodes // t_delta}.")
# Periodically print detailed stats.
reliable_peers = self.crawl_store.get_reliable_peers()
self.log.error(f"High quality reachable nodes, used by DNS introducer in replies: {reliable_peers}")
banned_peers = self.crawl_store.get_banned_peers()
ignored_peers = self.crawl_store.get_ignored_peers()
available_peers = len(self.host_to_version)
addresses_count = len(self.best_timestamp_per_peer)
total_records = self.crawl_store.get_total_records()
ipv6_addresses_count = 0
for host in self.best_timestamp_per_peer.keys():
try:
_ = ipaddress.IPv6Address(host)
ipv6_addresses_count += 1
except ValueError:
continue
self.log.error(
"IPv4 addresses gossiped with timestamp in the last 5 days with respond_peers messages: "
f"{addresses_count - ipv6_addresses_count}."
)
self.log.error(
"IPv6 addresses gossiped with timestamp in the last 5 days with respond_peers messages: "
f"{ipv6_addresses_count}."
)
ipv6_available_peers = 0
for host in self.host_to_version.keys():
try:
_ = ipaddress.IPv6Address(host)
ipv6_available_peers += 1
except ValueError:
continue
self.log.error(
f"Total IPv4 nodes reachable in the last 5 days: {available_peers - ipv6_available_peers}."
)
self.log.error(f"Total IPv6 nodes reachable in the last 5 days: {ipv6_available_peers}.")
self.log.error("Version distribution among reachable in the last 5 days (at least 100 nodes):")
if "minimum_version_count" in self.config and self.config["minimum_version_count"] > 0:
minimum_version_count = self.config["minimum_version_count"]
else:
minimum_version_count = 100
for version, count in sorted(versions.items(), key=lambda kv: kv[1], reverse=True):
if count >= minimum_version_count:
self.log.error(f"Version: {version} - Count: {count}")
self.log.error(f"Banned addresses in the DB: {banned_peers}")
self.log.error(f"Temporary ignored addresses in the DB: {ignored_peers}")
self.log.error(
"Peers to crawl from in the next batch (total IPs - ignored - banned): "
f"{total_records - banned_peers - ignored_peers}"
)
self.log.error("***")
except Exception as e:
self.log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
def set_server(self, server: ChiaServer):
self.server = server
def _state_changed(self, change: str):
if self.state_changed_callback is not None:
self.state_changed_callback(change)
async def new_peak(self, request: full_node_protocol.NewPeak, peer: ws.WSChiaConnection):
try:
peer_info = peer.get_peer_info()
if peer_info is None:
return
if request.height >= self.minimum_height:
if self.crawl_store is not None:
await self.crawl_store.peer_connected_hostname(peer_info.host, True)
self.with_peak.add(peer_info)
except Exception as e:
self.log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
async def on_connect(self, connection: ws.WSChiaConnection):
pass
def _close(self):
self._shut_down = True
async def _await_closed(self):
await self.connection.close()
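The crawl loop above is a simple task pool: it keeps at most 250 connection attempts in flight, waits for the first to complete, then prunes finished tasks before spawning more. The same pattern in isolation, as a self-contained sketch:

import asyncio
import random

async def probe(i: int) -> None:
    await asyncio.sleep(random.random())  # stand-in for one peer connection attempt

async def crawl_batch(n: int = 1000, limit: int = 250) -> None:
    tasks = set()
    for i in range(n):
        tasks.add(asyncio.create_task(probe(i)))
        if len(tasks) >= limit:
            # Block until at least one task finishes, then drop the done ones.
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            tasks = set(filter(lambda t: not t.done(), tasks))
    if tasks:
        await asyncio.wait(tasks, timeout=30)

asyncio.run(crawl_batch())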

chia/seeder/crawler_api.py (new file, 128 lines)

@@ -0,0 +1,128 @@
from typing import Callable, Optional
import chia.server.ws_connection as ws
from chia.full_node.full_node import full_node_protocol, wallet_protocol
from chia.seeder.crawler import Crawler
from chia.server.outbound_message import Message
from chia.util.api_decorators import api_request, peer_required
class CrawlerAPI:
crawler: Crawler
def __init__(self, crawler):
self.crawler = crawler
def _set_state_changed_callback(self, callback: Callable):
pass
def __getattr__(self, attr_name: str):
async def invoke(*args, **kwargs):
pass
return invoke
@property
def server(self):
return self.crawler.server
@property
def log(self):
return self.crawler.log
@peer_required
@api_request
async def request_peers(self, _request: full_node_protocol.RequestPeers, peer: ws.WSChiaConnection):
pass
@peer_required
@api_request
async def respond_peers(
self, request: full_node_protocol.RespondPeers, peer: ws.WSChiaConnection
) -> Optional[Message]:
pass
@peer_required
@api_request
async def new_peak(self, request: full_node_protocol.NewPeak, peer: ws.WSChiaConnection) -> Optional[Message]:
await self.crawler.new_peak(request, peer)
return None
@api_request
async def new_transaction(self, transaction: full_node_protocol.NewTransaction) -> Optional[Message]:
pass
@api_request
@peer_required
async def new_signage_point_or_end_of_sub_slot(
self, new_sp: full_node_protocol.NewSignagePointOrEndOfSubSlot, peer: ws.WSChiaConnection
) -> Optional[Message]:
pass
@api_request
async def new_unfinished_block(
self, new_unfinished_block: full_node_protocol.NewUnfinishedBlock
) -> Optional[Message]:
pass
@peer_required
@api_request
async def new_compact_vdf(self, request: full_node_protocol.NewCompactVDF, peer: ws.WSChiaConnection):
pass
@api_request
async def request_transaction(self, request: full_node_protocol.RequestTransaction) -> Optional[Message]:
pass
@api_request
async def request_proof_of_weight(self, request: full_node_protocol.RequestProofOfWeight) -> Optional[Message]:
pass
@api_request
async def request_block(self, request: full_node_protocol.RequestBlock) -> Optional[Message]:
pass
@api_request
async def request_blocks(self, request: full_node_protocol.RequestBlocks) -> Optional[Message]:
pass
@api_request
async def request_unfinished_block(
self, request_unfinished_block: full_node_protocol.RequestUnfinishedBlock
) -> Optional[Message]:
pass
@api_request
async def request_signage_point_or_end_of_sub_slot(
self, request: full_node_protocol.RequestSignagePointOrEndOfSubSlot
) -> Optional[Message]:
pass
@peer_required
@api_request
async def request_mempool_transactions(
self,
request: full_node_protocol.RequestMempoolTransactions,
peer: ws.WSChiaConnection,
) -> Optional[Message]:
pass
@api_request
async def request_block_header(self, request: wallet_protocol.RequestBlockHeader) -> Optional[Message]:
pass
@api_request
async def request_additions(self, request: wallet_protocol.RequestAdditions) -> Optional[Message]:
pass
@api_request
async def request_removals(self, request: wallet_protocol.RequestRemovals) -> Optional[Message]:
pass
@api_request
async def request_puzzle_solution(self, request: wallet_protocol.RequestPuzzleSolution) -> Optional[Message]:
pass
@api_request
async def request_header_blocks(self, request: wallet_protocol.RequestHeaderBlocks) -> Optional[Message]:
pass

chia/seeder/dns_server.py (new file, 287 lines)

@@ -0,0 +1,287 @@
import asyncio
import ipaddress
import logging
import random
import signal
import traceback
from typing import Any, List
import aiosqlite
from dnslib import A, AAAA, SOA, NS, MX, CNAME, RR, DNSRecord, QTYPE, DNSHeader
from chia.util.chia_logging import initialize_logging
from chia.util.path import mkdir, path_from_root
from chia.util.config import load_config
from chia.util.default_root import DEFAULT_ROOT_PATH
SERVICE_NAME = "seeder"
log = logging.getLogger(__name__)
# DNS snippet taken from: https://gist.github.com/pklaus/b5a7876d4d2cf7271873
class DomainName(str):
def __getattr__(self, item):
return DomainName(item + "." + self)
D = None
ns = None
IP = "127.0.0.1"
TTL = None
soa_record = None
ns_records: List[Any] = []
class EchoServerProtocol(asyncio.DatagramProtocol):
def __init__(self, callback):
self.data_queue = asyncio.Queue(loop=asyncio.get_event_loop())
self.callback = callback
asyncio.ensure_future(self.respond())
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
asyncio.ensure_future(self.handler(data, addr), loop=asyncio.get_event_loop())
async def respond(self):
while True:
try:
resp, caller = await self.data_queue.get()
self.transport.sendto(resp, caller)
except Exception as e:
log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
async def handler(self, data, caller):
try:
data = await self.callback(data)
if data is None:
return
await self.data_queue.put((data, caller))
except Exception as e:
log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
class DNSServer:
reliable_peers_v4: List[str]
reliable_peers_v6: List[str]
lock: asyncio.Lock
pointer: int
crawl_db: aiosqlite.Connection
def __init__(self):
self.reliable_peers_v4 = []
self.reliable_peers_v6 = []
self.lock = asyncio.Lock()
self.pointer_v4 = 0
self.pointer_v6 = 0
db_path_replaced: str = "crawler.db"
root_path = DEFAULT_ROOT_PATH
self.db_path = path_from_root(root_path, db_path_replaced)
mkdir(self.db_path.parent)
async def start(self):
# self.crawl_db = await aiosqlite.connect(self.db_path)
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
# One protocol instance will be created to serve all
# client requests.
self.transport, self.protocol = await loop.create_datagram_endpoint(
lambda: EchoServerProtocol(self.dns_response), local_addr=("0.0.0.0", 53)
)
self.reliable_task = asyncio.create_task(self.periodically_get_reliable_peers())
async def periodically_get_reliable_peers(self):
sleep_interval = 0
while True:
sleep_interval = min(15, sleep_interval + 1)
await asyncio.sleep(sleep_interval * 60)
try:
# TODO: double check this. It shouldn't take this long to connect.
crawl_db = await aiosqlite.connect(self.db_path, timeout=600)
cursor = await crawl_db.execute(
"SELECT * from good_peers",
)
new_reliable_peers = []
rows = await cursor.fetchall()
await cursor.close()
await crawl_db.close()
for row in rows:
new_reliable_peers.append(row[0])
if len(new_reliable_peers) > 0:
random.shuffle(new_reliable_peers)
async with self.lock:
self.reliable_peers_v4 = []
self.reliable_peers_v6 = []
for peer in new_reliable_peers:
ipv4 = True
try:
_ = ipaddress.IPv4Address(peer)
except ValueError:
ipv4 = False
if ipv4:
self.reliable_peers_v4.append(peer)
else:
try:
_ = ipaddress.IPv6Address(peer)
except ValueError:
continue
self.reliable_peers_v6.append(peer)
self.pointer_v4 = 0
self.pointer_v6 = 0
log.error(
f"Number of reliable peers discovered in dns server:"
f" IPv4 count - {len(self.reliable_peers_v4)}"
f" IPv6 count - {len(self.reliable_peers_v6)}"
)
except Exception as e:
log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
async def get_peers_to_respond(self, ipv4_count, ipv6_count):
peers = []
async with self.lock:
# Append IPv4.
size = len(self.reliable_peers_v4)
if ipv4_count > 0 and size <= ipv4_count:
peers = self.reliable_peers_v4
elif ipv4_count > 0:
peers = [self.reliable_peers_v4[i % size] for i in range(self.pointer_v4, self.pointer_v4 + ipv4_count)]
self.pointer_v4 = (self.pointer_v4 + ipv4_count) % size
# Append IPv6.
size = len(self.reliable_peers_v6)
if ipv6_count > 0 and size <= ipv6_count:
peers = peers + self.reliable_peers_v6
elif ipv6_count > 0:
peers = peers + [
self.reliable_peers_v6[i % size] for i in range(self.pointer_v6, self.pointer_v6 + ipv6_count)
]
self.pointer_v6 = (self.pointer_v6 + ipv6_count) % size
return peers
async def dns_response(self, data):
try:
request = DNSRecord.parse(data)
IPs = [MX(D.mail), soa_record] + ns_records
ipv4_count = 0
ipv6_count = 0
if request.q.qtype == 1:  # QTYPE 1: A (IPv4 only)
    ipv4_count = 32
elif request.q.qtype == 28:  # QTYPE 28: AAAA (IPv6 only)
    ipv6_count = 32
elif request.q.qtype == 255:  # QTYPE 255: ANY (a mix of both)
    ipv4_count = 16
    ipv6_count = 16
else:
    ipv4_count = 32
peers = await self.get_peers_to_respond(ipv4_count, ipv6_count)
if len(peers) == 0:
return None
for peer in peers:
ipv4 = True
try:
_ = ipaddress.IPv4Address(peer)
except ValueError:
ipv4 = False
if ipv4:
IPs.append(A(peer))
else:
try:
_ = ipaddress.IPv6Address(peer)
except ValueError:
continue
IPs.append(AAAA(peer))
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=len(IPs), ra=1), q=request.q)
records = {
D: IPs,
D.ns1: [A(IP)], # MX and NS records must never point to a CNAME alias (RFC 2181 section 10.3)
D.ns2: [A(IP)],
D.mail: [A(IP)],
D.andrei: [CNAME(D)],
}
qname = request.q.qname
qn = str(qname)
qtype = request.q.qtype
qt = QTYPE[qtype]
if qn == D or qn.endswith("." + D):
for name, rrs in records.items():
if name == qn:
for rdata in rrs:
rqt = rdata.__class__.__name__
if qt in ["*", rqt] or (qt == "ANY" and (rqt == "A" or rqt == "AAAA")):
reply.add_answer(
RR(rname=qname, rtype=getattr(QTYPE, rqt), rclass=1, ttl=TTL, rdata=rdata)
)
for rdata in ns_records:
reply.add_ar(RR(rname=D, rtype=QTYPE.NS, rclass=1, ttl=TTL, rdata=rdata))
reply.add_auth(RR(rname=D, rtype=QTYPE.SOA, rclass=1, ttl=TTL, rdata=soa_record))
return reply.pack()
except Exception as e:
log.error(f"Exception: {e}. Traceback: {traceback.format_exc()}.")
async def serve_dns():
dns_server = DNSServer()
await dns_server.start()
# TODO: Make this cleaner?
while True:
await asyncio.sleep(3600)
async def kill_processes():
# TODO: implement.
pass
def main():
root_path = DEFAULT_ROOT_PATH
config = load_config(root_path, "config.yaml", SERVICE_NAME)
initialize_logging(SERVICE_NAME, config["logging"], root_path)
global D
global ns
global TTL
global soa_record
global ns_records
D = DomainName(config["domain_name"])
ns = DomainName(config["nameserver"])
TTL = config["ttl"]
soa_record = SOA(
mname=ns, # primary name server
rname=config["soa"]["rname"], # email of the domain administrator
times=(
config["soa"]["serial_number"],
config["soa"]["refresh"],
config["soa"]["retry"],
config["soa"]["expire"],
config["soa"]["minimum"],
),
)
ns_records = [NS(ns)]
def signal_received():
asyncio.create_task(kill_processes())
loop = asyncio.get_event_loop()
try:
loop.add_signal_handler(signal.SIGINT, signal_received)
loop.add_signal_handler(signal.SIGTERM, signal_received)
except NotImplementedError:
log.info("signal handlers unsupported")
try:
loop.run_until_complete(serve_dns())
finally:
loop.close()
if __name__ == "__main__":
main()
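With the server running locally (the "Localhost dig working" step in the commit log), it can be smoke-tested from Python with dnslib itself; a sketch, assuming the placeholder seeder.example.com zone and a listener on UDP port 53:

from dnslib import DNSRecord, QTYPE

query = DNSRecord.question("seeder.example.com", "A")
reply = DNSRecord.parse(query.send("127.0.0.1", 53, timeout=5))
for rr in reply.rr:
    if rr.rtype == QTYPE.A:
        print(rr.rdata)  # one reliable IPv4 peer per answer record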

chia/seeder/peer_record.py (new file, 147 lines)

@@ -0,0 +1,147 @@
import math
import time
from dataclasses import dataclass
from chia.util.ints import uint32, uint64
from chia.util.streamable import Streamable, streamable
@dataclass(frozen=True)
@streamable
class PeerRecord(Streamable):
peer_id: str
ip_address: str
port: uint32
connected: bool
last_try_timestamp: uint64
try_count: uint32
connected_timestamp: uint64
added_timestamp: uint64
best_timestamp: uint64
version: str
handshake_time: uint64
def update_version(self, version, now):
if version != "undefined":
object.__setattr__(self, "version", version)
object.__setattr__(self, "handshake_time", uint64(now))
class PeerStat:
weight: float
count: float
reliability: float
def __init__(self, weight, count, reliability):
self.weight = weight
self.count = count
self.reliability = reliability
def update(self, is_reachable: bool, age: int, tau: int):
    # Exponentially weighted moving average with time constant tau:
    # data that is `age` seconds old decays by f = exp(-age / tau).
    # `reliability` accumulates (1 - f) per successful reach and `weight`
    # accumulates (1 - f) unconditionally, so reliability / weight
    # estimates the recent success rate; `count` is a decayed try count.
    f = math.exp(-age / tau)
    self.reliability = self.reliability * f + (1.0 - f if is_reachable else 0.0)
    self.count = self.count * f + 1.0
    self.weight = self.weight * f + 1.0 - f
class PeerReliability:
peer_id: str
ignore_till: int
ban_till: int
stat_2h: PeerStat
stat_8h: PeerStat
stat_1d: PeerStat
stat_1w: PeerStat
stat_1m: PeerStat
tries: int
successes: int
def __init__(
self,
peer_id: str,
ignore_till: int = 0,
ban_till: int = 0,
stat_2h_weight: float = 0.0,
stat_2h_count: float = 0.0,
stat_2h_reliability: float = 0.0,
stat_8h_weight: float = 0.0,
stat_8h_count: float = 0.0,
stat_8h_reliability: float = 0.0,
stat_1d_weight: float = 0.0,
stat_1d_count: float = 0.0,
stat_1d_reliability: float = 0.0,
stat_1w_weight: float = 0.0,
stat_1w_count: float = 0.0,
stat_1w_reliability: float = 0.0,
stat_1m_weight: float = 0.0,
stat_1m_count: float = 0.0,
stat_1m_reliability: float = 0.0,
tries: int = 0,
successes: int = 0,
):
self.peer_id = peer_id
self.ignore_till = ignore_till
self.ban_till = ban_till
self.stat_2h = PeerStat(stat_2h_weight, stat_2h_count, stat_2h_reliability)
self.stat_8h = PeerStat(stat_8h_weight, stat_8h_count, stat_8h_reliability)
self.stat_1d = PeerStat(stat_1d_weight, stat_1d_count, stat_1d_reliability)
self.stat_1w = PeerStat(stat_1w_weight, stat_1w_count, stat_1w_reliability)
self.stat_1m = PeerStat(stat_1m_weight, stat_1m_count, stat_1m_reliability)
self.tries = tries
self.successes = successes
def is_reliable(self) -> bool:
if self.tries > 0 and self.tries <= 3 and self.successes * 2 >= self.tries:
return True
if self.stat_2h.reliability > 0.85 and self.stat_2h.count > 2:
return True
if self.stat_8h.reliability > 0.7 and self.stat_8h.count > 4:
return True
if self.stat_1d.reliability > 0.55 and self.stat_1d.count > 8:
return True
if self.stat_1w.reliability > 0.45 and self.stat_1w.count > 16:
return True
if self.stat_1m.reliability > 0.35 and self.stat_1m.count > 32:
return True
return False
def get_ban_time(self) -> int:
if self.is_reliable():
return 0
if self.stat_1m.reliability - self.stat_1m.weight + 1 < 0.15 and self.stat_1m.count > 32:
return 30 * 86400
if self.stat_1w.reliability - self.stat_1w.weight + 1.0 < 0.10 and self.stat_1w.count > 16:
return 7 * 86400
if self.stat_1d.reliability - self.stat_1d.weight + 1.0 < 0.05 and self.stat_1d.count > 8:
return 86400
return 0
def get_ignore_time(self) -> int:
if self.is_reliable():
return 0
if self.stat_1m.reliability - self.stat_1m.weight + 1.0 < 0.20 and self.stat_1m.count > 2:
return 10 * 86400
if self.stat_1w.reliability - self.stat_1w.weight + 1.0 < 0.16 and self.stat_1w.count > 2:
return 3 * 86400
if self.stat_1d.reliability - self.stat_1d.weight + 1.0 < 0.12 and self.stat_1d.count > 2:
return 8 * 3600
if self.stat_8h.reliability - self.stat_8h.weight + 1.0 < 0.08 and self.stat_8h.count > 2:
return 2 * 3600
return 0
def update(self, is_reachable: bool, age: int):
self.stat_2h.update(is_reachable, age, 2 * 3600)
self.stat_8h.update(is_reachable, age, 8 * 3600)
self.stat_1d.update(is_reachable, age, 24 * 3600)
self.stat_1w.update(is_reachable, age, 7 * 24 * 3600)
self.stat_1m.update(is_reachable, age, 24 * 30 * 3600)
self.tries += 1
if is_reachable:
self.successes += 1
current_ignore_time = self.get_ignore_time()
now = int(time.time())
if current_ignore_time > 0 and (self.ignore_till == 0 or self.ignore_till < current_ignore_time + now):
self.ignore_till = current_ignore_time + now
current_ban_time = self.get_ban_time()
if current_ban_time > 0 and (self.ban_till == 0 or self.ban_till < current_ban_time + now):
self.ban_till = current_ban_time + now
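The reliability thresholds can be explored by feeding the class synthetic results; a sketch, assuming this module is importable:

from chia.seeder.peer_record import PeerReliability

rel = PeerReliability("203.0.113.1")  # documentation-range address
for _ in range(3):
    rel.update(True, 600)  # reachable, observed after a 600-second gap
# A young peer (at most 3 tries) with a good success ratio counts as reliable.
print(rel.is_reliable(), rel.tries, rel.successes)  # True 3 3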

chia/seeder/start_crawler.py (new file, 59 lines)

@@ -0,0 +1,59 @@
import logging
import pathlib
from multiprocessing import freeze_support
from typing import Dict
from chia.consensus.constants import ConsensusConstants
from chia.consensus.default_constants import DEFAULT_CONSTANTS
from chia.seeder.crawler import Crawler
from chia.seeder.crawler_api import CrawlerAPI
from chia.server.outbound_message import NodeType
from chia.server.start_service import run_service
from chia.util.config import load_config_cli
from chia.util.default_root import DEFAULT_ROOT_PATH
# See: https://bugs.python.org/issue29288
"".encode("idna")
SERVICE_NAME = "full_node"
log = logging.getLogger(__name__)
def service_kwargs_for_full_node_crawler(
root_path: pathlib.Path, config: Dict, consensus_constants: ConsensusConstants
) -> Dict:
crawler = Crawler(
config,
root_path=root_path,
consensus_constants=consensus_constants,
)
api = CrawlerAPI(crawler)
network_id = config["selected_network"]
kwargs = dict(
root_path=root_path,
node=api.crawler,
peer_api=api,
node_type=NodeType.FULL_NODE,
advertised_port=config["port"],
service_name=SERVICE_NAME,
upnp_ports=[],
server_listen_ports=[config["port"]],
on_connect_callback=crawler.on_connect,
network_id=network_id,
)
return kwargs
def main():
config = load_config_cli(DEFAULT_ROOT_PATH, "config.yaml", "seeder")
overrides = config["network_overrides"]["constants"][config["selected_network"]]
updated_constants = DEFAULT_CONSTANTS.replace_str_to_bytes(**overrides)
kwargs = service_kwargs_for_full_node_crawler(DEFAULT_ROOT_PATH, config, updated_constants)
return run_service(**kwargs)
if __name__ == "__main__":
freeze_support()
main()

chia/seeder/util/__init__.py (new empty file)

chia/seeder/util/config.py (new file, 34 lines)

@@ -0,0 +1,34 @@
from pathlib import Path
import pkg_resources
from chia.util.config import load_config, save_config
def patch_default_seeder_config(root_path: Path, filename="config.yaml") -> None:
"""
Checks if the seeder: section exists in the config. If not, the default seeder settings are appended to the file
"""
existing_config = load_config(root_path, "config.yaml")
if "seeder" in existing_config:
print("Chia Seeder section exists in config. No action required.")
return
print("Chia Seeder section does not exist in config. Patching...")
config = load_config(root_path, "config.yaml")
# The following ignores root_path when the second param is absolute, which this will be
seeder_config = load_config(root_path, pkg_resources.resource_filename("chia.util", "initial-config.yaml"))
# Patch in the values with anchors, since pyyaml tends to change
# the anchors to things like id001, etc
config["seeder"] = seeder_config["seeder"]
config["seeder"]["network_overrides"] = config["network_overrides"]
config["seeder"]["selected_network"] = config["selected_network"]
config["seeder"]["logging"] = config["logging"]
# When running as crawler, we default to a much lower client timeout
config["full_node"]["peer_connect_timeout"] = 2
save_config(root_path, "config.yaml", config)
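A sketch of invoking the helper directly against the default root:

from chia.seeder.util.config import patch_default_seeder_config
from chia.util.default_root import DEFAULT_ROOT_PATH

# No-op when a seeder: section is already present in config.yaml.
patch_default_seeder_config(DEFAULT_ROOT_PATH)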

chia/seeder/util/service.py (new file, 103 lines)

@@ -0,0 +1,103 @@
import os
import signal
import subprocess
import sys
from pathlib import Path
from typing import Tuple
from chia.daemon.server import pid_path_for_service
from chia.util.path import mkdir
def launch_service(root_path: Path, service_command) -> Tuple[subprocess.Popen, Path]:
"""
Launch a child process.
"""
# set up CHIA_ROOT
# invoke correct script
# save away PID
# we need to pass on the possibly altered CHIA_ROOT
os.environ["CHIA_ROOT"] = str(root_path)
print(f"Launching service with CHIA_ROOT: {os.environ['CHIA_ROOT']}")
# Insert proper e
service_array = service_command.split()
service_executable = executable_for_service(service_array[0])
service_array[0] = service_executable
startupinfo = None
if os.name == "nt":
startupinfo = subprocess.STARTUPINFO() # type: ignore
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
# CREATE_NEW_PROCESS_GROUP allows graceful shutdown on windows, by CTRL_BREAK_EVENT signal
if sys.platform == "win32" or sys.platform == "cygwin":
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP
else:
creationflags = 0
environ_copy = os.environ.copy()
process = subprocess.Popen(
service_array, shell=False, startupinfo=startupinfo, creationflags=creationflags, env=environ_copy
)
pid_path = pid_path_for_service(root_path, service_command)
try:
mkdir(pid_path.parent)
with open(pid_path, "w") as f:
f.write(f"{process.pid}\n")
except Exception:
pass
return process, pid_path
def kill_service(root_path: Path, service_name: str) -> bool:
pid_path = pid_path_for_service(root_path, service_name)
try:
with open(pid_path) as f:
pid = int(f.read())
# @TODO SIGKILL seems necessary right now for the DNS server, but not the crawler (fix that)
# @TODO Ensure processes stop before renaming the files and returning
os.kill(pid, signal.SIGKILL)
print("sent SIGKILL to process")
except Exception:
pass
try:
pid_path_killed = pid_path.with_suffix(".pid-killed")
if pid_path_killed.exists():
pid_path_killed.unlink()
os.rename(pid_path, pid_path_killed)
except Exception:
pass
return True
# determine if application is a script file or frozen exe
if getattr(sys, "frozen", False):
name_map = {
"chia_seeder": "chia_seeder",
"chia_seeder_crawler": "chia_seeder_crawler",
"chia_seeder_server": "chia_seeder_server",
}
def executable_for_service(service_name: str) -> str:
application_path = os.path.dirname(sys.executable)
if sys.platform == "win32" or sys.platform == "cygwin":
executable = name_map[service_name]
path = f"{application_path}/{executable}.exe"
return path
else:
path = f"{application_path}/{name_map[service_name]}"
return path
else:
application_path = os.path.dirname(__file__)
def executable_for_service(service_name: str) -> str:
return service_name

chia/seeder/util/service_groups.py (new file, 17 lines)

@@ -0,0 +1,17 @@
from typing import KeysView, Generator
SERVICES_FOR_GROUP = {
"all": "chia_seeder_crawler chia_seeder_server".split(),
"crawler": "chia_seeder_crawler".split(),
"server": "chia_seeder_server".split(),
}
def all_groups() -> KeysView[str]:
return SERVICES_FOR_GROUP.keys()
def services_for_groups(groups) -> Generator[str, None, None]:
for group in groups:
for service in SERVICES_FOR_GROUP[group]:
yield service

chia/server/server.py

@@ -187,13 +187,18 @@ class ChiaServer:
Periodically checks for connections with no activity (have not sent us any data), and removes them,
to allow room for other peers.
"""
is_crawler = getattr(self.node, "crawl", None)
while True:
await asyncio.sleep(600)
await asyncio.sleep(600 if is_crawler is None else 2)
to_remove: List[WSChiaConnection] = []
for connection in self.all_connections.values():
if self._local_type == NodeType.FULL_NODE and connection.connection_type == NodeType.FULL_NODE:
if time.time() - connection.last_message_time > 1800:
to_remove.append(connection)
if is_crawler is not None:
if time.time() - connection.creation_time > 5:
to_remove.append(connection)
else:
if time.time() - connection.last_message_time > 1800:
to_remove.append(connection)
for connection in to_remove:
self.log.debug(f"Garbage collecting connection {connection.peer_host} due to inactivity")
await connection.close()
@@ -243,6 +248,9 @@ class ChiaServer:
self.log.info(f"Started listening on port: {self._port}")
async def incoming_connection(self, request):
if getattr(self.node, "crawl", None) is not None:
return
if request.remote in self.banned_peers and time.time() < self.banned_peers[request.remote]:
self.log.warning(f"Peer {request.remote} is banned, refusing connection")
return None

chia/server/ws_connection.py

@@ -105,7 +105,7 @@ class WSChiaConnection:
self.outbound_rate_limiter = RateLimiter(incoming=False, percentage_of_limit=outbound_rate_limit_percent)
self.inbound_rate_limiter = RateLimiter(incoming=True, percentage_of_limit=inbound_rate_limit_percent)
# Used by crawler/dns introducer
# Used by the Chia Seeder.
self.version = None
async def perform_handshake(self, network_id: str, protocol_version: str, server_port: int, local_type: NodeType):
@@ -187,7 +187,7 @@ class WSChiaConnection:
async def close(self, ban_time: int = 0, ws_close_code: WSCloseCode = WSCloseCode.OK, error: Optional[Err] = None):
"""
Closes the connection, and finally calls the close_callback on the server, so the connections gets removed
Closes the connection, and finally calls the close_callback on the server, so the connection gets removed
from the global list.
"""
@@ -486,7 +486,7 @@ class WSChiaConnection:
await asyncio.sleep(3)
return None
# Used by crawler/dns introducer
# Used by the Chia Seeder.
def get_version(self):
return self.version

chia/util/initial-config.yaml

@@ -115,6 +115,34 @@ logging: &logging
log_syslog_host: "localhost" # Send logging messages to a remote or local Unix syslog
log_syslog_port: 514 # UDP port of the remote or local Unix syslog
seeder:
# The fake full node used for crawling will run on this port.
port: 8444
# Most full nodes on the network run on this port (8444 for mainnet, 58444 for testnet).
other_peers_port: 8444
# Path to crawler DB. If empty, will use $CHIA_ROOT/crawler.db
crawler_db_path: ""
# Peers used for the initial run.
bootstrap_peers:
- "node.chia.net"
# Only consider nodes synced at least to this height.
minimum_height: 240000
# How many of a particular version we need to see before reporting it in the logs
minimum_version_count: 100
domain_name: "seeder.example.com."
nameserver: "example.com."
ttl: 300
soa:
rname: "hostmaster.example.com."
serial_number: 1619105223
refresh: 10800
retry: 10800
expire: 604800
minimum: 1800
network_overrides: *network_overrides
selected_network: *selected_network
logging: *logging
harvester:
# The harvester server (if run) will run on this port
port: 8448
@@ -383,7 +411,7 @@ introducer:
host: *self_hostname
port: 8445
max_peers_to_send: 20
# The introducer will only return peers who it has seen in the last
# The introducer will only return peers it has seen in the last
# recent_peer_threshold seconds
recent_peer_threshold: 6000
logging: *logging
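Services read this section through the standard loader; a sketch mirroring how dns_server.main() loads its settings:

from chia.util.config import load_config
from chia.util.default_root import DEFAULT_ROOT_PATH

config = load_config(DEFAULT_ROOT_PATH, "config.yaml", "seeder")
print(config["domain_name"], config["ttl"], config["bootstrap_peers"])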

chia/util/service_groups.py

@@ -1,7 +1,7 @@
from typing import KeysView, Generator
SERVICES_FOR_GROUP = {
"all": "chia_harvester chia_timelord_launcher chia_timelord chia_farmer chia_full_node chia_wallet".split(),
"all": ("chia_harvester chia_timelord_launcher chia_timelord " "chia_farmer chia_full_node chia_wallet").split(),
"node": "chia_full_node".split(),
"harvester": "chia_harvester".split(),
"farmer": "chia_harvester chia_farmer chia_full_node chia_wallet".split(),

@@ -1 +1 @@
Subproject commit 9e242f32e0c005f602d00c47c240d86c57e393a2
Subproject commit b1b808ab930004fc6b4afc4b248dee0a136f3f00

setup.py

@@ -29,6 +29,7 @@ dependencies = [
"click==7.1.2", # For the CLI
"dnspythonchia==2.2.0", # Query DNS seeds
"watchdog==2.1.6", # Filesystem event watching - watches keyring.yaml
"dnslib==0.9.14", # dns lib
]
upnp_dependencies = [
@@ -86,6 +87,8 @@ kwargs = dict(
"chia.pools",
"chia.protocols",
"chia.rpc",
"chia.seeder",
"chia.seeder.util",
"chia.server",
"chia.simulator",
"chia.types.blockchain_format",
@@ -110,6 +113,9 @@ kwargs = dict(
"chia_harvester = chia.server.start_harvester:main",
"chia_farmer = chia.server.start_farmer:main",
"chia_introducer = chia.server.start_introducer:main",
"chia_seeder = chia.cmds.seeder:main",
"chia_seeder_crawler = chia.seeder.start_crawler:main",
"chia_seeder_server = chia.seeder.dns_server:main",
"chia_timelord = chia.server.start_timelord:main",
"chia_timelord_launcher = chia.timelord.timelord_launcher:main",
"chia_full_node_simulator = chia.simulator.start_simulator:main",