diff --git a/src/consensus/blockchain_check_conditions.py b/src/consensus/blockchain_check_conditions.py index 231be21f4f5a..8a4d913de370 100644 --- a/src/consensus/blockchain_check_conditions.py +++ b/src/consensus/blockchain_check_conditions.py @@ -1,4 +1,3 @@ -import time from typing import Optional, Dict, List from src.types.condition_var_pair import ConditionVarPair diff --git a/src/consensus/full_block_to_sub_block_record.py b/src/consensus/full_block_to_sub_block_record.py index 9fa3d4cc5a3c..7e2a7684b5cb 100644 --- a/src/consensus/full_block_to_sub_block_record.py +++ b/src/consensus/full_block_to_sub_block_record.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional, List +from typing import Dict, Optional from src.consensus.constants import ConsensusConstants from src.consensus.pot_iterations import is_overflow_sub_block diff --git a/src/farmer_api.py b/src/farmer_api.py index 7af7b0d20f85..42f1ea1779bf 100644 --- a/src/farmer_api.py +++ b/src/farmer_api.py @@ -102,9 +102,6 @@ class FarmerAPI: """ There are two cases: receiving signatures for sps, or receiving signatures for the block. """ - import time - - start = time.time() if response.sp_hash not in self.farmer.sps: self.farmer.log.warning(f"Do not have challenge hash {response.challenge_hash}") return diff --git a/src/full_node/full_node.py b/src/full_node/full_node.py index 3ddbdfcf9028..a8878a41e492 100644 --- a/src/full_node/full_node.py +++ b/src/full_node/full_node.py @@ -9,7 +9,7 @@ from typing import AsyncGenerator, Optional, Dict, Callable, List, Tuple, Any, U import aiosqlite from blspy import AugSchemeMPL -import src.server.ws_connection as ws +import src.server.ws_connection as ws # lgtm [py/import-and-import-from] from src.consensus.blockchain import Blockchain, ReceiveBlockResult from src.consensus.constants import ConsensusConstants from src.consensus.difficulty_adjustment import get_sub_slot_iters_and_difficulty, can_finish_sub_and_full_epoch @@ -23,7 +23,7 @@ from src.full_node.mempool_manager import MempoolManager from src.full_node.sync_blocks_processor import SyncBlocksProcessor from src.full_node.sync_peers_handler import SyncPeersHandler from src.full_node.sync_store import SyncStore -from src.full_node.weight_proof import WeightProofHandler, BlockCache, BlockCacheMock +from src.full_node.weight_proof import WeightProofHandler, BlockCache from src.protocols import ( full_node_protocol, timelord_protocol, @@ -40,7 +40,7 @@ from src.types.sized_bytes import bytes32 from src.types.sub_epoch_summary import SubEpochSummary from src.types.unfinished_block import UnfinishedBlock from src.types.weight_proof import WeightProof -from src.util.errors import ConsensusError, Err +from src.util.errors import ConsensusError from src.util.ints import uint32, uint128, uint8 from src.util.path import mkdir, path_from_root @@ -127,23 +127,6 @@ class FullNode: False, self.blockchain.sub_blocks, ) - try: - """ - self.full_node_peers = FullNodePeers( - self.server, - self.root_path, - self.global_connections, - self.config["target_peer_count"] - self.config["target_outbound_peer_count"], - self.config["target_outbound_peer_count"], - self.config["peer_db_path"], - self.config["introducer_peer"], - self.config["peer_connect_interval"], - self.log, - ) - await self.full_node_peers.start() - """ - except Exception as e: - self.log.error(f"Exception in peer discovery: {e}") def set_server(self, server: ChiaServer): self.server = server @@ -283,7 +266,6 @@ class FullNode: highest_weight: uint128 = uint128(0) target_peak_sb_height: uint32 = uint32(0) sync_start_time = time.time() - peak_hash: bytes32 = None # Based on responses from peers about the current heads, see which head is the heaviest # (similar to longest chain rule). @@ -298,7 +280,6 @@ class FullNode: if potential_peak_block.weight > highest_weight: highest_weight = potential_peak_block.weight target_peak_sb_height = potential_peak_block.sub_block_height - peak_hash = header_hash if self.blockchain.get_peak() is not None and highest_weight <= self.blockchain.get_peak().weight: self.log.info("Not performing sync, already caught up.") diff --git a/src/harvester_api.py b/src/harvester_api.py index 138b978e0fc0..bc3744aeaf52 100644 --- a/src/harvester_api.py +++ b/src/harvester_api.py @@ -2,9 +2,9 @@ import asyncio import dataclasses import time from pathlib import Path -from typing import Optional, Callable, List, Tuple +from typing import Callable, List, Tuple -from blspy import AugSchemeMPL, G2Element, G1Element +from blspy import AugSchemeMPL, G2Element from chiapos import DiskProver from src.consensus.pot_iterations import calculate_sp_interval_iters, calculate_iterations_quality diff --git a/src/protocols/full_node_protocol.py b/src/protocols/full_node_protocol.py index 67dc4a5b0481..7faa9d37d730 100644 --- a/src/protocols/full_node_protocol.py +++ b/src/protocols/full_node_protocol.py @@ -10,7 +10,7 @@ from src.types.sized_bytes import bytes32 from src.types.vdf import VDFInfo, VDFProof from src.types.weight_proof import WeightProof from src.util.cbor_message import cbor_message -from src.util.ints import uint8, uint32, uint64, uint128, int32 +from src.util.ints import uint8, uint32, uint64, uint128 from src.types.peer_info import TimestampedPeerInfo """ diff --git a/src/rpc/farmer_rpc_api.py b/src/rpc/farmer_rpc_api.py index 66e150addb05..b6438a10b7ff 100644 --- a/src/rpc/farmer_rpc_api.py +++ b/src/rpc/farmer_rpc_api.py @@ -1,7 +1,6 @@ from typing import Callable, Set, Dict, List from src.farmer import Farmer -from src.util.ws_message import create_payload class FarmerRpcApi: diff --git a/src/rpc/full_node_rpc_api.py b/src/rpc/full_node_rpc_api.py index 4ec85e0e52e7..d0efa71bec0f 100644 --- a/src/rpc/full_node_rpc_api.py +++ b/src/rpc/full_node_rpc_api.py @@ -1,17 +1,6 @@ from src.full_node.full_node import FullNode from typing import Callable, List, Optional, Dict -from src.full_node.full_node_api import FullNodeAPI - -# from src.types.header import Header -from src.types.full_block import FullBlock -from src.util.ints import uint32, uint64, uint128 -from src.types.sized_bytes import bytes32 -from src.util.byte_types import hexstr_to_bytes - -# from src.consensus.pot_iterations import calculate_min_iters_from_iterations -from src.util.ws_message import create_payload - class FullNodeRpcApi: def __init__(self, api: FullNode): @@ -33,7 +22,7 @@ class FullNodeRpcApi: } async def _state_changed(self, change: str) -> List[Dict]: - payloads = [] + # payloads = [] # if change == "sub_block": # data = await self.get_latest_block_headers({}) # assert data is not None diff --git a/src/server/connection_utils.py b/src/server/connection_utils.py index 153fd75564a9..9f807929eada 100644 --- a/src/server/connection_utils.py +++ b/src/server/connection_utils.py @@ -1,18 +1,19 @@ import asyncio -from asyncio import FIRST_COMPLETED from typing import List, Any, Tuple, Optional from src.server.ws_connection import WSChiaConnection -async def send_all_first_reply(func, arg, peers: List[WSChiaConnection]) -> Optional[Tuple[Any, WSChiaConnection]]: +async def send_all_first_reply( + func: str, arg: Any, peers: List[WSChiaConnection] +) -> Optional[Tuple[Any, WSChiaConnection]]: """performs an API request to peers and returns the result of the first response and the peer that sent it.""" - async def do_func(peer, func, arg): - method_to_call = getattr(peer, func) - result = await method_to_call(arg) - if result is not None: - return result, peer + async def do_func(peer_x: WSChiaConnection, func_x: str, arg_x: Any): + method_to_call = getattr(peer_x, func_x) + result_x = await method_to_call(arg_x) + if result_x is not None: + return result_x, peer_x else: await asyncio.sleep(15) return None @@ -21,7 +22,7 @@ async def send_all_first_reply(func, arg, peers: List[WSChiaConnection]) -> Opti for peer in peers: tasks.append(do_func(peer, func, arg)) - done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED) + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) if len(done) > 0: d = done.pop() diff --git a/src/server/node_discovery.py b/src/server/node_discovery.py index 2bb08b27bb52..a29127428046 100644 --- a/src/server/node_discovery.py +++ b/src/server/node_discovery.py @@ -1,7 +1,6 @@ import asyncio import time import math -from asyncio import Queue from pathlib import Path import aiosqlite @@ -43,7 +42,7 @@ class FullNodeDiscovery: log, ): self.server: ChiaServer = server - self.message_queue: Queue = asyncio.Queue() + self.message_queue: asyncio.Queue = asyncio.Queue() self.is_closed = False self.target_outbound_count = target_outbound_count self.peer_db_path = path_from_root(root_path, peer_db_path) diff --git a/src/server/server.py b/src/server/server.py index 2a022ce41666..82a9b92a2f2c 100644 --- a/src/server/server.py +++ b/src/server/server.py @@ -1,14 +1,13 @@ import asyncio import logging import ssl -from asyncio import Queue from pathlib import Path from typing import Any, List, Dict, Tuple, Callable, Optional, Set import aiohttp -from aiohttp import web from aiohttp.web_app import Application from aiohttp.web_runner import TCPSite +from aiohttp import web from src.server.introducer_peers import IntroducerPeers from src.server.outbound_message import NodeType, Message, Payload @@ -93,7 +92,7 @@ class ChiaServer: self.root_path = root_path self.config = config self.on_connect: Optional[Callable] = None - self.incoming_messages: Queue[Tuple[Payload, WSChiaConnection]] = asyncio.Queue() + self.incoming_messages: asyncio.Queue[Tuple[Payload, WSChiaConnection]] = asyncio.Queue() self.shut_down_event = asyncio.Event() if self._local_type is NodeType.INTRODUCER: diff --git a/src/simulator/full_node_simulator.py b/src/simulator/full_node_simulator.py index b49d013c77f3..402bc31b183e 100644 --- a/src/simulator/full_node_simulator.py +++ b/src/simulator/full_node_simulator.py @@ -1,25 +1,16 @@ -from secrets import token_bytes - from typing import AsyncGenerator, List, Optional from src.consensus.sub_block_record import SubBlockRecord from src.full_node.full_node_api import FullNodeAPI -from src.protocols import ( - full_node_protocol, -) from src.protocols.full_node_protocol import RespondSubBlock -from src.server.server import ChiaServer -from src.server.ws_connection import WSChiaConnection -from src.simulator.simulator_protocol import FarmNewBlockProtocol, ReorgProtocol -from src.full_node.bundle_tools import best_solution_program +from src.simulator.simulator_protocol import FarmNewBlockProtocol from src.server.outbound_message import OutboundMessage from src.types.full_block import FullBlock from src.types.spend_bundle import SpendBundle -# from src.types.header import Header from src.util.api_decorators import api_request from src.util.block_tools import BlockTools, test_constants -from src.util.ints import uint64 +from src.util.ints import uint8 OutboundMessageGenerator = AsyncGenerator[OutboundMessage, None] @@ -27,10 +18,10 @@ bt = BlockTools(constants=test_constants) class FullNodeSimulator(FullNodeAPI): - def __init__(self, full_node, bt): + def __init__(self, full_node, block_tools): super().__init__(full_node) self.full_node = full_node - self.bt = bt + self.bt = block_tools async def get_all_full_blocks(self) -> List[FullBlock]: peak: Optional[SubBlockRecord] = self.full_node.blockchain.get_peak() @@ -56,7 +47,7 @@ class FullNodeSimulator(FullNodeAPI): self.log.info("Farming new block!") current_blocks = await self.get_all_full_blocks() if len(current_blocks) == 0: - genesis = bt.get_consecutive_blocks(1, force_overflow=True)[0] + genesis = bt.get_consecutive_blocks(uint8(1), force_overflow=True)[0] await self.full_node.blockchain.receive_block(genesis) peak = self.full_node.blockchain.get_peak() diff --git a/src/simulator/start_simulator.py b/src/simulator/start_simulator.py index b159c860fcb6..a0c838582fda 100644 --- a/src/simulator/start_simulator.py +++ b/src/simulator/start_simulator.py @@ -39,15 +39,6 @@ def service_kwargs_for_full_node_simulator( peer_api = FullNodeSimulator(node, bt) - async def start_callback(): - await node._start() - - def stop_callback(): - node._close() - - async def await_closed_callback(): - await node._await_closed() - kwargs = dict( root_path=root_path, node=node, diff --git a/src/timelord.py b/src/timelord.py index 89470fa3ac30..960dce4ebcca 100644 --- a/src/timelord.py +++ b/src/timelord.py @@ -4,7 +4,6 @@ import io import logging import time import traceback -from asyncio import StreamReader, StreamWriter from enum import Enum from typing import Dict, List, Optional, Tuple, Union @@ -900,8 +899,8 @@ class Timelord: challenge: bytes32, initial_form: ClassgroupElement, ip: str, - reader: StreamReader, - writer: StreamWriter, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, ): disc: int = create_discriminant(challenge, self.constants.DISCRIMINANT_SIZE_BITS) diff --git a/src/types/proof_of_space.py b/src/types/proof_of_space.py index 62f5e4ae7845..f4210032de0f 100644 --- a/src/types/proof_of_space.py +++ b/src/types/proof_of_space.py @@ -1,4 +1,3 @@ -import math from dataclasses import dataclass from typing import Optional diff --git a/src/types/sub_epoch_summary.py b/src/types/sub_epoch_summary.py index 18ff097d867d..6a7b1d8b8112 100644 --- a/src/types/sub_epoch_summary.py +++ b/src/types/sub_epoch_summary.py @@ -2,7 +2,7 @@ from typing import Optional from dataclasses import dataclass from src.types.sized_bytes import bytes32 -from src.util.ints import uint8, uint64, uint128 +from src.util.ints import uint8, uint64 from src.util.streamable import Streamable, streamable diff --git a/src/util/prev_block.py b/src/util/prev_block.py index e0250767aaf1..1efd5de186b1 100644 --- a/src/util/prev_block.py +++ b/src/util/prev_block.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional +from typing import Dict from src.consensus.sub_block_record import SubBlockRecord from src.types.sized_bytes import bytes32 diff --git a/src/wallet/block_record.py b/src/wallet/block_record.py index 04e12ee1f6af..291c51b26492 100644 --- a/src/wallet/block_record.py +++ b/src/wallet/block_record.py @@ -1,10 +1,8 @@ from dataclasses import dataclass -from typing import List, Optional +from typing import List from src.types.coin import Coin from src.types.header_block import HeaderBlock -from src.types.sized_bytes import bytes32 -from src.util.ints import uint128, uint32, uint64 from src.util.streamable import Streamable, streamable @@ -12,7 +10,7 @@ from src.util.streamable import Streamable, streamable @streamable class HeaderBlockRecord(Streamable): """ - These are values that are stored in the wallet database, corresponding to infomation + These are values that are stored in the wallet database, corresponding to information that the wallet cares about in each block """ diff --git a/src/wallet/wallet_block_store.py b/src/wallet/wallet_block_store.py index fe61d62872a5..9068dea30261 100644 --- a/src/wallet/wallet_block_store.py +++ b/src/wallet/wallet_block_store.py @@ -5,7 +5,6 @@ from src.consensus.sub_block_record import SubBlockRecord from src.types.header_block import HeaderBlock from src.wallet.block_record import HeaderBlockRecord from src.types.sized_bytes import bytes32 -from src.util.ints import uint32 class WalletBlockStore: @@ -26,20 +25,12 @@ class WalletBlockStore: " timestamp int, block blob)" ) - await self.db.execute( - "CREATE INDEX IF NOT EXISTS header_hash on header_blocks(header_hash)" - ) + await self.db.execute("CREATE INDEX IF NOT EXISTS header_hash on header_blocks(header_hash)") - await self.db.execute( - "CREATE INDEX IF NOT EXISTS timestamp on header_blocks(timestamp)" - ) + await self.db.execute("CREATE INDEX IF NOT EXISTS timestamp on header_blocks(timestamp)") - await self.db.execute( - "CREATE INDEX IF NOT EXISTS sub_height on header_blocks(sub_height)" - ) - await self.db.execute( - "CREATE INDEX IF NOT EXISTS height on header_blocks(height)" - ) + await self.db.execute("CREATE INDEX IF NOT EXISTS sub_height on header_blocks(sub_height)") + await self.db.execute("CREATE INDEX IF NOT EXISTS height on header_blocks(height)") # Sub block records await self.db.execute( @@ -49,19 +40,11 @@ class WalletBlockStore: ) # Height index so we can look up in order of height for sync purposes - await self.db.execute( - "CREATE INDEX IF NOT EXISTS sub_block_height on sub_block_records(sub_height)" - ) - await self.db.execute( - "CREATE INDEX IF NOT EXISTS height on sub_block_records(height)" - ) + await self.db.execute("CREATE INDEX IF NOT EXISTS sub_block_height on sub_block_records(sub_height)") + await self.db.execute("CREATE INDEX IF NOT EXISTS height on sub_block_records(height)") - await self.db.execute( - "CREATE INDEX IF NOT EXISTS hh on sub_block_records(header_hash)" - ) - await self.db.execute( - "CREATE INDEX IF NOT EXISTS peak on sub_block_records(is_peak)" - ) + await self.db.execute("CREATE INDEX IF NOT EXISTS hh on sub_block_records(header_hash)") + await self.db.execute("CREATE INDEX IF NOT EXISTS peak on sub_block_records(is_peak)") await self.db.commit() await self.db.commit() @@ -76,9 +59,7 @@ class WalletBlockStore: # TODO pass - async def add_block_record( - self, block_record: HeaderBlockRecord, sub_block: SubBlockRecord - ): + async def add_block_record(self, block_record: HeaderBlockRecord, sub_block: SubBlockRecord): """ Adds a block record to the database. This block record is assumed to be connected to the chain, but it may or may not be in the LCA path. @@ -102,12 +83,8 @@ class WalletBlockStore: block_record.header.prev_header_hash.hex(), block_record.header.sub_block_height, block_record.header.height, - block_record.header.weight.to_bytes( - 128 // 8, "big", signed=False - ).hex(), - block_record.header.total_iters.to_bytes( - 128 // 8, "big", signed=False - ).hex(), + block_record.header.weight.to_bytes(128 // 8, "big", signed=False).hex(), + block_record.header.total_iters.to_bytes(128 // 8, "big", signed=False).hex(), bytes(sub_block), False, ), @@ -117,9 +94,7 @@ class WalletBlockStore: async def get_header_block(self, header_hash: bytes32) -> Optional[HeaderBlock]: """Gets a block record from the database, if present""" - cursor = await self.db.execute( - "SELECT * from header_blocks WHERE header_hash=?", (header_hash.hex(),) - ) + cursor = await self.db.execute("SELECT * from header_blocks WHERE header_hash=?", (header_hash.hex(),)) row = await cursor.fetchone() await cursor.close() if row is not None: @@ -128,13 +103,9 @@ class WalletBlockStore: else: return None - async def get_header_block_record( - self, header_hash: bytes32 - ) -> Optional[HeaderBlockRecord]: + async def get_header_block_record(self, header_hash: bytes32) -> Optional[HeaderBlockRecord]: """Gets a block record from the database, if present""" - cursor = await self.db.execute( - "SELECT * from header_blocks WHERE header_hash=?", (header_hash.hex(),) - ) + cursor = await self.db.execute("SELECT * from header_blocks WHERE header_hash=?", (header_hash.hex(),)) row = await cursor.fetchone() await cursor.close() if row is not None: @@ -143,9 +114,7 @@ class WalletBlockStore: else: return None - async def get_sub_block_record( - self, header_hash: bytes32 - ) -> Optional[SubBlockRecord]: + async def get_sub_block_record(self, header_hash: bytes32) -> Optional[SubBlockRecord]: cursor = await self.db.execute( "SELECT sub_block from sub_block_records WHERE header_hash=?", (header_hash.hex(),), @@ -177,9 +146,7 @@ class WalletBlockStore: return ret, peak async def set_peak(self, header_hash: bytes32) -> None: - cursor_1 = await self.db.execute( - "UPDATE sub_block_records SET is_peak=0 WHERE is_peak=1" - ) + cursor_1 = await self.db.execute("UPDATE sub_block_records SET is_peak=0 WHERE is_peak=1") await cursor_1.close() cursor_2 = await self.db.execute( "UPDATE sub_block_records SET is_peak=1 WHERE header_hash=?", diff --git a/src/wallet/wallet_blockchain.py b/src/wallet/wallet_blockchain.py index 696dd9baa9f9..3fe0903c1e99 100644 --- a/src/wallet/wallet_blockchain.py +++ b/src/wallet/wallet_blockchain.py @@ -3,7 +3,7 @@ import logging from concurrent.futures.process import ProcessPoolExecutor from enum import Enum import multiprocessing -from typing import Dict, List, Optional, Tuple, Any, Callable +from typing import Dict, List, Optional, Tuple, Callable from src.consensus.constants import ConsensusConstants from src.consensus.difficulty_adjustment import ( @@ -11,7 +11,6 @@ from src.consensus.difficulty_adjustment import ( get_next_sub_slot_iters, ) from src.consensus.full_block_to_sub_block_record import block_to_sub_block_record -from src.types.coin import Coin from src.types.end_of_slot_bundle import EndOfSubSlotBundle from src.types.full_block import FullBlock from src.types.header_block import HeaderBlock @@ -26,7 +25,6 @@ from src.consensus.block_header_validation import ( validate_finished_header_block, ) from src.wallet.block_record import HeaderBlockRecord -from src.wallet.wallet_coin_record import WalletCoinRecord from src.wallet.wallet_coin_store import WalletCoinStore from src.wallet.wallet_block_store import WalletBlockStore @@ -44,9 +42,7 @@ class ReceiveBlockResult(Enum): ADDED_AS_ORPHAN = 2 # Added as an orphan/stale block (not a new peak of the chain) INVALID_BLOCK = 3 # Block was not added because it was invalid ALREADY_HAVE_BLOCK = 4 # Block is already present in this blockchain - DISCONNECTED_BLOCK = ( - 5 # Block's parent (previous pointer) is not in this blockchain - ) + DISCONNECTED_BLOCK = 5 # Block's parent (previous pointer) is not in this blockchain class WalletBlockchain: @@ -80,7 +76,7 @@ class WalletBlockchain: async def create( block_store: WalletBlockStore, consensus_constants: ConsensusConstants, - coins_of_interest_received: Callable, # coins_of_interest_received(removals: List[Coin], additions: List[Coin], height: uint32) + coins_of_interest_received: Callable, # f(removals: List[Coin], additions: List[Coin], height: uint32) reorg_rollback: Callable, ): """ @@ -129,15 +125,11 @@ class WalletBlockchain: while True: self.sub_height_to_hash[curr.sub_block_height] = curr.header_hash if curr.sub_epoch_summary_included is not None: - self.sub_epoch_summaries[ - curr.sub_block_height - ] = curr.sub_epoch_summary_included + self.sub_epoch_summaries[curr.sub_block_height] = curr.sub_epoch_summary_included if curr.height == 0: break curr = self.sub_blocks[curr.prev_hash] - assert ( - len(self.sub_blocks) == len(self.sub_height_to_hash) == self.peak_height + 1 - ) + assert len(self.sub_blocks) == len(self.sub_height_to_hash) == self.peak_height + 1 def get_peak(self) -> Optional[SubBlockRecord]: """ @@ -151,9 +143,7 @@ class WalletBlockchain: if self.peak_height is None: return None """ Return list of FullBlocks that are peaks""" - block = await self.block_store.get_header_block( - self.sub_height_to_hash[self.peak_height] - ) + block = await self.block_store.get_header_block(self.sub_height_to_hash[self.peak_height]) assert block is not None return block @@ -187,8 +177,6 @@ class WalletBlockchain: invalid. Also returns the fork height, in the case of a new peak. """ block = block_record.header - additions = block_record.additions - removals = block_record.removals genesis: bool = block.sub_block_height == 0 if block.header_hash in self.sub_blocks: @@ -210,9 +198,7 @@ class WalletBlockchain: ) if error is not None: - log.error( - f"block {block.header_hash} failed validation {error.code} {error.error_msg}" - ) + log.error(f"block {block.header_hash} failed validation {error.code} {error.error_msg}") return ReceiveBlockResult.INVALID_BLOCK, error.code, None sub_block = block_to_sub_block_record( @@ -235,9 +221,7 @@ class WalletBlockchain: else: return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None - async def _reconsider_peak( - self, sub_block: SubBlockRecord, genesis: bool - ) -> Optional[uint32]: + async def _reconsider_peak(self, sub_block: SubBlockRecord, genesis: bool) -> Optional[uint32]: """ When a new block is added, this is called, to check if the new block is the new peak of the chain. This also handles reorgs by reverting blocks which are not in the heaviest chain. @@ -246,17 +230,13 @@ class WalletBlockchain: """ if genesis: if self.get_peak() is None: - block: Optional[ - HeaderBlockRecord - ] = await self.block_store.get_header_block_record( + block: Optional[HeaderBlockRecord] = await self.block_store.get_header_block_record( sub_block.header_hash ) assert block is not None for removed in block.removals: self.log.info(f"Removed: {removed.name()}") - await self.coins_of_interest_received( - block.removals, block.additions, block.height - ) + await self.coins_of_interest_received(block.removals, block.additions, block.height) self.sub_height_to_hash[uint32(0)] = block.header_hash self.peak_height = uint32(0) return uint32(0) @@ -270,8 +250,11 @@ class WalletBlockchain: fork_h: int = find_fork_point_in_chain(self.sub_blocks, sub_block, peak) # Rollback to fork + # TODO(straya): reorg coins based on height not sub-block height + # TODO(straya): handle fork_h == -1 case self.log.info( - f"fork_h: {fork_h}, {sub_block.height}, {sub_block.sub_block_height}, {peak.sub_block_height}, {peak.height}" + f"fork_h: {fork_h}, {sub_block.height}, {sub_block.sub_block_height}, {peak.sub_block_height}, " + f"{peak.height}" ) fork_hash = self.sub_height_to_hash[fork_h] fork_block = self.sub_blocks[fork_hash] @@ -289,12 +272,8 @@ class WalletBlockchain: blocks_to_add: List[Tuple[HeaderBlockRecord, SubBlockRecord]] = [] curr = sub_block.header_hash while fork_h < 0 or curr != self.sub_height_to_hash[uint32(fork_h)]: - fetched_block: Optional[ - HeaderBlockRecord - ] = await self.block_store.get_header_block_record(curr) - fetched_sub_block: Optional[ - SubBlockRecord - ] = await self.block_store.get_sub_block_record(curr) + fetched_block: Optional[HeaderBlockRecord] = await self.block_store.get_header_block_record(curr) + fetched_sub_block: Optional[SubBlockRecord] = await self.block_store.get_sub_block_record(curr) assert fetched_block is not None assert fetched_sub_block is not None blocks_to_add.append((fetched_block, fetched_sub_block)) @@ -304,9 +283,7 @@ class WalletBlockchain: curr = fetched_sub_block.prev_hash for fetched_block, fetched_sub_block in reversed(blocks_to_add): - self.sub_height_to_hash[ - fetched_sub_block.sub_block_height - ] = fetched_sub_block.header_hash + self.sub_height_to_hash[fetched_sub_block.sub_block_height] = fetched_sub_block.header_hash if fetched_sub_block.is_block: await self.coins_of_interest_received( fetched_block.removals, @@ -363,9 +340,7 @@ class WalletBlockchain: async def get_sp_and_ip_sub_slots( self, header_hash: bytes32 ) -> Optional[Tuple[Optional[EndOfSubSlotBundle], Optional[EndOfSubSlotBundle]]]: - block: Optional[FullBlock] = await self.block_store.get_header_block( - header_hash - ) + block: Optional[FullBlock] = await self.block_store.get_header_block(header_hash) is_overflow = self.sub_blocks[block.header_hash].overflow if block is None: return None diff --git a/src/wallet/wallet_node.py b/src/wallet/wallet_node.py index c0e2f9cf3cf8..6301a862f390 100644 --- a/src/wallet/wallet_node.py +++ b/src/wallet/wallet_node.py @@ -1,7 +1,5 @@ import asyncio import json -import random -import time import traceback from typing import Dict, Optional, Tuple, List, AsyncGenerator, Callable, Union from pathlib import Path @@ -14,7 +12,6 @@ from src.protocols.wallet_protocol import ( RequestSubBlockHeader, RespondSubBlockHeader, RequestAdditions, - RequestRemovals, RespondAdditions, RespondRemovals, RejectRemovalsRequest, @@ -109,9 +106,7 @@ class WalletNode: def get_key_for_fingerprint(self, fingerprint): private_keys = self.keychain.get_all_private_keys() if len(private_keys) == 0: - self.log.warning( - "No keys present. Create keys with the UI, or with the 'chia keys' program." - ) + self.log.warning("No keys present. Create keys with the UI, or with the 'chia keys' program.") return None private_key: Optional[PrivateKey] = None @@ -136,22 +131,16 @@ class WalletNode: return False db_path_key_suffix = str(private_key.get_g1().get_fingerprint()) - path = path_from_root( - self.root_path, f"{self.config['database_path']}-{db_path_key_suffix}" - ) + path = path_from_root(self.root_path, f"{self.config['database_path']}-{db_path_key_suffix}") mkdir(path.parent) - self.wallet_state_manager = await WalletStateManager.create( - private_key, self.config, path, self.constants - ) + self.wallet_state_manager = await WalletStateManager.create(private_key, self.config, path, self.constants) self.wsm_close_task = None assert self.wallet_state_manager is not None - backup_settings: BackupInitialized = ( - self.wallet_state_manager.user_settings.get_backup_settings() - ) + backup_settings: BackupInitialized = self.wallet_state_manager.user_settings.get_backup_settings() if backup_settings.user_initialized is False: if new_wallet is True: await self.wallet_state_manager.user_settings.user_created_new_wallet() @@ -168,14 +157,10 @@ class WalletNode: self.backup_initialized = True if backup_file is not None: - json_dict = open_backup_file( - backup_file, self.wallet_state_manager.private_key - ) + json_dict = open_backup_file(backup_file, self.wallet_state_manager.private_key) if "start_height" in json_dict["data"]: start_height = json_dict["data"]["start_height"] - self.config["starting_height"] = max( - 0, start_height - self.config["start_height_buffer"] - ) + self.config["starting_height"] = max(0, start_height - self.config["start_height_buffer"]) else: self.config["starting_height"] = 0 else: @@ -195,12 +180,8 @@ class WalletNode: self._shut_down = True if self.wallet_state_manager is None: return - self.wsm_close_task = asyncio.create_task( - self.wallet_state_manager.close_all_stores() - ) - self.wallet_peers_task = asyncio.create_task( - self.wallet_peers.ensure_is_closed() - ) + self.wsm_close_task = asyncio.create_task(self.wallet_state_manager.close_all_stores()) + self.wallet_peers_task = asyncio.create_task(self.wallet_peers.ensure_is_closed()) async def _await_closed(self): if self.wallet_state_manager is None or self.backup_initialized is False: @@ -225,9 +206,7 @@ class WalletNode: async def _action_messages(self) -> List[Message]: if self.wallet_state_manager is None or self.backup_initialized is False: return [] - actions: List[ - WalletAction - ] = await self.wallet_state_manager.action_store.get_all_pending_actions() + actions: List[WalletAction] = await self.wallet_state_manager.action_store.get_all_pending_actions() result: List[Message] = [] for action in actions: data = json.loads(action.data) @@ -273,17 +252,11 @@ class WalletNode: await self.server.send_to_all([msg], NodeType.FULL_NODE) async def _messages_to_resend(self) -> List[Message]: - if ( - self.wallet_state_manager is None - or self.backup_initialized is False - or self._shut_down - ): + if self.wallet_state_manager is None or self.backup_initialized is False or self._shut_down: return [] messages: List[Message] = [] - records: List[ - TransactionRecord - ] = await self.wallet_state_manager.tx_store.get_not_sent() + records: List[TransactionRecord] = await self.wallet_state_manager.tx_store.get_not_sent() for record in records: if record.spend_bundle is None: @@ -333,21 +306,15 @@ class WalletNode: self.config["full_node_peer"]["port"], ) peers = [c.get_peer_info() for c in self.server.get_full_node_connections()] - full_node_resolved = PeerInfo( - socket.gethostbyname(full_node_peer.host), full_node_peer.port - ) + full_node_resolved = PeerInfo(socket.gethostbyname(full_node_peer.host), full_node_peer.port) if full_node_peer in peers or full_node_resolved in peers: - self.log.info( - f"Will not attempt to connect to other nodes, already connected to {full_node_peer}" - ) + self.log.info(f"Will not attempt to connect to other nodes, already connected to {full_node_peer}") for connection in self.server.get_full_node_connections(): if ( connection.get_peer_info() != full_node_peer and connection.get_peer_info() != full_node_resolved ): - self.log.info( - f"Closing unnecessary connection to {connection.get_peer_info()}." - ) + self.log.info(f"Closing unnecessary connection to {connection.get_peer_info()}.") asyncio.create_task(connection.close()) return True return False @@ -388,9 +355,7 @@ class WalletNode: return request = wallet_protocol.RequestSubBlockHeader(peak.sub_block_height) - response: Optional[RespondSubBlockHeader] = await peer.request_sub_block_header( - request - ) + response: Optional[RespondSubBlockHeader] = await peer.request_sub_block_header(request) header_block = response.header_block if header_block is not None: @@ -404,9 +369,7 @@ class WalletNode: async def check_new_peak(self): await asyncio.sleep(1) - current_peak: Optional[ - SubBlockRecord - ] = self.wallet_state_manager.blockchain.get_peak() + current_peak: Optional[SubBlockRecord] = self.wallet_state_manager.blockchain.get_peak() if current_peak is None: return potential_peaks: List[ @@ -455,10 +418,7 @@ class WalletNode: highest_weight = potential_peak_block.weight peak_height = potential_peak_block.height - if ( - self.wallet_state_manager.peak is not None - and highest_weight <= self.wallet_state_manager.peak.weight - ): + if self.wallet_state_manager.peak is not None and highest_weight <= self.wallet_state_manager.peak.weight: self.log.info("Not performing sync, already caught up.") return @@ -473,15 +433,11 @@ class WalletNode: for i in range(0, peak_height + 1): self.log.info(f"Requesting block {i}") request = RequestSubBlockHeader(i) - response, peer = await send_all_first_reply( - "request_sub_block_header", request, peers - ) + response, peer = await send_all_first_reply("request_sub_block_header", request, peers) peer: WSChiaConnection = peer res: RespondSubBlockHeader = response block_i: HeaderBlock = res.header_block - self.log.error( - f"Received block {block_i.sub_block_height}, {block_i.height}, {block_i.is_block}" - ) + self.log.error(f"Received block {block_i.sub_block_height}, {block_i.height}, {block_i.is_block}") if block_i is None: continue @@ -490,9 +446,7 @@ class WalletNode: ( additions, removals, - ) = await self.wallet_state_manager.get_filter_additions_removals( - block_i, block_i.transactions_filter - ) + ) = await self.wallet_state_manager.get_filter_additions_removals(block_i, block_i.transactions_filter) # Get Additions added_coins = await self.get_additions(peer, block_i, additions) @@ -500,15 +454,11 @@ class WalletNode: raise ValueError("Failed to fetch additions") # Get removals - removed_coins = await self.get_removals( - peer, block_i, added_coins, removals - ) + removed_coins = await self.get_removals(peer, block_i, added_coins, removals) if removed_coins is None: raise ValueError("Failed to fetch removals") - header_block_record = HeaderBlockRecord( - block_i, added_coins, removed_coins - ) + header_block_record = HeaderBlockRecord(block_i, added_coins, removed_coins) else: header_block_record = HeaderBlockRecord(block_i, [], []) @@ -518,9 +468,7 @@ class WalletNode: result, error, fork_h, - ) = await self.wallet_state_manager.blockchain.receive_block( - header_block_record - ) + ) = await self.wallet_state_manager.blockchain.receive_block(header_block_record) if result == ReceiveBlockResult.NEW_PEAK: self.wallet_state_manager.state_changed("new_block") @@ -590,7 +538,7 @@ class WalletNode: # Verify removals root removals_merkle_set = MerkleSet() - for coin in all_coins: + for coin in coins: if coin is not None: removals_merkle_set.add_already_hashed(coin.name()) removals_root = removals_merkle_set.get_root() @@ -628,16 +576,12 @@ class WalletNode: return False return True - async def get_additions( - self, peer: WSChiaConnection, block_i, additions - ) -> Optional[List[Coin]]: + async def get_additions(self, peer: WSChiaConnection, block_i, additions) -> Optional[List[Coin]]: if len(additions) > 0: - additions_request = RequestAdditions( - block_i.sub_block_height, block_i.header_hash, additions + additions_request = RequestAdditions(block_i.sub_block_height, block_i.header_hash, additions) + additions_res: Optional[Union[RespondAdditions, RejectAdditionsRequest]] = await peer.request_additions( + additions_request ) - additions_res: Optional[ - Union[RespondAdditions, RejectAdditionsRequest] - ] = await peer.request_additions(additions_request) if additions_res is None: await peer.close() return None @@ -663,37 +607,28 @@ class WalletNode: added_coins = [] return added_coins - async def get_removals( - self, peer: WSChiaConnection, block_i, additions, removals - ) -> Optional[List[Coin]]: + async def get_removals(self, peer: WSChiaConnection, block_i, additions, removals) -> Optional[List[Coin]]: request_all_removals = False # Check if we need all removals for coin in additions: puzzle_store = self.wallet_state_manager.puzzle_store - record_info: Optional[ - DerivationRecord - ] = await puzzle_store.get_derivation_record_for_puzzle_hash( + record_info: Optional[DerivationRecord] = await puzzle_store.get_derivation_record_for_puzzle_hash( coin.puzzle_hash.hex() ) - if ( - record_info is not None - and record_info.wallet_type == WalletType.COLOURED_COIN - ): + if record_info is not None and record_info.wallet_type == WalletType.COLOURED_COIN: request_all_removals = True break if len(removals) > 0 or request_all_removals: if request_all_removals: - removals_request = wallet_protocol.RequestRemovals( - block_i.sub_block_height, block_i.header_hash, None - ) + removals_request = wallet_protocol.RequestRemovals(block_i.sub_block_height, block_i.header_hash, None) else: removals_request = wallet_protocol.RequestRemovals( block_i.sub_block_height, block_i.header_hash, removals ) - removals_res: Optional[ - Union[RespondRemovals, RejectRemovalsRequest] - ] = await peer.request_removals(removals_request) + removals_res: Optional[Union[RespondRemovals, RejectRemovalsRequest]] = await peer.request_removals( + removals_request + ) if removals_res is None: return None elif isinstance(removals_res, RespondRemovals): diff --git a/src/wallet/wallet_node_api.py b/src/wallet/wallet_node_api.py index 6e2f4f853ed7..83bd789da147 100644 --- a/src/wallet/wallet_node_api.py +++ b/src/wallet/wallet_node_api.py @@ -1,12 +1,9 @@ -import traceback -from typing import List, Optional +from typing import List from src.protocols import wallet_protocol -from src.server.outbound_message import Message from src.server.ws_connection import WSChiaConnection -from src.types.coin import Coin, hash_coin_list +from src.types.coin import Coin from src.types.mempool_inclusion_status import MempoolInclusionStatus -from src.types.sized_bytes import bytes32 from src.util.api_decorators import api_request, peer_required from src.util.errors import Err from src.util.merkle_set import ( @@ -14,8 +11,6 @@ from src.util.merkle_set import ( confirm_not_included_already_hashed, confirm_included_already_hashed, ) -from src.wallet.derivation_record import DerivationRecord -from src.wallet.util.wallet_types import WalletType from src.wallet.wallet_node import WalletNode @@ -27,17 +22,12 @@ class WalletNodeAPI: @peer_required @api_request - async def respond_removals( - self, response: wallet_protocol.RespondRemovals, peer: WSChiaConnection - ): + async def respond_removals(self, response: wallet_protocol.RespondRemovals, peer: WSChiaConnection): """ The full node has responded with the removals for a block. We will use this to try to finish the block, and add it to the state. """ - if ( - self.wallet_node.wallet_state_manager is None - or self.wallet_node.backup_initialized is False - ): + if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False: return if self.wallet_node._shut_down: return @@ -45,14 +35,10 @@ class WalletNodeAPI: response.header_hash not in self.wallet_node.cached_blocks or self.wallet_node.cached_blocks[response.header_hash][0].additions is None ): - self.wallet_node.log.warning( - "Do not have header for removals, or do not have additions" - ) + self.wallet_node.log.warning("Do not have header for removals, or do not have additions") return - block_record, header_block, transaction_filter = self.wallet_node.cached_blocks[ - response.header_hash - ] + block_record, header_block, transaction_filter = self.wallet_node.cached_blocks[response.header_hash] assert response.height == block_record.height all_coins: List[Coin] = [] @@ -124,32 +110,22 @@ class WalletNodeAPI: # await self.respond_header(respond_header_msg, peer) @api_request - async def reject_removals_request( - self, response: wallet_protocol.RejectRemovalsRequest, peer: WSChiaConnection - ): + async def reject_removals_request(self, response: wallet_protocol.RejectRemovalsRequest, peer: WSChiaConnection): """ The full node has rejected our request for removals. """ # TODO(mariano): implement - if ( - self.wallet_node.wallet_state_manager is None - or self.wallet_node.backup_initialized is False - ): + if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False: return self.wallet_node.log.error("Removals request rejected") @api_request - async def reject_additions_request( - self, response: wallet_protocol.RejectAdditionsRequest - ): + async def reject_additions_request(self, response: wallet_protocol.RejectAdditionsRequest): """ The full node has rejected our request for additions. """ # TODO(mariano): implement - if ( - self.wallet_node.wallet_state_manager is None - or self.wallet_node.backup_initialized is False - ): + if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False: return self.wallet_node.log.error("Additions request rejected") @@ -162,52 +138,37 @@ class WalletNodeAPI: await self.wallet_node.new_peak(peak, peer) @api_request - async def reject_header_request( - self, response: wallet_protocol.RejectHeaderRequest - ): + async def reject_header_request(self, response: wallet_protocol.RejectHeaderRequest): """ The full node has rejected our request for a header. """ # TODO(mariano): implement - if ( - self.wallet_node.wallet_state_manager is None - or self.wallet_node.backup_initialized is False - ): + if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False: return self.wallet_node.log.error("Header request rejected") @api_request - async def respond_sub_block_header( - self, response: wallet_protocol.RespondSubBlockHeader - ): + async def respond_sub_block_header(self, response: wallet_protocol.RespondSubBlockHeader): pass @peer_required @api_request - async def respond_additions( - self, response: wallet_protocol.RespondAdditions, peer: WSChiaConnection - ): + async def respond_additions(self, response: wallet_protocol.RespondAdditions, peer: WSChiaConnection): pass @peer_required @api_request - async def reject_additions_request( - self, response: wallet_protocol.RejectAdditionsRequest, peer: WSChiaConnection - ): + async def reject_additions_request(self, response: wallet_protocol.RejectAdditionsRequest, peer: WSChiaConnection): pass @peer_required @api_request - async def respond_removals( - self, response: wallet_protocol.RespondRemovals, peer: WSChiaConnection - ): + async def respond_removals(self, response: wallet_protocol.RespondRemovals, peer: WSChiaConnection): pass @peer_required @api_request - async def reject_removals_request( - self, response: wallet_protocol.RejectRemovalsRequest, peer: WSChiaConnection - ): + async def reject_removals_request(self, response: wallet_protocol.RejectRemovalsRequest, peer: WSChiaConnection): pass # if ( @@ -357,40 +318,25 @@ class WalletNodeAPI: @peer_required @api_request - async def transaction_ack( - self, ack: wallet_protocol.TransactionAck, peer: WSChiaConnection - ): + async def transaction_ack(self, ack: wallet_protocol.TransactionAck, peer: WSChiaConnection): """ This is an ack for our previous SendTransaction call. This removes the transaction from the send queue if we have sent it to enough nodes. """ assert peer.peer_node_id is not None name = peer.peer_node_id.hex() - if ( - self.wallet_node.wallet_state_manager is None - or self.wallet_node.backup_initialized is False - ): + if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False: return if ack.status == MempoolInclusionStatus.SUCCESS: - self.wallet_node.log.info( - f"SpendBundle has been received and accepted to mempool by the FullNode. {ack}" - ) + self.wallet_node.log.info(f"SpendBundle has been received and accepted to mempool by the FullNode. {ack}") elif ack.status == MempoolInclusionStatus.PENDING: - self.wallet_node.log.info( - f"SpendBundle has been received (and is pending) by the FullNode. {ack}" - ) + self.wallet_node.log.info(f"SpendBundle has been received (and is pending) by the FullNode. {ack}") else: - self.wallet_node.log.warning( - f"SpendBundle has been rejected by the FullNode. {ack}" - ) + self.wallet_node.log.warning(f"SpendBundle has been rejected by the FullNode. {ack}") if ack.error is not None: - await self.wallet_node.wallet_state_manager.remove_from_queue( - ack.txid, name, ack.status, Err[ack.error] - ) + await self.wallet_node.wallet_state_manager.remove_from_queue(ack.txid, name, ack.status, Err[ack.error]) else: - await self.wallet_node.wallet_state_manager.remove_from_queue( - ack.txid, name, ack.status, None - ) + await self.wallet_node.wallet_state_manager.remove_from_queue(ack.txid, name, ack.status, None) # @api_request # async def respond_all_proof_hashes( diff --git a/src/wallet/wallet_state_manager.py b/src/wallet/wallet_state_manager.py index b1ab03625c61..e345a03a0c61 100644 --- a/src/wallet/wallet_state_manager.py +++ b/src/wallet/wallet_state_manager.py @@ -13,7 +13,6 @@ from chiabip158 import PyBIP158 from blspy import PrivateKey, G1Element, AugSchemeMPL from cryptography.fernet import Fernet -from src.consensus.blockchain import ReceiveBlockResult from src.consensus.constants import ConsensusConstants from src.consensus.sub_block_record import SubBlockRecord from src.types.coin import Coin