eth: add --rpcnode flag and handlers

This commit is contained in:
bitful-pannul 2024-02-14 16:04:05 -03:00
parent 2eea02e34a
commit 091ca7fc83
4 changed files with 175 additions and 75 deletions

View File

@ -0,0 +1,42 @@
[
{
"name": "default-router-1.os",
"owner": "dr. who",
"node": "0x",
"public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db",
"ip": "147.135.114.167",
"port": 9002,
"routers": []
},
{
"name": "default-router-2.os",
"owner": "foo",
"node": "0x",
"public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992",
"ip": "147.135.114.167",
"port": 9003,
"routers": []
},
{
"name": "default-router-s3.os",
"owner": "bar",
"node": "0x",
"public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b",
"ip": "147.135.114.167",
"port": 9004,
"routers": []
},
{
"name": "jugodenaranja.os",
"owner": "ass",
"node": "0x",
"public_key": "0xf8a3e9667756306a0a25894a8cfe053dc9e7f34e6a61b8d65e92b79f77e05dff",
"ip": "0.0.0.0",
"port": 0,
"routers": [
"0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d",
"0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458",
"0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a"
]
}
]

View File

@ -12,14 +12,21 @@ use std::sync::Arc;
use tokio::task::JoinHandle;
use url::Url;
/// Provider config. Can currently be a node or a ws provider instance.
/// Future: add chainId configs, several nodes and fallbacks.
pub enum ProviderConfig {
Node(String),
Provider(Provider<PubSubFrontend>),
}
/// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers
/// and using them to service indexing requests from other apps. This could also be done by a wasm
/// app, but in the future, this process will hopefully expand in scope to perform more complex
/// indexing and ETH node responsibilities.
pub async fn provider(
our: String,
rpc_url: Option<String>, // if None, bootstrap from router, can set settings later?
public: bool, // todo, whitelists etc.
provider_node: ProviderInput,
public: bool,
send_to_loop: MessageSender,
mut recv_in_client: MessageReceiver,
_print_tx: PrintSender,
@ -28,34 +35,32 @@ pub async fn provider(
// Initialize the provider conditionally based on rpc_url
// Todo: make provider<T> support multiple transports, one direct and another passthrough.
let provider = if let Some(rpc_url) = rpc_url {
// If rpc_url is Some, proceed with URL parsing and client setup
match Url::parse(&rpc_url)?.scheme() {
"http" | "https" => {
return Err(anyhow::anyhow!(
"eth: you provided a `http(s)://` Ethereum RPC, but only `ws(s)://` is supported. Please try again with a `ws(s)://` provider"
));
}
"ws" | "wss" => {}
s => {
return Err(anyhow::anyhow!(
"eth: you provided a `{s:?}` Ethereum RPC, but only `ws(s)://` is supported. Please try again with a `ws(s)://` provider"
));
let provider_config = match provider_node {
ProviderInput::WS(rpc_url) => {
// Validate and parse the WebSocket URL
match Url::parse(&rpc_url)?.scheme() {
"ws" | "wss" => {
let connector = WsConnect {
url: rpc_url,
auth: None,
};
let client = ClientBuilder::default().ws(connector).await?;
ProviderConfig::Provider(Provider::new_with_client(client))
}
_ => {
return Err(anyhow::anyhow!(
"Only `ws://` or `wss://` URLs are supported."
))
}
}
}
let connector = WsConnect {
url: rpc_url,
auth: None,
};
let client = ClientBuilder::default().ws(connector).await?;
Some(Provider::new_with_client(client))
} else {
None
ProviderInput::Node(node_id) => {
// Directly use the node ID
ProviderConfig::Node(node_id)
}
};
let provider = Arc::new(provider);
let provider_config = Arc::new(provider_config);
// handles of longrunning subscriptions.
let connections: DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>> = DashMap::new();
@ -68,7 +73,7 @@ pub async fn provider(
// clone Arcs
let our = our.clone();
let send_to_loop = send_to_loop.clone();
let provider = provider.clone();
let provider_config = provider_config.clone();
let connections = connections.clone();
let public = public.clone();
@ -77,7 +82,7 @@ pub async fn provider(
&our,
&km,
&send_to_loop,
provider.clone(),
provider_config.clone(),
connections.clone(),
public.clone(),
)
@ -96,38 +101,54 @@ async fn handle_message(
our: &str,
km: &KernelMessage,
send_to_loop: &MessageSender,
provider: Arc<Option<Provider<PubSubFrontend>>>,
provider_config: Arc<ProviderConfig>,
connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
public: Arc<bool>,
) -> Result<(), EthError> {
match &km.message {
Message::Request(req) => {
if km.source.node == our {
if let Some(provider) = provider.as_ref() {
handle_local_request(our, km, send_to_loop, provider, connections, public)
.await?
} else {
// we have no provider, let's send this request to someone who has one.
let request = KernelMessage {
id: km.id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: "jugodenaranja.os".to_string(),
process: ETH_PROCESS_ID.clone(),
},
rsvp: Some(km.source.clone()),
message: Message::Request(req.clone()),
lazy_load_blob: None,
};
match &*provider_config {
ProviderConfig::Node(node) => {
if km.source.node == our {
// we have no provider, let's send this request to someone who has one.
let request = KernelMessage {
id: km.id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: "jugodenaranja.os".to_string(),
process: ETH_PROCESS_ID.clone(),
},
rsvp: Some(km.source.clone()),
message: Message::Request(req.clone()),
lazy_load_blob: None,
};
let _ = send_to_loop.send(request).await;
let _ = send_to_loop.send(request).await;
} else {
// either someone asking us for rpc, or we are passing through a sub event.
handle_remote_request(our, km, send_to_loop, None, connections, public)
.await?
}
}
ProviderConfig::Provider(provider) => {
if km.source.node == our {
handle_local_request(our, km, send_to_loop, &provider, connections, public)
.await?
} else {
handle_remote_request(
our,
km,
send_to_loop,
Some(provider),
connections,
public,
)
.await?
}
}
} else {
// either someone asking us for rpc, or we are passing through a sub event.
handle_remote_request(our, km, send_to_loop, provider, connections, public).await?
}
}
Message::Response(_) => {
@ -255,7 +276,7 @@ async fn handle_remote_request(
our: &str,
km: &KernelMessage,
send_to_loop: &MessageSender,
provider: Arc<Option<Provider<PubSubFrontend>>>,
provider: Option<&Provider<PubSubFrontend>>,
connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
public: Arc<bool>,
) -> Result<(), EthError> {
@ -265,7 +286,7 @@ async fn handle_remote_request(
));
};
if let Some(provider) = provider.as_ref() {
if let Some(provider) = provider {
// we need some sort of agreement perhaps on rpc providing.
// even with an agreement, fake ethsubevents could be sent to us.
// light clients could verify blocks perhaps...

View File

@ -2,6 +2,7 @@
use anyhow::Result;
use clap::{arg, value_parser, Command};
use rand::seq::SliceRandom;
use std::env;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
@ -111,14 +112,14 @@ async fn main() {
);
#[cfg(not(feature = "simulation-mode"))]
let app =
app.arg(arg!(--rpc <WS_URL> "Ethereum RPC endpoint (must be wss://)").required(false));
let app = app.arg(
arg!(--public "If set, allow rpc passthrough")
.default_value("false")
.value_parser(value_parser!(bool)),
);
let app = app
.arg(arg!(--rpc <WS_URL> "Ethereum RPC endpoint (must be wss://)").required(false))
.arg(arg!(--rpcnode <String> "RPC node provider must be a valid address").required(false))
.arg(
arg!(--public "If set, allow rpc passthrough")
.default_value("false")
.value_parser(value_parser!(bool)),
);
#[cfg(feature = "simulation-mode")]
let app = app
@ -177,8 +178,50 @@ async fn main() {
}
}
// default routers
type KnsUpdate = crate::net::KnsUpdate;
let routers: Vec<KnsUpdate> =
match fs::read_to_string(format!("{}/.routers", home_directory_path)).await {
Ok(contents) => serde_json::from_str(&contents).unwrap(),
Err(_) => {
let routers: Vec<KnsUpdate> = serde_json::from_str(DEFAULT_ROUTERS).unwrap();
routers
}
};
println!("wtf here are the matches: {:?}", matches);
#[cfg(not(feature = "simulation-mode"))]
let (rpc_url, is_detached) = (matches.get_one::<String>("rpc").cloned(), false);
#[cfg(not(feature = "simulation-mode"))]
let (rpc_node, _is_detached) = (matches.get_one::<String>("rpcnode").cloned(), false);
type ProviderInput = lib::eth::ProviderInput;
let eth_provider: ProviderInput;
match (rpc_url, rpc_node) {
(Some(url), Some(_)) => {
println!("passed both node and url for rpc, using url.");
eth_provider = ProviderInput::WS(url);
}
(Some(url), None) => {
eth_provider = ProviderInput::WS(url);
}
(None, Some(node)) => {
println!("trying to use node for rpc: {}", node);
eth_provider = ProviderInput::Node(node);
}
(None, None) => {
let random_router = routers.choose(&mut rand::thread_rng()).unwrap();
let default_router = random_router.name.clone();
println!(
"no rpc provided, using a default router: {}",
default_router
);
eth_provider = ProviderInput::Node(default_router);
}
}
#[cfg(feature = "simulation-mode")]
let (rpc_url, password, network_router_port, fake_node_name, is_detached) = (
@ -387,17 +430,6 @@ async fn main() {
}
};
// read in default routers .json file
type KnsUpdate = crate::net::KnsUpdate;
let routers: Vec<KnsUpdate> =
match fs::read_to_string(format!("{}/.routers", home_directory_path)).await {
Ok(contents) => serde_json::from_str(&contents).unwrap(),
Err(_) => {
let routers: Vec<KnsUpdate> = serde_json::from_str(DEFAULT_ROUTERS).unwrap();
routers
}
};
// the boolean flag determines whether the runtime module is *public* or not,
// where public means that any process can always message it.
#[allow(unused_mut)]
@ -550,7 +582,7 @@ async fn main() {
#[cfg(not(feature = "simulation-mode"))]
tasks.spawn(eth::provider::provider(
our.name.clone(),
rpc_url.clone(),
eth_provider,
public,
kernel_message_sender.clone(),
eth_provider_receiver,

View File

@ -90,3 +90,8 @@ pub fn to_static_str(method: &str) -> Option<&'static str> {
_ => None,
}
}
pub enum ProviderInput {
WS(String),
Node(String),
}