Get rid of DataLayerServers.

This commit is contained in:
Florin Chirica 2022-04-04 13:45:38 +03:00
parent 6d878e5969
commit 89d1761907
No known key found for this signature in database
GPG Key ID: 1805593F7B529698
5 changed files with 22 additions and 34 deletions

View File

@ -5,14 +5,14 @@ import aiosqlite
import traceback
import asyncio
import aiohttp
from chia.data_layer.data_layer_types import InternalNode, TerminalNode, Subscription, Root, DiffData, DataServersInfo
from chia.data_layer.data_layer_types import InternalNode, TerminalNode, Subscription, Root, DiffData
from chia.data_layer.data_store import DataStore
from chia.rpc.wallet_rpc_client import WalletRpcClient
from chia.server.server import ChiaServer
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.config import load_config
from chia.util.db_wrapper import DBWrapper
from chia.util.ints import uint32, uint64
from chia.util.ints import uint16, uint32, uint64
from chia.util.path import mkdir, path_from_root
from chia.wallet.transaction_record import TransactionRecord
from chia.data_layer.data_layer_wallet import SingletonRecord
@ -199,7 +199,7 @@ class DataLayer:
"data_layer/client_downloaded_files",
)
downloaded = False
for ip, port in zip(subscription.data_servers_info.ip, subscription.data_servers_info.port):
for ip, port in zip(subscription.ip, subscription.port):
try:
downloaded = await download_delta_files(
subscription.tree_id,
@ -246,8 +246,8 @@ class DataLayer:
)
await self.data_store.set_validated_wallet_generation(tree_id, int(singleton_record.generation))
async def subscribe(self, store_id: bytes32, data_servers_info: DataServersInfo) -> None:
subscription = Subscription(store_id, data_servers_info)
async def subscribe(self, store_id: bytes32, ip: List[str], port: List[uint16]) -> None:
subscription = Subscription(store_id, ip, port)
subscriptions = await self.get_subscriptions()
if subscription.tree_id in (subscription.tree_id for subscription in subscriptions):
await self.data_store.update_existing_subscription(subscription)

View File

@ -188,16 +188,11 @@ class DeletionData:
root_status: Status
@dataclass(frozen=True)
class DataServersInfo:
ip: List[str]
port: List[uint16]
@dataclass(frozen=True)
class Subscription:
tree_id: bytes32
data_servers_info: DataServersInfo
ip: List[str]
port: List[uint16]
@dataclass(frozen=True)

View File

