start datalayer rpc and wallet

This commit is contained in:
almog 2021-09-19 21:31:48 +03:00
parent 12505ba546
commit 44211ffea4
No known key found for this signature in database
GPG Key ID: 9F7BBD2FEE9C050F
4 changed files with 143 additions and 22 deletions

View File

@ -0,0 +1,52 @@
from typing import Callable, Dict, List
from chia.data_layer.data_layer import DataLayer
from clvm import CLVMObject
from chia.data_layer.data_layer_wallet import DataLayerWallet
from chia.data_layer.data_store import Action, OperationType
from chia.types.blockchain_format.sized_bytes import bytes32
# todo input assertions for all rpc's
from chia.util.hash import std_hash
class DataLayerRpcApi:
def __init__(self, service: DataLayer, wallet: DataLayerWallet):
self.data_layer_service: DataLayer = service
self.data_layer_wallet = wallet
self.service_name = "chia_data_layer"
def get_routes(self) -> Dict[str, Callable]:
return {"/update": self.update, "/get_row": self.get_row}
async def get_row(self, request: Dict) -> bytes:
row = b""
if "hash" in request:
row = await self.data_layer_service.data_store.get_row_by_hash(request["hash"])
elif "index" in request:
row = await self.data_layer_service.data_store.get_row_by_index(request["index"])
return row
async def update(self, request: Dict):
"""
rows_to_add a list of clvmobjects as bytes to add to talbe
rows_to_remove a list of row hashes to remove
"""
table: bytes32 = request["table"]
changelist = request["changelist"]
action_list: List[Action] = []
for change in changelist:
if change["action"] == "insert":
row = change["row"]
operation = OperationType.INSERT
index = await self.data_layer_service.data_store.insert_row(table, row)
else:
assert change["action"] == "delete"
row, index = await self.data_layer_service.data_store.delete_row_by_hash(table, change["row"])
operation = OperationType.DELETE
action_list.append(Action(op=operation, row=row, row_index=index))
state = await self.data_layer_service.data_store.get_table_state(table)
# todo get changelist hash, order changelist before committing hash
await self.data_layer_wallet.uptate_table_state(table, state, std_hash(action_list))
# todo need to mark data as pending and change once tx is confirmed

View File

@ -0,0 +1,58 @@
import logging
import time
from typing import Any, Optional, Set, Tuple, List, Dict
from blspy import PrivateKey, G2Element, G1Element
from chia.consensus.block_record import BlockRecord
from chia.pools.pool_config import PoolWalletConfig, load_pool_config, update_pool_config
from chia.pools.pool_wallet_info import (
PoolWalletInfo,
PoolSingletonState,
PoolState,
FARMING_TO_POOL,
SELF_POOLING,
LEAVING_POOL,
create_pool_state,
)
from chia.protocols.pool_protocol import POOL_PROTOCOL_VERSION
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint8
from chia.wallet.util.wallet_types import WalletType
from chia.wallet.wallet import Wallet
from chia.wallet.wallet_info import WalletInfo
class DataLayerWallet:
wallet_state_manager: Any
log: logging.Logger
wallet_info: WalletInfo
target_state: Optional[PoolState]
standard_wallet: Wallet
wallet_id: int
singleton_list: List[Coin]
"""
interface to be used by datalayer for interacting with the chain
"""
@classmethod
def type(cls) -> uint8:
return uint8(WalletType.POOLING_WALLET)
def id(self):
return self.wallet_info.id
async def create_table(self, id: bytes32) -> bool:
return True
async def delete_table(self, id: bytes32) -> bool:
return True
async def get_table_state(self, id: bytes32) -> bytes:
return b""
async def uptate_table_state(self, id: bytes32, new_state: bytes32, action: bytes32) -> bool:
return True

View File

