This commit is contained in:
Mariano Sorgente 2020-12-10 14:40:12 +09:00 committed by Yostra
parent 231bfea165
commit 6ea2103136
23 changed files with 124 additions and 360 deletions

View File

@ -1,4 +1,3 @@
import time
from typing import Optional, Dict, List
from src.types.condition_var_pair import ConditionVarPair

View File

@ -1,4 +1,4 @@
from typing import Dict, Optional, List
from typing import Dict, Optional
from src.consensus.constants import ConsensusConstants
from src.consensus.pot_iterations import is_overflow_sub_block

View File

@ -102,9 +102,6 @@ class FarmerAPI:
"""
There are two cases: receiving signatures for sps, or receiving signatures for the block.
"""
import time
start = time.time()
if response.sp_hash not in self.farmer.sps:
self.farmer.log.warning(f"Do not have challenge hash {response.challenge_hash}")
return

View File

@ -9,7 +9,7 @@ from typing import AsyncGenerator, Optional, Dict, Callable, List, Tuple, Any, U
import aiosqlite
from blspy import AugSchemeMPL
import src.server.ws_connection as ws
import src.server.ws_connection as ws # lgtm [py/import-and-import-from]
from src.consensus.blockchain import Blockchain, ReceiveBlockResult
from src.consensus.constants import ConsensusConstants
from src.consensus.difficulty_adjustment import get_sub_slot_iters_and_difficulty, can_finish_sub_and_full_epoch
@ -23,7 +23,7 @@ from src.full_node.mempool_manager import MempoolManager
from src.full_node.sync_blocks_processor import SyncBlocksProcessor
from src.full_node.sync_peers_handler import SyncPeersHandler
from src.full_node.sync_store import SyncStore
from src.full_node.weight_proof import WeightProofHandler, BlockCache, BlockCacheMock
from src.full_node.weight_proof import WeightProofHandler, BlockCache
from src.protocols import (
full_node_protocol,
timelord_protocol,
@ -40,7 +40,7 @@ from src.types.sized_bytes import bytes32
from src.types.sub_epoch_summary import SubEpochSummary
from src.types.unfinished_block import UnfinishedBlock
from src.types.weight_proof import WeightProof
from src.util.errors import ConsensusError, Err
from src.util.errors import ConsensusError
from src.util.ints import uint32, uint128, uint8
from src.util.path import mkdir, path_from_root
@ -127,23 +127,6 @@ class FullNode:
False,
self.blockchain.sub_blocks,
)
try:
"""
self.full_node_peers = FullNodePeers(
self.server,
self.root_path,
self.global_connections,
self.config["target_peer_count"] - self.config["target_outbound_peer_count"],
self.config["target_outbound_peer_count"],
self.config["peer_db_path"],
self.config["introducer_peer"],
self.config["peer_connect_interval"],
self.log,
)
await self.full_node_peers.start()
"""
except Exception as e:
self.log.error(f"Exception in peer discovery: {e}")
def set_server(self, server: ChiaServer):
self.server = server
@ -283,7 +266,6 @@ class FullNode:
highest_weight: uint128 = uint128(0)
target_peak_sb_height: uint32 = uint32(0)
sync_start_time = time.time()
peak_hash: bytes32 = None
# Based on responses from peers about the current heads, see which head is the heaviest
# (similar to longest chain rule).
@ -298,7 +280,6 @@ class FullNode:
if potential_peak_block.weight > highest_weight:
highest_weight = potential_peak_block.weight
target_peak_sb_height = potential_peak_block.sub_block_height
peak_hash = header_hash
if self.blockchain.get_peak() is not None and highest_weight <= self.blockchain.get_peak().weight:
self.log.info("Not performing sync, already caught up.")

View File

@ -2,9 +2,9 @@ import asyncio
import dataclasses
import time
from pathlib import Path
from typing import Optional, Callable, List, Tuple
from typing import Callable, List, Tuple
from blspy import AugSchemeMPL, G2Element, G1Element
from blspy import AugSchemeMPL, G2Element
from chiapos import DiskProver
from src.consensus.pot_iterations import calculate_sp_interval_iters, calculate_iterations_quality

View File

