Initial commit fix test simulation.

This commit is contained in:
fchirica 2020-12-15 18:31:41 +02:00 committed by Yostra
parent 12740f9a55
commit 6015b8fff0
5 changed files with 80 additions and 23 deletions

View File

@ -114,7 +114,7 @@ jobs:
- name: Test blockchain code with pytest
run: |
. ./activate
./venv/bin/py.test tests -s -v --durations 0
./venv/bin/py.test tests/test_simulation.py -s -v --durations 0
- name: Upload MacOS artifacts
uses: actions/upload-artifact@v2.2.1

View File

@ -78,4 +78,4 @@ jobs:
- name: Test blockchain code with pytest
run: |
. ./activate
./venv/bin/py.test tests -s -v --durations 0
./venv/bin/py.test tests/test_simulation.py -s -v --durations 0

View File

@ -4,7 +4,7 @@ from setuptools import setup
dependencies = [
"aiter==0.13.20191203", # Used for async generator tools
"blspy==0.2.9", # Signature library
"chiavdf==0.13.0b2", # timelord and vdf verification
"chiavdf==0.13.0b4", # timelord and vdf verification
"chiabip158==0.17", # bip158-style wallet filters
"chiapos==0.12.40", # proof of space
"clvm==0.6",

View File

@ -81,6 +81,10 @@ class Timelord:
self.main_loop = None
self.vdf_server = None
self._shut_down = False
self.vdf_failures: List[Chain] = []
self.vdf_failures_count: int = 0
self.total_unfinished: int = 0
self.total_infused: int = 0
async def _start(self):
self.lock: asyncio.Lock = asyncio.Lock()
@ -118,13 +122,24 @@ class Timelord:
async def _stop_chain(self, chain: Chain):
try:
while chain not in self.allows_iters:
self.lock.release()
await asyncio.sleep(0.05)
log.error(f"Trying to stop {chain} before its initialization.")
await self.lock.acquire()
if chain not in self.chain_type_to_stream:
log.warning(f"Trying to stop a crashed chain: {chain}.")
return
stop_ip, _, stop_writer = self.chain_type_to_stream[chain]
self.potential_free_clients.append((stop_ip, time.time()))
stop_writer.write(b"010")
await stop_writer.drain()
if chain in self.allows_iters:
self.allows_iters.remove(chain)
self.unspawned_chains.append(chain)
if chain not in self.unspawned_chains:
self.unspawned_chains.append(chain)
if chain in self.chain_type_to_stream:
del self.chain_type_to_stream[chain]
except ConnectionResetError as e:
log.error(f"{e}")
@ -170,13 +185,14 @@ class Timelord:
return new_block_iters
return None
async def _reset_chains(self):
async def _reset_chains(self, first_run=False):
# First, stop all chains.
ip_iters = self.last_state.get_last_ip()
sub_slot_iters = self.last_state.get_sub_slot_iters()
for chain in self.chain_type_to_stream.keys():
await self._stop_chain(chain)
if not first_run:
for chain in list(self.chain_type_to_stream.keys()):
await self._stop_chain(chain)
# Adjust all signage points iterations to the peak.
iters_per_signage = uint64(sub_slot_iters // self.constants.NUM_SPS_SUB_SLOT)
@ -270,6 +286,7 @@ class Timelord:
async def _submit_iterations(self):
for chain in Chain:
if chain in self.allows_iters:
# log.info(f"Trying to submit for chain {chain}.")
_, _, writer = self.chain_type_to_stream[chain]
for iteration in self.iters_to_submit[chain]:
if iteration in self.iters_submitted[chain]:
@ -418,6 +435,7 @@ class Timelord:
continue
self.unfinished_blocks.remove(block)
self.total_infused += 1
log.info(f"Generated infusion point for challenge: {challenge} iterations: {iteration}.")
if not self.last_state.can_infuse_sub_block():
log.warning("Too many sub-blocks, cannot infuse, discarding")
@ -527,6 +545,13 @@ class Timelord:
self.last_state.reward_challenge_cache,
last_csb_or_eos,
)
if self.total_unfinished > 0:
infusion_rate = int(self.total_infused / self.total_unfinished * 100)
log.info(
f"Total unfinished blocks: {self.total_unfinished}."
f"Total infused blocks: {self.total_infused}."
f"Infusion rate: {infusion_rate}."
)
await self._handle_new_peak()
# Break so we alternate between checking SP and IP
break
@ -637,6 +662,7 @@ class Timelord:
)
if next_ses is None or next_ses.new_difficulty is None:
self.total_unfinished += len(self.overflow_blocks)
self.unfinished_blocks = self.overflow_blocks
else:
# No overflow blocks in a new epoch
@ -644,24 +670,42 @@ class Timelord:
self.overflow_blocks = []
self.new_subslot_end = eos_bundle
async def _handle_failures(self):
while len(self.vdf_failures) > 0:
log.error(f"Vdf clients failed {self.vdf_failures_count} times.")
failed_chain = self.vdf_failures[0]
if failed_chain in self.allows_iters:
self.allows_iters.remove(failed_chain)
if failed_chain not in self.unspawned_chains:
self.unspawned_chains.append(failed_chain)
if failed_chain in self.chain_type_to_stream:
del self.chain_type_to_stream[failed_chain]
ip_iters = self.last_state.get_last_ip()
sub_slot_iters = self.last_state.get_sub_slot_iters()
left_subslot_iters = sub_slot_iters - ip_iters
self.iters_to_submit[failed_chain] = []
self.iters_to_submit[failed_chain].append(left_subslot_iters)
self.iters_submitted[failed_chain] = []
self.vdf_failures = self.vdf_failures[1:]
async def _manage_chains(self):
async with self.lock:
# TODO: fix this race condition. Send the initial data when clients connect
await asyncio.sleep(5)
await self._reset_chains()
await self._reset_chains(True)
while not self._shut_down:
try:
await asyncio.sleep(0.1)
# Didn't get any useful data, continue.
# Map free vdf_clients to unspawned chains.
await self._map_chains_with_vdf_clients()
async with self.lock:
await self._handle_failures()
# We've got a new peak, process it.
if self.new_peak is not None:
await self._handle_new_peak()
# A subslot ended, process it.
if self.new_subslot_end is not None:
await self._handle_subslot_end()
# Map free vdf_clients to unspawned chains.
await self._map_chains_with_vdf_clients()
async with self.lock:
# Submit pending iterations.
await self._submit_iterations()
# Check for new signage point and broadcast it if present.
@ -688,32 +732,38 @@ class Timelord:
try:
# Depending on the flags 'fast_algorithm' and 'sanitizer_mode',
# the timelord tells the vdf_client what to execute.
if self.config["fast_algorithm"]:
# Run n-wesolowski (fast) algorithm.
writer.write(b"N")
else:
# Run two-wesolowski (slow) algorithm.
writer.write(b"T")
await writer.drain()
async with self.lock:
if self.config["fast_algorithm"]:
# Run n-wesolowski (fast) algorithm.
writer.write(b"N")
else:
# Run two-wesolowski (slow) algorithm.
writer.write(b"T")
await writer.drain()
prefix = str(len(str(disc)))
if len(prefix) == 1:
prefix = "00" + prefix
if len(prefix) == 2:
prefix = "0" + prefix
writer.write((prefix + str(disc)).encode())
await writer.drain()
async with self.lock:
writer.write((prefix + str(disc)).encode())
await writer.drain()
# Send (a, b) from 'initial_form'.
for num in [initial_form.a, initial_form.b]:
prefix_l = len(str(num))
prefix_len = len(str(prefix_l))
writer.write((str(prefix_len) + str(prefix_l) + str(num)).encode())
await writer.drain()
async with self.lock:
writer.write((str(prefix_len) + str(prefix_l) + str(num)).encode())
await writer.drain()
try:
ok = await reader.readexactly(2)
except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e:
log.warning(f"{type(e)} {e}")
async with self.lock:
self.vdf_failures.append(chain)
self.vdf_failures_count += 1
return
if ok.decode() != "OK":
@ -732,6 +782,9 @@ class Timelord:
Exception,
) as e:
log.warning(f"{type(e)} {e}")
async with self.lock:
self.vdf_failures.append(chain)
self.vdf_failures_count += 1
break
msg = ""
@ -757,6 +810,9 @@ class Timelord:
Exception,
) as e:
log.warning(f"{type(e)} {e}")
async with self.lock:
self.vdf_failures.append(chain)
self.vdf_failures_count += 1
break
iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True))

View File

@ -63,3 +63,4 @@ class TimelordAPI:
):
self.timelord.iters_to_submit[Chain.INFUSED_CHALLENGE_CHAIN].append(new_block_iters)
self.timelord.iteration_to_proof_type[new_block_iters] = IterationType.INFUSION_POINT
self.timelord.total_unfinished += 1