Ms.double handshake (#203)

* Fix create plot issue, do not recreate plots that exist on another drive
* Fix double harvester handshake with farmer
* Full node sends ping to wallet
* Async constructor for harvester, and instance threadpool
This commit is contained in:
Mariano Sorgente 2020-04-30 14:02:35 +09:00 committed by GitHub
parent 0af14b75c8
commit 1c8815ab1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 70 additions and 38 deletions

View File

@ -89,6 +89,20 @@ def main():
)
filename: str = f"plot-{i}-{args.size}-{plot_seed}.dat"
full_path: Path = args.final_dir / filename
plot_config = load_config(root_path, plot_config_filename)
plot_config_plots_new = deepcopy(plot_config.get("plots", []))
filenames = [Path(k).name for k in plot_config_plots_new.keys()]
relative_path = make_path_relative(full_path, root_path)
already_in_config = (
relative_path in plot_config_plots_new
or full_path in plot_config_plots_new
or full_path.name in filenames
)
if already_in_config:
print(f"Plot {filename} already exists (in config)")
continue
if not full_path.exists():
# Creates the plot. This will take a long time for larger plots.
plotter: DiskPlotter = DiskPlotter()
@ -104,17 +118,10 @@ def main():
print(f"Plot {filename} already exists")
# Updates the config if necessary.
plot_config = load_config(root_path, plot_config_filename)
plot_config_plots_new = deepcopy(plot_config.get("plots", []))
relative_path = make_path_relative(full_path, root_path)
if (
relative_path not in plot_config_plots_new
and full_path not in plot_config_plots_new
):
plot_config_plots_new[str(full_path)] = {
"sk": bytes(sk).hex(),
"pool_pk": bytes(pool_pk).hex(),
}
plot_config_plots_new[str(full_path)] = {
"sk": bytes(sk).hex(),
"pool_pk": bytes(pool_pk).hex(),
}
plot_config["plots"].update(plot_config_plots_new)
# Dumps the new config to disk.

View File

@ -17,10 +17,7 @@ def make_parser(parser):
"group", choices=all_groups(), type=str, nargs="+",
)
parser.add_argument(
"-r",
"--restart",
action="store_true",
help="Restart of running processes",
"-r", "--restart", action="store_true", help="Restart of running processes",
)
parser.add_argument(
"-f",
@ -39,7 +36,10 @@ def start(args, parser):
if args.restart or args.force:
print("restarting")
stop_service(args.root_path, service)
while pid_path_for_service(args.root_path, service).is_file() and not args.force:
while (
pid_path_for_service(args.root_path, service).is_file()
and not args.force
):
# try to avoid race condition
# this is pretty hacky
time.sleep(1)

View File

@ -57,7 +57,7 @@ class Farmer:
[sk.get_public_key() for sk in pool_sks]
)
yield OutboundMessage(
NodeType.HARVESTER, Message("harvester_handshake", msg), Delivery.BROADCAST
NodeType.HARVESTER, Message("harvester_handshake", msg), Delivery.RESPOND
)
def set_server(self, server):

View File

