add chia function to convert blockchain database to v2 (#9613)

This commit is contained in:
Arvid Norberg 2022-01-19 20:43:56 +01:00 committed by GitHub
parent 69fe8751ba
commit c780f0978c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 453 additions and 0 deletions

View File

@@ -15,6 +15,7 @@ from chia.cmds.stop import stop_cmd
from chia.cmds.wallet import wallet_cmd
from chia.cmds.plotnft import plotnft_cmd
from chia.cmds.plotters import plotters_cmd
from chia.cmds.db import db_cmd
from chia.util.default_root import DEFAULT_KEYS_ROOT_PATH, DEFAULT_ROOT_PATH
from chia.util.keychain import (
Keychain,
@@ -136,6 +137,7 @@ cli.add_command(stop_cmd)
cli.add_command(netspace_cmd)
cli.add_command(farm_cmd)
cli.add_command(plotters_cmd)
cli.add_command(db_cmd)
if supports_keyring_passphrase():
cli.add_command(passphrase_cmd)

37
chia/cmds/db.py Normal file
View File

@@ -0,0 +1,37 @@
from pathlib import Path
import click
from chia.cmds.db_upgrade_func import db_upgrade_func
@click.group("db", short_help="Manage the blockchain database")
def db_cmd() -> None:
    # Container group for `chia db ...` subcommands; performs no action itself.
    # (A `#` comment is used instead of a docstring so click's help text,
    # which is taken from short_help, is not affected.)
    pass
@db_cmd.command("upgrade", short_help="EXPERIMENTAL: upgrade a v1 database to v2")
@click.option("--input", default=None, type=click.Path(), help="specify input database file")
@click.option("--output", default=None, type=click.Path(), help="specify output database file")
@click.option(
    "--no-update-config",
    default=False,
    is_flag=True,
    help="don't update config file to point to new database. When specifying a "
    "custom output file, the config will not be updated regardless",
)
@click.pass_context
def db_upgrade_cmd(ctx: click.Context, no_update_config: bool, **kwargs) -> None:
    # `--input`/`--output` arrive through **kwargs so that the builtin
    # `input` is not shadowed by a parameter name.
    in_path = kwargs.get("input")
    out_path = kwargs.get("output")
    db_upgrade_func(
        Path(ctx.obj["root_path"]),
        Path(in_path) if in_path is not None else None,
        Path(out_path) if out_path is not None else None,
        no_update_config,
    )
if __name__ == "__main__":
    # Allow running this module directly to upgrade the database under the
    # default chia root, using all default options.
    from chia.util.default_root import DEFAULT_ROOT_PATH

    db_upgrade_func(DEFAULT_ROOT_PATH)

View File

@@ -0,0 +1,301 @@
from typing import Dict, Optional
from pathlib import Path
import sys
from time import time
import asyncio
import zstd
from chia.util.config import load_config, save_config
from chia.util.path import mkdir, path_from_root
from chia.full_node.block_store import BlockStore
from chia.full_node.coin_store import CoinStore
from chia.full_node.hint_store import HintStore
from chia.types.blockchain_format.sized_bytes import bytes32
# if either the input database or output database file is specified, the
# configuration file will not be updated to use the new database. Only when using
# the currently configured db file, and writing to the default output file will
# the configuration file also be updated
def db_upgrade_func(
    root_path: Path,
    in_db_path: Optional[Path] = None,
    out_db_path: Optional[Path] = None,
    no_update_config: bool = False,
):
    """Upgrade a v1 blockchain database to v2.

    Paths not supplied explicitly are derived from config.yaml by
    substituting the selected network into the configured database_path
    pattern (and "_v1_" -> "_v2_" for the output). config.yaml is only
    rewritten when both paths were derived and updating is not disabled.
    """
    should_update_config: bool = in_db_path is None and out_db_path is None and not no_update_config

    db_pattern: str = ""
    if in_db_path is None or out_db_path is None:
        full_node_config: Dict = load_config(root_path, "config.yaml")["full_node"]
        network: str = full_node_config["selected_network"]
        db_pattern = full_node_config["database_path"]

        if in_db_path is None:
            in_db_path = path_from_root(root_path, db_pattern.replace("CHALLENGE", network))

        if out_db_path is None:
            v2_name: str = db_pattern.replace("CHALLENGE", network).replace("_v1_", "_v2_")
            out_db_path = path_from_root(root_path, v2_name)
            mkdir(out_db_path.parent)

    asyncio.run(convert_v1_to_v2(in_db_path, out_db_path))

    if should_update_config:
        print("updating config.yaml")
        config: Dict = load_config(root_path, "config.yaml")
        new_db_path = db_pattern.replace("_v1_", "_v2_")
        config["full_node"]["database_path"] = new_db_path
        print(f"database_path: {new_db_path}")
        save_config(root_path, "config.yaml", config)

    # The v1 file is never modified or removed; the user decides when to
    # delete it.
    print(f"\n\nLEAVING PREVIOUS DB FILE UNTOUCHED {in_db_path}\n")
# Number of rows batched into one transaction before each commit, per table.
# Larger batches amortize commit overhead during the bulk conversion.
BLOCK_COMMIT_RATE = 5000
SES_COMMIT_RATE = 1000
HINT_COMMIT_RATE = 1000
COIN_COMMIT_RATE = 15000
async def convert_v1_to_v2(in_path: Path, out_path: Path) -> None:
    """Copy a v1 blockchain database at `in_path` into a new v2 database at
    `out_path`.

    The conversion walks the main chain backwards from the current peak, so
    only main-chain blocks (and a consistent snapshot of the coin set as of
    the peak) are carried over. Raises RuntimeError if `out_path` already
    exists or if `in_path` is already a v2 database.
    """
    import aiosqlite
    from chia.util.db_wrapper import DBWrapper

    if out_path.exists():
        print(f"output file already exists. {out_path}")
        raise RuntimeError("already exists")

    print(f"opening file for reading: {in_path}")
    async with aiosqlite.connect(in_path) as in_db:
        # A v1 database has no database_version table; an OperationalError
        # here therefore means "v1" and is deliberately ignored.
        try:
            async with in_db.execute("SELECT * from database_version") as cursor:
                row = await cursor.fetchone()
                if row is not None and row[0] != 1:
                    print(f"blockchain database already version {row[0]}\nDone")
                    raise RuntimeError("already v2")
        except aiosqlite.OperationalError:
            pass

        store_v1 = await BlockStore.create(DBWrapper(in_db, db_version=1))

        print(f"opening file for writing: {out_path}")
        async with aiosqlite.connect(out_path) as out_db:
            # Durability pragmas are disabled for speed: the output file is
            # brand new, so a crash mid-conversion just means re-running.
            await out_db.execute("pragma journal_mode=OFF")
            await out_db.execute("pragma synchronous=OFF")
            await out_db.execute("pragma cache_size=1000000")
            await out_db.execute("pragma locking_mode=exclusive")
            await out_db.execute("pragma temp_store=memory")

            print("initializing v2 version")
            await out_db.execute("CREATE TABLE database_version(version int)")
            await out_db.execute("INSERT INTO database_version VALUES(?)", (2,))

            print("initializing v2 block store")
            await BlockStore.create(DBWrapper(out_db, db_version=2))

            peak_hash, peak_height = await store_v1.get_peak()
            print(f"peak: {peak_hash.hex()} height: {peak_height}")
            await out_db.execute("INSERT INTO current_peak VALUES(?, ?)", (0, peak_hash))
            await out_db.commit()

            print("[1/4] converting full_blocks")
            # Walk from the peak down to genesis, following prev_hash links,
            # so forks/orphans in the v1 DB are skipped.
            height = peak_height + 1
            hh = peak_hash

            commit_in = BLOCK_COMMIT_RATE
            rate = 1.0
            start_time = time()
            block_start_time = start_time
            block_values = []

            async with in_db.execute(
                "SELECT header_hash, prev_hash, block, sub_epoch_summary FROM block_records ORDER BY height DESC"
            ) as cursor:
                async with in_db.execute(
                    "SELECT header_hash, height, is_fully_compactified, block FROM full_blocks ORDER BY height DESC"
                ) as cursor_2:

                    await out_db.execute("begin transaction")
                    async for row in cursor:
                        header_hash = bytes.fromhex(row[0])
                        if header_hash != hh:
                            # not on the main chain; skip
                            continue

                        # progress cursor_2 until we find the header hash
                        # (both cursors descend by height, so this is a merge
                        # walk, not a per-block scan)
                        while True:
                            row_2 = await cursor_2.fetchone()
                            if row_2 is None:
                                print(f"ERROR: could not find block {hh.hex()}")
                                raise RuntimeError(f"block {hh.hex()} not found")

                            if bytes.fromhex(row_2[0]) == hh:
                                break

                        # heights must be strictly consecutive along the chain
                        assert row_2[1] == height - 1
                        height = row_2[1]
                        is_fully_compactified = row_2[2]
                        block_bytes = row_2[3]

                        prev_hash = bytes.fromhex(row[1])
                        block_record = row[2]
                        ses = row[3]

                        block_values.append(
                            (
                                hh,
                                prev_hash,
                                height,
                                ses,
                                is_fully_compactified,
                                1,  # in_main_chain
                                zstd.compress(block_bytes),
                                block_record,
                            )
                        )
                        hh = prev_hash
                        if (height % 1000) == 0:
                            print(
                                f"\r{height: 10d} {(peak_height-height)*100/peak_height:.2f}% "
                                f"{rate:0.1f} blocks/s ETA: {height//rate} s ",
                                end="",
                            )
                            sys.stdout.flush()
                        commit_in -= 1
                        if commit_in == 0:
                            commit_in = BLOCK_COMMIT_RATE
                            await out_db.executemany(
                                "INSERT OR REPLACE INTO full_blocks VALUES(?, ?, ?, ?, ?, ?, ?, ?)", block_values
                            )
                            await out_db.commit()
                            await out_db.execute("begin transaction")
                            block_values = []
                            end_time = time()
                            rate = BLOCK_COMMIT_RATE / (end_time - start_time)
                            start_time = end_time

            # flush whatever is left of the final (partial) batch
            await out_db.executemany("INSERT OR REPLACE INTO full_blocks VALUES(?, ?, ?, ?, ?, ?, ?, ?)", block_values)
            await out_db.commit()
            end_time = time()
            print(f"\r {end_time - block_start_time:.2f} seconds ")

            print("[2/4] converting sub_epoch_segments_v3")
            commit_in = SES_COMMIT_RATE
            ses_values = []
            ses_start_time = time()
            async with in_db.execute("SELECT ses_block_hash, challenge_segments FROM sub_epoch_segments_v3") as cursor:
                count = 0
                await out_db.execute("begin transaction")
                async for row in cursor:
                    block_hash = bytes32.fromhex(row[0])
                    ses = row[1]
                    ses_values.append((block_hash, ses))
                    count += 1
                    if (count % 100) == 0:
                        print(f"\r{count:10d} ", end="")
                        sys.stdout.flush()
                    commit_in -= 1
                    if commit_in == 0:
                        commit_in = SES_COMMIT_RATE
                        await out_db.executemany("INSERT INTO sub_epoch_segments_v3 VALUES (?, ?)", ses_values)
                        await out_db.commit()
                        await out_db.execute("begin transaction")
                        ses_values = []
            # flush the final partial batch
            await out_db.executemany("INSERT INTO sub_epoch_segments_v3 VALUES (?, ?)", ses_values)
            await out_db.commit()
            end_time = time()
            print(f"\r {end_time - ses_start_time:.2f} seconds ")

            print("[3/4] converting hint_store")
            commit_in = HINT_COMMIT_RATE
            hint_start_time = time()
            hint_values = []
            await HintStore.create(DBWrapper(out_db, db_version=2))
            await out_db.commit()
            async with in_db.execute("SELECT coin_id, hint FROM hints") as cursor:
                count = 0
                await out_db.execute("begin transaction")
                async for row in cursor:
                    # None lets the v2 hints table assign its own rowid
                    hint_values.append((None, row[0], row[1]))
                    commit_in -= 1
                    if commit_in == 0:
                        commit_in = HINT_COMMIT_RATE
                        await out_db.executemany("INSERT INTO hints VALUES (?, ?, ?)", hint_values)
                        await out_db.commit()
                        await out_db.execute("begin transaction")
                        hint_values = []
            # flush the final partial batch
            await out_db.executemany("INSERT INTO hints VALUES (?, ?, ?)", hint_values)
            await out_db.commit()
            end_time = time()
            print(f"\r {end_time - hint_start_time:.2f} seconds ")

            print("[4/4] converting coin_store")
            await CoinStore.create(DBWrapper(out_db, db_version=2))
            await out_db.commit()
            commit_in = COIN_COMMIT_RATE
            rate = 1.0
            start_time = time()
            coin_values = []
            coin_start_time = start_time
            async with in_db.execute(
                "SELECT coin_name, confirmed_index, spent_index, coinbase, puzzle_hash, coin_parent, amount, timestamp "
                "FROM coin_record WHERE confirmed_index <= ?",
                (peak_height,),
            ) as cursor:
                count = 0
                await out_db.execute("begin transaction")
                async for row in cursor:
                    spent_index = row[2]

                    # in order to convert a consistent snapshot of the
                    # blockchain state, any coin that was spent *after* our
                    # cutoff must be converted into an unspent coin
                    if spent_index > peak_height:
                        spent_index = 0
                    coin_values.append(
                        (
                            bytes.fromhex(row[0]),
                            row[1],
                            spent_index,
                            row[3],
                            bytes.fromhex(row[4]),
                            bytes.fromhex(row[5]),
                            row[6],
                            row[7],
                        )
                    )
                    count += 1
                    if (count % 2000) == 0:
                        print(f"\r{count//1000:10d}k coins {rate:0.1f} coins/s ", end="")
                        sys.stdout.flush()
                    commit_in -= 1
                    if commit_in == 0:
                        commit_in = COIN_COMMIT_RATE
                        await out_db.executemany("INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)", coin_values)
                        await out_db.commit()
                        await out_db.execute("begin transaction")
                        coin_values = []
                        end_time = time()
                        rate = COIN_COMMIT_RATE / (end_time - start_time)
                        start_time = end_time
            # flush the final partial batch
            await out_db.executemany("INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)", coin_values)
            await out_db.commit()
            end_time = time()
            print(f"\r {end_time - coin_start_time:.2f} seconds ")

View File

@@ -0,0 +1,113 @@
import pytest
import aiosqlite
import tempfile
import random
from pathlib import Path
from typing import List, Tuple
from tests.setup_nodes import bt, test_constants
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.cmds.db_upgrade_func import convert_v1_to_v2
from chia.util.db_wrapper import DBWrapper
from chia.full_node.block_store import BlockStore
from chia.full_node.coin_store import CoinStore
from chia.full_node.hint_store import HintStore
from chia.consensus.blockchain import Blockchain
class TempFile:
    """Context manager yielding a fresh temporary file *path*.

    On entry the path is guaranteed not to exist (the with-body is expected
    to create the file, e.g. by opening it as a sqlite database); on exit
    the file is removed if it was created.
    """

    def __init__(self):
        # NamedTemporaryFile is used only to reserve a unique name; the
        # object is dropped immediately, which deletes its underlying file.
        self.path = Path(tempfile.NamedTemporaryFile().name)

    def __enter__(self) -> Path:
        # fix: annotation previously claimed DBWrapper, but a Path is what
        # is actually returned (and what callers pass to aiosqlite.connect)
        if self.path.exists():
            self.path.unlink()
        return self.path

    def __exit__(self, exc_t, exc_v, exc_tb):
        # fix: tolerate the body never having created the file instead of
        # raising FileNotFoundError from unlink
        if self.path.exists():
            self.path.unlink()
def rand_bytes(num) -> bytes:
    """Return `num` pseudo-random bytes drawn from the global `random` state."""
    return bytes(random.getrandbits(8) for _ in range(num))
class TestDbUpgrade:
    @pytest.mark.asyncio
    async def test_blocks(self):
        # End-to-end check of the v1 -> v2 conversion: build a v1 database
        # from freshly generated blocks plus random hints, run the
        # converter, then verify that every store (blocks, peak, hints,
        # coins) returns identical data from the v1 and v2 files.
        blocks = bt.get_consecutive_blocks(758)

        hints: List[Tuple[bytes32, bytes]] = []
        for i in range(351):
            hints.append((bytes32(rand_bytes(32)), rand_bytes(20)))

        with TempFile() as in_file, TempFile() as out_file:
            async with aiosqlite.connect(in_file) as conn:
                db_wrapper1 = DBWrapper(conn, 1)
                block_store1 = await BlockStore.create(db_wrapper1)
                coin_store1 = await CoinStore.create(db_wrapper1, 0)
                hint_store1 = await HintStore.create(db_wrapper1)
                for hint in hints:
                    await hint_store1.add_hints([(hint[0], hint[1])])

                # Blockchain.create populates the v1 stores as blocks are
                # received below
                bc = await Blockchain.create(
                    coin_store1, block_store1, test_constants, hint_store1, Path("."), reserved_cores=0
                )
                await db_wrapper1.commit_transaction()

                for block in blocks:
                    await bc.receive_block(block)

                # now, convert v1 in_file to v2 out_file
                await convert_v1_to_v2(in_file, out_file)

                async with aiosqlite.connect(out_file) as conn2:
                    db_wrapper2 = DBWrapper(conn2, 2)
                    block_store2 = await BlockStore.create(db_wrapper2)
                    coin_store2 = await CoinStore.create(db_wrapper2, 0)
                    hint_store2 = await HintStore.create(db_wrapper2)

                    # check hints
                    for hint in hints:
                        assert hint[0] in await hint_store1.get_coin_ids(hint[1])
                        assert hint[0] in await hint_store2.get_coin_ids(hint[1])

                    # check peak
                    assert await block_store1.get_peak() == await block_store2.get_peak()

                    # check blocks
                    for block in blocks:
                        hh = block.header_hash
                        height = block.height
                        assert await block_store1.get_full_block(hh) == await block_store2.get_full_block(hh)
                        assert await block_store1.get_full_block_bytes(hh) == await block_store2.get_full_block_bytes(
                            hh
                        )
                        assert await block_store1.get_full_blocks_at([height]) == await block_store2.get_full_blocks_at(
                            [height]
                        )
                        assert await block_store1.get_block_records_by_hash(
                            [hh]
                        ) == await block_store2.get_block_records_by_hash([hh])
                        assert await block_store1.get_block_record(hh) == await block_store2.get_block_record(hh)
                        assert await block_store1.is_fully_compactified(hh) == await block_store2.is_fully_compactified(
                            hh
                        )

                    # check coins
                    for block in blocks:
                        coins = await coin_store1.get_coins_added_at_height(block.height)
                        assert await coin_store2.get_coins_added_at_height(block.height) == coins
                        assert await coin_store1.get_coins_removed_at_height(
                            block.height
                        ) == await coin_store2.get_coins_removed_at_height(block.height)
                        for c in coins:
                            n = c.coin.name()
                            assert await coin_store1.get_coin_record(n) == await coin_store2.get_coin_record(n)