mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-23 00:21:38 +03:00
Merge branch 'develop' into hf/bump-wasm-deps
Conflicts: Cargo.lock kinode/packages/app_store/app_store/Cargo.toml kinode/packages/kns_indexer/kns_indexer/Cargo.toml
This commit is contained in:
commit
b6d56e1a04
1717
Cargo.lock
generated
1717
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -26,6 +26,11 @@ simulation-mode = []
|
||||
|
||||
[dependencies]
|
||||
aes-gcm = "0.10.2"
|
||||
alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
|
||||
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
|
||||
alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56", features = ["ws"]}
|
||||
alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
|
||||
alloy-providers = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
|
||||
anyhow = "1.0.71"
|
||||
async-trait = "0.1.71"
|
||||
base64 = "0.13"
|
||||
@ -41,8 +46,6 @@ curve25519-dalek = "^4.1.2"
|
||||
dashmap = "5.5.3"
|
||||
digest = "0.10"
|
||||
elliptic-curve = { version = "0.13.8", features = ["ecdh"] }
|
||||
ethers = "2.0.13"
|
||||
ethers-providers = "2.0.13"
|
||||
flate2 = "1.0"
|
||||
futures = "0.3"
|
||||
generic-array = "0.14"
|
||||
|
29
kinode/default_providers.json
Normal file
29
kinode/default_providers.json
Normal file
@ -0,0 +1,29 @@
|
||||
[
|
||||
{
|
||||
"name": "default-router-1.os",
|
||||
"owner": "",
|
||||
"node": "0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d",
|
||||
"public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db",
|
||||
"ip": "147.135.114.167",
|
||||
"port": 9002,
|
||||
"routers": []
|
||||
},
|
||||
{
|
||||
"name": "default-router-2.os",
|
||||
"owner": "",
|
||||
"node": "0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458",
|
||||
"public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992",
|
||||
"ip": "147.135.114.167",
|
||||
"port": 9003,
|
||||
"routers": []
|
||||
},
|
||||
{
|
||||
"name": "default-router-s3.os",
|
||||
"owner": "",
|
||||
"node": "0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a",
|
||||
"public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b",
|
||||
"ip": "147.135.114.167",
|
||||
"port": 9004,
|
||||
"routers": []
|
||||
}
|
||||
]
|
@ -5,12 +5,11 @@ edition = "2021"
|
||||
|
||||
|
||||
[dependencies]
|
||||
alloy-primitives = "0.5.1"
|
||||
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy.git", rev = "3b1c310" }
|
||||
alloy-sol-types = "0.5.1"
|
||||
alloy-primitives = "0.6.2"
|
||||
alloy-sol-types = "0.6.2"
|
||||
anyhow = "1.0"
|
||||
bincode = "1.3.3"
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "9d185e1", features = ["eth"] }
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" }
|
||||
rand = "0.8"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
|
@ -1,4 +1,7 @@
|
||||
use kinode_process_lib::eth::{EthAction, EthAddress, EthSubEvent, SubscribeLogsRequest};
|
||||
use kinode_process_lib::eth::{
|
||||
get_logs, subscribe, unsubscribe, Address as EthAddress, EthSub, EthSubResult, Filter,
|
||||
SubscriptionResult,
|
||||
};
|
||||
use kinode_process_lib::http::{bind_http_path, serve_ui, HttpServerRequest};
|
||||
use kinode_process_lib::kernel_types as kt;
|
||||
use kinode_process_lib::*;
|
||||
@ -56,7 +59,7 @@ pub enum Req {
|
||||
RemoteRequest(RemoteRequest),
|
||||
FTWorkerCommand(FTWorkerCommand),
|
||||
FTWorkerResult(FTWorkerResult),
|
||||
Eth(EthSubEvent),
|
||||
Eth(EthSubResult),
|
||||
Http(HttpServerRequest),
|
||||
}
|
||||
|
||||
@ -108,13 +111,21 @@ fn init(our: Address) {
|
||||
|
||||
let mut requested_packages: HashMap<PackageId, RequestedPackage> = HashMap::new();
|
||||
|
||||
// subscribe to events on the app store contract
|
||||
SubscribeLogsRequest::new(1) // subscription id 1
|
||||
// get past logs, subscribe to new ones.
|
||||
let filter = Filter::new()
|
||||
.address(EthAddress::from_str(&state.contract_address).unwrap())
|
||||
.from_block(state.last_saved_block - 1)
|
||||
.events(EVENTS)
|
||||
.send()
|
||||
.unwrap();
|
||||
.events(EVENTS);
|
||||
|
||||
let logs = get_logs(&filter);
|
||||
|
||||
if let Ok(logs) = logs {
|
||||
for log in logs {
|
||||
state.ingest_listings_contract_event(&our, log);
|
||||
}
|
||||
}
|
||||
|
||||
subscribe(1, filter).unwrap();
|
||||
|
||||
loop {
|
||||
match await_message() {
|
||||
@ -174,11 +185,15 @@ fn handle_message(
|
||||
Req::FTWorkerResult(r) => {
|
||||
println!("app store: got weird ft_worker result: {r:?}");
|
||||
}
|
||||
Req::Eth(e) => {
|
||||
Req::Eth(eth_result) => {
|
||||
if source.node() != our.node() || source.process != "eth:distro:sys" {
|
||||
return Err(anyhow::anyhow!("eth sub event from weird addr: {source}"));
|
||||
}
|
||||
handle_eth_sub_event(our, &mut state, e)?;
|
||||
if let Ok(EthSub { result, .. }) = eth_result {
|
||||
handle_eth_sub_event(our, &mut state, result)?;
|
||||
} else {
|
||||
println!("app store: got eth sub error: {eth_result:?}");
|
||||
}
|
||||
}
|
||||
Req::Http(incoming) => {
|
||||
if source.node() != our.node()
|
||||
@ -323,16 +338,22 @@ fn handle_local_request(
|
||||
LocalRequest::RebuildIndex => {
|
||||
*state = State::new(CONTRACT_ADDRESS.to_string()).unwrap();
|
||||
// kill our old subscription and build a new one.
|
||||
Request::to(("our", "eth", "distro", "sys"))
|
||||
.body(serde_json::to_vec(&EthAction::UnsubscribeLogs(1)).unwrap())
|
||||
.send()
|
||||
.unwrap();
|
||||
SubscribeLogsRequest::new(1) // subscription id 1
|
||||
unsubscribe(1).unwrap();
|
||||
|
||||
let filter = Filter::new()
|
||||
.address(EthAddress::from_str(&state.contract_address).unwrap())
|
||||
.from_block(state.last_saved_block - 1)
|
||||
.events(EVENTS)
|
||||
.send()
|
||||
.unwrap();
|
||||
.events(EVENTS);
|
||||
|
||||
let logs = get_logs(&filter);
|
||||
|
||||
if let Ok(logs) = logs {
|
||||
for log in logs {
|
||||
state.ingest_listings_contract_event(our, log);
|
||||
}
|
||||
}
|
||||
subscribe(1, filter).unwrap();
|
||||
|
||||
LocalResponse::RebuiltIndex
|
||||
}
|
||||
}
|
||||
@ -521,12 +542,12 @@ fn handle_ft_worker_result(body: &[u8], context: &[u8]) -> anyhow::Result<()> {
|
||||
fn handle_eth_sub_event(
|
||||
our: &Address,
|
||||
state: &mut State,
|
||||
event: EthSubEvent,
|
||||
event: SubscriptionResult,
|
||||
) -> anyhow::Result<()> {
|
||||
let EthSubEvent::Log(log) = event else {
|
||||
let SubscriptionResult::Log(log) = event else {
|
||||
return Err(anyhow::anyhow!("app store: got non-log event"));
|
||||
};
|
||||
state.ingest_listings_contract_event(our, log)
|
||||
state.ingest_listings_contract_event(our, *log)
|
||||
}
|
||||
|
||||
fn fetch_package_manifest(package: &PackageId) -> anyhow::Result<Vec<kt::PackageManifestEntry>> {
|
||||
|
@ -1,6 +1,6 @@
|
||||
use crate::LocalRequest;
|
||||
use alloy_rpc_types::Log;
|
||||
use alloy_sol_types::{sol, SolEvent};
|
||||
use kinode_process_lib::eth::Log;
|
||||
use kinode_process_lib::kernel_types as kt;
|
||||
use kinode_process_lib::{println, *};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -6,12 +6,11 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
alloy-primitives = "0.5.1"
|
||||
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy.git", rev = "3b1c310" }
|
||||
alloy-sol-types = "0.5.1"
|
||||
alloy-primitives = "0.6.2"
|
||||
alloy-sol-types = "0.6.2"
|
||||
bincode = "1.3.3"
|
||||
hex = "0.4.3"
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "12bf9ee", features = ["eth"] }
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" }
|
||||
rmp-serde = "1.1.2"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
|
@ -1,16 +1,18 @@
|
||||
use alloy_rpc_types::Log;
|
||||
use alloy_sol_types::{sol, SolEvent};
|
||||
use kinode_process_lib::eth::{EthAddress, EthSubEvent, SubscribeLogsRequest};
|
||||
|
||||
use kinode_process_lib::{
|
||||
await_message, get_typed_state, print_to_terminal, println, set_state, Address, Message,
|
||||
Request, Response,
|
||||
await_message,
|
||||
eth::{
|
||||
get_block_number, get_logs, subscribe, Address as EthAddress, BlockNumberOrTag, EthSub,
|
||||
EthSubResult, Filter, Log, SubscriptionResult,
|
||||
},
|
||||
get_typed_state, print_to_terminal, println, set_state, Address, Message, Request, Response,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{
|
||||
hash_map::{Entry, HashMap},
|
||||
BTreeMap,
|
||||
};
|
||||
use std::str::FromStr;
|
||||
use std::string::FromUtf8Error;
|
||||
|
||||
wit_bindgen::generate!({
|
||||
@ -146,6 +148,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
);
|
||||
// if contract address changed from a previous run, reset state
|
||||
if state.contract_address != contract_address {
|
||||
println!("resetting state for some reason.");
|
||||
state = State {
|
||||
contract_address: contract_address.clone(),
|
||||
names: HashMap::new(),
|
||||
@ -153,6 +156,34 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
block: 1,
|
||||
};
|
||||
}
|
||||
|
||||
// shove all state into net::net
|
||||
Request::new()
|
||||
.target((&our.node, "net", "distro", "sys"))
|
||||
.try_body(NetActions::KnsBatchUpdate(
|
||||
state.nodes.values().cloned().collect::<Vec<_>>(),
|
||||
))?
|
||||
.send()?;
|
||||
|
||||
let filter = Filter::new()
|
||||
.address(contract_address.unwrap().parse::<EthAddress>().unwrap())
|
||||
.to_block(BlockNumberOrTag::Latest)
|
||||
.from_block(state.block - 1)
|
||||
.events(vec![
|
||||
"NodeRegistered(bytes32,bytes)",
|
||||
"KeyUpdate(bytes32,bytes32)",
|
||||
"IpUpdate(bytes32,uint128)",
|
||||
"WsUpdate(bytes32,uint16)",
|
||||
"RoutingUpdate(bytes32,bytes32[])",
|
||||
]);
|
||||
|
||||
// if block in state is < current_block, get logs from that part.
|
||||
if state.block < get_block_number()? {
|
||||
let logs = get_logs(&filter)?;
|
||||
for log in logs {
|
||||
handle_log(&our, &mut state, &log)?;
|
||||
}
|
||||
}
|
||||
// shove all state into net::net
|
||||
Request::new()
|
||||
.target((&our.node, "net", "distro", "sys"))
|
||||
@ -161,17 +192,9 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
))?
|
||||
.send()?;
|
||||
|
||||
SubscribeLogsRequest::new(1) // subscription id 1
|
||||
.address(EthAddress::from_str(contract_address.unwrap().as_str())?)
|
||||
.from_block(state.block - 1)
|
||||
.events(vec![
|
||||
"NodeRegistered(bytes32,bytes)",
|
||||
"KeyUpdate(bytes32,bytes32)",
|
||||
"IpUpdate(bytes32,uint128)",
|
||||
"WsUpdate(bytes32,uint16)",
|
||||
"RoutingUpdate(bytes32,bytes32[])",
|
||||
])
|
||||
.send()?;
|
||||
set_state(&bincode::serialize(&state)?);
|
||||
|
||||
subscribe(1, filter.clone())?;
|
||||
|
||||
let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new();
|
||||
|
||||
@ -187,7 +210,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
if source.process == "eth:distro:sys" {
|
||||
handle_eth_message(&our, &mut state, &mut pending_requests, &body)?;
|
||||
handle_eth_message(&our, &mut state, &mut pending_requests, &body, &filter)?;
|
||||
} else {
|
||||
let Ok(request) = serde_json::from_slice::<IndexerRequests>(&body) else {
|
||||
println!("kns_indexer: got invalid message");
|
||||
@ -229,90 +252,24 @@ fn handle_eth_message(
|
||||
state: &mut State,
|
||||
pending_requests: &mut BTreeMap<u64, Vec<IndexerRequests>>,
|
||||
body: &[u8],
|
||||
filter: &Filter,
|
||||
) -> anyhow::Result<()> {
|
||||
let Ok(msg) = serde_json::from_slice::<EthSubEvent>(body) else {
|
||||
let Ok(eth_result) = serde_json::from_slice::<EthSubResult>(body) else {
|
||||
return Err(anyhow::anyhow!("kns_indexer: got invalid message"));
|
||||
};
|
||||
|
||||
match msg {
|
||||
EthSubEvent::Log(log) => {
|
||||
state.block = log.block_number.expect("expect").to::<u64>();
|
||||
|
||||
let node_id: alloy_primitives::FixedBytes<32> = log.topics[1];
|
||||
|
||||
let name = match state.names.entry(node_id.to_string()) {
|
||||
Entry::Occupied(o) => o.into_mut(),
|
||||
Entry::Vacant(v) => v.insert(get_name(&log)),
|
||||
};
|
||||
|
||||
let node = state
|
||||
.nodes
|
||||
.entry(name.to_string())
|
||||
.or_insert_with(|| KnsUpdate::new(name, &node_id.to_string()));
|
||||
|
||||
let mut send = true;
|
||||
|
||||
match log.topics[0] {
|
||||
KeyUpdate::SIGNATURE_HASH => {
|
||||
node.public_key = KeyUpdate::abi_decode_data(&log.data, true)
|
||||
.unwrap()
|
||||
.0
|
||||
.to_string();
|
||||
}
|
||||
IpUpdate::SIGNATURE_HASH => {
|
||||
let ip = IpUpdate::abi_decode_data(&log.data, true).unwrap().0;
|
||||
node.ip = format!(
|
||||
"{}.{}.{}.{}",
|
||||
(ip >> 24) & 0xFF,
|
||||
(ip >> 16) & 0xFF,
|
||||
(ip >> 8) & 0xFF,
|
||||
ip & 0xFF
|
||||
);
|
||||
// when we get ip data, we should delete any router data,
|
||||
// since the assignment of ip indicates an direct node
|
||||
node.routers = vec![];
|
||||
}
|
||||
WsUpdate::SIGNATURE_HASH => {
|
||||
node.port = WsUpdate::abi_decode_data(&log.data, true).unwrap().0;
|
||||
// when we get port data, we should delete any router data,
|
||||
// since the assignment of port indicates an direct node
|
||||
node.routers = vec![];
|
||||
}
|
||||
RoutingUpdate::SIGNATURE_HASH => {
|
||||
node.routers = RoutingUpdate::abi_decode_data(&log.data, true)
|
||||
.unwrap()
|
||||
.0
|
||||
.iter()
|
||||
.map(|r| r.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
// when we get routing data, we should delete any ws/ip data,
|
||||
// since the assignment of routers indicates an indirect node
|
||||
node.ip = "".to_string();
|
||||
node.port = 0;
|
||||
}
|
||||
_ => {
|
||||
send = false;
|
||||
}
|
||||
}
|
||||
|
||||
if node.public_key != ""
|
||||
&& ((node.ip != "" && node.port != 0) || node.routers.len() > 0)
|
||||
&& send
|
||||
{
|
||||
print_to_terminal(
|
||||
1,
|
||||
&format!(
|
||||
"kns_indexer: sending ID to net: {node:?} (blocknum {})",
|
||||
state.block
|
||||
),
|
||||
);
|
||||
Request::new()
|
||||
.target((&our.node, "net", "distro", "sys"))
|
||||
.try_body(NetActions::KnsUpdate(node.clone()))?
|
||||
.send()?;
|
||||
match eth_result {
|
||||
Ok(EthSub { result, .. }) => {
|
||||
if let SubscriptionResult::Log(log) = result {
|
||||
handle_log(our, state, &log)?;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("kns_indexer: got sub error, resubscribing.. {:?}", e.error);
|
||||
subscribe(1, filter.clone())?;
|
||||
}
|
||||
}
|
||||
|
||||
// check the pending_requests btreemap to see if there are any requests that
|
||||
// can be handled now that the state block has been updated
|
||||
let mut blocks_to_remove = vec![];
|
||||
@ -347,6 +304,85 @@ fn handle_eth_message(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_log(our: &Address, state: &mut State, log: &Log) -> anyhow::Result<()> {
|
||||
state.block = log.block_number.expect("expect").to::<u64>();
|
||||
|
||||
let node_id = log.topics[1];
|
||||
|
||||
let name = match state.names.entry(node_id.to_string()) {
|
||||
Entry::Occupied(o) => o.into_mut(),
|
||||
Entry::Vacant(v) => v.insert(get_name(&log)),
|
||||
};
|
||||
|
||||
let node = state
|
||||
.nodes
|
||||
.entry(name.to_string())
|
||||
.or_insert_with(|| KnsUpdate::new(name, &node_id.to_string()));
|
||||
|
||||
let mut send = true;
|
||||
|
||||
match log.topics[0] {
|
||||
KeyUpdate::SIGNATURE_HASH => {
|
||||
node.public_key = KeyUpdate::abi_decode_data(&log.data, true)
|
||||
.unwrap()
|
||||
.0
|
||||
.to_string();
|
||||
}
|
||||
IpUpdate::SIGNATURE_HASH => {
|
||||
let ip = IpUpdate::abi_decode_data(&log.data, true).unwrap().0;
|
||||
node.ip = format!(
|
||||
"{}.{}.{}.{}",
|
||||
(ip >> 24) & 0xFF,
|
||||
(ip >> 16) & 0xFF,
|
||||
(ip >> 8) & 0xFF,
|
||||
ip & 0xFF
|
||||
);
|
||||
// when we get ip data, we should delete any router data,
|
||||
// since the assignment of ip indicates an direct node
|
||||
node.routers = vec![];
|
||||
}
|
||||
WsUpdate::SIGNATURE_HASH => {
|
||||
node.port = WsUpdate::abi_decode_data(&log.data, true).unwrap().0;
|
||||
// when we get port data, we should delete any router data,
|
||||
// since the assignment of port indicates an direct node
|
||||
node.routers = vec![];
|
||||
}
|
||||
RoutingUpdate::SIGNATURE_HASH => {
|
||||
node.routers = RoutingUpdate::abi_decode_data(&log.data, true)
|
||||
.unwrap()
|
||||
.0
|
||||
.iter()
|
||||
.map(|r| r.to_string())
|
||||
.collect::<Vec<String>>();
|
||||
// when we get routing data, we should delete any ws/ip data,
|
||||
// since the assignment of routers indicates an indirect node
|
||||
node.ip = "".to_string();
|
||||
node.port = 0;
|
||||
}
|
||||
_ => {
|
||||
send = false;
|
||||
}
|
||||
}
|
||||
|
||||
if node.public_key != ""
|
||||
&& ((node.ip != "" && node.port != 0) || node.routers.len() > 0)
|
||||
&& send
|
||||
{
|
||||
print_to_terminal(
|
||||
1,
|
||||
&format!(
|
||||
"kns_indexer: sending ID to net: {node:?} (blocknum {})",
|
||||
state.block
|
||||
),
|
||||
);
|
||||
Request::new()
|
||||
.target((&our.node, "net", "distro", "sys"))
|
||||
.try_body(NetActions::KnsUpdate(node.clone()))?
|
||||
.send()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_name(log: &Log) -> String {
|
||||
let decoded = NodeRegistered::abi_decode_data(&log.data, true).unwrap();
|
||||
let name = match dnswire_decode(decoded.0.clone()) {
|
||||
|
@ -1,14 +1,23 @@
|
||||
use alloy_providers::provider::Provider;
|
||||
use alloy_pubsub::{PubSubFrontend, RawSubscription};
|
||||
use alloy_rpc_client::ClientBuilder;
|
||||
use alloy_rpc_types::pubsub::SubscriptionResult;
|
||||
use alloy_transport_ws::WsConnect;
|
||||
use anyhow::Result;
|
||||
use ethers::prelude::Provider;
|
||||
use ethers::types::Filter;
|
||||
use ethers_providers::{Middleware, StreamExt, Ws};
|
||||
use std::collections::HashMap;
|
||||
use dashmap::DashMap;
|
||||
use lib::types::core::*;
|
||||
use lib::types::eth::*;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinHandle;
|
||||
use url::Url;
|
||||
|
||||
use lib::types::{core::*, eth::*};
|
||||
|
||||
const WS_RECONNECTS: usize = 10_000; // TODO workshop this
|
||||
/// Provider config. Can currently be a node or a ws provider instance.
|
||||
/// Future: add chainId configs, several nodes and fallbacks.
|
||||
pub enum ProviderConfig {
|
||||
Node(String),
|
||||
Provider(Provider<PubSubFrontend>),
|
||||
}
|
||||
|
||||
/// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers
|
||||
/// and using them to service indexing requests from other apps. This could also be done by a wasm
|
||||
@ -16,174 +25,470 @@ const WS_RECONNECTS: usize = 10_000; // TODO workshop this
|
||||
/// indexing and ETH node responsibilities.
|
||||
pub async fn provider(
|
||||
our: String,
|
||||
rpc_url: String,
|
||||
provider_node: ProviderInput,
|
||||
public: bool,
|
||||
send_to_loop: MessageSender,
|
||||
mut recv_in_client: MessageReceiver,
|
||||
print_tx: PrintSender,
|
||||
_print_tx: PrintSender,
|
||||
) -> Result<()> {
|
||||
let our = Arc::new(our);
|
||||
// for now, we can only handle WebSocket RPC URLs. In the future, we should
|
||||
// be able to handle HTTP too, at least.
|
||||
match Url::parse(&rpc_url)?.scheme() {
|
||||
"http" | "https" => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"eth: you provided a `http(s)://` Ethereum RPC, but only `ws(s)://` is supported. Please try again with a `ws(s)://` provider"
|
||||
));
|
||||
}
|
||||
"ws" | "wss" => {}
|
||||
s => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"eth: you provided a `{s:?}` Ethereum RPC, but only `ws(s)://` is supported. Please try again with a `ws(s)://` provider"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let provider = match Provider::<Ws>::connect_with_reconnects(&rpc_url, WS_RECONNECTS).await {
|
||||
Ok(provider) => provider,
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"eth: fatal: given RPC URL could not connect! {e:?}"
|
||||
));
|
||||
// Initialize the provider conditionally based on rpc_url
|
||||
// Todo: make provider<T> support multiple transports, one direct and another passthrough.
|
||||
let provider_config = match provider_node {
|
||||
ProviderInput::Ws(rpc_url) => {
|
||||
// Validate and parse the WebSocket URL
|
||||
match Url::parse(&rpc_url)?.scheme() {
|
||||
"ws" | "wss" => {
|
||||
let connector = WsConnect {
|
||||
url: rpc_url,
|
||||
auth: None,
|
||||
};
|
||||
let client = ClientBuilder::default().ws(connector).await?;
|
||||
ProviderConfig::Provider(Provider::new_with_client(client))
|
||||
}
|
||||
_ => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Only `ws://` or `wss://` URLs are supported."
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
ProviderInput::Node(node_id) => {
|
||||
// Directly use the node ID
|
||||
ProviderConfig::Node(node_id)
|
||||
}
|
||||
};
|
||||
|
||||
let mut connections = RpcConnections {
|
||||
provider,
|
||||
ws_provider_subscriptions: HashMap::new(),
|
||||
};
|
||||
let provider_config = Arc::new(provider_config);
|
||||
|
||||
// handles of longrunning subscriptions.
|
||||
let connections: DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>> = DashMap::new();
|
||||
let connections = Arc::new(connections);
|
||||
|
||||
// add whitelist, logic in provider middleware?
|
||||
let public = Arc::new(public);
|
||||
|
||||
while let Some(km) = recv_in_client.recv().await {
|
||||
// this module only handles requests, ignores all responses
|
||||
let Message::Request(req) = &km.message else {
|
||||
continue;
|
||||
};
|
||||
let Ok(action) = serde_json::from_slice::<EthAction>(&req.body) else {
|
||||
continue;
|
||||
};
|
||||
match handle_request(
|
||||
our.clone(),
|
||||
&km.rsvp.unwrap_or(km.source.clone()),
|
||||
action,
|
||||
&mut connections,
|
||||
&send_to_loop,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
let _ = print_tx
|
||||
.send(Printout {
|
||||
verbosity: 0,
|
||||
content: format!("eth: error handling request: {:?}", e),
|
||||
})
|
||||
// clone Arcs
|
||||
let our = our.clone();
|
||||
let send_to_loop = send_to_loop.clone();
|
||||
let provider_config = provider_config.clone();
|
||||
let connections = connections.clone();
|
||||
let public = public.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_message(
|
||||
&our,
|
||||
&km,
|
||||
&send_to_loop,
|
||||
provider_config.clone(),
|
||||
connections.clone(),
|
||||
public.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
let _ = send_to_loop
|
||||
.send(make_error_message(our.to_string(), km, e))
|
||||
.await;
|
||||
if req.expects_response.is_some() {
|
||||
send_to_loop
|
||||
.send(KernelMessage {
|
||||
};
|
||||
});
|
||||
}
|
||||
Err(anyhow::anyhow!("eth: fatal: message receiver closed!"))
|
||||
}
|
||||
|
||||
async fn handle_message(
|
||||
our: &str,
|
||||
km: &KernelMessage,
|
||||
send_to_loop: &MessageSender,
|
||||
provider_config: Arc<ProviderConfig>,
|
||||
connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
|
||||
public: Arc<bool>,
|
||||
) -> Result<(), EthError> {
|
||||
match &km.message {
|
||||
Message::Request(req) => {
|
||||
match &*provider_config {
|
||||
ProviderConfig::Node(node) => {
|
||||
if km.source.node == our {
|
||||
// we have no provider, let's send this request to someone who has one.
|
||||
let request = KernelMessage {
|
||||
id: km.id,
|
||||
source: Address {
|
||||
node: our.to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: Address {
|
||||
node: our.to_string(),
|
||||
process: km.source.process.clone(),
|
||||
node: "jugodenaranja.os".to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
rsvp: Some(km.source.clone()),
|
||||
message: Message::Request(req.clone()),
|
||||
lazy_load_blob: None,
|
||||
};
|
||||
|
||||
let _ = send_to_loop.send(request).await;
|
||||
} else {
|
||||
// either someone asking us for rpc, or we are passing through a sub event.
|
||||
handle_remote_request(our, km, send_to_loop, None, connections, public)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
ProviderConfig::Provider(provider) => {
|
||||
if km.source.node == our {
|
||||
handle_local_request(our, km, send_to_loop, &provider, connections, public)
|
||||
.await?
|
||||
} else {
|
||||
handle_remote_request(
|
||||
our,
|
||||
km,
|
||||
send_to_loop,
|
||||
Some(provider),
|
||||
connections,
|
||||
public,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Message::Response(_) => {
|
||||
// handle passthrough responses, send to rsvp.
|
||||
if km.source.process == ProcessId::from_str("eth:distro:sys").unwrap() {
|
||||
if let Some(rsvp) = &km.rsvp {
|
||||
let _ = send_to_loop
|
||||
.send(KernelMessage {
|
||||
id: rand::random(),
|
||||
source: Address {
|
||||
node: our.to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: rsvp.clone(),
|
||||
rsvp: None,
|
||||
message: Message::Response((
|
||||
Response {
|
||||
inherit: false,
|
||||
body: serde_json::to_vec::<Result<(), EthError>>(&Err(e))?,
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
},
|
||||
None,
|
||||
)),
|
||||
message: km.message.clone(),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await?;
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(anyhow::anyhow!("eth: fatal: message receiver closed!"))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_request(
|
||||
our: Arc<String>,
|
||||
target: &Address,
|
||||
action: EthAction,
|
||||
connections: &mut RpcConnections,
|
||||
async fn handle_local_request(
|
||||
our: &str,
|
||||
km: &KernelMessage,
|
||||
send_to_loop: &MessageSender,
|
||||
provider: &Provider<PubSubFrontend>,
|
||||
connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
|
||||
public: Arc<bool>,
|
||||
) -> Result<(), EthError> {
|
||||
match action {
|
||||
EthAction::SubscribeLogs { sub_id, filter } => {
|
||||
let sub_id = (target.process.clone(), sub_id);
|
||||
let Message::Request(req) = &km.message else {
|
||||
return Err(EthError::InvalidMethod(
|
||||
"eth: only accepts requests".to_string(),
|
||||
));
|
||||
};
|
||||
let action = serde_json::from_slice::<EthAction>(&req.body).map_err(|e| {
|
||||
EthError::InvalidMethod(format!("eth: failed to deserialize request: {:?}", e))
|
||||
})?;
|
||||
|
||||
// if this process has already used this subscription ID,
|
||||
// this subscription will **overwrite** the existing one.
|
||||
// we might want some of these in payloads.. sub items?
|
||||
let return_body: EthResponse = match action {
|
||||
EthAction::SubscribeLogs {
|
||||
sub_id,
|
||||
kind,
|
||||
params,
|
||||
} => {
|
||||
let sub_id = (km.target.process.clone(), sub_id);
|
||||
|
||||
let kind = serde_json::to_value(&kind).unwrap();
|
||||
let params = serde_json::to_value(¶ms).unwrap();
|
||||
|
||||
let id = provider
|
||||
.inner()
|
||||
.prepare("eth_subscribe", [kind, params])
|
||||
.await
|
||||
.map_err(|e| EthError::TransportError(e.to_string()))?;
|
||||
|
||||
let rx = provider.inner().get_raw_subscription(id).await;
|
||||
let handle = tokio::spawn(handle_subscription_stream(
|
||||
our.clone(),
|
||||
connections.provider.clone(),
|
||||
filter,
|
||||
target.clone(),
|
||||
our.to_string(),
|
||||
sub_id.1.clone(),
|
||||
rx,
|
||||
km.source.clone(),
|
||||
km.rsvp.clone(),
|
||||
send_to_loop.clone(),
|
||||
));
|
||||
connections.ws_provider_subscriptions.insert(sub_id, handle);
|
||||
Ok(())
|
||||
|
||||
connections.insert(sub_id, handle);
|
||||
EthResponse::Ok
|
||||
}
|
||||
EthAction::UnsubscribeLogs(sub_id) => {
|
||||
let sub_id = (target.process.clone(), sub_id);
|
||||
let sub_id = (km.target.process.clone(), sub_id);
|
||||
let handle = connections
|
||||
.ws_provider_subscriptions
|
||||
.remove(&sub_id)
|
||||
.ok_or(EthError::SubscriptionNotFound)?;
|
||||
|
||||
handle.abort();
|
||||
Ok(())
|
||||
handle.1.abort();
|
||||
EthResponse::Ok
|
||||
}
|
||||
EthAction::Request { method, params } => {
|
||||
let method = to_static_str(&method).ok_or(EthError::InvalidMethod(method))?;
|
||||
|
||||
let response: serde_json::Value = provider
|
||||
.inner()
|
||||
.prepare(method, params)
|
||||
.await
|
||||
.map_err(|e| EthError::TransportError(e.to_string()))?;
|
||||
EthResponse::Response { value: response }
|
||||
}
|
||||
};
|
||||
if let Some(_) = req.expects_response {
|
||||
let _ = send_to_loop
|
||||
.send(KernelMessage {
|
||||
id: km.id,
|
||||
source: Address {
|
||||
node: our.to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: km.source.clone(),
|
||||
rsvp: km.rsvp.clone(),
|
||||
message: Message::Response((
|
||||
Response {
|
||||
inherit: false,
|
||||
body: serde_json::to_vec(&return_body).unwrap(),
|
||||
metadata: req.metadata.clone(),
|
||||
capabilities: vec![],
|
||||
},
|
||||
None,
|
||||
)),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// here we are either processing another nodes request.
|
||||
// or we are passing through an ethSub Request..
|
||||
async fn handle_remote_request(
|
||||
our: &str,
|
||||
km: &KernelMessage,
|
||||
send_to_loop: &MessageSender,
|
||||
provider: Option<&Provider<PubSubFrontend>>,
|
||||
connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
|
||||
public: Arc<bool>,
|
||||
) -> Result<(), EthError> {
|
||||
let Message::Request(req) = &km.message else {
|
||||
return Err(EthError::InvalidMethod(
|
||||
"eth: only accepts requests".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
if let Some(provider) = provider {
|
||||
// we need some sort of agreement perhaps on rpc providing.
|
||||
// even with an agreement, fake ethsubevents could be sent to us.
|
||||
// light clients could verify blocks perhaps...
|
||||
if !*public {
|
||||
return Err(EthError::PermissionDenied("not on the list.".to_string()));
|
||||
}
|
||||
|
||||
let action = serde_json::from_slice::<EthAction>(&req.body).map_err(|e| {
|
||||
EthError::InvalidMethod(format!("eth: failed to deserialize request: {:?}", e))
|
||||
})?;
|
||||
|
||||
let return_body: EthResponse = match action {
|
||||
EthAction::SubscribeLogs {
|
||||
sub_id,
|
||||
kind,
|
||||
params,
|
||||
} => {
|
||||
let sub_id = (km.target.process.clone(), sub_id);
|
||||
|
||||
let kind = serde_json::to_value(&kind).unwrap();
|
||||
let params = serde_json::to_value(¶ms).unwrap();
|
||||
|
||||
let id = provider
|
||||
.inner()
|
||||
.prepare("eth_subscribe", [kind, params])
|
||||
.await
|
||||
.map_err(|e| EthError::TransportError(e.to_string()))?;
|
||||
|
||||
let rx = provider.inner().get_raw_subscription(id).await;
|
||||
let handle = tokio::spawn(handle_subscription_stream(
|
||||
our.to_string(),
|
||||
sub_id.1.clone(),
|
||||
rx,
|
||||
km.target.clone(),
|
||||
km.rsvp.clone(),
|
||||
send_to_loop.clone(),
|
||||
));
|
||||
|
||||
connections.insert(sub_id, handle);
|
||||
EthResponse::Ok
|
||||
}
|
||||
EthAction::UnsubscribeLogs(sub_id) => {
|
||||
let sub_id = (km.target.process.clone(), sub_id);
|
||||
let handle = connections
|
||||
.remove(&sub_id)
|
||||
.ok_or(EthError::SubscriptionNotFound)?;
|
||||
|
||||
handle.1.abort();
|
||||
EthResponse::Ok
|
||||
}
|
||||
EthAction::Request { method, params } => {
|
||||
let method = to_static_str(&method).ok_or(EthError::InvalidMethod(method))?;
|
||||
|
||||
let response: serde_json::Value = provider
|
||||
.inner()
|
||||
.prepare(method, params)
|
||||
.await
|
||||
.map_err(|e| EthError::TransportError(e.to_string()))?;
|
||||
|
||||
EthResponse::Response { value: response }
|
||||
}
|
||||
};
|
||||
|
||||
let response = KernelMessage {
|
||||
id: km.id,
|
||||
source: Address {
|
||||
node: our.to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: km.source.clone(),
|
||||
rsvp: km.rsvp.clone(),
|
||||
message: Message::Response((
|
||||
Response {
|
||||
inherit: false,
|
||||
body: serde_json::to_vec(&return_body).unwrap(),
|
||||
metadata: req.metadata.clone(),
|
||||
capabilities: vec![],
|
||||
},
|
||||
None,
|
||||
)),
|
||||
lazy_load_blob: None,
|
||||
};
|
||||
|
||||
let _ = send_to_loop.send(response).await;
|
||||
} else {
|
||||
// We do not have a provider, this is a reply for a request made by us.
|
||||
if let Ok(eth_sub) = serde_json::from_slice::<EthSub>(&req.body) {
|
||||
// forward...
|
||||
if let Some(target) = km.rsvp.clone() {
|
||||
let _ = send_to_loop
|
||||
.send(KernelMessage {
|
||||
id: rand::random(),
|
||||
source: Address {
|
||||
node: our.to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: target,
|
||||
rsvp: None,
|
||||
message: Message::Request(req.clone()),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Executed as a long-lived task. The JoinHandle is stored in the `connections` map.
|
||||
/// This task is responsible for connecting to the ETH RPC provider and streaming logs
|
||||
/// for a specific subscription made by a process.
|
||||
async fn handle_subscription_stream(
|
||||
our: Arc<String>,
|
||||
provider: Provider<Ws>,
|
||||
filter: Filter,
|
||||
our: String,
|
||||
sub_id: u64,
|
||||
mut rx: RawSubscription,
|
||||
target: Address,
|
||||
rsvp: Option<Address>,
|
||||
send_to_loop: MessageSender,
|
||||
) -> Result<(), EthError> {
|
||||
let mut stream = match provider.subscribe_logs(&filter).await {
|
||||
Ok(s) => s,
|
||||
match rx.recv().await {
|
||||
Err(e) => {
|
||||
return Err(EthError::ProviderError(e.to_string()));
|
||||
let error = Err(EthError::SubscriptionClosed(sub_id))?;
|
||||
let _ = send_to_loop
|
||||
.send(KernelMessage {
|
||||
id: rand::random(),
|
||||
source: Address {
|
||||
node: our,
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: target.clone(),
|
||||
rsvp: rsvp.clone(),
|
||||
message: Message::Request(Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&EthSubResult::Err(EthSubError {
|
||||
id: sub_id,
|
||||
error: e.to_string(),
|
||||
}))
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
Ok(value) => {
|
||||
let event: SubscriptionResult = serde_json::from_str(value.get()).map_err(|_| {
|
||||
EthError::RpcError("eth: failed to deserialize subscription result".to_string())
|
||||
})?;
|
||||
send_to_loop
|
||||
.send(KernelMessage {
|
||||
id: rand::random(),
|
||||
source: Address {
|
||||
node: our,
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: target.clone(),
|
||||
rsvp: rsvp.clone(),
|
||||
message: Message::Request(Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&EthSubResult::Ok(EthSub {
|
||||
id: sub_id,
|
||||
result: event,
|
||||
}))
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
while let Some(event) = stream.next().await {
|
||||
send_to_loop
|
||||
.send(KernelMessage {
|
||||
id: rand::random(),
|
||||
source: Address {
|
||||
node: our.to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: target.clone(),
|
||||
rsvp: None,
|
||||
message: Message::Request(Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&EthSubEvent::Log(event)).unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
Err(EthError::SubscriptionClosed)
|
||||
Err(EthError::SubscriptionClosed(sub_id))
|
||||
}
|
||||
|
||||
fn make_error_message(our_node: String, km: KernelMessage, error: EthError) -> KernelMessage {
|
||||
let source = km.rsvp.unwrap_or_else(|| Address {
|
||||
node: our_node.clone(),
|
||||
process: km.source.process.clone(),
|
||||
});
|
||||
KernelMessage {
|
||||
id: km.id,
|
||||
source: Address {
|
||||
node: our_node,
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: source,
|
||||
rsvp: None,
|
||||
message: Message::Response((
|
||||
Response {
|
||||
inherit: false,
|
||||
body: serde_json::to_vec(&EthResponse::Err(error)).unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
},
|
||||
None,
|
||||
)),
|
||||
lazy_load_blob: None,
|
||||
}
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
use anyhow::Result;
|
||||
use dashmap::DashMap;
|
||||
use ethers_providers::StreamExt;
|
||||
use futures::stream::{SplitSink, SplitStream};
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use http::header::{HeaderMap, HeaderName, HeaderValue};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
@ -3,6 +3,7 @@ use anyhow::Result;
|
||||
use ring::signature;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
@ -671,6 +672,7 @@ pub async fn kernel(
|
||||
home_directory_path: String,
|
||||
contract_address: String,
|
||||
runtime_extensions: Vec<(t::ProcessId, t::MessageSender, bool)>,
|
||||
default_pki_entries: Vec<crate::net::KnsUpdate>,
|
||||
) -> Result<()> {
|
||||
let mut config = Config::new();
|
||||
config.cache_config_load_default().unwrap();
|
||||
@ -826,6 +828,34 @@ pub async fn kernel(
|
||||
})
|
||||
.await
|
||||
.expect("fatal: kernel event loop died");
|
||||
// sending hard coded pki entries into networking for bootstrapped rpc
|
||||
send_to_loop
|
||||
.send(t::KernelMessage {
|
||||
id: rand::random(),
|
||||
source: t::Address {
|
||||
node: our.name.clone(),
|
||||
process: KERNEL_PROCESS_ID.clone(),
|
||||
},
|
||||
target: t::Address {
|
||||
node: our.name.clone(),
|
||||
process: t::ProcessId::from_str("net:distro:sys").unwrap(),
|
||||
},
|
||||
rsvp: None,
|
||||
message: t::Message::Request(t::Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: rmp_serde::to_vec(&crate::net::NetActions::KnsBatchUpdate(
|
||||
default_pki_entries,
|
||||
))
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await
|
||||
.expect("fatal: kernel event loop died");
|
||||
|
||||
// finally, in order to trigger the kns_indexer app to find the right
|
||||
// contract, queue up a message that will send the contract address
|
||||
// to it on boot.
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
use anyhow::Result;
|
||||
use clap::{arg, value_parser, Command};
|
||||
use rand::seq::SliceRandom;
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
@ -45,11 +46,13 @@ const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
const REVEAL_IP: bool = true;
|
||||
|
||||
/// default routers as a eth-provider fallback
|
||||
const DEFAULT_PROVIDERS: &str = include_str!("../default_providers.json");
|
||||
|
||||
async fn serve_register_fe(
|
||||
home_directory_path: &str,
|
||||
our_ip: String,
|
||||
http_server_port: u16,
|
||||
rpc_url: String,
|
||||
testnet: bool,
|
||||
) -> (Identity, Vec<u8>, Keyfile) {
|
||||
// check if we have keys saved on disk, encrypted
|
||||
@ -71,7 +74,7 @@ async fn serve_register_fe(
|
||||
|
||||
let (tx, mut rx) = mpsc::channel::<(Identity, Keyfile, Vec<u8>)>(1);
|
||||
let (our, decoded_keyfile, encoded_keyfile) = tokio::select! {
|
||||
_ = register::register(tx, kill_rx, our_ip, http_server_port, rpc_url, disk_keyfile, testnet) => {
|
||||
_ = register::register(tx, kill_rx, our_ip, http_server_port, disk_keyfile, testnet) => {
|
||||
panic!("registration failed")
|
||||
}
|
||||
Some((our, decoded_keyfile, encoded_keyfile)) = rx.recv() => {
|
||||
@ -109,7 +112,14 @@ async fn main() {
|
||||
);
|
||||
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
let app = app.arg(arg!(--rpc <WS_URL> "Ethereum RPC endpoint (must be wss://)").required(true));
|
||||
let app = app
|
||||
.arg(arg!(--rpc <WS_URL> "Ethereum RPC endpoint (must be wss://)").required(false))
|
||||
.arg(arg!(--rpcnode <String> "RPC node provider must be a valid address").required(false))
|
||||
.arg(
|
||||
arg!(--public "If set, allow rpc passthrough")
|
||||
.default_value("false")
|
||||
.value_parser(value_parser!(bool)),
|
||||
);
|
||||
|
||||
#[cfg(feature = "simulation-mode")]
|
||||
let app = app
|
||||
@ -134,6 +144,8 @@ async fn main() {
|
||||
None => (8080, false),
|
||||
};
|
||||
let on_testnet = *matches.get_one::<bool>("testnet").unwrap();
|
||||
let public = *matches.get_one::<bool>("public").unwrap();
|
||||
|
||||
let contract_address = if on_testnet {
|
||||
register::KNS_SEPOLIA_ADDRESS
|
||||
} else {
|
||||
@ -166,8 +178,46 @@ async fn main() {
|
||||
}
|
||||
}
|
||||
|
||||
// default eth providers/routers
|
||||
type KnsUpdate = crate::net::KnsUpdate;
|
||||
let default_pki_entries: Vec<KnsUpdate> =
|
||||
match fs::read_to_string(format!("{}/.default_providers", home_directory_path)).await {
|
||||
Ok(contents) => serde_json::from_str(&contents).unwrap(),
|
||||
Err(_) => {
|
||||
let defaults: Vec<KnsUpdate> = serde_json::from_str(DEFAULT_PROVIDERS).unwrap();
|
||||
defaults
|
||||
}
|
||||
};
|
||||
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
let (rpc_url, is_detached) = (matches.get_one::<String>("rpc").unwrap(), false);
|
||||
let (rpc_url, is_detached) = (matches.get_one::<String>("rpc").cloned(), false);
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
let (rpc_node, _is_detached) = (matches.get_one::<String>("rpcnode").cloned(), false);
|
||||
|
||||
type ProviderInput = lib::eth::ProviderInput;
|
||||
let eth_provider: ProviderInput;
|
||||
|
||||
match (rpc_url, rpc_node) {
|
||||
(Some(url), Some(_)) => {
|
||||
println!("passed both node and url for rpc, using url.");
|
||||
eth_provider = ProviderInput::Ws(url);
|
||||
}
|
||||
(Some(url), None) => {
|
||||
eth_provider = ProviderInput::Ws(url);
|
||||
}
|
||||
(None, Some(node)) => {
|
||||
println!("trying to use remote node for rpc: {}", node);
|
||||
eth_provider = ProviderInput::Node(node);
|
||||
}
|
||||
(None, None) => {
|
||||
let random_provider = default_pki_entries.choose(&mut rand::thread_rng()).unwrap();
|
||||
let default_provider = random_provider.name.clone();
|
||||
|
||||
println!("no rpc provided, using a default: {}", default_provider);
|
||||
|
||||
eth_provider = ProviderInput::Node(default_provider);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "simulation-mode")]
|
||||
let (rpc_url, password, network_router_port, fake_node_name, is_detached) = (
|
||||
@ -278,7 +328,6 @@ async fn main() {
|
||||
home_directory_path,
|
||||
our_ip.to_string(),
|
||||
http_server_port,
|
||||
rpc_url.clone(),
|
||||
on_testnet, // true if testnet mode
|
||||
)
|
||||
.await;
|
||||
@ -457,6 +506,7 @@ async fn main() {
|
||||
home_directory_path.clone(),
|
||||
contract_address.to_string(),
|
||||
runtime_extensions,
|
||||
default_pki_entries,
|
||||
));
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
tasks.spawn(net::networking(
|
||||
@ -528,7 +578,8 @@ async fn main() {
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
tasks.spawn(eth::provider::provider(
|
||||
our.name.clone(),
|
||||
rpc_url.clone(),
|
||||
eth_provider,
|
||||
public,
|
||||
kernel_message_sender.clone(),
|
||||
eth_provider_receiver,
|
||||
print_sender.clone(),
|
||||
|
@ -19,7 +19,7 @@ mod types;
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
mod utils;
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
use crate::net::{types::*, utils::*};
|
||||
pub use crate::net::{types::*, utils::*};
|
||||
#[cfg(not(feature = "simulation-mode"))]
|
||||
use lib::types::core::*;
|
||||
|
||||
|
@ -273,10 +273,12 @@ pub fn validate_routing_request(
|
||||
&signature::ED25519,
|
||||
hex::decode(strip_0x(&their_id.networking_key))?,
|
||||
);
|
||||
their_networking_key.verify(
|
||||
[&routing_request.target, our_name].concat().as_bytes(),
|
||||
&routing_request.signature,
|
||||
)?;
|
||||
their_networking_key
|
||||
.verify(
|
||||
[&routing_request.target, our_name].concat().as_bytes(),
|
||||
&routing_request.signature,
|
||||
)
|
||||
.map_err(|e| anyhow!("their_networking_key.verify failed: {:?}", e))?;
|
||||
if routing_request.target == routing_request.source {
|
||||
return Err(anyhow!("can't route to self"));
|
||||
}
|
||||
@ -296,7 +298,9 @@ pub fn validate_handshake(
|
||||
&signature::ED25519,
|
||||
hex::decode(strip_0x(&their_id.networking_key))?,
|
||||
);
|
||||
their_networking_key.verify(their_static_key, &handshake.signature)?;
|
||||
their_networking_key
|
||||
.verify(their_static_key, &handshake.signature)
|
||||
.map_err(|e| anyhow!("their_networking_key.verify handshake failed: {:?}", e))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,4 @@
|
||||
use aes_gcm::aead::KeyInit;
|
||||
use ethers::prelude::{abigen, namehash, Address as EthAddress, Provider, U256};
|
||||
use ethers_providers::Ws;
|
||||
use hmac::Hmac;
|
||||
use jwt::SignWithKey;
|
||||
use ring::rand::SystemRandom;
|
||||
@ -21,14 +19,6 @@ use warp::{
|
||||
use crate::keygen;
|
||||
use lib::types::core::*;
|
||||
|
||||
// Human readable ABI
|
||||
abigen!(
|
||||
KNSRegistry,
|
||||
r"[
|
||||
function ws(uint256 node) external view returns (bytes32,uint32,uint16,bytes32[])
|
||||
]"
|
||||
);
|
||||
|
||||
type RegistrationSender = mpsc::Sender<(Identity, Keyfile, Vec<u8>)>;
|
||||
|
||||
pub const KNS_SEPOLIA_ADDRESS: &str = "0x3807fBD692Aa5c96F1D8D7c59a1346a885F40B1C";
|
||||
@ -100,7 +90,6 @@ pub async fn register(
|
||||
kill_rx: oneshot::Receiver<bool>,
|
||||
ip: String,
|
||||
port: u16,
|
||||
rpc_url: String,
|
||||
keyfile: Option<Vec<u8>>,
|
||||
testnet: bool,
|
||||
) {
|
||||
@ -134,7 +123,6 @@ pub async fn register(
|
||||
let net_keypair = warp::any().map(move || net_keypair.clone());
|
||||
let tx = warp::any().map(move || tx.clone());
|
||||
let ip = warp::any().map(move || ip.clone());
|
||||
let rpc_url = warp::any().map(move || rpc_url.clone());
|
||||
|
||||
let static_files = warp::path("static").and(static_dir!("src/register-ui/build/static/"));
|
||||
|
||||
@ -199,7 +187,6 @@ pub async fn register(
|
||||
.and(warp::body::content_length_limit(1024 * 16))
|
||||
.and(warp::body::json())
|
||||
.and(ip.clone())
|
||||
.and(rpc_url.clone())
|
||||
.and(tx.clone())
|
||||
.and_then(handle_import_keyfile),
|
||||
))
|
||||
@ -208,7 +195,6 @@ pub async fn register(
|
||||
.and(warp::body::content_length_limit(1024 * 16))
|
||||
.and(warp::body::json())
|
||||
.and(ip)
|
||||
.and(rpc_url)
|
||||
.and(tx.clone())
|
||||
.and(keyfile.clone())
|
||||
.and_then(handle_login),
|
||||
@ -344,7 +330,6 @@ async fn handle_boot(
|
||||
async fn handle_import_keyfile(
|
||||
info: ImportKeyfileInfo,
|
||||
ip: String,
|
||||
_rpc_url: String,
|
||||
sender: Arc<RegistrationSender>,
|
||||
) -> Result<impl Reply, Rejection> {
|
||||
// if keyfile was not present in node and is present from user upload
|
||||
@ -408,7 +393,6 @@ async fn handle_import_keyfile(
|
||||
async fn handle_login(
|
||||
info: LoginInfo,
|
||||
ip: String,
|
||||
_rpc_url: String,
|
||||
sender: Arc<RegistrationSender>,
|
||||
encoded_keyfile: Option<Vec<u8>>,
|
||||
) -> Result<impl Reply, Rejection> {
|
||||
@ -563,61 +547,3 @@ async fn success_response(
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn _networking_info_valid(rpc_url: String, ip: String, ws_port: u16, our: &Identity) -> bool {
|
||||
// check if Identity for this username has correct networking keys,
|
||||
// if not, prompt user to reset them.
|
||||
let Ok(ws_rpc) = Provider::<Ws>::connect(rpc_url.clone()).await else {
|
||||
return false;
|
||||
};
|
||||
let Ok(kns_address): Result<EthAddress, _> = KNS_SEPOLIA_ADDRESS.parse() else {
|
||||
return false;
|
||||
};
|
||||
let contract = KNSRegistry::new(kns_address, ws_rpc.into());
|
||||
let node_id: U256 = namehash(&our.name).as_bytes().into();
|
||||
let Ok((chain_pubkey, chain_ip, chain_port, chain_routers)) = contract.ws(node_id).call().await
|
||||
else {
|
||||
return false;
|
||||
};
|
||||
|
||||
// double check that routers match on-chain information
|
||||
let namehashed_routers: Vec<[u8; 32]> = our
|
||||
.allowed_routers
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|name| {
|
||||
let hash = namehash(&name);
|
||||
let mut result = [0u8; 32];
|
||||
result.copy_from_slice(hash.as_bytes());
|
||||
result
|
||||
})
|
||||
.collect();
|
||||
|
||||
let current_ip = match _ip_to_number(&ip) {
|
||||
Ok(ip_num) => ip_num,
|
||||
Err(_) => {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let Ok(networking_key_bytes) = _hex_string_to_u8_array(&our.networking_key) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let address_match = chain_ip == current_ip && chain_port == ws_port;
|
||||
let routers_match = chain_routers == namehashed_routers;
|
||||
|
||||
let routing_match = if chain_ip == 0 {
|
||||
routers_match
|
||||
} else {
|
||||
address_match
|
||||
};
|
||||
let pubkey_match = chain_pubkey == networking_key_bytes;
|
||||
|
||||
// double check that keys match on-chain information
|
||||
if !routing_match || !pubkey_match {
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
@ -14,8 +14,8 @@ license = "Apache-2.0"
|
||||
reqwest = { version = "0.11.22", features = ["blocking"] }
|
||||
|
||||
[dependencies]
|
||||
ethers = "2.0.13"
|
||||
ethers-providers = "2.0.13"
|
||||
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
|
||||
alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
|
||||
lazy_static = "1.4.0"
|
||||
rand = "0.8.4"
|
||||
ring = "0.16.20"
|
||||
|
111
lib/src/eth.rs
111
lib/src/eth.rs
@ -1,52 +1,109 @@
|
||||
use ethers::prelude::Provider;
|
||||
use ethers::types::{Filter, Log};
|
||||
use ethers_providers::Ws;
|
||||
use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::core::ProcessId;
|
||||
|
||||
/// The Request type that can be made to eth:distro:sys. Currently primitive, this
|
||||
/// enum will expand to support more actions in the future.
|
||||
/// The Action and Request type that can be made to eth:distro:sys.
|
||||
///
|
||||
/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum EthAction {
|
||||
/// Subscribe to logs with a custom filter. ID is to be used to unsubscribe.
|
||||
SubscribeLogs { sub_id: u64, filter: Filter },
|
||||
/// Logs come in as alloy_rpc_types::pubsub::SubscriptionResults
|
||||
SubscribeLogs {
|
||||
sub_id: u64,
|
||||
kind: SubscriptionKind,
|
||||
params: Params,
|
||||
},
|
||||
/// Kill a SubscribeLogs subscription of a given ID, to stop getting updates.
|
||||
UnsubscribeLogs(u64),
|
||||
/// Raw request. Used by kinode_process_lib.
|
||||
Request {
|
||||
method: String,
|
||||
params: serde_json::Value,
|
||||
},
|
||||
}
|
||||
|
||||
/// Incoming Result type for subscription updates or errors that processes will receive.
|
||||
pub type EthSubResult = Result<EthSub, EthSubError>;
|
||||
|
||||
/// Incoming Request type for subscription updates.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct EthSub {
|
||||
pub id: u64,
|
||||
pub result: SubscriptionResult,
|
||||
}
|
||||
|
||||
/// Incoming Request for subscription errors that processes will receive.
|
||||
/// If your subscription is closed unexpectedly, you will receive this.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct EthSubError {
|
||||
pub id: u64,
|
||||
pub error: String,
|
||||
}
|
||||
|
||||
/// The Response type which a process will get from requesting with an [`EthAction`] will be
|
||||
/// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec`
|
||||
/// and `serde_json::from_slice`.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum EthError {
|
||||
/// The ethers provider threw an error when trying to subscribe
|
||||
/// (contains ProviderError serialized to debug string)
|
||||
ProviderError(String),
|
||||
SubscriptionClosed,
|
||||
/// The subscription ID was not found, so we couldn't unsubscribe.
|
||||
SubscriptionNotFound,
|
||||
pub enum EthResponse {
|
||||
Ok,
|
||||
Response { value: serde_json::Value },
|
||||
Err(EthError),
|
||||
}
|
||||
|
||||
/// The Request type which a process will get from using SubscribeLogs to subscribe
|
||||
/// to a log.
|
||||
///
|
||||
/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum EthSubEvent {
|
||||
Log(Log),
|
||||
pub enum EthError {
|
||||
/// Underlying transport error
|
||||
TransportError(String),
|
||||
/// Subscription closed
|
||||
SubscriptionClosed(u64),
|
||||
/// The subscription ID was not found, so we couldn't unsubscribe.
|
||||
SubscriptionNotFound,
|
||||
/// Invalid method
|
||||
InvalidMethod(String),
|
||||
/// Permission denied
|
||||
PermissionDenied(String),
|
||||
/// Internal RPC error
|
||||
RpcError(String),
|
||||
}
|
||||
|
||||
//
|
||||
// Internal types
|
||||
//
|
||||
|
||||
/// Primary state object of the `eth` module
|
||||
pub struct RpcConnections {
|
||||
pub provider: Provider<Ws>,
|
||||
pub ws_provider_subscriptions: HashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>,
|
||||
/// For static lifetimes of method strings.
|
||||
/// Replaced soon by alloy-rs network abstraction.
|
||||
pub fn to_static_str(method: &str) -> Option<&'static str> {
|
||||
match method {
|
||||
"eth_getBalance" => Some("eth_getBalance"),
|
||||
"eth_sendRawTransaction" => Some("eth_sendRawTransaction"),
|
||||
"eth_call" => Some("eth_call"),
|
||||
"eth_chainId" => Some("eth_chainId"),
|
||||
"eth_getTransactionReceipt" => Some("eth_getTransactionReceipt"),
|
||||
"eth_getTransactionCount" => Some("eth_getTransactionCount"),
|
||||
"eth_estimateGas" => Some("eth_estimateGas"),
|
||||
"eth_blockNumber" => Some("eth_blockNumber"),
|
||||
"eth_getBlockByHash" => Some("eth_getBlockByHash"),
|
||||
"eth_getBlockByNumber" => Some("eth_getBlockByNumber"),
|
||||
"eth_getTransactionByHash" => Some("eth_getTransactionByHash"),
|
||||
"eth_getCode" => Some("eth_getCode"),
|
||||
"eth_getStorageAt" => Some("eth_getStorageAt"),
|
||||
"eth_gasPrice" => Some("eth_gasPrice"),
|
||||
"eth_accounts" => Some("eth_accounts"),
|
||||
"eth_hashrate" => Some("eth_hashrate"),
|
||||
"eth_getLogs" => Some("eth_getLogs"),
|
||||
"eth_subscribe" => Some("eth_subscribe"),
|
||||
"eth_unsubscribe" => Some("eth_unsubscribe"),
|
||||
// "eth_mining" => Some("eth_mining"),
|
||||
// "net_version" => Some("net_version"),
|
||||
// "net_peerCount" => Some("net_peerCount"),
|
||||
// "net_listening" => Some("net_listening"),
|
||||
// "web3_clientVersion" => Some("web3_clientVersion"),
|
||||
// "web3_sha3" => Some("web3_sha3"),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ProviderInput {
|
||||
Ws(String),
|
||||
Node(String),
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user