NFT1: ytx1991's Add RPC API for get any NFT (#11570)

* Add get_nft_info API

* Move API to wallet RPC

* Add untrusted case for tests

* Fix DID bug & Modify get_nft_info

* Remove dummy code

* Fix mismatch

* Resolve NFT update issue

* Remove print

Co-authored-by: ytx1991 <t.yu@chia.net>
This commit is contained in:
Amine Khaldi 2022-05-20 18:15:09 +01:00 committed by GitHub
parent 3ee55045fd
commit 0a06afa5d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 323 additions and 75 deletions

View File

@ -573,7 +573,7 @@ def nft_add_uri_cmd(
@click.option("-f", "--fingerprint", help="Set the fingerprint to specify which wallet to use", type=int)
@click.option("-i", "--id", help="Id of the NFT wallet to use", type=int, required=True)
@click.option("-ni", "--nft-coin-id", help="Id of the NFT coin to transfer", type=str, required=True)
@click.option("-aa", "--artist-address", help="Target artist's wallet address", type=str, required=True)
@click.option("-ta", "--target-address", help="Target recipient wallet address", type=str, required=True)
@click.option(
"-m",
"--fee",
@ -588,7 +588,7 @@ def nft_transfer_cmd(
fingerprint: int,
id: int,
nft_coin_id: str,
artist_address: str,
target_address: str,
fee: str,
) -> None:
import asyncio
@ -597,7 +597,7 @@ def nft_transfer_cmd(
extra_params = {
"wallet_id": id,
"nft_coin_id": nft_coin_id,
"artist_address": artist_address,
"target_address": target_address,
"fee": fee,
}
asyncio.run(execute_with_wallet(wallet_rpc_port, fingerprint, extra_params, transfer_nft))

View File

@ -709,9 +709,9 @@ async def transfer_nft(args: Dict, wallet_client: WalletRpcClient, fingerprint:
try:
wallet_id = args["wallet_id"]
nft_coin_id = args["nft_coin_id"]
artist_address = args["artist_address"]
target_address = args["target_address"]
fee = args["fee"]
response = await wallet_client.transfer_nft(wallet_id, nft_coin_id, artist_address, fee)
response = await wallet_client.transfer_nft(wallet_id, nft_coin_id, target_address, fee)
spend_bundle = response["spend_bundle"]
print(f"NFT transferred successfully with spend bundle: {spend_bundle}")
except Exception as e:

View File

@ -11,12 +11,14 @@ from chia.consensus.block_rewards import calculate_base_farmer_reward
from chia.pools.pool_wallet import PoolWallet
from chia.pools.pool_wallet_info import FARMING_TO_POOL, PoolState, PoolWalletInfo, create_pool_state
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.protocols.wallet_protocol import CoinState
from chia.server.outbound_message import NodeType, make_msg
from chia.simulator.simulator_protocol import FarmNewBlockProtocol
from chia.types.announcement import Announcement
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend
from chia.types.spend_bundle import SpendBundle
from chia.util.bech32m import decode_puzzle_hash, encode_puzzle_hash
from chia.util.byte_types import hexstr_to_bytes
@ -35,8 +37,10 @@ from chia.wallet.derive_keys import (
match_address_to_sk,
)
from chia.wallet.did_wallet.did_wallet import DIDWallet
from chia.wallet.nft_wallet.nft_puzzles import get_nft_info_from_puzzle
from chia.wallet.nft_wallet import nft_puzzles
from chia.wallet.nft_wallet.nft_info import NFTInfo
from chia.wallet.nft_wallet.nft_wallet import NFTWallet
from chia.wallet.nft_wallet.uncurry_nft import UncurriedNFT
from chia.wallet.outer_puzzles import AssetType
from chia.wallet.puzzle_drivers import PuzzleInfo
from chia.wallet.rl_wallet.rl_wallet import RLWallet
@ -131,6 +135,7 @@ class WalletRpcApi:
# NFT Wallet
"/nft_mint_nft": self.nft_mint_nft,
"/nft_get_nfts": self.nft_get_nfts,
"/nft_get_info": self.nft_get_info,
"/nft_transfer_nft": self.nft_transfer_nft,
"/nft_add_uri": self.nft_add_uri,
# RL wallet
@ -1345,7 +1350,7 @@ class WalletRpcApi:
nfts = nft_wallet.get_current_nfts()
nft_info_list = []
for nft in nfts:
nft_info_list.append(get_nft_info_from_puzzle(nft.full_puzzle, nft.coin))
nft_info_list.append(nft_puzzles.get_nft_info_from_puzzle(nft.full_puzzle, nft.coin))
return {"wallet_id": wallet_id, "success": True, "nft_list": nft_info_list}
async def nft_transfer_nft(self, request):
@ -1366,6 +1371,73 @@ class WalletRpcApi:
log.exception(f"Failed to transfer NFT: {e}")
return {"success": False, "error": str(e)}
async def nft_get_info(self, request: Dict) -> Optional[Dict]:
assert self.service.wallet_state_manager is not None
if "coin_id" not in request:
raise ValueError("Coin ID is required.")
coin_id = bytes32.from_hexstr(request["coin_id"])
peer = self.service.wallet_state_manager.wallet_node.get_full_node_peer()
if peer is None:
raise ValueError("Cannot find a full node peer.")
# Get coin state
coin_state_list: List[CoinState] = await self.service.wallet_state_manager.wallet_node.get_coin_state(
[coin_id], peer=peer
)
if coin_state_list is None or len(coin_state_list) < 1:
raise ValueError(f"Coin record 0x{coin_id.hex()} not found")
coin_state: CoinState = coin_state_list[0]
if request.get("latest", True):
# Find the unspent coin
while coin_state.spent_height is not None:
coin_state_list = await self.service.wallet_state_manager.wallet_node.fetch_children(
peer, coin_state.coin.name()
)
odd_coin = 0
for coin in coin_state_list:
if coin.coin.amount % 2 == 1:
odd_coin += 1
if odd_coin > 1:
raise ValueError("This is not a singleton, multiple children coins found.")
coin_state = coin_state_list[0]
# Get parent coin
parent_coin_state_list: List[CoinState] = await self.service.wallet_state_manager.wallet_node.get_coin_state(
[coin_state.coin.parent_coin_info], peer=peer
)
if parent_coin_state_list is None or len(parent_coin_state_list) < 1:
raise ValueError(f"Parent coin record 0x{coin_state.coin.parent_coin_info.hex()} not found")
parent_coin_state: CoinState = parent_coin_state_list[0]
coin_spend: CoinSpend = await self.service.wallet_state_manager.wallet_node.fetch_puzzle_solution(
peer, parent_coin_state.spent_height, parent_coin_state.coin
)
# convert to NFTInfo
try:
# Check if the metadata is updated
inner_solution: Program = Program.from_bytes(bytes(coin_spend.solution)).rest().rest().first().first()
full_puzzle: Program = Program.from_bytes(bytes(coin_spend.puzzle_reveal))
update_condition = None
for condition in inner_solution.rest().first().rest().as_iter():
if condition.first().as_int() == -24:
update_condition = condition
break
if update_condition is not None:
uncurried_nft: UncurriedNFT = UncurriedNFT.uncurry(full_puzzle)
metadata: Program = uncurried_nft.metadata
metadata = nft_puzzles.update_metadata(metadata, update_condition)
# Note: This is not the actual unspent NFT full puzzle.
# There is no way to rebuild the full puzzle in a different wallet.
# But it shouldn't have impact on generating the NFTInfo, since inner_puzzle is not used there.
full_puzzle = nft_puzzles.create_full_puzzle(
uncurried_nft.singleton_launcher_id,
metadata,
uncurried_nft.metdata_updater_hash,
uncurried_nft.inner_puzzle,
)
nft_info: NFTInfo = nft_puzzles.get_nft_info_from_puzzle(full_puzzle, coin_state.coin)
except Exception as e:
raise ValueError(f"The coin is not a NFT. {e}")
else:
return {"nft_info": nft_info}
async def nft_add_uri(self, request) -> Dict:
assert self.service.wallet_state_manager is not None
wallet_id = uint32(request["wallet_id"])

View File

@ -590,11 +590,16 @@ class WalletRpcClient(RpcClient):
response = await self.fetch("nft_add_uri", request)
return response
async def transfer_nft(self, wallet_id, nft_coin_id, artist_address, fee):
async def get_nft_info(self, coin_id: bytes32, latest: bool = True):
request: Dict[str, Any] = {"coin_id": coin_id.hex(), "latest": latest}
response = await self.fetch("nft_get_info", request)
return response
async def transfer_nft(self, wallet_id, nft_coin_id, target_address, fee):
request: Dict[str, Any] = {
"wallet_id": wallet_id,
"nft_coin_id": nft_coin_id,
"target_address": artist_address,
"target_address": target_address,
"fee": fee,
}
response = await self.fetch("nft_transfer_nft", request)

View File

@ -260,9 +260,8 @@ class DIDWallet:
self.wallet_info = await wallet_state_manager.user_store.create_wallet(
name, WalletType.DISTRIBUTED_ID.value, info_as_string, in_transaction=True
)
await self.wallet_state_manager.add_new_wallet(self, self.wallet_info.id)
await self.wallet_state_manager.update_wallet_puzzle_hashes(self.wallet_info.id)
await self.wallet_state_manager.add_new_wallet(self, self.wallet_info.id, in_transaction=True)
await self.wallet_state_manager.update_wallet_puzzle_hashes(self.wallet_info.id, in_transaction=True)
await self.load_parent(self.did_info)
self.log.info(f"New DID wallet created {info_as_string}.")
if self.wallet_info is None:
@ -461,8 +460,13 @@ class DIDWallet:
"""
# full_puz = did_wallet_puzzles.create_fullpuz(innerpuz, origin.name())
# All additions in this block here:
new_puzhash = await self.get_new_did_inner_hash()
new_pubkey = bytes((await self.wallet_state_manager.get_unused_derivation_record(self.wallet_info.id)).pubkey)
new_pubkey = bytes(
(
await self.wallet_state_manager.get_unused_derivation_record(self.wallet_info.id, in_transaction=True)
).pubkey
)
new_puzhash = puzzle_for_pk(new_pubkey).get_tree_hash()
parent_info = None
assert did_info.origin_coin is not None
assert did_info.current_inner is not None
@ -481,8 +485,7 @@ class DIDWallet:
did_info.current_inner.get_tree_hash(),
coin.amount,
)
await self.add_parent(coin.name(), future_parent, False)
await self.add_parent(coin.name(), future_parent, True)
if children_state.spent_height != children_state.created_height:
did_info = DIDInfo(
did_info.origin_coin,
@ -496,7 +499,8 @@ class DIDWallet:
False,
did_info.metadata,
)
await self.save_info(did_info, False)
await self.save_info(did_info, True)
assert children_state.created_height
puzzle_solution_request = wallet_protocol.RequestPuzzleSolution(
coin.parent_coin_info, children_state.created_height
@ -514,7 +518,7 @@ class DIDWallet:
parent_innerpuz.get_tree_hash(),
parent_state.coin.amount,
)
await self.add_parent(coin.parent_coin_info, parent_info, False)
await self.add_parent(coin.parent_coin_info, parent_info, True)
assert parent_info is not None
def puzzle_for_pk(self, pubkey: G1Element) -> Program:
@ -1179,13 +1183,16 @@ class DIDWallet:
async def generate_eve_spend(self, coin: Coin, full_puzzle: Program, innerpuz: Program):
assert self.did_info.origin_coin is not None
uncurried = did_wallet_puzzles.uncurry_innerpuz(innerpuz)
assert uncurried is not None
p2_puzzle = uncurried[0]
# innerpuz solution is (mode p2_solution)
p2_solution = self.standard_wallet.make_solution(
primaries=[
{
"puzzlehash": innerpuz.get_tree_hash(),
"amount": uint64(coin.amount),
"memos": [innerpuz.get_tree_hash()],
"memos": [p2_puzzle.get_tree_hash()],
}
]
)

View File

@ -1,4 +1,3 @@
from clvm_tools.binutils import assemble
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.blockchain_format.program import Program
from typing import List, Optional, Tuple, Iterator, Dict
@ -205,7 +204,7 @@ def metadata_to_program(metadata: Dict) -> Program:
"""
kv_list = []
for key, value in metadata.items():
kv_list.append((assemble(key), assemble(value)))
kv_list.append((key, value))
return Program.to(kv_list)

View File

@ -1,4 +1,5 @@
import logging
from typing import Any, Dict
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program
@ -99,3 +100,41 @@ def get_nft_info_from_puzzle(puzzle: Program, nft_coin: Coin) -> NFTInfo:
uint64(1),
)
return nft_info
def metadata_to_program(metadata: Dict[bytes, Any]) -> Program:
"""
Convert the metadata dict to a Chialisp program
:param metadata: User defined metadata
:return: Chialisp program
"""
kv_list = []
for key, value in metadata.items():
kv_list.append((key, value))
program: Program = Program.to(kv_list)
return program
def program_to_metadata(program: Program) -> Dict[bytes, Any]:
"""
Convert a program to a metadata dict
:param program: Chialisp program contains the metadata
:return: Metadata dict
"""
metadata = {}
for kv_pair in program.as_iter():
metadata[kv_pair.first().as_atom()] = kv_pair.rest().as_python()
return metadata
def update_metadata(metadata: Program, update_condition: Program) -> Program:
"""
Apply conditions of metadata updater to the previous metadata
:param metadata: Previous metadata
:param update_condition: Update metadata conditions
:return: Updated metadata
"""
new_metadata = program_to_metadata(metadata)
# TODO Modify this for supporting other fields
new_metadata[b"u"].insert(0, update_condition.rest().rest().first().atom)
return metadata_to_program(new_metadata)

View File

@ -217,6 +217,7 @@ class NFTWallet:
singleton_id = bytes32(uncurried_nft.singleton_launcher_id.atom)
metadata = uncurried_nft.metadata
new_inner_puzzle = None
update_condition = None
parent_inner_puzhash = uncurried_nft.nft_state_layer.get_tree_hash()
self.log.debug("Before spend metadata: %s %s \n%s", metadata, singleton_id, disassemble(solution))
for condition in solution.rest().first().rest().as_iter():
@ -228,16 +229,7 @@ class NFTWallet:
self.log.debug("Checking condition code: %r", condition_code)
if condition_code == -24:
# metadata update
# (-24 (meta updater puzzle) url)
metadata_list = list(metadata.as_python())
new_metadata = []
for metadata_entry in metadata_list:
key = metadata_entry[0]
if key == b"u":
new_metadata.append((b"u", [condition.rest().rest().first().atom] + list(metadata_entry[1:])))
else:
new_metadata.append((b"h", metadata_entry[1]))
metadata = Program.to(new_metadata)
update_condition = condition
elif condition_code == 51 and int_from_bytes(condition.rest().rest().first().atom) == 1:
puzhash = bytes32(condition.rest().first().atom)
self.log.debug("Got back puzhash from solution: %s", puzhash)
@ -254,6 +246,8 @@ class NFTWallet:
raise ValueError("Invalid condition")
if new_inner_puzzle is None:
raise ValueError("Invalid puzzle")
if update_condition is not None:
metadata = nft_puzzles.update_metadata(metadata, update_condition)
parent_coin = None
coin_record = await self.wallet_state_manager.coin_store.get_coin_record(coin_name)
if coin_record is None:

View File

@ -181,8 +181,8 @@ class Wallet:
public_key = await self.hack_populate_secret_key_for_puzzle_hash(puzzle_hash)
return puzzle_for_pk(bytes(public_key))
async def get_new_puzzle(self) -> Program:
dr = await self.wallet_state_manager.get_unused_derivation_record(self.id())
async def get_new_puzzle(self, in_transaction: bool = False) -> Program:
dr = await self.wallet_state_manager.get_unused_derivation_record(self.id(), in_transaction=in_transaction)
return puzzle_for_pk(bytes(dr.pubkey))
async def get_puzzle_hash(self, new: bool) -> bytes32:

View File

@ -326,7 +326,7 @@ class WalletStateManager:
if unused > 0:
await self.puzzle_store.set_used_up_to(uint32(unused - 1), in_transaction)
async def update_wallet_puzzle_hashes(self, wallet_id):
async def update_wallet_puzzle_hashes(self, wallet_id, in_transaction=False):
derivation_paths: List[DerivationRecord] = []
target_wallet = self.wallets[wallet_id]
last: Optional[uint32] = await self.puzzle_store.get_last_derivation_path_for_wallet(wallet_id)
@ -353,7 +353,7 @@ class WalletStateManager:
False,
)
)
await self.puzzle_store.add_derivation_paths(derivation_paths)
await self.puzzle_store.add_derivation_paths(derivation_paths, in_transaction=in_transaction)
async def get_unused_derivation_record(
self, wallet_id: uint32, in_transaction=False, hardened=False

View File

@ -22,8 +22,12 @@ async def get_wallet_num(wallet_manager):
class TestDIDWallet:
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_creation_from_backup_file(self, self_hostname, three_wallet_nodes):
async def test_creation_from_backup_file(self, self_hostname, three_wallet_nodes, trusted):
num_blocks = 5
full_nodes, wallets = three_wallet_nodes
full_node_api = full_nodes[0]
@ -38,7 +42,20 @@ class TestDIDWallet:
ph = await wallet_0.get_new_puzzlehash()
ph1 = await wallet_1.get_new_puzzlehash()
ph2 = await wallet_2.get_new_puzzlehash()
if trusted:
wallet_node_0.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_1.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node_0.config["trusted_peers"] = {}
wallet_node_1.config["trusted_peers"] = {}
wallet_node_2.config["trusted_peers"] = {}
await server_0.start_client(PeerInfo(self_hostname, uint16(full_node_server._port)), None)
await server_1.start_client(PeerInfo(self_hostname, uint16(full_node_server._port)), None)
await server_2.start_client(PeerInfo(self_hostname, uint16(full_node_server._port)), None)
@ -171,8 +188,12 @@ class TestDIDWallet:
await time_out_assert(45, did_wallet_2.get_confirmed_balance, 0)
await time_out_assert(45, did_wallet_2.get_unconfirmed_balance, 0)
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_did_recovery_with_multiple_backup_dids(self, self_hostname, two_wallet_nodes):
async def test_did_recovery_with_multiple_backup_dids(self, self_hostname, two_wallet_nodes, trusted):
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api = full_nodes[0]
@ -183,7 +204,16 @@ class TestDIDWallet:
wallet2 = wallet_node_2.wallet_state_manager.main_wallet
ph = await wallet.get_new_puzzlehash()
if trusted:
wallet_node.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node.config["trusted_peers"] = {}
wallet_node_2.config["trusted_peers"] = {}
await server_2.start_client(PeerInfo(self_hostname, uint16(server_1._port)), None)
await server_3.start_client(PeerInfo(self_hostname, uint16(server_1._port)), None)
@ -310,8 +340,12 @@ class TestDIDWallet:
await time_out_assert(15, did_wallet_3.get_confirmed_balance, 0)
await time_out_assert(15, did_wallet_3.get_unconfirmed_balance, 0)
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_did_recovery_with_empty_set(self, self_hostname, two_wallet_nodes):
async def test_did_recovery_with_empty_set(self, self_hostname, two_wallet_nodes, trusted):
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api = full_nodes[0]
@ -321,7 +355,16 @@ class TestDIDWallet:
wallet = wallet_node.wallet_state_manager.main_wallet
ph = await wallet.get_new_puzzlehash()
if trusted:
wallet_node.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node.config["trusted_peers"] = {}
wallet_node_2.config["trusted_peers"] = {}
await server_2.start_client(PeerInfo(self_hostname, uint16(server_1._port)), None)
await server_3.start_client(PeerInfo(self_hostname, uint16(server_1._port)), None)
@ -367,8 +410,12 @@ class TestDIDWallet:
else:
assert False
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_did_attest_after_recovery(self, self_hostname, two_wallet_nodes):
async def test_did_attest_after_recovery(self, self_hostname, two_wallet_nodes, trusted):
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api = full_nodes[0]
@ -378,7 +425,16 @@ class TestDIDWallet:
wallet = wallet_node.wallet_state_manager.main_wallet
wallet2 = wallet_node_2.wallet_state_manager.main_wallet
ph = await wallet.get_new_puzzlehash()
if trusted:
wallet_node.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node.config["trusted_peers"] = {}
wallet_node_2.config["trusted_peers"] = {}
await server_2.start_client(PeerInfo(self_hostname, uint16(server_1._port)), None)
await server_3.start_client(PeerInfo(self_hostname, uint16(server_1._port)), None)
for i in range(1, num_blocks):
@ -532,8 +588,12 @@ class TestDIDWallet:
"with_recovery",
[True, False],
)
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_did_transfer(self, two_wallet_nodes, with_recovery):
async def test_did_transfer(self, two_wallet_nodes, with_recovery, trusted):
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api = full_nodes[0]
@ -544,13 +604,16 @@ class TestDIDWallet:
wallet2 = wallet_node_2.wallet_state_manager.main_wallet
ph = await wallet.get_new_puzzlehash()
wallet_node.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
if trusted:
wallet_node.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node.config["trusted_peers"] = {}
wallet_node_2.config["trusted_peers"] = {}
await server_2.start_client(PeerInfo("localhost", uint16(server_1._port)), None)
await server_3.start_client(PeerInfo("localhost", uint16(server_1._port)), None)
@ -615,8 +678,12 @@ class TestDIDWallet:
assert metadata["Twitter"] == "Test"
assert metadata["GitHub"] == "测试"
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_update_recovery_list(self, two_wallet_nodes):
async def test_update_recovery_list(self, two_wallet_nodes, trusted):
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api = full_nodes[0]
@ -626,13 +693,16 @@ class TestDIDWallet:
wallet = wallet_node.wallet_state_manager.main_wallet
ph = await wallet.get_new_puzzlehash()
wallet_node.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
if trusted:
wallet_node.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node.config["trusted_peers"] = {}
wallet_node_2.config["trusted_peers"] = {}
await server_2.start_client(PeerInfo("localhost", uint16(server_1._port)), None)
await server_3.start_client(PeerInfo("localhost", uint16(server_1._port)), None)
@ -670,8 +740,12 @@ class TestDIDWallet:
assert did_wallet_1.did_info.backup_ids[0] == bytes(ph)
assert did_wallet_1.did_info.num_of_backup_ids_needed == 1
@pytest.mark.parametrize(
"trusted",
[True, False],
)
@pytest.mark.asyncio
async def test_update_metadata(self, two_wallet_nodes):
async def test_update_metadata(self, two_wallet_nodes, trusted):
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api = full_nodes[0]
@ -680,14 +754,16 @@ class TestDIDWallet:
wallet_node_2, server_3 = wallets[1]
wallet = wallet_node.wallet_state_manager.main_wallet
ph = await wallet.get_new_puzzlehash()
wallet_node.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
if trusted:
wallet_node.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_2.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node.config["trusted_peers"] = {}
wallet_node_2.config["trusted_peers"] = {}
await server_2.start_client(PeerInfo("localhost", uint16(server_1._port)), None)
await server_3.start_client(PeerInfo("localhost", uint16(server_1._port)), None)

View File

@ -29,7 +29,7 @@ async def tx_in_pool(mempool: MempoolManager, tx_id: bytes32) -> bool:
@pytest.mark.parametrize(
"trusted",
[True],
[True, False],
)
@pytest.mark.asyncio
async def test_nft_wallet_creation_automatically(two_wallet_nodes: Any, trusted: Any) -> None:
@ -116,7 +116,7 @@ async def test_nft_wallet_creation_automatically(two_wallet_nodes: Any, trusted:
@pytest.mark.parametrize(
"trusted",
[True],
[True, False],
)
@pytest.mark.asyncio
async def test_nft_wallet_creation_and_transfer(two_wallet_nodes: Any, trusted: Any) -> None:
@ -247,7 +247,7 @@ async def test_nft_wallet_creation_and_transfer(two_wallet_nodes: Any, trusted:
@pytest.mark.parametrize(
"trusted",
[True],
[True, False],
)
@pytest.mark.asyncio
async def test_nft_wallet_rpc_creation_and_list(two_wallet_nodes: Any, trusted: Any) -> None:
@ -279,7 +279,7 @@ async def test_nft_wallet_rpc_creation_and_list(two_wallet_nodes: Any, trusted:
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(ph))
await asyncio.sleep(5)
api_0 = WalletRpcApi(wallet_node_0)
nft_wallet_0 = await api_0.create_new_wallet(dict(wallet_type="nft_wallet", name="NFT WALLET 1"))
assert isinstance(nft_wallet_0, dict)
@ -335,7 +335,7 @@ async def test_nft_wallet_rpc_creation_and_list(two_wallet_nodes: Any, trusted:
@pytest.mark.parametrize(
"trusted",
[True],
[True, False],
)
@pytest.mark.asyncio
async def test_nft_wallet_rpc_update_metadata(two_wallet_nodes: Any, trusted: Any) -> None:
@ -364,10 +364,10 @@ async def test_nft_wallet_rpc_update_metadata(two_wallet_nodes: Any, trusted: An
await server_0.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
await server_1.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
await asyncio.sleep(5)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(ph))
await asyncio.sleep(5)
api_0 = WalletRpcApi(wallet_node_0)
nft_wallet_0 = await api_0.create_new_wallet(dict(wallet_type="nft_wallet", name="NFT WALLET 1"))
assert isinstance(nft_wallet_0, dict)

View File

@ -33,6 +33,7 @@ from chia.wallet.cat_wallet.cat_constants import DEFAULT_CATS
from chia.wallet.cat_wallet.cat_wallet import CATWallet
from chia.wallet.derive_keys import master_sk_to_wallet_sk, master_sk_to_wallet_sk_unhardened
from chia.wallet.did_wallet.did_wallet import DIDWallet
from chia.wallet.nft_wallet.nft_wallet import NFTWallet
from chia.wallet.trading.trade_status import TradeStatus
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.transaction_sorting import SortKey
@ -831,6 +832,61 @@ async def test_did_endpoints(wallet_rpc_environment: WalletRpcTestEnvironment):
assert metadata["Twitter"] == "Https://test"
@pytest.mark.asyncio
async def test_nft_endpoints(wallet_rpc_environment: WalletRpcTestEnvironment):
env: WalletRpcTestEnvironment = wallet_rpc_environment
wallet_1_node: WalletNode = env.wallet_1.node
wallet_1_rpc: WalletRpcClient = env.wallet_1.rpc_client
wallet_2: Wallet = env.wallet_2.wallet
wallet_2_node: WalletNode = env.wallet_2.node
wallet_2_rpc: WalletRpcClient = env.wallet_2.rpc_client
full_node_api: FullNodeSimulator = env.full_node.api
await generate_funds(env.full_node.api, env.wallet_1, 5)
res = await wallet_1_rpc.create_new_nft_wallet(None)
nft_wallet_id = res["wallet_id"]
res = await wallet_1_rpc.mint_nft(
nft_wallet_id,
None,
"0xD4584AD463139FA8C0D9F68F4B59F185",
["https://www.chia.net/img/branding/chia-logo.svg"],
0,
)
assert res["success"]
for _ in range(3):
await farm_transaction_block(full_node_api, wallet_1_node)
assert wallet_1_node.wallet_state_manager is not None
nft_wallet: NFTWallet = wallet_1_node.wallet_state_manager.wallets[nft_wallet_id]
nft_id = nft_wallet.get_current_nfts()[0].coin.name()
nft_info = (await wallet_1_rpc.get_nft_info(nft_id))["nft_info"]
assert nft_info["nft_coin_id"] == nft_wallet.get_current_nfts()[0].coin.name().hex().upper()
addr = encode_puzzle_hash(await wallet_2.get_new_puzzlehash(), "txch")
res = await wallet_1_rpc.transfer_nft(nft_wallet_id, nft_id.hex(), addr, 0)
assert res["success"]
for _ in range(3):
await farm_transaction_block(full_node_api, wallet_1_node)
assert wallet_2_node.wallet_state_manager is not None
nft_wallet_id_1 = (
await wallet_2_node.wallet_state_manager.get_all_wallet_info_entries(wallet_type=WalletType.NFT)
)[0].id
nft_wallet_1: NFTWallet = wallet_2_node.wallet_state_manager.wallets[nft_wallet_id_1]
nft_info_1 = (await wallet_1_rpc.get_nft_info(nft_id, False))["nft_info"]
assert nft_info_1 == nft_info
nft_info_1 = (await wallet_1_rpc.get_nft_info(nft_id))["nft_info"]
assert nft_info_1["nft_coin_id"] == nft_wallet_1.get_current_nfts()[0].coin.name().hex().upper()
# Cross-check NFT
nft_info_2 = (await wallet_2_rpc.list_nfts(nft_wallet_id_1))["nft_list"][0]
assert nft_info_1 == nft_info_2
@pytest.mark.asyncio
async def test_key_and_address_endpoints(wallet_rpc_environment: WalletRpcTestEnvironment):
env: WalletRpcTestEnvironment = wallet_rpc_environment