multi-threaded chia plots check (#15693)

* improve harvester warning messages

* run plot check in multiple threads
This commit is contained in:
Arvid Norberg 2023-07-06 22:38:26 +02:00 committed by GitHub
parent d30922d6b6
commit 01e6e7776f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 15 deletions

View File

@ -211,13 +211,12 @@ class HarvesterAPI:
time_taken = time.time() - start
if time_taken > 5:
self.harvester.log.warning(
f"Looking up qualities on {filename} took: {time_taken}. This should be below 5 seconds "
f"to minimize risk of losing rewards."
f"Looking up qualities on {filename} took: {time_taken}. This should be below 5 seconds"
f" to minimize risk of losing rewards."
)
else:
pass
# If you want additional logs, uncomment the following line
# self.harvester.log.debug(f"Looking up qualities on {filename} took: {time_taken}")
# self.harvester.log.info(f"Looking up qualities on {filename} took: {time_taken}")
for response in sublist:
total_proofs_found += 1
msg = make_msg(ProtocolMessageTypes.new_proof_of_space, response)

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import concurrent.futures
import logging
from collections import Counter
from pathlib import Path
@ -11,6 +12,7 @@ from chiapos import Verifier
from chia.plotting.manager import PlotManager
from chia.plotting.util import (
PlotInfo,
PlotRefreshEvents,
PlotRefreshResult,
PlotsRefreshParameter,
@ -50,6 +52,9 @@ def check_plots(
refresh_parameter=plot_refresh_parameter,
refresh_callback=plot_refresh_callback,
)
context_count = config["harvester"].get("parallel_decompressers_count", 5)
if num is not None:
if num == 0:
log.warning("Not opening plot files")
@ -110,7 +115,12 @@ def check_plots(
bad_plots_list: List[Path] = []
with plot_manager:
for plot_path, plot_info in plot_manager.plots.items():
def process_plot(plot_path: Path, plot_info: PlotInfo, num_start: int, num_end: int) -> None:
nonlocal total_good_plots
nonlocal total_size
nonlocal bad_plots_list
pr = plot_info.prover
log.info(f"Testing plot {plot_path} k={pr.get_size()}")
if plot_info.pool_public_key is not None:
@ -140,10 +150,10 @@ def check_plots(
if quality_spent_time > 5000:
log.warning(
f"\tLooking up qualities took: {quality_spent_time} ms. This should be below 5 seconds "
f"to minimize risk of losing rewards."
f"to minimize risk of losing rewards. Filepath: {plot_path}"
)
else:
log.info(f"\tLooking up qualities took: {quality_spent_time} ms.")
log.info(f"\tLooking up qualities took: {quality_spent_time} ms. Filepath: {plot_path}")
# Other plot errors cause get_full_proof or validate_proof to throw an AssertionError
try:
@ -153,35 +163,53 @@ def check_plots(
if proof_spent_time > 15000:
log.warning(
f"\tFinding proof took: {proof_spent_time} ms. This should be below 15 seconds "
f"to minimize risk of losing rewards."
f"to minimize risk of losing rewards. Filepath: {plot_path}"
)
else:
log.info(f"\tFinding proof took: {proof_spent_time} ms")
total_proofs += 1
log.info(f"\tFinding proof took: {proof_spent_time} ms. Filepath: {plot_path}")
ver_quality_str = v.validate_proof(pr.get_id(), pr.get_size(), challenge, proof)
assert quality_str == ver_quality_str
except AssertionError as e:
log.error(f"{type(e)}: {e} error in proving/verifying for plot {plot_path}")
log.error(
f"{type(e)}: {e} error in proving/verifying for plot {plot_path}. Filepath: {plot_path}"
)
caught_exception = True
quality_start_time = int(round(time() * 1000))
except KeyboardInterrupt:
log.warning("Interrupted, closing")
return None
return
except SystemExit:
log.warning("System is shutting down.")
return None
return
except Exception as e:
log.error(f"{type(e)}: {e} error in getting challenge qualities for plot {plot_path}")
caught_exception = True
if caught_exception is True:
break
if total_proofs > 0 and caught_exception is False:
log.info(f"\tProofs {total_proofs} / {challenges}, {round(total_proofs/float(challenges), 4)}")
log.info(
f"\tProofs {total_proofs} / {challenges}, {round(total_proofs/float(challenges), 4)}. "
f"Filepath: {plot_path}"
)
total_good_plots[pr.get_size()] += 1
total_size += plot_path.stat().st_size
else:
log.error(f"\tProofs {total_proofs} / {challenges}, {round(total_proofs/float(challenges), 4)}")
log.error(
f"\tProofs {total_proofs} / {challenges}, {round(total_proofs/float(challenges), 4)} "
f"Filepath: {plot_path}"
)
bad_plots_list.append(plot_path)
with concurrent.futures.ThreadPoolExecutor(max_workers=context_count) as executor:
futures = []
for plot_path, plot_info in plot_manager.plots.items():
futures.append(executor.submit(process_plot, plot_path, plot_info, num_start, num_end))
for future in concurrent.futures.as_completed(futures):
_ = future.result()
log.info("")
log.info("")
log.info("Summary")