# File: chia-blockchain/chia/data_layer/data_layer.py (325 lines, 15 KiB, Python)

import asyncio
import contextlib
import logging
import traceback
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Awaitable

import aiosqlite

from chia.data_layer.data_layer_server import DataLayerServer
from chia.data_layer.data_layer_types import InternalNode, TerminalNode, DownloadMode, Subscription, Root
from chia.data_layer.data_layer_wallet import SingletonRecord
from chia.data_layer.data_store import DataStore
from chia.data_layer.download_data import download_data
from chia.rpc.wallet_rpc_client import WalletRpcClient
from chia.server.server import ChiaServer
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.config import load_config
from chia.util.db_wrapper import DBWrapper
from chia.util.ints import uint32, uint64, uint16
from chia.util.path import mkdir, path_from_root
from chia.wallet.transaction_record import TransactionRecord


class DataLayer:
    """Coordinate a local ``DataStore`` with the wallet's data-layer singletons.

    The instance owns the SQLite connection behind the data store, talks to the
    wallet through a ``WalletRpcClient``, optionally runs a ``DataLayerServer``
    (when the ``run_server`` config entry is truthy) and runs a background task
    that periodically downloads and validates data for every subscribed store.
    """

    data_store: DataStore
    data_layer_server: DataLayerServer
    db_wrapper: DBWrapper
    db_path: Path
    connection: Optional[aiosqlite.Connection]
    config: Dict[str, Any]
    log: logging.Logger
    wallet_rpc_init: Awaitable[WalletRpcClient]
    state_changed_callback: Optional[Callable[..., object]]
    wallet_id: uint64
    initialized: bool

    def __init__(
        self,
        root_path: Path,
        wallet_rpc_init: Awaitable[WalletRpcClient],
        name: Optional[str] = None,
    ):
        """Load the ``data_layer`` config section and prepare the database location.

        No connection is opened here; that happens in ``_start``.

        Args:
            root_path: Root directory the config file and database path are resolved against.
            wallet_rpc_init: Awaitable resolved in ``_start`` to obtain the wallet RPC client.
            name: Logger name. ``None`` or ``""`` selects this module's logger.
        """
        if name == "":
            # TODO: If no code depends on "" counting as 'unspecified' then we do not
            #       need this.
            name = None

        config = load_config(root_path, "config.yaml", "data_layer")
        self.initialized = False
        self.config = config
        self.connection = None
        # Initialised so that reading the attributes before the corresponding setter /
        # ``_start`` ran yields ``None`` instead of raising ``AttributeError``.
        self.state_changed_callback = None
        self.periodically_fetch_data_task: Optional[asyncio.Task[Any]] = None
        self.wallet_rpc_init = wallet_rpc_init
        # Use the caller-supplied name when there is one, this module's name otherwise.
        self.log = logging.getLogger(__name__ if name is None else name)
        self._shut_down: bool = False
        db_path_replaced: str = config["database_path"].replace("CHALLENGE", config["selected_network"])
        self.db_path = path_from_root(root_path, db_path_replaced)
        mkdir(self.db_path.parent)
        self.data_layer_server = DataLayerServer(self.config, self.db_path, self.log)

    def _set_state_changed_callback(self, callback: Callable[..., object]) -> None:
        """Store the callback in ``state_changed_callback``."""
        self.state_changed_callback = callback

    def set_server(self, server: ChiaServer) -> None:
        """Store the ``ChiaServer`` instance in ``server``."""
        self.server = server

    async def _start(self) -> bool:
        """Open the database, resolve the wallet client and start background work.

        Returns:
            Always ``True``.
        """
        self.connection = await aiosqlite.connect(self.db_path)
        self.db_wrapper = DBWrapper(self.connection)
        self.data_store = await DataStore.create(self.db_wrapper)
        self.wallet_rpc = await self.wallet_rpc_init
        # The lock must exist before the task that acquires it is scheduled.
        self.subscription_lock: asyncio.Lock = asyncio.Lock()
        self.periodically_fetch_data_task = asyncio.create_task(self.periodically_fetch_data())
        if self.config.get("run_server", False):
            await self.data_layer_server.start()
        return True

    def _close(self) -> None:
        """Flag the instance as shutting down so the periodic fetch loop stops iterating."""
        # TODO: review for anything else we need to do here
        self._shut_down = True

    async def _await_closed(self) -> None:
        """Stop background work and release the database connection.

        The fetch task is cancelled and awaited first so that nothing is still using
        the connection when it is closed. Safe to call when ``_start`` never ran.
        """
        task = self.periodically_fetch_data_task
        if task is not None:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
            self.periodically_fetch_data_task = None
        if self.config.get("run_server", False):
            await self.data_layer_server.stop()
        if self.connection is not None:
            await self.connection.close()
            self.connection = None

    async def create_store(
        self, fee: uint64, root: bytes32 = bytes32([0] * 32)
    ) -> Tuple[List[TransactionRecord], bytes32]:
        """Create a new data-layer singleton through the wallet and a matching local tree.

        Args:
            fee: Fee passed to the wallet's ``create_new_dl`` call.
            root: Initial root hash passed to the wallet; defaults to 32 zero bytes.

        Returns:
            The transactions and the tree id returned by the wallet.
        """
        txs, tree_id = await self.wallet_rpc.create_new_dl(root, fee)
        res = await self.data_store.create_tree(tree_id=tree_id)
        if res is None:
            # NOTE(review): only logged; the store is still reported as created - confirm intent.
            self.log.fatal("failed creating store")
        self.initialized = True
        return txs, tree_id

    async def batch_update(
        self,
        tree_id: bytes32,
        changelist: List[Dict[str, Any]],
        fee: uint64,
    ) -> TransactionRecord:
        """Apply a list of inserts/deletes to a store and publish the new root via the wallet.

        Each change is a dict with an ``"action"`` of ``"insert"`` (with ``"key"``,
        ``"value"`` and optional ``"reference_node_hash"`` / ``"side"``) or
        ``"delete"`` (with ``"key"``).

        Args:
            tree_id: Store to modify.
            changelist: Changes, applied in order.
            fee: Fee passed to the wallet's ``dl_update_root`` call.

        Returns:
            The transaction record returned by the wallet.

        Raises:
            ValueError: If a change has an action other than ``"insert"`` or ``"delete"``.
        """
        for change in changelist:
            action = change["action"]
            if action == "insert":
                key = change["key"]
                value = change["value"]
                reference_node_hash = change.get("reference_node_hash")
                side = change.get("side")
                # Explicit placement and automatic placement are alternatives; doing
                # both would insert the same key twice.
                if reference_node_hash or side:
                    await self.data_store.insert(key, value, tree_id, reference_node_hash, side)
                else:
                    await self.data_store.autoinsert(key, value, tree_id)
            elif action == "delete":
                await self.data_store.delete(change["key"], tree_id)
            else:
                raise ValueError(f"Unknown batch_update action: {action!r}")

        root = await self.data_store.get_tree_root(tree_id)
        # todo return empty node hash from get_tree_root
        if root.node_hash is not None:
            node_hash = root.node_hash
        else:
            node_hash = bytes32([0] * 32)  # todo change
        transaction_record = await self.wallet_rpc.dl_update_root(tree_id, node_hash, fee)
        assert transaction_record
        # todo register callback to change status in data store
        # await self.data_store.change_root_status(root, Status.COMMITTED)
        return transaction_record

    async def get_value(self, store_id: bytes32, key: bytes) -> Optional[bytes]:
        """Return the value stored under ``key`` in ``store_id``, or ``None`` if no node is found."""
        res = await self.data_store.get_node_by_key(tree_id=store_id, key=key)
        if res is None:
            self.log.error("Failed to fetch key")
            return None
        return res.value

    async def get_keys_values(self, store_id: bytes32, root_hash: Optional[bytes32]) -> List[TerminalNode]:
        """Return the data store's ``get_keys_values`` result for the store at ``root_hash``.

        A ``None`` result is logged and passed through to the caller unchanged.
        """
        res = await self.data_store.get_keys_values(store_id, root_hash)
        if res is None:
            self.log.error("Failed to fetch keys values")
        return res

    async def get_ancestors(self, node_hash: bytes32, store_id: bytes32) -> List[InternalNode]:
        """Return the data store's ``get_ancestors`` result for ``node_hash`` in ``store_id``.

        A ``None`` result is logged and passed through to the caller unchanged.
        """
        res = await self.data_store.get_ancestors(node_hash=node_hash, tree_id=store_id)
        if res is None:
            self.log.error("Failed to get ancestors")
        return res

    async def get_root(self, store_id: bytes32) -> Optional[SingletonRecord]:
        """Return the wallet's latest singleton record for ``store_id``, or ``None`` if there is none."""
        latest = await self.wallet_rpc.dl_latest_singleton(store_id, True)
        if latest is None:
            self.log.error(f"Failed to get root for {store_id.hex()}")
        return latest

    async def get_root_history(self, store_id: bytes32) -> List[SingletonRecord]:
        """Return the wallet's singleton history with consecutive identical roots collapsed.

        Only the first record of each run of equal ``root`` values is kept, in the
        order the wallet returned them. An empty list is returned when the wallet
        has no history for the store.
        """
        records = await self.wallet_rpc.dl_history(store_id)
        if records is None:
            self.log.error(f"Failed to get root history for {store_id.hex()}")
            return []
        root_history: List[SingletonRecord] = []
        prev: Optional[SingletonRecord] = None
        for record in records:
            if prev is None or record.root != prev.root:
                root_history.append(record)
            # Track the previous record so the comparison above can actually detect repeats.
            prev = record
        return root_history

    async def _validate_batch(
        self,
        tree_id: bytes32,
        to_check: List[SingletonRecord],
        min_generation: int,
        max_generation: int,
    ) -> bool:
        """Check that each wallet record's root hash appears in the local store's history.

        Records are matched in order. After each match the upper generation bound
        drops to the matched root's generation, so the next record has to match at
        or below that point (as enforced by ``get_last_tree_root_by_hash``).

        Args:
            tree_id: Store being validated.
            to_check: Wallet records to match.
            min_generation: Lowest acceptable data store generation for a match.
            max_generation: Initial upper generation bound passed to the data store lookup.

        Returns:
            ``True`` if every record was matched, ``False`` as soon as one is not.
        """
        last_checked_hash: Optional[bytes32] = None
        for record in to_check:
            # Ignore two consecutive identical root hashes, as we've already validated it.
            if last_checked_hash is not None and record.root == last_checked_hash:
                continue
            # Pick the latest root in our data store with the desired hash, before our already validated data.
            root: Optional[Root] = await self.data_store.get_last_tree_root_by_hash(
                tree_id, record.root, max_generation
            )
            if root is None or root.generation < min_generation:
                return False
            self.log.info(
                f"Validated chain hash {record.root} in downloaded datastore. "
                f"Wallet generation: {record.generation}"
            )
            max_generation = root.generation
            last_checked_hash = record.root
        return True

    async def fetch_and_validate(self, subscription: Subscription) -> None:
        """Download new data for a subscription and validate it against the wallet records.

        Returns without downloading when the wallet has no singleton for the store,
        when nothing is on chain yet, when the validated wallet generation already
        equals the on-chain one, or when all new wallet records carry the root hash
        the local store already has. Otherwise the data is downloaded, the newest
        root is compared with the newest wallet record and the remaining records
        are checked with ``_validate_batch`` (falling back to the full wallet
        history if that first check fails). On success the validated wallet
        generation is stored.

        Raises:
            RuntimeError: If there is nothing to validate, the download fails, or
                the downloaded data cannot be matched against the wallet records.
        """
        tree_id = subscription.tree_id
        singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True)
        if singleton_record is None:
            self.log.info(f"Fetch data: No singleton record for {tree_id}.")
            return
        if singleton_record.generation == uint32(0):
            self.log.info(f"Fetch data: No data on chain for {tree_id}.")
            return

        old_root: Optional[Root] = None
        try:
            old_root = await self.data_store.get_tree_root(tree_id=tree_id)
        except Exception as e:
            # Best effort: not having a root yet is handled below via ``old_root is None``,
            # but leave a trace instead of discarding the error silently.
            self.log.debug(f"Fetch data: no existing root for {tree_id}: {type(e)} {e}")

        wallet_current_generation = await self.data_store.get_validated_wallet_generation(tree_id)
        assert int(wallet_current_generation) <= singleton_record.generation
        # Wallet generation didn't change, so no new data committed on chain.
        if wallet_current_generation is not None and uint32(wallet_current_generation) == singleton_record.generation:
            self.log.info(f"Fetch data: wallet generation matching on-chain generation: {tree_id}.")
            return

        to_check: List[SingletonRecord] = []
        if subscription.mode is DownloadMode.LATEST:
            to_check = [singleton_record]
        if subscription.mode is DownloadMode.HISTORY:
            to_check = await self.wallet_rpc.dl_history(
                launcher_id=tree_id, min_generation=uint32(wallet_current_generation + 1)
            )
        if not to_check:
            # Everything below indexes to_check[0]; fail with a clear message instead.
            raise RuntimeError(f"No wallet records to validate for {tree_id}.")

        # No root hash changes in the new wallet records, so ignore.
        # TODO: wallet should handle identical hashes part?
        if (
            old_root is not None
            and old_root.node_hash is not None
            and to_check[0].root == old_root.node_hash
            and len({record.root for record in to_check}) == 1
        ):
            await self.data_store.set_validated_wallet_generation(tree_id, int(singleton_record.generation))
            self.log.info(
                f"Fetch data: fast-forwarded for {tree_id} as all on-chain hashes are identical to our root hash. "
                f"Current wallet generation saved: {int(singleton_record.generation)}"
            )
            return

        # Delete all identical root hashes to our old root hash, until we detect a change.
        if old_root is not None and old_root.node_hash is not None:
            while to_check and to_check[-1].root == old_root.node_hash:
                to_check.pop()

        self.log.info(
            f"Downloading and validating {subscription.tree_id}. "
            f"Current wallet generation: {int(wallet_current_generation)}. "
            f"Target wallet generation: {singleton_record.generation}."
        )
        downloaded = await download_data(self.data_store, subscription, singleton_record.root)
        if not downloaded:
            raise RuntimeError("Could not download the data.")
        self.log.info(f"Successfully downloaded data for {tree_id}.")

        root = await self.data_store.get_tree_root(tree_id=tree_id)
        # Wallet root hash must match to our data store root hash.
        if root.node_hash is not None and root.node_hash == to_check[0].root:
            self.log.info(
                f"Validated chain hash {root.node_hash} in downloaded datastore. "
                f"Wallet generation: {to_check[0].generation}"
            )
        else:
            raise RuntimeError("Can't find data on chain in our datastore.")
        to_check.pop(0)

        min_generation = (0 if old_root is None else old_root.generation) + 1
        max_generation = root.generation
        # Light validation: check the new set of operations against the new set of wallet records.
        # If this matches, we know all data will match, as we've previously checked that data matches
        # for `min_generation` data store root and `wallet_current_generation` wallet record.
        is_valid: bool = await self._validate_batch(tree_id, to_check, min_generation, max_generation)
        # If for some reason we have mismatched data using the light checks, recheck all history as a fallback.
        if not is_valid:
            self.log.warning(f"Light validation failed for {tree_id}. Validating all history.")
            to_check = await self.wallet_rpc.dl_history(launcher_id=tree_id, min_generation=uint32(1))
            if not to_check:
                raise RuntimeError("Could not validate on-chain data.")
            # Already checked above.
            self.log.info(
                f"Validated chain hash {root.node_hash} in downloaded datastore. "
                f"Wallet generation: {to_check[0].generation}"
            )
            to_check.pop(0)
            is_valid = await self._validate_batch(tree_id, to_check, 0, max_generation)
            if not is_valid:
                raise RuntimeError("Could not validate on-chain data.")

        self.log.info(
            f"Finished downloading and validating {subscription.tree_id}. "
            f"Wallet generation saved: {singleton_record.generation}. "
            f"Root hash saved: {singleton_record.root}."
        )
        await self.data_store.set_validated_wallet_generation(tree_id, int(singleton_record.generation))

    async def subscribe(self, store_id: bytes32, mode: DownloadMode, ip: str, port: uint16) -> None:
        """Start tracking ``store_id`` in the wallet and record the subscription locally.

        Does nothing if a subscription for the store already exists. The existence
        check and the insert run under one lock acquisition so two concurrent calls
        cannot both pass the check.
        """
        subscription = Subscription(store_id, mode, ip, port)
        async with self.subscription_lock:
            subscriptions = await self.data_store.get_subscriptions()
            if any(existing.tree_id == subscription.tree_id for existing in subscriptions):
                return
            await self.wallet_rpc.dl_track_new(subscription.tree_id)
            await self.data_store.subscribe(subscription)
        self.log.info(f"Subscribed to {subscription.tree_id}")

    async def unsubscribe(self, tree_id: bytes32) -> None:
        """Remove the local subscription for ``tree_id`` and stop tracking it in the wallet.

        Does nothing if no subscription for the store exists. The existence check
        and the removal run under one lock acquisition.
        """
        async with self.subscription_lock:
            subscriptions = await self.data_store.get_subscriptions()
            if all(existing.tree_id != tree_id for existing in subscriptions):
                return
            await self.data_store.unsubscribe(tree_id)
            await self.wallet_rpc.dl_stop_tracking(tree_id)
        self.log.info(f"Unsubscribed to {tree_id}")

    async def get_subscriptions(self) -> List[Subscription]:
        """Return the subscriptions recorded in the data store, read under the subscription lock."""
        async with self.subscription_lock:
            return await self.data_store.get_subscriptions()

    async def _fetch_subscribed_data(self) -> None:
        """Run ``fetch_and_validate`` once for every subscription, logging per-store failures."""
        # NOTE(review): the whole pass is kept under the lock so that subscribe/unsubscribe
        # cannot change a store while it is being fetched - confirm this matches the intent.
        async with self.subscription_lock:
            subscriptions = await self.data_store.get_subscriptions()
            for subscription in subscriptions:
                try:
                    await self.fetch_and_validate(subscription)
                except asyncio.CancelledError:
                    # Never treat cancellation as a per-store failure.
                    raise
                except Exception as e:
                    self.log.error(f"Exception while fetching data: {type(e)} {e} {traceback.format_exc()}.")

    async def periodically_fetch_data(self) -> None:
        """Fetch and validate all subscriptions in a loop until shut down or cancelled.

        The pause between passes is the ``fetch_data_interval`` config entry
        (default 60), passed to ``asyncio.sleep``. A failing pass is logged and the
        loop continues; cancellation ends the loop.
        """
        fetch_data_interval = self.config.get("fetch_data_interval", 60)
        while not self._shut_down:
            try:
                await self._fetch_subscribed_data()
            except asyncio.CancelledError:
                raise
            except Exception as e:
                # E.g. the subscription list could not be read; retry on the next pass.
                self.log.error(f"Exception while reading subscriptions: {type(e)} {e} {traceback.format_exc()}.")
            try:
                await asyncio.sleep(fetch_data_interval)
            except asyncio.CancelledError:
                # Cancelled while idle: stop instead of starting another pass.
                return