mirror of
https://github.com/Chia-Network/chia-blockchain.git
synced 2024-09-21 08:31:52 +03:00
c4e14f5c78
* Serialize/deserialize peer data alongside existing sqlite implementation (to be removed) * Simplified AddressManagerStore. No longer uses sqlite and is no longer async. * Removed aiosqlite usage from AddressManagerStore. Added PeerStoreResolver class to determine the appropriate location for "peers.dat" Updated initial-config.yaml to include "peers_file_path" default, replacing "peer_db_path" (similar change for "wallet_peers_path") * Minor comment changes/additions * Added migration from sqlite peer db. Made AddressManagerStore's serialization async as it was right at the edge of blocking for too long. * Minor tweaks to checking for migration * Removed AddressManagerSQLiteStore class scaffolding * makePeerDataSerialization now returns bytes instead of a PeerDataSerialization object * Async file I/O for write_file_async using aiofiles Added more tests * Separate out the synchronous part of move_file * Renamed write_file to files since we're opening up the capabilities a bit * Update references to write_file * Renamed test_write_file to test_files * Tests covering move_file and move_file_async * Minor refinements to behavior and tests * Use aiofiles for reading peers.dat * Added missing mypy typing info for aiofiles. Also added types-PyYAML to dev_dependencies so that `mypy chia tests` doesn't require running with --install-types * Add types-aiofiles to the linting workflow * Directory perms can now be passed into write_file_async. Added an explicit f.flush() followed by os.fsync() after writing the temp file contents.
236 lines
9.5 KiB
Python
236 lines
9.5 KiB
Python
import aiofiles
|
|
import asyncio
|
|
import logging
|
|
|
|
from chia.server.address_manager import (
|
|
BUCKET_SIZE,
|
|
NEW_BUCKET_COUNT,
|
|
NEW_BUCKETS_PER_ADDRESS,
|
|
AddressManager,
|
|
ExtendedPeerInfo,
|
|
)
|
|
from chia.util.files import write_file_async
|
|
from chia.util.ints import uint64
|
|
from chia.util.path import mkdir
|
|
from chia.util.streamable import streamable, Streamable
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from timeit import default_timer as timer
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
@streamable
|
|
class PeerDataSerialization(Streamable):
|
|
"""
|
|
Serializable property bag for the peer data that was previously stored in sqlite.
|
|
"""
|
|
|
|
metadata: List[Tuple[str, str]]
|
|
nodes: List[Tuple[uint64, str]]
|
|
new_table: List[Tuple[uint64, uint64]]
|
|
|
|
|
|
async def makePeerDataSerialization(
|
|
metadata: List[Tuple[str, Any]], nodes: List[Tuple[int, ExtendedPeerInfo]], new_table: List[Tuple[int, int]]
|
|
) -> bytes:
|
|
"""
|
|
Create a PeerDataSerialization, adapting the provided collections
|
|
"""
|
|
transformed_nodes: List[Tuple[uint64, str]] = []
|
|
transformed_new_table: List[Tuple[uint64, uint64]] = []
|
|
|
|
for index, [node_id, peer_info] in enumerate(nodes):
|
|
transformed_nodes.append((uint64(node_id), peer_info.to_string()))
|
|
# Come up to breathe for a moment
|
|
if index % 1000 == 0:
|
|
await asyncio.sleep(0)
|
|
|
|
for index, [node_id, bucket_id] in enumerate(new_table):
|
|
transformed_new_table.append((uint64(node_id), uint64(bucket_id)))
|
|
# Come up to breathe for a moment
|
|
if index % 1000 == 0:
|
|
await asyncio.sleep(0)
|
|
|
|
serialized_bytes: bytes = bytes(PeerDataSerialization(metadata, transformed_nodes, transformed_new_table))
|
|
return serialized_bytes
|
|
|
|
|
|
class AddressManagerStore:
|
|
"""
|
|
Metadata table:
|
|
- private key
|
|
- new table count
|
|
- tried table count
|
|
Nodes table:
|
|
* Maps entries from new/tried table to unique node ids.
|
|
- node_id
|
|
- IP, port, together with the IP, port of the source peer.
|
|
New table:
|
|
* Stores node_id, bucket for each occurrence in the new table of an entry.
|
|
* Once we know the buckets, we can also deduce the bucket positions.
|
|
Every other information, such as tried_matrix, map_addr, map_info, random_pos,
|
|
be deduced and it is not explicitly stored, instead it is recalculated.
|
|
"""
|
|
|
|
@classmethod
|
|
async def create_address_manager(cls, peers_file_path: Path) -> AddressManager:
|
|
"""
|
|
Create an address manager using data deserialized from a peers file.
|
|
"""
|
|
address_manager: Optional[AddressManager] = None
|
|
if peers_file_path.exists():
|
|
try:
|
|
log.info(f"Loading peers from {peers_file_path}")
|
|
address_manager = await cls._deserialize(peers_file_path)
|
|
except Exception:
|
|
log.exception(f"Unable to create address_manager from {peers_file_path}")
|
|
|
|
if address_manager is None:
|
|
log.info("Creating new address_manager")
|
|
address_manager = AddressManager()
|
|
|
|
return address_manager
|
|
|
|
@classmethod
|
|
async def serialize(cls, address_manager: AddressManager, peers_file_path: Path) -> None:
|
|
"""
|
|
Serialize the address manager's peer data to a file.
|
|
"""
|
|
metadata: List[Tuple[str, str]] = []
|
|
nodes: List[Tuple[int, ExtendedPeerInfo]] = []
|
|
new_table_entries: List[Tuple[int, int]] = []
|
|
unique_ids: Dict[int, int] = {}
|
|
count_ids: int = 0
|
|
|
|
log.info("Serializing peer data")
|
|
metadata.append(("key", str(address_manager.key)))
|
|
|
|
for node_id, info in address_manager.map_info.items():
|
|
unique_ids[node_id] = count_ids
|
|
if info.ref_count > 0:
|
|
assert count_ids != address_manager.new_count
|
|
nodes.append((count_ids, info))
|
|
count_ids += 1
|
|
metadata.append(("new_count", str(count_ids)))
|
|
|
|
tried_ids = 0
|
|
for node_id, info in address_manager.map_info.items():
|
|
if info.is_tried:
|
|
assert info is not None
|
|
assert tried_ids != address_manager.tried_count
|
|
nodes.append((count_ids, info))
|
|
count_ids += 1
|
|
tried_ids += 1
|
|
metadata.append(("tried_count", str(tried_ids)))
|
|
|
|
for bucket in range(NEW_BUCKET_COUNT):
|
|
for i in range(BUCKET_SIZE):
|
|
if address_manager.new_matrix[bucket][i] != -1:
|
|
index = unique_ids[address_manager.new_matrix[bucket][i]]
|
|
new_table_entries.append((index, bucket))
|
|
|
|
try:
|
|
# Ensure the parent directory exists
|
|
mkdir(peers_file_path.parent)
|
|
start_time = timer()
|
|
await cls._write_peers(peers_file_path, metadata, nodes, new_table_entries)
|
|
log.debug(f"Serializing peer data took {timer() - start_time} seconds")
|
|
except Exception:
|
|
log.exception(f"Failed to write peer data to {peers_file_path}")
|
|
|
|
@classmethod
|
|
async def _deserialize(cls, peers_file_path: Path) -> AddressManager:
|
|
"""
|
|
Create an address manager using data deserialized from a peers file.
|
|
"""
|
|
peer_data: Optional[PeerDataSerialization] = None
|
|
address_manager = AddressManager()
|
|
start_time = timer()
|
|
try:
|
|
peer_data = await cls._read_peers(peers_file_path)
|
|
except Exception:
|
|
log.exception(f"Unable to deserialize peers from {peers_file_path}")
|
|
|
|
if peer_data is not None:
|
|
metadata: Dict[str, str] = {key: value for key, value in peer_data.metadata}
|
|
nodes: List[Tuple[int, ExtendedPeerInfo]] = [
|
|
(node_id, ExtendedPeerInfo.from_string(info_str)) for node_id, info_str in peer_data.nodes
|
|
]
|
|
new_table_entries: List[Tuple[int, int]] = [(node_id, bucket) for node_id, bucket in peer_data.new_table]
|
|
log.debug(f"Deserializing peer data took {timer() - start_time} seconds")
|
|
|
|
address_manager.key = int(metadata["key"])
|
|
address_manager.new_count = int(metadata["new_count"])
|
|
# address_manager.tried_count = int(metadata["tried_count"])
|
|
address_manager.tried_count = 0
|
|
|
|
new_table_nodes = [(node_id, info) for node_id, info in nodes if node_id < address_manager.new_count]
|
|
for n, info in new_table_nodes:
|
|
address_manager.map_addr[info.peer_info.host] = n
|
|
address_manager.map_info[n] = info
|
|
info.random_pos = len(address_manager.random_pos)
|
|
address_manager.random_pos.append(n)
|
|
address_manager.id_count = len(new_table_nodes)
|
|
tried_table_nodes = [(node_id, info) for node_id, info in nodes if node_id >= address_manager.new_count]
|
|
# lost_count = 0
|
|
for node_id, info in tried_table_nodes:
|
|
tried_bucket = info.get_tried_bucket(address_manager.key)
|
|
tried_bucket_pos = info.get_bucket_position(address_manager.key, False, tried_bucket)
|
|
if address_manager.tried_matrix[tried_bucket][tried_bucket_pos] == -1:
|
|
info.random_pos = len(address_manager.random_pos)
|
|
info.is_tried = True
|
|
id_count = address_manager.id_count
|
|
address_manager.random_pos.append(id_count)
|
|
address_manager.map_info[id_count] = info
|
|
address_manager.map_addr[info.peer_info.host] = id_count
|
|
address_manager.tried_matrix[tried_bucket][tried_bucket_pos] = id_count
|
|
address_manager.id_count += 1
|
|
address_manager.tried_count += 1
|
|
# else:
|
|
# lost_count += 1
|
|
|
|
# address_manager.tried_count -= lost_count
|
|
for node_id, bucket in new_table_entries:
|
|
if node_id >= 0 and node_id < address_manager.new_count:
|
|
info = address_manager.map_info[node_id]
|
|
bucket_pos = info.get_bucket_position(address_manager.key, True, bucket)
|
|
if (
|
|
address_manager.new_matrix[bucket][bucket_pos] == -1
|
|
and info.ref_count < NEW_BUCKETS_PER_ADDRESS
|
|
):
|
|
info.ref_count += 1
|
|
address_manager.new_matrix[bucket][bucket_pos] = node_id
|
|
|
|
for node_id, info in list(address_manager.map_info.items()):
|
|
if not info.is_tried and info.ref_count == 0:
|
|
address_manager.delete_new_entry_(node_id)
|
|
|
|
address_manager.load_used_table_positions()
|
|
|
|
return address_manager
|
|
|
|
@classmethod
|
|
async def _read_peers(cls, peers_file_path: Path) -> PeerDataSerialization:
|
|
"""
|
|
Read the peers file and return the data as a PeerDataSerialization object.
|
|
"""
|
|
async with aiofiles.open(peers_file_path, "rb") as f:
|
|
return PeerDataSerialization.from_bytes(await f.read())
|
|
|
|
@classmethod
|
|
async def _write_peers(
|
|
cls,
|
|
peers_file_path: Path,
|
|
metadata: List[Tuple[str, Any]],
|
|
nodes: List[Tuple[int, ExtendedPeerInfo]],
|
|
new_table: List[Tuple[int, int]],
|
|
) -> None:
|
|
"""
|
|
Serializes the given peer data and writes it to the peers file.
|
|
"""
|
|
serialized_bytes: bytes = await makePeerDataSerialization(metadata, nodes, new_table)
|
|
await write_file_async(peers_file_path, serialized_bytes, file_mode=0o644)
|