Merge gw_nft_offer_drivers into release/1.4.0

This commit is contained in:
Amine Khaldi 2022-05-31 22:34:29 +01:00
commit 841abe1600
No known key found for this signature in database
GPG Key ID: B1C074FFC904E2D9
12 changed files with 1303 additions and 40 deletions

View File

@ -95,7 +95,7 @@ jobs:
- name: Test wallet-nft_wallet code with pytest
run: |
. ./activate
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 0 -m "not benchmark" tests/wallet/nft_wallet/test_nft_clvm.py tests/wallet/nft_wallet/test_nft_wallet.py
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 0 -m "not benchmark" tests/wallet/nft_wallet/test_nft_clvm.py tests/wallet/nft_wallet/test_nft_offers.py tests/wallet/nft_wallet/test_nft_wallet.py
- name: Process coverage data
run: |

View File

@ -94,7 +94,7 @@ jobs:
- name: Test wallet-nft_wallet code with pytest
run: |
. ./activate
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 0 -m "not benchmark" tests/wallet/nft_wallet/test_nft_clvm.py tests/wallet/nft_wallet/test_nft_wallet.py
venv/bin/coverage run --rcfile=.coveragerc --module pytest --durations=10 -n 0 -m "not benchmark" tests/wallet/nft_wallet/test_nft_clvm.py tests/wallet/nft_wallet/test_nft_offers.py tests/wallet/nft_wallet/test_nft_wallet.py
- name: Process coverage data
run: |

View File

@ -0,0 +1,72 @@
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple, Union
from clvm_tools.binutils import disassemble
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint64
from chia.wallet.puzzle_drivers import PuzzleInfo, Solver
from chia.wallet.puzzles.load_clvm import load_clvm
NFT_STATE_LAYER_MOD = load_clvm("nft_state_layer.clvm")
NFT_STATE_LAYER_MOD_HASH = NFT_STATE_LAYER_MOD.get_tree_hash()
def match_metadata_layer_puzzle(puzzle: Program) -> Tuple[bool, Union[List[Any], Program]]:
mod, meta_args = puzzle.uncurry()
if mod == NFT_STATE_LAYER_MOD:
return True, list(meta_args.as_iter())
return False, Program.to([])
def puzzle_for_metadata_layer(metadata: Program, updater_hash: bytes32, inner_puzzle: Program) -> Program:
return NFT_STATE_LAYER_MOD.curry(NFT_STATE_LAYER_MOD_HASH, metadata, updater_hash, inner_puzzle) # type: ignore
def solution_for_metadata_layer(amount: uint64, inner_solution: Program) -> Program:
return Program.to([inner_solution, amount]) # type: ignore
@dataclass(frozen=True)
class MetadataOuterPuzzle:
_match: Any
_asset_id: Any
_construct: Any
_solve: Any
def match(self, puzzle: Program) -> Optional[PuzzleInfo]:
matched, curried_args = match_metadata_layer_puzzle(puzzle)
if matched:
_, metadata, updater_hash, inner_puzzle = curried_args
constructor_dict = {
"type": "metadata",
"metadata": disassemble(metadata), # type: ignore
"updater_hash": "0x" + updater_hash.as_python().hex(),
}
next_constructor = self._match(inner_puzzle)
if next_constructor is not None:
constructor_dict["also"] = next_constructor.info
return PuzzleInfo(constructor_dict)
else:
return None
return None # Uncomment above when match_metadata_layer_puzzle works
def asset_id(self, constructor: PuzzleInfo) -> Optional[bytes32]:
return bytes32(constructor["updater_hash"])
def construct(self, constructor: PuzzleInfo, inner_puzzle: Program) -> Program:
if constructor.also() is not None:
inner_puzzle = self._construct(constructor.also(), inner_puzzle)
return puzzle_for_metadata_layer(constructor["metadata"], constructor["updater_hash"], inner_puzzle)
def solve(self, constructor: PuzzleInfo, solver: Solver, inner_puzzle: Program, inner_solution: Program) -> Program:
coin_bytes: bytes = solver["coin"]
coin: Coin = Coin(bytes32(coin_bytes[0:32]), bytes32(coin_bytes[32:64]), uint64.from_bytes(coin_bytes[64:72]))
if constructor.also() is not None:
inner_solution = self._solve(constructor.also(), solver, inner_puzzle, inner_solution)
return solution_for_metadata_layer(
coin.amount,
inner_solution,
)

View File

