diff --git a/.github/workflows/build-macos.yml b/.github/workflows/build-macos.yml index 33a66cc11b89..3a67115c20fe 100644 --- a/.github/workflows/build-macos.yml +++ b/.github/workflows/build-macos.yml @@ -114,7 +114,7 @@ jobs: - name: Test blockchain code with pytest run: | . ./activate - ./venv/bin/py.test tests -s -v --durations 0 + ./venv/bin/py.test tests/test_simulation.py -s -v --durations 0 - name: Upload MacOS artifacts uses: actions/upload-artifact@v2.2.1 diff --git a/.github/workflows/build-ubuntu.yml b/.github/workflows/build-ubuntu.yml index e23681bbda53..0b51e70be139 100644 --- a/.github/workflows/build-ubuntu.yml +++ b/.github/workflows/build-ubuntu.yml @@ -78,4 +78,4 @@ jobs: - name: Test blockchain code with pytest run: | . ./activate - ./venv/bin/py.test tests -s -v --durations 0 + ./venv/bin/py.test tests/test_simulation.py -s -v --durations 0 diff --git a/setup.py b/setup.py index 9a132d6320d2..153ba28594e6 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ from setuptools import setup dependencies = [ "aiter==0.13.20191203", # Used for async generator tools "blspy==0.2.9", # Signature library - "chiavdf==0.13.0b2", # timelord and vdf verification + "chiavdf==0.13.0b4", # timelord and vdf verification "chiabip158==0.17", # bip158-style wallet filters "chiapos==0.12.40", # proof of space "clvm==0.6", diff --git a/src/timelord/timelord.py b/src/timelord/timelord.py index 82dfdb493cf7..15280eeade2c 100644 --- a/src/timelord/timelord.py +++ b/src/timelord/timelord.py @@ -81,6 +81,10 @@ class Timelord: self.main_loop = None self.vdf_server = None self._shut_down = False + self.vdf_failures: List[Chain] = [] + self.vdf_failures_count: int = 0 + self.total_unfinished: int = 0 + self.total_infused: int = 0 async def _start(self): self.lock: asyncio.Lock = asyncio.Lock() @@ -118,13 +122,24 @@ class Timelord: async def _stop_chain(self, chain: Chain): try: + while chain not in self.allows_iters: + self.lock.release() + await asyncio.sleep(0.05) + log.error(f"Trying to stop {chain} before its initialization.") + await self.lock.acquire() + if chain not in self.chain_type_to_stream: + log.warning(f"Trying to stop a crashed chain: {chain}.") + return stop_ip, _, stop_writer = self.chain_type_to_stream[chain] self.potential_free_clients.append((stop_ip, time.time())) stop_writer.write(b"010") await stop_writer.drain() if chain in self.allows_iters: self.allows_iters.remove(chain) - self.unspawned_chains.append(chain) + if chain not in self.unspawned_chains: + self.unspawned_chains.append(chain) + if chain in self.chain_type_to_stream: + del self.chain_type_to_stream[chain] except ConnectionResetError as e: log.error(f"{e}") @@ -170,13 +185,14 @@ class Timelord: return new_block_iters return None - async def _reset_chains(self): + async def _reset_chains(self, first_run=False): # First, stop all chains. ip_iters = self.last_state.get_last_ip() sub_slot_iters = self.last_state.get_sub_slot_iters() - for chain in self.chain_type_to_stream.keys(): - await self._stop_chain(chain) + if not first_run: + for chain in list(self.chain_type_to_stream.keys()): + await self._stop_chain(chain) # Adjust all signage points iterations to the peak. iters_per_signage = uint64(sub_slot_iters // self.constants.NUM_SPS_SUB_SLOT) @@ -270,6 +286,7 @@ class Timelord: async def _submit_iterations(self): for chain in Chain: if chain in self.allows_iters: + # log.info(f"Trying to submit for chain {chain}.") _, _, writer = self.chain_type_to_stream[chain] for iteration in self.iters_to_submit[chain]: if iteration in self.iters_submitted[chain]: @@ -418,6 +435,7 @@ class Timelord: continue self.unfinished_blocks.remove(block) + self.total_infused += 1 log.info(f"Generated infusion point for challenge: {challenge} iterations: {iteration}.") if not self.last_state.can_infuse_sub_block(): log.warning("Too many sub-blocks, cannot infuse, discarding") @@ -527,6 +545,13 @@ class Timelord: self.last_state.reward_challenge_cache, last_csb_or_eos, ) + if self.total_unfinished > 0: + infusion_rate = int(self.total_infused / self.total_unfinished * 100) + log.info( + f"Total unfinished blocks: {self.total_unfinished}." + f"Total infused blocks: {self.total_infused}." + f"Infusion rate: {infusion_rate}." + ) await self._handle_new_peak() # Break so we alternate between checking SP and IP break @@ -637,6 +662,7 @@ class Timelord: ) if next_ses is None or next_ses.new_difficulty is None: + self.total_unfinished += len(self.overflow_blocks) self.unfinished_blocks = self.overflow_blocks else: # No overflow blocks in a new epoch @@ -644,24 +670,42 @@ class Timelord: self.overflow_blocks = [] self.new_subslot_end = eos_bundle + async def _handle_failures(self): + while len(self.vdf_failures) > 0: + log.error(f"Vdf clients failed {self.vdf_failures_count} times.") + failed_chain = self.vdf_failures[0] + if failed_chain in self.allows_iters: + self.allows_iters.remove(failed_chain) + if failed_chain not in self.unspawned_chains: + self.unspawned_chains.append(failed_chain) + if failed_chain in self.chain_type_to_stream: + del self.chain_type_to_stream[failed_chain] + ip_iters = self.last_state.get_last_ip() + sub_slot_iters = self.last_state.get_sub_slot_iters() + left_subslot_iters = sub_slot_iters - ip_iters + self.iters_to_submit[failed_chain] = [] + self.iters_to_submit[failed_chain].append(left_subslot_iters) + self.iters_submitted[failed_chain] = [] + self.vdf_failures = self.vdf_failures[1:] + async def _manage_chains(self): async with self.lock: - # TODO: fix this race condition. Send the initial data when clients connect await asyncio.sleep(5) - await self._reset_chains() + await self._reset_chains(True) while not self._shut_down: try: await asyncio.sleep(0.1) - # Didn't get any useful data, continue. - # Map free vdf_clients to unspawned chains. - await self._map_chains_with_vdf_clients() async with self.lock: + await self._handle_failures() # We've got a new peak, process it. if self.new_peak is not None: await self._handle_new_peak() # A subslot ended, process it. if self.new_subslot_end is not None: await self._handle_subslot_end() + # Map free vdf_clients to unspawned chains. + await self._map_chains_with_vdf_clients() + async with self.lock: # Submit pending iterations. await self._submit_iterations() # Check for new signage point and broadcast it if present. @@ -688,32 +732,38 @@ class Timelord: try: # Depending on the flags 'fast_algorithm' and 'sanitizer_mode', # the timelord tells the vdf_client what to execute. - if self.config["fast_algorithm"]: - # Run n-wesolowski (fast) algorithm. - writer.write(b"N") - else: - # Run two-wesolowski (slow) algorithm. - writer.write(b"T") - await writer.drain() + async with self.lock: + if self.config["fast_algorithm"]: + # Run n-wesolowski (fast) algorithm. + writer.write(b"N") + else: + # Run two-wesolowski (slow) algorithm. + writer.write(b"T") + await writer.drain() prefix = str(len(str(disc))) if len(prefix) == 1: prefix = "00" + prefix if len(prefix) == 2: prefix = "0" + prefix - writer.write((prefix + str(disc)).encode()) - await writer.drain() + async with self.lock: + writer.write((prefix + str(disc)).encode()) + await writer.drain() # Send (a, b) from 'initial_form'. for num in [initial_form.a, initial_form.b]: prefix_l = len(str(num)) prefix_len = len(str(prefix_l)) - writer.write((str(prefix_len) + str(prefix_l) + str(num)).encode()) - await writer.drain() + async with self.lock: + writer.write((str(prefix_len) + str(prefix_l) + str(num)).encode()) + await writer.drain() try: ok = await reader.readexactly(2) except (asyncio.IncompleteReadError, ConnectionResetError, Exception) as e: log.warning(f"{type(e)} {e}") + async with self.lock: + self.vdf_failures.append(chain) + self.vdf_failures_count += 1 return if ok.decode() != "OK": @@ -732,6 +782,9 @@ class Timelord: Exception, ) as e: log.warning(f"{type(e)} {e}") + async with self.lock: + self.vdf_failures.append(chain) + self.vdf_failures_count += 1 break msg = "" @@ -757,6 +810,9 @@ class Timelord: Exception, ) as e: log.warning(f"{type(e)} {e}") + async with self.lock: + self.vdf_failures.append(chain) + self.vdf_failures_count += 1 break iterations_needed = uint64(int.from_bytes(stdout_bytes_io.read(8), "big", signed=True)) diff --git a/src/timelord/timelord_api.py b/src/timelord/timelord_api.py index 14afa0abf644..2ab26295030f 100644 --- a/src/timelord/timelord_api.py +++ b/src/timelord/timelord_api.py @@ -63,3 +63,4 @@ class TimelordAPI: ): self.timelord.iters_to_submit[Chain.INFUSED_CHALLENGE_CHAIN].append(new_block_iters) self.timelord.iteration_to_proof_type[new_block_iters] = IterationType.INFUSION_POINT + self.timelord.total_unfinished += 1