Add tests for many puzzles, and fix some bugs found.

This commit is contained in:
Richard Kiss 2021-02-26 23:55:57 -08:00
parent fb1993dbc7
commit 3a2413cf3b
12 changed files with 371 additions and 89 deletions

View File

@ -21,7 +21,7 @@ def parse_sexp_to_condition(
If it fails, returns an Error
"""
if not sexp.listp():
return Err.SEXP_ERROR, None
return Err.INVALID_CONDITION, None
items = sexp.as_python()
if not isinstance(items[0], bytes):
return Err.INVALID_CONDITION, None

View File

@ -18,5 +18,5 @@ def puzzle_for_pk(public_key: Program) -> Program:
return MOD.curry(public_key)
def solution_for_conditions(puzzle_reveal: Program, conditions: Program) -> Program:
return conditions
def solution_for_conditions(conditions: Program) -> Program:
return conditions.to([conditions])

View File

@ -26,13 +26,10 @@ def puzzle_for_pk(public_key: bytes) -> Program:
return MOD.curry(public_key)
def puzzle_reveal_for_conditions(conditions: Program) -> Program:
return p2_conditions.puzzle_for_conditions(conditions)
def solution_for_conditions(conditions) -> Program:
return Program.to(0)
delegated_puzzle = p2_conditions.puzzle_for_conditions(conditions)
return solution_for_delegated_puzzle(delegated_puzzle, Program.to(0))
def solution_for_delegated_puzzle(delegated_puzzle: Program, delegated_solution: Program) -> Program:
return Program.to([delegated_puzzle, delegated_solution])
return delegated_puzzle.to([delegated_puzzle, delegated_solution])

View File

@ -85,14 +85,12 @@ def solution_for_delegated_puzzle(delegated_puzzle: Program, solution: Program)
return Program.to([[], delegated_puzzle, solution])
def solution_with_hidden_puzzle(
def solution_for_hidden_puzzle(
hidden_public_key: G1Element,
hidden_puzzle: Program,
solution_to_hidden_puzzle: Program,
) -> Program:
synthetic_public_key = calculate_synthetic_public_key(hidden_public_key, hidden_puzzle)
puzzle = puzzle_for_synthetic_public_key(synthetic_public_key)
return Program.to([puzzle, [hidden_public_key, hidden_puzzle, solution_to_hidden_puzzle]])
return Program.to([hidden_public_key, hidden_puzzle, solution_to_hidden_puzzle])
def solution_for_conditions(conditions) -> Program:

View File

@ -17,6 +17,5 @@ def puzzle_for_m_of_public_key_list(m, public_key_list) -> Program:
return MOD.curry(m, public_key_list)
def solution_for_delegated_puzzle(m, public_key_list, selectors, puzzle, solution) -> Program:
puzzle_reveal = puzzle_for_m_of_public_key_list(m, public_key_list)
return Program.to([puzzle_reveal, [selectors, puzzle, solution]])
def solution_for_delegated_puzzle(m, selectors, puzzle, solution) -> Program:
return Program.to([selectors, puzzle, solution])

View File

@ -6,6 +6,7 @@ hash along with its solution.
"""
from src.types.blockchain_format.program import Program
from src.types.blockchain_format.sized_bytes import bytes32
from .load_clvm import load_clvm
@ -13,12 +14,14 @@ from .load_clvm import load_clvm
MOD = load_clvm("p2_puzzle_hash.clvm")
def puzzle_for_puzzle_hash(inner_puzzle_hash) -> Program:
def puzzle_for_inner_puzzle_hash(inner_puzzle_hash: bytes32) -> Program:
program = MOD.curry(inner_puzzle_hash)
return program
def solution_for_puzzle_and_solution(inner_puzzle, inner_puzzle_solution) -> Program:
inner_puzzle_hash = Program.to(inner_puzzle).tree_hash()
puzzle_reveal = puzzle_for_puzzle_hash(inner_puzzle_hash)
return Program.to([puzzle_reveal, inner_puzzle_solution])
def puzzle_for_inner_puzzle(inner_puzzle: Program) -> Program:
return puzzle_for_inner_puzzle_hash(inner_puzzle.get_tree_hash())
def solution_for_inner_puzzle_and_inner_solution(inner_puzzle: Program, inner_puzzle_solution: Program) -> Program:
return Program.to([inner_puzzle, inner_puzzle_solution])

0
tests/clvm/__init__.py Normal file
View File

91
tests/clvm/coin_store.py Normal file
View File

@ -0,0 +1,91 @@
from collections import defaultdict
from dataclasses import dataclass, replace
from typing import Dict, Iterator, List
from src.consensus.blockchain_check_conditions import blockchain_check_conditions_dict
from src.types.announcement import Announcement
from src.types.blockchain_format.coin import Coin
from src.types.blockchain_format.sized_bytes import bytes32
from src.types.coin_record import CoinRecord
from src.types.spend_bundle import SpendBundle
from src.util.condition_tools import created_announcements_for_conditions_dict, conditions_dict_for_solution
from src.util.ints import uint32, uint64
class BadSpendBundleError(Exception):
pass
@dataclass
class CoinTimestamp:
seconds: int
height: int
class CoinStore:
def __init__(self):
self._db: Dict[bytes32, CoinRecord] = dict()
self._ph_index = defaultdict(list)
def farm_coin(self, puzzle_hash: bytes32, birthday: CoinTimestamp, amount: int = 1024) -> Coin:
parent = birthday.height.to_bytes(32, "big")
coin = Coin(parent, puzzle_hash, uint64(amount))
self._add_coin_entry(coin, birthday)
return coin
def validate_spend_bundle(
self,
spend_bundle: SpendBundle,
now: CoinTimestamp,
) -> int:
# this should use blockchain consensus code
announcements: List[Announcement] = []
conditions_dicts = []
for coin_solution in spend_bundle.coin_solutions:
err, conditions_dict, cost = conditions_dict_for_solution(
coin_solution.puzzle_reveal, coin_solution.solution
)
if conditions_dict is None:
raise BadSpendBundleError(f"clvm validation failure {err}")
conditions_dicts.append(conditions_dict)
announcements.extend(created_announcements_for_conditions_dict(conditions_dict, coin_solution.coin.name()))
for coin_solution, conditions_dict in zip(spend_bundle.coin_solutions, conditions_dicts):
prev_transaction_block_height = now.height
timestamp = now.seconds
coin_record = self._db[coin_solution.coin.name()]
err = blockchain_check_conditions_dict(
coin_record, announcements, conditions_dict, uint32(prev_transaction_block_height), uint64(timestamp)
)
if err is not None:
raise BadSpendBundleError(f"condition validation failure {err}")
return 0
def update_coin_store_for_spend_bundle(self, spend_bundle: SpendBundle, now: CoinTimestamp):
err = self.validate_spend_bundle(spend_bundle, now)
if err != 0:
raise BadSpendBundleError(f"validation failure {err}")
for spent_coin in spend_bundle.removals():
coin_name = spent_coin.name()
coin_record = self._db[coin_name]
self._db[coin_name] = replace(coin_record, spent_block_index=now.height, spent=True)
for new_coin in spend_bundle.additions():
self._add_coin_entry(new_coin, now)
def coins_for_puzzle_hash(self, puzzle_hash: bytes32) -> Iterator[Coin]:
for coin_name in self._ph_index[puzzle_hash]:
coin_entry = self._db[coin_name]
assert coin_entry.coin.puzzle_hash == puzzle_hash
yield coin_entry.coin
def all_coins(self) -> Iterator[Coin]:
for coin_entry in self._db.values():
yield coin_entry.coin
def _add_coin_entry(self, coin: Coin, birthday: CoinTimestamp) -> None:
name = coin.name()
assert name not in self._db
self._db[name] = CoinRecord(coin, uint32(birthday.height), uint32(0), False, False, uint64(birthday.seconds))
self._ph_index[coin.puzzle_hash].append(name)

240
tests/clvm/test_puzzles.py Normal file
View File

@ -0,0 +1,240 @@
from typing import Iterable, List, Tuple
from unittest import TestCase
from blspy import AugSchemeMPL, BasicSchemeMPL, G1Element, G2Element
from src.types.blockchain_format.program import Program
from src.types.blockchain_format.sized_bytes import bytes32
from src.types.coin_solution import CoinSolution
from src.types.spend_bundle import SpendBundle
from src.util.condition_tools import ConditionOpcode
from src.util.hash import std_hash
from src.wallet.puzzles import (
p2_conditions,
p2_delegated_conditions,
p2_delegated_puzzle,
p2_puzzle_hash,
p2_m_of_n_delegate_direct,
p2_delegated_puzzle_or_hidden_puzzle,
)
from tests.util.key_tool import KeyTool
from .coin_store import CoinStore, CoinTimestamp
T1 = CoinTimestamp(1, 10000000)
T2 = CoinTimestamp(5, 10003000)
def secret_exponent_for_index(index: int) -> int:
blob = index.to_bytes(32, "big")
hashed_blob = BasicSchemeMPL.key_gen(std_hash(b"foo" + blob))
r = int.from_bytes(hashed_blob, "big")
return r
def public_key_for_index(index: int, key_lookup: KeyTool) -> bytes:
secret_exponent = secret_exponent_for_index(index)
key_lookup.add_secret_exponents([secret_exponent])
return bytes(G1Element.generator() * secret_exponent)
def throwaway_puzzle_hash(index: int, key_lookup: KeyTool) -> bytes32:
return p2_delegated_puzzle.puzzle_for_pk(public_key_for_index(index, key_lookup)).get_tree_hash()
def do_test_spend(
puzzle_reveal: Program,
solution: Program,
payments: Iterable[Tuple[bytes32, int]],
key_lookup: KeyTool,
farm_time: CoinTimestamp = T1,
spend_time: CoinTimestamp = T2,
) -> SpendBundle:
"""
This method will farm a coin paid to the hash of `puzzle_reveal`, then try to spend it
with `solution`, and verify that the created coins correspond to `payments`.
The `key_lookup` is used to create a signed version of the `SpendBundle`, although at
this time, signatures are not verified.
"""
coin_db = CoinStore()
puzzle_hash = puzzle_reveal.get_tree_hash()
# farm it
coin = coin_db.farm_coin(puzzle_hash, farm_time)
# spend it
coin_solution = CoinSolution(coin, puzzle_reveal, solution)
spend_bundle = SpendBundle([coin_solution], G2Element.infinity())
coin_db.update_coin_store_for_spend_bundle(spend_bundle, spend_time)
# ensure all outputs are there
for puzzle_hash, amount in payments:
for coin in coin_db.coins_for_puzzle_hash(puzzle_hash):
if coin.amount == amount:
break
else:
assert 0
# make sure we can actually sign the solution
signatures = []
for coin_solution in spend_bundle.coin_solutions:
signature = key_lookup.signature_for_solution(coin_solution)
signatures.append(signature)
return SpendBundle(spend_bundle.coin_solutions, AugSchemeMPL.aggregate(signatures))
def default_payments_and_conditions(
initial_index: int, key_lookup: KeyTool
) -> Tuple[List[Tuple[bytes32, int]], Program]:
payments = [
(throwaway_puzzle_hash(initial_index + 1, key_lookup), initial_index * 1000),
(throwaway_puzzle_hash(initial_index + 2, key_lookup), (initial_index + 1) * 1000),
]
conditions = Program.to([make_create_coin_condition(ph, amount) for ph, amount in payments])
return payments, conditions
def make_create_coin_condition(puzzle_hash, amount):
return Program.to([ConditionOpcode.CREATE_COIN, puzzle_hash, amount])
class TestPuzzles(TestCase):
def test_p2_conditions(self):
key_lookup = KeyTool()
payments, conditions = default_payments_and_conditions(1, key_lookup)
puzzle = p2_conditions.puzzle_for_conditions(conditions)
solution = p2_conditions.solution_for_conditions(conditions)
do_test_spend(puzzle, solution, payments, key_lookup)
def test_p2_delegated_conditions(self):
key_lookup = KeyTool()
payments, conditions = default_payments_and_conditions(1, key_lookup)
pk = public_key_for_index(1, key_lookup)
puzzle = p2_delegated_conditions.puzzle_for_pk(pk)
solution = p2_delegated_conditions.solution_for_conditions(conditions)
do_test_spend(puzzle, solution, payments, key_lookup)
def test_p2_delegated_puzzle_simple(self):
key_lookup = KeyTool()
payments, conditions = default_payments_and_conditions(1, key_lookup)
pk = public_key_for_index(1, key_lookup)
puzzle = p2_delegated_puzzle.puzzle_for_pk(pk)
solution = p2_delegated_puzzle.solution_for_conditions(conditions)
do_test_spend(puzzle, solution, payments, key_lookup)
def test_p2_delegated_puzzle_graftroot(self):
key_lookup = KeyTool()
payments, conditions = default_payments_and_conditions(1, key_lookup)
delegated_puzzle = p2_delegated_conditions.puzzle_for_pk(public_key_for_index(8, key_lookup))
delegated_solution = p2_delegated_conditions.solution_for_conditions(conditions)
puzzle_program = p2_delegated_puzzle.puzzle_for_pk(public_key_for_index(1, key_lookup))
solution = p2_delegated_puzzle.solution_for_delegated_puzzle(delegated_puzzle, delegated_solution)
do_test_spend(puzzle_program, solution, payments, key_lookup)
def test_p2_puzzle_hash(self):
key_lookup = KeyTool()
payments, conditions = default_payments_and_conditions(1, key_lookup)
inner_puzzle = p2_delegated_conditions.puzzle_for_pk(public_key_for_index(4, key_lookup))
inner_solution = p2_delegated_conditions.solution_for_conditions(conditions)
inner_puzzle_hash = inner_puzzle.get_tree_hash()
puzzle_program = p2_puzzle_hash.puzzle_for_inner_puzzle_hash(inner_puzzle_hash)
assert puzzle_program == p2_puzzle_hash.puzzle_for_inner_puzzle(inner_puzzle)
solution = p2_puzzle_hash.solution_for_inner_puzzle_and_inner_solution(inner_puzzle, inner_solution)
do_test_spend(puzzle_program, solution, payments, key_lookup)
def test_p2_m_of_n_delegated_puzzle(self):
key_lookup = KeyTool()
payments, conditions = default_payments_and_conditions(1, key_lookup)
pks = [public_key_for_index(_, key_lookup) for _ in range(1, 6)]
M = 3
delegated_puzzle = p2_conditions.puzzle_for_conditions(conditions)
delegated_solution = []
puzzle_program = p2_m_of_n_delegate_direct.puzzle_for_m_of_public_key_list(M, pks)
selectors = [1, [], [], 1, 1]
solution = p2_m_of_n_delegate_direct.solution_for_delegated_puzzle(
M, selectors, delegated_puzzle, delegated_solution
)
do_test_spend(puzzle_program, solution, payments, key_lookup)
def test_p2_delegated_puzzle_or_hidden_puzzle_with_hidden_puzzle(self):
key_lookup = KeyTool()
payments, conditions = default_payments_and_conditions(1, key_lookup)
hidden_puzzle = p2_conditions.puzzle_for_conditions(conditions)
hidden_public_key = public_key_for_index(10, key_lookup)
puzzle = p2_delegated_puzzle_or_hidden_puzzle.puzzle_for_public_key_and_hidden_puzzle(
hidden_public_key, hidden_puzzle
)
solution = p2_delegated_puzzle_or_hidden_puzzle.solution_for_hidden_puzzle(
hidden_public_key, hidden_puzzle, Program.to(0)
)
do_test_spend(puzzle, solution, payments, key_lookup)
def do_test_spend_p2_delegated_puzzle_or_hidden_puzzle_with_delegated_puzzle(self, hidden_pub_key_index):
key_lookup = KeyTool()
payments, conditions = default_payments_and_conditions(1, key_lookup)
hidden_puzzle = p2_conditions.puzzle_for_conditions(conditions)
hidden_public_key = public_key_for_index(hidden_pub_key_index, key_lookup)
puzzle = p2_delegated_puzzle_or_hidden_puzzle.puzzle_for_public_key_and_hidden_puzzle(
hidden_public_key, hidden_puzzle
)
payable_payments, payable_conditions = default_payments_and_conditions(5, key_lookup)
delegated_puzzle = p2_conditions.puzzle_for_conditions(payable_conditions)
delegated_solution = []
synthetic_public_key = p2_delegated_puzzle_or_hidden_puzzle.calculate_synthetic_public_key(
hidden_public_key, hidden_puzzle.get_tree_hash()
)
solution = p2_delegated_puzzle_or_hidden_puzzle.solution_for_delegated_puzzle(
delegated_puzzle, delegated_solution
)
hidden_puzzle_hash = hidden_puzzle.get_tree_hash()
synthetic_offset = p2_delegated_puzzle_or_hidden_puzzle.calculate_synthetic_offset(
hidden_public_key, hidden_puzzle_hash
)
hidden_pub_key_point = G1Element.from_bytes(hidden_public_key)
assert synthetic_public_key == synthetic_offset * G1Element.generator() + hidden_pub_key_point
secret_exponent = key_lookup.get(hidden_public_key)
assert G1Element.generator() * secret_exponent == hidden_pub_key_point
synthetic_secret_exponent = secret_exponent + synthetic_offset
key_lookup.add_secret_exponents([synthetic_secret_exponent])
do_test_spend(puzzle, solution, payable_payments, key_lookup)
def test_p2_delegated_puzzle_or_hidden_puzzle_with_delegated_puzzle(self):
for hidden_pub_key_index in range(1, 10):
self.do_test_spend_p2_delegated_puzzle_or_hidden_puzzle_with_delegated_puzzle(hidden_pub_key_index)

View File

@ -1,41 +0,0 @@
from src.util.hash import std_hash
from src.util.condition_tools import (
conditions_by_opcode,
aggsig_in_conditions_dict,
created_outputs_for_conditions_dict,
conditions_for_solution,
)
from src.wallet.puzzles import p2_delegated_puzzle
from src.wallet.puzzles.puzzle_utils import make_create_coin_condition
from src.types.blockchain_format.program import Program
from src.util.ints import uint32
from tests.keys import puzzle_program_for_index
def test_1():
puzzle_program_1 = puzzle_program_for_index(uint32(1))
puzzle_program_2 = puzzle_program_for_index(uint32(2))
conditions = Program.to(
[
make_create_coin_condition(std_hash(bytes(pp)), amount)
for pp, amount in [(puzzle_program_1, 1000), (puzzle_program_2, 2000)]
]
)
assert conditions is not None
puzzle_reveal = p2_delegated_puzzle.puzzle_reveal_for_conditions(conditions)
solution = p2_delegated_puzzle.solution_for_conditions(conditions)
error, output_conditions, cost = conditions_for_solution(puzzle_reveal, solution)
assert error is None
from pprint import pprint
assert output_conditions is not None
output_conditions_dict = conditions_by_opcode(output_conditions)
pprint(output_conditions_dict)
input_coin_info_hash = bytes([0] * 32)
created_outputs_for_conditions_dict(output_conditions_dict, input_coin_info_hash)
aggsigs = aggsig_in_conditions_dict(output_conditions_dict)
pprint(aggsigs)

View File

@ -1,11 +0,0 @@
from blspy import AugSchemeMPL
from src.wallet.puzzles.p2_delegated_puzzle_or_hidden_puzzle import puzzle_for_pk
from src.util.ints import uint32
from src.wallet.derive_keys import master_sk_to_wallet_sk
MASTER_KEY = AugSchemeMPL.key_gen(bytes([1] * 32))
def puzzle_program_for_index(index: uint32):
return puzzle_for_pk(bytes(master_sk_to_wallet_sk(MASTER_KEY, index).get_g1()))

View File

@ -1,11 +1,17 @@
from blspy import PrivateKey, AugSchemeMPL
from typing import List
from blspy import AugSchemeMPL, G1Element, G2Element, PrivateKey
from src.util.condition_tools import (
conditions_by_opcode,
pkm_pairs_for_conditions_dict,
conditions_for_solution,
)
from src.types.blockchain_format.program import Program
from src.types.blockchain_format.sized_bytes import bytes32
from src.types.coin_solution import CoinSolution
GROUP_ORDER = 0x73EDA753299D7D483339D80809A1D80553BDA402FFFE5BFEFFFFFFFF00000001
class KeyTool(dict):
@ -13,23 +19,23 @@ class KeyTool(dict):
def __new__(cls, *args):
return dict.__new__(*args)
def add_secret_exponents(self, secret_exponents):
def add_secret_exponents(self, secret_exponents: List[int]) -> None:
for _ in secret_exponents:
bls_private_key = PrivateKey.from_bytes(_.to_bytes(32, "big"))
self[bls_private_key.get_g1()] = bls_private_key
self[bytes(G1Element.generator() * _)] = _ % GROUP_ORDER
def sign(self, pk, msg):
private = self.get(pk)
if not private:
raise ValueError("unknown pubkey %s" % pk)
return AugSchemeMPL.sign(private, msg)
def sign(self, public_key: bytes, message_hash: bytes32) -> G2Element:
secret_exponent = self.get(public_key)
if not secret_exponent:
raise ValueError("unknown pubkey %s" % public_key.hex())
bls_private_key = PrivateKey.from_bytes(secret_exponent.to_bytes(32, "big"))
return AugSchemeMPL.sign(bls_private_key, message_hash)
def signature_for_solution(self, puzzle: Program, solution: Program, coin_name) -> AugSchemeMPL:
def signature_for_solution(self, coin_solution: CoinSolution) -> AugSchemeMPL:
signatures = []
conditions = conditions_for_solution(puzzle, solution)
assert conditions[1] is not None
conditions_dict = conditions_by_opcode(conditions[1])
for pk, msg in pkm_pairs_for_conditions_dict(conditions_dict, coin_name):
signature = self.sign(pk, msg)
err, conditions, cost = conditions_for_solution(coin_solution.puzzle_reveal, coin_solution.solution)
assert conditions is not None
conditions_dict = conditions_by_opcode(conditions)
for public_key, message_hash in pkm_pairs_for_conditions_dict(conditions_dict, coin_solution.coin.name()):
signature = self.sign(bytes(public_key), message_hash)
signatures.append(signature)
return AugSchemeMPL.aggregate(signatures)