Ms.mempool simplify (#13314)

* Remove mempool.additions

* Don't re run the program, and remove program from mempool item

* Removals only stores item ids, and stores a list

* Move pending cache down to prevent cache dos

* Separate validation from adding to pool, and remove mypy exceptions

* Fix bug with replacing

* Add to mypy

* Revert cbgui

* precommit fail

* Properly update the seen dict

* lint error

* Fix mempool bug

* Update after merge with main

* Address comments
This commit is contained in:
Mariano Sorgente 2022-09-06 13:32:03 -04:00 committed by GitHub
parent dda347e09c
commit 8ef4c980b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 142 additions and 120 deletions

View File

@ -146,7 +146,7 @@ async def run_mempool_benchmark(single_threaded: bool) -> None:
for tx in spend_bundles:
npc = await mempool.pre_validate_spendbundle(tx, None, tx.name())
assert npc is not None
_, status, error = await mempool.add_spendbundle(tx, npc, tx.name())
_, status, error = await mempool.add_spend_bundle(tx, npc, tx.name())
assert status == MempoolInclusionStatus.SUCCESS
assert error is None

View File

@ -277,7 +277,7 @@ class SimClient:
)
except ValidationError as e:
return MempoolInclusionStatus.FAILED, e.code
cost, status, error = await self.service.mempool_manager.add_spendbundle(
cost, status, error = await self.service.mempool_manager.add_spend_bundle(
spend_bundle, cost_result, spend_bundle.name()
)
return status, error

View File

@ -2092,7 +2092,7 @@ class FullNode:
if self.mempool_manager.get_spendbundle(spend_name) is not None:
self.mempool_manager.remove_seen(spend_name)
return MempoolInclusionStatus.SUCCESS, None
cost, status, error = await self.mempool_manager.add_spendbundle(transaction, cost_result, spend_name)
cost, status, error = await self.mempool_manager.add_spend_bundle(transaction, cost_result, spend_name)
if status == MempoolInclusionStatus.SUCCESS:
self.log.debug(
f"Added transaction to mempool: {spend_name} mempool size: "

View File

@ -1,4 +1,4 @@
from typing import Dict, List
from typing import Dict, List, Optional
from sortedcontainers import SortedDict
@ -11,8 +11,7 @@ class Mempool:
def __init__(self, max_size_in_cost: int):
self.spends: Dict[bytes32, MempoolItem] = {}
self.sorted_spends: SortedDict = SortedDict()
self.additions: Dict[bytes32, MempoolItem] = {}
self.removals: Dict[bytes32, MempoolItem] = {}
self.removals: Dict[bytes32, List[bytes32]] = {} # From removal coin id to spend bundle id
self.max_size_in_cost: int = max_size_in_cost
self.total_mempool_cost: int = 0
@ -25,6 +24,7 @@ class Mempool:
current_cost = self.total_mempool_cost
# Iterates through all spends in increasing fee per cost
fee_per_cost: float
for fee_per_cost, spends_with_fpc in self.sorted_spends.items():
for spend_name, item in spends_with_fpc.items():
current_cost -= item.cost
@ -37,28 +37,30 @@ class Mempool:
else:
return 0
def remove_from_pool(self, item: MempoolItem):
def remove_from_pool(self, items: List[bytes32]) -> None:
"""
Removes an item from the mempool.
"""
removals: List[Coin] = item.removals
additions: List[Coin] = item.additions
for rem in removals:
del self.removals[rem.name()]
for add in additions:
del self.additions[add.name()]
del self.spends[item.name]
del self.sorted_spends[item.fee_per_cost][item.name]
dic = self.sorted_spends[item.fee_per_cost]
if len(dic.values()) == 0:
del self.sorted_spends[item.fee_per_cost]
self.total_mempool_cost -= item.cost
assert self.total_mempool_cost >= 0
for spend_bundle_id in items:
item: Optional[MempoolItem] = self.spends.get(spend_bundle_id)
if item is None:
continue
assert item.name == spend_bundle_id
removals: List[Coin] = item.removals
for rem in removals:
rem_name: bytes32 = rem.name()
self.removals[rem_name].remove(spend_bundle_id)
if len(self.removals[rem_name]) == 0:
del self.removals[rem_name]
del self.spends[item.name]
del self.sorted_spends[item.fee_per_cost][item.name]
dic = self.sorted_spends[item.fee_per_cost]
if len(dic.values()) == 0:
del self.sorted_spends[item.fee_per_cost]
self.total_mempool_cost -= item.cost
assert self.total_mempool_cost >= 0
def add_to_pool(
self,
item: MempoolItem,
):
def add_to_pool(self, item: MempoolItem) -> None:
"""
Adds an item to the mempool by kicking out transactions (if it doesn't fit), in order of increasing fee per cost
"""
@ -66,8 +68,8 @@ class Mempool:
while self.at_full_capacity(item.cost):
# Val is Dict[hash, MempoolItem]
fee_per_cost, val = self.sorted_spends.peekitem(index=0)
to_remove = list(val.values())[0]
self.remove_from_pool(to_remove)
to_remove: MempoolItem = list(val.values())[0]
self.remove_from_pool([to_remove.name])
self.spends[item.name] = item
@ -77,10 +79,11 @@ class Mempool:
self.sorted_spends[item.fee_per_cost][item.name] = item
for add in item.additions:
self.additions[add.name()] = item
for coin in item.removals:
self.removals[coin.name()] = item
coin_id = coin.name()
if coin_id not in self.removals:
self.removals[coin_id] = []
self.removals[coin_id].append(item.name)
self.total_mempool_cost += item.cost
def at_full_capacity(self, cost: int) -> bool:

View File

@ -20,7 +20,6 @@ from chia.full_node.mempool import Mempool
from chia.full_node.mempool_check_conditions import get_name_puzzle_conditions
from chia.full_node.pending_tx_cache import PendingTxCache
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import SerializedProgram
from chia.types.blockchain_format.sized_bytes import bytes32, bytes48
from chia.types.coin_record import CoinRecord
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
@ -121,7 +120,7 @@ class MempoolManager:
self.peak: Optional[BlockRecord] = None
self.mempool: Mempool = Mempool(self.mempool_max_total_cost)
def shut_down(self):
def shut_down(self) -> None:
self.pool.shutdown(wait=True)
async def create_bundle_from_mempool(
@ -193,7 +192,7 @@ class MempoolManager:
return True
return False
def add_and_maybe_pop_seen(self, spend_name: bytes32):
def add_and_maybe_pop_seen(self, spend_name: bytes32) -> None:
self.seen_bundle_hashes[spend_name] = spend_name
while len(self.seen_bundle_hashes) > self.seen_cache_size:
first_in = list(self.seen_bundle_hashes.keys())[0]
@ -203,7 +202,7 @@ class MempoolManager:
"""Return true if we saw this spendbundle recently"""
return bundle_hash in self.seen_bundle_hashes
def remove_seen(self, bundle_hash: bytes32):
def remove_seen(self, bundle_hash: bytes32) -> None:
if bundle_hash in self.seen_bundle_hashes:
self.seen_bundle_hashes.pop(bundle_hash)
@ -277,7 +276,7 @@ class MempoolManager:
raise ValidationError(err)
for cache_entry_key, cached_entry_value in new_cache_entries.items():
LOCAL_CACHE.put(cache_entry_key, GTElement.from_bytes_unchecked(cached_entry_value))
ret = NPCResult.from_bytes(cached_result_bytes)
ret: NPCResult = NPCResult.from_bytes(cached_result_bytes)
end_time = time.time()
duration = end_time - start_time
log.log(
@ -286,27 +285,78 @@ class MempoolManager:
)
return ret
async def add_spendbundle(
async def add_spend_bundle(
self,
new_spend: SpendBundle,
npc_result: NPCResult,
spend_name: bytes32,
program: Optional[SerializedProgram] = None,
) -> Tuple[Optional[uint64], MempoolInclusionStatus, Optional[Err]]:
"""
Tries to add spend bundle to the mempool
Returns the cost (if SUCCESS), the result (MempoolInclusion status), and an optional error
Validates and adds to mempool a new_spend with the given NPCResult, and spend_name, and the current mempool.
The mempool should be locked during this call (blockchain lock). If there are mempool conflicts, the conflicting
spends might be removed (if the new spend is a superset of the previous). Otherwise, the new spend might be
added to the potential pool.
Args:
new_spend: spend bundle to validate and add
npc_result: result of running the clvm transaction in a fake block
spend_name: hash of the spend bundle data, passed in as an optimization
Returns:
Optional[uint64]: cost of the entire transaction, None iff status is FAILED
MempoolInclusionStatus: SUCCESS (should add to pool), FAILED (cannot add), and PENDING (can add later)
Optional[Err]: Err is set iff status is FAILED
"""
# Skip if already added
if spend_name in self.mempool.spends:
cost: Optional[uint64] = self.mempool.spends[spend_name].cost
assert cost is not None
return uint64(cost), MempoolInclusionStatus.SUCCESS, None
err, item, remove_items = await self.validate_spend_bundle(new_spend, npc_result, spend_name)
if err is None:
# No error, immediately add to mempool, after removing conflicting TXs.
assert item is not None
self.mempool.add_to_pool(item)
self.mempool.remove_from_pool(remove_items)
return item.cost, MempoolInclusionStatus.SUCCESS, None
elif item is not None:
# There is an error, but we still returned a mempool item, this means we should add to the pending pool.
self.potential_cache.add(item)
return item.cost, MempoolInclusionStatus.PENDING, err
else:
# Cannot add to the mempool or pending pool.
return None, MempoolInclusionStatus.FAILED, err
async def validate_spend_bundle(
self,
new_spend: SpendBundle,
npc_result: NPCResult,
spend_name: bytes32,
) -> Tuple[Optional[Err], Optional[MempoolItem], List[bytes32]]:
"""
Validates new_spend with the given NPCResult, and spend_name, and the current mempool. The mempool should
be locked during this call (blockchain lock).
Args:
new_spend: spend bundle to validate
npc_result: result of running the clvm transaction in a fake block
spend_name: hash of the spend bundle data, passed in as an optimization
Returns:
Optional[Err]: Err is set if we cannot add to the mempool, None if we will immediately add to mempool
Optional[MempoolItem]: the item to add (to mempool or pending pool)
List[bytes32]: conflicting mempool items to remove, if no Err
"""
start_time = time.time()
if self.peak is None:
return None, MempoolInclusionStatus.FAILED, Err.MEMPOOL_NOT_INITIALIZED
return Err.MEMPOOL_NOT_INITIALIZED, None, []
assert npc_result.error is None
if npc_result.error is not None:
return None, MempoolInclusionStatus.FAILED, Err(npc_result.error)
return Err(npc_result.error), None, []
if program is None:
program = simple_solution_generator(new_spend).program
cost = npc_result.cost
log.debug(f"Cost: {cost}")
@ -314,14 +364,14 @@ class MempoolManager:
if cost > int(self.limit_factor * self.constants.MAX_BLOCK_COST_CLVM):
# we shouldn't ever end up here, since the cost is limited when we
# execute the CLVM program.
return None, MempoolInclusionStatus.FAILED, Err.BLOCK_COST_EXCEEDS_MAX
return Err.BLOCK_COST_EXCEEDS_MAX, None, []
assert npc_result.conds is not None
# build removal list
removal_names: List[bytes32] = [bytes32(spend.coin_id) for spend in npc_result.conds.spends]
if set(removal_names) != set([s.name() for s in new_spend.removals()]):
# If you reach here it's probably because your program reveal doesn't match the coin's puzzle hash
return None, MempoolInclusionStatus.FAILED, Err.INVALID_SPEND_BUNDLE
return Err.INVALID_SPEND_BUNDLE, None, []
additions = additions_for_npc(npc_result)
@ -333,38 +383,27 @@ class MempoolManager:
# Check additions for max coin amount
for coin in additions:
if coin.amount < 0:
return (
None,
MempoolInclusionStatus.FAILED,
Err.COIN_AMOUNT_NEGATIVE,
)
return Err.COIN_AMOUNT_NEGATIVE, None, []
if coin.amount > self.constants.MAX_COIN_AMOUNT:
return (
None,
MempoolInclusionStatus.FAILED,
Err.COIN_AMOUNT_EXCEEDS_MAXIMUM,
)
return Err.COIN_AMOUNT_EXCEEDS_MAXIMUM, None, []
addition_amount = addition_amount + coin.amount
# Check for duplicate outputs
addition_counter = collections.Counter(_.name() for _ in additions)
for k, v in addition_counter.items():
if v > 1:
return None, MempoolInclusionStatus.FAILED, Err.DUPLICATE_OUTPUT
return Err.DUPLICATE_OUTPUT, None, []
# Check for duplicate inputs
removal_counter = collections.Counter(name for name in removal_names)
for k, v in removal_counter.items():
if v > 1:
return None, MempoolInclusionStatus.FAILED, Err.DOUBLE_SPEND
# Skip if already added
if spend_name in self.mempool.spends:
return uint64(cost), MempoolInclusionStatus.SUCCESS, None
return Err.DOUBLE_SPEND, None, []
removal_record_dict: Dict[bytes32, CoinRecord] = {}
removal_amount: int = 0
for name in removal_names:
removal_record = await self.coin_store.get_coin_record(name)
if removal_record is None and name not in additions_dict:
return None, MempoolInclusionStatus.FAILED, Err.UNKNOWN_UNSPENT
return Err.UNKNOWN_UNSPENT, None, []
elif name in additions_dict:
removal_coin = additions_dict[name]
# The timestamp and block-height of this coin being spent needs
@ -389,50 +428,33 @@ class MempoolManager:
removals: List[Coin] = [record.coin for record in removal_record_dict.values()]
if addition_amount > removal_amount:
return None, MempoolInclusionStatus.FAILED, Err.MINTING_COIN
return Err.MINTING_COIN, None, []
fees = uint64(removal_amount - addition_amount)
assert_fee_sum: uint64 = uint64(npc_result.conds.reserve_fee)
if fees < assert_fee_sum:
return (
None,
MempoolInclusionStatus.FAILED,
Err.RESERVE_FEE_CONDITION_FAILED,
)
return Err.RESERVE_FEE_CONDITION_FAILED, None, []
if cost == 0:
return None, MempoolInclusionStatus.FAILED, Err.UNKNOWN
return Err.UNKNOWN, None, []
fees_per_cost: float = fees / cost
# If pool is at capacity check the fee, if not then accept even without the fee
if self.mempool.at_full_capacity(cost):
if fees_per_cost < self.nonzero_fee_minimum_fpc:
return None, MempoolInclusionStatus.FAILED, Err.INVALID_FEE_TOO_CLOSE_TO_ZERO
return Err.INVALID_FEE_TOO_CLOSE_TO_ZERO, None, []
if fees_per_cost <= self.mempool.get_min_fee_rate(cost):
return None, MempoolInclusionStatus.FAILED, Err.INVALID_FEE_LOW_FEE
return Err.INVALID_FEE_LOW_FEE, None, []
# Check removals against UnspentDB + DiffStore + Mempool + SpendBundle
# Use this information later when constructing a block
fail_reason, conflicts = await self.check_removals(removal_record_dict)
# If there is a mempool conflict check if this spendbundle has a higher fee per cost than all others
# If there is a mempool conflict check if this spend bundle has a higher fee per cost than all others
conflicting_pool_items: Dict[bytes32, MempoolItem] = {}
if fail_reason is Err.MEMPOOL_CONFLICT:
for conflicting in conflicts:
sb: MempoolItem = self.mempool.removals[conflicting.name()]
conflicting_pool_items[sb.name] = sb
if not self.can_replace(conflicting_pool_items, removal_record_dict, fees, fees_per_cost):
potential = MempoolItem(
new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program
)
self.potential_cache.add(potential)
return (
uint64(cost),
MempoolInclusionStatus.PENDING,
Err.MEMPOOL_CONFLICT,
)
elif fail_reason:
return None, MempoolInclusionStatus.FAILED, fail_reason
# If we have a mempool conflict, continue, since we still want to keep around the TX in the pending pool.
if fail_reason is not None and fail_reason is not Err.MEMPOOL_CONFLICT:
return fail_reason, None, []
# Verify conditions, create hash_key list for aggsig check
for spend in npc_result.conds.spends:
@ -441,38 +463,38 @@ class MempoolManager:
if spend.puzzle_hash != coin_record.coin.puzzle_hash:
log.warning("Mempool rejecting transaction because of wrong puzzle_hash")
log.warning(f"{spend.puzzle_hash.hex()} != {coin_record.coin.puzzle_hash.hex()}")
return None, MempoolInclusionStatus.FAILED, Err.WRONG_PUZZLE_HASH
return Err.WRONG_PUZZLE_HASH, None, []
chialisp_height = (
self.peak.prev_transaction_block_height if not self.peak.is_transaction_block else self.peak.height
)
assert self.peak.timestamp is not None
error: Optional[Err] = mempool_check_time_locks(
tl_error: Optional[Err] = mempool_check_time_locks(
removal_record_dict,
npc_result.conds,
uint32(chialisp_height),
self.peak.timestamp,
)
if error:
if error is Err.ASSERT_HEIGHT_ABSOLUTE_FAILED or error is Err.ASSERT_HEIGHT_RELATIVE_FAILED:
potential = MempoolItem(
new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program
)
self.potential_cache.add(potential)
return uint64(cost), MempoolInclusionStatus.PENDING, error
if tl_error:
if tl_error is Err.ASSERT_HEIGHT_ABSOLUTE_FAILED or tl_error is Err.ASSERT_HEIGHT_RELATIVE_FAILED:
potential = MempoolItem(new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals)
return tl_error, potential, []
else:
return None, MempoolInclusionStatus.FAILED, error
return tl_error, None, []
# Remove all conflicting Coins and SpendBundles
if fail_reason:
mempool_item: MempoolItem
for mempool_item in conflicting_pool_items.values():
self.mempool.remove_from_pool(mempool_item)
if fail_reason is Err.MEMPOOL_CONFLICT:
for conflicting in conflicts:
for c_sb_id in self.mempool.removals[conflicting.name()]:
sb: MempoolItem = self.mempool.spends[c_sb_id]
conflicting_pool_items[sb.name] = sb
log.warning(f"Conflicting pool items: {len(conflicting_pool_items)}")
if not self.can_replace(conflicting_pool_items, removal_record_dict, fees, fees_per_cost):
potential = MempoolItem(new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals)
return Err.MEMPOOL_CONFLICT, potential, []
new_item = MempoolItem(new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program)
self.mempool.add_to_pool(new_item)
new_item = MempoolItem(new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals)
duration = time.time() - start_time
log.log(
logging.DEBUG if duration < 2 else logging.WARNING,
@ -480,7 +502,7 @@ class MempoolManager:
f"Cost: {cost} ({round(100.0 * cost/self.constants.MAX_BLOCK_COST_CLVM, 3)}% of max block cost)",
)
return uint64(cost), MempoolInclusionStatus.SUCCESS, None
return None, new_item, list(conflicting_pool_items.keys())
async def check_removals(self, removals: Dict[bytes32, CoinRecord]) -> Tuple[Optional[Err], List[Coin]]:
"""
@ -540,27 +562,27 @@ class MempoolManager:
if last_npc_result.conds is not None:
for spend in last_npc_result.conds.spends:
if spend.coin_id in self.mempool.removals:
item = self.mempool.removals[bytes32(spend.coin_id)]
self.mempool.remove_from_pool(item)
self.remove_seen(item.spend_bundle_name)
c_ids: List[bytes32] = self.mempool.removals[bytes32(spend.coin_id)]
self.mempool.remove_from_pool(c_ids)
for c_id in c_ids:
self.remove_seen(c_id)
else:
old_pool = self.mempool
self.mempool = Mempool(self.mempool_max_total_cost)
self.seen_bundle_hashes = {}
for item in old_pool.spends.values():
_, result, _ = await self.add_spendbundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name, item.program
)
_, result, _ = await self.add_spend_bundle(item.spend_bundle, item.npc_result, item.spend_bundle_name)
# If the spend bundle was confirmed or conflicting (can no longer be in mempool), it won't be
# successfully added to the new mempool. In this case, remove it from seen, so in the case of a reorg,
# it can be resubmitted
if result != MempoolInclusionStatus.SUCCESS:
self.remove_seen(item.spend_bundle_name)
if result == MempoolInclusionStatus.SUCCESS:
self.seen(item.spend_bundle_name)
potential_txs = self.potential_cache.drain()
txs_added = []
for item in potential_txs.values():
cost, status, error = await self.add_spendbundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name, program=item.program
cost, status, error = await self.add_spend_bundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name
)
if status == MempoolInclusionStatus.SUCCESS:
txs_added.append((item.spend_bundle, item.npc_result, item.spend_bundle_name))

View File

@ -14,7 +14,7 @@ class PendingTxCache:
self._cache_cost = 0
self._txs = {}
def add(self, item: MempoolItem):
def add(self, item: MempoolItem) -> None:
"""
Adds SpendBundles that have failed to be added to the pool in potential tx set.
This is later used to retry to add them.

View File

@ -3,7 +3,6 @@ from typing import List
from chia.consensus.cost_calculator import NPCResult
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import SerializedProgram
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.spend_bundle import SpendBundle
from chia.util.ints import uint64
@ -20,7 +19,6 @@ class MempoolItem(Streamable):
spend_bundle_name: bytes32
additions: List[Coin]
removals: List[Coin]
program: SerializedProgram
def __lt__(self, other):
return self.fee_per_cost < other.fee_per_cost

File diff suppressed because one or more lines are too long

View File

@ -86,7 +86,6 @@ def make_item(idx: int, cost: uint64 = uint64(80)) -> MempoolItem:
spend_bundle_name,
[],
[],
SerializedProgram(),
)

View File

@ -27,7 +27,7 @@ def assert_sb_not_in_pool(node: FullNodeAPI, sb: SpendBundle) -> None:
def evict_from_pool(node: FullNodeAPI, sb: SpendBundle) -> None:
mempool_item = node.full_node.mempool_manager.mempool.spends[sb.name()]
node.full_node.mempool_manager.mempool.remove_from_pool(mempool_item)
node.full_node.mempool_manager.mempool.remove_from_pool([mempool_item.name])
node.full_node.mempool_manager.remove_seen(sb.name())