@ -10,7 +10,7 @@ from src.types.sized_bytes import bytes32
from src.types.vdf import VDFInfo, VDFProof
from src.types.weight_proof import WeightProof
from src.util.cbor_message import cbor_message
from src.util.ints import uint8, uint32, uint64, uint128, int32
from src.util.ints import uint8, uint32, uint64, uint128
from src.types.peer_info import TimestampedPeerInfo
"""

View File

@ -1,7 +1,6 @@
from typing import Callable, Set, Dict, List
from src.farmer import Farmer
from src.util.ws_message import create_payload
class FarmerRpcApi:

View File

@ -1,17 +1,6 @@
from src.full_node.full_node import FullNode
from typing import Callable, List, Optional, Dict
from src.full_node.full_node_api import FullNodeAPI
# from src.types.header import Header
from src.types.full_block import FullBlock
from src.util.ints import uint32, uint64, uint128
from src.types.sized_bytes import bytes32
from src.util.byte_types import hexstr_to_bytes
# from src.consensus.pot_iterations import calculate_min_iters_from_iterations
from src.util.ws_message import create_payload
class FullNodeRpcApi:
def __init__(self, api: FullNode):
@ -33,7 +22,7 @@ class FullNodeRpcApi:
}
async def _state_changed(self, change: str) -> List[Dict]:
payloads = []
# payloads = []
# if change == "sub_block":
# data = await self.get_latest_block_headers({})
# assert data is not None

View File

@ -1,18 +1,19 @@
import asyncio
from asyncio import FIRST_COMPLETED
from typing import List, Any, Tuple, Optional
from src.server.ws_connection import WSChiaConnection
async def send_all_first_reply(func, arg, peers: List[WSChiaConnection]) -> Optional[Tuple[Any, WSChiaConnection]]:
async def send_all_first_reply(
func: str, arg: Any, peers: List[WSChiaConnection]
) -> Optional[Tuple[Any, WSChiaConnection]]:
"""performs an API request to peers and returns the result of the first response and the peer that sent it."""
async def do_func(peer, func, arg):
method_to_call = getattr(peer, func)
result = await method_to_call(arg)
if result is not None:
return result, peer
async def do_func(peer_x: WSChiaConnection, func_x: str, arg_x: Any):
method_to_call = getattr(peer_x, func_x)
result_x = await method_to_call(arg_x)
if result_x is not None:
return result_x, peer_x
else:
await asyncio.sleep(15)
return None
@ -21,7 +22,7 @@ async def send_all_first_reply(func, arg, peers: List[WSChiaConnection]) -> Opti
for peer in peers:
tasks.append(do_func(peer, func, arg))
done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
if len(done) > 0:
d = done.pop()

View File

@ -1,7 +1,6 @@
import asyncio
import time
import math
from asyncio import Queue
from pathlib import Path
import aiosqlite
@ -43,7 +42,7 @@ class FullNodeDiscovery:
log,
):
self.server: ChiaServer = server
self.message_queue: Queue = asyncio.Queue()
self.message_queue: asyncio.Queue = asyncio.Queue()
self.is_closed = False
self.target_outbound_count = target_outbound_count
self.peer_db_path = path_from_root(root_path, peer_db_path)

View File

@ -1,14 +1,13 @@
import asyncio
import logging
import ssl
from asyncio import Queue
from pathlib import Path
from typing import Any, List, Dict, Tuple, Callable, Optional, Set
import aiohttp
from aiohttp import web
from aiohttp.web_app import Application
from aiohttp.web_runner import TCPSite
from aiohttp import web
from src.server.introducer_peers import IntroducerPeers
from src.server.outbound_message import NodeType, Message, Payload
@ -93,7 +92,7 @@ class ChiaServer:
self.root_path = root_path
self.config = config
self.on_connect: Optional[Callable] = None
self.incoming_messages: Queue[Tuple[Payload, WSChiaConnection]] = asyncio.Queue()
self.incoming_messages: asyncio.Queue[Tuple[Payload, WSChiaConnection]] = asyncio.Queue()
self.shut_down_event = asyncio.Event()
if self._local_type is NodeType.INTRODUCER:

View File

@ -1,25 +1,16 @@
from secrets import token_bytes
from typing import AsyncGenerator, List, Optional
from src.consensus.sub_block_record import SubBlockRecord
from src.full_node.full_node_api import FullNodeAPI
from src.protocols import (
full_node_protocol,
)
from src.protocols.full_node_protocol import RespondSubBlock
from src.server.server import ChiaServer
from src.server.ws_connection import WSChiaConnection
from src.simulator.simulator_protocol import FarmNewBlockProtocol, ReorgProtocol
from src.full_node.bundle_tools import best_solution_program
from src.simulator.simulator_protocol import FarmNewBlockProtocol
from src.server.outbound_message import OutboundMessage
from src.types.full_block import FullBlock
from src.types.spend_bundle import SpendBundle
# from src.types.header import Header
from src.util.api_decorators import api_request
from src.util.block_tools import BlockTools, test_constants
from src.util.ints import uint64
from src.util.ints import uint8
OutboundMessageGenerator = AsyncGenerator[OutboundMessage, None]
@ -27,10 +18,10 @@ bt = BlockTools(constants=test_constants)
class FullNodeSimulator(FullNodeAPI):
def __init__(self, full_node, bt):
def __init__(self, full_node, block_tools):
super().__init__(full_node)
self.full_node = full_node
self.bt = bt
self.bt = block_tools
async def get_all_full_blocks(self) -> List[FullBlock]:
peak: Optional[SubBlockRecord] = self.full_node.blockchain.get_peak()
@ -56,7 +47,7 @@ class FullNodeSimulator(FullNodeAPI):
self.log.info("Farming new block!")
current_blocks = await self.get_all_full_blocks()
if len(current_blocks) == 0:
genesis = bt.get_consecutive_blocks(1, force_overflow=True)[0]
genesis = bt.get_consecutive_blocks(uint8(1), force_overflow=True)[0]
await self.full_node.blockchain.receive_block(genesis)
peak = self.full_node.blockchain.get_peak()

View File

@ -39,15 +39,6 @@ def service_kwargs_for_full_node_simulator(
peer_api = FullNodeSimulator(node, bt)
async def start_callback():
await node._start()
def stop_callback():
node._close()
async def await_closed_callback():
await node._await_closed()
kwargs = dict(
root_path=root_path,
node=node,

View File

@ -4,7 +4,6 @@ import io
import logging
import time
import traceback
from asyncio import StreamReader, StreamWriter
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
@ -900,8 +899,8 @@ class Timelord:
challenge: bytes32,
initial_form: ClassgroupElement,
ip: str,
reader: StreamReader,
writer: StreamWriter,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
):
disc: int = create_discriminant(challenge, self.constants.DISCRIMINANT_SIZE_BITS)

View File

@ -1,4 +1,3 @@
import math
from dataclasses import dataclass
from typing import Optional

View File

@ -2,7 +2,7 @@ from typing import Optional
from dataclasses import dataclass
from src.types.sized_bytes import bytes32
from src.util.ints import uint8, uint64, uint128
from src.util.ints import uint8, uint64
from src.util.streamable import Streamable, streamable

View File

@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Dict
from src.consensus.sub_block_record import SubBlockRecord
from src.types.sized_bytes import bytes32

View File

@ -1,10 +1,8 @@
from dataclasses import dataclass
from typing import List, Optional
from typing import List
from src.types.coin import Coin
from src.types.header_block import HeaderBlock
from src.types.sized_bytes import bytes32
from src.util.ints import uint128, uint32, uint64
from src.util.streamable import Streamable, streamable
@ -12,7 +10,7 @@ from src.util.streamable import Streamable, streamable
@streamable
class HeaderBlockRecord(Streamable):
"""
These are values that are stored in the wallet database, corresponding to infomation
These are values that are stored in the wallet database, corresponding to information
that the wallet cares about in each block
"""

View File

@ -5,7 +5,6 @@ from src.consensus.sub_block_record import SubBlockRecord
from src.types.header_block import HeaderBlock
from src.wallet.block_record import HeaderBlockRecord
from src.types.sized_bytes import bytes32
from src.util.ints import uint32
class WalletBlockStore:
@ -26,20 +25,12 @@ class WalletBlockStore:
" timestamp int, block blob)"
)
await self.db.execute(
"CREATE INDEX IF NOT EXISTS header_hash on header_blocks(header_hash)"
)
await self.db.execute("CREATE INDEX IF NOT EXISTS header_hash on header_blocks(header_hash)")
await self.db.execute(
"CREATE INDEX IF NOT EXISTS timestamp on header_blocks(timestamp)"
)
await self.db.execute("CREATE INDEX IF NOT EXISTS timestamp on header_blocks(timestamp)")
await self.db.execute(
"CREATE INDEX IF NOT EXISTS sub_height on header_blocks(sub_height)"
)
await self.db.execute(
"CREATE INDEX IF NOT EXISTS height on header_blocks(height)"
)
await self.db.execute("CREATE INDEX IF NOT EXISTS sub_height on header_blocks(sub_height)")
await self.db.execute("CREATE INDEX IF NOT EXISTS height on header_blocks(height)")
# Sub block records
await self.db.execute(
@ -49,19 +40,11 @@ class WalletBlockStore:
)
# Height index so we can look up in order of height for sync purposes
await self.db.execute(
"CREATE INDEX IF NOT EXISTS sub_block_height on sub_block_records(sub_height)"
)
await self.db.execute(
"CREATE INDEX IF NOT EXISTS height on sub_block_records(height)"
)
await self.db.execute("CREATE INDEX IF NOT EXISTS sub_block_height on sub_block_records(sub_height)")
await self.db.execute("CREATE INDEX IF NOT EXISTS height on sub_block_records(height)")
await self.db.execute(
"CREATE INDEX IF NOT EXISTS hh on sub_block_records(header_hash)"
)
await self.db.execute(
"CREATE INDEX IF NOT EXISTS peak on sub_block_records(is_peak)"
)
await self.db.execute("CREATE INDEX IF NOT EXISTS hh on sub_block_records(header_hash)")
await self.db.execute("CREATE INDEX IF NOT EXISTS peak on sub_block_records(is_peak)")
await self.db.commit()
await self.db.commit()
@ -76,9 +59,7 @@ class WalletBlockStore:
# TODO
pass
async def add_block_record(
self, block_record: HeaderBlockRecord, sub_block: SubBlockRecord
):
async def add_block_record(self, block_record: HeaderBlockRecord, sub_block: SubBlockRecord):
"""
Adds a block record to the database. This block record is assumed to be connected
to the chain, but it may or may not be in the LCA path.
@ -102,12 +83,8 @@ class WalletBlockStore:
block_record.header.prev_header_hash.hex(),
block_record.header.sub_block_height,
block_record.header.height,
block_record.header.weight.to_bytes(
128 // 8, "big", signed=False
).hex(),
block_record.header.total_iters.to_bytes(
128 // 8, "big", signed=False
).hex(),
block_record.header.weight.to_bytes(128 // 8, "big", signed=False).hex(),
block_record.header.total_iters.to_bytes(128 // 8, "big", signed=False).hex(),
bytes(sub_block),
False,
),
@ -117,9 +94,7 @@ class WalletBlockStore:
async def get_header_block(self, header_hash: bytes32) -> Optional[HeaderBlock]:
"""Gets a block record from the database, if present"""
cursor = await self.db.execute(
"SELECT * from header_blocks WHERE header_hash=?", (header_hash.hex(),)
)
cursor = await self.db.execute("SELECT * from header_blocks WHERE header_hash=?", (header_hash.hex(),))
row = await cursor.fetchone()
await cursor.close()
if row is not None:
@ -128,13 +103,9 @@ class WalletBlockStore:
else:
return None
async def get_header_block_record(
self, header_hash: bytes32
) -> Optional[HeaderBlockRecord]:
async def get_header_block_record(self, header_hash: bytes32) -> Optional[HeaderBlockRecord]:
"""Gets a block record from the database, if present"""
cursor = await self.db.execute(
"SELECT * from header_blocks WHERE header_hash=?", (header_hash.hex(),)
)
cursor = await self.db.execute("SELECT * from header_blocks WHERE header_hash=?", (header_hash.hex(),))
row = await cursor.fetchone()
await cursor.close()
if row is not None:
@ -143,9 +114,7 @@ class WalletBlockStore:
else:
return None
async def get_sub_block_record(
self, header_hash: bytes32
) -> Optional[SubBlockRecord]:
async def get_sub_block_record(self, header_hash: bytes32) -> Optional[SubBlockRecord]:
cursor = await self.db.execute(
"SELECT sub_block from sub_block_records WHERE header_hash=?",
(header_hash.hex(),),
@ -177,9 +146,7 @@ class WalletBlockStore:
return ret, peak
async def set_peak(self, header_hash: bytes32) -> None:
cursor_1 = await self.db.execute(
"UPDATE sub_block_records SET is_peak=0 WHERE is_peak=1"
)
cursor_1 = await self.db.execute("UPDATE sub_block_records SET is_peak=0 WHERE is_peak=1")
await cursor_1.close()
cursor_2 = await self.db.execute(
"UPDATE sub_block_records SET is_peak=1 WHERE header_hash=?",

View File

@ -3,7 +3,7 @@ import logging
from concurrent.futures.process import ProcessPoolExecutor
from enum import Enum
import multiprocessing
from typing import Dict, List, Optional, Tuple, Any, Callable
from typing import Dict, List, Optional, Tuple, Callable
from src.consensus.constants import ConsensusConstants
from src.consensus.difficulty_adjustment import (
@ -11,7 +11,6 @@ from src.consensus.difficulty_adjustment import (
get_next_sub_slot_iters,
)
from src.consensus.full_block_to_sub_block_record import block_to_sub_block_record
from src.types.coin import Coin
from src.types.end_of_slot_bundle import EndOfSubSlotBundle
from src.types.full_block import FullBlock
from src.types.header_block import HeaderBlock
@ -26,7 +25,6 @@ from src.consensus.block_header_validation import (
validate_finished_header_block,
)
from src.wallet.block_record import HeaderBlockRecord
from src.wallet.wallet_coin_record import WalletCoinRecord
from src.wallet.wallet_coin_store import WalletCoinStore
from src.wallet.wallet_block_store import WalletBlockStore
@ -44,9 +42,7 @@ class ReceiveBlockResult(Enum):
ADDED_AS_ORPHAN = 2 # Added as an orphan/stale block (not a new peak of the chain)
INVALID_BLOCK = 3 # Block was not added because it was invalid
ALREADY_HAVE_BLOCK = 4 # Block is already present in this blockchain
DISCONNECTED_BLOCK = (
5 # Block's parent (previous pointer) is not in this blockchain
)
DISCONNECTED_BLOCK = 5 # Block's parent (previous pointer) is not in this blockchain
class WalletBlockchain:
@ -80,7 +76,7 @@ class WalletBlockchain:
async def create(
block_store: WalletBlockStore,
consensus_constants: ConsensusConstants,
coins_of_interest_received: Callable, # coins_of_interest_received(removals: List[Coin], additions: List[Coin], height: uint32)
coins_of_interest_received: Callable, # f(removals: List[Coin], additions: List[Coin], height: uint32)
reorg_rollback: Callable,
):
"""
@ -129,15 +125,11 @@ class WalletBlockchain:
while True:
self.sub_height_to_hash[curr.sub_block_height] = curr.header_hash
if curr.sub_epoch_summary_included is not None:
self.sub_epoch_summaries[
curr.sub_block_height
] = curr.sub_epoch_summary_included
self.sub_epoch_summaries[curr.sub_block_height] = curr.sub_epoch_summary_included
if curr.height == 0:
break
curr = self.sub_blocks[curr.prev_hash]
assert (
len(self.sub_blocks) == len(self.sub_height_to_hash) == self.peak_height + 1
)
assert len(self.sub_blocks) == len(self.sub_height_to_hash) == self.peak_height + 1
def get_peak(self) -> Optional[SubBlockRecord]:
"""
@ -151,9 +143,7 @@ class WalletBlockchain:
if self.peak_height is None:
return None
""" Return list of FullBlocks that are peaks"""
block = await self.block_store.get_header_block(
self.sub_height_to_hash[self.peak_height]
)
block = await self.block_store.get_header_block(self.sub_height_to_hash[self.peak_height])
assert block is not None
return block
@ -187,8 +177,6 @@ class WalletBlockchain:
invalid. Also returns the fork height, in the case of a new peak.
"""
block = block_record.header
additions = block_record.additions
removals = block_record.removals
genesis: bool = block.sub_block_height == 0
if block.header_hash in self.sub_blocks:
@ -210,9 +198,7 @@ class WalletBlockchain:
)
if error is not None:
log.error(
f"block {block.header_hash} failed validation {error.code} {error.error_msg}"
)
log.error(f"block {block.header_hash} failed validation {error.code} {error.error_msg}")
return ReceiveBlockResult.INVALID_BLOCK, error.code, None
sub_block = block_to_sub_block_record(
@ -235,9 +221,7 @@ class WalletBlockchain:
else:
return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None
async def _reconsider_peak(
self, sub_block: SubBlockRecord, genesis: bool
) -> Optional[uint32]:
async def _reconsider_peak(self, sub_block: SubBlockRecord, genesis: bool) -> Optional[uint32]:
"""
When a new block is added, this is called, to check if the new block is the new peak of the chain.
This also handles reorgs by reverting blocks which are not in the heaviest chain.
@ -246,17 +230,13 @@ class WalletBlockchain:
"""
if genesis:
if self.get_peak() is None:
block: Optional[
HeaderBlockRecord
] = await self.block_store.get_header_block_record(
block: Optional[HeaderBlockRecord] = await self.block_store.get_header_block_record(
sub_block.header_hash
)
assert block is not None
for removed in block.removals:
self.log.info(f"Removed: {removed.name()}")
await self.coins_of_interest_received(
block.removals, block.additions, block.height
)
await self.coins_of_interest_received(block.removals, block.additions, block.height)
self.sub_height_to_hash[uint32(0)] = block.header_hash
self.peak_height = uint32(0)
return uint32(0)
@ -270,8 +250,11 @@ class WalletBlockchain:
fork_h: int = find_fork_point_in_chain(self.sub_blocks, sub_block, peak)
# Rollback to fork
# TODO(straya): reorg coins based on height not sub-block height
# TODO(straya): handle fork_h == -1 case
self.log.info(
f"fork_h: {fork_h}, {sub_block.height}, {sub_block.sub_block_height}, {peak.sub_block_height}, {peak.height}"
f"fork_h: {fork_h}, {sub_block.height}, {sub_block.sub_block_height}, {peak.sub_block_height}, "
f"{peak.height}"
)
fork_hash = self.sub_height_to_hash[fork_h]
fork_block = self.sub_blocks[fork_hash]
@ -289,12 +272,8 @@ class WalletBlockchain:
blocks_to_add: List[Tuple[HeaderBlockRecord, SubBlockRecord]] = []
curr = sub_block.header_hash
while fork_h < 0 or curr != self.sub_height_to_hash[uint32(fork_h)]:
fetched_block: Optional[
HeaderBlockRecord
] = await self.block_store.get_header_block_record(curr)
fetched_sub_block: Optional[
SubBlockRecord
] = await self.block_store.get_sub_block_record(curr)
fetched_block: Optional[HeaderBlockRecord] = await self.block_store.get_header_block_record(curr)
fetched_sub_block: Optional[SubBlockRecord] = await self.block_store.get_sub_block_record(curr)
assert fetched_block is not None
assert fetched_sub_block is not None
blocks_to_add.append((fetched_block, fetched_sub_block))
@ -304,9 +283,7 @@ class WalletBlockchain:
curr = fetched_sub_block.prev_hash
for fetched_block, fetched_sub_block in reversed(blocks_to_add):
self.sub_height_to_hash[
fetched_sub_block.sub_block_height
] = fetched_sub_block.header_hash
self.sub_height_to_hash[fetched_sub_block.sub_block_height] = fetched_sub_block.header_hash
if fetched_sub_block.is_block:
await self.coins_of_interest_received(
fetched_block.removals,
@ -363,9 +340,7 @@ class WalletBlockchain:
async def get_sp_and_ip_sub_slots(
self, header_hash: bytes32
) -> Optional[Tuple[Optional[EndOfSubSlotBundle], Optional[EndOfSubSlotBundle]]]:
block: Optional[FullBlock] = await self.block_store.get_header_block(
header_hash
)
block: Optional[FullBlock] = await self.block_store.get_header_block(header_hash)
is_overflow = self.sub_blocks[block.header_hash].overflow
if block is None:
return None

View File

@ -1,7 +1,5 @@
import asyncio
import json
import random
import time
import traceback
from typing import Dict, Optional, Tuple, List, AsyncGenerator, Callable, Union
from pathlib import Path
@ -14,7 +12,6 @@ from src.protocols.wallet_protocol import (
RequestSubBlockHeader,
RespondSubBlockHeader,
RequestAdditions,
RequestRemovals,
RespondAdditions,
RespondRemovals,
RejectRemovalsRequest,
@ -109,9 +106,7 @@ class WalletNode:
def get_key_for_fingerprint(self, fingerprint):
private_keys = self.keychain.get_all_private_keys()
if len(private_keys) == 0:
self.log.warning(
"No keys present. Create keys with the UI, or with the 'chia keys' program."
)
self.log.warning("No keys present. Create keys with the UI, or with the 'chia keys' program.")
return None
private_key: Optional[PrivateKey] = None
@ -136,22 +131,16 @@ class WalletNode:
return False
db_path_key_suffix = str(private_key.get_g1().get_fingerprint())
path = path_from_root(
self.root_path, f"{self.config['database_path']}-{db_path_key_suffix}"
)
path = path_from_root(self.root_path, f"{self.config['database_path']}-{db_path_key_suffix}")
mkdir(path.parent)
self.wallet_state_manager = await WalletStateManager.create(
private_key, self.config, path, self.constants
)
self.wallet_state_manager = await WalletStateManager.create(private_key, self.config, path, self.constants)
self.wsm_close_task = None
assert self.wallet_state_manager is not None
backup_settings: BackupInitialized = (
self.wallet_state_manager.user_settings.get_backup_settings()
)
backup_settings: BackupInitialized = self.wallet_state_manager.user_settings.get_backup_settings()
if backup_settings.user_initialized is False:
if new_wallet is True:
await self.wallet_state_manager.user_settings.user_created_new_wallet()
@ -168,14 +157,10 @@ class WalletNode:
self.backup_initialized = True
if backup_file is not None:
json_dict = open_backup_file(
backup_file, self.wallet_state_manager.private_key
)
json_dict = open_backup_file(backup_file, self.wallet_state_manager.private_key)
if "start_height" in json_dict["data"]:
start_height = json_dict["data"]["start_height"]
self.config["starting_height"] = max(
0, start_height - self.config["start_height_buffer"]
)
self.config["starting_height"] = max(0, start_height - self.config["start_height_buffer"])
else:
self.config["starting_height"] = 0
else:
@ -195,12 +180,8 @@ class WalletNode:
self._shut_down = True
if self.wallet_state_manager is None:
return
self.wsm_close_task = asyncio.create_task(
self.wallet_state_manager.close_all_stores()
)
self.wallet_peers_task = asyncio.create_task(
self.wallet_peers.ensure_is_closed()
)
self.wsm_close_task = asyncio.create_task(self.wallet_state_manager.close_all_stores())
self.wallet_peers_task = asyncio.create_task(self.wallet_peers.ensure_is_closed())
async def _await_closed(self):
if self.wallet_state_manager is None or self.backup_initialized is False:
@ -225,9 +206,7 @@ class WalletNode:
async def _action_messages(self) -> List[Message]:
if self.wallet_state_manager is None or self.backup_initialized is False:
return []
actions: List[
WalletAction
] = await self.wallet_state_manager.action_store.get_all_pending_actions()
actions: List[WalletAction] = await self.wallet_state_manager.action_store.get_all_pending_actions()
result: List[Message] = []
for action in actions:
data = json.loads(action.data)
@ -273,17 +252,11 @@ class WalletNode:
await self.server.send_to_all([msg], NodeType.FULL_NODE)
async def _messages_to_resend(self) -> List[Message]:
if (
self.wallet_state_manager is None
or self.backup_initialized is False
or self._shut_down
):
if self.wallet_state_manager is None or self.backup_initialized is False or self._shut_down:
return []
messages: List[Message] = []
records: List[
TransactionRecord
] = await self.wallet_state_manager.tx_store.get_not_sent()
records: List[TransactionRecord] = await self.wallet_state_manager.tx_store.get_not_sent()
for record in records:
if record.spend_bundle is None:
@ -333,21 +306,15 @@ class WalletNode:
self.config["full_node_peer"]["port"],
)
peers = [c.get_peer_info() for c in self.server.get_full_node_connections()]
full_node_resolved = PeerInfo(
socket.gethostbyname(full_node_peer.host), full_node_peer.port
)
full_node_resolved = PeerInfo(socket.gethostbyname(full_node_peer.host), full_node_peer.port)
if full_node_peer in peers or full_node_resolved in peers:
self.log.info(
f"Will not attempt to connect to other nodes, already connected to {full_node_peer}"
)
self.log.info(f"Will not attempt to connect to other nodes, already connected to {full_node_peer}")
for connection in self.server.get_full_node_connections():
if (
connection.get_peer_info() != full_node_peer
and connection.get_peer_info() != full_node_resolved
):
self.log.info(
f"Closing unnecessary connection to {connection.get_peer_info()}."
)
self.log.info(f"Closing unnecessary connection to {connection.get_peer_info()}.")
asyncio.create_task(connection.close())
return True
return False
@ -388,9 +355,7 @@ class WalletNode:
return
request = wallet_protocol.RequestSubBlockHeader(peak.sub_block_height)
response: Optional[RespondSubBlockHeader] = await peer.request_sub_block_header(
request
)
response: Optional[RespondSubBlockHeader] = await peer.request_sub_block_header(request)
header_block = response.header_block
if header_block is not None:
@ -404,9 +369,7 @@ class WalletNode:
async def check_new_peak(self):
await asyncio.sleep(1)
current_peak: Optional[
SubBlockRecord
] = self.wallet_state_manager.blockchain.get_peak()
current_peak: Optional[SubBlockRecord] = self.wallet_state_manager.blockchain.get_peak()
if current_peak is None:
return
potential_peaks: List[
@ -455,10 +418,7 @@ class WalletNode:
highest_weight = potential_peak_block.weight
peak_height = potential_peak_block.height
if (
self.wallet_state_manager.peak is not None
and highest_weight <= self.wallet_state_manager.peak.weight
):
if self.wallet_state_manager.peak is not None and highest_weight <= self.wallet_state_manager.peak.weight:
self.log.info("Not performing sync, already caught up.")
return
@ -473,15 +433,11 @@ class WalletNode:
for i in range(0, peak_height + 1):
self.log.info(f"Requesting block {i}")
request = RequestSubBlockHeader(i)
response, peer = await send_all_first_reply(
"request_sub_block_header", request, peers
)
response, peer = await send_all_first_reply("request_sub_block_header", request, peers)
peer: WSChiaConnection = peer
res: RespondSubBlockHeader = response
block_i: HeaderBlock = res.header_block
self.log.error(
f"Received block {block_i.sub_block_height}, {block_i.height}, {block_i.is_block}"
)
self.log.error(f"Received block {block_i.sub_block_height}, {block_i.height}, {block_i.is_block}")
if block_i is None:
continue
@ -490,9 +446,7 @@ class WalletNode:
(
additions,
removals,
) = await self.wallet_state_manager.get_filter_additions_removals(
block_i, block_i.transactions_filter
)
) = await self.wallet_state_manager.get_filter_additions_removals(block_i, block_i.transactions_filter)
# Get Additions
added_coins = await self.get_additions(peer, block_i, additions)
@ -500,15 +454,11 @@ class WalletNode:
raise ValueError("Failed to fetch additions")
# Get removals
removed_coins = await self.get_removals(
peer, block_i, added_coins, removals
)
removed_coins = await self.get_removals(peer, block_i, added_coins, removals)
if removed_coins is None:
raise ValueError("Failed to fetch removals")
header_block_record = HeaderBlockRecord(
block_i, added_coins, removed_coins
)
header_block_record = HeaderBlockRecord(block_i, added_coins, removed_coins)
else:
header_block_record = HeaderBlockRecord(block_i, [], [])
@ -518,9 +468,7 @@ class WalletNode:
result,
error,
fork_h,
) = await self.wallet_state_manager.blockchain.receive_block(
header_block_record
)
) = await self.wallet_state_manager.blockchain.receive_block(header_block_record)
if result == ReceiveBlockResult.NEW_PEAK:
self.wallet_state_manager.state_changed("new_block")
@ -590,7 +538,7 @@ class WalletNode:
# Verify removals root
removals_merkle_set = MerkleSet()
for coin in all_coins:
for coin in coins:
if coin is not None:
removals_merkle_set.add_already_hashed(coin.name())
removals_root = removals_merkle_set.get_root()
@ -628,16 +576,12 @@ class WalletNode:
return False
return True
async def get_additions(
self, peer: WSChiaConnection, block_i, additions
) -> Optional[List[Coin]]:
async def get_additions(self, peer: WSChiaConnection, block_i, additions) -> Optional[List[Coin]]:
if len(additions) > 0:
additions_request = RequestAdditions(
block_i.sub_block_height, block_i.header_hash, additions
additions_request = RequestAdditions(block_i.sub_block_height, block_i.header_hash, additions)
additions_res: Optional[Union[RespondAdditions, RejectAdditionsRequest]] = await peer.request_additions(
additions_request
)
additions_res: Optional[
Union[RespondAdditions, RejectAdditionsRequest]
] = await peer.request_additions(additions_request)
if additions_res is None:
await peer.close()
return None
@ -663,37 +607,28 @@ class WalletNode:
added_coins = []
return added_coins
async def get_removals(
self, peer: WSChiaConnection, block_i, additions, removals
) -> Optional[List[Coin]]:
async def get_removals(self, peer: WSChiaConnection, block_i, additions, removals) -> Optional[List[Coin]]:
request_all_removals = False
# Check if we need all removals
for coin in additions:
puzzle_store = self.wallet_state_manager.puzzle_store
record_info: Optional[
DerivationRecord
] = await puzzle_store.get_derivation_record_for_puzzle_hash(
record_info: Optional[DerivationRecord] = await puzzle_store.get_derivation_record_for_puzzle_hash(
coin.puzzle_hash.hex()
)
if (
record_info is not None
and record_info.wallet_type == WalletType.COLOURED_COIN
):
if record_info is not None and record_info.wallet_type == WalletType.COLOURED_COIN:
request_all_removals = True
break
if len(removals) > 0 or request_all_removals:
if request_all_removals:
removals_request = wallet_protocol.RequestRemovals(
block_i.sub_block_height, block_i.header_hash, None
)
removals_request = wallet_protocol.RequestRemovals(block_i.sub_block_height, block_i.header_hash, None)
else:
removals_request = wallet_protocol.RequestRemovals(
block_i.sub_block_height, block_i.header_hash, removals
)
removals_res: Optional[
Union[RespondRemovals, RejectRemovalsRequest]
] = await peer.request_removals(removals_request)
removals_res: Optional[Union[RespondRemovals, RejectRemovalsRequest]] = await peer.request_removals(
removals_request
)
if removals_res is None:
return None
elif isinstance(removals_res, RespondRemovals):

View File

@ -1,12 +1,9 @@
import traceback
from typing import List, Optional
from typing import List
from src.protocols import wallet_protocol
from src.server.outbound_message import Message
from src.server.ws_connection import WSChiaConnection
from src.types.coin import Coin, hash_coin_list
from src.types.coin import Coin
from src.types.mempool_inclusion_status import MempoolInclusionStatus
from src.types.sized_bytes import bytes32
from src.util.api_decorators import api_request, peer_required
from src.util.errors import Err
from src.util.merkle_set import (
@ -14,8 +11,6 @@ from src.util.merkle_set import (
confirm_not_included_already_hashed,
confirm_included_already_hashed,
)
from src.wallet.derivation_record import DerivationRecord
from src.wallet.util.wallet_types import WalletType
from src.wallet.wallet_node import WalletNode
@ -27,17 +22,12 @@ class WalletNodeAPI:
@peer_required
@api_request
async def respond_removals(
self, response: wallet_protocol.RespondRemovals, peer: WSChiaConnection
):
async def respond_removals(self, response: wallet_protocol.RespondRemovals, peer: WSChiaConnection):
"""
The full node has responded with the removals for a block. We will use this
to try to finish the block, and add it to the state.
"""
if (
self.wallet_node.wallet_state_manager is None
or self.wallet_node.backup_initialized is False
):
if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False:
return
if self.wallet_node._shut_down:
return
@ -45,14 +35,10 @@ class WalletNodeAPI:
response.header_hash not in self.wallet_node.cached_blocks
or self.wallet_node.cached_blocks[response.header_hash][0].additions is None
):
self.wallet_node.log.warning(
"Do not have header for removals, or do not have additions"
)
self.wallet_node.log.warning("Do not have header for removals, or do not have additions")
return
block_record, header_block, transaction_filter = self.wallet_node.cached_blocks[
response.header_hash
]
block_record, header_block, transaction_filter = self.wallet_node.cached_blocks[response.header_hash]
assert response.height == block_record.height
all_coins: List[Coin] = []
@ -124,32 +110,22 @@ class WalletNodeAPI:
# await self.respond_header(respond_header_msg, peer)
@api_request
async def reject_removals_request(
self, response: wallet_protocol.RejectRemovalsRequest, peer: WSChiaConnection
):
async def reject_removals_request(self, response: wallet_protocol.RejectRemovalsRequest, peer: WSChiaConnection):
"""
The full node has rejected our request for removals.
"""
# TODO(mariano): implement
if (
self.wallet_node.wallet_state_manager is None
or self.wallet_node.backup_initialized is False
):
if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False:
return
self.wallet_node.log.error("Removals request rejected")
@api_request
async def reject_additions_request(
self, response: wallet_protocol.RejectAdditionsRequest
):
async def reject_additions_request(self, response: wallet_protocol.RejectAdditionsRequest):
"""
The full node has rejected our request for additions.
"""
# TODO(mariano): implement
if (
self.wallet_node.wallet_state_manager is None
or self.wallet_node.backup_initialized is False
):
if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False:
return
self.wallet_node.log.error("Additions request rejected")
@ -162,52 +138,37 @@ class WalletNodeAPI:
await self.wallet_node.new_peak(peak, peer)
@api_request
async def reject_header_request(
self, response: wallet_protocol.RejectHeaderRequest
):
async def reject_header_request(self, response: wallet_protocol.RejectHeaderRequest):
"""
The full node has rejected our request for a header.
"""
# TODO(mariano): implement
if (
self.wallet_node.wallet_state_manager is None
or self.wallet_node.backup_initialized is False
):
if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False:
return
self.wallet_node.log.error("Header request rejected")
@api_request
async def respond_sub_block_header(
self, response: wallet_protocol.RespondSubBlockHeader
):
async def respond_sub_block_header(self, response: wallet_protocol.RespondSubBlockHeader):
pass
@peer_required
@api_request
async def respond_additions(
self, response: wallet_protocol.RespondAdditions, peer: WSChiaConnection
):
async def respond_additions(self, response: wallet_protocol.RespondAdditions, peer: WSChiaConnection):
pass
@peer_required
@api_request
async def reject_additions_request(
self, response: wallet_protocol.RejectAdditionsRequest, peer: WSChiaConnection
):
async def reject_additions_request(self, response: wallet_protocol.RejectAdditionsRequest, peer: WSChiaConnection):
pass
@peer_required
@api_request
async def respond_removals(
self, response: wallet_protocol.RespondRemovals, peer: WSChiaConnection
):
async def respond_removals(self, response: wallet_protocol.RespondRemovals, peer: WSChiaConnection):
pass
@peer_required
@api_request
async def reject_removals_request(
self, response: wallet_protocol.RejectRemovalsRequest, peer: WSChiaConnection
):
async def reject_removals_request(self, response: wallet_protocol.RejectRemovalsRequest, peer: WSChiaConnection):
pass
# if (
@ -357,40 +318,25 @@ class WalletNodeAPI:
@peer_required
@api_request
async def transaction_ack(
self, ack: wallet_protocol.TransactionAck, peer: WSChiaConnection
):
async def transaction_ack(self, ack: wallet_protocol.TransactionAck, peer: WSChiaConnection):
"""
This is an ack for our previous SendTransaction call. This removes the transaction from
the send queue if we have sent it to enough nodes.
"""
assert peer.peer_node_id is not None
name = peer.peer_node_id.hex()
if (
self.wallet_node.wallet_state_manager is None
or self.wallet_node.backup_initialized is False
):
if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False:
return
if ack.status == MempoolInclusionStatus.SUCCESS:
self.wallet_node.log.info(
f"SpendBundle has been received and accepted to mempool by the FullNode. {ack}"
)
self.wallet_node.log.info(f"SpendBundle has been received and accepted to mempool by the FullNode. {ack}")
elif ack.status == MempoolInclusionStatus.PENDING:
self.wallet_node.log.info(
f"SpendBundle has been received (and is pending) by the FullNode. {ack}"
)
self.wallet_node.log.info(f"SpendBundle has been received (and is pending) by the FullNode. {ack}")
else:
self.wallet_node.log.warning(
f"SpendBundle has been rejected by the FullNode. {ack}"
)
self.wallet_node.log.warning(f"SpendBundle has been rejected by the FullNode. {ack}")
if ack.error is not None:
await self.wallet_node.wallet_state_manager.remove_from_queue(
ack.txid, name, ack.status, Err[ack.error]
)
await self.wallet_node.wallet_state_manager.remove_from_queue(ack.txid, name, ack.status, Err[ack.error])
else:
await self.wallet_node.wallet_state_manager.remove_from_queue(
ack.txid, name, ack.status, None
)
await self.wallet_node.wallet_state_manager.remove_from_queue(ack.txid, name, ack.status, None)
# @api_request
# async def respond_all_proof_hashes(

View File

@ -13,7 +13,6 @@ from chiabip158 import PyBIP158
from blspy import PrivateKey, G1Element, AugSchemeMPL
from cryptography.fernet import Fernet
from src.consensus.blockchain import ReceiveBlockResult
from src.consensus.constants import ConsensusConstants
from src.consensus.sub_block_record import SubBlockRecord
from src.types.coin import Coin