Preserve correct MempoolItem block height when rebuilding mempool (#13951)

This commit is contained in:
Adam Kelly 2022-11-21 14:28:48 -08:00 committed by GitHub
parent 9a951d835e
commit 5cbc415589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 193 additions and 23 deletions

View File

@ -148,7 +148,7 @@ async def run_mempool_benchmark(single_threaded: bool) -> None:
for tx in spend_bundles:
npc = await mempool.pre_validate_spendbundle(tx, None, tx.name())
assert npc is not None
_, status, error = await mempool.add_spend_bundle(tx, npc, tx.name())
_, status, error = await mempool.add_spend_bundle(tx, npc, tx.name(), height)
assert status == MempoolInclusionStatus.SUCCESS
assert error is None

@ -1 +1 @@
Subproject commit dd4d7d031995189711248628f10d85ebd6764c25
Subproject commit 736a1e361681b1d65cc48bb1297d96ea2d4c5e45

View File

@ -2,7 +2,7 @@ import random
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, List, Dict, Tuple, Any, Type, TypeVar
from typing import Optional, List, Dict, Tuple, Any, Type, TypeVar, Callable
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.blockchain_format.coin import Coin
@ -174,7 +174,11 @@ class SpendSim:
return None
return simple_solution_generator(bundle)
async def farm_block(self, puzzle_hash: bytes32 = bytes32(b"0" * 32)) -> Tuple[List[Coin], List[Coin]]:
async def farm_block(
self,
puzzle_hash: bytes32 = bytes32(b"0" * 32),
item_inclusion_filter: Optional[Callable[[MempoolManager, MempoolItem], bool]] = None,
) -> Tuple[List[Coin], List[Coin]]:
# Fees get calculated
fees = uint64(0)
if self.mempool_manager.mempool.spends:
@ -206,7 +210,7 @@ class SpendSim:
if (len(self.block_records) > 0) and (self.mempool_manager.mempool.spends):
peak = self.mempool_manager.peak
if peak is not None:
result = await self.mempool_manager.create_bundle_from_mempool(peak.header_hash)
result = await self.mempool_manager.create_bundle_from_mempool(peak.header_hash, item_inclusion_filter)
if result is not None:
bundle, additions, removals = result
@ -214,12 +218,12 @@ class SpendSim:
return_additions = additions
return_removals = removals
await self.mempool_manager.coin_store._add_coin_records(
[self.new_coin_record(addition) for addition in additions]
)
await self.mempool_manager.coin_store._set_spent(
[r.name() for r in removals], uint32(self.block_height + 1)
)
await self.mempool_manager.coin_store._add_coin_records(
[self.new_coin_record(addition) for addition in additions]
)
await self.mempool_manager.coin_store._set_spent(
[r.name() for r in removals], uint32(self.block_height + 1)
)
# SimBlockRecord is created
generator: Optional[BlockGenerator] = await self.generate_transaction_generator(generator_bundle)
@ -275,8 +279,9 @@ class SimClient:
)
except ValidationError as e:
return MempoolInclusionStatus.FAILED, e.code
assert self.service.mempool_manager.peak
cost, status, error = await self.service.mempool_manager.add_spend_bundle(
spend_bundle, cost_result, spend_bundle.name()
spend_bundle, cost_result, spend_bundle.name(), self.service.mempool_manager.peak.height
)
return status, error

View File

