Timelord RPC + Misc Metrics Updates/Fixes (#10255)

* Add mempool_max_total_cost to RPC

* Add signage_point event

* Fix incorrect crawler RPC port lookup

* Set up initial timelord RPC server + finished_pot_challenge event

* Add new compact proof event

* Add skipping/new_peak to track when fastest or not

* Check for None on change_data

* Add skipping_peak + new_peak to changes for metrics

* Convert chain to value

* Rename iters

* Timelord RPC to 8557 - 8556 is used in simulation tests

* Make tests work with RPC server on timelord

* Change event name to finished_pot

* Use broadcast_farmer object

* Move state changed for `finished_pot` after proofs_finished.append

* Fix type on ips var + add vdf_info and vdf_proof

* fix event name on the state_changed function
This commit is contained in:
Chris Marslender 2022-02-18 09:43:52 -06:00 committed by GitHub
parent ffe4e7ca37
commit 5e4c1a1f62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 76 additions and 12 deletions

View File

@ -1210,6 +1210,8 @@ class FullNode:
msg = make_msg(ProtocolMessageTypes.new_signage_point, broadcast_farmer)
await self.server.send_to_all([msg], NodeType.FARMER)
self._state_changed("signage_point", {"broadcast_farmer": broadcast_farmer})
async def peak_post_processing(
self,
block: FullBlock,

View File

@ -86,13 +86,11 @@ class FullNodeRpcApi:
"metrics",
)
)
return payloads
if change == "block":
payloads.append(create_payload_dict("block", change_data, self.service_name, "metrics"))
return payloads
if change in ("block", "signage_point"):
payloads.append(create_payload_dict(change, change_data, self.service_name, "metrics"))
return []
return payloads
# this function is just here for backwards-compatibility. It will probably
# be removed in the future
@ -123,6 +121,7 @@ class FullNodeRpcApi:
"mempool_min_fees": {
"cost_5000000": 0,
},
"mempool_max_total_cost": 0,
"block_max_cost": 0,
"node_id": node_id,
},
@ -171,10 +170,12 @@ class FullNodeRpcApi:
mempool_size = len(self.service.mempool_manager.mempool.spends)
mempool_cost = self.service.mempool_manager.mempool.total_mempool_cost
mempool_min_fee_5m = self.service.mempool_manager.mempool.get_min_fee_rate(5000000)
mempool_max_total_cost = self.service.mempool_manager.mempool_max_total_cost
else:
mempool_size = 0
mempool_cost = 0
mempool_min_fee_5m = 0
mempool_max_total_cost = 0
if self.service.server is not None:
is_connected = len(self.service.server.get_full_node_connections()) > 0
else:
@ -202,6 +203,7 @@ class FullNodeRpcApi:
# This Dict sets us up for that in the future
"cost_5000000": mempool_min_fee_5m,
},
"mempool_max_total_cost": mempool_max_total_cost,
"block_max_cost": self.service.constants.MAX_BLOCK_COST_CLVM,
"node_id": node_id,
},

View File

@ -0,0 +1,24 @@
from typing import Any, Callable, Dict, List, Optional
from chia.timelord.timelord import Timelord
from chia.util.ws_message import WsRpcMessage, create_payload_dict
class TimelordRpcApi:
    """Websocket/RPC API surface for the timelord service.

    The timelord offers no callable RPC routes; its only job is to forward a
    whitelisted set of state-change events to "metrics"-type websocket
    subscribers.
    """

    def __init__(self, timelord: Timelord):
        # The running timelord instance this API reports on.
        self.service = timelord
        self.service_name = "chia_timelord"

    def get_routes(self) -> Dict[str, Callable]:
        # No request/response endpoints are exposed by the timelord.
        return {}

    async def _state_changed(self, change: str, change_data: Optional[Dict[str, Any]] = None) -> List[WsRpcMessage]:
        """Translate a state change into websocket payloads for metrics listeners.

        Only the four timelord metrics events produce a payload; every other
        change name yields an empty list.
        """
        data: Dict[str, Any] = change_data if change_data is not None else {}
        if change not in ("finished_pot", "new_compact_proof", "skipping_peak", "new_peak"):
            return []
        return [create_payload_dict(change, data, self.service_name, "metrics")]

View File

@ -45,7 +45,7 @@ def service_kwargs_for_full_node_crawler(
)
if config.get("crawler", {}).get("start_rpc_server", True):
kwargs["rpc_info"] = (CrawlerRpcApi, config.get("crawler", {}).get("crawler_rpc_port", 8561))
kwargs["rpc_info"] = (CrawlerRpcApi, config.get("crawler", {}).get("rpc_port", 8561))
return kwargs

View File

@ -4,6 +4,7 @@ from typing import Dict
from chia.consensus.constants import ConsensusConstants
from chia.consensus.default_constants import DEFAULT_CONSTANTS
from chia.rpc.timelord_rpc_api import TimelordRpcApi
from chia.server.outbound_message import NodeType
from chia.server.start_service import run_service
from chia.timelord.timelord import Timelord
@ -46,6 +47,10 @@ def service_kwargs_for_timelord(
auth_connect_peers=False,
network_id=network_id,
)
if config.get("start_rpc_server", True):
kwargs["rpc_info"] = (TimelordRpcApi, config.get("rpc_port", 8557))
return kwargs

View File

@ -7,7 +7,7 @@ import random
import time
import traceback
from concurrent.futures import ProcessPoolExecutor
from typing import Callable, Dict, List, Optional, Set, Tuple
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
from chiavdf import create_discriminant, prove
@ -152,6 +152,13 @@ class Timelord:
async def _await_closed(self):
    """No-op: the timelord has no async resources to wait on during shutdown."""
    pass
