Upload files.

This commit is contained in:
Florin Chirica 2022-05-05 19:39:18 +03:00
parent dcbceea2d3
commit 3e3b520074
No known key found for this signature in database
GPG Key ID: 1805593F7B529698
3 changed files with 37 additions and 12 deletions

View File

@ -16,7 +16,7 @@ from chia.util.ints import uint16, uint32, uint64
from chia.util.path import mkdir, path_from_root
from chia.wallet.transaction_record import TransactionRecord
from chia.data_layer.data_layer_wallet import SingletonRecord
from chia.data_layer.download_data import insert_from_delta_file
from chia.data_layer.download_data import insert_from_delta_file, maybe_write_files_for_root
from chia.data_layer.data_layer_server import DataLayerServer
@ -79,7 +79,7 @@ class DataLayer:
subscriptions = await self.get_subscriptions()
for subscription in subscriptions:
await self.wallet_rpc.dl_track_new(subscription.tree_id)
self.periodically_fetch_data_task: asyncio.Task[Any] = asyncio.create_task(self.periodically_fetch_data())
self.periodically_manage_data_task: asyncio.Task[Any] = asyncio.create_task(self.periodically_manage_data())
return True
def _close(self) -> None:
@ -91,7 +91,7 @@ class DataLayer:
await self.connection.close()
if self.config.get("run_server", False):
await self.data_layer_server.stop()
self.periodically_fetch_data_task.cancel()
self.periodically_manage_data_task.cancel()
async def create_store(
self, fee: uint64, root: bytes32 = bytes32([0] * 32)
@ -101,6 +101,7 @@ class DataLayer:
if res is None:
self.log.fatal("failed creating store")
self.initialized = True
await self.subscribe(tree_id, [], [])
return txs, tree_id
async def batch_update(
@ -233,6 +234,23 @@ class DataLayer:
except Exception as e:
self.log.warning(f"Exception while downloading files for {tree_id}: {e} {traceback.format_exc()}.")
async def upload_files(self, tree_id: bytes32) -> None:
singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True)
if singleton_record is None:
return
root = await self.data_store.get_tree_root(tree_id=tree_id)
publish_generation = min(int(singleton_record.generation), 0 if root is None else root.generation)
# If we make some batch updates, which get confirmed to the chain, we need to create the files.
# We iterate back and write the missing files, until we find the files already written.
while publish_generation > 0 and maybe_write_files_for_root(
self.data_store,
tree_id,
root,
self.server_files_location,
):
publish_generation -= 1
root = await self.data_store.get_tree_root(tree_id=tree_id, generation=publish_generation)
async def subscribe(self, store_id: bytes32, ip: List[str], port: List[uint16]) -> None:
subscription = Subscription(store_id, ip, port)
subscriptions = await self.get_subscriptions()
@ -261,17 +279,18 @@ class DataLayer:
async def get_kv_diff(self, tree_id: bytes32, hash_1: bytes32, hash_2: bytes32) -> Set[DiffData]:
return await self.data_store.get_kv_diff(tree_id, hash_1, hash_2)
async def periodically_fetch_data(self) -> None:
fetch_data_interval = self.config.get("fetch_data_interval", 60)
async def periodically_manage_data(self) -> None:
manage_data_interval = self.config.get("manage_data_interval", 60)
while not self._shut_down:
async with self.subscription_lock:
subscriptions = await self.data_store.get_subscriptions()
for subscription in subscriptions:
try:
await self.fetch_and_validate(subscription)
await self.upload_files(subscription.tree_id)
except Exception as e:
self.log.error(f"Exception while fetching data: {type(e)} {e} {traceback.format_exc()}.")
try:
await asyncio.sleep(fetch_data_interval)
await asyncio.sleep(manage_data_interval)
except asyncio.CancelledError:
pass

View File

@ -41,20 +41,26 @@ async def insert_into_data_store(
await data_store.insert_batch_root(tree_id, root_hash, Status.COMMITTED)
async def write_files_for_root(
async def maybe_write_files_for_root(
data_store: DataStore,
tree_id: bytes32,
root: Root,
foldername: str,
) -> None:
foldername: Path,
) -> bool:
if root.node_hash is not None:
node_hash = root.node_hash
else:
node_hash = bytes32([0] * 32) # todo change
filename_full_tree = os.path.join(foldername, get_full_tree_filename(tree_id, node_hash, root.generation))
filename_diff_tree = os.path.join(foldername, get_delta_filename(tree_id, node_hash, root.generation))
await data_store.write_tree_to_file(root, node_hash, tree_id, False, filename_full_tree)
await data_store.write_tree_to_file(root, node_hash, tree_id, True, filename_diff_tree)
written = False
if not os.path.exists(filename_full_tree):
await data_store.write_tree_to_file(root, node_hash, tree_id, False, filename_full_tree)
written = True
if not os.path.exists(filename_diff_tree):
await data_store.write_tree_to_file(root, node_hash, tree_id, True, filename_diff_tree)
written = True
return written
async def insert_from_delta_file(

View File

@ -552,7 +552,7 @@ data_layer:
# Switch this to True if we want to run the server.
run_server: True
# Data for running a data layer client.
fetch_data_interval: 60
manage_data_interval: 60
selected_network: *selected_network
# If True, starts an RPC server at the following port
start_rpc_server: True