block_store performance (#4573)

* compute header_hash once

* read header_hash from the DB instead of computing it
* when adding a block to the block store, also add it to the cache. This saves an otherwise mandatory round-trip to the DB.
Arvid Norberg 2021-05-14 22:23:34 +02:00 committed by GitHub
parent bb662f27dc
commit a7d607ee90
4 changed files with 30 additions and 22 deletions
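
The commit message above describes a write-through cache: the block is added to the cache at write time and evicted again if the DB transaction fails. A minimal sketch of that pattern, with hypothetical names standing in for Chia's actual classes:

from typing import Callable, Dict, Optional

class BlockCache:
    # Toy stand-in for the LRUCache used by BlockStore.
    def __init__(self) -> None:
        self._blocks: Dict[bytes, bytes] = {}

    def put(self, header_hash: bytes, block: bytes) -> None:
        self._blocks[header_hash] = block

    def remove(self, header_hash: bytes) -> None:
        self._blocks.pop(header_hash, None)

    def get(self, header_hash: bytes) -> Optional[bytes]:
        return self._blocks.get(header_hash)

def add_block(
    cache: BlockCache,
    db_write: Callable[[bytes, bytes], None],
    header_hash: bytes,
    block: bytes,
) -> None:
    # Cache optimistically before the DB commit: a later read of this
    # block is then served from memory instead of a DB round-trip.
    cache.put(header_hash, block)
    try:
        db_write(header_hash, block)
    except Exception:
        # Mirror the DB rollback in the cache so the two never disagree.
        cache.remove(header_hash)
        raise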


@@ -257,7 +257,8 @@ class Blockchain(BlockchainInterface):
try:
# Perform the DB operations to update the state, and rollback if something goes wrong
await self.block_store.db_wrapper.begin_transaction()
await self.block_store.add_full_block(block, block_record)
header_hash: bytes32 = block.header_hash
await self.block_store.add_full_block(header_hash, block, block_record)
fork_height, peak_height, records = await self._reconsider_peak(
block_record, genesis, fork_point_with_peak, npc_result
)
@@ -273,8 +274,8 @@ class Blockchain(BlockchainInterface):
] = fetched_block_record.sub_epoch_summary_included
if peak_height is not None:
self._peak_height = peak_height
self.block_store.cache_block(block)
except BaseException:
self.block_store.rollback_cache_block(header_hash)
await self.block_store.db_wrapper.rollback_transaction()
raise
if fork_height is not None:
@@ -306,7 +307,7 @@ class Blockchain(BlockchainInterface):
else:
tx_removals, tx_additions = [], []
await self.coin_store.new_block(block, tx_additions, tx_removals)
await self.block_store.set_peak(block.header_hash)
await self.block_store.set_peak(block_record.header_hash)
return uint32(0), uint32(0), [block_record]
return None, None, []

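A condensed sketch of the caller-side flow in the Blockchain hunks above (untyped, hypothetical signatures): the header hash is computed exactly once, passed down to the store, and the same value is reused on the exception path, so no branch re-hashes the block.

async def receive_block(block_store, block, block_record):
    # block.header_hash is a computed property; evaluate it only once.
    header_hash: bytes = block.header_hash
    try:
        await block_store.add_full_block(header_hash, block, block_record)
    except BaseException:
        # Reuse the precomputed value to evict the optimistic cache entry.
        block_store.rollback_cache_block(header_hash)
        raise
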

@@ -66,16 +66,12 @@ class BlockStore:
self.ses_challenge_cache = LRUCache(50)
return self
async def add_full_block(self, block: FullBlock, block_record: BlockRecord) -> None:
cached = self.block_cache.get(block.header_hash)
if cached is not None:
# Since write to db can fail, we remove from cache here to avoid potential inconsistency
# Adding to cache only from reading
self.block_cache.remove(block.header_hash)
async def add_full_block(self, header_hash: bytes32, block: FullBlock, block_record: BlockRecord) -> None:
self.block_cache.put(header_hash, block)
cursor_1 = await self.db.execute(
"INSERT OR REPLACE INTO full_blocks VALUES(?, ?, ?, ?, ?)",
(
block.header_hash.hex(),
header_hash.hex(),
block.height,
int(block.is_transaction_block()),
int(block.is_fully_compactified()),
@@ -88,7 +84,7 @@ class BlockStore:
cursor_2 = await self.db.execute(
"INSERT OR REPLACE INTO block_records VALUES(?, ?, ?, ?,?, ?, ?)",
(
block.header_hash.hex(),
header_hash.hex(),
block.prev_header_hash.hex(),
block.height,
bytes(block_record),
@@ -130,26 +126,30 @@ class BlockStore:
return challenge_segments
return None
def cache_block(self, block: FullBlock):
self.block_cache.put(block.header_hash, block)
def rollback_cache_block(self, header_hash: bytes32):
self.block_cache.remove(header_hash)
async def get_full_block(self, header_hash: bytes32) -> Optional[FullBlock]:
cached = self.block_cache.get(header_hash)
if cached is not None:
log.debug(f"cache hit for block {header_hash.hex()}")
return cached
log.debug(f"cache miss for block {header_hash.hex()}")
cursor = await self.db.execute("SELECT block from full_blocks WHERE header_hash=?", (header_hash.hex(),))
row = await cursor.fetchone()
await cursor.close()
if row is not None:
block = FullBlock.from_bytes(row[0])
self.block_cache.put(block.header_hash, block)
self.block_cache.put(header_hash, block)
return block
return None
async def get_full_block_bytes(self, header_hash: bytes32) -> Optional[bytes]:
cached = self.block_cache.get(header_hash)
if cached is not None:
log.debug(f"cache hit for block {header_hash.hex()}")
return cached
log.debug(f"cache miss for block {header_hash.hex()}")
cursor = await self.db.execute("SELECT block from full_blocks WHERE header_hash=?", (header_hash.hex(),))
row = await cursor.fetchone()
await cursor.close()
@@ -202,15 +202,18 @@ class BlockStore:
return []
header_hashes_db = tuple([hh.hex() for hh in header_hashes])
formatted_str = f'SELECT block from full_blocks WHERE header_hash in ({"?," * (len(header_hashes_db) - 1)}?)'
formatted_str = (
f'SELECT header_hash, block from full_blocks WHERE header_hash in ({"?," * (len(header_hashes_db) - 1)}?)'
)
cursor = await self.db.execute(formatted_str, header_hashes_db)
rows = await cursor.fetchall()
await cursor.close()
all_blocks: Dict[bytes32, FullBlock] = {}
for row in rows:
full_block: FullBlock = FullBlock.from_bytes(row[0])
all_blocks[full_block.header_hash] = full_block
self.block_cache.put(full_block.header_hash, full_block)
header_hash = bytes.fromhex(row[0])
full_block: FullBlock = FullBlock.from_bytes(row[1])
all_blocks[header_hash] = full_block
self.block_cache.put(header_hash, full_block)
ret: List[FullBlock] = []
for hh in header_hashes:
if hh not in all_blocks:

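The get_blocks_by_hash change above implements the "read header_hash from the DB" bullet: the query now returns the stored key alongside the block blob, so the reader decodes the hex key instead of deserializing the block and re-hashing its header. A self-contained sketch of the same idea against an illustrative schema (not Chia's actual table definition):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE full_blocks(header_hash TEXT PRIMARY KEY, block BLOB)")
conn.execute("INSERT INTO full_blocks VALUES(?, ?)", ("ab" * 32, b"\x01\x02"))

wanted = ["ab" * 32]
placeholders = "?," * (len(wanted) - 1) + "?"  # same placeholder trick as the diff
rows = conn.execute(
    f"SELECT header_hash, block FROM full_blocks WHERE header_hash IN ({placeholders})",
    wanted,
).fetchall()

blocks = {}
for header_hash_hex, block_bytes in rows:
    # bytes.fromhex on the stored key replaces the more expensive
    # FullBlock.from_bytes(...).header_hash recomputation.
    blocks[bytes.fromhex(header_hash_hex)] = block_bytes

assert list(blocks) == [bytes.fromhex("ab" * 32)]
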

@@ -1727,7 +1727,7 @@ class FullNode:
new_block = dataclasses.replace(block, challenge_chain_ip_proof=vdf_proof)
assert new_block is not None
async with self.db_wrapper.lock:
await self.block_store.add_full_block(new_block, block_record)
await self.block_store.add_full_block(new_block.header_hash, new_block, block_record)
await self.block_store.db_wrapper.commit_transaction()
async def respond_compact_proof_of_time(self, request: timelord_protocol.RespondCompactProofOfTime):


@@ -51,8 +51,8 @@ class TestBlockStore:
await bc.receive_block(block)
block_record = bc.block_record(block.header_hash)
block_record_hh = block_record.header_hash
await store.add_full_block(block, block_record)
await store.add_full_block(block, block_record)
await store.add_full_block(block.header_hash, block, block_record)
await store.add_full_block(block.header_hash, block, block_record)
assert block == await store.get_full_block(block.header_hash)
assert block == await store.get_full_block(block.header_hash)
assert block_record == (await store.get_block_record(block_record_hh))
@@ -115,7 +115,11 @@ class TestBlockStore:
for i in range(10000):
rand_i = random.randint(0, 9)
if random.random() < 0.5:
tasks.append(asyncio.create_task(store.add_full_block(blocks[rand_i], block_records[rand_i])))
tasks.append(
asyncio.create_task(
store.add_full_block(blocks[rand_i].header_hash, blocks[rand_i], block_records[rand_i])
)
)
if random.random() < 0.5:
tasks.append(asyncio.create_task(store.get_full_block(blocks[rand_i].header_hash)))
await asyncio.gather(*tasks)
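
Note on the tests: the doubled add_full_block call in the first test stays idempotent because the store writes with INSERT OR REPLACE, and the stress test interleaves randomized writes and reads through asyncio.gather, which exercises the new write-time caching under concurrent access.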