@ -2,7 +2,7 @@ import json
import logging
import time
from secrets import token_bytes
from typing import Any, Dict, List, Optional, Set, Type, TypeVar
from typing import Any, Dict, List, Optional, Set, Tuple, Type, TypeVar
from blspy import AugSchemeMPL, G1Element, G2Element
from clvm.casts import int_from_bytes, int_to_bytes
@ -24,6 +24,9 @@ from chia.wallet.nft_wallet import nft_puzzles
from chia.wallet.nft_wallet.nft_info import NFTCoinInfo, NFTWalletInfo
from chia.wallet.nft_wallet.nft_puzzles import NFT_METADATA_UPDATER, NFT_STATE_LAYER_MOD_HASH
from chia.wallet.nft_wallet.uncurry_nft import UncurriedNFT
from chia.wallet.outer_puzzles import AssetType, match_puzzle
from chia.wallet.payment import Payment
from chia.wallet.puzzle_drivers import PuzzleInfo
from chia.wallet.puzzles.load_clvm import load_clvm
from chia.wallet.puzzles.p2_delegated_puzzle_or_hidden_puzzle import (
DEFAULT_HIDDEN_PUZZLE_HASH,
@ -32,6 +35,7 @@ from chia.wallet.puzzles.p2_delegated_puzzle_or_hidden_puzzle import (
solution_for_conditions,
)
from chia.wallet.puzzles.puzzle_utils import make_create_coin_condition
from chia.wallet.puzzles.singleton_top_layer import match_singleton_puzzle
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.util.compute_memos import compute_memos
from chia.wallet.util.debug_spend_bundle import disassemble
@ -182,7 +186,9 @@ class NFTWallet:
self.log.debug("Puzzle solution received to wallet: %s", self.wallet_info)
coin_name = coin_spend.coin.name()
puzzle: Program = Program.from_bytes(bytes(coin_spend.puzzle_reveal))
solution: Program = Program.from_bytes(bytes(coin_spend.solution)).rest().rest().first().first()
full_solution: Program = Program.from_bytes(bytes(coin_spend.solution))
delegated_puz_solution: Program = Program.from_bytes(bytes(coin_spend.solution)).rest().rest().first().first()
# At this point, the puzzle must be a NFT puzzle.
# This method will be called only when the walle state manager uncurried this coin as a NFT puzzle.
@ -195,11 +201,18 @@ class NFTWallet:
new_inner_puzzle = None
update_condition = None
parent_inner_puzhash = uncurried_nft.nft_state_layer.get_tree_hash()
self.log.debug("Before spend metadata: %s %s \n%s", metadata, singleton_id, disassemble(solution))
for condition in solution.rest().first().rest().as_iter():
self.log.debug("Before spend metadata: %s %s \n%s", metadata, singleton_id, disassemble(delegated_puz_solution))
if delegated_puz_solution.rest().as_python() == b"":
conds = puzzle.run(full_solution)
else:
conds = delegated_puz_solution.rest().first().rest()
# for condition in delegated_puz_solution.rest().first().rest().as_itexr():
for condition in conds.as_iter():
self.log.debug("Checking solution condition: %s", disassemble(condition))
if condition.list_len() < 2:
# invalid condition
if condition.list_len() <= 2:
# irrelevant condition
continue
condition_code = int_from_bytes(condition.first().atom)
self.log.debug("Checking condition code: %r", condition_code)
@ -208,16 +221,23 @@ class NFTWallet:
update_condition = condition
elif condition_code == 51 and int_from_bytes(condition.rest().rest().first().atom) == 1:
puzhash = bytes32(condition.rest().first().atom)
memo = bytes32(condition.as_python()[-1][0])
if memo != puzhash:
puzhash_for_derivation_record = memo
else:
puzhash_for_derivation_record = puzhash
self.log.debug("Got back puzhash from solution: %s", puzhash)
derivation_record: Optional[
DerivationRecord
] = await self.wallet_state_manager.puzzle_store.get_derivation_record_for_puzzle_hash(puzhash)
] = await self.wallet_state_manager.puzzle_store.get_derivation_record_for_puzzle_hash(
puzhash_for_derivation_record
)
if derivation_record is None:
# we potentially sent it somewhere
await self.remove_coin(coin_spend.coin, in_transaction=in_transaction)
return
new_inner_puzzle = puzzle_for_pk(derivation_record.pubkey)
else:
raise ValueError("Invalid condition")
if new_inner_puzzle is None:
@ -563,3 +583,226 @@ class NFTWallet:
wallet_info = WalletInfo(current_info.id, current_info.name, current_info.type, data_str)
self.wallet_info = wallet_info
await self.wallet_state_manager.user_store.update_wallet(wallet_info, in_transaction)
async def convert_puzzle_hash(self, puzhash: bytes32) -> bytes32:
return puzhash
def get_nft(self, launcher_id: bytes32) -> Optional[NFTCoinInfo]:
for coin in self.nft_wallet_info.my_nft_coins:
matched, curried_args = match_singleton_puzzle(coin.full_puzzle)
if matched:
singleton_struct, inner_puzzle = curried_args
launcher: bytes32 = singleton_struct.as_python()[1]
if launcher == launcher_id:
return coin
return None
def get_puzzle_info(self, asset_id: bytes32) -> PuzzleInfo:
nft_coin: Optional[NFTCoinInfo] = self.get_nft(asset_id)
if nft_coin is None:
raise ValueError("An asset ID was specified that this wallet doesn't track")
puzzle_info: Optional[PuzzleInfo] = match_puzzle(nft_coin.full_puzzle)
if puzzle_info is None:
raise ValueError("Internal Error: NFT wallet is tracking a non NFT coin")
else:
return puzzle_info
async def get_coins_to_offer(self, asset_id: bytes32, amount: uint64) -> Set[Coin]:
nft_coin: Optional[NFTCoinInfo] = self.get_nft(asset_id)
if nft_coin is None:
raise ValueError("An asset ID was specified that this wallet doesn't track")
return set([nft_coin.coin])
def match_puzzle_info(self, puzzle_driver: PuzzleInfo) -> bool:
return (
AssetType(puzzle_driver.type()) == AssetType.SINGLETON
and self.get_nft(puzzle_driver["launcher_id"]) is not None
and puzzle_driver.also() is not None
and AssetType(puzzle_driver.also().type()) == AssetType.METADATA # type: ignore
and puzzle_driver.also().also() is None # type: ignore
)
@classmethod
async def create_from_puzzle_info(
cls,
wallet_state_manager: Any,
wallet: Wallet,
puzzle_driver: PuzzleInfo,
name=None,
in_transaction=False,
) -> Any:
# Off the bat we don't support multiple profile but when we do this will have to change
for wallet in wallet_state_manager.wallets.values():
if wallet.type() == WalletType.NFT:
return wallet
# TODO: These are not the arguments to this function yet but they will be
return await cls.create_new_nft_wallet(
wallet_state_manager,
wallet,
name,
in_transaction,
)
async def create_tandem_xch_tx(
self, fee: uint64, announcement_to_assert: Optional[Announcement] = None
) -> TransactionRecord:
chia_coins = await self.standard_wallet.select_coins(fee)
chia_tx = await self.standard_wallet.generate_signed_transaction(
uint64(0),
(await self.standard_wallet.get_new_puzzlehash()),
fee=fee,
coins=chia_coins,
coin_announcements_to_consume={announcement_to_assert} if announcement_to_assert is not None else None,
)
assert chia_tx.spend_bundle is not None
return chia_tx
async def generate_signed_transaction(
self,
amounts: List[uint64],
puzzle_hashes: List[bytes32],
fee: uint64 = uint64(0),
coins: Set[Coin] = None,
memos: Optional[List[List[bytes]]] = None,
coin_announcements_to_consume: Optional[Set[Announcement]] = None,
puzzle_announcements_to_consume: Optional[Set[Announcement]] = None,
ignore_max_send_amount: bool = False,
) -> List[TransactionRecord]:
if memos is None:
memos = [[] for _ in range(len(puzzle_hashes))]
if not (len(memos) == len(puzzle_hashes) == len(amounts)):
raise ValueError("Memos, puzzle_hashes, and amounts must have the same length")
payments = []
for amount, puzhash, memo_list in zip(amounts, puzzle_hashes, memos):
memos_with_hint: List[bytes] = [puzhash]
memos_with_hint.extend(memo_list)
payments.append(Payment(puzhash, amount, memos_with_hint))
payment_sum = sum([p.amount for p in payments])
unsigned_spend_bundle, chia_tx = await self.generate_unsigned_spendbundle(
payments,
fee,
coins=coins,
coin_announcements_to_consume=coin_announcements_to_consume,
puzzle_announcements_to_consume=puzzle_announcements_to_consume,
)
spend_bundle = await self.sign(unsigned_spend_bundle)
tx_list = [
TransactionRecord(
confirmed_at_height=uint32(0),
created_at_time=uint64(int(time.time())),
to_puzzle_hash=puzzle_hashes[0],
amount=uint64(payment_sum),
fee_amount=fee,
confirmed=False,
sent=uint32(0),
spend_bundle=spend_bundle,
additions=spend_bundle.additions(),
removals=spend_bundle.removals(),
wallet_id=self.id(),
sent_to=[],
trade_id=None,
type=uint32(TransactionType.OUTGOING_TX.value),
name=spend_bundle.name(),
memos=list(compute_memos(spend_bundle).items()),
)
]
if chia_tx is not None:
tx_list.append(
TransactionRecord(
confirmed_at_height=chia_tx.confirmed_at_height,
created_at_time=chia_tx.created_at_time,
to_puzzle_hash=chia_tx.to_puzzle_hash,
amount=chia_tx.amount,
fee_amount=chia_tx.fee_amount,
confirmed=chia_tx.confirmed,
sent=chia_tx.sent,
spend_bundle=None,
additions=chia_tx.additions,
removals=chia_tx.removals,
wallet_id=chia_tx.wallet_id,
sent_to=chia_tx.sent_to,
trade_id=chia_tx.trade_id,
type=chia_tx.type,
name=chia_tx.name,
memos=[],
)
)
return tx_list
async def generate_unsigned_spendbundle(
self,
payments: List[Payment],
fee: uint64 = uint64(0),
coins: Set[Coin] = None,
coin_announcements_to_consume: Optional[Set[Announcement]] = None,
puzzle_announcements_to_consume: Optional[Set[Announcement]] = None,
) -> Tuple[SpendBundle, Optional[TransactionRecord]]:
if coins is None:
# Make sure the user is specifying which specific NFT coin to use
raise ValueError("NFT spends require a selected coin")
else:
nft_coins = [c for c in self.nft_wallet_info.my_nft_coins if c.coin in coins]
if coin_announcements_to_consume is not None:
coin_announcements_bytes: Optional[Set[bytes32]] = {a.name() for a in coin_announcements_to_consume}
else:
coin_announcements_bytes = None
if puzzle_announcements_to_consume is not None:
puzzle_announcements_bytes: Optional[Set[bytes32]] = {a.name() for a in puzzle_announcements_to_consume}
else:
puzzle_announcements_bytes = None
primaries: List = []
for payment in payments:
primaries.append({"puzzlehash": payment.puzzle_hash, "amount": payment.amount, "memos": payment.memos})
chia_tx = None
coin_spends = []
first = True
for coin_info in nft_coins:
if first:
first = False
if fee > 0:
chia_tx = await self.create_tandem_xch_tx(fee)
innersol = self.standard_wallet.make_solution(
primaries=primaries,
coin_announcements_to_assert=coin_announcements_bytes,
puzzle_announcements_to_assert=puzzle_announcements_bytes,
)
else:
innersol = self.standard_wallet.make_solution(
primaries=primaries,
coin_announcements_to_assert=coin_announcements_bytes,
puzzle_announcements_to_assert=puzzle_announcements_bytes,
)
else:
# What announcements do we need?
innersol = self.standard_wallet.make_solution(
primaries=[],
)
nft_layer_solution = Program.to([innersol, coin_info.coin.amount])
singleton_solution = Program.to(
[coin_info.lineage_proof.to_program(), coin_info.coin.amount, nft_layer_solution]
)
coin_spend = CoinSpend(coin_info.coin, coin_info.full_puzzle, singleton_solution)
coin_spends.append(coin_spend)
nft_spend_bundle = SpendBundle(coin_spends, G2Element())
chia_spend_bundle = SpendBundle([], G2Element())
if chia_tx is not None and chia_tx.spend_bundle is not None:
chia_spend_bundle = chia_tx.spend_bundle
unsigned_spend_bundle = SpendBundle.aggregate([nft_spend_bundle, chia_spend_bundle])
return (unsigned_spend_bundle, chia_tx)