@ -32,9 +32,9 @@ class OperationType(IntEnum):
@dataclass(frozen=True)
class Action:
type: OperationType
op: OperationType
row_index: int
row: CLVMObject.CLVMObject
row: CLVMObject
@dataclass(frozen=True)
@ -78,7 +78,9 @@ class DataStore:
# TODO: do we need to handle multiple equal rows
# Just a raw collection of all ChiaLisp lists that are used
await self.db.execute("CREATE TABLE IF NOT EXISTS raw_rows(row_hash TEXT PRIMARY KEY, clvm_object BLOB)")
await self.db.execute(
"CREATE TABLE IF NOT EXISTS raw_rows(row_hash TEXT PRIMARY KEY,table_id TEXT, clvm_object BLOB)"
)
# The present properly ordered collection of rows.
await self.db.execute("CREATE TABLE IF NOT EXISTS data_rows(row_hash TEXT PRIMARY KEY)")
# TODO: needs a key
@ -99,31 +101,38 @@ class DataStore:
# TODO: Add some handling for multiple tables. Could be another layer of class
# for each table or another parameter to select the table.
async def get_row_by_index(self, index: int) -> CLVMObject.CLVMObject:
async def get_row_by_index(self, index: int) -> CLVMObject:
pass
# chia.util.merkle_set.TerminalNode requires 32 bytes so I think that's applicable here
async def get_row_by_hash(self, row_hash: bytes32) -> CLVMObject.CLVMObject:
async def get_row_by_hash(self, row_hash: bytes32) -> CLVMObject:
pass
async def insert_row(self, index: int, clvm_object: CLVMObject.CLVMObject) -> None:
async def insert_row(self, table_id: bytes32, clvm_object: CLVMObject) -> None:
row_hash = sha256_treehash(sexp=clvm_object)
cursor = await self.db.execute(
"SELECT * FROM raw_rows WHERE row_hash=:row_hash",
parameters={"row_hash": row_hash},
)
if await cursor.fetchone() is None:
await self.db.execute("INSERT INTO raw_rows (row_hash, clvm_object) VALUES(?, ?)", (row_hash, clvm_object))
await self.db.execute(
"INSERT INTO raw_rows (row_hash,table_id,clvm_object) VALUES(?,?,?)",
(row_hash, table_id, clvm_object),
)
await self.db.execute("INSERT INTO ")
await self.db.commit()
# "INSERT OR REPLACE INTO full_blocks VALUES(?, ?, ?, ?, ?)",
async def delete_row_by_index(self, index: int) -> None:
async def delete_row_by_index(self, table: bytes32, index: int) -> CLVMObject:
# todo this
pass
async def delete_row_by_hash(self, row_hash: bytes32) -> None:
async def delete_row_by_hash(self, table: bytes32, row_hash: bytes32) -> Tuple[CLVMObject, int]:
pass
async def get_table_state(self, table: bytes32) -> bytes32:
pass
# TODO: I'm not sure about the name here. I'm thinking that this will

View File

@ -101,14 +101,14 @@ class RpcServer:
request_node_type: Optional[NodeType] = None
if "node_type" in request:
request_node_type = NodeType(request["node_type"])
if self.rpc_api.service.server is None:
if self.rpc_api.data_layer_service.server is None:
raise ValueError("Global connections is not set")
if self.rpc_api.service.server._local_type is NodeType.FULL_NODE:
if self.rpc_api.data_layer_service.server._local_type is NodeType.FULL_NODE:
# TODO add peaks for peers
connections = self.rpc_api.service.server.get_connections(request_node_type)
connections = self.rpc_api.data_layer_service.server.get_connections(request_node_type)
con_info = []
if self.rpc_api.service.sync_store is not None:
peak_store = self.rpc_api.service.sync_store.peer_to_peak
if self.rpc_api.data_layer_service.sync_store is not None:
peak_store = self.rpc_api.data_layer_service.sync_store.peer_to_peak
else:
peak_store = None
for con in connections:
@ -135,7 +135,7 @@ class RpcServer:
}
con_info.append(con_dict)
else:
connections = self.rpc_api.service.server.get_connections(request_node_type)
connections = self.rpc_api.data_layer_service.server.get_connections(request_node_type)
con_info = [
{
"type": con.connection_type,
@ -158,19 +158,21 @@ class RpcServer:
port = request["port"]
target_node: PeerInfo = PeerInfo(host, uint16(int(port)))
on_connect = None
if hasattr(self.rpc_api.service, "on_connect"):
on_connect = self.rpc_api.service.on_connect
if getattr(self.rpc_api.service, "server", None) is None or not (
await self.rpc_api.service.server.start_client(target_node, on_connect)
if hasattr(self.rpc_api.data_layer_service, "on_connect"):
on_connect = self.rpc_api.data_layer_service.on_connect
if getattr(self.rpc_api.data_layer_service, "server", None) is None or not (
await self.rpc_api.data_layer_service.server.start_client(target_node, on_connect)
):
raise ValueError("Start client failed, or server is not set")
return {}
async def close_connection(self, request: Dict):
node_id = hexstr_to_bytes(request["node_id"])
if self.rpc_api.service.server is None:
if self.rpc_api.data_layer_service.server is None:
raise aiohttp.web.HTTPInternalServerError()
connections_to_close = [c for c in self.rpc_api.service.server.get_connections() if c.peer_node_id == node_id]
connections_to_close = [
c for c in self.rpc_api.data_layer_service.server.get_connections() if c.peer_node_id == node_id
]
if len(connections_to_close) == 0:
raise ValueError(f"Connection with node_id {node_id.hex()} does not exist")
for connection in connections_to_close:
@ -304,7 +306,7 @@ async def start_rpc_server(
"""
app = aiohttp.web.Application()
rpc_server = RpcServer(rpc_api, rpc_api.service_name, stop_cb, root_path, net_config)
rpc_server.rpc_api.service._set_state_changed_callback(rpc_server.state_changed)
rpc_server.rpc_api.data_layer_service._set_state_changed_callback(rpc_server.state_changed)
http_routes: Dict[str, Callable] = rpc_api.get_routes()
routes = [aiohttp.web.post(route, rpc_server._wrap_http_handler(func)) for (route, func) in http_routes.items()]