@ -27,7 +27,6 @@ from chia.data_layer.data_layer_types import (
Subscription,
DiffData,
OperationType,
DataServersInfo,
)
from chia.data_layer.data_layer_util import row_to_node
from chia.types.blockchain_format.program import Program
@ -1193,8 +1192,8 @@ class DataStore:
async def subscribe(self, subscription: Subscription, *, lock: bool = True) -> None:
async with self.db_wrapper.locked_transaction(lock=lock):
servers_ip = json.dumps(subscription.data_servers_info.ip)
servers_port = json.dumps(subscription.data_servers_info.port)
servers_ip = json.dumps(subscription.ip)
servers_port = json.dumps(subscription.port)
await self.db.execute(
"INSERT INTO subscriptions(tree_id, servers_ip, servers_port, validated_wallet_generation) "
"VALUES (:tree_id, :servers_ip, :servers_port, 0)",
@ -1207,8 +1206,8 @@ class DataStore:
async def update_existing_subscription(self, subscription: Subscription, *, lock: bool = True) -> None:
async with self.db_wrapper.locked_transaction(lock=lock):
servers_ip = json.dumps(subscription.data_servers_info.ip)
servers_port = json.dumps(subscription.data_servers_info.port)
servers_ip = json.dumps(subscription.ip)
servers_port = json.dumps(subscription.port)
await self.db.execute(
"""
UPDATE subscriptions SET servers_ip = :servers_ip, servers_port = :servers_port
@ -1254,7 +1253,7 @@ class DataStore:
tree_id = bytes32.fromhex(row["tree_id"])
servers_ip = json.loads(row["servers_ip"])
servers_port = json.loads(row["servers_port"])
subscriptions.append(Subscription(tree_id, DataServersInfo(servers_ip, servers_port)))
subscriptions.append(Subscription(tree_id, servers_ip, servers_port))
return subscriptions

View File

@ -2,7 +2,7 @@ import dataclasses
from typing import Any, Callable, Dict, List
from chia.data_layer.data_layer import DataLayer
from chia.data_layer.data_layer_types import Side, DataServersInfo
from chia.data_layer.data_layer_types import Side
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.byte_types import hexstr_to_bytes
@ -204,8 +204,7 @@ class DataLayerRpcApi:
store_id_bytes = bytes32.from_hexstr(store_id)
ips = request["ips"]
ports = request["ports"]
data_servers_info = DataServersInfo(ips, ports)
await self.service.subscribe(store_id=store_id_bytes, data_servers_info=data_servers_info)
await self.service.subscribe(store_id=store_id_bytes, ip=ips, port=ports)
return {}
async def unsubscribe(self, request: Dict[str, Any]) -> Dict[str, Any]:

View File

@ -23,7 +23,6 @@ from chia.data_layer.data_layer_types import (
DeletionData,
OperationType,
DiffData,
DataServersInfo,
)
from chia.data_layer.data_layer_util import _debug_dump
from chia.data_layer.data_store import DataStore
@ -1133,19 +1132,15 @@ async def test_rollback_to_generation(data_store: DataStore, tree_id: bytes32) -
@pytest.mark.asyncio
async def test_subscribe_unsubscribe(data_store: DataStore, tree_id: bytes32) -> None:
await data_store.subscribe(Subscription(tree_id, DataServersInfo(["127.0.0.1"], [uint16(8000)])))
assert await data_store.get_subscriptions() == [
Subscription(tree_id, DataServersInfo(["127.0.0.1"], [uint16(8000)]))
]
await data_store.update_existing_subscription(Subscription(tree_id, DataServersInfo(["0.0.0.0"], [uint16(8002)])))
assert await data_store.get_subscriptions() == [Subscription(tree_id, DataServersInfo(["0.0.0.0"], [uint16(8002)]))]
await data_store.update_existing_subscription(Subscription(tree_id, DataServersInfo(["0.0.0.0"], [uint16(8001)])))
assert await data_store.get_subscriptions() == [Subscription(tree_id, DataServersInfo(["0.0.0.0"], [uint16(8001)]))]
await data_store.subscribe(Subscription(tree_id, ["127.0.0.1"], [uint16(8000)]))
assert await data_store.get_subscriptions() == [Subscription(tree_id, ["127.0.0.1"], [uint16(8000)])]
await data_store.update_existing_subscription(Subscription(tree_id, ["0.0.0.0"], [uint16(8002)]))
assert await data_store.get_subscriptions() == [Subscription(tree_id, ["0.0.0.0"], [uint16(8002)])]
await data_store.update_existing_subscription(Subscription(tree_id, ["0.0.0.0"], [uint16(8001)]))
assert await data_store.get_subscriptions() == [Subscription(tree_id, ["0.0.0.0"], [uint16(8001)])]
await data_store.unsubscribe(tree_id)
assert await data_store.get_subscriptions() == []
await data_store.subscribe(
Subscription(tree_id, DataServersInfo(["0.0.0.0", "127.0.0.1"], [uint16(8003), uint16(8004)]))
)
await data_store.subscribe(Subscription(tree_id, ["0.0.0.0", "127.0.0.1"], [uint16(8003), uint16(8004)]))
assert await data_store.get_subscriptions() == [
Subscription(tree_id, DataServersInfo(["0.0.0.0", "127.0.0.1"], [uint16(8003), uint16(8004)])),
Subscription(tree_id, ["0.0.0.0", "127.0.0.1"], [uint16(8003), uint16(8004)]),
]