View File

@ -0,0 +1,65 @@
from dataclasses import dataclass
from typing import Any, Optional
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend
from chia.util.ints import uint64
from chia.wallet.lineage_proof import LineageProof
from chia.wallet.puzzle_drivers import PuzzleInfo, Solver
from chia.wallet.puzzles.singleton_top_layer import (
SINGLETON_LAUNCHER_HASH,
match_singleton_puzzle,
puzzle_for_singleton,
solution_for_singleton,
)
@dataclass(frozen=True)
class SingletonOuterPuzzle:
_match: Any
_asset_id: Any
_construct: Any
_solve: Any
def match(self, puzzle: Program) -> Optional[PuzzleInfo]:
matched, curried_args = match_singleton_puzzle(puzzle)
if matched:
singleton_struct, inner_puzzle = curried_args
constructor_dict = {
"type": "singleton",
"launcher_id": "0x" + singleton_struct.as_python()[1].hex(),
"launcher_ph": "0x" + singleton_struct.as_python()[2].hex(),
}
next_constructor = self._match(inner_puzzle)
if next_constructor is not None:
constructor_dict["also"] = next_constructor.info
return PuzzleInfo(constructor_dict)
else:
return None
def asset_id(self, constructor: PuzzleInfo) -> Optional[bytes32]:
return bytes32(constructor["launcher_id"])
def construct(self, constructor: PuzzleInfo, inner_puzzle: Program) -> Program:
if constructor.also() is not None:
inner_puzzle = self._construct(constructor.also(), inner_puzzle)
launcher_hash = constructor["launcher_ph"] if "launcher_ph" in constructor else SINGLETON_LAUNCHER_HASH
return puzzle_for_singleton(constructor["launcher_id"], inner_puzzle, launcher_hash)
def solve(self, constructor: PuzzleInfo, solver: Solver, inner_puzzle: Program, inner_solution: Program) -> Program:
coin_bytes: bytes = solver["coin"]
coin: Coin = Coin(bytes32(coin_bytes[0:32]), bytes32(coin_bytes[32:64]), uint64.from_bytes(coin_bytes[64:72]))
parent_spend: CoinSpend = CoinSpend.from_bytes(solver["parent_spend"])
parent_coin: Coin = parent_spend.coin
if constructor.also() is not None:
inner_solution = self._solve(constructor.also(), solver, inner_puzzle, inner_solution)
matched, curried_args = match_singleton_puzzle(parent_spend.puzzle_reveal.to_program())
assert matched
_, parent_inner_puzzle = curried_args
return solution_for_singleton(
LineageProof(parent_coin.parent_coin_info, parent_inner_puzzle.get_tree_hash(), parent_coin.amount),
coin.amount,
inner_solution,
)

View File

