Async waiting for processes, fix networking bug, reorganize blockchain class

This commit is contained in:
Mariano Sorgente 2020-01-14 17:35:51 +09:00
parent a52014881f
commit 69c808a33f
No known key found for this signature in database
GPG Key ID: 0F866338C369278C
9 changed files with 174 additions and 137 deletions

2
.gitignore vendored
View File

@ -11,7 +11,7 @@ __pycache__/
# Database
nohup.out
mongod.log*
fndb_test
fndb_test*
*.db
# Logs

View File

@ -40,26 +40,39 @@ class ReceiveBlockResult(Enum):
class Blockchain:
def __init__(self, override_constants: Dict = {}):
# Allow passing in custom overrides for any consensus parameters
self.constants: Dict = consensus_constants
# Allow passing in custom overrides for any consensus parameters
constants: Dict
# Tips of the blockchain
tips: List[HeaderBlock]
# Least common ancestor of tips
lca_block: HeaderBlock
# Defines the path from genesis to the tip
height_to_hash: Dict[uint32, bytes32]
# All headers (but not orphans) from genesis to the tip are guaranteed to be in header_blocks
header_blocks: Dict[bytes32, HeaderBlock]
# Process pool to verify blocks
pool: concurrent.futures.ProcessPoolExecutor
# Genesis block
genesis: FullBlock
@staticmethod
async def create(
header_blocks: Dict[str, HeaderBlock], override_constants: Dict = {}
):
"""
Initializes a blockchain with the given header blocks, assuming they have all been
validated. If no header_blocks are given, only the genesis block is added.
Uses the genesis block given in override_constants, or as a fallback,
in the consensus constants config.
"""
self = Blockchain()
self.constants = consensus_constants
for key, value in override_constants.items():
self.constants[key] = value
self.tips = []
self.height_to_hash = {}
self.header_blocks = {}
self.tips: List[HeaderBlock] = []
self.lca_block: HeaderBlock
# Defines the path from genesis to the tip
self.height_to_hash: Dict[uint32, bytes32] = {}
# All headers (but not orphans) from genesis to the tip are guaranteed to be in header_blocks
self.header_blocks: Dict[bytes32, HeaderBlock] = {}
cpu_count = multiprocessing.cpu_count()
# Pool of workers to validate blocks concurrently
self.pool = concurrent.futures.ProcessPoolExecutor(
max_workers=max(cpu_count - 1, 1)
)
async def initialize(self, header_blocks: Dict[str, HeaderBlock]):
self.genesis = FullBlock.from_bytes(self.constants["GENESIS_BLOCK"])
result = await self.receive_block(self.genesis)
@ -81,6 +94,7 @@ class Blockchain:
self.header_blocks[self.height_to_hash[uint32(1)]].prev_header_hash
== self.genesis.header_hash
)
return self
def get_current_tips(self) -> List[HeaderBlock]:
"""
@ -672,10 +686,15 @@ class Blockchain:
self, blocks: List[FullBlock]
) -> List[Tuple[bool, Optional[bytes32]]]:
futures = []
cpu_count = multiprocessing.cpu_count()
# Pool of workers to validate blocks concurrently
pool = concurrent.futures.ProcessPoolExecutor(max_workers=max(cpu_count - 1, 1))
for block in blocks:
futures.append(
asyncio.get_running_loop().run_in_executor(
self.pool, self.pre_validate_block_multi, bytes(block)
pool, self.pre_validate_block_multi, bytes(block)
)
)
results = await asyncio.gather(*futures)
@ -684,7 +703,7 @@ class Blockchain:
if pos is not None:
pos = bytes32(pos)
results[i] = val, pos
pool.shutdown()
return results
@staticmethod

View File

@ -43,8 +43,8 @@ class ChiaServer:
global_connections: PeerConnections
# Optional listening server. You can also use this class without starting one.
_server: Optional[asyncio.AbstractServer] = None
_host: Optional[str] = None
_server: Optional[asyncio.AbstractServer]
_host: Optional[str]
# (StreamReader, StreamWriter, NodeType) aiter, gets things from server and clients and
# sends them through the pipeline
@ -57,10 +57,13 @@ class ChiaServer:
_outbound_aiter: push_aiter
# Called for inbound connections after successful handshake
_on_inbound_connect: OnConnectFunc = None
_on_inbound_connect: OnConnectFunc
def __init__(self, port: int, api: Any, local_type: NodeType):
self.global_connections = PeerConnections([])
self._server = None
self._host = None
self._on_inbound_connect = None
self._port = port # TCP port to identify our node
self._api = api # API module that will be called from the requests
self._local_type = local_type # NodeType (farmer, full node, timelord, pool, harvester, wallet)
@ -257,12 +260,7 @@ class ChiaServer:
log.info(f"-> {message.function} to peer {connection.get_peername()}")
try:
await connection.send(message)
except (
ConnectionResetError,
BrokenPipeError,
RuntimeError,
TimeoutError,
) as e:
except (RuntimeError, TimeoutError, OSError,) as e:
log.error(
f"Cannot write to {connection}, already closed. Error {e}."
)
@ -371,7 +369,7 @@ class ChiaServer:
InvalidAck,
InvalidHandshake,
asyncio.IncompleteReadError,
ConnectionResetError,
OSError,
Exception,
) as e:
log.warning(f"{e}, handshake not completed. Connection not created.")

View File

@ -66,8 +66,7 @@ async def main():
log.info("Initializing blockchain from disk")
header_blocks: Dict[str, HeaderBlock] = await load_header_blocks_from_store(store)
blockchain = Blockchain()
await blockchain.initialize(header_blocks)
blockchain = await Blockchain.create(header_blocks)
full_node = FullNode(store, blockchain)
# Starts the full node server (which full nodes can connect to)
@ -138,7 +137,6 @@ async def main():
await rpc_cleanup()
await store.close()
blockchain.pool.shutdown()
await asyncio.get_running_loop().shutdown_asyncgens()

View File

@ -18,38 +18,35 @@ class FullNodeStore:
db_name: str
db: aiosqlite.Connection
# Whether or not we are syncing
sync_mode: bool = False
sync_mode: bool
# Potential new tips that we have received from others.
potential_tips: Dict[bytes32, FullBlock] = {}
potential_tips: Dict[bytes32, FullBlock]
# List of all header hashes up to the tip, download up front
potential_hashes: List[bytes32] = []
potential_hashes: List[bytes32]
# Header blocks received from other peers during sync
potential_headers: Dict[uint32, HeaderBlock] = {}
potential_headers: Dict[uint32, HeaderBlock]
# Event to signal when header hashes are received
potential_hashes_received: Optional[asyncio.Event] = None
potential_hashes_received: Optional[asyncio.Event]
# Event to signal when headers are received at each height
potential_headers_received: Dict[uint32, asyncio.Event] = {}
potential_headers_received: Dict[uint32, asyncio.Event]
# Event to signal when blocks are received at each height
potential_blocks_received: Dict[uint32, asyncio.Event] = {}
potential_blocks_received: Dict[uint32, asyncio.Event]
# Blocks that we have finalized during sync, queue them up for adding after sync is done
potential_future_blocks: List[FullBlock] = []
potential_future_blocks: List[FullBlock]
# Current estimate of the speed of the network timelords
proof_of_time_estimate_ips: uint64 = uint64(10000)
proof_of_time_estimate_ips: uint64
# Our best unfinished block
unfinished_blocks_leader: Tuple[uint32, uint64] = (
uint32(0),
uint64((1 << 64) - 1),
)
unfinished_blocks_leader: Tuple[uint32, uint64]
# Blocks which we have created, but don't have proof of space yet, old ones are cleared
candidate_blocks: Dict[bytes32, Tuple[Body, HeaderData, ProofOfSpace, uint32]] = {}
candidate_blocks: Dict[bytes32, Tuple[Body, HeaderData, ProofOfSpace, uint32]]
# Blocks which are not finalized yet (no proof of time), old ones are cleared
unfinished_blocks: Dict[Tuple[bytes32, uint64], FullBlock] = {}
unfinished_blocks: Dict[Tuple[bytes32, uint64], FullBlock]
# Header hashes of unfinished blocks that we have seen recently
seen_unfinished_blocks: set = set()
seen_unfinished_blocks: set
# Header hashes of blocks that we have seen recently
seen_blocks: set = set()
seen_blocks: set
# Blocks which we have received but our blockchain does not reach, old ones are cleared
disconnected_blocks: Dict[bytes32, FullBlock] = {}
disconnected_blocks: Dict[bytes32, FullBlock]
# Lock
lock: asyncio.Lock
@ -74,7 +71,25 @@ class FullNodeStore:
"CREATE INDEX IF NOT EXISTS block_height on blocks(height)"
)
await self.db.commit()
# Lock
self.sync_mode = False
self.potential_tips = {}
self.potential_hashes = []
self.potential_headers = {}
self.potential_hashes_received = None
self.potential_headers_received = {}
self.potential_blocks_received = {}
self.potential_future_blocks = []
self.proof_of_time_estimate_ips = uint64(10000)
self.unfinished_blocks_leader = (
uint32(0),
uint64((1 << 64) - 1),
)
self.candidate_blocks = {}
self.unfinished_blocks = {}
self.seen_unfinished_blocks = set()
self.seen_blocks = set()
self.disconnected_blocks = {}
self.lock = asyncio.Lock() # external
return self

View File

@ -46,9 +46,8 @@ class TestRpc:
store = await FullNodeStore.create("fndb_test")
await store._clear_database()
blocks = bt.get_consecutive_blocks(test_constants, 10, [], 10)
b: Blockchain = Blockchain(test_constants)
b: Blockchain = await Blockchain.create({}, test_constants)
await store.add_block(blocks[0])
await b.initialize({})
for i in range(1, 9):
assert (
await b.receive_block(blocks[i])
@ -107,10 +106,10 @@ class TestRpc:
# Checks that the RPC manages to stop the node
await client.stop_node()
await store.close()
client.close()
await client.await_closed()
server_2.close_all()
await server_1.await_closed()
await server_2.await_closed()
await rpc_cleanup()
await store.close()

View File

@ -42,8 +42,7 @@ def event_loop():
class TestGenesisBlock:
@pytest.mark.asyncio
async def test_basic_blockchain(self):
bc1: Blockchain = Blockchain()
await bc1.initialize({})
bc1 = await Blockchain.create({})
assert len(bc1.get_current_tips()) == 1
genesis_block = bc1.get_current_tips()[0]
assert genesis_block.height == 0
@ -64,8 +63,7 @@ class TestBlockValidation:
Provides a list of 10 valid blocks, as well as a blockchain with 9 blocks added to it.
"""
blocks = bt.get_consecutive_blocks(test_constants, 10, [], 10)
b: Blockchain = Blockchain(test_constants)
await b.initialize({})
b: Blockchain = await Blockchain.create({}, test_constants)
for i in range(1, 9):
assert (
await b.receive_block(blocks[i])
@ -241,8 +239,7 @@ class TestBlockValidation:
# Make it 5x faster than target time
blocks = bt.get_consecutive_blocks(test_constants, num_blocks, [], 2)
b: Blockchain = Blockchain(test_constants)
await b.initialize({})
b: Blockchain = await Blockchain.create({}, test_constants)
for i in range(1, num_blocks):
assert (
await b.receive_block(blocks[i])
@ -275,8 +272,7 @@ class TestReorgs:
@pytest.mark.asyncio
async def test_basic_reorg(self):
blocks = bt.get_consecutive_blocks(test_constants, 100, [], 9)
b: Blockchain = Blockchain(test_constants)
await b.initialize({})
b: Blockchain = await Blockchain.create({}, test_constants)
for block in blocks:
await b.receive_block(block)
@ -298,8 +294,7 @@ class TestReorgs:
@pytest.mark.asyncio
async def test_reorg_from_genesis(self):
blocks = bt.get_consecutive_blocks(test_constants, 20, [], 9, b"0")
b: Blockchain = Blockchain(test_constants)
await b.initialize({})
b: Blockchain = await Blockchain.create({}, test_constants)
for block in blocks:
await b.receive_block(block)
assert b.get_current_tips()[0].height == 20
@ -335,8 +330,7 @@ class TestReorgs:
@pytest.mark.asyncio
async def test_lca(self):
blocks = bt.get_consecutive_blocks(test_constants, 5, [], 9, b"0")
b: Blockchain = Blockchain(test_constants)
await b.initialize({})
b: Blockchain = await Blockchain.create({}, test_constants)
for block in blocks:
await b.receive_block(block)
@ -357,8 +351,7 @@ class TestReorgs:
@pytest.mark.asyncio
async def test_get_header_hashes(self):
blocks = bt.get_consecutive_blocks(test_constants, 5, [], 9, b"0")
b: Blockchain = Blockchain(test_constants)
await b.initialize({})
b: Blockchain = await Blockchain.create({}, test_constants)
for block in blocks:
await b.receive_block(block)
header_hashes = b.get_header_hashes(blocks[-1].header_hash)

View File

@ -45,9 +45,8 @@ class TestNodeLoad:
store = await FullNodeStore.create("fndb_test")
await store._clear_database()
blocks = bt.get_consecutive_blocks(test_constants, 10, [], 10)
b: Blockchain = Blockchain(test_constants)
b: Blockchain = await Blockchain.create({}, test_constants)
await store.add_block(blocks[0])
await b.initialize({})
for i in range(1, 9):
assert (
await b.receive_block(blocks[i])
@ -111,9 +110,8 @@ class TestNodeLoad:
store = await FullNodeStore.create("fndb_test")
await store._clear_database()
blocks = bt.get_consecutive_blocks(test_constants, num_blocks, [], 10)
b: Blockchain = Blockchain(test_constants)
b: Blockchain = await Blockchain.create({}, test_constants)
await store.add_block(blocks[0])
await b.initialize({})
full_node_1 = FullNode(store, b)
server_1 = ChiaServer(21236, full_node_1, NodeType.FULL_NODE)

View File

@ -39,82 +39,99 @@ class TestStore:
blocks = bt.get_consecutive_blocks(test_constants, 9, [], 9, b"0")
db = await FullNodeStore.create("fndb_test")
await db._clear_database()
db_2 = await FullNodeStore.create("fndb_test_2")
try:
await db._clear_database()
genesis = FullBlock.from_bytes(constants["GENESIS_BLOCK"])
genesis = FullBlock.from_bytes(constants["GENESIS_BLOCK"])
# Save/get block
for block in blocks:
await db.add_block(block)
assert block == await db.get_block(block.header_hash)
# Save/get block
for block in blocks:
await db.add_block(block)
assert block == await db.get_block(block.header_hash)
# Save/get sync
for sync_mode in (False, True):
await db.set_sync_mode(sync_mode)
assert sync_mode == await db.get_sync_mode()
# Save/get sync
for sync_mode in (False, True):
await db.set_sync_mode(sync_mode)
assert sync_mode == await db.get_sync_mode()
# clear sync info
await db.clear_sync_info()
# clear sync info
await db.clear_sync_info()
# add/get potential tip, get potential tips num
await db.add_potential_tip(blocks[6])
assert blocks[6] == await db.get_potential_tip(blocks[6].header_hash)
# add/get potential tip, get potential tips num
await db.add_potential_tip(blocks[6])
assert blocks[6] == await db.get_potential_tip(blocks[6].header_hash)
# add/get potential trunk
header = genesis.header_block
db.add_potential_header(header)
assert db.get_potential_header(genesis.height) == header
# add/get potential trunk
header = genesis.header_block
db.add_potential_header(header)
assert db.get_potential_header(genesis.height) == header
# Add potential block
await db.add_potential_block(genesis)
assert genesis == await db.get_potential_block(uint32(0))
# Add potential block
await db.add_potential_block(genesis)
assert genesis == await db.get_potential_block(uint32(0))
# Add/get candidate block
assert await db.get_candidate_block(0) is None
partial = (
blocks[5].body,
blocks[5].header_block.header.data,
blocks[5].header_block.proof_of_space,
)
await db.add_candidate_block(blocks[5].header_hash, *partial)
assert await db.get_candidate_block(blocks[5].header_hash) == partial
await db.clear_candidate_blocks_below(uint32(8))
assert await db.get_candidate_block(blocks[5].header_hash) is None
# Add/get candidate block
assert await db.get_candidate_block(0) is None
partial = (
blocks[5].body,
blocks[5].header_block.header.data,
blocks[5].header_block.proof_of_space,
)
await db.add_candidate_block(blocks[5].header_hash, *partial)
assert await db.get_candidate_block(blocks[5].header_hash) == partial
await db.clear_candidate_blocks_below(uint32(8))
assert await db.get_candidate_block(blocks[5].header_hash) is None
# Add/get unfinished block
i = 1
for block in blocks:
key = (block.header_hash, uint64(1000))
assert await db.get_unfinished_block(key) is None
await db.add_unfinished_block(key, block)
assert await db.get_unfinished_block(key) == block
assert len(await db.get_unfinished_blocks()) == i
i += 1
await db.clear_unfinished_blocks_below(uint32(5))
assert len(await db.get_unfinished_blocks()) == 5
# Add/get unfinished block
i = 1
for block in blocks:
key = (block.header_hash, uint64(1000))
# Set/get unf block leader
assert db.get_unfinished_block_leader() == (0, (1 << 64) - 1)
db.set_unfinished_block_leader(key)
assert db.get_unfinished_block_leader() == key
# Different database should have different data
await db_2.add_unfinished_block(key, block)
assert await db.get_disconnected_block(blocks[0].prev_header_hash) is None
# Disconnected blocks
for block in blocks:
await db.add_disconnected_block(block)
await db.get_disconnected_block(block.prev_header_hash) == block
assert await db.get_unfinished_block(key) is None
await db.add_unfinished_block(key, block)
assert await db.get_unfinished_block(key) == block
assert len(await db.get_unfinished_blocks()) == i
i += 1
await db.clear_unfinished_blocks_below(uint32(5))
assert len(await db.get_unfinished_blocks()) == 5
await db.clear_disconnected_blocks_below(uint32(5))
assert await db.get_disconnected_block(blocks[4].prev_header_hash) is None
# Set/get unf block leader
assert db.get_unfinished_block_leader() == (0, (1 << 64) - 1)
db.set_unfinished_block_leader(key)
assert db.get_unfinished_block_leader() == key
h_hash_1 = bytes32(token_bytes(32))
assert not db.seen_unfinished_block(h_hash_1)
assert db.seen_unfinished_block(h_hash_1)
db.clear_seen_unfinished_blocks()
assert not db.seen_unfinished_block(h_hash_1)
assert await db.get_disconnected_block(blocks[0].prev_header_hash) is None
# Disconnected blocks
for block in blocks:
await db.add_disconnected_block(block)
await db.get_disconnected_block(block.prev_header_hash) == block
await db.clear_disconnected_blocks_below(uint32(5))
assert await db.get_disconnected_block(blocks[4].prev_header_hash) is None
h_hash_1 = bytes32(token_bytes(32))
assert not db.seen_unfinished_block(h_hash_1)
assert db.seen_unfinished_block(h_hash_1)
db.clear_seen_unfinished_blocks()
assert not db.seen_unfinished_block(h_hash_1)
assert not db.seen_block(h_hash_1)
assert db.seen_block(h_hash_1)
db.clear_seen_blocks()
assert not db.seen_block(h_hash_1)
except Exception:
await db.close()
await db_2.close()
raise
# Different database should have different data
db_3 = await FullNodeStore.create("fndb_test_3")
assert db_3.get_unfinished_block_leader() == (0, (1 << 64) - 1)
assert not db.seen_block(h_hash_1)
assert db.seen_block(h_hash_1)
db.clear_seen_blocks()
assert not db.seen_block(h_hash_1)
await db.close()
await db_2.close()
await db_3.close()