@ -2242,7 +2242,10 @@ class FullNode:
if self.mempool_manager.get_spendbundle(spend_name) is not None:
self.mempool_manager.remove_seen(spend_name)
return MempoolInclusionStatus.SUCCESS, None
cost, status, error = await self.mempool_manager.add_spend_bundle(transaction, cost_result, spend_name)
assert self.mempool_manager.peak
cost, status, error = await self.mempool_manager.add_spend_bundle(
transaction, cost_result, spend_name, self.mempool_manager.peak.height
)
if status == MempoolInclusionStatus.SUCCESS:
self.log.debug(
f"Added transaction to mempool: {spend_name} mempool size: "

View File

@ -7,7 +7,7 @@ import time
from concurrent.futures import Executor
from concurrent.futures.process import ProcessPoolExecutor
from multiprocessing.context import BaseContext
from typing import Dict, List, Optional, Set, Tuple
from typing import Callable, Dict, List, Optional, Set, Tuple
from blspy import GTElement
from chiabip158 import PyBIP158
@ -132,7 +132,9 @@ class MempoolManager:
self.pool.shutdown(wait=True)
async def create_bundle_from_mempool(
self, last_tb_header_hash: bytes32
self,
last_tb_header_hash: bytes32,
item_inclusion_filter: Optional[Callable[[MempoolManager, MempoolItem], bool]] = None,
) -> Optional[Tuple[SpendBundle, List[Coin], List[Coin]]]:
"""
Returns aggregated spendbundle that can be used for creating new block,
@ -141,6 +143,13 @@ class MempoolManager:
if self.peak is None or self.peak.header_hash != last_tb_header_hash:
return None
if item_inclusion_filter is None:
def always(mm: MempoolManager, mi: MempoolItem) -> bool:
return True
item_inclusion_filter = always
cost_sum = 0 # Checks that total cost does not exceed block maximum
fee_sum = 0 # Checks that total fees don't exceed 64 bits
spend_bundles: List[SpendBundle] = []
@ -156,6 +165,7 @@ class MempoolManager:
if (
item.cost + cost_sum <= self.limit_factor * self.constants.MAX_BLOCK_COST_CLVM
and item.fee + fee_sum <= self.constants.MAX_COIN_AMOUNT
and item_inclusion_filter(self, item)
):
spend_bundles.append(item.spend_bundle)
cost_sum += item.cost
@ -294,10 +304,7 @@ class MempoolManager:
return ret
async def add_spend_bundle(
self,
new_spend: SpendBundle,
npc_result: NPCResult,
spend_name: bytes32,
self, new_spend: SpendBundle, npc_result: NPCResult, spend_name: bytes32, first_added_height: uint32
) -> Tuple[Optional[uint64], MempoolInclusionStatus, Optional[Err]]:
"""
Validates and adds to mempool a new_spend with the given NPCResult, and spend_name, and the current mempool.
@ -322,7 +329,9 @@ class MempoolManager:
assert cost is not None
return uint64(cost), MempoolInclusionStatus.SUCCESS, None
err, item, remove_items = await self.validate_spend_bundle(new_spend, npc_result, spend_name)
err, item, remove_items = await self.validate_spend_bundle(
new_spend, npc_result, spend_name, first_added_height
)
if err is None:
# No error, immediately add to mempool, after removing conflicting TXs.
assert item is not None
@ -342,6 +351,7 @@ class MempoolManager:
new_spend: SpendBundle,
npc_result: NPCResult,
spend_name: bytes32,
first_added_height: uint32,
) -> Tuple[Optional[Err], Optional[MempoolItem], List[bytes32]]:
"""
Validates new_spend with the given NPCResult, and spend_name, and the current mempool. The mempool should
@ -351,6 +361,9 @@ class MempoolManager:
new_spend: spend bundle to validate
npc_result: result of running the clvm transaction in a fake block
spend_name: hash of the spend bundle data, passed in as an optimization
first_added_height: The block height that `new_spend` first entered this node's mempool.
Used to estimate how long a spend has taken to be included on the chain.
This value could differ node to node. Not preserved across full_node restarts.
Returns:
Optional[Err]: Err is set if we cannot add to the mempool, None if we will immediately add to mempool
@ -483,7 +496,7 @@ class MempoolManager:
self.peak.timestamp,
)
potential = MempoolItem(new_spend, uint64(fees), npc_result, cost, spend_name, additions, self.peak.height)
potential = MempoolItem(new_spend, uint64(fees), npc_result, cost, spend_name, additions, first_added_height)
if tl_error:
if tl_error is Err.ASSERT_HEIGHT_ABSOLUTE_FAILED or tl_error is Err.ASSERT_HEIGHT_RELATIVE_FAILED:
@ -583,7 +596,9 @@ class MempoolManager:
)
self.seen_bundle_hashes = {}
for item in old_pool.spends.values():
_, result, err = await self.add_spend_bundle(item.spend_bundle, item.npc_result, item.spend_bundle_name)
_, result, err = await self.add_spend_bundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name, item.height_added_to_mempool
)
# Only add to `seen` if inclusion worked, so it can be resubmitted in case of a reorg
if result == MempoolInclusionStatus.SUCCESS:
self.add_and_maybe_pop_seen(item.spend_bundle_name)
@ -598,7 +613,7 @@ class MempoolManager:
txs_added = []
for item in potential_txs.values():
cost, status, error = await self.add_spend_bundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name
item.spend_bundle, item.npc_result, item.spend_bundle_name, item.height_added_to_mempool
)
if status == MempoolInclusionStatus.SUCCESS:
txs_added.append((item.spend_bundle, item.npc_result, item.spend_bundle_name))

View File

@ -0,0 +1,147 @@
from __future__ import annotations
import logging
from typing import Callable, List, Optional, Tuple
import pytest
from blspy import G2Element
from chia_rs import Coin
from chia.clvm.spend_sim import SimClient, SpendSim
from chia.consensus.constants import ConsensusConstants
from chia.consensus.default_constants import DEFAULT_CONSTANTS
from chia.full_node.bitcoin_fee_estimator import BitcoinFeeEstimator
from chia.full_node.mempool_manager import MempoolManager
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend
from chia.types.mempool_item import MempoolItem
from chia.types.spend_bundle import SpendBundle
log = logging.getLogger(__name__)
the_puzzle_hash = bytes32(
bytes.fromhex("9dcf97a184f32623d11a73124ceb99a5709b083721e878a16d78f596718ba7b2")
) # Program.to(1)
async def farm(
sim: SpendSim,
puzzle_hash: bytes32,
item_inclusion_filter: Optional[Callable[[MempoolManager, MempoolItem], bool]] = None,
) -> Tuple[List[Coin], List[Coin], List[Coin]]:
additions, removals = await sim.farm_block(puzzle_hash) # , item_inclusion_filter)
height = sim.get_height()
new_reward_coins = sim.block_records[height].reward_claims_incorporated
return additions, removals, new_reward_coins
def make_tx_sb(from_coin: Coin) -> SpendBundle:
coin_spend = CoinSpend(
from_coin,
Program.to(1),
Program.to([[51, from_coin.puzzle_hash, from_coin.amount]]),
)
spend_bundle = SpendBundle([coin_spend], G2Element())
return spend_bundle
async def init_test(
puzzle_hash: bytes32, spends_per_block: int
) -> Tuple[SpendSim, SimClient, BitcoinFeeEstimator, List[Coin], List[Coin]]:
defaults: ConsensusConstants = DEFAULT_CONSTANTS
sim = await SpendSim.create(defaults=defaults.replace(MAX_BLOCK_COST_CLVM=300000000, MEMPOOL_BLOCK_BUFFER=1))
cli = SimClient(sim)
new_reward_coins = []
spend_coins = []
fee_coins = []
await farm(sim, puzzle_hash)
for i in range(1, spends_per_block + 1):
await farm(sim, puzzle_hash)
new_reward_coins.extend(sim.block_records[i].reward_claims_incorporated)
fee_coins.append(sim.block_records[i].reward_claims_incorporated[0])
spend_coins.append(sim.block_records[i].reward_claims_incorporated[1])
await farm(sim, puzzle_hash)
assert len(sim.blocks) == spends_per_block + 2
assert sim.blocks[-1].height == spends_per_block + 1
assert sim.block_records[0].reward_claims_incorporated[0].amount == 18375000000000000000
assert sim.block_records[0].reward_claims_incorporated[1].amount == 2625000000000000000
assert sim.block_records[1].reward_claims_incorporated[0].amount == 1750000000000
assert sim.block_records[1].reward_claims_incorporated[1].amount == 250000000000
estimator: BitcoinFeeEstimator = sim.mempool_manager.mempool.fee_estimator # type:ignore
return sim, cli, estimator, spend_coins, fee_coins # new_reward_coins
@pytest.mark.asyncio
async def test_mempool_inclusion_filter_basic() -> None:
sim, cli, estimator, spend_coins, fee_coins = await init_test(the_puzzle_hash, 1)
assert len(sim.mempool_manager.mempool.spends) == 0
spend_bundle: SpendBundle = make_tx_sb(spend_coins[0])
status, error = await cli.push_tx(spend_bundle)
assert len(sim.mempool_manager.mempool.spends) == 1
assert error is None
mempool_item = sim.mempool_manager.get_mempool_item(spend_bundle.name())
assert mempool_item
def include_none(mm: MempoolManager, mi: MempoolItem) -> bool:
return False
def include_all(mm: MempoolManager, mi: MempoolItem) -> bool:
return True
additions, removals = await sim.farm_block(the_puzzle_hash, item_inclusion_filter=include_none)
assert len(sim.mempool_manager.mempool.spends) == 1
assert removals == []
additions, removals = await sim.farm_block(the_puzzle_hash, item_inclusion_filter=include_all)
assert len(sim.mempool_manager.mempool.spends) == 0
removal_ids = [c.name() for c in removals]
assert mempool_item.name not in removal_ids
await sim.close()
@pytest.mark.asyncio
async def test_mempoolitem_height_added(db_version: int) -> None:
sim, cli, estimator, spend_coins, fee_coins = await init_test(the_puzzle_hash, 1)
assert len(sim.mempool_manager.mempool.spends) == 0
spend_bundle: SpendBundle = make_tx_sb(spend_coins[0])
status, error = await cli.push_tx(spend_bundle)
assert len(sim.mempool_manager.mempool.spends) == 1
log.warning(f"{status, error} = cli.push_tx({spend_bundle.name()})")
mempool_item = sim.mempool_manager.get_mempool_item(spend_bundle.name())
assert mempool_item
heights = {sim.get_height(): mempool_item.height_added_to_mempool}
def ignore_spend(mm: MempoolManager, mi: MempoolItem) -> bool:
assert mempool_item
return mi.name != mempool_item.name
additions, removals = await sim.farm_block(the_puzzle_hash, item_inclusion_filter=ignore_spend)
removal_ids = [c.name() for c in removals]
assert mempool_item.name not in removal_ids
mempool_item2 = sim.mempool_manager.get_mempool_item(spend_bundle.name())
assert len(sim.mempool_manager.mempool.spends) == 1
assert mempool_item2
# This is the important check in this test: ensure height_added_to_mempool does not
# change when the mempool is rebuilt
assert mempool_item2.height_added_to_mempool == mempool_item2.height_added_to_mempool
# Now farm it into the next block
additions, removals = await sim.farm_block(the_puzzle_hash)
assert len(sim.mempool_manager.mempool.spends) == 0
assert len(removals) == 1
log.warning(heights)
await sim.close()