@ -4,6 +4,8 @@ from typing import Any, Dict, Optional
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.wallet.cat_wallet.cat_outer_puzzle import CATOuterPuzzle
from chia.wallet.nft_wallet.metadata_outer_puzzle import MetadataOuterPuzzle
from chia.wallet.nft_wallet.singleton_outer_puzzle import SingletonOuterPuzzle
from chia.wallet.puzzle_drivers import PuzzleInfo, Solver
"""
@ -26,6 +28,8 @@ A driver for a puzzle must include the following functions:
class AssetType(Enum):
CAT = "CAT"
SINGLETON = "singleton"
METADATA = "metadata"
def match_puzzle(puzzle: Program) -> Optional[PuzzleInfo]:
@ -50,8 +54,10 @@ def create_asset_id(constructor: PuzzleInfo) -> bytes32:
return driver_lookup[AssetType(constructor.type())].asset_id(constructor) # type: ignore
function_args = [match_puzzle, construct_puzzle, solve_puzzle, create_asset_id]
function_args = [match_puzzle, create_asset_id, construct_puzzle, solve_puzzle]
driver_lookup: Dict[AssetType, Any] = {
AssetType.CAT: CATOuterPuzzle(*function_args),
AssetType.SINGLETON: SingletonOuterPuzzle(*function_args),
AssetType.METADATA: MetadataOuterPuzzle(*function_args),
}

View File

@ -42,6 +42,12 @@ class PuzzleInfo:
return False
return True
def __contains__(self, item: str) -> bool:
if item in self.info:
return True
else:
return False
def type(self) -> str:
return str(self.info["type"])

View File

@ -1,4 +1,4 @@
from typing import List, Tuple, Optional
from typing import Iterator, List, Tuple, Optional
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program
@ -10,7 +10,7 @@ from chia.wallet.lineage_proof import LineageProof
from chia.util.ints import uint64
from chia.util.hash import std_hash
SINGLETON_MOD = load_clvm("singleton_top_layer.clvm")
SINGLETON_MOD = load_clvm("singleton_top_layer_v1_1.clvm")
SINGLETON_MOD_HASH = SINGLETON_MOD.get_tree_hash()
P2_SINGLETON_MOD = load_clvm("p2_singleton.clvm")
P2_SINGLETON_OR_DELAYED_MOD = load_clvm("p2_singleton_or_delayed_puzhash.clvm")
@ -157,6 +157,14 @@ MELT_CONDITION = [ConditionOpcode.CREATE_COIN, 0, ESCAPE_VALUE]
#
def match_singleton_puzzle(puzzle: Program) -> Tuple[bool, Iterator[Program]]:
mod, curried_args = puzzle.uncurry()
if mod == SINGLETON_MOD:
return True, curried_args.as_iter()
else:
return False, iter(())
# Given the parent and amount of the launcher coin, return the launcher coin
def generate_launcher_coin(coin: Coin, amount: uint64) -> Coin:
return Coin(coin.name(), SINGLETON_LAUNCHER_HASH, amount)
@ -249,9 +257,11 @@ def lineage_proof_for_coinsol(coin_spend: CoinSpend) -> LineageProof:
# Return the puzzle reveal of a singleton with specific ID and innerpuz
def puzzle_for_singleton(launcher_id: bytes32, inner_puz: Program) -> Program:
def puzzle_for_singleton(
launcher_id: bytes32, inner_puz: Program, launcher_hash: bytes32 = SINGLETON_LAUNCHER_HASH
) -> Program:
return SINGLETON_MOD.curry(
(SINGLETON_MOD_HASH, (launcher_id, SINGLETON_LAUNCHER_HASH)),
(SINGLETON_MOD_HASH, (launcher_id, launcher_hash)),
inner_puz,
)

View File

@ -58,7 +58,7 @@ class TradeManager:
- get_asset_id() -> bytes32
- Finally, you must make sure that your wallet will respond appropriately when these WSM methods are called:
- get_wallet_for_puzzle_info(puzzle_info: PuzzleInfo) -> <Your wallet>
- create_wallet_for_puzzle_info(puzzle_info: PuzzleInfo) -> <Your wallet>
- create_wallet_for_puzzle_info(..., puzzle_info: PuzzleInfo) -> <Your wallet> (See cat_wallet.py for full API)
- get_wallet_for_asset_id(asset_id: bytes32) -> <Your wallet>
"""
@ -222,15 +222,13 @@ class TradeManager:
if wallet is None:
continue
new_ph = await wallet.get_new_puzzlehash()
# This should probably not switch on whether or not we're spending a CAT but it has to for now
# ATTENTION: new_wallets
if wallet.type() == WalletType.CAT:
txs = await wallet.generate_signed_transaction(
[coin.amount], [new_ph], fee=fee_to_pay, coins={coin}, ignore_max_send_amount=True
)
all_txs.extend(txs)
if wallet.type() == WalletType.NFT:
new_ph = await wallet.wallet_state_manager.main_wallet.get_new_puzzlehash()
else:
new_ph = await wallet.get_new_puzzlehash()
# This should probably not switch on whether or not we're spending a XCH but it has to for now
if wallet.type() == WalletType.STANDARD_WALLET:
if fee_to_pay > coin.amount:
selected_coins: Set[Coin] = await wallet.select_coins(
uint64(fee_to_pay - coin.amount),
@ -247,6 +245,12 @@ class TradeManager:
ignore_max_send_amount=True,
)
all_txs.append(tx)
else:
# ATTENTION: new_wallets
txs = await wallet.generate_signed_transaction(
[coin.amount], [new_ph], fee=fee_to_pay, coins={coin}, ignore_max_send_amount=True
)
all_txs.extend(txs)
fee_to_pay = uint64(0)
cancellation_addition = Coin(coin.name(), new_ph, coin.amount)
@ -397,18 +401,8 @@ class TradeManager:
wallet = self.wallet_state_manager.wallets[id]
else:
wallet = await self.wallet_state_manager.get_wallet_for_asset_id(id.hex())
# This should probably not switch on whether or not we're spending a CAT but it has to for now
# ATTENTION: new_wallets
if wallet.type() == WalletType.CAT:
txs = await wallet.generate_signed_transaction(
[abs(offer_dict[id])],
[Offer.ph()],
fee=fee_left_to_pay,
coins=set(selected_coins),
puzzle_announcements_to_consume=announcements_to_assert,
)
all_transactions.extend(txs)
else:
# This should probably not switch on whether or not we're spending XCH but it has to for now
if wallet.type() == WalletType.STANDARD_WALLET:
tx = await wallet.generate_signed_transaction(
abs(offer_dict[id]),
Offer.ph(),
@ -417,6 +411,29 @@ class TradeManager:
puzzle_announcements_to_consume=announcements_to_assert,
)
all_transactions.append(tx)
elif wallet.type() == WalletType.NFT:
# This is to generate the tx for specific nft assets, i.e. not using
# wallet_id as the selector which would select any coins from nft_wallet
amounts = [coin.amount for coin in selected_coins]
txs = await wallet.generate_signed_transaction(
# [abs(offer_dict[id])],
amounts,
[Offer.ph()],
fee=fee_left_to_pay,
coins=set(selected_coins),
puzzle_announcements_to_consume=announcements_to_assert,
)
all_transactions.extend(txs)
else:
# ATTENTION: new_wallets
txs = await wallet.generate_signed_transaction(
[abs(offer_dict[id])],
[Offer.ph()],
fee=fee_left_to_pay,
coins=set(selected_coins),
puzzle_announcements_to_consume=announcements_to_assert,
)
all_transactions.extend(txs)
fee_left_to_pay = uint64(0)
@ -551,7 +568,7 @@ class TradeManager:
wallet = await self.wallet_state_manager.get_wallet_for_asset_id(asset_id.hex())
if wallet is None and amount < 0:
return False, None, f"Do not have a wallet for asset ID: {asset_id} to fulfill offer"
elif wallet is None:
elif wallet is None or wallet.type() == WalletType.NFT:
key = asset_id
else:
key = int(wallet.id())

View File

@ -42,7 +42,7 @@ from chia.wallet.did_wallet.did_wallet_puzzles import DID_INNERPUZ_MOD, create_f
from chia.wallet.key_val_store import KeyValStore
from chia.wallet.nft_wallet.nft_wallet import NFTWallet, NFTWalletInfo
from chia.wallet.nft_wallet.uncurry_nft import UncurriedNFT
from chia.wallet.outer_puzzles import AssetType
from chia.wallet.outer_puzzles import AssetType, match_puzzle
from chia.wallet.puzzle_drivers import PuzzleInfo
from chia.wallet.puzzles.cat_loader import CAT_MOD
from chia.wallet.rl_wallet.rl_wallet import RLWallet
@ -1298,6 +1298,11 @@ class WalletStateManager:
if wallet.type() == WalletType.CAT:
if bytes(wallet.cat_info.limitations_program_hash).hex() == asset_id:
return wallet
elif wallet.type() == WalletType.NFT:
for nft_coin in wallet.nft_wallet_info.my_nft_coins:
nft_info = match_puzzle(nft_coin.full_puzzle)
if nft_info.info["launcher_id"] == "0x" + asset_id: # type: ignore
return wallet
return None
async def get_wallet_for_puzzle_info(self, puzzle_driver: PuzzleInfo):

