diff --git a/chia/plotting/plot_tools.py b/chia/plotting/plot_tools.py index 94910ca127df..935ae9bb72e6 100644 --- a/chia/plotting/plot_tools.py +++ b/chia/plotting/plot_tools.py @@ -2,6 +2,7 @@ import logging import time import traceback from dataclasses import dataclass +from functools import reduce from pathlib import Path from typing import Dict, List, Optional, Set, Tuple, Union from concurrent.futures.thread import ThreadPoolExecutor @@ -153,31 +154,31 @@ def load_plots( all_filenames: List[Path] = [] for paths in plot_filenames.values(): all_filenames += paths - new_provers: Dict[Path, PlotInfo] = {} plot_ids: Set[bytes32] = set() if match_str is not None: log.info(f'Only loading plots that contain "{match_str}" in the file or directory name') - def process_file(filename: Path) -> int: + def process_file(filename: Path) -> Tuple[int, Dict]: + new_provers: Dict[Path, PlotInfo] = {} nonlocal changed filename_str = str(filename) if match_str is not None and match_str not in filename_str: - return 0 + return 0, new_provers if filename.exists(): if filename in failed_to_open_filenames and (time.time() - failed_to_open_filenames[filename]) < 1200: # Try once every 20 minutes to open the file - return 0 + return 0, new_provers if filename in provers: try: stat_info = filename.stat() except Exception as e: log.error(f"Failed to open file {filename}. {e}") - return 0 + return 0, new_provers if stat_info.st_mtime == provers[filename].time_modified: new_provers[filename] = provers[filename] plot_ids.add(provers[filename].prover.get_id()) - return stat_info.st_size + return stat_info.st_size, new_provers try: prover = DiskProver(str(filename)) @@ -192,11 +193,11 @@ def load_plots( f"Not farming plot {filename}. Size is {stat_info.st_size / (1024**3)} GiB, but expected" f" at least: {expected_size / (1024 ** 3)} GiB. We assume the file is being copied." ) - return 0 + return 0, new_provers if prover.get_id() in plot_ids: log.warning(f"Have multiple copies of the plot {filename}, not adding it.") - return 0 + return 0, new_provers ( pool_public_key_or_puzzle_hash, @@ -209,7 +210,7 @@ def load_plots( log.warning(f"Plot {filename} has a farmer public key that is not in the farmer's pk list.") no_key_filenames.add(filename) if not open_no_key_filenames: - return 0 + return 0, new_provers if isinstance(pool_public_key_or_puzzle_hash, G1Element): pool_public_key = pool_public_key_or_puzzle_hash @@ -227,7 +228,7 @@ def load_plots( log.warning(f"Plot {filename} has a pool public key that is not in the farmer's pool pk list.") no_key_filenames.add(filename) if not open_no_key_filenames: - return 0 + return 0, new_provers stat_info = filename.stat() local_sk = master_sk_to_local_sk(local_master_sk) @@ -246,7 +247,7 @@ def load_plots( tb = traceback.format_exc() log.error(f"Failed to open file {filename}. {e} {tb}") failed_to_open_filenames[filename] = int(time.time()) - return 0 + return 0, new_provers log.info(f"Found plot {filename} of size {new_provers[filename].prover.get_size()}") if show_memo: @@ -258,11 +259,16 @@ def load_plots( plot_memo_str: str = plot_memo.hex() log.info(f"Memo: {plot_memo_str}") - return stat_info.st_size - return 0 + return stat_info.st_size, new_provers + return 0, new_provers + + def reduce_function(x: Tuple[int, Dict], y: Tuple[int, Dict]) -> Tuple[int, Dict]: + (total_size1, new_provers1) = x + (total_size2, new_provers2) = y + return total_size1 + total_size2, {**new_provers1, **new_provers2} with ThreadPoolExecutor() as executor: - total_size = sum(executor.map(process_file, all_filenames)) + total_size, new_provers = reduce(reduce_function, executor.map(process_file, all_filenames)) log.info( f"Loaded a total of {len(new_provers)} plots of size {total_size / (1024 ** 4)} TiB, in"