@ -2,6 +2,7 @@ import logging
import asyncio
from pathlib import Path
from typing import Dict, Optional, Tuple, List
import time
import concurrent
from blspy import PrependSignature, PrivateKey, PublicKey, Util
@ -46,12 +47,12 @@ def load_plots(
if filename.exists():
try:
provers[partial_filename_str] = DiskProver(str(filename))
except ValueError:
log.error(f"Failed to open file {filename}.")
except ValueError as e:
log.error(f"Failed to open file {filename}. {e}")
failed_to_open = True
break
log.info(
f"Farming plot {filename} of size {provers[partial_filename_str].get_size()}"
f"Loaded plot {filename} of size {provers[partial_filename_str].get_size()}"
)
found = True
break
@ -61,18 +62,30 @@ def load_plots(
class Harvester:
def __init__(self, config: Dict, plot_config: Dict):
self.config: Dict = config
self.plot_config: Dict = plot_config
config: Dict
plot_config: Dict
provers: Dict[Path, DiskProver]
challenge_hashes: Dict[bytes32, Tuple[bytes32, Path, uint8]]
_plot_notification_task: asyncio.Task
_is_shutdown: bool
executor: concurrent.futures.ThreadPoolExecutor
@staticmethod
async def create(config: Dict, plot_config: Dict):
self = Harvester()
self.config = config
self.plot_config = plot_config
# From filename to prover
self.provers: Dict[Path, DiskProver] = {}
self.provers = {}
# From quality string to (challenge_hash, filename, index)
self.challenge_hashes: Dict[bytes32, Tuple[bytes32, Path, uint8]] = {}
self.challenge_hashes = {}
self._plot_notification_task = asyncio.create_task(self._plot_notification())
self._is_shutdown: bool = False
self._is_shutdown = False
self.server = None
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
return self
async def _plot_notification(self):
"""
@ -125,6 +138,7 @@ class Harvester:
def _shutdown(self):
self._is_shutdown = True
self.executor.shutdown(wait=True)
async def _await_shutdown(self):
await self._plot_notification_task
@ -153,7 +167,7 @@ class Harvester:
for any proofs of space that are are found in the plots. If proofs are found, a
ChallengeResponse message is sent for each of the proofs found.
"""
start = time.time()
challenge_size = len(new_challenge.challenge_hash)
if challenge_size != 32:
raise ValueError(
@ -161,7 +175,6 @@ class Harvester:
)
loop = asyncio.get_running_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
def blocking_lookup(filename: Path, prover: DiskProver) -> Optional[List]:
# Uses the DiskProver object to lookup qualities. This is a blocking call,
@ -173,8 +186,8 @@ class Harvester:
except RuntimeError:
log.error("Error using prover object. Reinitializing prover object.")
try:
self.provers[filename] = DiskProver(str(filename))
quality_strings = prover.get_qualities_for_challenge(
self.prover = DiskProver(str(filename))
quality_strings = self.prover.get_qualities_for_challenge(
new_challenge.challenge_hash
)
except RuntimeError:
@ -190,7 +203,7 @@ class Harvester:
# Exectures a DiskProverLookup in a threadpool, and returns responses
all_responses: List[harvester_protocol.ChallengeResponse] = []
quality_strings = await loop.run_in_executor(
executor, blocking_lookup, filename, prover
self.executor, blocking_lookup, filename, prover
)
if quality_strings is not None:
for index, quality_str in enumerate(quality_strings):
@ -218,7 +231,9 @@ class Harvester:
Message("challenge_response", response),
Delivery.RESPOND,
)
executor.shutdown(wait=False)
log.info(
f"Time taken to lookup qualities in {len(self.provers)} plots: {time.time() - start}"
)
@api_request
async def request_proof_of_space(

View File

@ -255,6 +255,9 @@ class ChiaServer:
self.push_message(
OutboundMessage(NodeType.HARVESTER, msg, Delivery.BROADCAST)
)
self.push_message(
OutboundMessage(NodeType.WALLET, msg, Delivery.BROADCAST)
)
await asyncio.sleep(self._ping_interval)
return asyncio.create_task(ping())

View File

@ -29,7 +29,7 @@ async def async_main():
log = logging.getLogger(__name__)
setproctitle("chia_harvester")
harvester = Harvester(config, plot_config)
harvester = await Harvester.create(config, plot_config)
ping_interval = net_config.get("ping_interval")
network_id = net_config.get("network_id")
assert ping_interval is not None

View File

@ -1,5 +1,6 @@
try:
import setproctitle as pysetproctitle
no_setproctitle = False
except Exception:
no_setproctitle = True

View File

@ -320,7 +320,9 @@ class TestWalletSimulator:
offer_dict = {1: 10, 2: -30}
success, spend_bundle, error = await trade_manager_1.create_offer_for_ids(offer_dict)
success, spend_bundle, error = await trade_manager_1.create_offer_for_ids(
offer_dict
)
assert success is True
assert spend_bundle is not None
@ -504,7 +506,9 @@ class TestWalletSimulator:
offer_dict = {1: -1000, 2: -30, 3: 50}
success, spend_bundle, error = await trade_manager_1.create_offer_for_ids(offer_dict)
success, spend_bundle, error = await trade_manager_1.create_offer_for_ids(
offer_dict
)
assert success is True
assert spend_bundle is not None
@ -600,7 +604,9 @@ class TestWalletSimulator:
offer_dict = {1: -10, 2: 30}
success, spend_bundle, error = await trade_manager_2.create_offer_for_ids(offer_dict)
success, spend_bundle, error = await trade_manager_2.create_offer_for_ids(
offer_dict
)
assert success is True
assert spend_bundle is not None

View File

@ -191,7 +191,7 @@ async def setup_wallet_node(port, introducer_port=None, key_seed=b"", dic={}):
async def setup_harvester(port, dic={}):
config = load_config(root_path, "config.yaml", "harvester")
harvester = Harvester(config, bt.plot_config)
harvester = await Harvester.create(config, bt.plot_config)
net_config = load_config(root_path, "config.yaml")
ping_interval = net_config.get("ping_interval")

View File

@ -8,7 +8,7 @@ from src.consensus.constants import constants as consensus_constants
bt = BlockTools()
test_constants: Dict[str, Any] = consensus_constants.copy()
test_constants.update({"DIFFICULTY_STARTING": 500, "MIN_ITERS_STARTING": 2 ** 15})
test_constants.update({"DIFFICULTY_STARTING": 500, "MIN_ITERS_STARTING": 500})
test_constants["GENESIS_BLOCK"] = bytes(
bt.create_genesis_block(test_constants, bytes([0] * 32), b"0")