mirror of
https://github.com/Chia-Network/chia-blockchain.git
synced 2024-09-19 14:48:38 +03:00
util: Cleanup Tuple[Optional[Err], ..]
returns in condition_tools.py
(#15009)
This commit is contained in:
parent
20f10b4655
commit
4f7e2ff8c3
@ -15,7 +15,7 @@ from chia.types.coin_spend import CoinSpend
|
||||
from chia.types.condition_opcodes import ConditionOpcode
|
||||
from chia.types.condition_with_args import ConditionWithArgs
|
||||
from chia.types.spend_bundle import SpendBundle
|
||||
from chia.util.condition_tools import conditions_by_opcode, conditions_for_solution
|
||||
from chia.util.condition_tools import conditions_dict_for_solution
|
||||
from chia.util.hash import std_hash
|
||||
from chia.util.ints import uint32, uint64
|
||||
from chia.wallet.derive_keys import master_sk_to_wallet_sk
|
||||
@ -180,12 +180,9 @@ class WalletTool:
|
||||
for coin_spend in coin_spends: # noqa
|
||||
secret_key = self.get_private_key_for_puzzle_hash(coin_spend.coin.puzzle_hash)
|
||||
synthetic_secret_key = calculate_synthetic_secret_key(secret_key, DEFAULT_HIDDEN_PUZZLE_HASH)
|
||||
err, con, cost = conditions_for_solution(
|
||||
conditions_dict = conditions_dict_for_solution(
|
||||
coin_spend.puzzle_reveal, coin_spend.solution, self.constants.MAX_BLOCK_COST_CLVM
|
||||
)
|
||||
if not con:
|
||||
raise ValueError(err)
|
||||
conditions_dict = conditions_by_opcode(con)
|
||||
|
||||
for cwa in conditions_dict.get(ConditionOpcode.AGG_SIG_UNSAFE, []):
|
||||
msg = cwa.vars[1]
|
||||
|
@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
from clvm.casts import int_from_bytes
|
||||
|
||||
@ -18,19 +18,17 @@ from chia.util.ints import uint64
|
||||
# since asserts can be stripped with python `-OO` flag
|
||||
|
||||
|
||||
def parse_sexp_to_condition(
|
||||
sexp: Program,
|
||||
) -> Tuple[Optional[Err], Optional[ConditionWithArgs]]:
|
||||
def parse_sexp_to_condition(sexp: Program) -> ConditionWithArgs:
|
||||
"""
|
||||
Takes a ChiaLisp sexp and returns a ConditionWithArgs.
|
||||
If it fails, returns an Error
|
||||
Raises an ConsensusError if it fails.
|
||||
"""
|
||||
first = sexp.pair
|
||||
if first is None:
|
||||
return Err.INVALID_CONDITION, None
|
||||
raise ConsensusError(Err.INVALID_CONDITION, ["first is None"])
|
||||
op = first[0].atom
|
||||
if op is None or len(op) != 1:
|
||||
return Err.INVALID_CONDITION, None
|
||||
raise ConsensusError(Err.INVALID_CONDITION, ["invalid op"])
|
||||
|
||||
# since the ConditionWithArgs only has atoms as the args, we can't parse
|
||||
# hints and memos with this function. We just exit the loop if we encounter
|
||||
@ -46,41 +44,15 @@ def parse_sexp_to_condition(
|
||||
if len(vars) > 3:
|
||||
break
|
||||
|
||||
return None, ConditionWithArgs(ConditionOpcode(op), vars)
|
||||
return ConditionWithArgs(ConditionOpcode(op), vars)
|
||||
|
||||
|
||||
def parse_sexp_to_conditions(
|
||||
sexp: Program,
|
||||
) -> Tuple[Optional[Err], Optional[List[ConditionWithArgs]]]:
|
||||
def parse_sexp_to_conditions(sexp: Program) -> List[ConditionWithArgs]:
|
||||
"""
|
||||
Takes a ChiaLisp sexp (list) and returns the list of ConditionWithArgss
|
||||
If it fails, returns as Error
|
||||
Raises an ConsensusError if it fails.
|
||||
"""
|
||||
results: List[ConditionWithArgs] = []
|
||||
try:
|
||||
for _ in sexp.as_iter():
|
||||
error, cvp = parse_sexp_to_condition(_)
|
||||
if error:
|
||||
return error, None
|
||||
results.append(cvp) # type: ignore # noqa
|
||||
except ConsensusError:
|
||||
return Err.INVALID_CONDITION, None
|
||||
return None, results
|
||||
|
||||
|
||||
def conditions_by_opcode(
|
||||
conditions: List[ConditionWithArgs],
|
||||
) -> Dict[ConditionOpcode, List[ConditionWithArgs]]:
|
||||
"""
|
||||
Takes a list of ConditionWithArgss(CVP) and return dictionary of CVPs keyed of their opcode
|
||||
"""
|
||||
d: Dict[ConditionOpcode, List[ConditionWithArgs]] = {}
|
||||
cvp: ConditionWithArgs
|
||||
for cvp in conditions:
|
||||
if cvp.opcode not in d:
|
||||
d[cvp.opcode] = list()
|
||||
d[cvp.opcode].append(cvp)
|
||||
return d
|
||||
return [parse_sexp_to_condition(s) for s in sexp.as_iter()]
|
||||
|
||||
|
||||
def pkm_pairs(
|
||||
@ -140,22 +112,21 @@ def conditions_dict_for_solution(
|
||||
puzzle_reveal: SerializedProgram,
|
||||
solution: SerializedProgram,
|
||||
max_cost: int,
|
||||
) -> Tuple[Optional[Err], Optional[Dict[ConditionOpcode, List[ConditionWithArgs]]], uint64]:
|
||||
error, result, cost = conditions_for_solution(puzzle_reveal, solution, max_cost)
|
||||
if error or result is None:
|
||||
return error, None, uint64(0)
|
||||
return None, conditions_by_opcode(result), cost
|
||||
) -> Dict[ConditionOpcode, List[ConditionWithArgs]]:
|
||||
conditions_dict: Dict[ConditionOpcode, List[ConditionWithArgs]] = {}
|
||||
for cvp in conditions_for_solution(puzzle_reveal, solution, max_cost):
|
||||
conditions_dict.setdefault(cvp.opcode, list()).append(cvp)
|
||||
return conditions_dict
|
||||
|
||||
|
||||
def conditions_for_solution(
|
||||
puzzle_reveal: SerializedProgram,
|
||||
solution: SerializedProgram,
|
||||
max_cost: int,
|
||||
) -> Tuple[Optional[Err], Optional[List[ConditionWithArgs]], uint64]:
|
||||
) -> List[ConditionWithArgs]:
|
||||
# get the standard script for a puzzle hash and feed in the solution
|
||||
try:
|
||||
cost, r = puzzle_reveal.run_with_cost(max_cost, solution)
|
||||
error, result = parse_sexp_to_conditions(r)
|
||||
return error, result, uint64(cost)
|
||||
except Program.EvalError:
|
||||
return Err.SEXP_ERROR, None, uint64(0)
|
||||
return parse_sexp_to_conditions(r)
|
||||
except Program.EvalError as e:
|
||||
raise ConsensusError(Err.SEXP_ERROR, [str(e)]) from e
|
||||
|
@ -111,14 +111,11 @@ def unsigned_spend_bundle_for_spendable_cats(mod_code: Program, spendable_cat_li
|
||||
# figure out what the deltas are by running the inner puzzles & solutions
|
||||
deltas: List[int] = []
|
||||
for spend_info in spendable_cat_list:
|
||||
error, conditions, cost = conditions_dict_for_solution(
|
||||
spend_info.inner_puzzle, spend_info.inner_solution, INFINITE_COST
|
||||
)
|
||||
conditions = conditions_dict_for_solution(spend_info.inner_puzzle, spend_info.inner_solution, INFINITE_COST)
|
||||
total = spend_info.extra_delta * -1
|
||||
if conditions:
|
||||
for _ in conditions.get(ConditionOpcode.CREATE_COIN, []):
|
||||
if _.vars[1] != b"\x8f": # -113 in bytes
|
||||
total += Program.to(_.vars[1]).as_int()
|
||||
for _ in conditions.get(ConditionOpcode.CREATE_COIN, []):
|
||||
if _.vars[1] != b"\x8f": # -113 in bytes
|
||||
total += Program.to(_.vars[1]).as_int()
|
||||
deltas.append(spend_info.coin.amount - total)
|
||||
|
||||
if sum(deltas) != 0:
|
||||
|
@ -506,21 +506,20 @@ class CATWallet:
|
||||
raise RuntimeError(f"Failed to get keys for puzzle_hash {puzzle_hash}")
|
||||
pubkey, private = ret
|
||||
synthetic_secret_key = calculate_synthetic_secret_key(private, DEFAULT_HIDDEN_PUZZLE_HASH)
|
||||
error, conditions, cost = conditions_dict_for_solution(
|
||||
conditions = conditions_dict_for_solution(
|
||||
spend.puzzle_reveal.to_program(),
|
||||
spend.solution.to_program(),
|
||||
self.wallet_state_manager.constants.MAX_BLOCK_COST_CLVM,
|
||||
)
|
||||
if conditions is not None:
|
||||
synthetic_pk = synthetic_secret_key.get_g1()
|
||||
for pk, msg in pkm_pairs_for_conditions_dict(
|
||||
conditions, spend.coin.name(), self.wallet_state_manager.constants.AGG_SIG_ME_ADDITIONAL_DATA
|
||||
):
|
||||
try:
|
||||
assert bytes(synthetic_pk) == pk
|
||||
sigs.append(AugSchemeMPL.sign(synthetic_secret_key, msg))
|
||||
except AssertionError:
|
||||
raise ValueError("This spend bundle cannot be signed by the CAT wallet")
|
||||
synthetic_pk = synthetic_secret_key.get_g1()
|
||||
for pk, msg in pkm_pairs_for_conditions_dict(
|
||||
conditions, spend.coin.name(), self.wallet_state_manager.constants.AGG_SIG_ME_ADDITIONAL_DATA
|
||||
):
|
||||
try:
|
||||
assert bytes(synthetic_pk) == pk
|
||||
sigs.append(AugSchemeMPL.sign(synthetic_secret_key, msg))
|
||||
except AssertionError:
|
||||
raise ValueError("This spend bundle cannot be signed by the CAT wallet")
|
||||
|
||||
agg_sig = AugSchemeMPL.aggregate(sigs)
|
||||
return SpendBundle.aggregate([spend_bundle, SpendBundle([], agg_sig)])
|
||||
|
@ -1210,22 +1210,20 @@ class DIDWallet:
|
||||
puzzle_hash = p2_puzzle.get_tree_hash()
|
||||
pubkey, private = await self.wallet_state_manager.get_keys(puzzle_hash)
|
||||
synthetic_secret_key = calculate_synthetic_secret_key(private, DEFAULT_HIDDEN_PUZZLE_HASH)
|
||||
error, conditions, cost = conditions_dict_for_solution(
|
||||
conditions = conditions_dict_for_solution(
|
||||
spend.puzzle_reveal.to_program(),
|
||||
spend.solution.to_program(),
|
||||
self.wallet_state_manager.constants.MAX_BLOCK_COST_CLVM,
|
||||
)
|
||||
|
||||
if conditions is not None:
|
||||
synthetic_pk = synthetic_secret_key.get_g1()
|
||||
for pk, msg in pkm_pairs_for_conditions_dict(
|
||||
conditions, spend.coin.name(), self.wallet_state_manager.constants.AGG_SIG_ME_ADDITIONAL_DATA
|
||||
):
|
||||
try:
|
||||
assert bytes(synthetic_pk) == pk
|
||||
sigs.append(AugSchemeMPL.sign(synthetic_secret_key, msg))
|
||||
except AssertionError:
|
||||
raise ValueError("This spend bundle cannot be signed by the DID wallet")
|
||||
synthetic_pk = synthetic_secret_key.get_g1()
|
||||
for pk, msg in pkm_pairs_for_conditions_dict(
|
||||
conditions, spend.coin.name(), self.wallet_state_manager.constants.AGG_SIG_ME_ADDITIONAL_DATA
|
||||
):
|
||||
try:
|
||||
assert bytes(synthetic_pk) == pk
|
||||
sigs.append(AugSchemeMPL.sign(synthetic_secret_key, msg))
|
||||
except AssertionError:
|
||||
raise ValueError("This spend bundle cannot be signed by the DID wallet")
|
||||
|
||||
agg_sig = AugSchemeMPL.aggregate(sigs)
|
||||
return SpendBundle.aggregate([spend_bundle, SpendBundle([], agg_sig)])
|
||||
|
@ -464,24 +464,23 @@ class NFTWallet:
|
||||
synthetic_secret_key = calculate_synthetic_secret_key(private, DEFAULT_HIDDEN_PUZZLE_HASH)
|
||||
synthetic_pk = synthetic_secret_key.get_g1()
|
||||
pks[bytes(synthetic_pk)] = synthetic_secret_key
|
||||
error, conditions, cost = conditions_dict_for_solution(
|
||||
conditions = conditions_dict_for_solution(
|
||||
spend.puzzle_reveal.to_program(),
|
||||
spend.solution.to_program(),
|
||||
self.wallet_state_manager.constants.MAX_BLOCK_COST_CLVM,
|
||||
)
|
||||
if conditions is not None:
|
||||
for pk, msg in pkm_pairs_for_conditions_dict(
|
||||
conditions, spend.coin.name(), self.wallet_state_manager.constants.AGG_SIG_ME_ADDITIONAL_DATA
|
||||
):
|
||||
try:
|
||||
sk = pks.get(pk)
|
||||
if sk:
|
||||
self.log.debug("Found key, signing for pk: %s", pk)
|
||||
sigs.append(AugSchemeMPL.sign(sk, msg))
|
||||
else:
|
||||
self.log.warning("Couldn't find key for: %s", pk)
|
||||
except AssertionError:
|
||||
raise ValueError("This spend bundle cannot be signed by the NFT wallet")
|
||||
for pk, msg in pkm_pairs_for_conditions_dict(
|
||||
conditions, spend.coin.name(), self.wallet_state_manager.constants.AGG_SIG_ME_ADDITIONAL_DATA
|
||||
):
|
||||
try:
|
||||
sk = pks.get(pk)
|
||||
if sk:
|
||||
self.log.debug("Found key, signing for pk: %s", pk)
|
||||
sigs.append(AugSchemeMPL.sign(sk, msg))
|
||||
else:
|
||||
self.log.warning("Couldn't find key for: %s", pk)
|
||||
except AssertionError:
|
||||
raise ValueError("This spend bundle cannot be signed by the NFT wallet")
|
||||
|
||||
agg_sig = AugSchemeMPL.aggregate(sigs)
|
||||
return SpendBundle.aggregate([spend_bundle, SpendBundle([], agg_sig)])
|
||||
|
@ -40,20 +40,14 @@ def make_puzzle(amount: int) -> int:
|
||||
print(f"Address: {encode_puzzle_hash(puzzle_hash, prefix)}")
|
||||
|
||||
result = puzzle_prog.run(solution)
|
||||
error, result_human = parse_sexp_to_conditions(result)
|
||||
|
||||
total_chia = 0
|
||||
if error:
|
||||
print(f"Error: {error}")
|
||||
else:
|
||||
assert result_human is not None
|
||||
for cvp in result_human:
|
||||
assert len(cvp.vars) == 2
|
||||
total_chia += int_from_bytes(cvp.vars[1])
|
||||
print(
|
||||
f"{ConditionOpcode(cvp.opcode).name}: {encode_puzzle_hash(bytes32(cvp.vars[0]), prefix)},"
|
||||
f" amount: {int_from_bytes(cvp.vars[1])}"
|
||||
)
|
||||
for cvp in parse_sexp_to_conditions(result):
|
||||
assert len(cvp.vars) == 2
|
||||
total_chia += int_from_bytes(cvp.vars[1])
|
||||
print(
|
||||
f"{ConditionOpcode(cvp.opcode).name}: {encode_puzzle_hash(bytes32(cvp.vars[0]), prefix)},"
|
||||
f" amount: {int_from_bytes(cvp.vars[1])}"
|
||||
)
|
||||
return total_chia
|
||||
|
||||
|
||||
|
@ -22,10 +22,7 @@ def print_conditions(spend_bundle: SpendBundle):
|
||||
print("\nConditions:")
|
||||
for coin_spend in spend_bundle.coin_spends:
|
||||
result = Program.from_bytes(bytes(coin_spend.puzzle_reveal)).run(Program.from_bytes(bytes(coin_spend.solution)))
|
||||
error, result_human = parse_sexp_to_conditions(result)
|
||||
assert error is None
|
||||
assert result_human is not None
|
||||
for cvp in result_human:
|
||||
for cvp in parse_sexp_to_conditions(result):
|
||||
print(f"{ConditionOpcode(cvp.opcode).name}: {[var.hex() for var in cvp.vars]}")
|
||||
print("")
|
||||
|
||||
|
@ -39,13 +39,7 @@ async def sign_coin_spends(
|
||||
msg_list: List[bytes] = []
|
||||
for coin_spend in coin_spends:
|
||||
# Get AGG_SIG conditions
|
||||
err, conditions_dict, cost = conditions_dict_for_solution(
|
||||
coin_spend.puzzle_reveal, coin_spend.solution, max_cost
|
||||
)
|
||||
if err or conditions_dict is None:
|
||||
error_msg = f"Sign transaction failed, con:{conditions_dict}, error: {err}"
|
||||
raise ValueError(error_msg)
|
||||
|
||||
conditions_dict = conditions_dict_for_solution(coin_spend.puzzle_reveal, coin_spend.solution, max_cost)
|
||||
# Create signature
|
||||
for pk_bytes, msg in pkm_pairs_for_conditions_dict(conditions_dict, coin_spend.coin.name(), additional_data):
|
||||
pk = blspy.G1Element.from_bytes(pk_bytes)
|
||||
|
@ -73,43 +73,40 @@ def debug_spend_bundle(spend_bundle, agg_sig_additional_data=DEFAULT_CONSTANTS.A
|
||||
print(f" with id {coin_name.hex()}")
|
||||
print()
|
||||
print(f"\nbrun -y main.sym '{bu_disassemble(puzzle_reveal)}' '{bu_disassemble(solution)}'")
|
||||
error, conditions, cost = conditions_dict_for_solution(puzzle_reveal, solution, INFINITE_COST)
|
||||
if error:
|
||||
print(f"*** error {error}")
|
||||
elif conditions is not None:
|
||||
for pk_bytes, m in pkm_pairs_for_conditions_dict(conditions, coin_name, agg_sig_additional_data):
|
||||
pks.append(G1Element.from_bytes(pk_bytes))
|
||||
msgs.append(m)
|
||||
print()
|
||||
cost, r = puzzle_reveal.run_with_cost(INFINITE_COST, solution) # type: ignore
|
||||
print(disassemble(r))
|
||||
print()
|
||||
if conditions and len(conditions) > 0:
|
||||
print("grouped conditions:")
|
||||
for condition_programs in conditions.values():
|
||||
print()
|
||||
for c in condition_programs:
|
||||
if len(c.vars) == 1:
|
||||
as_prog = Program.to([c.opcode, c.vars[0]])
|
||||
if len(c.vars) == 2:
|
||||
as_prog = Program.to([c.opcode, c.vars[0], c.vars[1]])
|
||||
print(f" {disassemble(as_prog)}")
|
||||
created_coin_announcements.extend(
|
||||
[coin_name] + _.vars for _ in conditions.get(ConditionOpcode.CREATE_COIN_ANNOUNCEMENT, [])
|
||||
)
|
||||
asserted_coin_announcements.extend(
|
||||
[_.vars[0].hex() for _ in conditions.get(ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT, [])]
|
||||
)
|
||||
created_puzzle_announcements.extend(
|
||||
[puzzle_reveal.get_tree_hash()] + _.vars
|
||||
for _ in conditions.get(ConditionOpcode.CREATE_PUZZLE_ANNOUNCEMENT, [])
|
||||
)
|
||||
asserted_puzzle_announcements.extend(
|
||||
[_.vars[0].hex() for _ in conditions.get(ConditionOpcode.ASSERT_PUZZLE_ANNOUNCEMENT, [])]
|
||||
)
|
||||
conditions = conditions_dict_for_solution(puzzle_reveal, solution, INFINITE_COST)
|
||||
for pk_bytes, m in pkm_pairs_for_conditions_dict(conditions, coin_name, agg_sig_additional_data):
|
||||
pks.append(G1Element.from_bytes(pk_bytes))
|
||||
msgs.append(m)
|
||||
print()
|
||||
cost, r = puzzle_reveal.run_with_cost(INFINITE_COST, solution)
|
||||
print(disassemble(r))
|
||||
print()
|
||||
if conditions and len(conditions) > 0:
|
||||
print("grouped conditions:")
|
||||
for condition_programs in conditions.values():
|
||||
print()
|
||||
else:
|
||||
print("(no output conditions generated)")
|
||||
for c in condition_programs:
|
||||
if len(c.vars) == 1:
|
||||
as_prog = Program.to([c.opcode, c.vars[0]])
|
||||
if len(c.vars) == 2:
|
||||
as_prog = Program.to([c.opcode, c.vars[0], c.vars[1]])
|
||||
print(f" {disassemble(as_prog)}")
|
||||
created_coin_announcements.extend(
|
||||
[coin_name] + _.vars for _ in conditions.get(ConditionOpcode.CREATE_COIN_ANNOUNCEMENT, [])
|
||||
)
|
||||
asserted_coin_announcements.extend(
|
||||
[_.vars[0].hex() for _ in conditions.get(ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT, [])]
|
||||
)
|
||||
created_puzzle_announcements.extend(
|
||||
[puzzle_reveal.get_tree_hash()] + _.vars
|
||||
for _ in conditions.get(ConditionOpcode.CREATE_PUZZLE_ANNOUNCEMENT, [])
|
||||
)
|
||||
asserted_puzzle_announcements.extend(
|
||||
[_.vars[0].hex() for _ in conditions.get(ConditionOpcode.ASSERT_PUZZLE_ANNOUNCEMENT, [])]
|
||||
)
|
||||
print()
|
||||
else:
|
||||
print("(no output conditions generated)")
|
||||
print()
|
||||
print("-------")
|
||||
|
||||
|
@ -28,7 +28,7 @@ from chia.simulator.time_out_assert import time_out_assert
|
||||
from chia.simulator.wallet_tools import WalletTool
|
||||
from chia.types.announcement import Announcement
|
||||
from chia.types.blockchain_format.coin import Coin
|
||||
from chia.types.blockchain_format.program import INFINITE_COST, Program
|
||||
from chia.types.blockchain_format.program import Program
|
||||
from chia.types.blockchain_format.serialized_program import SerializedProgram
|
||||
from chia.types.blockchain_format.sized_bytes import bytes32, bytes48
|
||||
from chia.types.clvm_cost import CLVMCost
|
||||
@ -42,12 +42,7 @@ from chia.types.mempool_item import MempoolItem
|
||||
from chia.types.spend_bundle import SpendBundle
|
||||
from chia.types.spend_bundle_conditions import Spend, SpendBundleConditions
|
||||
from chia.util.api_decorators import api_request
|
||||
from chia.util.condition_tools import (
|
||||
conditions_for_solution,
|
||||
parse_sexp_to_conditions,
|
||||
pkm_pairs,
|
||||
pkm_pairs_for_conditions_dict,
|
||||
)
|
||||
from chia.util.condition_tools import parse_sexp_to_conditions, pkm_pairs, pkm_pairs_for_conditions_dict
|
||||
from chia.util.errors import ConsensusError, Err
|
||||
from chia.util.hash import std_hash
|
||||
from chia.util.ints import uint32, uint64
|
||||
@ -1692,14 +1687,11 @@ class TestMempoolManager:
|
||||
unsigned: List[CoinSpend] = spend_bundle_0.coin_spends
|
||||
|
||||
assert len(unsigned) == 1
|
||||
coin_spend: CoinSpend = unsigned[0]
|
||||
|
||||
err, con, cost = conditions_for_solution(coin_spend.puzzle_reveal, coin_spend.solution, INFINITE_COST)
|
||||
assert con is not None
|
||||
# coin_spend: CoinSpend = unsigned[0]
|
||||
|
||||
# TODO(straya): fix this test
|
||||
# puzzle, solution = list(coin_spend.solution.as_iter())
|
||||
# conditions_dict = conditions_by_opcode(con)
|
||||
# conditions_dict = conditions_dict_for_solution(coin_spend.puzzle_reveal, coin_spend.solution, INFINITE_COST)
|
||||
|
||||
# pkm_pairs = pkm_pairs_for_conditions_dict(conditions_dict, coin_spend.coin.name())
|
||||
# assert len(pkm_pairs) == 1
|
||||
@ -2628,38 +2620,31 @@ class TestPkmPairsForConditionDict:
|
||||
|
||||
class TestParseSexpCondition:
|
||||
def test_basic(self) -> None:
|
||||
err, conds = parse_sexp_to_conditions(Program.to([[bytes([49]), b"foo", b"bar"]]))
|
||||
assert err is None
|
||||
conds = parse_sexp_to_conditions(Program.to([[bytes([49]), b"foo", b"bar"]]))
|
||||
assert conds == [ConditionWithArgs(ConditionOpcode.AGG_SIG_UNSAFE, [b"foo", b"bar"])]
|
||||
|
||||
def test_oversized_op(self) -> None:
|
||||
err, conds = parse_sexp_to_conditions(Program.to([[bytes([49, 49]), b"foo", b"bar"]]))
|
||||
assert err is Err.INVALID_CONDITION
|
||||
assert conds is None
|
||||
with pytest.raises(ConsensusError):
|
||||
parse_sexp_to_conditions(Program.to([[bytes([49, 49]), b"foo", b"bar"]]))
|
||||
|
||||
def test_empty_op(self) -> None:
|
||||
err, conds = parse_sexp_to_conditions(Program.to([[b"", b"foo", b"bar"]]))
|
||||
assert err is Err.INVALID_CONDITION
|
||||
assert conds is None
|
||||
with pytest.raises(ConsensusError):
|
||||
parse_sexp_to_conditions(Program.to([[b"", b"foo", b"bar"]]))
|
||||
|
||||
def test_list_op(self) -> None:
|
||||
err, conds = parse_sexp_to_conditions(Program.to([[[bytes([49])], b"foo", b"bar"]]))
|
||||
assert err is Err.INVALID_CONDITION
|
||||
assert conds is None
|
||||
with pytest.raises(ConsensusError):
|
||||
parse_sexp_to_conditions(Program.to([[[bytes([49])], b"foo", b"bar"]]))
|
||||
|
||||
def test_list_arg(self) -> None:
|
||||
err, conds = parse_sexp_to_conditions(Program.to([[bytes([49]), [b"foo", b"bar"]]]))
|
||||
assert err is None
|
||||
conds = parse_sexp_to_conditions(Program.to([[bytes([49]), [b"foo", b"bar"]]]))
|
||||
assert conds == [ConditionWithArgs(ConditionOpcode.AGG_SIG_UNSAFE, [])]
|
||||
|
||||
def test_list_arg_truncate(self) -> None:
|
||||
err, conds = parse_sexp_to_conditions(Program.to([[bytes([49]), b"baz", [b"foo", b"bar"]]]))
|
||||
assert err is None
|
||||
conds = parse_sexp_to_conditions(Program.to([[bytes([49]), b"baz", [b"foo", b"bar"]]]))
|
||||
assert conds == [ConditionWithArgs(ConditionOpcode.AGG_SIG_UNSAFE, [b"baz"])]
|
||||
|
||||
def test_arg_limit(self) -> None:
|
||||
err, conds = parse_sexp_to_conditions(Program.to([[bytes([49]), b"1", b"2", b"3", b"4", b"5", b"6"]]))
|
||||
assert err is None
|
||||
conds = parse_sexp_to_conditions(Program.to([[bytes([49]), b"1", b"2", b"3", b"4", b"5", b"6"]]))
|
||||
assert conds == [ConditionWithArgs(ConditionOpcode.AGG_SIG_UNSAFE, [b"1", b"2", b"3", b"4"])]
|
||||
|
||||
|
||||
|
@ -6,7 +6,7 @@ from blspy import AugSchemeMPL, G2Element, PrivateKey
|
||||
|
||||
from chia.simulator.block_tools import test_constants
|
||||
from chia.types.coin_spend import CoinSpend
|
||||
from chia.util.condition_tools import conditions_by_opcode, conditions_for_solution, pkm_pairs_for_conditions_dict
|
||||
from chia.util.condition_tools import conditions_dict_for_solution, pkm_pairs_for_conditions_dict
|
||||
from tests.core.make_block_generator import GROUP_ORDER, int_to_public_key
|
||||
|
||||
|
||||
@ -28,11 +28,9 @@ class KeyTool(dict):
|
||||
|
||||
def signature_for_solution(self, coin_spend: CoinSpend, additional_data: bytes) -> AugSchemeMPL:
|
||||
signatures = []
|
||||
err, conditions, cost = conditions_for_solution(
|
||||
conditions_dict = conditions_dict_for_solution(
|
||||
coin_spend.puzzle_reveal, coin_spend.solution, test_constants.MAX_BLOCK_COST_CLVM
|
||||
)
|
||||
assert conditions is not None
|
||||
conditions_dict = conditions_by_opcode(conditions)
|
||||
for public_key, message in pkm_pairs_for_conditions_dict(
|
||||
conditions_dict, coin_spend.coin.name(), additional_data
|
||||
):
|
||||
|
@ -1117,7 +1117,7 @@ class TestDIDWallet:
|
||||
)
|
||||
assert "spend_bundle" in response
|
||||
spend = response["spend_bundle"].coin_spends[0]
|
||||
error, conditions, cost = conditions_dict_for_solution(
|
||||
conditions = conditions_dict_for_solution(
|
||||
spend.puzzle_reveal.to_program(),
|
||||
spend.solution.to_program(),
|
||||
wallet.wallet_state_manager.constants.MAX_BLOCK_COST_CLVM,
|
||||
|
@ -118,8 +118,7 @@ def test_p2_singleton():
|
||||
p2_singleton_full = p2_singleton_puzzle(launcher_id, LAUNCHER_PUZZLE_HASH)
|
||||
solution = Program.to([innerpuz.get_tree_hash(), p2_singleton_coin_id])
|
||||
cost, result = p2_singleton_full.run_with_cost(INFINITE_COST, solution)
|
||||
err, conditions = parse_sexp_to_conditions(result)
|
||||
assert err is None
|
||||
conditions = parse_sexp_to_conditions(result)
|
||||
|
||||
p2_singleton_full = p2_singleton_puzzle(launcher_id, LAUNCHER_PUZZLE_HASH)
|
||||
solution = Program.to([innerpuz.get_tree_hash(), p2_singleton_coin_id])
|
||||
|
Loading…
Reference in New Issue
Block a user