def _set_state_changed_callback(self, callback: Callable):
    # Register the callback invoked by state_changed() whenever the timelord
    # reports a metrics event (finished_pot, new_peak, etc.).
    self.state_changed_callback = callback
def state_changed(self, change: str, change_data: Optional[Dict[str, Any]] = None):
    """Forward a state-change event to the registered callback, if any.

    Does nothing when no callback has been registered yet.
    """
    callback = self.state_changed_callback
    if callback is None:
        return
    callback(change, change_data)
def set_server(self, server: ChiaServer):
self.server = server
@ -986,6 +993,8 @@ class Timelord:
# Verifies our own proof just in case
form_size = ClassgroupElement.get_size(self.constants)
output = ClassgroupElement.from_bytes(y_bytes[:form_size])
# default value so that it's always set for state_changed later
ips: float = 0
if not self.bluebox_mode:
time_taken = time.time() - self.chain_start_time[chain]
ips = int(iterations_needed / time_taken * 10) / 10
@ -1012,6 +1021,16 @@ class Timelord:
async with self.lock:
assert proof_label is not None
self.proofs_finished.append((chain, vdf_info, vdf_proof, proof_label))
self.state_changed(
"finished_pot",
{
"estimated_ips": ips,
"iterations_needed": iterations_needed,
"chain": chain.value,
"vdf_info": vdf_info,
"vdf_proof": vdf_proof,
},
)
else:
async with self.lock:
writer.write(b"010")
@ -1025,6 +1044,10 @@ class Timelord:
if self.server is not None:
message = make_msg(ProtocolMessageTypes.respond_compact_proof_of_time, response)
await self.server.send_to_all([message], NodeType.FULL_NODE)
self.state_changed(
"new_compact_proof", {"header_hash": header_hash, "height": height, "field_vdf": field_vdf}
)
except ConnectionResetError as e:
log.debug(f"Connection reset with VDF client {e}")

View File

@ -17,7 +17,7 @@ class TimelordAPI:
self.timelord = timelord
def _set_state_changed_callback(self, callback: Callable):
pass
self.timelord.state_changed_callback = callback
@api_request
async def new_peak_timelord(self, new_peak: timelord_protocol.NewPeakTimelord):
@ -33,15 +33,18 @@ class TimelordAPI:
f"{new_peak.reward_chain_block.weight} "
)
self.timelord.new_peak = new_peak
self.timelord.state_changed("new_peak", {"height": new_peak.reward_chain_block.height})
elif (
self.timelord.last_state.peak is not None
and self.timelord.last_state.peak.reward_chain_block == new_peak.reward_chain_block
):
log.info("Skipping peak, already have.")
self.timelord.state_changed("skipping_peak", {"height": new_peak.reward_chain_block.height})
return None
else:
log.warning("block that we don't have, changing to it.")
self.timelord.new_peak = new_peak
self.timelord.state_changed("new_peak", {"height": new_peak.reward_chain_block.height})
self.timelord.new_subslot_end = None
@api_request

View File

@ -305,6 +305,9 @@ timelord:
# If `slow_bluebox` is True, launches `slow_bluebox_process_count` processes.
slow_bluebox_process_count: 1
start_rpc_server: True
rpc_port: 8557
ssl:
private_crt: "config/ssl/timelord/private_timelord.crt"
private_key: "config/ssl/timelord/private_timelord.key"

View File

@ -69,7 +69,7 @@ class TestSSL:
@pytest_asyncio.fixture(scope="function")
async def timelord(self):
async for _ in setup_timelord(21236, 21237, False, test_constants, bt):
async for _ in setup_timelord(21236, 21237, 0, False, test_constants, bt):
yield _
@pytest.mark.asyncio

View File

@ -311,7 +311,7 @@ async def setup_vdf_clients(port):
await kill_processes()
async def setup_timelord(port, full_node_port, sanitizer, consensus_constants: ConsensusConstants, b_tools):
async def setup_timelord(port, full_node_port, rpc_port, sanitizer, consensus_constants: ConsensusConstants, b_tools):
config = b_tools.config["timelord"]
config["port"] = port
config["full_node_peer"]["port"] = full_node_port
@ -319,6 +319,8 @@ async def setup_timelord(port, full_node_port, sanitizer, consensus_constants: C
config["fast_algorithm"] = False
if sanitizer:
config["vdf_server"]["port"] = 7999
config["start_rpc_server"] = True
config["rpc_port"] = rpc_port
kwargs = service_kwargs_for_timelord(b_tools.root_path, config, consensus_constants)
kwargs.update(
@ -509,7 +511,7 @@ async def setup_full_system(
setup_harvester(21234, 21235, consensus_constants, b_tools),
setup_farmer(21235, consensus_constants, b_tools, uint16(21237)),
setup_vdf_clients(8000),
setup_timelord(21236, 21237, False, consensus_constants, b_tools),
setup_timelord(21236, 21237, 21241, False, consensus_constants, b_tools),
setup_full_node(
consensus_constants,
"blockchain_test.db",
@ -535,7 +537,7 @@ async def setup_full_system(
db_version=db_version,
),
setup_vdf_client(7999),
setup_timelord(21239, 1000, True, consensus_constants, b_tools_1),
setup_timelord(21239, 1000, 21242, True, consensus_constants, b_tools_1),
]
introducer, introducer_server = await node_iters[0].__anext__()