Mempool logging (#17161)

* name worker processes based on what they are used for

* log queue length of mempool spend bundle validation

* improve logging of block validation

* log spendbundle pre-validation time for individual bundles, not the time to drain the queue
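
The last bullet is the substantive behavioural fix: the validation duration is now measured inside the worker function itself, rather than around the executor call, so a spend bundle that waited behind other jobs no longer reports the queue-drain time as its own. A minimal sketch of the difference (job() is a hypothetical stand-in, not code from this commit):

    import time
    from concurrent.futures import ProcessPoolExecutor

    def job() -> float:
        start = time.monotonic()
        time.sleep(0.1)  # stand-in for the actual validation work
        return time.monotonic() - start  # time spent on this job alone

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=2) as pool:
            t0 = time.monotonic()
            per_job = pool.submit(job).result()
            wall = time.monotonic() - t0  # also includes queue wait and IPC overhead
            print(f"job: {per_job:0.3f}s, wall: {wall:0.3f}s")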
Arvid Norberg 2024-01-03 18:27:32 +01:00 committed by GitHub
parent 60d0106ef6
commit e80ab1c275
4 changed files with 42 additions and 33 deletions

chia/consensus/blockchain.py

@@ -149,7 +149,7 @@ class Blockchain(BlockchainInterface):
                 max_workers=num_workers,
                 mp_context=multiprocessing_context,
                 initializer=setproctitle,
-                initargs=(f"{getproctitle()}_worker",),
+                initargs=(f"{getproctitle()}_block_validation_worker",),
             )
             log.info(f"Started {num_workers} processes for block validation")

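For context on the rename above: ProcessPoolExecutor runs its initializer once in each spawned worker, so passing setproctitle with a title derived from the parent process makes each pool's workers identifiable in ps/top. A standalone sketch of the pattern (assumes the setproctitle package; the title suffix is illustrative):

    from concurrent.futures import ProcessPoolExecutor
    from setproctitle import getproctitle, setproctitle

    if __name__ == "__main__":
        pool = ProcessPoolExecutor(
            max_workers=4,
            initializer=setproctitle,  # runs once inside every new worker process
            initargs=(f"{getproctitle()}_block_validation_worker",),
        )
        # Workers now show up in `ps`/`top` under the derived title instead of a
        # generic Python process name, which is what makes the per-purpose
        # suffixes in this commit useful for debugging.
        pool.shutdown()
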
chia/full_node/full_node.py

@@ -257,7 +257,7 @@ class FullNode:
         self._hint_store = await HintStore.create(self.db_wrapper)
         self._coin_store = await CoinStore.create(self.db_wrapper)
         self.log.info("Initializing blockchain from disk")
-        start_time = time.time()
+        start_time = time.monotonic()
         reserved_cores = self.config.get("reserved_cores", 0)
         single_threaded = self.config.get("single_threaded", False)
         multiprocessing_start_method = process_config_start_method(config=self.config, log=self.log)
@@ -291,7 +291,7 @@ class FullNode:
         if self.config.get("enable_memory_profiler", False):
             asyncio.create_task(mem_profile_task(self.root_path, "node", self.log))

-        time_taken = time.time() - start_time
+        time_taken = time.monotonic() - start_time
         peak: Optional[BlockRecord] = self.blockchain.get_peak()
         if peak is None:
             self.log.info(f"Initialized with empty blockchain time taken: {int(time_taken)}s")
@@ -1330,7 +1330,7 @@ class FullNode:
         if agg_state_change_summary is not None:
             self._state_changed("new_peak")
             self.log.debug(
-                f"Total time for {len(blocks_to_validate)} blocks: {time.time() - pre_validate_start}, "
+                f"Total time for {len(blocks_to_validate)} blocks: {time.monotonic() - pre_validate_start}, "
                 f"advanced: True"
             )
         return True, agg_state_change_summary, None
@@ -1710,7 +1710,7 @@ class FullNode:
             # After acquiring the lock, check again, because another asyncio thread might have added it
             if self.blockchain.contains_block(header_hash):
                 return None
-            validation_start = time.time()
+            validation_start = time.monotonic()
             # Tries to add the block to the blockchain, if we already validated transactions, don't do it again
             npc_results = {}
             if pre_validation_result is not None and pre_validation_result.npc_result is not None:
@@ -1722,7 +1722,7 @@ class FullNode:
                     [block], npc_results, validate_signatures=False
                 )
             added: Optional[AddBlockResult] = None
-            pre_validation_time = time.time() - validation_start
+            pre_validation_time = time.monotonic() - validation_start
             try:
                 if len(pre_validation_results) < 1:
                     raise ValueError(f"Failed to validate block {header_hash} height {block.height}")
@@ -1759,12 +1759,15 @@ class FullNode:
                 elif added == AddBlockResult.NEW_PEAK:
                     # Only propagate blocks which extend the blockchain (becomes one of the heads)
                     assert state_change_summary is not None
+                    post_process_time = time.monotonic()
                     ppp_result = await self.peak_post_processing(block, state_change_summary, peer)
+                    post_process_time = time.monotonic() - post_process_time
                 elif added == AddBlockResult.ADDED_AS_ORPHAN:
                     self.log.info(
                         f"Received orphan block of height {block.height} rh {block.reward_chain_block.get_hash()}"
                     )
+                    post_process_time = 0
                 else:
                     # Should never reach here, all the cases are covered
                     raise RuntimeError(f"Invalid result from add_block {added}")
@@ -1776,7 +1779,7 @@ class FullNode:
                     await self.peak_post_processing(block, state_change_summary, peer)
                 raise

-        validation_time = time.time() - validation_start
+        validation_time = time.monotonic() - validation_start

         if ppp_result is not None:
             assert state_change_summary is not None
@@ -1795,6 +1798,7 @@ class FullNode:
             logging.WARNING if validation_time > 2 else logging.DEBUG,
             f"Block validation time: {validation_time:0.2f} seconds, "
             f"pre_validation time: {pre_validation_time:0.2f} seconds, "
+            f"post-process time: {post_process_time:0.2f} seconds, "
             f"cost: {block.transactions_info.cost if block.transactions_info is not None else 'None'}"
             f"{percent_full_str} header_hash: {header_hash} height: {block.height}",
         )
@@ -1907,21 +1911,21 @@ class FullNode:
         pre_validation_time = None

         async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
-            start_header_time = time.time()
+            start_header_time = time.monotonic()
             _, header_error = await self.blockchain.validate_unfinished_block_header(block)
             if header_error is not None:
                 if header_error == Err.TIMESTAMP_TOO_FAR_IN_FUTURE:
                     raise TimestampError()
                 else:
                     raise ConsensusError(header_error)
-            validate_time = time.time() - start_header_time
+            validate_time = time.monotonic() - start_header_time
             self.log.log(
                 logging.WARNING if validate_time > 2 else logging.DEBUG,
                 f"Time for header validate: {validate_time:0.3f}s",
             )

         if block.transactions_generator is not None:
-            pre_validation_start = time.time()
+            pre_validation_start = time.monotonic()
             assert block.transactions_info is not None
             try:
                 block_generator: Optional[BlockGenerator] = await self.blockchain.get_block_generator(block)
@@ -1934,7 +1938,7 @@ class FullNode:
                 height = uint32(0) if prev_b is None else uint32(prev_b.height + 1)
                 npc_result = await self.blockchain.run_generator(block_bytes, block_generator, height)

-            pre_validation_time = time.time() - pre_validation_start
+            pre_validation_time = time.monotonic() - pre_validation_start

             # blockchain.run_generator throws on errors, so npc_result is
             # guaranteed to represent a successful run
@@ -1947,7 +1951,7 @@ class FullNode:
         async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
             # TODO: pre-validate VDFs outside of lock
-            validation_start = time.time()
+            validation_start = time.monotonic()
             validate_result = await self.blockchain.validate_unfinished_block(block, npc_result)
             if validate_result.error is not None:
                 if validate_result.error == Err.COIN_AMOUNT_NEGATIVE.value:
@@ -1955,7 +1959,7 @@ class FullNode:
                     self.log.info(f"Consensus error {validate_result.error}, not disconnecting")
                     return
                 raise ConsensusError(Err(validate_result.error))
-            validation_time = time.time() - validation_start
+            validation_time = time.monotonic() - validation_start

         # respond_block will later use the cache (validated_signature=True)
         validate_result = dataclasses.replace(validate_result, validated_signature=True)

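All of the time.time() -> time.monotonic() substitutions above share one rationale: the wall clock can step backwards or forwards (NTP corrections, manual changes), while the monotonic clock only ever moves forward, which makes it the correct source for durations. A small illustration:

    import time

    start = time.monotonic()  # immune to system clock adjustments
    time.sleep(0.5)           # stand-in for block validation work
    elapsed = time.monotonic() - start
    # With time.time(), a clock step during the measured interval could make
    # `elapsed` negative or wildly wrong; time.monotonic() cannot regress.
    print(f"elapsed: {elapsed:0.3f}s")
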
chia/full_node/mempool_manager.py

@@ -52,13 +52,14 @@ MEMPOOL_MIN_FEE_INCREASE = uint64(10000000)
 # the constants through here
 def validate_clvm_and_signature(
     spend_bundle_bytes: bytes, max_cost: int, constants: ConsensusConstants, height: uint32
-) -> Tuple[Optional[Err], bytes, Dict[bytes32, bytes]]:
+) -> Tuple[Optional[Err], bytes, Dict[bytes32, bytes], float]:
     """
     Validates CLVM and aggregate signature for a spendbundle. This is meant to be called under a ProcessPoolExecutor
     in order to validate the heavy parts of a transaction in a different thread. Returns an optional error,
     the NPCResult and a cache of the new pairings validated (if not error)
     """
+    start_time = time.monotonic()
     additional_data = constants.AGG_SIG_ME_ADDITIONAL_DATA

     try:
@@ -70,7 +71,7 @@ def validate_clvm_and_signature(
         )
         if result.error is not None:
-            return Err(result.error), b"", {}
+            return Err(result.error), b"", {}, time.monotonic() - start_time

         pks: List[bytes48] = []
         msgs: List[bytes] = []
@@ -80,16 +81,16 @@ def validate_clvm_and_signature(
         # Verify aggregated signature
         cache: LRUCache[bytes32, GTElement] = LRUCache(10000)
         if not cached_bls.aggregate_verify(pks, msgs, bundle.aggregated_signature, True, cache):
-            return Err.BAD_AGGREGATE_SIGNATURE, b"", {}
+            return Err.BAD_AGGREGATE_SIGNATURE, b"", {}, time.monotonic() - start_time
         new_cache_entries: Dict[bytes32, bytes] = {}
         for k, v in cache.cache.items():
             new_cache_entries[k] = bytes(v)
     except ValidationError as e:
-        return e.code, b"", {}
+        return e.code, b"", {}, time.monotonic() - start_time
     except Exception:
-        return Err.UNKNOWN, b"", {}
+        return Err.UNKNOWN, b"", {}, time.monotonic() - start_time

-    return None, bytes(result), new_cache_entries
+    return None, bytes(result), new_cache_entries, time.monotonic() - start_time


 @dataclass
@@ -162,6 +163,7 @@ class MempoolManager:
     seen_cache_size: int
     peak: Optional[BlockRecordProtocol]
     mempool: Mempool
+    _worker_queue_size: int

     def __init__(
         self,
@@ -191,6 +193,7 @@ class MempoolManager:
         self._conflict_cache = ConflictTxCache(self.constants.MAX_BLOCK_COST_CLVM * 1, 1000)
         self._pending_cache = PendingTxCache(self.constants.MAX_BLOCK_COST_CLVM * 1, 1000)
         self.seen_cache_size = 10000
+        self._worker_queue_size = 0
         if single_threaded:
             self.pool = InlineExecutor()
         else:
@@ -198,7 +201,7 @@ class MempoolManager:
                 max_workers=2,
                 mp_context=multiprocessing_context,
                 initializer=setproctitle,
-                initargs=(f"{getproctitle()}_worker",),
+                initargs=(f"{getproctitle()}_mempool_worker",),
             )

         # The mempool will correspond to a certain peak
@@ -278,7 +281,6 @@ class MempoolManager:
         Errors are included within the cached_result.
         This runs in another process so we don't block the main thread
         """
-        start_time = time.time()
         if new_spend_bytes is None:
             new_spend_bytes = bytes(new_spend)
@@ -287,25 +289,28 @@ class MempoolManager:
         assert self.peak is not None

-        err, cached_result_bytes, new_cache_entries = await asyncio.get_running_loop().run_in_executor(
-            self.pool,
-            validate_clvm_and_signature,
-            new_spend_bytes,
-            self.max_block_clvm_cost,
-            self.constants,
-            self.peak.height,
-        )
+        self._worker_queue_size += 1
+        try:
+            err, cached_result_bytes, new_cache_entries, duration = await asyncio.get_running_loop().run_in_executor(
+                self.pool,
+                validate_clvm_and_signature,
+                new_spend_bytes,
+                self.max_block_clvm_cost,
+                self.constants,
+                self.peak.height,
+            )
+        finally:
+            self._worker_queue_size -= 1

         if err is not None:
             raise ValidationError(err)
         for cache_entry_key, cached_entry_value in new_cache_entries.items():
             LOCAL_CACHE.put(cache_entry_key, GTElement.from_bytes_unchecked(cached_entry_value))
         ret: NPCResult = NPCResult.from_bytes(cached_result_bytes)
-        end_time = time.time()
-        duration = end_time - start_time
         log.log(
             logging.DEBUG if duration < 2 else logging.WARNING,
-            f"pre_validate_spendbundle took {end_time - start_time:0.4f} seconds for {spend_name}",
+            f"pre_validate_spendbundle took {duration:0.4f} seconds "
+            f"for {spend_name} (queue-size: {self._worker_queue_size})",
         )
         return ret

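The _worker_queue_size counter above is a plain in-flight gauge: incremented before awaiting run_in_executor and decremented in a finally block, so on a single asyncio event loop it reflects how many validation requests have been submitted but not yet completed (with max_workers=2, any value above 2 means spend bundles are queueing). A reduced sketch of the pattern (class and names are illustrative, not from the commit):

    import asyncio
    from concurrent.futures import ProcessPoolExecutor

    def validate(data: bytes) -> bytes:
        return data  # stand-in for the heavy CLVM and signature checks

    class Validator:
        def __init__(self) -> None:
            self.pool = ProcessPoolExecutor(max_workers=2)
            self.queue_size = 0  # jobs submitted but not yet finished

        async def run(self, data: bytes) -> bytes:
            self.queue_size += 1
            try:
                return await asyncio.get_running_loop().run_in_executor(self.pool, validate, data)
            finally:
                self.queue_size -= 1  # decrement even if the worker raised
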
chia/full_node/weight_proof.py

@@ -609,7 +609,7 @@ class WeightProofHandler:
             max_workers=self._num_processes,
             mp_context=self.multiprocessing_context,
             initializer=setproctitle,
-            initargs=(f"{getproctitle()}_worker",),
+            initargs=(f"{getproctitle()}_weight_proof_worker",),
         ) as executor:
             # The shutdown file manager must be inside of the executor manager so that
             # we request the workers close prior to waiting for them to close.