Add Signing message APIs (#13491)

* Add sign APIs

* Add sign APIs

* Resolve comment

* Resolve comment & Add CLI

* Fix pre-commit

* Refine

* Fix unit tests

* Resolve comments
This commit is contained in:
Kronus91 2022-09-28 02:26:54 -07:00 committed by GitHub
parent 5001c9030d
commit 137e61d68a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 315 additions and 34 deletions

View File

@ -5,6 +5,7 @@ import click
from chia.cmds.plotnft import validate_fee
from chia.wallet.transaction_sorting import SortKey
from chia.wallet.util.address_type import AddressType
from chia.wallet.util.wallet_types import WalletType
from chia.cmds.cmds_util import execute_with_wallet
@ -273,6 +274,25 @@ def get_derivation_index_cmd(wallet_rpc_port: Optional[int], fingerprint: int) -
asyncio.run(execute_with_wallet(wallet_rpc_port, fingerprint, extra_params, get_derivation_index))
@wallet_cmd.command("sign_message", short_help="Sign a message by a derivation address")
@click.option(
"-wp",
"--wallet-rpc-port",
help="Set the port where the Wallet is hosting the RPC interface. See the rpc_port under wallet in config.yaml",
type=int,
default=None,
)
@click.option("-f", "--fingerprint", help="Set the fingerprint to specify which wallet to use", type=int)
@click.option("-a", "--address", help="The address you want to use for signing", type=str, required=True)
@click.option("-m", "--hex_message", help="The hex message you want sign", type=str, required=True)
def address_sign_message(wallet_rpc_port: Optional[int], fingerprint: int, address: str, hex_message: str) -> None:
extra_params: Dict[str, Any] = {"address": address, "message": hex_message, "type": AddressType.XCH}
import asyncio
from .wallet_funcs import sign_message
asyncio.run(execute_with_wallet(wallet_rpc_port, fingerprint, extra_params, sign_message))
@wallet_cmd.command(
"update_derivation_index", short_help="Generate additional derived puzzle hashes starting at the provided index"
)
@ -494,6 +514,25 @@ def did_create_wallet_cmd(
asyncio.run(execute_with_wallet(wallet_rpc_port, fingerprint, extra_params, create_did_wallet))
@did_cmd.command("sign_message", short_help="Sign a message by a DID")
@click.option(
"-wp",
"--wallet-rpc-port",
help="Set the port where the Wallet is hosting the RPC interface. See the rpc_port under wallet in config.yaml",
type=int,
default=None,
)
@click.option("-f", "--fingerprint", help="Set the fingerprint to specify which wallet to use", type=int)
@click.option("-i", "--did_id", help="DID ID you want to use for signing", type=str, required=True)
@click.option("-m", "--hex_message", help="The hex message you want to sign", type=str, required=True)
def did_sign_message(wallet_rpc_port: Optional[int], fingerprint: int, did_id: str, hex_message: str) -> None:
extra_params: Dict[str, Any] = {"did_id": did_id, "message": hex_message, "type": AddressType.DID}
import asyncio
from .wallet_funcs import sign_message
asyncio.run(execute_with_wallet(wallet_rpc_port, fingerprint, extra_params, sign_message))
@did_cmd.command("set_name", short_help="Set DID wallet name")
@click.option(
"-wp",
@ -557,6 +596,25 @@ def nft_wallet_create_cmd(
asyncio.run(execute_with_wallet(wallet_rpc_port, fingerprint, extra_params, create_nft_wallet))
@nft_cmd.command("sign_message", short_help="Sign a message by a NFT")
@click.option(
"-wp",
"--wallet-rpc-port",
help="Set the port where the Wallet is hosting the RPC interface. See the rpc_port under wallet in config.yaml",
type=int,
default=None,
)
@click.option("-f", "--fingerprint", help="Set the fingerprint to specify which wallet to use", type=int)
@click.option("-i", "--nft_id", help="NFT ID you want to use for signing", type=str, required=True)
@click.option("-m", "--hex_message", help="The hex message you want to sign", type=str, required=True)
def nft_sign_message(wallet_rpc_port: Optional[int], fingerprint: int, nft_id: str, hex_message: str) -> None:
extra_params: Dict[str, Any] = {"nft_id": nft_id, "message": hex_message, "type": AddressType.NFT}
import asyncio
from .wallet_funcs import sign_message
asyncio.run(execute_with_wallet(wallet_rpc_port, fingerprint, extra_params, sign_message))
@nft_cmd.command("mint", short_help="Mint an NFT")
@click.option(
"-wp",

View File

@ -1078,3 +1078,19 @@ async def delete_notifications(args: Dict, wallet_client: WalletRpcClient, finge
print(f"Success: {await wallet_client.delete_notifications()}")
else:
print(f"Success: {await wallet_client.delete_notifications(ids=ids)}")
async def sign_message(args: Dict, wallet_client: WalletRpcClient, fingerprint: int) -> None:
if args["type"] == AddressType.XCH:
pubkey, signature = await wallet_client.sign_message_by_address(args["address"], args["message"])
elif args["type"] == AddressType.DID:
pubkey, signature = await wallet_client.sign_message_by_id(args["did_id"], args["message"])
elif args["type"] == AddressType.NFT:
pubkey, signature = await wallet_client.sign_message_by_id(args["nft_id"], args["message"])
else:
print("Invalid wallet type.")
return
print("")
print(f'Message: {args["message"]}')
print(f"Public Key: {pubkey}")
print(f"Signature: {signature}")

View File

@ -52,7 +52,7 @@ from chia.wallet.puzzle_drivers import PuzzleInfo, Solver
from chia.wallet.trade_record import TradeRecord
from chia.wallet.trading.offer import Offer
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.util.address_type import AddressType
from chia.wallet.util.address_type import AddressType, is_valid_address
from chia.wallet.util.transaction_type import TransactionType
from chia.wallet.util.wallet_types import AmountWithPuzzlehash, WalletType
from chia.wallet.wallet_info import WalletInfo
@ -115,6 +115,8 @@ class WalletRpcApi:
"/get_notifications": self.get_notifications,
"/delete_notifications": self.delete_notifications,
"/send_notification": self.send_notification,
"/sign_message_by_address": self.sign_message_by_address,
"/sign_message_by_id": self.sign_message_by_id,
# CATs and trading
"/cat_set_name": self.cat_set_name,
"/cat_asset_id_to_name": self.cat_asset_id_to_name,
@ -147,7 +149,6 @@ class WalletRpcApi:
"/did_get_current_coin_info": self.did_get_current_coin_info,
"/did_create_backup_file": self.did_create_backup_file,
"/did_transfer_did": self.did_transfer_did,
"/did_sign_message": self.did_sign_message,
# NFT Wallet
"/nft_mint_nft": self.nft_mint_nft,
"/nft_get_nfts": self.nft_get_nfts,
@ -1002,6 +1003,59 @@ class WalletRpcApi:
await self.service.wallet_state_manager.add_pending_transaction(tx)
return {"tx": tx.to_json_dict_convenience(self.service.config)}
async def sign_message_by_address(self, request) -> EndpointResult:
"""
Given a derived P2 address, sign the message by its private key.
:param request:
:return:
"""
puzzle_hash: bytes32 = decode_puzzle_hash(request["address"])
pubkey, signature = await self.service.wallet_state_manager.main_wallet.sign_message(
request["message"], puzzle_hash
)
return {"success": True, "pubkey": str(pubkey), "signature": str(signature)}
async def sign_message_by_id(self, request) -> EndpointResult:
"""
Given a NFT/DID ID, sign the message by the P2 private key.
:param request:
:return:
"""
entity_id: bytes32 = decode_puzzle_hash(request["id"])
selected_wallet: Optional[WalletProtocol] = None
if is_valid_address(request["id"], {AddressType.DID}, self.service.config):
for wallet in self.service.wallet_state_manager.wallets.values():
if wallet.type() == WalletType.DECENTRALIZED_ID.value:
assert isinstance(wallet, DIDWallet)
assert wallet.did_info.origin_coin is not None
if wallet.did_info.origin_coin.name() == entity_id:
selected_wallet = wallet
break
if selected_wallet is None:
return {"success": False, "error": f"DID for {entity_id.hex()} doesn't exist."}
assert isinstance(selected_wallet, DIDWallet)
pubkey, signature = await selected_wallet.sign_message(request["message"])
elif is_valid_address(request["id"], {AddressType.NFT}, self.service.config):
target_nft: Optional[NFTCoinInfo] = None
for wallet in self.service.wallet_state_manager.wallets.values():
if wallet.type() == WalletType.NFT.value:
assert isinstance(wallet, NFTWallet)
nft: Optional[NFTCoinInfo] = await wallet.get_nft(entity_id)
if nft is not None:
selected_wallet = wallet
target_nft = nft
break
if selected_wallet is None or target_nft is None:
return {"success": False, "error": f"NFT for {entity_id.hex()} doesn't exist."}
assert isinstance(selected_wallet, NFTWallet)
pubkey, signature = await selected_wallet.sign_message(request["message"], target_nft)
else:
return {"success": False, "error": f'Unknown ID type, {request["id"]}'}
return {"success": True, "pubkey": str(pubkey), "signature": str(signature)}
##########################################################################################
# CATs and Trading
##########################################################################################
@ -1541,17 +1595,6 @@ class WalletRpcApi:
"transaction_id": txs.name,
}
async def did_sign_message(self, request) -> EndpointResult:
wallet_id = uint32(request["wallet_id"])
did_wallet: WalletProtocol = self.service.wallet_state_manager.wallets[wallet_id]
assert isinstance(did_wallet, DIDWallet)
hex_message = request["message"]
if hex_message.startswith("0x") or hex_message.startswith("0X"):
hex_message = hex_message[2:]
message = bytes.fromhex(hex_message)
pubkey, signature = await did_wallet.sign_message(message)
return {"success": True, "pubkey": str(pubkey), "signature": str(signature)}
##########################################################################################
# NFT Wallet
##########################################################################################

View File

@ -866,3 +866,11 @@ class WalletRpcClient(RpcClient):
},
)
return TransactionRecord.from_json_dict_convenience(response["tx"])
async def sign_message_by_address(self, address: str, message: str) -> Tuple[str, str]:
response = await self.fetch("sign_message_by_address", {"address": address, "message": message})
return response["pubkey"], response["signature"]
async def sign_message_by_id(self, id: str, message: str) -> Tuple[str, str]:
response = await self.fetch("sign_message_by_id", {"id": id, "message": message})
return response["pubkey"], response["signature"]

View File

@ -17,7 +17,6 @@ from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend
from chia.types.spend_bundle import SpendBundle
from chia.util.hash import std_hash
from chia.util.ints import uint64, uint32, uint8, uint128
from chia.wallet.util.transaction_type import TransactionType
from chia.util.condition_tools import conditions_dict_for_solution, pkm_pairs_for_conditions_dict
@ -1100,7 +1099,7 @@ class DIDWallet:
return parent_info
async def sign_message(self, message: bytes) -> Tuple[G1Element, G2Element]:
async def sign_message(self, message: str) -> Tuple[G1Element, G2Element]:
if self.did_info.current_inner is None:
raise ValueError("Missing DID inner puzzle.")
puzzle_args = did_wallet_puzzles.uncurry_innerpuz(self.did_info.current_inner)
@ -1111,8 +1110,8 @@ class DIDWallet:
pubkey, private = await self.wallet_state_manager.get_keys(puzzle_hash)
synthetic_secret_key = calculate_synthetic_secret_key(private, DEFAULT_HIDDEN_PUZZLE_HASH)
synthetic_pk = synthetic_secret_key.get_g1()
prefix = f"\x18Chia Signed Message:\n{len(message)}"
return synthetic_pk, AugSchemeMPL.sign(synthetic_secret_key, std_hash(prefix.encode("utf-8") + message))
puzzle: Program = Program.to(("Chia Signed Message", message))
return synthetic_pk, AugSchemeMPL.sign(synthetic_secret_key, puzzle.get_tree_hash())
else:
raise ValueError("Invalid inner DID puzzle.")

View File

@ -531,6 +531,19 @@ class NFTWallet:
else:
return puzzle_info
async def sign_message(self, message: str, nft: NFTCoinInfo) -> Tuple[G1Element, G2Element]:
uncurried_nft = UncurriedNFT.uncurry(*nft.full_puzzle.uncurry())
if uncurried_nft is not None:
p2_puzzle = uncurried_nft.p2_puzzle
puzzle_hash = p2_puzzle.get_tree_hash()
pubkey, private = await self.wallet_state_manager.get_keys(puzzle_hash)
synthetic_secret_key = calculate_synthetic_secret_key(private, DEFAULT_HIDDEN_PUZZLE_HASH)
synthetic_pk = synthetic_secret_key.get_g1()
puzzle: Program = Program.to(("Chia Signed Message", message))
return synthetic_pk, AugSchemeMPL.sign(synthetic_secret_key, puzzle.get_tree_hash())
else:
raise ValueError("Invalid NFT puzzle.")
async def get_coins_to_offer(
self, nft_id: bytes32, amount: uint64, min_coin_amount: Optional[uint64] = None
) -> Set[Coin]:

View File

@ -1,9 +1,9 @@
from __future__ import annotations
import logging
import time
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING, Tuple
from blspy import G1Element
from blspy import G1Element, G2Element, AugSchemeMPL
from chia.consensus.cost_calculator import NPCResult
from chia.full_node.bundle_tools import simple_solution_generator
@ -432,6 +432,13 @@ class Wallet:
self.wallet_state_manager.constants.MAX_BLOCK_COST_CLVM,
)
async def sign_message(self, message: str, puzzle_hash: bytes32) -> Tuple[G1Element, G2Element]:
pubkey, private = await self.wallet_state_manager.get_keys(puzzle_hash)
synthetic_secret_key = calculate_synthetic_secret_key(private, DEFAULT_HIDDEN_PUZZLE_HASH)
synthetic_pk = synthetic_secret_key.get_g1()
puzlle: Program = Program.to(("Chia Signed Message", message))
return synthetic_pk, AugSchemeMPL.sign(synthetic_secret_key, puzlle.get_tree_hash())
async def generate_signed_transaction(
self,
amount: uint64,

View File

@ -871,6 +871,11 @@ class WalletStateManager:
assert isinstance(nft_wallet, NFTWallet)
if parent_coin_state.spent_height is not None:
await nft_wallet.remove_coin(coin_spend.coin, uint32(parent_coin_state.spent_height))
num = await nft_wallet.get_current_nfts()
if len(num) == 0 and nft_wallet.did_id is not None and new_did_id != old_did_id:
self.log.info(f"No NFT, deleting wallet {nft_wallet.did_id.hex()} ...")
await self.user_store.delete_wallet(nft_wallet.wallet_info.id)
self.wallets.pop(nft_wallet.wallet_info.id)
if nft_wallet_info.did_id == new_did_id:
self.log.info(
"Adding new NFT, NFT_ID:%s, DID_ID:%s",

View File

@ -2,15 +2,17 @@ import json
from typing import Optional
import pytest
from blspy import AugSchemeMPL
from blspy import AugSchemeMPL, G1Element, G2Element
from chia.consensus.block_rewards import calculate_pool_reward, calculate_base_farmer_reward
from chia.rpc.wallet_rpc_api import WalletRpcApi
from chia.simulator.simulator_protocol import FarmNewBlockProtocol
from chia.types.blockchain_format.program import Program
from chia.types.peer_info import PeerInfo
from chia.types.spend_bundle import SpendBundle
from chia.util.hash import std_hash
from chia.util.bech32m import encode_puzzle_hash
from chia.util.ints import uint16, uint32, uint64
from chia.wallet.util.address_type import AddressType
from chia.wallet.util.wallet_types import WalletType
from chia.wallet.did_wallet.did_wallet import DIDWallet
@ -849,6 +851,7 @@ class TestDIDWallet:
wallet_node_2, server_3 = wallets[1]
wallet = wallet_node.wallet_state_manager.main_wallet
wallet2 = wallet_node_2.wallet_state_manager.main_wallet
api_0 = WalletRpcApi(wallet_node)
ph = await wallet.get_new_puzzlehash()
if trusted:
@ -896,9 +899,16 @@ class TestDIDWallet:
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(ph2))
await time_out_assert(15, did_wallet_1.get_confirmed_balance, 101)
hex_message = "abcd"
pubkey, signature = await did_wallet_1.sign_message(bytes.fromhex(hex_message))
message = std_hash(
f"\x18Chia Signed Message:\n{len(bytes.fromhex(hex_message))}".encode("utf-8") + bytes.fromhex(hex_message)
message = "Hello World"
response = await api_0.sign_message_by_id(
{
"id": encode_puzzle_hash(did_wallet_1.did_info.origin_coin.name(), AddressType.DID.value),
"message": message,
}
)
puzzle: Program = Program.to(("Chia Signed Message", message))
assert AugSchemeMPL.verify(
G1Element.from_bytes(bytes.fromhex(response["pubkey"])),
puzzle.get_tree_hash(),
G2Element.from_bytes(bytes.fromhex(response["signature"])),
)
assert AugSchemeMPL.verify(pubkey, message, signature)

View File

@ -3,6 +3,7 @@ import time
from typing import Any, Awaitable, Callable, Dict, List
import pytest
from blspy import AugSchemeMPL, G1Element, G2Element
from clvm_tools.binutils import disassemble
from chia.consensus.block_rewards import calculate_base_farmer_reward, calculate_pool_reward
@ -987,16 +988,13 @@ async def test_nft_transfer_nft_with_did(two_wallet_nodes: Any, trusted: Any) ->
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(ph1))
await wait_rpc_state_condition(
5, api_0.nft_get_nfts, [dict(wallet_id=nft_wallet_0_id)], lambda x: not x["nft_list"]
)
await time_out_assert(5, len, 2, wallet_0.wallet_state_manager.wallets)
await time_out_assert(30, wallet_0.get_unconfirmed_balance, 5999999999798)
await time_out_assert(30, wallet_0.get_confirmed_balance, 5999999999798)
# wait for all wallets to be created
await time_out_assert(30, len, 3, wallet_1.wallet_state_manager.wallets)
did_wallet_1 = wallet_1.wallet_state_manager.wallets[3]
nft_wallet_0 = wallet_node_0.wallet_state_manager.wallets[nft_wallet_0_id]
assert len(await nft_wallet_0.get_current_nfts()) == 0
assert nft_wallet_0_id not in wallet_node_0.wallet_state_manager.wallets.keys()
# Check if the NFT owner DID is reset
resp = await api_1.nft_get_by_did({})
assert resp.get("success")
@ -1304,7 +1302,7 @@ async def test_nft_set_did(two_wallet_nodes: Any, trusted: Any) -> None:
nft_wallet_2_id = coins_response.get("wallet_id")
assert nft_wallet_2_id
await time_out_assert(30, wallet_node_0.wallet_state_manager.wallets[nft_wallet_1_id].get_nft_count, 0)
await time_out_assert(30, wallet_node_0.wallet_state_manager.wallets.get, 0, nft_wallet_1_id, 0)
# Check NFT DID
resp = await wait_rpc_state_condition(
@ -1329,7 +1327,7 @@ async def test_nft_set_did(two_wallet_nodes: Any, trusted: Any) -> None:
coins = resp["nft_list"]
assert len(coins) == 1
assert coins[0].owner_did is None
assert len(await wallet_node_0.wallet_state_manager.wallets[nft_wallet_2_id].get_current_nfts()) == 0
assert nft_wallet_2_id not in wallet_node_0.wallet_state_manager.wallets.keys()
nft_coin_id = coins[0].nft_coin_id
resp = await api_0.nft_get_info(dict(coin_id=nft_coin_id.hex(), latest=True))
assert resp["success"]
@ -1419,3 +1417,87 @@ async def test_set_nft_status(two_wallet_nodes: Any, trusted: Any) -> None:
assert coins_response.get("success")
coins = coins_response["nft_list"]
assert coins[0].pending_transaction
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_nft_sign_message(two_wallet_nodes: Any, trusted: Any) -> None:
num_blocks = 5
full_nodes, wallets, _ = two_wallet_nodes
full_node_api: FullNodeSimulator = full_nodes[0]
full_node_server = full_node_api.server
wallet_node_0, server_0 = wallets[0]
wallet_node_1, server_1 = wallets[1]
wallet_0 = wallet_node_0.wallet_state_manager.main_wallet
api_0 = WalletRpcApi(wallet_node_0)
ph = await wallet_0.get_new_puzzlehash()
if trusted:
wallet_node_0.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_1.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node_0.config["trusted_peers"] = {}
wallet_node_1.config["trusted_peers"] = {}
await server_0.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
await server_1.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
for _ in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(ph))
funds = sum(
[calculate_pool_reward(uint32(i)) + calculate_base_farmer_reward(uint32(i)) for i in range(1, num_blocks - 1)]
)
await time_out_assert(30, wallet_0.get_unconfirmed_balance, funds)
await time_out_assert(30, wallet_0.get_confirmed_balance, funds)
await time_out_assert(30, wallet_is_synced, True, wallet_node_0, full_node_api)
res = await api_0.create_new_wallet(dict(wallet_type="nft_wallet", name="NFT WALLET 1"))
assert isinstance(res, dict)
assert res.get("success")
nft_wallet_0_id = res["wallet_id"]
# Create a NFT without DID
resp = await api_0.nft_mint_nft(
{
"wallet_id": nft_wallet_0_id,
"hash": "0xD4584AD463139FA8C0D9F68F4B59F185",
"uris": ["https://www.chia.net/img/branding/chia-logo.svg"],
"mu": ["https://www.chia.net/img/branding/chia-logo.svg"],
}
)
assert resp.get("success")
sb = resp["spend_bundle"]
# ensure hints are generated
assert compute_memos(sb)
await time_out_assert_not_none(30, full_node_api.full_node.mempool_manager.get_spendbundle, sb.name())
await make_new_block_with(resp, full_node_api, ph)
# Check DID NFT
coins_response = await wait_rpc_state_condition(
30, api_0.nft_get_nfts, [dict(wallet_id=nft_wallet_0_id)], lambda x: len(x["nft_list"]) > 0
)
assert coins_response["nft_list"], isinstance(coins_response, dict)
assert coins_response.get("success")
coins = coins_response["nft_list"]
assert len(coins) == 1
assert coins[0].owner_did is None
assert not coins[0].pending_transaction
message = "Hello World"
response = await api_0.sign_message_by_id(
{"id": encode_puzzle_hash(coins[0].launcher_id, AddressType.NFT.value), "message": message}
)
puzzle: Program = Program.to(("Chia Signed Message", message))
assert AugSchemeMPL.verify(
G1Element.from_bytes(bytes.fromhex(response["pubkey"])),
puzzle.get_tree_hash(),
G2Element.from_bytes(bytes.fromhex(response["signature"])),
)

View File

@ -4,15 +4,17 @@ from pathlib import Path
from typing import Any, Dict, List, Tuple
import pytest
from blspy import AugSchemeMPL, G1Element, G2Element
from chia.consensus.block_rewards import calculate_base_farmer_reward, calculate_pool_reward
from chia.protocols.full_node_protocol import RespondBlock
from chia.rpc.wallet_rpc_api import WalletRpcApi
from chia.server.server import ChiaServer
from chia.simulator.full_node_simulator import FullNodeSimulator
from chia.simulator.simulator_protocol import FarmNewBlockProtocol, ReorgProtocol
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.peer_info import PeerInfo
from chia.util.bech32m import encode_puzzle_hash
from chia.util.ints import uint16, uint32, uint64
from chia.wallet.derive_keys import master_sk_to_wallet_sk
from chia.wallet.transaction_record import TransactionRecord
@ -826,6 +828,44 @@ class TestWalletSimulator:
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(bytes32(32 * b"0")))
await time_out_assert(60, wallet.get_confirmed_balance, 12 * 10 ** 12)
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_sign_message(
self,
two_wallet_nodes: Tuple[List[FullNodeSimulator], List[Tuple[WalletNode, ChiaServer]], BlockTools],
trusted: bool,
self_hostname: str,
) -> None:
full_nodes, wallets, _ = two_wallet_nodes
full_node_api = full_nodes[0]
server_1 = full_node_api.full_node.server
wallet_node, server_2 = wallets[0]
wallet_node_2, server_3 = wallets[1]
api_0 = WalletRpcApi(wallet_node)
wallet = wallet_node.wallet_state_manager.main_wallet
ph = await wallet.get_new_puzzlehash()
if trusted:
wallet_node.config["trusted_peers"] = {server_1.node_id.hex(): server_1.node_id.hex()}
wallet_node_2.config["trusted_peers"] = {server_1.node_id.hex(): server_1.node_id.hex()}
else:
wallet_node.config["trusted_peers"] = {}
wallet_node_2.config["trusted_peers"] = {}
await server_2.start_client(PeerInfo(self_hostname, uint16(server_1._port)), None)
message = "Hello World"
response = await api_0.sign_message_by_address({"address": encode_puzzle_hash(ph, "xch"), "message": message})
puzzle: Program = Program.to(("Chia Signed Message", message))
assert AugSchemeMPL.verify(
G1Element.from_bytes(bytes.fromhex(response["pubkey"])),
puzzle.get_tree_hash(),
G2Element.from_bytes(bytes.fromhex(response["signature"])),
)
@pytest.mark.parametrize(
"trusted",
[True, False],