eliminate polling in full node for processed tx (#17171)

This commit is contained in:
Kyle Altendorf 2023-12-27 10:15:29 -05:00 committed by GitHub
parent a389704fdf
commit cd78dbafdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 41 deletions

View File

@ -155,9 +155,6 @@ class FullNode:
_add_transaction_semaphore: Optional[asyncio.Semaphore] = None
_db_wrapper: Optional[DBWrapper2] = None
_hint_store: Optional[HintStore] = None
transaction_responses: List[Tuple[bytes32, MempoolInclusionStatus, Optional[Err]]] = dataclasses.field(
default_factory=list
)
_block_store: Optional[BlockStore] = None
_coin_store: Optional[CoinStore] = None
_mempool_manager: Optional[MempoolManager] = None
@ -285,7 +282,6 @@ class FullNode:
# Transactions go into this queue from the server, and get sent to respond_transaction
self._transaction_queue = TransactionQueue(1000, self.log)
self._transaction_queue_task: asyncio.Task[None] = asyncio.create_task(self._handle_transactions())
self.transaction_responses = []
self._init_weight_proof = asyncio.create_task(self.initialize_weight_proof())
@ -470,9 +466,7 @@ class FullNode:
peer = entry.peer
try:
inc_status, err = await self.add_transaction(entry.transaction, entry.spend_name, peer, entry.test)
self.transaction_responses.append((entry.spend_name, inc_status, err))
if len(self.transaction_responses) > 50:
self.transaction_responses = self.transaction_responses[1:]
entry.done.set_result((inc_status, err))
except asyncio.CancelledError:
error_stack = traceback.format_exc()
self.log.debug(f"Cancelling _handle_one_transaction, closing: {error_stack}")

View File

@ -10,6 +10,7 @@ from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple
import anyio
from chia_rs import AugSchemeMPL, G1Element, G2Element
from chiabip158 import PyBIP158
@ -1271,22 +1272,12 @@ class FullNodeAPI:
response = wallet_protocol.TransactionAck(spend_name, uint8(MempoolInclusionStatus.SUCCESS), None)
return make_msg(ProtocolMessageTypes.transaction_ack, response)
await self.full_node.transaction_queue.put(
TransactionQueueEntry(request.transaction, None, spend_name, None, test), peer_id=None, high_priority=True
)
# Waits for the transaction to go into the mempool, times out after 45 seconds.
status, error = None, None
sleep_time = 0.01
for i in range(int(45 / sleep_time)):
await asyncio.sleep(sleep_time)
for potential_name, potential_status, potential_error in self.full_node.transaction_responses:
if spend_name == potential_name:
status = potential_status
error = potential_error
break
if status is not None:
break
if status is None:
queue_entry = TransactionQueueEntry(request.transaction, None, spend_name, None, test)
await self.full_node.transaction_queue.put(queue_entry, peer_id=None, high_priority=True)
try:
with anyio.fail_after(delay=45):
status, error = await queue_entry.done
except TimeoutError:
response = wallet_protocol.TransactionAck(spend_name, uint8(MempoolInclusionStatus.PENDING), None)
else:
error_name = error.name if error is not None else None

View File

@ -1,11 +1,14 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import asyncio
from dataclasses import dataclass, field
from typing import Optional, Tuple
from chia.server.ws_connection import WSChiaConnection
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.util.errors import Err
@dataclass(frozen=True)
@ -14,20 +17,12 @@ class TransactionQueueEntry:
A transaction received from peer. This is put into a queue, and not yet in the mempool.
"""
transaction: SpendBundle
transaction_bytes: Optional[bytes]
transaction: SpendBundle = field(compare=False)
transaction_bytes: Optional[bytes] = field(compare=False)
spend_name: bytes32
peer: Optional[WSChiaConnection]
test: bool
def __lt__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name < other.spend_name
def __le__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name <= other.spend_name
def __gt__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name > other.spend_name
def __ge__(self, other: TransactionQueueEntry) -> bool:
return self.spend_name >= other.spend_name
peer: Optional[WSChiaConnection] = field(compare=False)
test: bool = field(compare=False)
done: asyncio.Future[Tuple[MempoolInclusionStatus, Optional[Err]]] = field(
default_factory=asyncio.Future,
compare=False,
)