View File

@ -0,0 +1,841 @@
import asyncio
from secrets import token_bytes
from typing import Any, Dict, Optional
import pytest
from chia.consensus.block_rewards import calculate_base_farmer_reward, calculate_pool_reward
from chia.full_node.mempool_manager import MempoolManager
from chia.simulator.full_node_simulator import FullNodeSimulator
from chia.simulator.simulator_protocol import FarmNewBlockProtocol
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.peer_info import PeerInfo
from chia.util.ints import uint16, uint32, uint64
from chia.wallet.cat_wallet.cat_wallet import CATWallet
from chia.wallet.nft_wallet.nft_wallet import NFTWallet
from chia.wallet.outer_puzzles import create_asset_id, match_puzzle
from chia.wallet.puzzle_drivers import PuzzleInfo
from chia.wallet.trading.offer import Offer
from chia.wallet.trading.trade_status import TradeStatus
from tests.time_out_assert import time_out_assert, time_out_assert_not_none
async def tx_in_pool(mempool: MempoolManager, tx_id: bytes32) -> bool:
tx = mempool.get_spendbundle(tx_id)
if tx is None:
return False
return True
async def get_trade_and_status(trade_manager, trade) -> TradeStatus: # type: ignore
trade_rec = await trade_manager.get_trade_by_id(trade.trade_id)
return TradeStatus(trade_rec.status)
@pytest.mark.parametrize(
"trusted",
[True],
)
@pytest.mark.asyncio
async def test_nft_offer_with_fee(two_wallet_nodes: Any, trusted: Any) -> None:
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api: FullNodeSimulator = full_nodes[0]
full_node_server = full_node_api.server
wallet_node_0, server_0 = wallets[0]
wallet_node_1, server_1 = wallets[1]
wallet_maker = wallet_node_0.wallet_state_manager.main_wallet
wallet_taker = wallet_node_1.wallet_state_manager.main_wallet
maker_ph = await wallet_maker.get_new_puzzlehash()
taker_ph = await wallet_taker.get_new_puzzlehash()
token_ph = bytes32(token_bytes())
if trusted:
wallet_node_0.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_1.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node_0.config["trusted_peers"] = {}
wallet_node_1.config["trusted_peers"] = {}
await server_0.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
await server_1.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(maker_ph))
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(taker_ph))
await asyncio.sleep(5)
funds = sum(
[calculate_pool_reward(uint32(i)) + calculate_base_farmer_reward(uint32(i)) for i in range(1, num_blocks)]
)
await time_out_assert(15, wallet_maker.get_unconfirmed_balance, funds)
await time_out_assert(15, wallet_maker.get_confirmed_balance, funds)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
nft_wallet_maker = await NFTWallet.create_new_nft_wallet(
wallet_node_0.wallet_state_manager, wallet_maker, name="NFT WALLET 1"
)
nft_wallet_taker = await NFTWallet.create_new_nft_wallet(
wallet_node_1.wallet_state_manager, wallet_taker, name="NFT WALLET 2"
)
trade_manager_maker = wallet_maker.wallet_state_manager.trade_manager
trade_manager_taker = wallet_taker.wallet_state_manager.trade_manager
metadata = Program.to(
[
("u", ["https://www.chia.net/img/branding/chia-logo.svg"]),
("h", "0xD4584AD463139FA8C0D9F68F4B59F185"),
]
)
sb = await nft_wallet_maker.generate_new_nft(metadata)
assert sb
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, sb.name())
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 1
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_taker) == 0
# MAKE FIRST TRADE: 1 NFT for 100 xch
maker_balance_pre = await wallet_maker.get_confirmed_balance()
taker_balance_pre = await wallet_taker.get_confirmed_balance()
nft_to_offer = coins_maker[0]
nft_info: Optional[PuzzleInfo] = match_puzzle(nft_to_offer.full_puzzle)
nft_asset_id: bytes32 = create_asset_id(nft_info) # type: ignore
driver_dict: Dict[bytes32, Optional[PuzzleInfo]] = {nft_asset_id: nft_info}
xch_request = 100
maker_fee = uint64(10)
offer_nft_for_xch = {wallet_maker.id(): xch_request, nft_asset_id: -1}
success, trade_make, error = await trade_manager_maker.create_offer_for_ids(
offer_nft_for_xch, driver_dict, fee=maker_fee
)
await asyncio.sleep(1)
assert success is True
assert error is None
assert trade_make is not None
taker_fee = uint64(1)
success, trade_take, error = await trade_manager_taker.respond_to_offer(
Offer.from_bytes(trade_make.offer), fee=taker_fee
)
await asyncio.sleep(1)
assert success
assert error is None
assert trade_take is not None
for i in range(0, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_maker, trade_make)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_taker, trade_take)
await time_out_assert(15, wallet_maker.get_confirmed_balance, maker_balance_pre + xch_request - maker_fee)
await time_out_assert(15, wallet_taker.get_confirmed_balance, taker_balance_pre - xch_request - taker_fee)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 0
assert len(coins_taker) == 1
# MAKE SECOND TRADE: 100 xch for 1 NFT
maker_balance_pre = await wallet_maker.get_confirmed_balance()
taker_balance_pre = await wallet_taker.get_confirmed_balance()
nft_to_buy = coins_taker[0]
nft_to_buy_info: Optional[PuzzleInfo] = match_puzzle(nft_to_buy.full_puzzle)
nft_to_buy_asset_id: bytes32 = create_asset_id(nft_to_buy_info) # type: ignore
driver_dict_to_buy: Dict[bytes32, Optional[PuzzleInfo]] = {nft_to_buy_asset_id: nft_to_buy_info}
xch_offered = 100
maker_fee = uint64(10)
offer_xch_for_nft = {wallet_maker.id(): -xch_offered, nft_to_buy_asset_id: 1}
success, trade_make, error = await trade_manager_maker.create_offer_for_ids(
offer_xch_for_nft, driver_dict_to_buy, fee=maker_fee
)
await asyncio.sleep(1)
assert success is True
assert error is None
assert trade_make is not None
taker_fee = uint64(1)
success, trade_take, error = await trade_manager_taker.respond_to_offer(
Offer.from_bytes(trade_make.offer), fee=taker_fee
)
await asyncio.sleep(1)
assert success
assert error is None
assert trade_take is not None
for i in range(0, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_maker, trade_make)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_taker, trade_take)
await time_out_assert(15, wallet_maker.get_confirmed_balance, maker_balance_pre - xch_offered - maker_fee)
await time_out_assert(15, wallet_taker.get_confirmed_balance, taker_balance_pre + xch_offered - taker_fee)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 1
assert len(coins_taker) == 0
@pytest.mark.parametrize(
"trusted",
[True],
)
@pytest.mark.asyncio
async def test_nft_offer_cancellations(two_wallet_nodes: Any, trusted: Any) -> None:
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api: FullNodeSimulator = full_nodes[0]
full_node_server = full_node_api.server
wallet_node_0, server_0 = wallets[0]
wallet_node_1, server_1 = wallets[1]
wallet_maker = wallet_node_0.wallet_state_manager.main_wallet
wallet_taker = wallet_node_1.wallet_state_manager.main_wallet
maker_ph = await wallet_maker.get_new_puzzlehash()
taker_ph = await wallet_taker.get_new_puzzlehash()
token_ph = bytes32(token_bytes())
if trusted:
wallet_node_0.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_1.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node_0.config["trusted_peers"] = {}
wallet_node_1.config["trusted_peers"] = {}
await server_0.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
await server_1.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(maker_ph))
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(taker_ph))
await asyncio.sleep(5)
funds = sum(
[calculate_pool_reward(uint32(i)) + calculate_base_farmer_reward(uint32(i)) for i in range(1, num_blocks)]
)
await time_out_assert(15, wallet_maker.get_unconfirmed_balance, funds)
await time_out_assert(15, wallet_maker.get_confirmed_balance, funds)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
nft_wallet_maker = await NFTWallet.create_new_nft_wallet(
wallet_node_0.wallet_state_manager, wallet_maker, name="NFT WALLET 1"
)
nft_wallet_taker = await NFTWallet.create_new_nft_wallet(
wallet_node_1.wallet_state_manager, wallet_taker, name="NFT WALLET 2"
)
trade_manager_maker = wallet_maker.wallet_state_manager.trade_manager
# trade_manager_taker = wallet_taker.wallet_state_manager.trade_manager
metadata = Program.to(
[
("u", ["https://www.chia.net/img/branding/chia-logo.svg"]),
("h", "0xD4584AD463139FA8C0D9F68F4B59F185"),
]
)
sb = await nft_wallet_maker.generate_new_nft(metadata)
assert sb
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, sb.name())
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 1
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_taker) == 0
# maker creates offer and cancels
maker_balance_pre = await wallet_maker.get_confirmed_balance()
# taker_balance_pre = await wallet_taker.get_confirmed_balance()
nft_to_offer = coins_maker[0]
nft_info: Optional[PuzzleInfo] = match_puzzle(nft_to_offer.full_puzzle)
nft_asset_id: bytes32 = create_asset_id(nft_info) # type: ignore
driver_dict: Dict[bytes32, Optional[PuzzleInfo]] = {nft_asset_id: nft_info}
xch_request = 100
maker_fee = uint64(10)
offer_nft_for_xch = {wallet_maker.id(): xch_request, nft_asset_id: -1}
success, trade_make, error = await trade_manager_maker.create_offer_for_ids(
offer_nft_for_xch, driver_dict, fee=maker_fee
)
await asyncio.sleep(1)
assert success is True
assert error is None
assert trade_make is not None
# await trade_manager_maker.cancel_pending_offer(trade_make.trade_id)
# await time_out_assert(15, get_trade_and_status, TradeStatus.CANCELLED, trade_manager_maker, trade_make)
cancel_fee = uint64(10)
txs = await trade_manager_maker.cancel_pending_offer_safely(trade_make.trade_id, fee=cancel_fee)
await time_out_assert(15, get_trade_and_status, TradeStatus.PENDING_CANCEL, trade_manager_maker, trade_make)
for tx in txs:
if tx.spend_bundle is not None:
await time_out_assert(15, tx_in_pool, True, full_node_api.full_node.mempool_manager, tx.spend_bundle.name())
for i in range(0, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
await time_out_assert(15, get_trade_and_status, TradeStatus.CANCELLED, trade_manager_maker, trade_make)
maker_balance = await wallet_maker.get_confirmed_balance()
assert maker_balance == maker_balance_pre - cancel_fee
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 1
@pytest.mark.parametrize(
"trusted",
[True],
)
@pytest.mark.asyncio
async def test_nft_offer_with_metadata_update(two_wallet_nodes: Any, trusted: Any) -> None:
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api: FullNodeSimulator = full_nodes[0]
full_node_server = full_node_api.server
wallet_node_0, server_0 = wallets[0]
wallet_node_1, server_1 = wallets[1]
wallet_maker = wallet_node_0.wallet_state_manager.main_wallet
wallet_taker = wallet_node_1.wallet_state_manager.main_wallet
maker_ph = await wallet_maker.get_new_puzzlehash()
taker_ph = await wallet_taker.get_new_puzzlehash()
token_ph = bytes32(token_bytes())
if trusted:
wallet_node_0.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_1.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node_0.config["trusted_peers"] = {}
wallet_node_1.config["trusted_peers"] = {}
await server_0.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
await server_1.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(maker_ph))
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(taker_ph))
await asyncio.sleep(5)
funds = sum(
[calculate_pool_reward(uint32(i)) + calculate_base_farmer_reward(uint32(i)) for i in range(1, num_blocks)]
)
await time_out_assert(15, wallet_maker.get_unconfirmed_balance, funds)
await time_out_assert(15, wallet_maker.get_confirmed_balance, funds)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
nft_wallet_maker = await NFTWallet.create_new_nft_wallet(
wallet_node_0.wallet_state_manager, wallet_maker, name="NFT WALLET 1"
)
nft_wallet_taker = await NFTWallet.create_new_nft_wallet(
wallet_node_1.wallet_state_manager, wallet_taker, name="NFT WALLET 2"
)
trade_manager_maker = wallet_maker.wallet_state_manager.trade_manager
trade_manager_taker = wallet_taker.wallet_state_manager.trade_manager
metadata = Program.to(
[
("u", ["https://www.chia.net/img/branding/chia-logo.svg"]),
("h", "0xD4584AD463139FA8C0D9F68F4B59F185"),
]
)
sb = await nft_wallet_maker.generate_new_nft(metadata)
assert sb
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, sb.name())
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 1
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_taker) == 0
# Maker updates metadata:
nft_to_update = coins_maker[0]
url_to_add = "https://new_url.com"
fee_for_update = uint64(10)
update_sb = await nft_wallet_maker.update_metadata(nft_to_update, url_to_add, fee=fee_for_update)
mempool_mgr = full_node_api.full_node.mempool_manager
await time_out_assert_not_none(5, mempool_mgr.get_spendbundle, update_sb.name()) # type: ignore
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
updated_nft = coins_maker[0]
updated_nft_info = match_puzzle(updated_nft.full_puzzle)
assert url_to_add in updated_nft_info.also().info["metadata"] # type: ignore
# MAKE FIRST TRADE: 1 NFT for 100 xch
maker_balance_pre = await wallet_maker.get_confirmed_balance()
taker_balance_pre = await wallet_taker.get_confirmed_balance()
nft_to_offer = coins_maker[0]
nft_info: Optional[PuzzleInfo] = match_puzzle(nft_to_offer.full_puzzle)
nft_asset_id: bytes32 = create_asset_id(nft_info) # type: ignore
driver_dict: Dict[bytes32, Optional[PuzzleInfo]] = {nft_asset_id: nft_info}
xch_request = 100
maker_fee = uint64(10)
offer_nft_for_xch = {wallet_maker.id(): xch_request, nft_asset_id: -1}
success, trade_make, error = await trade_manager_maker.create_offer_for_ids(
offer_nft_for_xch, driver_dict, fee=maker_fee
)
await asyncio.sleep(1)
assert success is True
assert error is None
assert trade_make is not None
taker_fee = uint64(1)
success, trade_take, error = await trade_manager_taker.respond_to_offer(
Offer.from_bytes(trade_make.offer), fee=taker_fee
)
await asyncio.sleep(1)
assert success
assert error is None
assert trade_take is not None
for i in range(0, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_maker, trade_make)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_taker, trade_take)
await time_out_assert(15, wallet_maker.get_confirmed_balance, maker_balance_pre + xch_request - maker_fee)
await time_out_assert(15, wallet_taker.get_confirmed_balance, taker_balance_pre - xch_request - taker_fee)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 0
assert len(coins_taker) == 1
@pytest.mark.parametrize(
"trusted",
[True],
)
@pytest.mark.asyncio
async def test_nft_offer_nft_for_cat(two_wallet_nodes: Any, trusted: Any) -> None:
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api: FullNodeSimulator = full_nodes[0]
full_node_server = full_node_api.server
wallet_node_0, server_0 = wallets[0]
wallet_node_1, server_1 = wallets[1]
wallet_maker = wallet_node_0.wallet_state_manager.main_wallet
wallet_taker = wallet_node_1.wallet_state_manager.main_wallet
maker_ph = await wallet_maker.get_new_puzzlehash()
taker_ph = await wallet_taker.get_new_puzzlehash()
token_ph = bytes32(token_bytes())
if trusted:
wallet_node_0.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_1.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node_0.config["trusted_peers"] = {}
wallet_node_1.config["trusted_peers"] = {}
await server_0.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
await server_1.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(maker_ph))
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(taker_ph))
await asyncio.sleep(5)
funds = sum(
[calculate_pool_reward(uint32(i)) + calculate_base_farmer_reward(uint32(i)) for i in range(1, num_blocks)]
)
await time_out_assert(15, wallet_maker.get_unconfirmed_balance, funds)
await time_out_assert(15, wallet_maker.get_confirmed_balance, funds)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
# Create NFT wallets and nfts for maker and taker
nft_wallet_maker = await NFTWallet.create_new_nft_wallet(
wallet_node_0.wallet_state_manager, wallet_maker, name="NFT WALLET 1"
)
nft_wallet_taker = await NFTWallet.create_new_nft_wallet(
wallet_node_1.wallet_state_manager, wallet_taker, name="NFT WALLET 2"
)
trade_manager_maker = wallet_maker.wallet_state_manager.trade_manager
trade_manager_taker = wallet_taker.wallet_state_manager.trade_manager
metadata = Program.to(
[
("u", ["https://www.chia.net/img/branding/chia-logo.svg"]),
("h", "0xD4584AD463139FA8C0D9F68F4B59F185"),
]
)
sb = await nft_wallet_maker.generate_new_nft(metadata)
assert sb
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, sb.name())
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 1
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_taker) == 0
# Create two new CATs and wallets for maker and taker
cats_to_mint = 10000
async with wallet_node_0.wallet_state_manager.lock:
cat_wallet_maker: CATWallet = await CATWallet.create_new_cat_wallet(
wallet_node_0.wallet_state_manager, wallet_maker, {"identifier": "genesis_by_id"}, uint64(cats_to_mint)
)
await asyncio.sleep(1)
async with wallet_node_1.wallet_state_manager.lock:
cat_wallet_taker: CATWallet = await CATWallet.create_new_cat_wallet(
wallet_node_1.wallet_state_manager, wallet_taker, {"identifier": "genesis_by_id"}, uint64(cats_to_mint)
)
await asyncio.sleep(1)
for i in range(0, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
await time_out_assert(15, cat_wallet_maker.get_confirmed_balance, cats_to_mint)
await time_out_assert(15, cat_wallet_maker.get_unconfirmed_balance, cats_to_mint)
await time_out_assert(15, cat_wallet_taker.get_confirmed_balance, cats_to_mint)
await time_out_assert(15, cat_wallet_taker.get_unconfirmed_balance, cats_to_mint)
wallet_maker_for_taker_cat: CATWallet = await CATWallet.create_wallet_for_cat(
wallet_node_0.wallet_state_manager, wallet_maker, cat_wallet_taker.get_asset_id()
)
wallet_taker_for_maker_cat: CATWallet = await CATWallet.create_wallet_for_cat(
wallet_node_1.wallet_state_manager, wallet_taker, cat_wallet_maker.get_asset_id()
)
assert wallet_taker_for_maker_cat
# MAKE FIRST TRADE: 1 NFT for 10 taker cats
maker_balance_pre = await wallet_maker.get_confirmed_balance()
taker_balance_pre = await wallet_taker.get_confirmed_balance()
taker_cat_maker_balance_pre = await wallet_maker_for_taker_cat.get_confirmed_balance()
taker_cat_taker_balance_pre = await cat_wallet_taker.get_confirmed_balance()
nft_to_offer = coins_maker[0]
nft_info: Optional[PuzzleInfo] = match_puzzle(nft_to_offer.full_puzzle)
nft_asset_id: bytes32 = create_asset_id(nft_info) # type: ignore
driver_dict: Dict[bytes32, Optional[PuzzleInfo]] = {nft_asset_id: nft_info}
maker_fee = uint64(10)
taker_cat_offered = 2500
offer_nft_for_cat = {nft_asset_id: -1, wallet_maker_for_taker_cat.id(): taker_cat_offered}
success, trade_make, error = await trade_manager_maker.create_offer_for_ids(
offer_nft_for_cat, driver_dict, fee=maker_fee
)
await asyncio.sleep(1)
assert success is True
assert error is None
assert trade_make is not None
taker_fee = uint64(1)
success, trade_take, error = await trade_manager_taker.respond_to_offer(
Offer.from_bytes(trade_make.offer), fee=taker_fee
)
await asyncio.sleep(1)
assert success
assert error is None
assert trade_take is not None
for i in range(0, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_maker, trade_make)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_taker, trade_take)
taker_cat_maker_balance_post = await wallet_maker_for_taker_cat.get_confirmed_balance()
taker_cat_taker_balance_post = await cat_wallet_taker.get_confirmed_balance()
assert taker_cat_maker_balance_post == taker_cat_maker_balance_pre + taker_cat_offered
assert taker_cat_taker_balance_post == taker_cat_taker_balance_pre - taker_cat_offered
maker_balance_post = await wallet_maker.get_confirmed_balance()
taker_balance_post = await wallet_taker.get_confirmed_balance()
assert maker_balance_post == maker_balance_pre - maker_fee
assert taker_balance_post == taker_balance_pre - taker_fee
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 0
assert len(coins_taker) == 1
# Make an offer for taker NFT for multiple cats
maker_cat_amount = 400
taker_cat_amount = 500
nft_to_buy = coins_taker[0]
nft_to_buy_info: Optional[PuzzleInfo] = match_puzzle(nft_to_buy.full_puzzle)
nft_to_buy_asset_id: bytes32 = create_asset_id(nft_to_buy_info) # type: ignore
driver_dict_to_buy: Dict[bytes32, Optional[PuzzleInfo]] = {
nft_to_buy_asset_id: nft_to_buy_info,
}
maker_fee = uint64(10)
offer_multi_cats_for_nft = {
nft_to_buy_asset_id: 1,
wallet_maker_for_taker_cat.id(): -taker_cat_amount,
cat_wallet_maker.id(): -maker_cat_amount,
}
success, trade_make, error = await trade_manager_maker.create_offer_for_ids(
offer_multi_cats_for_nft, driver_dict_to_buy, fee=maker_fee
)
await asyncio.sleep(1)
assert success is True
assert error is None
assert trade_make is not None
taker_fee = uint64(1)
success, trade_take, error = await trade_manager_taker.respond_to_offer(
Offer.from_bytes(trade_make.offer), fee=taker_fee
)
await asyncio.sleep(1)
assert success
assert error is None
assert trade_take is not None
# check balances: taker wallet down an NFT, up cats
for i in range(0, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_maker, trade_make)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_taker, trade_take)
taker_cat_maker_balance_post_2 = await wallet_maker_for_taker_cat.get_confirmed_balance()
taker_cat_taker_balance_post_2 = await cat_wallet_taker.get_confirmed_balance()
assert taker_cat_maker_balance_post_2 == taker_cat_maker_balance_post - taker_cat_amount
assert taker_cat_taker_balance_post_2 == taker_cat_taker_balance_post + taker_cat_amount
maker_balance_post_2 = await wallet_maker.get_confirmed_balance()
taker_balance_post_2 = await wallet_taker.get_confirmed_balance()
assert maker_balance_post_2 == maker_balance_post - maker_fee
assert taker_balance_post_2 == taker_balance_post - taker_fee
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 1
assert len(coins_taker) == 0
@pytest.mark.parametrize(
"trusted",
[True],
)
@pytest.mark.asyncio
async def test_nft_offer_nft_for_nft(two_wallet_nodes: Any, trusted: Any) -> None:
num_blocks = 5
full_nodes, wallets = two_wallet_nodes
full_node_api: FullNodeSimulator = full_nodes[0]
full_node_server = full_node_api.server
wallet_node_0, server_0 = wallets[0]
wallet_node_1, server_1 = wallets[1]
wallet_maker = wallet_node_0.wallet_state_manager.main_wallet
wallet_taker = wallet_node_1.wallet_state_manager.main_wallet
maker_ph = await wallet_maker.get_new_puzzlehash()
taker_ph = await wallet_taker.get_new_puzzlehash()
token_ph = bytes32(token_bytes())
if trusted:
wallet_node_0.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
wallet_node_1.config["trusted_peers"] = {
full_node_api.full_node.server.node_id.hex(): full_node_api.full_node.server.node_id.hex()
}
else:
wallet_node_0.config["trusted_peers"] = {}
wallet_node_1.config["trusted_peers"] = {}
await server_0.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
await server_1.start_client(PeerInfo("localhost", uint16(full_node_server._port)), None)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(maker_ph))
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(taker_ph))
await asyncio.sleep(5)
funds = sum(
[calculate_pool_reward(uint32(i)) + calculate_base_farmer_reward(uint32(i)) for i in range(1, num_blocks)]
)
await time_out_assert(15, wallet_maker.get_unconfirmed_balance, funds)
await time_out_assert(15, wallet_maker.get_confirmed_balance, funds)
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
# Create NFT wallets and nfts for maker and taker
nft_wallet_maker = await NFTWallet.create_new_nft_wallet(
wallet_node_0.wallet_state_manager, wallet_maker, name="NFT WALLET 1"
)
nft_wallet_taker = await NFTWallet.create_new_nft_wallet(
wallet_node_1.wallet_state_manager, wallet_taker, name="NFT WALLET 2"
)
trade_manager_maker = wallet_maker.wallet_state_manager.trade_manager
trade_manager_taker = wallet_taker.wallet_state_manager.trade_manager
metadata = Program.to(
[
("u", ["https://www.chia.net/img/branding/chia-logo.svg"]),
("h", "0xD4584AD463139FA8C0D9F68F4B59F185"),
]
)
sb = await nft_wallet_maker.generate_new_nft(metadata)
assert sb
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, sb.name())
metadata_2 = Program.to(
[
("u", ["https://www.chia.net/image2.html"]),
("h", "0xD4584AD463139FA8C0D9F68F4B59F183"),
]
)
sb_2 = await nft_wallet_taker.generate_new_nft(metadata_2)
assert sb_2
await time_out_assert_not_none(5, full_node_api.full_node.mempool_manager.get_spendbundle, sb_2.name())
for i in range(1, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 1
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_taker) == 1
maker_balance_pre = await wallet_maker.get_confirmed_balance()
taker_balance_pre = await wallet_taker.get_confirmed_balance()
nft_to_offer = coins_maker[0]
nft_to_offer_info: Optional[PuzzleInfo] = match_puzzle(nft_to_offer.full_puzzle)
nft_to_offer_asset_id: bytes32 = create_asset_id(nft_to_offer_info) # type: ignore
nft_to_take = coins_taker[0]
nft_to_take_info: Optional[PuzzleInfo] = match_puzzle(nft_to_take.full_puzzle)
nft_to_take_asset_id: bytes32 = create_asset_id(nft_to_take_info) # type: ignore
driver_dict: Dict[bytes32, Optional[PuzzleInfo]] = {
nft_to_offer_asset_id: nft_to_offer_info,
nft_to_take_asset_id: nft_to_take_info,
}
maker_fee = uint64(10)
offer_nft_for_nft = {nft_to_take_asset_id: 1, nft_to_offer_asset_id: -1}
success, trade_make, error = await trade_manager_maker.create_offer_for_ids(
offer_nft_for_nft, driver_dict, fee=maker_fee
)
await asyncio.sleep(1)
assert success is True
assert error is None
assert trade_make is not None
taker_fee = uint64(1)
success, trade_take, error = await trade_manager_taker.respond_to_offer(
Offer.from_bytes(trade_make.offer), fee=taker_fee
)
await asyncio.sleep(1)
assert success
assert error is None
assert trade_take is not None
for i in range(0, num_blocks):
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(token_ph))
await asyncio.sleep(5)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_maker, trade_make)
await time_out_assert(15, get_trade_and_status, TradeStatus.CONFIRMED, trade_manager_taker, trade_take)
await time_out_assert(15, wallet_maker.get_confirmed_balance, maker_balance_pre - maker_fee)
await time_out_assert(15, wallet_taker.get_confirmed_balance, taker_balance_pre - taker_fee)
coins_maker = nft_wallet_maker.nft_wallet_info.my_nft_coins
coins_taker = nft_wallet_taker.nft_wallet_info.my_nft_coins
assert len(coins_maker) == 1
assert len(coins_taker) == 1

View File

@ -1,6 +1,4 @@
import asyncio
# pytestmark = pytest.mark.skip("TODO: Fix tests")
from typing import Any
import pytest