Annotate test_offer_lifecycle.py (#17149)

Annotate test_offer_lifecycle.py.
This commit is contained in:
Amine Khaldi 2024-01-04 19:15:58 +01:00 committed by GitHub
parent a9e0d11438
commit 6d9f77ca1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 107 deletions

View File

@ -105,7 +105,6 @@ tests.util.test_misc
tests.util.test_network
tests.util.time_out_assert
tests.wallet.cat_wallet.test_cat_wallet
tests.wallet.cat_wallet.test_offer_lifecycle
tests.wallet.cat_wallet.test_trades
tests.wallet.did_wallet.test_did
tests.wallet.rpc.test_wallet_rpc

View File

@ -1,18 +1,18 @@
from __future__ import annotations
from dataclasses import replace
from typing import Any, Dict, List, Optional
from typing import Dict, List, Optional, cast
import pytest
from chia_rs import G2Element
from chia.clvm.spend_sim import sim_and_client
from chia.clvm.spend_sim import CostLogger, SimClient, SpendSim, sim_and_client
from chia.types.announcement import Announcement
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.program import Program
from chia.types.blockchain_format.serialized_program import SerializedProgram
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend, make_spend
from chia.types.coin_spend import make_spend
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.util.ints import uint64
@ -26,7 +26,7 @@ from chia.wallet.conditions import ConditionValidTimes
from chia.wallet.outer_puzzles import AssetType
from chia.wallet.payment import Payment
from chia.wallet.puzzle_drivers import PuzzleInfo
from chia.wallet.trading.offer import OFFER_MOD, NotarizedPayment, Offer
from chia.wallet.trading.offer import OFFER_MOD, Offer
acs = Program.to(1)
acs_ph = acs.get_tree_hash()
@ -34,7 +34,8 @@ acs_ph = acs.get_tree_hash()
# Some methods mapping strings to CATs
def str_to_tail(tail_str: str) -> Program:
return Program.to([3, [], [1, tail_str], []])
# TODO: Remove cast when we improve typing
return cast(Program, Program.to([3, [], [1, tail_str], []]))
def str_to_tail_hash(tail_str: str) -> bytes32:
@ -47,24 +48,22 @@ def str_to_cat_hash(tail_str: str) -> bytes32:
# This method takes a dictionary of strings mapping to amounts and generates the appropriate CAT/XCH coins
async def generate_coins(
sim,
sim_client,
requested_coins: Dict[Optional[str], List[uint64]],
sim: SpendSim, sim_client: SimClient, requested_coins: Dict[Optional[str], List[int]]
) -> Dict[Optional[str], List[Coin]]:
await sim.farm_block(acs_ph)
parent_coin: Coin = [cr.coin for cr in await sim_client.get_coin_records_by_puzzle_hash(acs_ph)][0]
parent_coin = [cr.coin for cr in await sim_client.get_coin_records_by_puzzle_hash(acs_ph)][0]
# We need to gather a list of initial coins to create as well as spends that do the eve spend for every CAT
payments: List[Payment] = []
cat_bundles: List[SpendBundle] = []
payments = []
cat_bundles = []
for tail_str, amounts in requested_coins.items():
for amount in amounts:
if tail_str:
tail: Program = str_to_tail(tail_str) # Making a fake but unique TAIL
tail = str_to_tail(tail_str) # Making a fake but unique TAIL
tail_hash = tail.get_tree_hash()
cat_puzzle: Program = construct_cat_puzzle(CAT_MOD, tail_hash, acs)
cat_puzzle = construct_cat_puzzle(CAT_MOD, tail_hash, acs)
cat_puzzle_hash = cat_puzzle.get_tree_hash()
payments.append(Payment(cat_puzzle_hash, amount))
payments.append(Payment(cat_puzzle_hash, uint64(amount)))
cat_bundles.append(
unsigned_spend_bundle_for_spendable_cats(
CAT_MOD,
@ -79,18 +78,11 @@ async def generate_coins(
)
)
else:
payments.append(Payment(acs_ph, amount))
payments.append(Payment(acs_ph, uint64(amount)))
# This bundle creates all of the initial coins
parent_bundle = SpendBundle(
[
make_spend(
parent_coin,
acs,
Program.to([[51, p.puzzle_hash, p.amount] for p in payments]),
)
],
G2Element(),
[make_spend(parent_coin, acs, Program.to([[51, p.puzzle_hash, p.amount] for p in payments]))], G2Element()
)
# Then we aggregate it with all of the eve spends
@ -102,7 +94,7 @@ async def generate_coins(
for tail_str, _ in requested_coins.items():
if tail_str:
tail_hash = str_to_tail_hash(tail_str)
cat_ph: bytes32 = construct_cat_puzzle(CAT_MOD, tail_hash, acs).get_tree_hash()
cat_ph = construct_cat_puzzle(CAT_MOD, tail_hash, acs).get_tree_hash()
coin_dict[tail_str] = [
cr.coin for cr in await sim_client.get_coin_records_by_puzzle_hash(cat_ph, include_spent_coins=False)
]
@ -128,10 +120,10 @@ def generate_secure_bundle(
offered_amount: uint64,
tail_str: Optional[str] = None,
) -> SpendBundle:
announcement_assertions: List[List] = [[63, a.name()] for a in announcements]
selected_coin_amount: int = sum([c.amount for c in selected_coins])
non_primaries: List[Coin] = [] if len(selected_coins) < 2 else selected_coins[1:]
inner_solution: List[List] = [
announcement_assertions = [[63, a.name()] for a in announcements]
selected_coin_amount = sum([c.amount for c in selected_coins])
non_primaries = [] if len(selected_coins) < 2 else selected_coins[1:]
inner_solution = [
[51, Offer.ph(), offered_amount], # Offered coin
[51, acs_ph, uint64(selected_coin_amount - offered_amount)], # Change
*announcement_assertions,
@ -150,7 +142,7 @@ def generate_secure_bundle(
G2Element(),
)
else:
spendable_cats: List[SpendableCAT] = [
spendable_cats = [
SpendableCAT(
c,
str_to_tail_hash(tail_str),
@ -170,19 +162,15 @@ def generate_secure_bundle(
@pytest.mark.anyio()
async def test_complex_offer(cost_logger):
async def test_complex_offer(cost_logger: CostLogger) -> None:
async with sim_and_client() as (sim, sim_client):
coins_needed: Dict[Optional[str], List[int]] = {
None: [500, 400, 300],
"red": [250, 100],
"blue": [3000],
}
all_coins: Dict[Optional[str], List[Coin]] = await generate_coins(sim, sim_client, coins_needed)
chia_coins: List[Coin] = all_coins[None]
red_coins: List[Coin] = all_coins["red"]
blue_coins: List[Coin] = all_coins["blue"]
coins_needed = {None: [500, 400, 300], "red": [250, 100], "blue": [3000]}
all_coins = await generate_coins(sim, sim_client, coins_needed)
chia_coins = all_coins[None]
red_coins = all_coins["red"]
blue_coins = all_coins["blue"]
driver_dict: Dict[bytes32, PuzzleInfo] = {
driver_dict = {
str_to_tail_hash("red"): PuzzleInfo(
{"type": AssetType.CAT.value, "tail": "0x" + str_to_tail_hash("red").hex()}
),
@ -190,61 +178,41 @@ async def test_complex_offer(cost_logger):
{"type": AssetType.CAT.value, "tail": "0x" + str_to_tail_hash("blue").hex()}
),
}
driver_dict_as_infos: Dict[str, Any] = {}
for key, value in driver_dict.items():
driver_dict_as_infos[key.hex()] = value.info
driver_dict_as_infos = {key.hex(): value.info for key, value in driver_dict.items()}
# Create an XCH Offer for RED
chia_requested_payments: Dict[Optional[bytes32], List[Payment]] = {
str_to_tail_hash("red"): [
Payment(acs_ph, 100, [b"memo"]),
Payment(acs_ph, 200, [b"memo"]),
]
str_to_tail_hash("red"): [Payment(acs_ph, uint64(100), [b"memo"]), Payment(acs_ph, uint64(200), [b"memo"])]
}
chia_requested_payments: Dict[Optional[bytes32], List[NotarizedPayment]] = Offer.notarize_payments(
chia_requested_payments, chia_coins
)
chia_announcements: List[Announcement] = Offer.calculate_announcements(chia_requested_payments, driver_dict)
chia_secured_bundle: SpendBundle = generate_secure_bundle(chia_coins, chia_announcements, 1000)
chia_offer = Offer(chia_requested_payments, chia_secured_bundle, driver_dict)
chia_notarized_payments = Offer.notarize_payments(chia_requested_payments, chia_coins)
chia_announcements = Offer.calculate_announcements(chia_notarized_payments, driver_dict)
chia_secured_bundle = generate_secure_bundle(chia_coins, chia_announcements, uint64(1000))
chia_offer = Offer(chia_notarized_payments, chia_secured_bundle, driver_dict)
assert not chia_offer.is_valid()
# Create a RED Offer for XCH
red_coins_1 = red_coins[0:1]
red_coins_2 = red_coins[1:]
red_requested_payments: Dict[Optional[bytes32], List[Payment]] = {
None: [
Payment(acs_ph, 300, [b"red memo"]),
Payment(acs_ph, 350, [b"red memo"]),
]
None: [Payment(acs_ph, uint64(300), [b"red memo"]), Payment(acs_ph, uint64(350), [b"red memo"])]
}
red_requested_payments: Dict[Optional[bytes32], List[NotarizedPayment]] = Offer.notarize_payments(
red_requested_payments, red_coins_1
red_notarized_payments = Offer.notarize_payments(red_requested_payments, red_coins_1)
red_announcements = Offer.calculate_announcements(red_notarized_payments, driver_dict)
red_secured_bundle = generate_secure_bundle(
red_coins_1, red_announcements, uint64(sum([c.amount for c in red_coins_1])), tail_str="red"
)
red_announcements: List[Announcement] = Offer.calculate_announcements(red_requested_payments, driver_dict)
red_secured_bundle: SpendBundle = generate_secure_bundle(
red_coins_1, red_announcements, sum([c.amount for c in red_coins_1]), tail_str="red"
)
red_offer = Offer(red_requested_payments, red_secured_bundle, driver_dict)
red_offer = Offer(red_notarized_payments, red_secured_bundle, driver_dict)
assert not red_offer.is_valid()
red_requested_payments_2: Dict[Optional[bytes32], List[Payment]] = {
None: [
Payment(acs_ph, 50, [b"red memo"]),
]
None: [Payment(acs_ph, uint64(50), [b"red memo"])]
}
red_requested_payments_2: Dict[Optional[bytes32], List[NotarizedPayment]] = Offer.notarize_payments(
red_requested_payments_2, red_coins_2
red_notarized_payments_2 = Offer.notarize_payments(red_requested_payments_2, red_coins_2)
red_announcements_2 = Offer.calculate_announcements(red_notarized_payments_2, driver_dict)
red_secured_bundle_2 = generate_secure_bundle(
red_coins_2, red_announcements_2, uint64(sum([c.amount for c in red_coins_2])), tail_str="red"
)
red_announcements_2: List[Announcement] = Offer.calculate_announcements(red_requested_payments_2, driver_dict)
red_secured_bundle_2: SpendBundle = generate_secure_bundle(
red_coins_2, red_announcements_2, sum([c.amount for c in red_coins_2]), tail_str="red"
)
red_offer_2 = Offer(red_requested_payments_2, red_secured_bundle_2, driver_dict)
red_offer_2 = Offer(red_notarized_payments_2, red_secured_bundle_2, driver_dict)
assert not red_offer_2.is_valid()
# Test aggregation of offers
@ -255,24 +223,17 @@ async def test_complex_offer(cost_logger):
# Create yet another offer of BLUE for XCH and RED
blue_requested_payments: Dict[Optional[bytes32], List[Payment]] = {
None: [
Payment(acs_ph, 200, [b"blue memo"]),
],
str_to_tail_hash("red"): [
Payment(acs_ph, 50, [b"blue memo"]),
],
None: [Payment(acs_ph, uint64(200), [b"blue memo"])],
str_to_tail_hash("red"): [Payment(acs_ph, uint64(50), [b"blue memo"])],
}
blue_requested_payments: Dict[Optional[bytes32], List[NotarizedPayment]] = Offer.notarize_payments(
blue_requested_payments, blue_coins
)
blue_announcements: List[Announcement] = Offer.calculate_announcements(blue_requested_payments, driver_dict)
blue_secured_bundle: SpendBundle = generate_secure_bundle(blue_coins, blue_announcements, 2000, tail_str="blue")
blue_offer = Offer(blue_requested_payments, blue_secured_bundle, driver_dict)
blue_notarized_payments = Offer.notarize_payments(blue_requested_payments, blue_coins)
blue_announcements = Offer.calculate_announcements(blue_notarized_payments, driver_dict)
blue_secured_bundle = generate_secure_bundle(blue_coins, blue_announcements, uint64(2000), tail_str="blue")
blue_offer = Offer(blue_notarized_payments, blue_secured_bundle, driver_dict)
assert not blue_offer.is_valid()
# Test a re-aggregation
new_offer: Offer = Offer.aggregate([new_offer, blue_offer])
new_offer = Offer.aggregate([new_offer, blue_offer])
assert new_offer.get_offered_amounts() == {
None: 1000,
str_to_tail_hash("red"): 350,
@ -280,11 +241,7 @@ async def test_complex_offer(cost_logger):
}
assert new_offer.get_requested_amounts() == {None: 900, str_to_tail_hash("red"): 350}
assert new_offer.summary() == (
{
"xch": 1000,
str_to_tail_hash("red").hex(): 350,
str_to_tail_hash("blue").hex(): 2000,
},
{"xch": 1000, str_to_tail_hash("red").hex(): 350, str_to_tail_hash("blue").hex(): 2000},
{"xch": 900, str_to_tail_hash("red").hex(): 350},
driver_dict_as_infos,
ConditionValidTimes(),
@ -297,15 +254,16 @@ async def test_complex_offer(cost_logger):
assert new_offer.is_valid()
# Test preventing TAIL from running during exchange
blue_cat_puz: Program = construct_cat_puzzle(CAT_MOD, str_to_tail_hash("blue"), OFFER_MOD)
blue_spend: CoinSpend = make_spend(
Coin(bytes32(32), blue_cat_puz.get_tree_hash(), uint64(0)),
blue_cat_puz = construct_cat_puzzle(CAT_MOD, str_to_tail_hash("blue"), OFFER_MOD)
random_hash = bytes32([0] * 32)
blue_spend = make_spend(
Coin(random_hash, blue_cat_puz.get_tree_hash(), uint64(0)),
blue_cat_puz,
Program.to([[bytes32(32), [bytes32(32), 200, ["hey there"]]]]),
Program.to([[random_hash, [random_hash, 200, ["hey there"]]]]),
)
new_spends_list: List[CoinSpend] = [blue_spend, *new_offer.to_spend_bundle().coin_spends]
tail_offer: Offer = Offer.from_spend_bundle(SpendBundle(new_spends_list, G2Element()))
valid_spend = tail_offer.to_valid_spend(bytes32(32))
new_spends_list = [blue_spend, *new_offer.to_spend_bundle().coin_spends]
tail_offer = Offer.from_spend_bundle(SpendBundle(new_spends_list, G2Element()))
valid_spend = tail_offer.to_valid_spend(random_hash)
real_blue_spend = [spend for spend in valid_spend.coin_spends if b"hey there" in bytes(spend)][0]
real_blue_spend_replaced = replace(
real_blue_spend,
@ -329,8 +287,8 @@ async def test_complex_offer(cost_logger):
assert Offer.from_compressed(new_offer.compress()) == new_offer
# Make sure we can actually spend the offer once it's valid
arbitrage_ph: bytes32 = Program.to([3, [], [], 1]).get_tree_hash()
offer_bundle: SpendBundle = new_offer.to_valid_spend(arbitrage_ph)
arbitrage_ph = Program.to([3, [], [], 1]).get_tree_hash()
offer_bundle = new_offer.to_valid_spend(arbitrage_ph)
result = await sim_client.push_tx(cost_logger.add_cost("Complex Offer", offer_bundle))
assert result == (MempoolInclusionStatus.SUCCESS, None)