drop support for database schema v1 (#15895)

* don't run tests with database schema v1

* remove support for database schema v1 from BlockStore

* remove support for database schema v1 from CoinStore

* remove support for database schema v1 from HintStore

* remove support for v1 BlockStore schema from blockchain reorg logic

* remove support for database schema v1 from BlockHeightMap

* run block store tests both with and without the cache

* add test with empty blockchain for BlockHeightMap

* fix typo
Arvid Norberg 2023-08-03 21:18:33 +02:00 committed by GitHub
parent 0555ea2116
commit 1d8d476056
13 changed files with 360 additions and 625 deletions
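
Every store touched by this commit applies the same guard: create() now rejects any schema other than v2 up front, instead of branching on db_version throughout the class. A minimal, synchronous sketch of that pattern (the real create() methods are async and also create tables):

from dataclasses import dataclass

@dataclass
class DBWrapper2:  # stand-in with only the attribute the guard needs
    db_version: int

class HintStore:
    def __init__(self, db_wrapper: DBWrapper2) -> None:
        self.db_wrapper = db_wrapper

    @classmethod
    def create(cls, db_wrapper: DBWrapper2) -> "HintStore":
        # schema v1 is no longer supported: fail fast with a clear error
        if db_wrapper.db_version != 2:
            raise RuntimeError(f"HintStore does not support database schema v{db_wrapper.db_version}")
        return cls(db_wrapper)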

View File

@ -152,7 +152,7 @@ class SpendSim:
else:
uri = f"file:{db_path}"
self.db_wrapper = await DBWrapper2.create(database=uri, uri=True, reader_count=1)
self.db_wrapper = await DBWrapper2.create(database=uri, uri=True, reader_count=1, db_version=2)
self.coin_store = await CoinStore.create(self.db_wrapper)
self.mempool_manager = MempoolManager(self.coin_store.get_coin_record, defaults)
@ -205,13 +205,13 @@ class SpendSim:
coins = set()
async with self.db_wrapper.reader_no_transaction() as conn:
cursor = await conn.execute(
"SELECT * from coin_record WHERE coinbase=0 AND spent=0 ",
"SELECT puzzle_hash,coin_parent,amount from coin_record WHERE coinbase=0 AND spent_index==0 ",
)
rows = await cursor.fetchall()
await cursor.close()
for row in rows:
coin = Coin(bytes32(bytes.fromhex(row[6])), bytes32(bytes.fromhex(row[5])), uint64.from_bytes(row[7]))
coin = Coin(bytes32(row[1]), bytes32(row[0]), uint64.from_bytes(row[2]))
coins.add(coin)
return list(coins)
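
Illustration (not from this commit) of why the row handling above changed: schema v1 stored puzzle_hash and coin_parent as hex text, while schema v2 stores raw 32-byte blobs, so the hex round-trip disappears and only the needed columns are fetched:

v1_row = ("aa" * 32, "bb" * 32)                                # hex text columns
v2_row = (bytes.fromhex("aa" * 32), bytes.fromhex("bb" * 32))  # blob columns

puzzle_hash_v1 = bytes.fromhex(v1_row[0])  # v1: decode hex text first
puzzle_hash_v2 = v2_row[0]                 # v2: already bytes
assert puzzle_hash_v1 == puzzle_hash_v2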

View File

@ -901,22 +901,9 @@ class Blockchain(BlockchainInterface):
):
# We are not in a reorg, no need to look up alternate header hashes
# (we can get them from height_to_hash)
if self.block_store.db_wrapper.db_version == 2:
# in the v2 database, we can look up blocks by height directly
# (as long as we're in the main chain)
result = await self.block_store.get_generators_at(block.transactions_generator_ref_list)
else:
for ref_height in block.transactions_generator_ref_list:
header_hash = self.height_to_hash(ref_height)
# if ref_height is invalid, this block should have failed with
# FUTURE_GENERATOR_REFS before getting here
assert header_hash is not None
ref_gen = await self.block_store.get_generator(header_hash)
if ref_gen is None:
raise ValueError(Err.GENERATOR_REF_HAS_NO_GENERATOR)
result.append(ref_gen)
# in the v2 database, we can look up blocks by height directly
# (as long as we're in the main chain)
result = await self.block_store.get_generators_at(block.transactions_generator_ref_list)
else:
# First tries to find the blocks in additional_blocks
reorg_chain: Dict[uint32, FullBlock] = {}
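
A hedged, simplified sketch of what the batched lookup above (get_generators_at) can look like against the v2 schema, where the in_main_chain flag makes a height unambiguous; the real implementation parses the generator out of the stored block:

import sqlite3
from typing import Dict, List

def get_blocks_at(conn: sqlite3.Connection, heights: List[int]) -> List[bytes]:
    # one batched query instead of one get_generator() call per reference
    if len(heights) == 0:
        return []
    placeholders = ",".join("?" * len(heights))
    cursor = conn.execute(
        f"SELECT height, block FROM full_blocks "
        f"WHERE in_main_chain=1 AND height IN ({placeholders})",
        heights,
    )
    by_height: Dict[int, bytes] = {row[0]: row[1] for row in cursor.fetchall()}
    # preserve the order of the requested reference list
    return [by_height[h] for h in heights]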

View File

@ -52,6 +52,8 @@ class BlockHeightMap:
@classmethod
async def create(cls, blockchain_dir: Path, db: DBWrapper2) -> "BlockHeightMap":
if db.db_version != 2:
raise RuntimeError(f"BlockHeightMap does not support database schema v{db.db_version}")
self = BlockHeightMap()
self.db = db
@ -62,26 +64,18 @@ class BlockHeightMap:
self.__ses_filename = blockchain_dir / "sub-epoch-summaries"
async with self.db.reader_no_transaction() as conn:
if db.db_version == 2:
async with conn.execute("SELECT hash FROM current_peak WHERE key = 0") as cursor:
peak_row = await cursor.fetchone()
if peak_row is None:
return self
async with conn.execute("SELECT hash FROM current_peak WHERE key = 0") as cursor:
peak_row = await cursor.fetchone()
if peak_row is None:
return self
async with conn.execute(
"SELECT header_hash,prev_hash,height,sub_epoch_summary FROM full_blocks WHERE header_hash=?",
(peak_row[0],),
) as cursor:
row = await cursor.fetchone()
if row is None:
return self
else:
async with await conn.execute(
"SELECT header_hash,prev_hash,height,sub_epoch_summary from block_records WHERE is_peak=1"
) as cursor:
row = await cursor.fetchone()
if row is None:
return self
async with conn.execute(
"SELECT header_hash,prev_hash,height,sub_epoch_summary FROM full_blocks WHERE header_hash=?",
(peak_row[0],),
) as cursor:
row = await cursor.fetchone()
if row is None:
return self
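# Illustration (not from this commit): the two-step v2 peak lookup above,
# against plain sqlite3 -- read the single-row current_peak table, then fetch
# the peak's row from full_blocks.
import sqlite3
from typing import Optional, Tuple

def load_peak_row(conn: sqlite3.Connection) -> Optional[Tuple]:
    peak_row = conn.execute("SELECT hash FROM current_peak WHERE key = 0").fetchone()
    if peak_row is None:
        return None  # empty blockchain: nothing to load
    return conn.execute(
        "SELECT header_hash,prev_hash,height,sub_epoch_summary FROM full_blocks WHERE header_hash=?",
        (peak_row[0],),
    ).fetchone()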
try:
async with aiofiles.open(self.__height_to_hash_filename, "rb") as f:
@ -97,14 +91,8 @@ class BlockHeightMap:
# it's OK if this file doesn't exist, we can rebuild it
pass
peak: bytes32
prev_hash: bytes32
if db.db_version == 2:
peak = row[0]
prev_hash = row[1]
else:
peak = bytes32.fromhex(row[0])
prev_hash = bytes32.fromhex(row[1])
peak: bytes32 = row[0]
prev_hash: bytes32 = row[1]
height = row[2]
# allocate memory for height to hash map
@ -161,28 +149,18 @@ class BlockHeightMap:
# load 5000 blocks at a time
window_end = max(0, height - 5000)
if self.db.db_version == 2:
query = (
"SELECT header_hash,prev_hash,height,sub_epoch_summary from full_blocks "
"INDEXED BY height WHERE height>=? AND height <?"
)
else:
query = (
"SELECT header_hash,prev_hash,height,sub_epoch_summary from block_records "
"INDEXED BY height WHERE height>=? AND height <?"
)
query = (
"SELECT header_hash,prev_hash,height,sub_epoch_summary from full_blocks "
"INDEXED BY height WHERE height>=? AND height <?"
)
async with self.db.reader_no_transaction() as conn:
async with conn.execute(query, (window_end, height)) as cursor:
# maps block-hash -> (height, prev-hash, sub-epoch-summary)
ordered: Dict[bytes32, Tuple[uint32, bytes32, Optional[bytes]]] = {}
if self.db.db_version == 2:
for r in await cursor.fetchall():
ordered[r[0]] = (r[2], r[1], r[3])
else:
for r in await cursor.fetchall():
ordered[bytes32.fromhex(r[0])] = (r[2], bytes32.fromhex(r[1]), r[3])
for r in await cursor.fetchall():
ordered[r[0]] = (r[2], r[1], r[3])
while height > window_end:
if prev_hash not in ordered:
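
A hedged sketch (assumed names, simplified) of the walk the loop above performs: "ordered" maps block hash to (height, prev-hash, sub-epoch-summary), and the loop follows prev-hash pointers from the tip down through the loaded window, recording each height-to-hash entry:

from typing import Dict, Optional, Tuple

def walk_window(
    ordered: Dict[bytes, Tuple[int, bytes, Optional[bytes]]],
    prev_hash: bytes,
    height: int,
    window_end: int,
    height_to_hash: Dict[int, bytes],
) -> bytes:
    while height > window_end:
        if prev_hash not in ordered:
            raise ValueError("block is missing from the chain window")
        entry_height, parent, _ses = ordered[prev_hash]
        height_to_hash[entry_height] = prev_hash  # record this block's hash
        height = entry_height
        prev_hash = parent  # step to the parent block
    return prev_hash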

View File

@ -4,7 +4,7 @@ import dataclasses
import logging
import sqlite3
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union
import typing_extensions
import zstd
@ -137,148 +137,99 @@ class BlockStore:
ses_challenge_cache: LRUCache[bytes32, List[SubEpochChallengeSegment]]
@classmethod
async def create(cls, db_wrapper: DBWrapper2) -> BlockStore:
self = cls(LRUCache(1000), db_wrapper, LRUCache(50))
async def create(cls, db_wrapper: DBWrapper2, *, use_cache: bool = True) -> BlockStore:
if db_wrapper.db_version != 2:
raise RuntimeError(f"BlockStore does not support database schema v{db_wrapper.db_version}")
if use_cache:
self = cls(LRUCache(1000), db_wrapper, LRUCache(50))
else:
self = cls(LRUCache(0), db_wrapper, LRUCache(0))
async with self.db_wrapper.writer_maybe_transaction() as conn:
log.info("DB: Creating block store tables and indexes.")
if self.db_wrapper.db_version == 2:
# TODO: most data in block is duplicated in block_record. The only
# reason for this is that our parsing of a FullBlock is so slow,
# it's faster to store duplicate data to parse less when we just
# need the BlockRecord. Once we fix the parsing (and data structure)
# of FullBlock, this can use less space
await conn.execute(
"CREATE TABLE IF NOT EXISTS full_blocks("
"header_hash blob PRIMARY KEY,"
"prev_hash blob,"
"height bigint,"
"sub_epoch_summary blob,"
"is_fully_compactified tinyint,"
"in_main_chain tinyint,"
"block blob,"
"block_record blob)"
)
# TODO: most data in block is duplicated in block_record. The only
# reason for this is that our parsing of a FullBlock is so slow,
# it's faster to store duplicate data to parse less when we just
# need the BlockRecord. Once we fix the parsing (and data structure)
# of FullBlock, this can use less space
await conn.execute(
"CREATE TABLE IF NOT EXISTS full_blocks("
"header_hash blob PRIMARY KEY,"
"prev_hash blob,"
"height bigint,"
"sub_epoch_summary blob,"
"is_fully_compactified tinyint,"
"in_main_chain tinyint,"
"block blob,"
"block_record blob)"
)
# This is a single-row table containing the hash of the current
# peak. The "key" field is there to make update statements simple
await conn.execute("CREATE TABLE IF NOT EXISTS current_peak(key int PRIMARY KEY, hash blob)")
# This is a single-row table containing the hash of the current
# peak. The "key" field is there to make update statements simple
await conn.execute("CREATE TABLE IF NOT EXISTS current_peak(key int PRIMARY KEY, hash blob)")
# If any of these indices are altered, they should also be altered
# in the chia/cmds/db_upgrade.py file
log.info("DB: Creating index height")
await conn.execute("CREATE INDEX IF NOT EXISTS height on full_blocks(height)")
# If any of these indices are altered, they should also be altered
# in the chia/cmds/db_upgrade.py file
log.info("DB: Creating index height")
await conn.execute("CREATE INDEX IF NOT EXISTS height on full_blocks(height)")
# Sub epoch segments for weight proofs
await conn.execute(
"CREATE TABLE IF NOT EXISTS sub_epoch_segments_v3("
"ses_block_hash blob PRIMARY KEY,"
"challenge_segments blob)"
)
# Sub epoch segments for weight proofs
await conn.execute(
"CREATE TABLE IF NOT EXISTS sub_epoch_segments_v3("
"ses_block_hash blob PRIMARY KEY,"
"challenge_segments blob)"
)
# If any of these indices are altered, they should also be altered
# in the chia/cmds/db_upgrade.py file
log.info("DB: Creating index is_fully_compactified")
await conn.execute(
"CREATE INDEX IF NOT EXISTS is_fully_compactified ON"
" full_blocks(is_fully_compactified, in_main_chain) WHERE in_main_chain=1"
)
log.info("DB: Creating index main_chain")
await conn.execute(
"CREATE INDEX IF NOT EXISTS main_chain ON full_blocks(height, in_main_chain) WHERE in_main_chain=1"
)
else:
await conn.execute(
"CREATE TABLE IF NOT EXISTS full_blocks(header_hash text PRIMARY KEY, height bigint,"
" is_block tinyint, is_fully_compactified tinyint, block blob)"
)
# Block records
await conn.execute(
"CREATE TABLE IF NOT EXISTS block_records(header_hash "
"text PRIMARY KEY, prev_hash text, height bigint,"
"block blob, sub_epoch_summary blob, is_peak tinyint, is_block tinyint)"
)
# Sub epoch segments for weight proofs
await conn.execute(
"CREATE TABLE IF NOT EXISTS sub_epoch_segments_v3(ses_block_hash text PRIMARY KEY,"
"challenge_segments blob)"
)
# Height index so we can look up in order of height for sync purposes
log.info("DB: Creating index full_block_height")
await conn.execute("CREATE INDEX IF NOT EXISTS full_block_height on full_blocks(height)")
log.info("DB: Creating index is_fully_compactified")
await conn.execute(
"CREATE INDEX IF NOT EXISTS is_fully_compactified on full_blocks(is_fully_compactified)"
)
log.info("DB: Creating index height")
await conn.execute("CREATE INDEX IF NOT EXISTS height on block_records(height)")
log.info("DB: Creating index peak")
await conn.execute("CREATE INDEX IF NOT EXISTS peak on block_records(is_peak)")
# If any of these indices are altered, they should also be altered
# in the chia/cmds/db_upgrade.py file
log.info("DB: Creating index is_fully_compactified")
await conn.execute(
"CREATE INDEX IF NOT EXISTS is_fully_compactified ON"
" full_blocks(is_fully_compactified, in_main_chain) WHERE in_main_chain=1"
)
log.info("DB: Creating index main_chain")
await conn.execute(
"CREATE INDEX IF NOT EXISTS main_chain ON full_blocks(height, in_main_chain) WHERE in_main_chain=1"
)
return self
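# Note on use_cache above: LRUCache(0) keeps the caching interface but retains
# nothing, so the uncached code paths run under the same logic. A stand-in
# cache with (assumed) equivalent capacity-0 behaviour:
from collections import OrderedDict
from typing import Generic, Optional, TypeVar

K = TypeVar("K")
V = TypeVar("V")

class LRUCache(Generic[K, V]):
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.cache: "OrderedDict[K, V]" = OrderedDict()

    def put(self, key: K, value: V) -> None:
        self.cache[key] = value
        self.cache.move_to_end(key)
        while len(self.cache) > self.capacity:
            # with capacity 0, the entry we just added is evicted immediately
            self.cache.popitem(last=False)

    def get(self, key: K) -> Optional[V]:
        return self.cache.get(key)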
def maybe_from_hex(self, field: Union[bytes, str]) -> bytes32:
if self.db_wrapper.db_version == 2:
assert isinstance(field, bytes)
return bytes32(field)
else:
assert isinstance(field, str)
return bytes32.fromhex(field)
assert isinstance(field, bytes)
return bytes32(field)
def maybe_to_hex(self, field: bytes) -> Any:
if self.db_wrapper.db_version == 2:
return field
else:
return field.hex()
return field
def compress(self, block: FullBlock) -> bytes:
ret: bytes = zstd.compress(bytes(block))
return ret
def maybe_decompress(self, block_bytes: bytes) -> FullBlock:
if self.db_wrapper.db_version == 2:
ret: FullBlock = FullBlock.from_bytes(zstd.decompress(block_bytes))
else:
ret = FullBlock.from_bytes(block_bytes)
ret: FullBlock = FullBlock.from_bytes(zstd.decompress(block_bytes))
return ret
def maybe_decompress_blob(self, block_bytes: bytes) -> bytes:
if self.db_wrapper.db_version == 2:
ret: bytes = zstd.decompress(block_bytes)
return ret
else:
return block_bytes
ret: bytes = zstd.decompress(block_bytes)
return ret
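# Round-trip illustration (not from this commit) of the v2 storage convention
# these helpers now assume unconditionally: full_blocks.block holds
# zstd-compressed bytes, so every read decompresses. Requires the zstd package.
import zstd

block_bytes = b"\x00" * 1024           # stand-in for bytes(FullBlock)
stored = zstd.compress(block_bytes)    # what gets written to full_blocks.block
assert zstd.decompress(stored) == block_bytes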
async def rollback(self, height: int) -> None:
if self.db_wrapper.db_version == 2:
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.execute(
"UPDATE full_blocks SET in_main_chain=0 WHERE height>? AND in_main_chain=1", (height,)
)
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.execute("UPDATE full_blocks SET in_main_chain=0 WHERE height>? AND in_main_chain=1", (height,))
async def set_in_chain(self, header_hashes: List[Tuple[bytes32]]) -> None:
if self.db_wrapper.db_version == 2:
async with self.db_wrapper.writer_maybe_transaction() as conn:
async with await conn.executemany(
"UPDATE full_blocks SET in_main_chain=1 WHERE header_hash=?", header_hashes
) as cursor:
if cursor.rowcount != len(header_hashes):
raise RuntimeError(f"The blockchain database is corrupt. All of {header_hashes} should exist")
async with self.db_wrapper.writer_maybe_transaction() as conn:
async with await conn.executemany(
"UPDATE full_blocks SET in_main_chain=1 WHERE header_hash=?", header_hashes
) as cursor:
if cursor.rowcount != len(header_hashes):
raise RuntimeError(f"The blockchain database is corrupt. All of {header_hashes} should exist")
async def replace_proof(self, header_hash: bytes32, block: FullBlock) -> None:
assert header_hash == block.header_hash
block_bytes: bytes
if self.db_wrapper.db_version == 2:
block_bytes = self.compress(block)
else:
block_bytes = bytes(block)
block_bytes: bytes = self.compress(block)
self.block_cache.put(header_hash, block)
@ -296,56 +247,25 @@ class BlockStore:
self.block_cache.put(header_hash, block)
block_record_db: BlockRecordDB = BlockRecordDB.from_block_record(block_record)
if self.db_wrapper.db_version == 2:
ses: Optional[bytes] = (
None
if block_record.sub_epoch_summary_included is None
else bytes(block_record.sub_epoch_summary_included)
ses: Optional[bytes] = (
None if block_record.sub_epoch_summary_included is None else bytes(block_record.sub_epoch_summary_included)
)
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.execute(
"INSERT OR IGNORE INTO full_blocks VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
(
header_hash,
block.prev_header_hash,
block.height,
ses,
int(block.is_fully_compactified()),
False, # in_main_chain
self.compress(block),
bytes(block_record_db),
),
)
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.execute(
"INSERT OR IGNORE INTO full_blocks VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
(
header_hash,
block.prev_header_hash,
block.height,
ses,
int(block.is_fully_compactified()),
False, # in_main_chain
self.compress(block),
bytes(block_record_db),
),
)
else:
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.execute(
"INSERT OR IGNORE INTO full_blocks VALUES(?, ?, ?, ?, ?)",
(
header_hash.hex(),
block.height,
int(block.is_transaction_block()),
int(block.is_fully_compactified()),
bytes(block),
),
)
await conn.execute(
"INSERT OR IGNORE INTO block_records VALUES(?, ?, ?, ?,?, ?, ?)",
(
header_hash.hex(),
block.prev_header_hash.hex(),
block.height,
bytes(block_record_db),
None
if block_record.sub_epoch_summary_included is None
else bytes(block_record.sub_epoch_summary_included),
False,
block.is_transaction_block(),
),
)
async def persist_sub_epoch_challenge_segments(
self, ses_block_hash: bytes32, segments: List[SubEpochChallengeSegment]
) -> None:
@ -409,10 +329,7 @@ class BlockStore:
) as cursor:
row = await cursor.fetchone()
if row is not None:
if self.db_wrapper.db_version == 2:
ret: bytes = zstd.decompress(row[0])
else:
ret = row[0]
ret: bytes = zstd.decompress(row[0])
return ret
return None
@ -441,10 +358,7 @@ class BlockStore:
row = await execute_fetchone(conn, formatted_str, (self.maybe_to_hex(header_hash),))
if row is None:
return None
if self.db_wrapper.db_version == 2:
block_bytes = zstd.decompress(row[0])
else:
block_bytes = row[0]
block_bytes = zstd.decompress(row[0])
try:
return block_info_from_block(block_bytes)
@ -468,10 +382,7 @@ class BlockStore:
row = await execute_fetchone(conn, formatted_str, (self.maybe_to_hex(header_hash),))
if row is None:
return None
if self.db_wrapper.db_version == 2:
block_bytes = zstd.decompress(row[0])
else:
block_bytes = row[0]
block_bytes = zstd.decompress(row[0])
try:
return generator_from_block(block_bytes)
@ -484,8 +395,6 @@ class BlockStore:
return b.transactions_generator
async def get_generators_at(self, heights: List[uint32]) -> List[SerializedProgram]:
assert self.db_wrapper.db_version == 2
if len(heights) == 0:
return []
@ -523,43 +432,19 @@ class BlockStore:
return []
all_blocks: Dict[bytes32, BlockRecord] = {}
if self.db_wrapper.db_version == 2:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT header_hash,block_record,block FROM full_blocks "
f'WHERE header_hash in ({"?," * (len(header_hashes) - 1)}?)',
header_hashes,
) as cursor:
for row in await cursor.fetchall():
header_hash = bytes32(row[0])
block_rec_db: BlockRecordDB = BlockRecordDB.from_bytes(row[1])
plot_filter_info: PlotFilterInfo = plot_filter_info_from_block(zstd.decompress(row[2]))
all_blocks[block_rec_db.header_hash] = block_rec_db.to_block_record(
plot_filter_info.pos_ss_cc_challenge_hash,
plot_filter_info.cc_sp_hash,
)
else:
all_blocks_plot_filters: Dict[bytes32, PlotFilterInfo] = {}
formatted_str = (
f'SELECT header_hash,block from full_blocks WHERE header_hash in ({"?," * (len(header_hashes) - 1)}?)'
)
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(formatted_str, [hh.hex() for hh in header_hashes]) as cursor:
for row in await cursor.fetchall():
header_hash = bytes32.fromhex(row[0])
all_blocks_plot_filters[header_hash] = plot_filter_info_from_block(row[1])
formatted_str = f'SELECT block from block_records WHERE header_hash in ({"?," * (len(header_hashes) - 1)}?)'
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(formatted_str, [hh.hex() for hh in header_hashes]) as cursor:
for row in await cursor.fetchall():
block_rec_db = BlockRecordDB.from_bytes(row[0])
assert block_rec_db.header_hash in all_blocks_plot_filters
plot_filter_info = all_blocks_plot_filters[block_rec_db.header_hash]
all_blocks[block_rec_db.header_hash] = block_rec_db.to_block_record(
plot_filter_info.pos_ss_cc_challenge_hash,
plot_filter_info.cc_sp_hash,
)
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT header_hash,block_record,block FROM full_blocks "
f'WHERE header_hash in ({"?," * (len(header_hashes) - 1)}?)',
header_hashes,
) as cursor:
for row in await cursor.fetchall():
block_rec_db: BlockRecordDB = BlockRecordDB.from_bytes(row[1])
plot_filter_info: PlotFilterInfo = plot_filter_info_from_block(zstd.decompress(row[2]))
all_blocks[block_rec_db.header_hash] = block_rec_db.to_block_record(
plot_filter_info.pos_ss_cc_challenge_hash,
plot_filter_info.cc_sp_hash,
)
ret: List[BlockRecord] = []
for hh in header_hashes:
@ -578,17 +463,12 @@ class BlockStore:
return []
assert len(header_hashes) < self.db_wrapper.host_parameter_limit
header_hashes_db: Sequence[Union[bytes32, str]]
if self.db_wrapper.db_version == 2:
header_hashes_db = header_hashes
else:
header_hashes_db = [hh.hex() for hh in header_hashes]
formatted_str = (
f'SELECT header_hash, block from full_blocks WHERE header_hash in ({"?," * (len(header_hashes_db) - 1)}?)'
f'SELECT header_hash, block from full_blocks WHERE header_hash in ({"?," * (len(header_hashes) - 1)}?)'
)
all_blocks: Dict[bytes32, bytes] = {}
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(formatted_str, header_hashes_db) as cursor:
async with conn.execute(formatted_str, header_hashes) as cursor:
for row in await cursor.fetchall():
header_hash = self.maybe_from_hex(row[0])
all_blocks[header_hash] = self.maybe_decompress_blob(row[1])
@ -611,17 +491,12 @@ class BlockStore:
if len(header_hashes) == 0:
return []
header_hashes_db: Sequence[Union[bytes32, str]]
if self.db_wrapper.db_version == 2:
header_hashes_db = header_hashes
else:
header_hashes_db = [hh.hex() for hh in header_hashes]
formatted_str = (
f'SELECT header_hash, block from full_blocks WHERE header_hash in ({"?," * (len(header_hashes_db) - 1)}?)'
f'SELECT header_hash, block from full_blocks WHERE header_hash in ({"?," * (len(header_hashes) - 1)}?)'
)
all_blocks: Dict[bytes32, FullBlock] = {}
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(formatted_str, header_hashes_db) as cursor:
async with conn.execute(formatted_str, header_hashes) as cursor:
for row in await cursor.fetchall():
header_hash = self.maybe_from_hex(row[0])
full_block: FullBlock = self.maybe_decompress(row[1])
@ -635,35 +510,16 @@ class BlockStore:
return ret
async def get_block_record(self, header_hash: bytes32) -> Optional[BlockRecord]:
if self.db_wrapper.db_version == 2:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT block_record,block FROM full_blocks WHERE header_hash=?",
(header_hash,),
) as cursor:
row = await cursor.fetchone()
if row is None:
return None
block_record_db = BlockRecordDB.from_bytes(row[0])
block_bytes = zstd.decompress(row[1])
else:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"""
SELECT block_records.block,full_blocks.block
FROM block_records JOIN full_blocks ON block_records.header_hash = full_blocks.header_hash
WHERE block_records.header_hash = ?
""",
(header_hash.hex(),),
) as cursor:
row = await cursor.fetchone()
if row is None:
return None
block_record_db = BlockRecordDB.from_bytes(row[0])
block_bytes = row[1]
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT block_record,block FROM full_blocks WHERE header_hash=?",
(header_hash,),
) as cursor:
row = await cursor.fetchone()
if row is None:
return None
block_record_db = BlockRecordDB.from_bytes(row[0])
block_bytes = zstd.decompress(row[1])
plot_filter_info = plot_filter_info_from_block(block_bytes)
block_record = block_record_db.to_block_record(
@ -683,45 +539,20 @@ class BlockStore:
"""
ret: Dict[bytes32, BlockRecord] = {}
if self.db_wrapper.db_version == 2:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT header_hash, block_record,block FROM full_blocks WHERE height >= ? AND height <= ?",
(start, stop),
) as cursor:
for row in await cursor.fetchall():
header_hash = bytes32(row[0])
block_record_db: BlockRecordDB = BlockRecordDB.from_bytes(row[1])
plot_filter_info: PlotFilterInfo = plot_filter_info_from_block(zstd.decompress(row[2]))
block_record = block_record_db.to_block_record(
plot_filter_info.pos_ss_cc_challenge_hash,
plot_filter_info.cc_sp_hash,
)
ret[header_hash] = block_record
else:
formatted_str = f"""
SELECT
block_records.header_hash AS header_hash_br,
full_blocks.header_hash AS header_hash_fb,
block_records.block AS block_br,
full_blocks.block AS block_fb
FROM block_records INNER JOIN full_blocks ON block_records.header_hash = full_blocks.header_hash
WHERE
block_records.height >= {start} and block_records.height <= {stop}
and full_blocks.height >= {start} and full_blocks.height <= {stop}
"""
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(formatted_str) as cursor:
for row in await cursor.fetchall():
header_hash = self.maybe_from_hex(row[0])
block_record_db = BlockRecordDB.from_bytes(row[2])
plot_filter_info = plot_filter_info_from_block(row[3])
block_record = block_record_db.to_block_record(
plot_filter_info.pos_ss_cc_challenge_hash,
plot_filter_info.cc_sp_hash,
)
ret[header_hash] = block_record
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT header_hash, block_record,block FROM full_blocks WHERE height >= ? AND height <= ?",
(start, stop),
) as cursor:
for row in await cursor.fetchall():
header_hash = bytes32(row[0])
block_record_db: BlockRecordDB = BlockRecordDB.from_bytes(row[1])
plot_filter_info: PlotFilterInfo = plot_filter_info_from_block(zstd.decompress(row[2]))
block_record = block_record_db.to_block_record(
plot_filter_info.pos_ss_cc_challenge_hash,
plot_filter_info.cc_sp_hash,
)
ret[header_hash] = block_record
return ret
@ -781,48 +612,20 @@ class BlockStore:
return {}, None
ret: Dict[bytes32, BlockRecord] = {}
if self.db_wrapper.db_version == 2:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT header_hash, block_record,block FROM full_blocks WHERE height >= ?",
(peak[1] - blocks_n,),
) as cursor:
for row in await cursor.fetchall():
header_hash = bytes32(row[0])
block_record_db: BlockRecordDB = BlockRecordDB.from_bytes(row[1])
plot_filter_info: PlotFilterInfo = plot_filter_info_from_block(zstd.decompress(row[2]))
block_record = block_record_db.to_block_record(
plot_filter_info.pos_ss_cc_challenge_hash,
plot_filter_info.cc_sp_hash,
)
ret[header_hash] = block_record
else:
height = peak[1] - blocks_n
formatted_str = f"""
SELECT
block_records.header_hash,
full_blocks.header_hash,
block_records.block,
full_blocks.block
FROM block_records INNER JOIN full_blocks ON block_records.header_hash = full_blocks.header_hash
WHERE
block_records.height >= {height}
AND full_blocks.height >= {height}
"""
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(formatted_str) as cursor:
for row in await cursor.fetchall():
header_hash = self.maybe_from_hex(row[0])
block_record_db = BlockRecordDB.from_bytes(row[2])
plot_filter_info = plot_filter_info_from_block(row[3])
block_record = block_record_db.to_block_record(
plot_filter_info.pos_ss_cc_challenge_hash,
plot_filter_info.cc_sp_hash,
)
ret[header_hash] = block_record
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT header_hash, block_record,block FROM full_blocks WHERE height >= ?",
(peak[1] - blocks_n,),
) as cursor:
for row in await cursor.fetchall():
header_hash = bytes32(row[0])
block_record_db: BlockRecordDB = BlockRecordDB.from_bytes(row[1])
plot_filter_info: PlotFilterInfo = plot_filter_info_from_block(zstd.decompress(row[2]))
block_record = block_record_db.to_block_record(
plot_filter_info.pos_ss_cc_challenge_hash,
plot_filter_info.cc_sp_hash,
)
ret[header_hash] = block_record
return ret, peak[0]
@ -830,17 +633,9 @@ class BlockStore:
# We need to be in a sqlite transaction here.
# Note: we do not commit this to the database yet, as we need to also change the coin store
if self.db_wrapper.db_version == 2:
# Note: we use the key field as 0 just to ensure all inserts replace the existing row
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.execute("INSERT OR REPLACE INTO current_peak VALUES(?, ?)", (0, header_hash))
else:
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.execute("UPDATE block_records SET is_peak=0 WHERE is_peak=1")
await conn.execute(
"UPDATE block_records SET is_peak=1 WHERE header_hash=?",
(self.maybe_to_hex(header_hash),),
)
# Note: we use the key field as 0 just to ensure all inserts replace the existing row
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.execute("INSERT OR REPLACE INTO current_peak VALUES(?, ?)", (0, header_hash))
async def is_fully_compactified(self, header_hash: bytes32) -> Optional[bool]:
async with self.db_wrapper.writer_maybe_transaction() as conn:
@ -853,40 +648,24 @@ class BlockStore:
return bool(row[0])
async def get_random_not_compactified(self, number: int) -> List[int]:
if self.db_wrapper.db_version == 2:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
f"SELECT height FROM full_blocks WHERE in_main_chain=1 AND is_fully_compactified=0 "
f"ORDER BY RANDOM() LIMIT {number}"
) as cursor:
rows = await cursor.fetchall()
else:
# Since orphan blocks do not get compactified, we need to check whether all blocks with a
# certain height are not compact. And if we do have compact orphan blocks, then all that
# happens is that the occasional chain block stays uncompact - not ideal, but harmless.
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
f"SELECT height FROM full_blocks GROUP BY height HAVING sum(is_fully_compactified)=0 "
f"ORDER BY RANDOM() LIMIT {number}"
) as cursor:
rows = await cursor.fetchall()
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
f"SELECT height FROM full_blocks WHERE in_main_chain=1 AND is_fully_compactified=0 "
f"ORDER BY RANDOM() LIMIT {number}"
) as cursor:
rows = await cursor.fetchall()
heights = [int(row[0]) for row in rows]
return heights
async def count_compactified_blocks(self) -> int:
if self.db_wrapper.db_version == 2:
# DB V2 has an index on is_fully_compactified only for blocks in the main chain
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"select count(*) from full_blocks where is_fully_compactified=1 and in_main_chain=1"
) as cursor:
row = await cursor.fetchone()
else:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute("select count(*) from full_blocks where is_fully_compactified=1") as cursor:
row = await cursor.fetchone()
# DB V2 has an index on is_fully_compactified only for blocks in the main chain
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"select count(*) from full_blocks where is_fully_compactified=1 and in_main_chain=1"
) as cursor:
row = await cursor.fetchone()
assert row is not None
@ -894,17 +673,12 @@ class BlockStore:
return int(count)
async def count_uncompactified_blocks(self) -> int:
if self.db_wrapper.db_version == 2:
# DB V2 has an index on is_fully_compactified only for blocks in the main chain
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"select count(*) from full_blocks where is_fully_compactified=0 and in_main_chain=1"
) as cursor:
row = await cursor.fetchone()
else:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute("select count(*) from full_blocks where is_fully_compactified=0") as cursor:
row = await cursor.fetchone()
# DB V2 has an index on is_fully_compactified only for blocks in the main chain
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"select count(*) from full_blocks where is_fully_compactified=0 and in_main_chain=1"
) as cursor:
row = await cursor.fetchone()
assert row is not None

View File

@ -33,42 +33,25 @@ class CoinStore:
@classmethod
async def create(cls, db_wrapper: DBWrapper2) -> CoinStore:
if db_wrapper.db_version != 2:
raise RuntimeError(f"CoinStore does not support database schema v{db_wrapper.db_version}")
self = CoinStore(db_wrapper, LRUCache(100))
async with self.db_wrapper.writer_maybe_transaction() as conn:
log.info("DB: Creating coin store tables and indexes.")
if self.db_wrapper.db_version == 2:
# the coin_name is unique in this table because the CoinStore always
# only represents a single peak
await conn.execute(
"CREATE TABLE IF NOT EXISTS coin_record("
"coin_name blob PRIMARY KEY,"
" confirmed_index bigint,"
" spent_index bigint," # if this is zero, it means the coin has not been spent
" coinbase int,"
" puzzle_hash blob,"
" coin_parent blob,"
" amount blob," # we use a blob of 8 bytes to store uint64
" timestamp bigint)"
)
else:
# the coin_name is unique in this table because the CoinStore always
# only represent a single peak
await conn.execute(
(
"CREATE TABLE IF NOT EXISTS coin_record("
"coin_name text PRIMARY KEY,"
" confirmed_index bigint,"
" spent_index bigint,"
" spent int,"
" coinbase int,"
" puzzle_hash text,"
" coin_parent text,"
" amount blob,"
" timestamp bigint)"
)
)
# the coin_name is unique in this table because the CoinStore always
# only represents a single peak
await conn.execute(
"CREATE TABLE IF NOT EXISTS coin_record("
"coin_name blob PRIMARY KEY,"
" confirmed_index bigint,"
" spent_index bigint," # if this is zero, it means the coin has not been spent
" coinbase int,"
" puzzle_hash blob,"
" coin_parent blob,"
" amount blob," # we use a blob of 8 bytes to store uint64
" timestamp bigint)"
)
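# Two v2 conventions from the schema above, illustrated (not from this
# commit): spent_index=0 means unspent (there is no separate spent column),
# and amount is an 8-byte blob; chia's uint64 serializes big-endian, so
# fixed-width blobs also compare in numeric order.
amount = 1000
amount_blob = amount.to_bytes(8, "big")  # what the amount column stores
assert len(amount_blob) == 8
assert int.from_bytes(amount_blob, "big") == amount
assert (500).to_bytes(8, "big") < (1000).to_bytes(8, "big")  # lexicographic == numeric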
# Useful for reorg lookups
log.info("DB: Creating index coin_confirmed_index")
@ -95,18 +78,11 @@ class CoinStore:
return 0
def maybe_from_hex(self, field: Union[bytes, str]) -> bytes32:
if self.db_wrapper.db_version == 2:
assert isinstance(field, bytes)
return bytes32(field)
else:
assert isinstance(field, str)
return bytes32.fromhex(field)
assert isinstance(field, bytes)
return bytes32(field)
def maybe_to_hex(self, field: bytes) -> Any:
if self.db_wrapper.db_version == 2:
return field
else:
return field.hex()
return field
async def new_block(
self,
@ -186,11 +162,7 @@ class CoinStore:
async with self.db_wrapper.reader_no_transaction() as conn:
cursors: List[Cursor] = []
for batch in to_batches(names, SQLITE_MAX_VARIABLE_NUMBER):
names_db: Tuple[Any, ...]
if self.db_wrapper.db_version == 2:
names_db = tuple(batch.entries)
else:
names_db = tuple(n.hex() for n in batch.entries)
names_db: Tuple[Any, ...] = tuple(batch.entries)
cursors.append(
await conn.execute(
f"SELECT confirmed_index, spent_index, coinbase, puzzle_hash, "
@ -297,10 +269,7 @@ class CoinStore:
coins = set()
puzzle_hashes_db: Tuple[Any, ...]
if self.db_wrapper.db_version == 2:
puzzle_hashes_db = tuple(puzzle_hashes)
else:
puzzle_hashes_db = tuple([ph.hex() for ph in puzzle_hashes])
puzzle_hashes_db = tuple(puzzle_hashes)
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
@ -327,11 +296,6 @@ class CoinStore:
return []
coins = set()
names_db: Tuple[Any, ...]
if self.db_wrapper.db_version == 2:
names_db = tuple(names)
else:
names_db = tuple([name.hex() for name in names])
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
@ -340,7 +304,7 @@ class CoinStore:
f'WHERE coin_name in ({"?," * (len(names) - 1)}?) '
f"AND confirmed_index>=? AND confirmed_index<? "
f"{'' if include_spent_coins else 'AND spent_index=0'}",
names_db + (start_height, end_height),
names + [start_height, end_height],
) as cursor:
for row in await cursor.fetchall():
coin = self.row_to_coin(row)
@ -372,11 +336,7 @@ class CoinStore:
coins: Set[CoinState] = set()
async with self.db_wrapper.reader_no_transaction() as conn:
for batch in to_batches(puzzle_hashes, SQLITE_MAX_VARIABLE_NUMBER):
puzzle_hashes_db: Tuple[Any, ...]
if self.db_wrapper.db_version == 2:
puzzle_hashes_db = tuple(batch.entries)
else:
puzzle_hashes_db = tuple([ph.hex() for ph in batch.entries])
puzzle_hashes_db: Tuple[Any, ...] = tuple(batch.entries)
async with conn.execute(
f"SELECT confirmed_index, spent_index, coinbase, puzzle_hash, "
f"coin_parent, amount, timestamp FROM coin_record INDEXED BY coin_puzzle_hash "
@ -408,11 +368,7 @@ class CoinStore:
coins = set()
async with self.db_wrapper.reader_no_transaction() as conn:
for batch in to_batches(parent_ids, SQLITE_MAX_VARIABLE_NUMBER):
parent_ids_db: Tuple[Any, ...]
if self.db_wrapper.db_version == 2:
parent_ids_db = tuple(batch.entries)
else:
parent_ids_db = tuple([pid.hex() for pid in batch.entries])
parent_ids_db: Tuple[Any, ...] = tuple(batch.entries)
async with conn.execute(
f"SELECT confirmed_index, spent_index, coinbase, puzzle_hash, coin_parent, amount, timestamp "
f'FROM coin_record WHERE coin_parent in ({"?," * (len(batch.entries) - 1)}?) '
@ -441,11 +397,7 @@ class CoinStore:
coins: List[CoinState] = []
async with self.db_wrapper.reader_no_transaction() as conn:
for batch in to_batches(coin_ids, SQLITE_MAX_VARIABLE_NUMBER):
coin_ids_db: Tuple[Any, ...]
if self.db_wrapper.db_version == 2:
coin_ids_db = tuple(batch.entries)
else:
coin_ids_db = tuple([pid.hex() for pid in batch.entries])
coin_ids_db: Tuple[Any, ...] = tuple(batch.entries)
max_height_sql = ""
if max_height != uint32.MAXIMUM:
@ -500,60 +452,32 @@ class CoinStore:
if record.name not in coin_changes:
coin_changes[record.name] = record
if self.db_wrapper.db_version == 2:
await conn.execute("UPDATE coin_record SET spent_index=0 WHERE spent_index>?", (block_index,))
else:
await conn.execute(
"UPDATE coin_record SET spent_index = 0, spent = 0 WHERE spent_index>?", (block_index,)
)
await conn.execute("UPDATE coin_record SET spent_index=0 WHERE spent_index>?", (block_index,))
self.coins_added_at_height_cache = LRUCache(self.coins_added_at_height_cache.capacity)
return list(coin_changes.values())
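# Illustration (not from this commit) of the one-statement rollback above:
# because spent_index doubles as the spent flag in v2, resetting it to 0 for
# anything spent past the fork point un-spends those coins, with no separate
# spent column to keep in sync.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE coin_record(coin_name blob PRIMARY KEY, spent_index bigint)")
conn.executemany(
    "INSERT INTO coin_record VALUES(?, ?)",
    [(b"a" * 32, 0), (b"b" * 32, 90), (b"c" * 32, 120)],
)
conn.execute("UPDATE coin_record SET spent_index=0 WHERE spent_index>?", (100,))
still_spent = conn.execute("SELECT count(*) FROM coin_record WHERE spent_index>0").fetchone()[0]
assert still_spent == 1  # only the coin spent at height 90 remains spent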
# Store CoinRecord in DB
async def _add_coin_records(self, records: List[CoinRecord]) -> None:
if self.db_wrapper.db_version == 2:
values2 = []
for record in records:
values2.append(
(
record.coin.name(),
record.confirmed_block_index,
record.spent_block_index,
int(record.coinbase),
record.coin.puzzle_hash,
record.coin.parent_coin_info,
bytes(uint64(record.coin.amount)),
record.timestamp,
)
values2 = []
for record in records:
values2.append(
(
record.coin.name(),
record.confirmed_block_index,
record.spent_block_index,
int(record.coinbase),
record.coin.puzzle_hash,
record.coin.parent_coin_info,
bytes(uint64(record.coin.amount)),
record.timestamp,
)
if len(values2) > 0:
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.executemany(
"INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
values2,
)
else:
values = []
for record in records:
values.append(
(
record.coin.name().hex(),
record.confirmed_block_index,
record.spent_block_index,
int(record.spent),
int(record.coinbase),
record.coin.puzzle_hash.hex(),
record.coin.parent_coin_info.hex(),
bytes(uint64(record.coin.amount)),
record.timestamp,
)
)
if len(values2) > 0:
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.executemany(
"INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
values2,
)
if len(values) > 0:
async with self.db_wrapper.writer_maybe_transaction() as conn:
await conn.executemany(
"INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)",
values,
)
# Update coin_record to be spent in DB
async def _set_spent(self, coin_names: List[bytes32], index: uint32) -> None:
@ -566,22 +490,13 @@ class CoinStore:
rows_updated: int = 0
for batch in to_batches(coin_names, SQLITE_MAX_VARIABLE_NUMBER):
name_params = ",".join(["?"] * len(batch.entries))
if self.db_wrapper.db_version == 2:
ret: Cursor = await conn.execute(
f"UPDATE coin_record INDEXED BY sqlite_autoindex_coin_record_1 "
f"SET spent_index={index} "
f"WHERE spent_index=0 "
f"AND coin_name IN ({name_params})",
batch.entries,
)
else:
ret = await conn.execute(
f"UPDATE coin_record INDEXED BY sqlite_autoindex_coin_record_1 "
f"SET spent=1, spent_index={index} "
f"WHERE spent_index=0 "
f"AND coin_name IN ({name_params})",
[name.hex() for name in batch.entries],
)
ret: Cursor = await conn.execute(
f"UPDATE coin_record INDEXED BY sqlite_autoindex_coin_record_1 "
f"SET spent_index={index} "
f"WHERE spent_index=0 "
f"AND coin_name IN ({name_params})",
batch.entries,
)
rows_updated += ret.rowcount
if rows_updated != len(coin_names):
raise ValueError(

View File

@ -19,16 +19,14 @@ class HintStore:
@classmethod
async def create(cls, db_wrapper: DBWrapper2) -> HintStore:
if db_wrapper.db_version != 2:
raise RuntimeError(f"HintStore does not support database schema v{db_wrapper.db_version}")
self = HintStore(db_wrapper)
async with self.db_wrapper.writer_maybe_transaction() as conn:
log.info("DB: Creating hint store tables and indexes.")
if self.db_wrapper.db_version == 2:
await conn.execute("CREATE TABLE IF NOT EXISTS hints(coin_id blob, hint blob, UNIQUE (coin_id, hint))")
else:
await conn.execute(
"CREATE TABLE IF NOT EXISTS hints(id INTEGER PRIMARY KEY AUTOINCREMENT, coin_id blob, hint blob)"
)
await conn.execute("CREATE TABLE IF NOT EXISTS hints(coin_id blob, hint blob, UNIQUE (coin_id, hint))")
log.info("DB: Creating index hint_index")
await conn.execute("CREATE INDEX IF NOT EXISTS hint_index on hints(hint)")
return self
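# Illustration (not from this commit) of what replacing the AUTOINCREMENT id
# with UNIQUE(coin_id, hint) buys: INSERT OR IGNORE turns duplicate hint rows
# into no-ops instead of accumulating copies.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hints(coin_id blob, hint blob, UNIQUE (coin_id, hint))")
pairs = [(b"c" * 32, b"h" * 32), (b"c" * 32, b"h" * 32)]  # the same pair twice
conn.executemany("INSERT OR IGNORE INTO hints VALUES(?, ?)", pairs)
assert conn.execute("SELECT count(*) FROM hints").fetchone()[0] == 1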
@ -45,16 +43,10 @@ class HintStore:
return None
async with self.db_wrapper.writer_maybe_transaction() as conn:
if self.db_wrapper.db_version == 2:
cursor = await conn.executemany(
"INSERT OR IGNORE INTO hints VALUES(?, ?)",
coin_hint_list,
)
else:
cursor = await conn.executemany(
"INSERT INTO hints VALUES(?, ?, ?)",
[(None,) + record for record in coin_hint_list],
)
cursor = await conn.executemany(
"INSERT OR IGNORE INTO hints VALUES(?, ?)",
coin_hint_list,
)
await cursor.close()
async def count_hints(self) -> int:

View File

@ -164,7 +164,7 @@ def latest_db_version() -> int:
return 2
@pytest.fixture(scope="function", params=[1, 2])
@pytest.fixture(scope="function", params=[2])
def db_version(request) -> int:
return request.param

View File

@ -13,7 +13,7 @@ from chia.util.db_wrapper import DBWrapper2
async def create_ram_blockchain(consensus_constants: ConsensusConstants) -> Tuple[DBWrapper2, Blockchain]:
uri = f"file:db_{random.randint(0, 99999999)}?mode=memory&cache=shared"
db_wrapper = await DBWrapper2.create(database=uri, uri=True, reader_count=1)
db_wrapper = await DBWrapper2.create(database=uri, uri=True, reader_count=1, db_version=2)
block_store = await BlockStore.create(db_wrapper)
coin_store = await CoinStore.create(db_wrapper)
blockchain = await Blockchain.create(coin_store, block_store, consensus_constants, Path("."), 2)

View File

@ -6,9 +6,12 @@ import logging
import random
import sqlite3
from pathlib import Path
from typing import List
from typing import List, cast
import pytest
# TODO: update after resolution in https://github.com/pytest-dev/pytest/issues/7469
from _pytest.fixtures import SubRequest
from clvm.casts import int_to_bytes
from chia.consensus.blockchain import Blockchain
@ -22,6 +25,7 @@ from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.blockchain_format.vdf import VDFProof
from chia.types.full_block import FullBlock
from chia.util.db_wrapper import get_host_parameter_limit
from chia.util.full_block_utils import GeneratorBlockInfo
from chia.util.ints import uint8, uint32, uint64
from tests.blockchain.blockchain_test_utils import _validate_and_add_block
from tests.conftest import Mode
@ -30,8 +34,15 @@ from tests.util.db_connection import DBConnection
log = logging.getLogger(__name__)
@pytest.fixture(scope="function", params=[True, False])
def use_cache(request: SubRequest) -> bool:
return cast(bool, request.param)
@pytest.mark.asyncio
async def test_block_store(tmp_dir: Path, db_version: int, bt: BlockTools, consensus_mode: Mode) -> None:
async def test_block_store(
tmp_dir: Path, db_version: int, bt: BlockTools, consensus_mode: Mode, use_cache: bool
) -> None:
if consensus_mode != Mode.PLAIN:
pytest.skip("only run in PLAIN mode to save time")
@ -41,10 +52,10 @@ async def test_block_store(tmp_dir: Path, db_version: int, bt: BlockTools, conse
async with DBConnection(db_version) as db_wrapper, DBConnection(db_version) as db_wrapper_2:
# Use a different file for the blockchain
coin_store_2 = await CoinStore.create(db_wrapper_2)
store_2 = await BlockStore.create(db_wrapper_2)
store_2 = await BlockStore.create(db_wrapper_2, use_cache=use_cache)
bc = await Blockchain.create(coin_store_2, store_2, bt.constants, tmp_dir, 2)
store = await BlockStore.create(db_wrapper)
store = await BlockStore.create(db_wrapper, use_cache=use_cache)
await BlockStore.create(db_wrapper_2)
# Save/get block
@ -56,6 +67,11 @@ async def test_block_store(tmp_dir: Path, db_version: int, bt: BlockTools, conse
await store.add_full_block(block.header_hash, block, block_record)
assert block == await store.get_full_block(block.header_hash)
assert block == await store.get_full_block(block.header_hash)
assert bytes(block) == await store.get_full_block_bytes(block.header_hash)
assert GeneratorBlockInfo(
block.foliage.prev_block_hash, block.transactions_generator, block.transactions_generator_ref_list
) == await store.get_block_info(block.header_hash)
assert block.transactions_generator == await store.get_generator(block.header_hash)
assert block_record == (await store.get_block_record(block_record_hh))
await store.set_in_chain([(block_record.header_hash,)])
await store.set_peak(block_record.header_hash)
@ -86,7 +102,7 @@ async def test_block_store(tmp_dir: Path, db_version: int, bt: BlockTools, conse
@pytest.mark.asyncio
async def test_deadlock(tmp_dir: Path, db_version: int, bt: BlockTools, consensus_mode: Mode) -> None:
async def test_deadlock(tmp_dir: Path, db_version: int, bt: BlockTools, consensus_mode: Mode, use_cache: bool) -> None:
"""
This test was added because the store was deadlocking in certain situations, when fetching and
adding blocks repeatedly. The issue was patched.
@ -96,7 +112,7 @@ async def test_deadlock(tmp_dir: Path, db_version: int, bt: BlockTools, consensu
blocks = bt.get_consecutive_blocks(10)
async with DBConnection(db_version) as wrapper, DBConnection(db_version) as wrapper_2:
store = await BlockStore.create(wrapper)
store = await BlockStore.create(wrapper, use_cache=use_cache)
coin_store_2 = await CoinStore.create(wrapper_2)
store_2 = await BlockStore.create(wrapper_2)
bc = await Blockchain.create(coin_store_2, store_2, bt.constants, tmp_dir, 2)
@ -120,7 +136,7 @@ async def test_deadlock(tmp_dir: Path, db_version: int, bt: BlockTools, consensu
@pytest.mark.asyncio
async def test_rollback(bt: BlockTools, tmp_dir: Path, consensus_mode: Mode) -> None:
async def test_rollback(bt: BlockTools, tmp_dir: Path, consensus_mode: Mode, use_cache: bool) -> None:
if consensus_mode != Mode.PLAIN:
pytest.skip("only run in PLAIN mode to save time")
blocks = bt.get_consecutive_blocks(10)
@ -128,7 +144,7 @@ async def test_rollback(bt: BlockTools, tmp_dir: Path, consensus_mode: Mode) ->
async with DBConnection(2) as db_wrapper:
# Use a different file for the blockchain
coin_store = await CoinStore.create(db_wrapper)
block_store = await BlockStore.create(db_wrapper)
block_store = await BlockStore.create(db_wrapper, use_cache=use_cache)
bc = await Blockchain.create(coin_store, block_store, bt.constants, tmp_dir, 2)
# insert all blocks
@ -167,14 +183,16 @@ async def test_rollback(bt: BlockTools, tmp_dir: Path, consensus_mode: Mode) ->
@pytest.mark.asyncio
async def test_count_compactified_blocks(bt: BlockTools, tmp_dir: Path, db_version: int, consensus_mode: Mode) -> None:
async def test_count_compactified_blocks(
bt: BlockTools, tmp_dir: Path, db_version: int, consensus_mode: Mode, use_cache: bool
) -> None:
if consensus_mode != Mode.PLAIN:
pytest.skip("only run in PLAIN mode to save time")
blocks = bt.get_consecutive_blocks(10)
async with DBConnection(db_version) as db_wrapper:
coin_store = await CoinStore.create(db_wrapper)
block_store = await BlockStore.create(db_wrapper)
block_store = await BlockStore.create(db_wrapper, use_cache=use_cache)
bc = await Blockchain.create(coin_store, block_store, bt.constants, tmp_dir, 2)
count = await block_store.count_compactified_blocks()
@ -189,7 +207,7 @@ async def test_count_compactified_blocks(bt: BlockTools, tmp_dir: Path, db_versi
@pytest.mark.asyncio
async def test_count_uncompactified_blocks(
bt: BlockTools, tmp_dir: Path, db_version: int, consensus_mode: Mode
bt: BlockTools, tmp_dir: Path, db_version: int, consensus_mode: Mode, use_cache: bool
) -> None:
if consensus_mode != Mode.PLAIN:
pytest.skip("only run in PLAIN mode to save time")
@ -197,7 +215,7 @@ async def test_count_uncompactified_blocks(
async with DBConnection(db_version) as db_wrapper:
coin_store = await CoinStore.create(db_wrapper)
block_store = await BlockStore.create(db_wrapper)
block_store = await BlockStore.create(db_wrapper, use_cache=use_cache)
bc = await Blockchain.create(coin_store, block_store, bt.constants, tmp_dir, 2)
count = await block_store.count_uncompactified_blocks()
@ -211,7 +229,9 @@ async def test_count_uncompactified_blocks(
@pytest.mark.asyncio
async def test_replace_proof(bt: BlockTools, tmp_dir: Path, db_version: int, consensus_mode: Mode) -> None:
async def test_replace_proof(
bt: BlockTools, tmp_dir: Path, db_version: int, consensus_mode: Mode, use_cache: bool
) -> None:
if consensus_mode != Mode.PLAIN:
pytest.skip("only run in PLAIN mode to save time")
blocks = bt.get_consecutive_blocks(10)
@ -231,7 +251,7 @@ async def test_replace_proof(bt: BlockTools, tmp_dir: Path, db_version: int, con
async with DBConnection(db_version) as db_wrapper:
coin_store = await CoinStore.create(db_wrapper)
block_store = await BlockStore.create(db_wrapper)
block_store = await BlockStore.create(db_wrapper, use_cache=use_cache)
bc = await Blockchain.create(coin_store, block_store, bt.constants, tmp_dir, 2)
for block in blocks:
await _validate_and_add_block(bc, block)
@ -259,7 +279,7 @@ async def test_replace_proof(bt: BlockTools, tmp_dir: Path, db_version: int, con
@pytest.mark.asyncio
async def test_get_generator(bt: BlockTools, db_version: int, consensus_mode: Mode) -> None:
async def test_get_generator(bt: BlockTools, db_version: int, consensus_mode: Mode, use_cache: bool) -> None:
if consensus_mode != Mode.PLAIN:
pytest.skip("only run in PLAIN mode to save time")
blocks = bt.get_consecutive_blocks(10)
@ -268,7 +288,7 @@ async def test_get_generator(bt: BlockTools, db_version: int, consensus_mode: Mo
return SerializedProgram.from_bytes(int_to_bytes(i))
async with DBConnection(db_version) as db_wrapper:
store = await BlockStore.create(db_wrapper)
store = await BlockStore.create(db_wrapper, use_cache=use_cache)
new_blocks = []
for i, block in enumerate(blocks):
@ -301,7 +321,9 @@ async def test_get_generator(bt: BlockTools, db_version: int, consensus_mode: Mo
@pytest.mark.asyncio
async def test_get_blocks_by_hash(tmp_dir: Path, bt: BlockTools, db_version: int, consensus_mode: Mode) -> None:
async def test_get_blocks_by_hash(
tmp_dir: Path, bt: BlockTools, db_version: int, consensus_mode: Mode, use_cache: bool
) -> None:
if consensus_mode != Mode.PLAIN:
pytest.skip("only run in PLAIN mode to save time")
assert sqlite3.threadsafety >= 1
@ -310,10 +332,10 @@ async def test_get_blocks_by_hash(tmp_dir: Path, bt: BlockTools, db_version: int
async with DBConnection(db_version) as db_wrapper, DBConnection(db_version) as db_wrapper_2:
# Use a different file for the blockchain
coin_store_2 = await CoinStore.create(db_wrapper_2)
store_2 = await BlockStore.create(db_wrapper_2)
store_2 = await BlockStore.create(db_wrapper_2, use_cache=use_cache)
bc = await Blockchain.create(coin_store_2, store_2, bt.constants, tmp_dir, 2)
store = await BlockStore.create(db_wrapper)
store = await BlockStore.create(db_wrapper, use_cache=use_cache)
await BlockStore.create(db_wrapper_2)
print("starting test")
@ -341,7 +363,9 @@ async def test_get_blocks_by_hash(tmp_dir: Path, bt: BlockTools, db_version: int
@pytest.mark.asyncio
async def test_get_block_bytes_in_range(tmp_dir: Path, bt: BlockTools, db_version: int, consensus_mode: Mode) -> None:
async def test_get_block_bytes_in_range(
tmp_dir: Path, bt: BlockTools, db_version: int, consensus_mode: Mode, use_cache: bool
) -> None:
if consensus_mode != Mode.PLAIN:
pytest.skip("only run in PLAIN mode to save time")
assert sqlite3.threadsafety >= 1
@ -350,7 +374,7 @@ async def test_get_block_bytes_in_range(tmp_dir: Path, bt: BlockTools, db_versio
async with DBConnection(db_version) as db_wrapper_2:
# Use a different file for the blockchain
coin_store_2 = await CoinStore.create(db_wrapper_2)
store_2 = await BlockStore.create(db_wrapper_2)
store_2 = await BlockStore.create(db_wrapper_2, use_cache=use_cache)
bc = await Blockchain.create(coin_store_2, store_2, bt.constants, tmp_dir, 2)
await BlockStore.create(db_wrapper_2)
@ -372,15 +396,15 @@ async def test_get_block_bytes_in_range(tmp_dir: Path, bt: BlockTools, db_versio
@pytest.mark.asyncio
async def test_get_plot_filer_info(
default_400_blocks: List[FullBlock], tmp_dir: Path, db_version: int, bt: BlockTools
default_400_blocks: List[FullBlock], tmp_dir: Path, db_version: int, bt: BlockTools, use_cache: bool
) -> None:
async with DBConnection(db_version) as db_wrapper, DBConnection(db_version) as db_wrapper_2:
# Use a different file for the blockchain
coin_store_2 = await CoinStore.create(db_wrapper_2)
store_2 = await BlockStore.create(db_wrapper_2)
store_2 = await BlockStore.create(db_wrapper_2, use_cache=use_cache)
bc = await Blockchain.create(coin_store_2, store_2, bt.constants, tmp_dir, 2)
store = await BlockStore.create(db_wrapper)
store = await BlockStore.create(db_wrapper, use_cache=use_cache)
blocks: List[FullBlock] = []
expected_cc_sp_hashes: List[bytes32] = []
for block in default_400_blocks:
@ -423,3 +447,10 @@ async def test_get_plot_filer_info(
block_record = block_records_dict[full_b.header_hash]
assert block_record.pos_ss_cc_challenge_hash == full_b.reward_chain_block.pos_ss_cc_challenge_hash
assert block_record.cc_sp_hash == expected_cc_sp
@pytest.mark.asyncio
async def test_unsupported_version(tmp_dir: Path, use_cache: bool) -> None:
with pytest.raises(RuntimeError, match="BlockStore does not support database schema v1"):
async with DBConnection(1) as db_wrapper:
await BlockStore.create(db_wrapper, use_cache=use_cache)

View File

@ -510,3 +510,10 @@ class TestCoinStoreWithBlocks:
# if the limit is very high, we should get all of them
assert len(await coin_store.get_coin_states_by_ids(True, coins, uint32(0), max_items=10000)) == 600
@pytest.mark.asyncio
async def test_unsupported_version(tmp_dir: Path) -> None:
with pytest.raises(RuntimeError, match="CoinStore does not support database schema v1"):
async with DBConnection(1) as db_wrapper:
await CoinStore.create(db_wrapper)

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import logging
from pathlib import Path
import pytest
from clvm.casts import int_to_bytes
@ -181,3 +182,10 @@ class TestHintStore:
assert len(await hint_store.get_coin_ids(hint, max_items=limit)) == limit
assert len(await hint_store.get_coin_ids(hint, max_items=10000)) == 200
@pytest.mark.asyncio
async def test_unsupported_version(tmp_dir: Path) -> None:
with pytest.raises(RuntimeError, match="HintStore does not support database schema v1"):
async with DBConnection(1) as db_wrapper:
await HintStore.create(db_wrapper)

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import struct
from pathlib import Path
from typing import Optional
import pytest
@ -407,3 +408,44 @@ class TestBlockHeightMap:
assert height_map.get_ses(6) == gen_ses(6)
with pytest.raises(KeyError) as _:
height_map.get_ses(8)
@pytest.mark.asyncio
async def test_unsupported_version(tmp_dir: Path) -> None:
with pytest.raises(RuntimeError, match="BlockHeightMap does not support database schema v1"):
async with DBConnection(1) as db_wrapper:
await BlockHeightMap.create(tmp_dir, db_wrapper)
@pytest.mark.asyncio
async def test_empty_chain(tmp_dir, db_version):
async with DBConnection(db_version) as db_wrapper:
await setup_db(db_wrapper)
height_map = await BlockHeightMap.create(tmp_dir, db_wrapper)
with pytest.raises(KeyError) as _:
height_map.get_ses(0)
with pytest.raises(AssertionError) as _:
height_map.get_hash(0)
@pytest.mark.asyncio
async def test_peak_only_chain(tmp_dir, db_version):
async with DBConnection(db_version) as db_wrapper:
await setup_db(db_wrapper)
async with db_wrapper.writer_maybe_transaction() as conn:
cursor = await conn.execute(
"INSERT OR REPLACE INTO current_peak VALUES(?, ?)", (0, b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
)
await cursor.close()
height_map = await BlockHeightMap.create(tmp_dir, db_wrapper)
with pytest.raises(KeyError) as _:
height_map.get_ses(0)
with pytest.raises(AssertionError) as _:
height_map.get_hash(0)

View File

@ -29,6 +29,7 @@ def rand_bytes(num) -> bytes:
class TestDbUpgrade:
@pytest.mark.asyncio
@pytest.mark.parametrize("with_hints", [True, False])
@pytest.mark.skip("we no longer support DB v1")
async def test_blocks(self, default_1000_blocks, with_hints: bool):
blocks = default_1000_blocks