Checkpoint changes.

This commit is contained in:
Florin Chirica 2022-08-11 15:45:08 +02:00
parent 9b73f7e97c
commit 5351de02ff
No known key found for this signature in database
GPG Key ID: 1805593F7B529698
2 changed files with 16 additions and 18 deletions

View File

@ -245,8 +245,7 @@ class DataLayer:
await self.data_store.build_ancestor_table_for_latest_root(tree_id=tree_id) await self.data_store.build_ancestor_table_for_latest_root(tree_id=tree_id)
await self.data_store.clear_pending_roots(tree_id=tree_id) await self.data_store.clear_pending_roots(tree_id=tree_id)
async def fetch_and_validate(self, subscription: Subscription) -> None: async def fetch_and_validate(self, tree_id: bytes32) -> None:
tree_id = subscription.tree_id
singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True) singleton_record: Optional[SingletonRecord] = await self.wallet_rpc.dl_latest_singleton(tree_id, True)
if singleton_record is None: if singleton_record is None:
self.log.info(f"Fetch data: No singleton record for {tree_id}.") self.log.info(f"Fetch data: No singleton record for {tree_id}.")
@ -279,7 +278,7 @@ class DataLayer:
break break
self.log.info( self.log.info(
f"Downloading files {subscription.tree_id}. " f"Downloading files {tree_id}. "
f"Current wallet generation: {root.generation}. " f"Current wallet generation: {root.generation}. "
f"Target wallet generation: {singleton_record.generation}. " f"Target wallet generation: {singleton_record.generation}. "
f"Server used: {url}." f"Server used: {url}."
@ -294,7 +293,7 @@ class DataLayer:
try: try:
success = await insert_from_delta_file( success = await insert_from_delta_file(
self.data_store, self.data_store,
subscription.tree_id, tree_id,
root.generation, root.generation,
[record.root for record in reversed(to_download)], [record.root for record in reversed(to_download)],
server_info, server_info,
@ -303,7 +302,7 @@ class DataLayer:
) )
if success: if success:
self.log.info( self.log.info(
f"Finished downloading and validating {subscription.tree_id}. " f"Finished downloading and validating {tree_id}. "
f"Wallet generation saved: {singleton_record.generation}. " f"Wallet generation saved: {singleton_record.generation}. "
f"Root hash saved: {singleton_record.root}." f"Root hash saved: {singleton_record.root}."
) )
@ -427,7 +426,7 @@ class DataLayer:
async with self.subscription_lock: async with self.subscription_lock:
for subscription in subscriptions: for subscription in subscriptions:
try: try:
await self.fetch_and_validate(subscription) await self.fetch_and_validate(subscription.tree_id)
await self.upload_files(subscription.tree_id) await self.upload_files(subscription.tree_id)
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise

View File

@ -1304,18 +1304,17 @@ class DataStore:
self, tree_id: bytes32, server_info: ServerInfo, timestamp: int, *, lock: bool = True self, tree_id: bytes32, server_info: ServerInfo, timestamp: int, *, lock: bool = True
) -> None: ) -> None:
SEVEN_DAYS_BAN = 7 * 24 * 60 * 60 SEVEN_DAYS_BAN = 7 * 24 * 60 * 60
new_server_info = ServerInfo( new_server_info = replace(
server_info.url, server_info,
server_info.num_consecutive_failures + 1, num_consecutive_failures=server_info.num_consecutive_failures + 1,
max(server_info.ignore_till, timestamp + SEVEN_DAYS_BAN), ignore_till=max(server_info.ignore_till, timestamp + SEVEN_DAYS_BAN),
) )
await self.update_server_info(tree_id, new_server_info, lock=lock) await self.update_server_info(tree_id, new_server_info, lock=lock)
async def received_correct_file(self, tree_id: bytes32, server_info: ServerInfo, *, lock: bool = True) -> None: async def received_correct_file(self, tree_id: bytes32, server_info: ServerInfo, *, lock: bool = True) -> None:
new_server_info = ServerInfo( new_server_info = replace(
server_info.url, server_info,
0, num_consecutive_failures=0,
server_info.ignore_till,
) )
await self.update_server_info(tree_id, new_server_info, lock=lock) await self.update_server_info(tree_id, new_server_info, lock=lock)
@ -1324,10 +1323,10 @@ class DataStore:
) -> None: ) -> None:
BAN_TIME_BY_MISSING_COUNT = [5 * 60] * 3 + [15 * 60] * 3 + [60 * 60] * 2 + [240 * 60] BAN_TIME_BY_MISSING_COUNT = [5 * 60] * 3 + [15 * 60] * 3 + [60 * 60] * 2 + [240 * 60]
index = min(server_info.num_consecutive_failures, len(BAN_TIME_BY_MISSING_COUNT) - 1) index = min(server_info.num_consecutive_failures, len(BAN_TIME_BY_MISSING_COUNT) - 1)
new_server_info = ServerInfo( new_server_info = replace(
server_info.url, server_info,
server_info.num_consecutive_failures + 1, num_consecutive_failures=server_info.num_consecutive_failures + 1,
max(server_info.ignore_till, timestamp + BAN_TIME_BY_MISSING_COUNT[index]), ignore_till=max(server_info.ignore_till, timestamp + BAN_TIME_BY_MISSING_COUNT[index]),
) )
await self.update_server_info(tree_id, new_server_info, lock=lock) await self.update_server_info(tree_id, new_server_info, lock=lock)