Arvid Norberg 507899ff19
Mempool multifetch (#17139)
* update type annotation for CoinStore.get_coin_records to support both List and Set

* update the mempool to fetch multiple coin records per query

* optimize the slow-path of updating the mempool by fetching all coin records up-front, in a single sql query
2023-12-22 16:29:39 -06:00

275 lines
9.9 KiB

from __future__ import annotations
import asyncio
import cProfile
from contextlib import contextmanager
from dataclasses import dataclass
from subprocess import check_call
from time import monotonic
from typing import Collection, Dict, Iterator, List, Optional, Tuple
from chia.consensus.coinbase import create_farmer_coin, create_pool_coin
from chia.consensus.default_constants import DEFAULT_CONSTANTS
from chia.full_node.mempool_manager import MempoolManager
from chia.simulator.wallet_tools import WalletTool
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_record import CoinRecord
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.util.ints import uint32, uint64
from chia.util.misc import to_batches
@contextmanager
def enable_profiler(profile: bool, name: str) -> Iterator[None]:
    """Optionally profile the body of a ``with`` block with cProfile.

    When ``profile`` is false this is a no-op context manager. Otherwise the
    body runs under ``cProfile.Profile``. Afterwards the stats are dumped to
    ``mempool-<name>.profile``, converted to ``mempool-<name>.dot`` with the
    external ``gprof2dot`` tool, and rendered to ``mempool-<name>.png`` with
    Graphviz ``dot``. Both tools must be on ``PATH``.

    If the body raises, the exception propagates and no output is written.

    Args:
        profile: whether to profile at all.
        name: suffix used to build the output file names.

    Raises:
        subprocess.CalledProcessError: if ``gprof2dot`` or ``dot`` exits non-zero.
        FileNotFoundError: if either tool cannot be found.
    """
    if not profile:
        yield
        return

    with cProfile.Profile() as pr:
        yield

    output_file = f"mempool-{name}"
    pr.dump_stats(output_file + ".profile")
    check_call(["gprof2dot", "-f", "pstats", "-o", output_file + ".dot", output_file + ".profile"])
    # dot writes the PNG to stdout; redirect that straight into the (binary) file
    with open(output_file + ".png", "wb") as f:
        check_call(["dot", "-T", "png", output_file + ".dot"], stdout=f)
    print(" output written to: %s.png" % output_file)
def make_hash(height: int) -> bytes32:
    """Map ``height`` to a deterministic 32-byte hash.

    The integer is written big-endian into 32 bytes, so distinct heights in
    ``[0, 2**256)`` give distinct hashes. A negative height raises
    ``OverflowError``, because ``int.to_bytes`` is unsigned by default.
    """
    big_endian = height.to_bytes(32, byteorder="big")
    return bytes32(big_endian)
@dataclass(frozen=True)
class BenchBlockRecord:
    """
    This is a subset of BlockRecord that the mempool manager uses for peak.

    Instances are immutable. In this benchmark they are built by
    ``fake_block_record()``.
    """

    header_hash: bytes32
    height: uint32
    # None marks a non-transaction block (see is_transaction_block)
    timestamp: Optional[uint64]
    prev_transaction_block_height: uint32
    prev_transaction_block_hash: Optional[bytes32]

    @property
    def is_transaction_block(self) -> bool:
        """True when the block carries a timestamp."""
        return self.timestamp is not None
def fake_block_record(block_height: uint32, timestamp: uint64) -> BenchBlockRecord:
    """Build a benchmark block record at ``block_height``.

    The record has:
    * header hash ``make_hash(block_height)``;
    * its previous transaction block at ``block_height - 1``, with hash
      ``make_hash(block_height - 1)``.

    Records for consecutive heights therefore chain onto each other, and
    skipping a height breaks the chain. Because ``timestamp`` is always set,
    the record is a transaction block.

    A ``block_height`` of 0 fails: ``make_hash(-1)`` cannot encode a negative
    integer.
    """
    this_hash = make_hash(block_height)
    prev_hash = make_hash(block_height - 1)
    return BenchBlockRecord(
        header_hash=this_hash,
        height=block_height,
        timestamp=timestamp,
        prev_transaction_block_height=uint32(block_height - 1),
        prev_transaction_block_hash=prev_hash,
    )
# Blocks farmed per peer while building the coin set; every block adds one
# farmer reward coin and one pool reward coin.
NUM_ITERS = 200
# Number of simulated peers; each peer's bundles are added as a separate task.
NUM_PEERS = 5


def _print_timing(elapsed: float, calls: int) -> None:
    """Print the total elapsed time and the mean time per call in milliseconds."""
    print(f" time: {elapsed:0.4f}s")
    # max() keeps an empty workload from raising ZeroDivisionError
    print(f" per call: {elapsed / max(calls, 1) * 1000:0.2f}ms")


async def run_mempool_benchmark() -> None:
    """Benchmark and profile MempoolManager against an in-memory coin set.

    Setup: for each of NUM_PEERS peers, farm NUM_ITERS fake blocks of reward
    coins. From those coins, build three bundle sets:
    * regular spend bundles;
    * higher-fee replacements of the same spends;
    * large aggregated bundles of up to 200 spends each.

    Then, once with a multi-threaded and once with a single-threaded
    MempoolManager, time and profile (via ``enable_profiler``):
    * ``add_spend_bundle()`` for each of the three bundle sets;
    * ``create_bundle_from_mempool()``;
    * ``new_peak()`` on a linear chain ("optimized");
    * ``new_peak()`` on blocks that skip heights ("reorg").
    """
    all_coins: Dict[bytes32, CoinRecord] = {}

    async def get_coin_records(coin_ids: Collection[bytes32]) -> List[CoinRecord]:
        # In-memory coin lookup handed to MempoolManager; ids that are not in
        # all_coins are skipped.
        return [record for record in map(all_coins.get, coin_ids) if record is not None]

    wt = WalletTool(DEFAULT_CONSTANTS)

    def spend_half(coin: Coin, fee: int) -> SpendBundle:
        # signed transaction spending ``coin`` for half its amount to a new puzzle hash
        return wt.generate_signed_transaction(uint64(coin.amount // 2), wt.get_new_puzzlehash(), coin, fee=fee)

    async def add_spend_bundles(manager: MempoolManager, bundles: List[SpendBundle], peak_height: uint32) -> None:
        # validate and add bundles one at a time, asserting each one is accepted
        for tx in bundles:
            spend_bundle_id = tx.name()
            npc = await manager.pre_validate_spendbundle(tx, None, spend_bundle_id)
            assert npc is not None
            _, status, error = await manager.add_spend_bundle(tx, npc, spend_bundle_id, peak_height)
            assert status == MempoolInclusionStatus.SUCCESS
            assert error is None

    async def profile_add(
        manager: MempoolManager, per_peer: List[List[SpendBundle]], peak_height: uint32, profile_name: str
    ) -> None:
        # one concurrent task per peer, all adding into the same mempool
        total_bundles = sum(len(bundles) for bundles in per_peer)
        with enable_profiler(True, profile_name):
            start = monotonic()
            tasks = [asyncio.create_task(add_spend_bundles(manager, bundles, peak_height)) for bundles in per_peer]
            await asyncio.gather(*tasks)
            stop = monotonic()
        _print_timing(stop - start, total_bundles)

    async def profile_new_peak(
        manager: MempoolManager, blocks: List[Tuple[BenchBlockRecord, List[bytes32]]], profile_name: str
    ) -> None:
        # feed blocks to new_peak() in order, each with the coin ids it spends
        with enable_profiler(True, profile_name):
            start = monotonic()
            for block, spent_coin_ids in blocks:
                await manager.new_peak(block, spent_coin_ids)
            stop = monotonic()
        _print_timing(stop - start, len(blocks))

    spend_bundles: List[List[SpendBundle]] = []
    # these spend the same coins as spend_bundles but with a higher fee
    replacement_spend_bundles: List[List[SpendBundle]] = []
    # these spend the same coins as spend_bundles, but they are organized in
    # much larger bundles
    large_spend_bundles: List[List[SpendBundle]] = []

    timestamp = uint64(1631794488)
    height = uint32(1)

    print("Building SpendBundles")
    for peer in range(NUM_PEERS):
        print(f" peer {peer}")
        print(" reward coins")
        unspent: List[Coin] = []
        for _ in range(NUM_ITERS):
            height = uint32(height + 1)
            # 19 seconds per block
            timestamp = uint64(timestamp + 19)
            # farm rewards
            farmer_coin = create_farmer_coin(
                height, wt.get_new_puzzlehash(), uint64(250000000), DEFAULT_CONSTANTS.GENESIS_CHALLENGE
            )
            pool_coin = create_pool_coin(
                height, wt.get_new_puzzlehash(), uint64(1750000000), DEFAULT_CONSTANTS.GENESIS_CHALLENGE
            )
            all_coins[farmer_coin.name()] = CoinRecord(farmer_coin, height, uint32(0), True, timestamp)
            all_coins[pool_coin.name()] = CoinRecord(pool_coin, height, uint32(0), True, timestamp)
            unspent.extend([farmer_coin, pool_coin])

        # peer index plus the last block index; stated explicitly instead of
        # relying on the loop variable leaking out of the loop above
        base_fee = peer + NUM_ITERS - 1

        print(" spend bundles")
        spend_bundles.append([spend_half(coin, base_fee) for coin in unspent])

        print(" replacement spend bundles")
        replacement_spend_bundles.append([spend_half(coin, base_fee + 10000000) for coin in unspent])

        print(" large spend bundles")
        large: List[SpendBundle] = []
        for batch in to_batches(unspent, 200):
            print(f"{len(batch.entries)} coins")
            large.append(SpendBundle.aggregate([spend_half(c, base_fee) for c in batch.entries]))
        large_spend_bundles.append(large)

    start_height = height
    for single_threaded in [False, True]:
        if single_threaded:
            print("\n== Single-threaded")
        else:
            print("\n== Multi-threaded")
        suffix = "st" if single_threaded else "mt"

        mempool = MempoolManager(get_coin_records, DEFAULT_CONSTANTS, single_threaded=single_threaded)
        height = start_height
        rec = fake_block_record(height, timestamp)
        await mempool.new_peak(rec, None)

        print("\nProfiling add_spend_bundle() with large bundles")
        await profile_add(mempool, large_spend_bundles, height, f"add-large-{suffix}")

        # the large bundles already spend the same coins as spend_bundles, so
        # the regular bundles go into a fresh mempool at the same peak
        mempool = MempoolManager(get_coin_records, DEFAULT_CONSTANTS, single_threaded=single_threaded)
        await mempool.new_peak(rec, None)

        print("\nProfiling add_spend_bundle()")
        await profile_add(mempool, spend_bundles, height, f"add-{suffix}")

        # added on top of spend_bundles: same coins, higher fee
        print("\nProfiling add_spend_bundle() with replace-by-fee")
        await profile_add(mempool, replacement_spend_bundles, height, f"replace-{suffix}")

        print("\nProfiling create_bundle_from_mempool()")
        create_calls = 500
        with enable_profiler(True, f"create-{suffix}"):
            start = monotonic()
            for _ in range(create_calls):
                # NOTE(review): assumes create_bundle_from_mempool() is synchronous and
                # takes the peak's header hash — confirm against MempoolManager
                mempool.create_bundle_from_mempool(rec.header_hash)
            stop = monotonic()
        _print_timing(stop - start, create_calls)

        print("\nProfiling new_peak() (optimized)")
        # a linear chain: each block builds on the previous one and spends one coin
        blocks: List[Tuple[BenchBlockRecord, List[bytes32]]] = []
        for coin_id in all_coins:
            height = uint32(height + 1)
            timestamp = uint64(timestamp + 19)
            blocks.append((fake_block_record(height, timestamp), [coin_id]))
        await profile_new_peak(mempool, blocks, f"new-peak-{suffix}")

        print("\nProfiling new_peak() (reorg)")
        # stepping the height by 2 means no block's prev hash matches the current peak
        blocks = []
        for coin_id in all_coins:
            height = uint32(height + 2)
            timestamp = uint64(timestamp + 28)
            blocks.append((fake_block_record(height, timestamp), [coin_id]))
        await profile_new_peak(mempool, blocks, f"new-peak-reorg-{suffix}")
if __name__ == "__main__":
import logging
logger = logging.getLogger()