Tests skipping mempool (#3065)

* Avoid importing `test_constants` as it takes a long time.

* Some new tests that circumvents mempool.

* Fix lint problems.
This commit is contained in:
Richard Kiss 2021-04-30 10:23:45 -07:00 committed by GitHub
parent b084813b12
commit a7f996b4dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 227 additions and 1 deletions

View File

@ -135,7 +135,7 @@ class Keychain:
def _get_pk_and_entropy(self, user: str) -> Optional[Tuple[G1Element, bytes]]:
"""
Returns the keychain conntents for a specific 'user' (key index). The contents
Returns the keychain contents for a specific 'user' (key index). The contents
include an G1Element and the entropy required to generate the private key.
Note that generating the actual private key also requires the passphrase.
"""

View File

@ -0,0 +1,18 @@
from typing import Tuple
import aiosqlite
from chia.consensus.blockchain import Blockchain
from chia.consensus.constants import ConsensusConstants
from chia.full_node.block_store import BlockStore
from chia.full_node.coin_store import CoinStore
from chia.util.db_wrapper import DBWrapper
async def create_ram_blockchain(consensus_constants: ConsensusConstants) -> Tuple[aiosqlite.Connection, Blockchain]:
connection = await aiosqlite.connect(":memory:")
db_wrapper = DBWrapper(connection)
block_store = await BlockStore.create(db_wrapper)
coin_store = await CoinStore.create(db_wrapper)
blockchain = await Blockchain.create(coin_store, block_store, consensus_constants)
return connection, blockchain

View File

@ -0,0 +1,208 @@
"""
These are quick-to-run test that check spends can be added to the blockchain when they're valid
or that they're failing for the right reason when they're invalid.
"""
import logging
import time
from typing import List, Optional
import pytest
from blspy import G2Element
from clvm_tools.binutils import assemble
from chia.consensus.blockchain import ReceiveBlockResult
from chia.consensus.constants import ConsensusConstants
from chia.types.announcement import Announcement
from chia.types.blockchain_format.program import Program
from chia.types.coin_solution import CoinSolution
from chia.types.condition_opcodes import ConditionOpcode
from chia.types.full_block import FullBlock
from chia.types.spend_bundle import SpendBundle
from chia.util.block_tools import BlockTools, test_constants
from chia.util.errors import Err
from .ram_db import create_ram_blockchain
bt = BlockTools(constants=test_constants)
log = logging.getLogger(__name__)
# This puzzle simply returns the solution as conditions.
# We call it the `EASY_PUZZLE` because it's pretty easy to solve.
EASY_PUZZLE = Program.to(assemble("1"))
EASY_PUZZLE_HASH = EASY_PUZZLE.get_tree_hash()
def initial_blocks(block_count: int = 4) -> List[FullBlock]:
blocks = bt.get_consecutive_blocks(
block_count,
guarantee_transaction_block=True,
farmer_reward_puzzle_hash=EASY_PUZZLE_HASH,
pool_reward_puzzle_hash=EASY_PUZZLE_HASH,
)
return blocks
async def check_spend_bundle_validity(
constants: ConsensusConstants,
blocks: List[FullBlock],
spend_bundle: SpendBundle,
expected_err: Optional[Err] = None,
):
"""
This test helper create an extra block after the given blocks that contains the given
`SpendBundle`, and then invokes `receive_block` to ensure that it's accepted (if `expected_err=None`)
or fails with the correct error code.
"""
try:
connection, blockchain = await create_ram_blockchain(constants)
for block in blocks:
received_block_result, err, fork_height = await blockchain.receive_block(block)
assert err is None
additional_blocks = bt.get_consecutive_blocks(
1,
block_list_input=blocks,
guarantee_transaction_block=True,
transaction_data=spend_bundle,
)
newest_block = additional_blocks[-1]
received_block_result, err, fork_height = await blockchain.receive_block(newest_block)
if expected_err is None:
assert err is None
assert received_block_result == ReceiveBlockResult.NEW_PEAK
assert fork_height == len(blocks) - 1
else:
assert err == expected_err
assert received_block_result == ReceiveBlockResult.INVALID_BLOCK
assert fork_height is None
finally:
# if we don't close the connection, the test process doesn't exit cleanly
await connection.close()
# we must call `shut_down` or the executor in `Blockchain` doesn't stop
blockchain.shut_down()
async def check_conditions(
condition_solution: Program, expected_err: Optional[Err] = None, spend_reward_index: int = -2
):
blocks = initial_blocks()
coin = list(blocks[spend_reward_index].get_included_reward_coins())[0]
coin_solution = CoinSolution(coin, EASY_PUZZLE, condition_solution)
spend_bundle = SpendBundle([coin_solution], G2Element())
# now let's try to create a block with the spend bundle and ensure that it doesn't validate
await check_spend_bundle_validity(bt.constants, blocks, spend_bundle, expected_err=expected_err)
class TestConditions:
@pytest.mark.asyncio
async def test_invalid_block_age(self):
conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_HEIGHT_RELATIVE[0]} 2))"))
await check_conditions(conditions, expected_err=Err.ASSERT_HEIGHT_RELATIVE_FAILED)
@pytest.mark.asyncio
async def test_valid_block_age(self):
conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_HEIGHT_RELATIVE[0]} 1))"))
await check_conditions(conditions)
@pytest.mark.asyncio
async def test_invalid_block_height(self):
conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_HEIGHT_ABSOLUTE[0]} 4))"))
await check_conditions(conditions, expected_err=Err.ASSERT_HEIGHT_ABSOLUTE_FAILED)
@pytest.mark.asyncio
async def test_valid_block_height(self):
conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_HEIGHT_ABSOLUTE[0]} 3))"))
await check_conditions(conditions)
@pytest.mark.asyncio
async def test_invalid_my_id(self):
blocks = initial_blocks()
coin = list(blocks[-2].get_included_reward_coins())[0]
wrong_name = bytearray(coin.name())
wrong_name[-1] ^= 1
conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_MY_COIN_ID[0]} 0x{wrong_name.hex()}))"))
await check_conditions(conditions, expected_err=Err.ASSERT_MY_COIN_ID_FAILED)
@pytest.mark.asyncio
async def test_valid_my_id(self):
blocks = initial_blocks()
coin = list(blocks[-2].get_included_reward_coins())[0]
conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_MY_COIN_ID[0]} 0x{coin.name().hex()}))"))
await check_conditions(conditions)
@pytest.mark.asyncio
async def test_invalid_seconds_absolute(self):
# TODO: make the test suite not use `time.time` so we can more accurately
# set `time_now` to make it minimal while still failing
time_now = int(time.time()) + 3000
conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_SECONDS_ABSOLUTE[0]} {time_now}))"))
await check_conditions(conditions, expected_err=Err.ASSERT_SECONDS_ABSOLUTE_FAILED)
@pytest.mark.asyncio
async def test_valid_seconds_absolute(self):
time_now = int(time.time())
conditions = Program.to(assemble(f"(({ConditionOpcode.ASSERT_SECONDS_ABSOLUTE[0]} {time_now}))"))
await check_conditions(conditions)
@pytest.mark.asyncio
async def test_invalid_coin_announcement(self):
blocks = initial_blocks()
coin = list(blocks[-2].get_included_reward_coins())[0]
announce = Announcement(coin.name(), b"test_bad")
conditions = Program.to(
assemble(
f"(({ConditionOpcode.CREATE_COIN_ANNOUNCEMENT[0]} 'test')"
f"({ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT[0]} 0x{announce.name().hex()}))"
)
)
await check_conditions(conditions, expected_err=Err.ASSERT_ANNOUNCE_CONSUMED_FAILED)
@pytest.mark.asyncio
async def test_valid_coin_announcement(self):
blocks = initial_blocks()
coin = list(blocks[-2].get_included_reward_coins())[0]
announce = Announcement(coin.name(), b"test")
conditions = Program.to(
assemble(
f"(({ConditionOpcode.CREATE_COIN_ANNOUNCEMENT[0]} 'test')"
f"({ConditionOpcode.ASSERT_COIN_ANNOUNCEMENT[0]} 0x{announce.name().hex()}))"
)
)
await check_conditions(conditions)
@pytest.mark.asyncio
async def test_invalid_puzzle_announcement(self):
announce = Announcement(EASY_PUZZLE_HASH, b"test_bad")
conditions = Program.to(
assemble(
f"(({ConditionOpcode.CREATE_PUZZLE_ANNOUNCEMENT[0]} 'test')"
f"({ConditionOpcode.ASSERT_PUZZLE_ANNOUNCEMENT[0]} 0x{announce.name().hex()}))"
)
)
await check_conditions(conditions, expected_err=Err.ASSERT_ANNOUNCE_CONSUMED_FAILED)
@pytest.mark.asyncio
async def test_valid_puzzle_announcement(self):
announce = Announcement(EASY_PUZZLE_HASH, b"test")
conditions = Program.to(
assemble(
f"(({ConditionOpcode.CREATE_PUZZLE_ANNOUNCEMENT[0]} 'test')"
f"({ConditionOpcode.ASSERT_PUZZLE_ANNOUNCEMENT[0]} 0x{announce.name().hex()}))"
)
)
await check_conditions(conditions)