compressed plot support (#15702)

* Compression branch.

* disable GPU farming in tests

---------

Co-authored-by: Florin Chirica <fchirica96@gmail.com>
Arvid Norberg 2023-07-12 23:49:15 +02:00 committed by GitHub
parent faf4709834
commit 7b67fe0a4a
9 changed files with 140 additions and 8 deletions

View File

@@ -4,6 +4,7 @@ import asyncio
import concurrent
import dataclasses
import logging
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
@@ -80,6 +81,30 @@ class Harvester:
self.state_changed_callback: Optional[StateChangedProtocol] = None
self.parallel_read: bool = config.get("parallel_read", True)
context_count = config.get("parallel_decompressers_count", 5)
thread_count = config.get("decompresser_thread_count", 0)
if thread_count == 0:
thread_count = multiprocessing.cpu_count() // 2
disable_cpu_affinity = config.get("disable_cpu_affinity", False)
max_compression_level_allowed = config.get("max_compression_level_allowed", 7)
use_gpu_harvesting = config.get("use_gpu_harvesting", False)
gpu_index = config.get("gpu_index", 0)
enforce_gpu_index = config.get("enforce_gpu_index", False)
try:
self.plot_manager.configure_decompresser(
context_count,
thread_count,
disable_cpu_affinity,
max_compression_level_allowed,
use_gpu_harvesting,
gpu_index,
enforce_gpu_index,
)
except Exception as e:
self.log.error(f"{type(e)} {e} while configuring decompresser.")
raise
async def _start(self) -> None:
self._refresh_lock = asyncio.Lock()
self.event_loop = asyncio.get_running_loop()
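
For reference, the new keys read above live under the `harvester:` section of the config (defaults appear in the config hunk later in this diff). Below is a minimal sketch of overriding them with the existing config helpers; the specific values are illustrative examples, not recommendations.

```python
# Sketch only: adjust the new compressed-harvesting keys on an existing config.
# load_config/lock_config/save_config are existing chia.util.config helpers; the
# key names and defaults come from this diff, the specific values are examples.
from chia.util.config import load_config, lock_config, save_config
from chia.util.default_root import DEFAULT_ROOT_PATH

config = load_config(DEFAULT_ROOT_PATH, "config.yaml")
harvester_cfg = config["harvester"]
harvester_cfg["parallel_decompressers_count"] = 5   # decompresser contexts created at startup
harvester_cfg["decompresser_thread_count"] = 0      # 0 -> cpu_count() // 2, as above
harvester_cfg["max_compression_level_allowed"] = 7  # plots above this level are not farmed
harvester_cfg["use_gpu_harvesting"] = False         # keep decompression on the CPU
with lock_config(DEFAULT_ROOT_PATH, "config.yaml"):
    save_config(DEFAULT_ROOT_PATH, "config.yaml", config)
```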

View File

@@ -137,6 +137,22 @@ class HarvesterAPI:
proof_xs = plot_info.prover.get_full_proof(
sp_challenge_hash, index, self.harvester.parallel_read
)
except RuntimeError as e:
if str(e) == "GRResult_NoProof received":
self.harvester.log.info(
f"Proof dropped due to line point compression for {filename}"
)
self.harvester.log.info(
f"File: {filename} Plot ID: {plot_id.hex()}, challenge: {sp_challenge_hash}, "
f"plot_info: {plot_info}"
)
else:
self.harvester.log.error(f"Exception fetching full proof for {filename}. {e}")
self.harvester.log.error(
f"File: {filename} Plot ID: {plot_id.hex()}, challenge: {sp_challenge_hash}, "
f"plot_info: {plot_info}"
)
continue
except Exception as e:
self.harvester.log.error(f"Exception fetching full proof for {filename}. {e}")
self.harvester.log.error(
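
The `GRResult_NoProof received` message comes from the chiapos 2.x bindings when a compressed plot yields no proof for the requested index; the branch above logs it as a dropped proof rather than an error. A stripped-down sketch of the same pattern follows (the helper name is hypothetical; `prover` is a chiapos `DiskProver` as in the surrounding code).

```python
# Minimal sketch of the handling above: treat chiapos's "GRResult_NoProof received"
# as an expected dropped proof on compressed plots, and re-raise anything else.
# The helper name is hypothetical; prover is a chiapos DiskProver.
from typing import Optional


def get_full_proof_or_none(prover, challenge: bytes, index: int, parallel_read: bool = True) -> Optional[bytes]:
    try:
        return prover.get_full_proof(challenge, index, parallel_read)
    except RuntimeError as e:
        if str(e) == "GRResult_NoProof received":
            return None  # proof dropped due to line point compression; not an error
        raise
```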

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import concurrent.futures
import logging
import multiprocessing
from collections import Counter
from pathlib import Path
from time import sleep, time
@@ -54,6 +55,24 @@ def check_plots(
)
context_count = config["harvester"].get("parallel_decompressers_count", 5)
thread_count = config["harvester"].get("decompresser_thread_count", 0)
if thread_count == 0:
thread_count = multiprocessing.cpu_count() // 2
disable_cpu_affinity = config["harvester"].get("disable_cpu_affinity", False)
max_compression_level_allowed = config["harvester"].get("max_compression_level_allowed", 7)
use_gpu_harvesting = config["harvester"].get("use_gpu_harvesting", False)
gpu_index = config["harvester"].get("gpu_index", 0)
enforce_gpu_index = config["harvester"].get("enforce_gpu_index", False)
plot_manager.configure_decompresser(
context_count,
thread_count,
disable_cpu_affinity,
max_compression_level_allowed,
use_gpu_harvesting,
gpu_index,
enforce_gpu_index,
)
if num is not None:
if num == 0:
@@ -169,8 +188,13 @@ def check_plots(
log.info(f"\tFinding proof took: {proof_spent_time} ms. Filepath: {plot_path}")
ver_quality_str = v.validate_proof(pr.get_id(), pr.get_size(), challenge, proof)
-assert quality_str == ver_quality_str
-total_proofs += 1
+if quality_str == ver_quality_str:
+    total_proofs += 1
+else:
+    log.warning(
+        f"\tQuality doesn't match with proof. Filepath: {plot_path} "
+        "This can occasionally happen with a compressed plot."
+    )
except AssertionError as e:
log.error(
f"{type(e)}: {e} error in proving/verifying for plot {plot_path}. Filepath: {plot_path}"
@@ -183,6 +207,13 @@ def check_plots(
except SystemExit:
log.warning("System is shutting down.")
return
except RuntimeError as e:
if str(e) == "GRResult_NoProof received":
log.info(f"Proof dropped due to line point compression. Filepath: {plot_path}")
continue
else:
log.error(f"{type(e)}: {e} error in getting challenge qualities for plot {plot_path}")
caught_exception = True
except Exception as e:
log.error(f"{type(e)}: {e} error in getting challenge qualities for plot {plot_path}")
caught_exception = True
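
A condensed sketch of the per-plot check above, showing how the chiapos calls fit together and why a quality/proof mismatch is now only a warning. `plot_path` and `challenge` are placeholders; everything else mirrors the calls in this diff.

```python
# Condensed sketch of the check loop above for a single plot; inputs are placeholders.
from pathlib import Path

from chiapos import DiskProver, Verifier

plot_path = Path("example-k32.plot")  # placeholder plot file
challenge = bytes(32)                 # placeholder challenge
pr = DiskProver(str(plot_path))
v = Verifier()
total_proofs = 0
for index, quality_str in enumerate(pr.get_qualities_for_challenge(challenge)):
    try:
        proof = pr.get_full_proof(challenge, index, True)
    except RuntimeError as e:
        if str(e) == "GRResult_NoProof received":
            continue  # dropped by line point compression; skipped, not a failure
        raise
    ver_quality_str = v.validate_proof(pr.get_id(), pr.get_size(), challenge, proof)
    if quality_str == ver_quality_str:
        total_proofs += 1
    # On a compressed plot a mismatch can occasionally happen, so it is only warned about.
```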

View File

@@ -9,7 +9,7 @@ from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
from blspy import G1Element
-from chiapos import DiskProver
+from chiapos import DiskProver, decompresser_context_queue
from chia.consensus.pos_quality import UI_ACTUAL_SPACE_CONSTANT_FACTOR, _expected_plot_size
from chia.plotting.cache import Cache, CacheEntry
@@ -38,6 +38,7 @@ class PlotManager:
_refreshing_enabled: bool
_refresh_callback: Callable
_initial: bool
max_compression_level_allowed: int
def __init__(
self,
@@ -66,6 +67,7 @@
self._refreshing_enabled = False
self._refresh_callback = refresh_callback
self._initial = True
self.max_compression_level_allowed = 0
def __enter__(self):
self._lock.acquire()
@@ -73,6 +75,39 @@
def __exit__(self, exc_type, exc_value, exc_traceback):
self._lock.release()
def configure_decompresser(
self,
context_count: int,
thread_count: int,
disable_cpu_affinity: bool,
max_compression_level_allowed: int,
use_gpu_harvesting: bool,
gpu_index: int,
enforce_gpu_index: bool,
) -> None:
if max_compression_level_allowed > 7:
log.error(
"Currently only compression levels up to 7 are allowed, "
f"while {max_compression_level_allowed} was specified."
"Setting max_compression_level_allowed to 7."
)
max_compression_level_allowed = 7
is_using_gpu = decompresser_context_queue.init(
context_count,
thread_count,
disable_cpu_affinity,
max_compression_level_allowed,
use_gpu_harvesting,
gpu_index,
enforce_gpu_index,
)
if not is_using_gpu and use_gpu_harvesting:
log.error(
"GPU harvesting failed initialization. "
f"Falling back to CPU harvesting: {context_count} decompressers count, {thread_count} threads."
)
self.max_compression_level_allowed = max_compression_level_allowed
def reset(self) -> None:
with self:
self.last_refresh_time = time.time()
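
The method above is a thin wrapper around `decompresser_context_queue.init` from chiapos 2.x; as used here, its return value indicates whether a GPU context was actually set up, which is how the CPU fallback is detected. A bare sketch of the call with the arguments spelled out follows (the argument order is taken from this diff, the values are illustrative).

```python
# Bare sketch of the chiapos call wrapped by configure_decompresser above.
from chiapos import decompresser_context_queue

is_using_gpu = decompresser_context_queue.init(
    5,      # context_count: number of parallel decompresser contexts
    4,      # thread_count: decompression worker threads
    False,  # disable_cpu_affinity
    7,      # max_compression_level_allowed
    False,  # use_gpu_harvesting
    0,      # gpu_index
    False,  # enforce_gpu_index
)
if not is_using_gpu:
    print("decompression will run on the CPU")
```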
@@ -280,10 +315,20 @@ class PlotManager:
# TODO: consider checking if the file was just written to (which would mean that the file is still
# being copied). A segfault might happen in this edge case.
-if prover.get_size() >= 30 and stat_info.st_size < 0.98 * expected_size:
-    log.warning(
-        f"Not farming plot {file_path}. Size is {stat_info.st_size / (1024 ** 3)} GiB, but expected"
-        f" at least: {expected_size / (1024 ** 3)} GiB. We assume the file is being copied."
-    )
-    return None
+level = prover.get_compression_level()
+if level == 0:
+    if prover.get_size() >= 30 and stat_info.st_size < 0.98 * expected_size:
+        log.warning(
+            f"Not farming plot {file_path}. "
+            f"Size is {stat_info.st_size / (1024 ** 3)} GiB, "
+            f"but expected at least: {expected_size / (1024 ** 3)} GiB. "
+            "We assume the file is being copied."
+        )
+        return None
+elif level > self.max_compression_level_allowed:
+    log.warning(
+        f"Not farming plot {file_path}. Plot compression level: {level}, "
+        f"max compression level allowed: {self.max_compression_level_allowed}."
+    )
+    return None
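
In short, uncompressed plots (level 0) keep the existing partial-copy size check, while compressed plots are only farmed up to `max_compression_level_allowed`. A tiny sketch of querying the level via the chiapos API used above (the path and limit are placeholders):

```python
# Tiny sketch of the compression-level gate above; path and limit are placeholders.
from chiapos import DiskProver

max_compression_level_allowed = 7
prover = DiskProver("example-k32.plot")  # placeholder path
level = prover.get_compression_level()   # 0 means an uncompressed plot
if level > max_compression_level_allowed:
    print(f"not farming: compression level {level} exceeds the allowed maximum")
```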

View File

@@ -242,6 +242,7 @@ class BlockTools:
config_overrides["daemon_port"] = find_available_listen_port("BlockTools daemon")
self._config = override_config(self._config, config_overrides)
with lock_config(self.root_path, "config.yaml"):
save_config(self.root_path, "config.yaml", self._config)
overrides = self._config["network_overrides"]["constants"][self._config["selected_network"]]

View File

@@ -290,6 +290,8 @@ async def setup_harvester(
config["harvester"]["port"] = 0
config["harvester"]["rpc_port"] = 0
config["harvester"]["plot_directories"] = [str(b_tools.plot_dir.resolve())]
# CI doesn't like GPU compressed farming
config["harvester"]["parallel_decompressers_count"] = 0
save_config(root_path, "config.yaml", config)
service = create_harvester_service(
root_path,

View File

@@ -211,6 +211,18 @@ harvester:
crt: "config/ssl/ca/chia_ca.crt"
key: "config/ssl/ca/chia_ca.key"
# Compressed harvesting.
parallel_decompressers_count: 0
# If set to 0, `decompresser_thread_count` will default to half of nproc available on the machine.
# A non-zero number overrides this default.
decompresser_thread_count: 0
disable_cpu_affinity: False
# Ignore compressed plots with compression level greater than this.
max_compression_level_allowed: 7
use_gpu_harvesting: False
gpu_index: 0
enforce_gpu_index: False
pool:
# Replace this with a real receive address
# xch_target_address: txch102gkhhzs60grx7cfnpng5n6rjecr89r86l5s8xux2za8k820cxsq64ssdg

View File

@@ -12,7 +12,7 @@ dependencies = [
"boto3==1.26.161", # AWS S3 for DL s3 plugin
"chiavdf==1.0.9", # timelord and vdf verification
"chiabip158==1.2", # bip158-style wallet filters
"chiapos==1.0.11", # proof of space
"chiapos==2.0.0b3", # proof of space
"clvm==0.9.7",
"clvm_tools==0.4.6", # Currying, Program.to, other conveniences
"chia_rs==0.2.8",

View File

@@ -1100,7 +1100,7 @@ async def test_bad_json(daemon_connection_and_temp_keychain: Tuple[aiohttp.Clien
response={
"success": True,
"plotters": {
"chiapos": {"display_name": "Chia Proof of Space", "installed": True, "version": "1.0.11"},
"chiapos": {"display_name": "Chia Proof of Space", "installed": True, "version": "2.0.0b3"},
"madmax": {"can_install": True, "display_name": "madMAx Plotter", "installed": False},
},
},