chia-blockchain/chia/data_layer/data_layer.py

778 lines
34 KiB
Python
Raw Normal View History

2022-08-01 07:10:56 +03:00
import asyncio
import logging
2022-08-03 20:39:05 +03:00
import random
2022-05-02 12:52:03 +03:00
import time
2022-02-02 23:06:37 +03:00
import traceback
2022-08-01 07:10:56 +03:00
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple, Union
2022-08-01 07:10:56 +03:00
2022-02-17 17:39:44 +03:00
import aiohttp
2022-08-01 07:10:56 +03:00
import aiosqlite
2022-08-18 05:35:32 +03:00
from chia.data_layer.data_layer_errors import KeyNotFoundError
2022-08-01 07:10:56 +03:00
from chia.data_layer.data_layer_server import DataLayerServer
from chia.data_layer.data_layer_util import (
DiffData,
InternalNode,
KeyValue,
Layer,
Offer,
OfferStore,
Proof,
ProofOfInclusion,
ProofOfInclusionLayer,
Root,
ServerInfo,
Status,
StoreProofs,
Subscription,
TerminalNode,
leaf_hash,
)
from chia.data_layer.data_layer_wallet import DataLayerWallet, Mirror, SingletonRecord, verify_offer
from chia.data_layer.data_store import DataStore
2022-08-01 07:10:56 +03:00
from chia.data_layer.download_data import insert_from_delta_file, write_files_for_root
2022-01-20 15:44:20 +03:00
from chia.rpc.wallet_rpc_client import WalletRpcClient
2021-09-29 02:37:50 +03:00
from chia.server.server import ChiaServer
2021-10-13 21:18:00 +03:00
from chia.types.blockchain_format.sized_bytes import bytes32
2021-09-17 17:30:14 +03:00
from chia.util.db_wrapper import DBWrapper
2022-06-08 20:25:09 +03:00
from chia.util.ints import uint32, uint64
2022-07-16 20:03:18 +03:00
from chia.util.path import path_from_root
from chia.wallet.trade_record import TradeRecord
from chia.wallet.trading.offer import Offer as TradingOffer
from chia.wallet.transaction_record import TransactionRecord
2021-09-17 15:03:48 +03:00
class DataLayer:
data_store: DataStore
2022-02-08 18:29:29 +03:00
data_layer_server: DataLayerServer
2021-09-17 17:30:14 +03:00
db_wrapper: DBWrapper
2022-06-09 17:24:26 +03:00
batch_update_db_wrapper: DBWrapper
db_path: Path
2021-12-09 06:25:30 +03:00
connection: Optional[aiosqlite.Connection]
2021-10-04 01:20:18 +03:00
config: Dict[str, Any]
log: logging.Logger
2022-01-20 19:11:17 +03:00
wallet_rpc_init: Awaitable[WalletRpcClient]
2021-10-04 01:20:18 +03:00
state_changed_callback: Optional[Callable[..., object]]
2022-01-20 16:07:50 +03:00
wallet_id: uint64
initialized: bool
2022-02-22 19:11:44 +03:00
none_bytes: bytes32
2022-06-30 17:12:06 +03:00
lock: asyncio.Lock
2021-09-17 15:03:48 +03:00
def __init__(
self,
config: Dict[str, Any],
2021-09-17 17:30:14 +03:00
root_path: Path,
2022-01-20 19:11:17 +03:00
wallet_rpc_init: Awaitable[WalletRpcClient],
name: Optional[str] = None,
):
if name == "":
# TODO: If no code depends on "" counting as 'unspecified' then we do not
# need this.
name = None
self.initialized = False
2021-10-04 00:35:44 +03:00
self.config = config
2021-12-09 06:25:30 +03:00
self.connection = None
2022-01-20 19:11:17 +03:00
self.wallet_rpc_init = wallet_rpc_init
self.log = logging.getLogger(name if name is None else __name__)
2022-01-27 21:11:30 +03:00
self._shut_down: bool = False
db_path_replaced: str = config["database_path"].replace("CHALLENGE", config["selected_network"])
self.db_path = path_from_root(root_path, db_path_replaced)
2022-07-16 20:03:18 +03:00
self.db_path.parent.mkdir(parents=True, exist_ok=True)
2022-04-07 17:55:02 +03:00
server_files_replaced: str = config.get(
"server_files_location", "data_layer/db/server_files_location_CHALLENGE"
).replace("CHALLENGE", config["selected_network"])
2022-04-06 20:44:51 +03:00
self.server_files_location = path_from_root(root_path, server_files_replaced)
2022-07-16 20:03:18 +03:00
self.server_files_location.mkdir(parents=True, exist_ok=True)
2022-04-06 20:44:51 +03:00
self.data_layer_server = DataLayerServer(root_path, self.config, self.log)
2022-02-22 19:11:44 +03:00
self.none_bytes = bytes32([0] * 32)
2022-06-30 17:12:06 +03:00
self.lock = asyncio.Lock()
2021-09-17 15:03:48 +03:00
2021-09-29 02:37:50 +03:00
def _set_state_changed_callback(self, callback: Callable[..., object]) -> None:
self.state_changed_callback = callback
def set_server(self, server: ChiaServer) -> None:
self.server = server
2021-09-17 15:03:48 +03:00
async def _start(self) -> bool:
self.connection = await aiosqlite.connect(self.db_path)
2021-09-17 17:30:14 +03:00
self.db_wrapper = DBWrapper(self.connection)
2022-08-03 20:39:05 +03:00
self.data_store = await DataStore.create(self.db_wrapper)
2022-01-20 19:11:17 +03:00
self.wallet_rpc = await self.wallet_rpc_init
self.subscription_lock: asyncio.Lock = asyncio.Lock()
2022-02-08 18:29:29 +03:00
if self.config.get("run_server", False):
await self.data_layer_server.start()
2022-05-05 19:39:18 +03:00
self.periodically_manage_data_task: asyncio.Task[Any] = asyncio.create_task(self.periodically_manage_data())
return True
2021-09-29 02:37:50 +03:00
def _close(self) -> None:
# TODO: review for anything else we need to do here
self._shut_down = True
2021-09-29 02:37:50 +03:00
async def _await_closed(self) -> None:
2021-12-09 06:25:30 +03:00
if self.connection is not None:
await self.connection.close()
2022-02-08 18:29:29 +03:00
if self.config.get("run_server", False):
await self.data_layer_server.stop()
2022-06-09 16:35:02 +03:00
try:
self.periodically_manage_data_task.cancel()
except asyncio.CancelledError:
pass
2021-09-29 02:37:50 +03:00
async def create_store(
self, fee: uint64, root: bytes32 = bytes32([0] * 32)
) -> Tuple[List[TransactionRecord], bytes32]:
2022-01-20 16:07:50 +03:00
txs, tree_id = await self.wallet_rpc.create_new_dl(root, fee)
2022-01-20 22:30:27 +03:00
res = await self.data_store.create_tree(tree_id=tree_id)
if res is None:
self.log.fatal("failed creating store")
2022-01-20 15:44:20 +03:00
self.initialized = True
2022-01-20 16:07:50 +03:00
return txs, tree_id
2021-10-13 21:18:00 +03:00
async def batch_update(
2021-10-13 21:18:00 +03:00
self,
tree_id: bytes32,
changelist: List[Dict[str, Any]],
fee: uint64,
2022-01-22 05:17:15 +03:00
) -> TransactionRecord:
await self.batch_insert(tree_id=tree_id, changelist=changelist)
return await self.publish_update(tree_id=tree_id, fee=fee)
async def batch_insert(
self,
tree_id: bytes32,
changelist: List[Dict[str, Any]],
2022-08-20 01:36:22 +03:00
lock: bool = True,
) -> bytes32:
2022-08-20 01:36:22 +03:00
async with self.data_store.transaction(lock=lock):
# Make sure we update based on the latest confirmed root.
async with self.lock:
await self._update_confirmation_status(tree_id=tree_id, lock=False)
pending_root: Optional[Root] = await self.data_store.get_pending_root(tree_id=tree_id, lock=False)
if pending_root is not None:
raise Exception("Already have a pending root waiting for confirmation.")
# check before any DL changes that this singleton is currently owned by this wallet
singleton_records: List[SingletonRecord] = await self.get_owned_stores()
if not any(tree_id == singleton.launcher_id for singleton in singleton_records):
raise ValueError(f"Singleton with launcher ID {tree_id} is not owned by DL Wallet")
t1 = time.monotonic()
batch_hash = await self.data_store.insert_batch(tree_id, changelist, lock=False)
t2 = time.monotonic()
self.log.info(f"Data store batch update process time: {t2 - t1}.")
# todo return empty node hash from get_tree_root
if batch_hash is not None:
node_hash = batch_hash
else:
node_hash = self.none_bytes # todo change
return node_hash
async def publish_update(
self,
tree_id: bytes32,
fee: uint64,
) -> TransactionRecord:
# Make sure we update based on the latest confirmed root.
async with self.lock:
await self._update_confirmation_status(tree_id=tree_id)
pending_root: Optional[Root] = await self.data_store.get_pending_root(tree_id=tree_id)
if pending_root is None:
raise Exception("Latest root is already confirmed.")
root_hash = self.none_bytes if pending_root.node_hash is None else pending_root.node_hash
transaction_record = await self.wallet_rpc.dl_update_root(
launcher_id=tree_id,
new_root=root_hash,
fee=fee,
)
2022-01-22 05:17:15 +03:00
return transaction_record
2021-10-13 21:18:00 +03:00
2022-08-15 00:49:19 +03:00
async def get_key_value_hash(
self,
store_id: bytes32,
key: bytes,
2022-08-15 18:09:04 +03:00
root_hash: Optional[bytes32] = None,
2022-08-20 01:36:22 +03:00
lock: bool = True,
2022-08-15 00:49:19 +03:00
) -> bytes32:
2022-08-20 01:36:22 +03:00
async with self.data_store.transaction(lock=lock):
async with self.lock:
await self._update_confirmation_status(tree_id=store_id, lock=False)
node = await self.data_store.get_node_by_key(tree_id=store_id, key=key, root_hash=root_hash, lock=False)
return node.hash
async def get_value(self, store_id: bytes32, key: bytes, lock: bool = True) -> Optional[bytes]:
async with self.data_store.transaction(lock=lock):
async with self.lock:
await self._update_confirmation_status(tree_id=store_id, lock=False)
res = await self.data_store.get_node_by_key(tree_id=store_id, key=key, lock=False)
if res is None:
self.log.error("Failed to fetch key")
return None
return res.value
2021-10-18 12:42:55 +03:00
2022-01-24 15:53:56 +03:00
async def get_keys_values(self, store_id: bytes32, root_hash: Optional[bytes32]) -> List[TerminalNode]:
2022-06-30 17:12:06 +03:00
async with self.lock:
await self._update_confirmation_status(tree_id=store_id)
2022-01-24 15:53:56 +03:00
res = await self.data_store.get_keys_values(store_id, root_hash)
2021-10-18 12:42:55 +03:00
if res is None:
self.log.error("Failed to fetch keys values")
2021-10-18 12:42:55 +03:00
return res
2021-10-13 21:18:00 +03:00
2022-08-01 16:55:34 +03:00
async def get_keys(self, store_id: bytes32, root_hash: Optional[bytes32]) -> List[bytes]:
2022-07-26 16:14:45 +03:00
async with self.lock:
await self._update_confirmation_status(tree_id=store_id)
2022-08-01 16:55:34 +03:00
res = await self.data_store.get_keys(store_id, root_hash)
2022-07-26 16:14:45 +03:00
return res
2021-12-09 06:25:30 +03:00
async def get_ancestors(self, node_hash: bytes32, store_id: bytes32) -> List[InternalNode]:
async with self.lock:
await self._update_confirmation_status(tree_id=store_id)
2022-01-25 20:41:43 +03:00
res = await self.data_store.get_ancestors(node_hash=node_hash, tree_id=store_id)
2021-10-27 14:58:36 +03:00
if res is None:
self.log.error("Failed to get ancestors")
2021-10-27 14:58:36 +03:00
return res
2022-02-15 22:22:54 +03:00
async def get_root(self, store_id: bytes32) -> Optional[SingletonRecord]:
latest = await self.wallet_rpc.dl_latest_singleton(store_id, True)
2022-02-08 14:11:12 +03:00
if latest is None:
self.log.error(f"Failed to get root for {store_id.hex()}")
2022-02-15 22:22:54 +03:00
return latest
2022-02-18 16:43:57 +03:00
async def get_local_root(self, store_id: bytes32) -> Optional[bytes32]:
2022-08-20 01:36:22 +03:00
async with self.lock:
await self._update_confirmation_status(tree_id=store_id)
2022-02-18 16:43:57 +03:00
res = await self.data_store.get_tree_root(tree_id=store_id)
if res is None:
self.log.error(f"Failed to get root for {store_id.hex()}")
return None
return res.node_hash
2022-02-09 16:13:35 +03:00
async def get_root_history(self, store_id: bytes32) -> List[SingletonRecord]:
records = await self.wallet_rpc.dl_history(store_id)
if records is None:
self.log.error(f"Failed to get root history for {store_id.hex()}")
root_history = []
prev: Optional[SingletonRecord] = None
for record in records:
2022-02-10 17:18:38 +03:00
if prev is None or record.root != prev.root:
2022-02-09 16:13:35 +03:00
root_history.append(record)
2022-02-21 23:16:17 +03:00
prev = record
2022-02-09 16:13:35 +03:00
return root_history
2022-08-20 01:36:22 +03:00
async def _update_confirmation_status(self, tree_id: bytes32, lock: bool = True) -> None:
async with self.data_store.transaction(lock=lock):
try:
root = await self.data_store.get_tree_root(tree_id=tree_id, lock=False)
except asyncio.CancelledError:
raise
except Exception:
root = None
singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True)
if singleton_record is None:
return
if root is None:
pending_root = await self.data_store.get_pending_root(tree_id=tree_id, lock=False)
if pending_root is not None:
if pending_root.generation == 0 and pending_root.node_hash is None:
await self.data_store.change_root_status(pending_root, Status.COMMITTED, lock=False)
await self.data_store.clear_pending_roots(tree_id=tree_id, lock=False)
return
else:
root = None
if root is None:
self.log.info(f"Don't have pending root for {tree_id}.")
return
if root.generation == singleton_record.generation:
return
if root.generation > singleton_record.generation:
self.log.info(
f"Local root ahead of chain root: {root.generation} {singleton_record.generation}. "
"Maybe we're doing a batch update."
)
return
wallet_history = await self.wallet_rpc.dl_history(
launcher_id=tree_id,
min_generation=uint32(root.generation + 1),
max_generation=singleton_record.generation,
2022-07-07 15:20:18 +03:00
)
2022-08-20 01:36:22 +03:00
new_hashes = [record.root for record in reversed(wallet_history)]
root_hash = self.none_bytes if root.node_hash is None else root.node_hash
generation_shift = 0
while len(new_hashes) > 0 and new_hashes[0] == root_hash:
generation_shift += 1
new_hashes.pop(0)
if generation_shift > 0:
await self.data_store.shift_root_generations(tree_id=tree_id, shift_size=generation_shift, lock=False)
else:
expected_root_hash = None if new_hashes[0] == self.none_bytes else new_hashes[0]
pending_root = await self.data_store.get_pending_root(tree_id=tree_id, lock=False)
if (
pending_root is not None
and pending_root.generation == root.generation + 1
and pending_root.node_hash == expected_root_hash
):
await self.data_store.change_root_status(pending_root, Status.COMMITTED, lock=False)
await self.data_store.build_ancestor_table_for_latest_root(tree_id=tree_id, lock=False)
await self.data_store.clear_pending_roots(tree_id=tree_id, lock=False)
2022-06-23 19:44:45 +03:00
2022-08-11 16:45:08 +03:00
async def fetch_and_validate(self, tree_id: bytes32) -> None:
singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True)
if singleton_record is None:
2022-02-02 23:06:37 +03:00
self.log.info(f"Fetch data: No singleton record for {tree_id}.")
return
2022-01-31 16:58:27 +03:00
if singleton_record.generation == uint32(0):
2022-02-02 23:06:37 +03:00
self.log.info(f"Fetch data: No data on chain for {tree_id}.")
return
2022-06-30 18:08:19 +03:00
async with self.lock:
await self._update_confirmation_status(tree_id=tree_id)
2022-06-23 19:44:45 +03:00
2022-07-08 16:34:57 +03:00
if not await self.data_store.tree_id_exists(tree_id=tree_id):
2022-08-12 16:34:08 +03:00
await self.data_store.create_tree(tree_id=tree_id, status=Status.COMMITTED)
2022-07-08 15:33:50 +03:00
2022-08-03 20:39:05 +03:00
timestamp = int(time.time())
servers_info = await self.data_store.get_available_servers_for_store(tree_id, timestamp)
# TODO: maybe append a random object to the whole DataLayer class?
random.shuffle(servers_info)
for server_info in servers_info:
2022-07-22 18:24:53 +03:00
url = server_info.url
root = await self.data_store.get_tree_root(tree_id=tree_id)
2022-06-06 20:32:12 +03:00
if root.generation > singleton_record.generation:
2022-07-13 22:59:37 +03:00
self.log.info(
2022-06-06 20:32:12 +03:00
"Fetch data: local DL store is ahead of chain generation. "
2022-06-23 19:44:45 +03:00
f"Local root: {root}. Singleton: {singleton_record}"
)
break
2022-06-06 20:32:12 +03:00
if root.generation == singleton_record.generation:
self.log.info(f"Fetch data: wallet generation matching on-chain generation: {tree_id}.")
break
2022-02-02 23:06:37 +03:00
self.log.info(
2022-08-11 16:45:08 +03:00
f"Downloading files {tree_id}. "
2022-05-18 19:38:34 +03:00
f"Current wallet generation: {root.generation}. "
f"Target wallet generation: {singleton_record.generation}. "
2022-06-08 20:25:09 +03:00
f"Server used: {url}."
2022-02-02 23:06:37 +03:00
)
2022-01-31 16:58:27 +03:00
to_download = await self.wallet_rpc.dl_history(
2022-05-04 20:23:29 +03:00
launcher_id=tree_id,
2022-05-18 19:38:34 +03:00
min_generation=uint32(root.generation + 1),
2022-05-04 20:23:29 +03:00
max_generation=singleton_record.generation,
)
2022-02-22 17:22:38 +03:00
try:
2022-05-03 21:00:19 +03:00
success = await insert_from_delta_file(
self.data_store,
2022-08-11 16:45:08 +03:00
tree_id,
2022-05-18 19:38:34 +03:00
root.generation,
[record.root for record in reversed(to_download)],
2022-07-24 23:55:24 +03:00
server_info,
2022-05-04 20:23:29 +03:00
self.server_files_location,
self.log,
2022-03-29 17:49:02 +03:00
)
2022-05-03 21:00:19 +03:00
if success:
self.log.info(
2022-08-11 16:45:08 +03:00
f"Finished downloading and validating {tree_id}. "
2022-05-03 21:00:19 +03:00
f"Wallet generation saved: {singleton_record.generation}. "
f"Root hash saved: {singleton_record.root}."
)
break
2022-02-23 00:25:55 +03:00
except asyncio.CancelledError:
raise
2022-03-29 17:49:02 +03:00
except aiohttp.client_exceptions.ClientConnectorError:
2022-06-08 20:25:09 +03:00
self.log.warning(f"Server {url} unavailable for {tree_id}.")
2022-02-22 17:22:38 +03:00
except Exception as e:
2022-05-04 20:23:29 +03:00
self.log.warning(f"Exception while downloading files for {tree_id}: {e} {traceback.format_exc()}.")
2022-05-05 19:39:18 +03:00
async def upload_files(self, tree_id: bytes32) -> None:
singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True)
if singleton_record is None:
2022-06-07 17:54:15 +03:00
self.log.info(f"Upload files: no on-chain record for {tree_id}.")
2022-05-05 19:39:18 +03:00
return
2022-06-30 17:12:06 +03:00
async with self.lock:
await self._update_confirmation_status(tree_id=tree_id)
2022-05-05 19:39:18 +03:00
root = await self.data_store.get_tree_root(tree_id=tree_id)
2022-06-06 20:32:12 +03:00
publish_generation = min(singleton_record.generation, 0 if root is None else root.generation)
2022-05-05 19:39:18 +03:00
# If we make some batch updates, which get confirmed to the chain, we need to create the files.
# We iterate back and write the missing files, until we find the files already written.
2022-05-19 17:24:06 +03:00
root = await self.data_store.get_tree_root(tree_id=tree_id, generation=publish_generation)
2022-05-12 16:58:21 +03:00
while publish_generation > 0 and await write_files_for_root(
2022-05-05 19:39:18 +03:00
self.data_store,
tree_id,
root,
self.server_files_location,
):
publish_generation -= 1
root = await self.data_store.get_tree_root(tree_id=tree_id, generation=publish_generation)
2022-05-24 15:24:56 +03:00
async def add_missing_files(self, store_id: bytes32, override: bool, foldername: Optional[Path]) -> None:
2022-05-11 21:59:11 +03:00
root = await self.data_store.get_tree_root(tree_id=store_id)
singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(store_id, True)
if singleton_record is None:
self.log.error(f"No singleton record found for: {store_id}")
2022-05-11 21:59:11 +03:00
return
2022-06-06 20:32:12 +03:00
max_generation = min(singleton_record.generation, 0 if root is None else root.generation)
2022-05-24 15:24:56 +03:00
server_files_location = foldername if foldername is not None else self.server_files_location
2022-05-11 21:59:11 +03:00
for generation in range(1, max_generation + 1):
root = await self.data_store.get_tree_root(tree_id=store_id, generation=generation)
2022-05-24 15:24:56 +03:00
await write_files_for_root(self.data_store, store_id, root, server_files_location, override)
2022-05-11 21:59:11 +03:00
2022-06-08 20:25:09 +03:00
async def subscribe(self, store_id: bytes32, urls: List[str]) -> None:
parsed_urls = [url.rstrip("/") for url in urls]
2022-07-22 18:24:53 +03:00
subscription = Subscription(store_id, [ServerInfo(url, 0, 0) for url in parsed_urls])
await self.wallet_rpc.dl_track_new(subscription.tree_id)
async with self.subscription_lock:
await self.data_store.subscribe(subscription)
2022-07-25 17:28:07 +03:00
self.log.info(f"Done adding subscription: {subscription.tree_id}")
2022-07-28 18:47:15 +03:00
async def remove_subscriptions(self, store_id: bytes32, urls: List[str]) -> None:
parsed_urls = [url.rstrip("/") for url in urls]
async with self.subscription_lock:
await self.data_store.remove_subscriptions(store_id, parsed_urls)
async def unsubscribe(self, tree_id: bytes32) -> None:
subscriptions = await self.get_subscriptions()
2022-02-17 17:39:44 +03:00
if tree_id not in (subscription.tree_id for subscription in subscriptions):
2022-02-16 23:46:01 +03:00
raise RuntimeError("No subscription found for the given tree_id.")
async with self.subscription_lock:
await self.data_store.unsubscribe(tree_id)
2022-01-28 15:36:45 +03:00
await self.wallet_rpc.dl_stop_tracking(tree_id)
2022-01-28 18:57:25 +03:00
self.log.info(f"Unsubscribed to {tree_id}")
async def get_subscriptions(self) -> List[Subscription]:
async with self.subscription_lock:
return await self.data_store.get_subscriptions()
2022-08-09 22:24:54 +03:00
async def add_mirror(self, store_id: bytes32, urls: List[str], amount: uint64, fee: uint64) -> None:
2022-08-10 02:30:46 +03:00
bytes_urls = [bytes(url, "utf8") for url in urls]
await self.wallet_rpc.dl_new_mirror(store_id, amount, bytes_urls, fee)
2022-08-09 22:24:54 +03:00
2022-08-09 22:34:53 +03:00
async def delete_mirror(self, coin_id: bytes32, fee: uint64) -> None:
await self.wallet_rpc.dl_delete_mirror(coin_id, fee)
2022-08-09 22:24:54 +03:00
2022-08-12 01:17:45 +03:00
async def get_mirrors(self, tree_id: bytes32) -> List[Mirror]:
return await self.wallet_rpc.dl_get_mirrors(tree_id)
2022-08-10 20:01:25 +03:00
async def update_subscriptions_from_wallet(self, tree_id: bytes32) -> None:
2022-08-04 19:27:44 +03:00
mirrors: List[Mirror] = await self.wallet_rpc.dl_get_mirrors(tree_id)
urls: List[str] = []
for mirror in mirrors:
2022-08-10 16:39:07 +03:00
urls = urls + [url.decode("utf8") for url in mirror.urls]
2022-08-18 23:25:50 +03:00
urls = [url.rstrip("/") for url in urls]
2022-08-04 19:27:44 +03:00
await self.data_store.update_subscriptions_from_wallet(tree_id, urls)
2022-06-07 04:47:50 +03:00
async def get_owned_stores(self) -> List[SingletonRecord]:
return await self.wallet_rpc.dl_owned_singletons()
2022-02-21 23:16:17 +03:00
async def get_kv_diff(self, tree_id: bytes32, hash_1: bytes32, hash_2: bytes32) -> Set[DiffData]:
2022-02-17 19:31:01 +03:00
return await self.data_store.get_kv_diff(tree_id, hash_1, hash_2)
2022-05-05 19:39:18 +03:00
async def periodically_manage_data(self) -> None:
manage_data_interval = self.config.get("manage_data_interval", 60)
while not self._shut_down:
async with self.subscription_lock:
try:
subscriptions = await self.data_store.get_subscriptions()
for subscription in subscriptions:
await self.wallet_rpc.dl_track_new(subscription.tree_id)
break
2022-04-27 15:30:48 +03:00
except aiohttp.client_exceptions.ClientConnectorError:
pass
2022-05-06 15:36:42 +03:00
except asyncio.CancelledError:
2022-06-09 16:35:02 +03:00
raise
self.log.warning("Cannot connect to the wallet. Retrying in 3s.")
delay_until = time.monotonic() + 3
while time.monotonic() < delay_until:
if self._shut_down:
break
2022-05-06 15:36:42 +03:00
try:
await asyncio.sleep(0.1)
except asyncio.CancelledError:
2022-06-09 16:35:02 +03:00
raise
while not self._shut_down:
async with self.subscription_lock:
2022-01-27 21:11:30 +03:00
subscriptions = await self.data_store.get_subscriptions()
# Subscribe to all local tree_ids that we can find on chain.
local_tree_ids = await self.data_store.get_tree_ids()
subscription_tree_ids = set(subscription.tree_id for subscription in subscriptions)
for local_id in local_tree_ids:
if local_id not in subscription_tree_ids:
try:
2022-06-08 20:25:09 +03:00
await self.subscribe(local_id, [])
except asyncio.CancelledError:
2022-06-09 16:35:02 +03:00
raise
except Exception as e:
self.log.info(
f"Can't subscribe to locally stored {local_id}: {type(e)} {e} {traceback.format_exc()}"
)
async with self.subscription_lock:
for subscription in subscriptions:
2022-02-02 23:06:37 +03:00
try:
2022-08-04 19:27:44 +03:00
await self.update_subscriptions_from_wallet(subscription.tree_id)
2022-08-11 16:45:08 +03:00
await self.fetch_and_validate(subscription.tree_id)
2022-05-05 19:39:18 +03:00
await self.upload_files(subscription.tree_id)
2022-05-06 15:38:57 +03:00
except asyncio.CancelledError:
2022-06-09 16:35:02 +03:00
raise
2022-02-02 23:06:37 +03:00
except Exception as e:
self.log.error(f"Exception while fetching data: {type(e)} {e} {traceback.format_exc()}.")
try:
2022-05-05 19:39:18 +03:00
await asyncio.sleep(manage_data_interval)
except asyncio.CancelledError:
2022-06-09 16:35:02 +03:00
raise
2022-08-20 01:36:22 +03:00
async def build_offer_changelist(
self,
store_id: bytes32,
inclusions: Tuple[KeyValue, ...],
lock: bool = True,
) -> List[Dict[str, Any]]:
async with self.data_store.transaction(lock=lock):
changelist: List[Dict[str, Any]] = []
for entry in inclusions:
try:
existing_value = await self.get_value(store_id=store_id, key=entry.key, lock=False)
except KeyNotFoundError:
existing_value = None
2022-08-20 01:36:22 +03:00
if existing_value == entry.value:
# already present, nothing needed
continue
if existing_value is not None:
# upsert, delete the existing key and value
changelist.append(
{
"action": "delete",
"key": entry.key,
}
)
changelist.append(
{
2022-08-20 01:36:22 +03:00
"action": "insert",
"key": entry.key,
2022-08-20 01:36:22 +03:00
"value": entry.value,
}
)
2022-08-20 01:36:22 +03:00
return changelist
async def process_offered_stores(
self, offer_stores: Tuple[OfferStore, ...], lock: bool = True
) -> Dict[bytes32, StoreProofs]:
async with self.data_store.transaction(lock=lock):
our_store_proofs: Dict[bytes32, StoreProofs] = {}
for offer_store in offer_stores:
async with self.lock:
2022-08-21 18:33:31 +03:00
await self._update_confirmation_status(tree_id=offer_store.store_id, lock=False)
2022-08-20 01:36:22 +03:00
changelist = await self.build_offer_changelist(
store_id=offer_store.store_id,
inclusions=offer_store.inclusions,
lock=False,
)
2022-08-20 01:36:22 +03:00
if len(changelist) > 0:
new_root_hash = await self.batch_insert(
tree_id=offer_store.store_id,
changelist=changelist,
lock=False,
)
else:
existing_root = await self.get_root(store_id=offer_store.store_id)
if existing_root is None:
raise Exception(f"store id not available: {offer_store.store_id.hex()}")
new_root_hash = existing_root.root
if new_root_hash is None:
raise Exception("only inserts are supported so a None root hash should not be possible")
proofs: List[Proof] = []
for entry in offer_store.inclusions:
node_hash = await self.get_key_value_hash(
store_id=offer_store.store_id,
key=entry.key,
root_hash=new_root_hash,
lock=False,
)
proof_of_inclusion = await self.data_store.get_proof_of_inclusion_by_hash(
node_hash=node_hash,
tree_id=offer_store.store_id,
root_hash=new_root_hash,
lock=False,
)
proof = Proof(
key=entry.key,
value=entry.value,
node_hash=proof_of_inclusion.node_hash,
layers=tuple(
Layer(
other_hash_side=layer.other_hash_side,
other_hash=layer.other_hash,
combined_hash=layer.combined_hash,
)
for layer in proof_of_inclusion.layers
),
)
proofs.append(proof)
store_proof = StoreProofs(store_id=offer_store.store_id, proofs=tuple(proofs))
our_store_proofs[offer_store.store_id] = store_proof
return our_store_proofs
async def make_offer(
self,
maker: Tuple[OfferStore, ...],
taker: Tuple[OfferStore, ...],
fee: uint64,
) -> Offer:
2022-08-20 01:36:22 +03:00
async with self.data_store.transaction():
our_store_proofs = await self.process_offered_stores(offer_stores=maker, lock=False)
offer_dict: Dict[Union[uint32, str], int] = {
**{offer_store.store_id.hex(): -1 for offer_store in maker},
**{offer_store.store_id.hex(): 1 for offer_store in taker},
}
2022-08-20 01:36:22 +03:00
solver: Dict[str, Any] = {
"0x"
+ our_offer_store.store_id.hex(): {
"new_root": "0x" + our_store_proofs[our_offer_store.store_id].proofs[0].root().hex(),
"dependencies": [
{
"launcher_id": "0x" + their_offer_store.store_id.hex(),
"values_to_prove": [
"0x" + leaf_hash(key=entry.key, value=entry.value).hex()
for entry in their_offer_store.inclusions
],
}
for their_offer_store in taker
],
}
for our_offer_store in maker
}
wallet_offer, trade_record = await self.wallet_rpc.create_offer_for_ids(
offer_dict=offer_dict,
solver=solver,
driver_dict={},
fee=fee,
validate_only=False,
)
if wallet_offer is None:
raise Exception("offer is None despite validate_only=False")
offer = Offer(
trade_id=trade_record.trade_id,
offer=bytes(wallet_offer),
taker=taker,
maker=tuple(our_store_proofs.values()),
)
# being extra careful and verifying the offer before returning it
trading_offer = TradingOffer.from_bytes(offer.offer)
summary = await DataLayerWallet.get_offer_summary(offer=trading_offer)
verify_offer(maker=offer.maker, taker=offer.taker, summary=summary)
2022-08-20 01:36:22 +03:00
return offer
async def take_offer(
self,
offer_bytes: bytes,
taker: Tuple[OfferStore, ...],
maker: Tuple[StoreProofs, ...],
fee: uint64,
) -> TradeRecord:
2022-08-20 01:36:22 +03:00
async with self.data_store.transaction():
our_store_proofs = await self.process_offered_stores(offer_stores=taker, lock=False)
offer = TradingOffer.from_bytes(offer_bytes)
summary = await DataLayerWallet.get_offer_summary(offer=offer)
verify_offer(maker=maker, taker=taker, summary=summary)
all_store_proofs: Dict[bytes32, StoreProofs] = {
store_proofs.proofs[0].root(): store_proofs for store_proofs in [*maker, *our_store_proofs.values()]
}
proofs_of_inclusion: List[Tuple[str, str, List[str]]] = []
for root, store_proofs in all_store_proofs.items():
for proof in store_proofs.proofs:
layers = [
ProofOfInclusionLayer(
combined_hash=layer.combined_hash,
other_hash_side=layer.other_hash_side,
other_hash=layer.other_hash,
)
for layer in proof.layers
]
proof_of_inclusion = ProofOfInclusion(node_hash=proof.node_hash, layers=layers)
sibling_sides_integer = proof_of_inclusion.sibling_sides_integer()
proofs_of_inclusion.append(
(
root.hex(),
str(sibling_sides_integer),
["0x" + sibling_hash.hex() for sibling_hash in proof_of_inclusion.sibling_hashes()],
)
)
2022-08-20 01:36:22 +03:00
solver: Dict[str, Any] = {
"proofs_of_inclusion": proofs_of_inclusion,
**{
"0x"
+ our_offer_store.store_id.hex(): {
"new_root": "0x" + root.hex(),
"dependencies": [
{
"launcher_id": "0x" + their_offer_store.store_id.hex(),
"values_to_prove": ["0x" + entry.node_hash.hex() for entry in their_offer_store.proofs],
}
for their_offer_store in maker
],
}
for our_offer_store in taker
},
}
2022-08-22 17:11:24 +03:00
# Excluding wallet from transaction since failures in the wallet may occur
# after the transaction is submitted to the chain. If we roll back data we
# may lose published data.
trade_record = await self.wallet_rpc.take_offer(
offer=offer,
solver=solver,
fee=fee,
)
2022-08-22 17:11:24 +03:00
return trade_record
async def cancel_offer(self, trade_id: bytes32, secure: bool, fee: uint64) -> None:
store_ids: List[bytes32] = []
if not secure:
2022-08-24 00:31:52 +03:00
trade_record = await self.wallet_rpc.get_offer(trade_id=trade_id, file_contents=True)
trading_offer = TradingOffer.from_bytes(trade_record.offer)
summary = await DataLayerWallet.get_offer_summary(offer=trading_offer)
store_ids = [bytes32.from_hexstr(offered["launcher_id"]) for offered in summary["offered"]]
await self.wallet_rpc.cancel_offer(
trade_id=trade_id,
secure=secure,
fee=fee,
)
if not secure:
for store_id in store_ids:
await self.data_store.clear_pending_roots(tree_id=store_id)