More performance (#3279)

* speed

* more speed

* same event loop init

* typo

* semaphore

* ignore if more than semaphore

* higher_compact_semaphore

* don't ignore
Yostra 2021-05-01 01:44:12 -04:00 committed by GitHub
parent 75d75de190
commit 976678d651
5 changed files with 78 additions and 24 deletions


@@ -644,7 +644,14 @@ class Blockchain(BlockchainInterface):
             header_hash: bytes32 = self.height_to_hash(uint32(height))
             hashes.append(header_hash)
-        blocks: List[FullBlock] = await self.block_store.get_blocks_by_hash(hashes)
+        blocks: List[FullBlock] = []
+        for hash in hashes.copy():
+            block = self.block_store.block_cache.get(hash)
+            if block is not None:
+                blocks.append(block)
+                hashes.remove(hash)
+        blocks_on_disk: List[FullBlock] = await self.block_store.get_blocks_by_hash(hashes)
+        blocks.extend(blocks_on_disk)
         header_blocks: Dict[bytes32, HeaderBlock] = {}
         for block in blocks:
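
The hunk above makes Blockchain.get_header_blocks_in_range check the block store's in-memory block_cache first and only query SQLite for the hashes that missed. Below is a minimal, illustrative sketch of that split, assuming only that the cache exposes get/put like the store's LRUCache; the OrderedDict-based cache here is a stand-in, not the project's implementation.

from collections import OrderedDict
from typing import Any, List, Optional, Tuple


class TinyLRUCache:
    # Stand-in for the block store's LRUCache: get/put with least-recently-used eviction.
    def __init__(self, capacity: int):
        self.capacity = capacity
        self._data: "OrderedDict[Any, Any]" = OrderedDict()

    def get(self, key: Any) -> Optional[Any]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: Any, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict the least recently used entry


def split_cached(cache: TinyLRUCache, hashes: List[bytes]) -> Tuple[List[Any], List[bytes]]:
    # Partition into (blocks already cached, hashes that still need a database read),
    # equivalent to the diff's loop over hashes.copy() with hashes.remove(hash) on a hit.
    hits: List[Any] = []
    misses: List[bytes] = []
    for h in hashes:
        block = cache.get(h)
        if block is not None:
            hits.append(block)
        else:
            misses.append(h)
    return hits, misses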


@@ -19,6 +19,7 @@ class BlockStore:
     db: aiosqlite.Connection
     block_cache: LRUCache
     db_wrapper: DBWrapper
+    ses_challenge_cache: LRUCache

     @classmethod
     async def create(cls, db_wrapper: DBWrapper):
@@ -62,6 +63,7 @@ class BlockStore:
         await self.db.commit()
         self.block_cache = LRUCache(1000)
+        self.ses_challenge_cache = LRUCache(50)
         return self

     async def add_full_block(self, block: FullBlock, block_record: BlockRecord) -> None:
@@ -114,13 +116,18 @@
         self,
         ses_block_hash: bytes32,
     ) -> Optional[List[SubEpochChallengeSegment]]:
+        cached = self.ses_challenge_cache.get(ses_block_hash)
+        if cached is not None:
+            return cached
         cursor = await self.db.execute(
             "SELECT challenge_segments from sub_epoch_segments_v3 WHERE ses_block_hash=?", (ses_block_hash.hex(),)
         )
         row = await cursor.fetchone()
         await cursor.close()
         if row is not None:
-            return SubEpochSegments.from_bytes(row[0]).challenge_segments
+            challenge_segments = SubEpochSegments.from_bytes(row[0]).challenge_segments
+            self.ses_challenge_cache.put(ses_block_hash, challenge_segments)
+            return challenge_segments
         return None

     def cache_block(self, block: FullBlock):
@@ -139,6 +146,17 @@
             return block
         return None

+    async def get_full_block_bytes(self, header_hash: bytes32) -> Optional[bytes]:
+        cached = self.block_cache.get(header_hash)
+        if cached is not None:
+            return cached
+        cursor = await self.db.execute("SELECT block from full_blocks WHERE header_hash=?", (header_hash.hex(),))
+        row = await cursor.fetchone()
+        await cursor.close()
+        if row is not None:
+            return row[0]
+        return None
+
     async def get_full_blocks_at(self, heights: List[uint32]) -> List[FullBlock]:
         if len(heights) == 0:
             return []
@@ -192,6 +210,7 @@
         for row in rows:
             full_block: FullBlock = FullBlock.from_bytes(row[0])
             all_blocks[full_block.header_hash] = full_block
+            self.block_cache.put(full_block.header_hash, full_block)
         ret: List[FullBlock] = []
         for hh in header_hashes:
             if hh not in all_blocks:
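
The BlockStore changes above all follow the same read-through shape: return the cached value on a hit, otherwise fetch the row, decode it, and put it in the cache before returning (get_blocks_by_hash additionally back-fills block_cache for every row it reads). A self-contained sketch of that shape, assuming a hypothetical kv(key, value) table and a cache with the get/put interface used above:

from typing import Any, Optional

import aiosqlite


class ReadThroughStore:
    # Sketch of a read-through cache over an aiosqlite table; schema and names are illustrative.
    def __init__(self, db: aiosqlite.Connection, cache: Any):
        self.db = db
        self.cache = cache  # expected to expose get(key) and put(key, value)

    async def get_value(self, key: str) -> Optional[bytes]:
        cached = self.cache.get(key)
        if cached is not None:
            return cached  # hit: no database round trip
        cursor = await self.db.execute("SELECT value FROM kv WHERE key=?", (key,))
        row = await cursor.fetchone()
        await cursor.close()
        if row is None:
            return None
        self.cache.put(key, row[0])  # miss: populate the cache for later readers
        return row[0]

The same pattern backs get_sub_epoch_challenge_segments (a 50-entry cache of decoded segments) and get_full_block_bytes in the diff above.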


@@ -113,6 +113,8 @@ class FullNode:
     async def _start(self):
         self.timelord_lock = asyncio.Lock()
+        self.compact_vdf_lock = asyncio.Semaphore(4)
+        self.new_peak_lock = asyncio.Semaphore(8)
         # create the store (db) and full node instance
         self.connection = await aiosqlite.connect(self.db_path)
         self.db_wrapper = DBWrapper(self.connection)
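
The two semaphores added in _start cap how many compact-VDF messages (4) and new-peak notifications (8) are processed at once, so a burst from many peers queues up instead of spawning unbounded concurrent work. A runnable sketch of the pattern, with handler and class names that are purely illustrative:

import asyncio


class BoundedHandler:
    def __init__(self) -> None:
        # Mirrors new_peak_lock: at most 8 notifications inside the critical section at once.
        self.new_peak_lock = asyncio.Semaphore(8)

    async def handle_new_peak(self, peer_id: int) -> None:
        async with self.new_peak_lock:  # excess callers wait here instead of piling up work
            await asyncio.sleep(0.1)  # stand-in for real peak processing
            print(f"processed peak from peer {peer_id}")


async def main() -> None:
    handler = BoundedHandler()
    # Fifty concurrent notifications, but never more than eight in flight past the semaphore.
    await asyncio.gather(*(handler.handle_new_peak(i) for i in range(50)))


if __name__ == "__main__":
    asyncio.run(main())
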
@@ -1729,13 +1731,12 @@ class FullNode:
             return
         field_vdf = CompressibleVDFField(int(request.field_vdf))
         if await self._needs_compact_proof(request.vdf_info, header_block, field_vdf):
-            msg = make_msg(
-                ProtocolMessageTypes.request_compact_vdf,
-                full_node_protocol.RequestCompactVDF(
-                    request.height, request.header_hash, request.field_vdf, request.vdf_info
-                ),
+            peer_request = full_node_protocol.RequestCompactVDF(
+                request.height, request.header_hash, request.field_vdf, request.vdf_info
             )
-            await peer.send_message(msg)
+            response = await peer.request_compact_vdf(peer_request, timeout=10)
+            if response is not None and isinstance(response, full_node_protocol.RespondCompactVDF):
+                await self.respond_compact_vdf(response, peer)

     async def request_compact_vdf(self, request: full_node_protocol.RequestCompactVDF, peer: ws.WSChiaConnection):
         header_block = await self.blockchain.get_header_block_by_height(request.height, request.header_hash)
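
Rather than a fire-and-forget send_message, the node now asks the peer directly and awaits the reply with a 10 second timeout, handling the RespondCompactVDF inline. A generic sketch of that request-with-timeout shape; the helper names and callables below are placeholders, not the WSChiaConnection API:

import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def request_with_timeout(send: Callable[[], Awaitable[T]], timeout: float = 10.0) -> Optional[T]:
    # Await the peer's reply; a slow or silent peer yields None instead of an exception.
    try:
        return await asyncio.wait_for(send(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def fetch_and_apply(send: Callable[[], Awaitable[T]], apply: Callable[[T], Awaitable[None]]) -> None:
    response = await request_with_timeout(send)
    if response is not None:  # mirror the diff: ignore timeouts and unexpected replies
        await apply(response)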


@@ -2,7 +2,7 @@ import asyncio
 import dataclasses
 import time
 from secrets import token_bytes
-from typing import Callable, Dict, List, Optional, Tuple, Set
+from typing import Callable, Dict, List, Optional, Tuple, Set, Any

 from blspy import AugSchemeMPL, G2Element
 from chiabip158 import PyBIP158
@@ -100,7 +100,8 @@ class FullNodeAPI:
         A peer notifies us that they have added a new peak to their blockchain. If we don't have it,
         we can ask for it.
         """
-        return await self.full_node.new_peak(request, peer)
+        async with self.full_node.new_peak_lock:
+            return await self.full_node.new_peak(request, peer)

     @peer_required
     @api_request
@@ -252,9 +253,19 @@
         if wp is None:
             self.log.error(f"failed creating weight proof for peak {request.tip}")
             return None
-        return make_msg(
+        # Serialization of wp is slow
+        if (
+            self.full_node.full_node_store.serialized_wp_message_tip is not None
+            and self.full_node.full_node_store.serialized_wp_message_tip == request.tip
+        ):
+            return self.full_node.full_node_store.serialized_wp_message
+        message = make_msg(
             ProtocolMessageTypes.respond_proof_of_weight, full_node_protocol.RespondProofOfWeight(wp, request.tip)
         )
+        self.full_node.full_node_store.serialized_wp_message_tip = request.tip
+        self.full_node.full_node_store.serialized_wp_message = message
+        return message

     @api_request
     async def respond_proof_of_weight(self, request: full_node_protocol.RespondProofOfWeight) -> Optional[Message]:
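
Weight proofs are large and serializing the RespondProofOfWeight message is slow, so the handler above memoizes the last built Message in FullNodeStore, keyed by the requested tip; a repeat request for the same tip is answered from the cache without rebuilding anything. A stripped-down sketch of that single-entry memoization, with the store and builder below standing in for the real FullNodeStore fields and make_msg call:

from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class WeightProofMessageCache:
    # Single-entry cache: the last serialized weight-proof message and the tip it was built for.
    serialized_wp_message: Optional[bytes] = None
    serialized_wp_message_tip: Optional[bytes] = None

    def get_or_build(self, tip: bytes, build_message: Callable[[], bytes]) -> bytes:
        if self.serialized_wp_message_tip == tip and self.serialized_wp_message is not None:
            return self.serialized_wp_message  # same tip requested again: reuse the cached bytes
        message = build_message()  # the expensive step: proof construction plus serialization
        self.serialized_wp_message_tip = tip
        self.serialized_wp_message = message
        return message
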
@@ -289,19 +300,28 @@
             msg = make_msg(ProtocolMessageTypes.reject_blocks, reject)
             return msg

-        blocks = []
-        for i in range(request.start_height, request.end_height + 1):
-            block: Optional[FullBlock] = await self.full_node.block_store.get_full_block(
-                self.full_node.blockchain.height_to_hash(uint32(i))
-            )
-            if block is None:
-                reject = RejectBlocks(request.start_height, request.end_height)
-                msg = make_msg(ProtocolMessageTypes.reject_blocks, reject)
-                return msg
-            if not request.include_transaction_block:
+        blocks: List[Any] = []
+        if not request.include_transaction_block:
+            for i in range(request.start_height, request.end_height + 1):
+                block: Optional[FullBlock] = await self.full_node.block_store.get_full_block(
+                    self.full_node.blockchain.height_to_hash(uint32(i))
+                )
+                if block is None:
+                    reject = RejectBlocks(request.start_height, request.end_height)
+                    msg = make_msg(ProtocolMessageTypes.reject_blocks, reject)
+                    return msg
                 block = dataclasses.replace(block, transactions_generator=None)
-            blocks.append(block)
+                blocks.append(block)
+        else:
+            for i in range(request.start_height, request.end_height + 1):
+                block_bytes: Optional[bytes] = await self.full_node.block_store.get_full_block_bytes(
+                    self.full_node.blockchain.height_to_hash(uint32(i))
+                )
+                if block_bytes is None:
+                    reject = RejectBlocks(request.start_height, request.end_height)
+                    msg = make_msg(ProtocolMessageTypes.reject_blocks, reject)
+                    return msg
+                blocks.append(block_bytes)

         msg = make_msg(
             ProtocolMessageTypes.respond_blocks,
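
When the requester wants blocks with their transactions generator intact, request_blocks now ships the stored block bytes straight from get_full_block_bytes instead of parsing every FullBlock only to serialize it again; parsing is kept only for the branch that has to strip transactions_generator. A rough sketch of that fast path, where parse and strip_generator stand in for FullBlock.from_bytes and the dataclasses.replace call in the diff:

from typing import Any, Callable, List


def blocks_for_wire(
    raw_blocks: List[bytes],
    keep_generator: bool,
    parse: Callable[[bytes], Any],
    strip_generator: Callable[[Any], Any],
) -> List[bytes]:
    if keep_generator:
        # Fast path: the stored serialization is already exactly what goes on the wire.
        return raw_blocks
    out: List[bytes] = []
    for raw in raw_blocks:
        block = parse(raw)  # deserialize only because the block must be modified
        out.append(bytes(strip_generator(block)))  # re-serialize the stripped block
    return out
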
@@ -1233,12 +1253,14 @@
             return None
         await self.full_node.respond_compact_proof_of_time(request)

+    @execute_task
     @peer_required
     @api_request
     async def new_compact_vdf(self, request: full_node_protocol.NewCompactVDF, peer: ws.WSChiaConnection):
         if self.full_node.sync_store.get_sync_mode():
             return None
-        await self.full_node.new_compact_vdf(request, peer)
+        async with self.full_node.compact_vdf_lock:
+            await self.full_node.new_compact_vdf(request, peer)

     @peer_required
     @api_request


@@ -12,6 +12,7 @@ from chia.consensus.make_sub_epoch_summary import next_sub_epoch_summary
 from chia.consensus.multiprocess_validation import PreValidationResult
 from chia.full_node.signage_point import SignagePoint
 from chia.protocols import timelord_protocol
+from chia.server.outbound_message import Message
 from chia.types.blockchain_format.classgroup import ClassgroupElement
 from chia.types.blockchain_format.sized_bytes import bytes32
 from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary
@@ -66,6 +67,8 @@ class FullNodeStore:
     pending_tx_request: Dict[bytes32, bytes32]  # tx_id: peer_id
     peers_with_tx: Dict[bytes32, Set[bytes32]]  # tx_id: Set[peer_ids}
     tx_fetch_tasks: Dict[bytes32, asyncio.Task]  # Task id: task
+    serialized_wp_message: Optional[Message]
+    serialized_wp_message_tip: Optional[bytes32]

     def __init__(self, constants: ConsensusConstants):
         self.candidate_blocks = {}
@@ -85,6 +88,8 @@
         self.pending_tx_request = {}
         self.peers_with_tx = {}
         self.tx_fetch_tasks = {}
+        self.serialized_wp_message = None
+        self.serialized_wp_message_tip = None

     def add_candidate_block(
         self, quality_string: bytes32, height: uint32, unfinished_block: UnfinishedBlock, backup: bool = False