Mirror of https://github.com/uqbar-dao/nectar.git, synced 2024-11-23 03:44:04 +03:00

WIP scaffolding multi-chain multi-provider

This commit is contained in: parent b52768d179, commit 96f47a77cb
@@ -46,6 +46,11 @@ fn build_and_zip_package(
 }
 
 fn main() -> anyhow::Result<()> {
+    if std::env::var("SKIP_BUILD_SCRIPT").is_ok() {
+        println!("Skipping build script");
+        return Ok(());
+    }
+
     let pwd = std::env::current_dir()?;
     let parent_dir = pwd.parent().unwrap();
     let packages_dir = pwd.join("packages");
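(Development note: with this change, setting any value for the variable skips the package builds, e.g. `SKIP_BUILD_SCRIPT=1 cargo build`, since the check is just `is_ok()` on the lookup.)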
@@ -1,29 +1,66 @@
 [
     {
-        "name": "default-router-1.os",
-        "owner": "",
-        "node": "0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d",
-        "public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db",
-        "ip": "147.135.114.167",
-        "port": 9005,
-        "routers": []
+        "chain_id": 1,
+        "trusted": false,
+        "provider": {
+            "RpcUrl": "wss://ethereum.publicnode.com"
+        },
+        "public": false,
+        "allow": [],
+        "deny": []
     },
     {
-        "name": "default-router-2.os",
-        "owner": "",
-        "node": "0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458",
-        "public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992",
-        "ip": "147.135.114.167",
-        "port": 9006,
-        "routers": []
+        "chain_id": 10,
+        "trusted": true,
+        "provider": {
+            "Node": {
+                "name": "default-router-1.os",
+                "owner": "",
+                "node": "0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d",
+                "public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db",
+                "ip": "147.135.114.167",
+                "port": 9005,
+                "routers": []
+            }
+        },
+        "public": false,
+        "allow": [],
+        "deny": []
     },
     {
-        "name": "default-router-3.os",
-        "owner": "",
-        "node": "0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a",
-        "public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b",
-        "ip": "147.135.114.167",
-        "port": 9007,
-        "routers": []
+        "chain_id": 10,
+        "trusted": true,
+        "provider": {
+            "Node": {
+                "name": "default-router-2.os",
+                "owner": "",
+                "node": "0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458",
+                "public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992",
+                "ip": "147.135.114.167",
+                "port": 9006,
+                "routers": []
+            }
+        },
+        "public": false,
+        "allow": [],
+        "deny": []
     },
+    {
+        "chain_id": 10,
+        "trusted": true,
+        "provider": {
+            "Node": {
+                "name": "default-router-3.os",
+                "owner": "",
+                "node": "0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a",
+                "public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b",
+                "ip": "147.135.114.167",
+                "port": 9007,
+                "routers": []
+            }
+        },
+        "public": false,
+        "allow": [],
+        "deny": []
+    }
 ]
@@ -1,29 +1,66 @@
 [
     {
-        "name": "default-router-1.os",
-        "owner": "",
-        "node": "0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d",
-        "public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db",
-        "ip": "147.135.114.167",
-        "port": 9002,
-        "routers": []
+        "chain_id": 1,
+        "trusted": false,
+        "provider": {
+            "RpcUrl": "wss://ethereum.publicnode.com"
+        },
+        "public": false,
+        "allow": [],
+        "deny": []
     },
     {
-        "name": "default-router-2.os",
-        "owner": "",
-        "node": "0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458",
-        "public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992",
-        "ip": "147.135.114.167",
-        "port": 9003,
-        "routers": []
+        "chain_id": 11155111,
+        "trusted": true,
+        "provider": {
+            "Node": {
+                "name": "default-router-1.os",
+                "owner": "",
+                "node": "0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d",
+                "public_key": "0xb1b1cf23c89f651aac3e5fd4decb04aa177ab0ec8ce5f1d3877b90bb6f5779db",
+                "ip": "147.135.114.167",
+                "port": 9002,
+                "routers": []
+            }
+        },
+        "public": false,
+        "allow": [],
+        "deny": []
     },
     {
-        "name": "default-router-3.os",
-        "owner": "",
-        "node": "0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a",
-        "public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b",
-        "ip": "147.135.114.167",
-        "port": 9004,
-        "routers": []
+        "chain_id": 11155111,
+        "trusted": true,
+        "provider": {
+            "Node": {
+                "name": "default-router-2.os",
+                "owner": "",
+                "node": "0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458",
+                "public_key": "0xab9f1a996db3a4e1dbcd31d765daedeb3af9af4f570c0968463b5be3a7d1e992",
+                "ip": "147.135.114.167",
+                "port": 9003,
+                "routers": []
+            }
+        },
+        "public": false,
+        "allow": [],
+        "deny": []
    },
+    {
+        "chain_id": 11155111,
+        "trusted": true,
+        "provider": {
+            "Node": {
+                "name": "default-router-3.os",
+                "owner": "",
+                "node": "0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a",
+                "public_key": "0x536e30785e64dd0349a697285af365b5ed7c4d300010139261cfc4dbdd5b254b",
+                "ip": "147.135.114.167",
+                "port": 9004,
+                "routers": []
+            }
+        },
+        "public": false,
+        "allow": [],
+        "deny": []
+    }
 ]
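For orientation: each entry in these default-provider files corresponds to the `ProviderConfig` type introduced further down in this commit, with `provider` externally tagged as either `RpcUrl` or `Node`. A minimal, standalone sketch of loading such a file, using a simplified stand-in for `KnsUpdate` (the real type lives in lib::types::core); the extra `public`/`allow`/`deny` fields are skipped by serde's default ignore-unknown-fields behavior:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct KnsUpdate {
    // simplified stand-in for lib::types::core::KnsUpdate
    name: String,
    owner: String,
    node: String,
    public_key: String,
    ip: String,
    port: u16,
    routers: Vec<String>,
}

#[derive(Debug, Deserialize)]
enum NodeOrRpcUrl {
    Node(KnsUpdate),
    RpcUrl(String),
}

#[derive(Debug, Deserialize)]
struct ProviderConfig {
    chain_id: u64,
    trusted: bool,
    provider: NodeOrRpcUrl,
}

fn main() -> anyhow::Result<()> {
    let raw = std::fs::read_to_string("default_providers.json")?;
    let configs: Vec<ProviderConfig> = serde_json::from_str(&raw)?;
    for c in &configs {
        println!("chain {}: trusted = {}", c.chain_id, c.trusted);
    }
    Ok(())
}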
@@ -8,161 +8,289 @@ use dashmap::DashMap;
 use lib::types::core::*;
 use lib::types::eth::*;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::task::JoinHandle;
 use url::Url;
 
+/// mapping of chain id to ordered(TODO) list of providers
+type Providers = Arc<DashMap<u64, ActiveProviders>>;
+
+struct ActiveProviders {
+    pub urls: Vec<UrlProvider>,
+    pub nodes: Vec<NodeProvider>,
+}
+
+struct UrlProvider {
+    pub trusted: bool,
+    pub url: String,
+    pub pubsub: Option<Provider<PubSubFrontend>>,
+}
+
+struct NodeProvider {
+    pub trusted: bool,
+    pub name: String,
+}
+
+/// existing subscriptions held by local processes
+type ActiveSubscriptions = Arc<DashMap<ProcessId, HashMap<u64, ActiveSub>>>;
+
+enum ActiveSub {
+    Local(JoinHandle<Result<(), EthError>>),
+    Remote(String), // name of node providing this subscription for us
+}
+
+impl ActiveProviders {
+    fn add_provider_config(&mut self, new: ProviderConfig) {
+        match new.provider {
+            NodeOrRpcUrl::Node(update) => {
+                self.nodes.push(NodeProvider {
+                    trusted: new.trusted,
+                    name: update.name,
+                });
+            }
+            NodeOrRpcUrl::RpcUrl(url) => {
+                self.urls.push(UrlProvider {
+                    trusted: new.trusted,
+                    url,
+                    pubsub: None,
+                });
+            }
+        }
+    }
+
+    fn remove_provider(&mut self, remove: &str) {
+        self.urls.retain(|x| x.url != remove);
+        self.nodes.retain(|x| x.name != remove);
+    }
+}
+
+async fn activate_url_provider(provider: &mut UrlProvider) -> Result<()> {
+    match Url::parse(&provider.url)?.scheme() {
+        "ws" | "wss" => {
+            let connector = WsConnect {
+                url: provider.url.to_string(),
+                auth: None,
+            };
+            let client = ClientBuilder::default().ws(connector).await?;
+            provider.pubsub = Some(Provider::new_with_client(client));
+            Ok(())
+        }
+        _ => Err(anyhow::anyhow!(
+            "Only `ws://` or `wss://` providers are supported."
+        )),
+    }
+}
+
 /// The ETH provider runtime process is responsible for connecting to one or more ETH RPC providers
-/// and using them to service indexing requests from other apps. This could also be done by a Wasm
-/// app, but in the future, this process will hopefully expand in scope to perform more complex
-/// indexing and ETH node responsibilities.
+/// and using them to service indexing requests from other apps.
 pub async fn provider(
     our: String,
+    configs: SavedConfigs,
     send_to_loop: MessageSender,
     mut recv_in_client: MessageReceiver,
+    caps_oracle: CapMessageSender,
     print_tx: PrintSender,
 ) -> Result<()> {
     let our = Arc::new(our);
 
+    // // Initialize the provider conditionally based on rpc_url
+    // // Todo: make provider<T> support multiple transports, one direct and another passthrough.
+    // let provider_config = match provider_node {
+    //     ProviderInput::Ws(rpc_url) => {
+    //         // Validate and parse the WebSocket URL
+    //         match Url::parse(&rpc_url)?.scheme() {
+    //             "ws" | "wss" => {
+    //                 let connector = WsConnect {
+    //                     url: rpc_url,
+    //                     auth: None,
+    //                 };
+    //                 let client = ClientBuilder::default().ws(connector).await?;
+    //                 ProviderConfig::Provider(Provider::new_with_client(client))
+    //             }
+    //             _ => {
+    //                 return Err(anyhow::anyhow!(
+    //                     "Only `ws://` or `wss://` URLs are supported."
+    //                 ))
+    //             }
+    //         }
+    //     }
+    //     ProviderInput::Node(node_id) => {
+    //         // Directly use the node ID
+    //         ProviderConfig::Node(node_id)
+    //     }
+    // };
+    let mut access_settings = AccessSettings {
+        public: false,
+        allow: HashSet::new(),
+        deny: HashSet::new(),
+    };
+
+    // let provider_config = Arc::new(provider_config);
+    // convert saved configs into data structure that we will use to route queries
+    let mut providers: Providers = Arc::new(DashMap::new());
+    for entry in configs {
+        let mut ap = providers.entry(entry.chain_id).or_insert(ActiveProviders {
+            urls: vec![],
+            nodes: vec![],
+        });
+        ap.add_provider_config(entry);
+    }
+
+    // // handles of longrunning subscriptions.
+    // let connections: DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>> = DashMap::new();
+    // let connections = Arc::new(connections);
+    // handles of longrunning subscriptions.
+    let mut active_subscriptions: ActiveSubscriptions = Arc::new(DashMap::new());
+
+    // while let Some(km) = recv_in_client.recv().await {
+    //     // clone Arcs
+    //     let our = our.clone();
+    //     let send_to_loop = send_to_loop.clone();
+    //     let provider_config = provider_config.clone();
+    //     let connections = connections.clone();
+    //     tokio::spawn(async move {
+    //         if let Err(e) = handle_message(
+    //             &our,
+    //             &km,
+    //             &send_to_loop,
+    //             provider_config,
+    //             connections,
+    //             public,
+    //         )
+    //         .await
+    //         {
+    //             let _ = send_to_loop
+    //                 .send(make_error_message(our.to_string(), km, e))
+    //                 .await;
+    //         };
+    //     });
+    // }
+    while let Some(km) = recv_in_client.recv().await {
+        let km_id = km.id;
+        let response_target = km.rsvp.as_ref().unwrap_or(&km.source).clone();
+        if let Err(e) = handle_message(
+            &our,
+            &mut access_settings,
+            km,
+            &send_to_loop,
+            &caps_oracle,
+            &mut providers,
+            &mut active_subscriptions,
+        )
+        .await
+        {
+            send_to_loop
+                .send(make_error_message(&our, km_id, response_target, e))
+                .await
+                .expect("eth: kernel sender died!");
+        };
+    }
+    Err(anyhow::anyhow!("eth: fatal: message receiver closed!"))
 }
 
-/// Handle an incoming message.
+// async fn handle_message(
+//     our: &str,
+//     km: &KernelMessage,
+//     send_to_loop: &MessageSender,
+//     provider_config: Arc<ProviderConfig>,
+//     connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
+//     public: bool,
+// ) -> Result<(), EthError> {
+//     match &km.message {
+//         Message::Request(req) => {
+//             match &*provider_config {
+//                 ProviderConfig::Node(node) => {
+//                     if km.source.node == our {
+//                         // we have no provider, let's send this request to someone who has one.
+//                         let request = KernelMessage {
+//                             id: km.id,
+//                             source: Address {
+//                                 node: our.to_string(),
+//                                 process: ETH_PROCESS_ID.clone(),
+//                             },
+//                             target: Address {
+//                                 node: "jugodenaranja.os".to_string(),
+//                                 process: ETH_PROCESS_ID.clone(),
+//                             },
+//                             rsvp: Some(km.source.clone()),
+//                             message: Message::Request(req.clone()),
+//                             lazy_load_blob: None,
+//                         };
+/// handle incoming requests, namely [`EthAction`] and [`EthConfigAction`].
+/// also handle responses that are passthroughs from remote provider nodes.
+async fn handle_message(
+    our: &str,
+    access_settings: &mut AccessSettings,
+    km: KernelMessage,
+    send_to_loop: &MessageSender,
+    caps_oracle: &CapMessageSender,
+    providers: &mut Providers,
+    active_subscriptions: &mut ActiveSubscriptions,
+) -> Result<(), EthError> {
+    match &km.message {
+        Message::Response(_) => handle_passthrough_response(our, send_to_loop, km).await,
+        Message::Request(req) => {
+            if let Ok(eth_action) = serde_json::from_slice(&req.body) {
+                // these can be from remote or local processes
+                return handle_eth_action(
+                    our,
+                    access_settings,
+                    km,
+                    eth_action,
+                    providers,
+                    active_subscriptions,
+                )
+                .await;
+            }
+            if let Ok(eth_config_action) = serde_json::from_slice(&req.body) {
+                // only local node
+                return handle_eth_config_action(
+                    our,
+                    access_settings,
+                    caps_oracle,
+                    km,
+                    eth_config_action,
+                    providers,
+                )
+                .await;
+            }
+            Err(EthError::PermissionDenied)
+        }
+    }
+}
+
+//                         let _ = send_to_loop.send(request).await;
+//                     } else {
+//                         // either someone asking us for rpc, or we are passing through a sub event.
+//                         handle_remote_request(our, km, send_to_loop, None, connections, public)
+//                             .await?
+//                     }
+//                 }
+//                 ProviderConfig::Provider(provider) => {
+//                     if km.source.node == our {
+//                         handle_local_request(our, km, send_to_loop, &provider, connections, public)
+//                             .await?
+//                     } else {
+//                         handle_remote_request(
+//                             our,
+//                             km,
+//                             send_to_loop,
+//                             Some(provider),
+//                             connections,
+//                             public,
+//                         )
+//                         .await?
+//                     }
+//                 }
+//             }
+//         }
+//         Message::Response(_) => {
+//             // handle passthrough responses, send to rsvp.
+//             if km.source.process == "eth:distro:sys" {
+//                 if let Some(rsvp) = &km.rsvp {
+//                     let _ = send_to_loop
+//                         .send(KernelMessage {
+//                             id: rand::random(),
+//                             source: Address {
+//                                 node: our.to_string(),
+//                                 process: ETH_PROCESS_ID.clone(),
+//                             },
+//                             target: rsvp.clone(),
+//                             rsvp: None,
+//                             message: km.message.clone(),
+//                             lazy_load_blob: None,
+//                         })
+//                         .await;
+//                 }
+//             }
+//         }
+//     }
+//     Ok(())
+// }
+async fn handle_passthrough_response(
+    our: &str,
+    send_to_loop: &MessageSender,
+    km: KernelMessage,
+) -> Result<(), EthError> {
+    send_to_loop
+        .send(KernelMessage {
+            id: rand::random(),
+            source: Address {
+                node: our.to_string(),
+                process: ETH_PROCESS_ID.clone(),
+            },
+            target: km.rsvp.unwrap_or(km.source),
+            rsvp: None,
+            message: km.message,
+            lazy_load_blob: None,
+        })
+        .await
+        .expect("eth: kernel sender died!");
+    Ok(())
+}
+
+async fn handle_eth_action(
+    our: &str,
+    access_settings: &mut AccessSettings,
+    km: KernelMessage,
+    eth_action: EthAction,
+    providers: &mut Providers,
+    active_subscriptions: &mut ActiveSubscriptions,
+) -> Result<(), EthError> {
+    // check our access settings if the request is from a remote node
+    if km.source.node != our {
+        if !access_settings.deny.contains(&km.source.node) {
+            if !access_settings.public {
+                if !access_settings.allow.contains(&km.source.node) {
+                    return Err(EthError::PermissionDenied);
+                }
+            }
+        } else {
+            return Err(EthError::PermissionDenied);
+        }
+    }
+
+    // for each incoming action, we need to assign a provider from our map
+    // based on the chain id. once we assign a provider, we can use it for
+    // this request. if the provider is not usable, cycle through options
+    // before returning an error.
+    match eth_action {
+        EthAction::SubscribeLogs {
+            sub_id,
+            chain_id,
+            kind,
+            params,
+        } => {
+            todo!()
+        }
+        EthAction::UnsubscribeLogs(sub_id) => {
+            active_subscriptions
+                .entry(km.source.process)
+                .and_modify(|sub_map| {
+                    sub_map.remove(&sub_id);
+                });
+            Ok(())
+        }
+        EthAction::Request {
+            chain_id,
+            method,
+            params,
+        } => {
+            todo!()
+        }
+    }
+}
+
+async fn handle_eth_config_action(
+    our: &str,
+    access_settings: &mut AccessSettings,
+    caps_oracle: &CapMessageSender,
+    km: KernelMessage,
+    eth_config_action: EthConfigAction,
+    providers: &mut Providers,
+) -> Result<(), EthError> {
+    if km.source.node != our {
+        return Err(EthError::PermissionDenied);
+    }
+    // check capabilities to ensure the sender is allowed to make this request
+    let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
+    caps_oracle
+        .send(CapMessage::Has {
+            on: km.source.process.clone(),
+            cap: Capability {
+                issuer: Address {
+                    node: our.to_string(),
+                    process: ETH_PROCESS_ID.clone(),
+                },
+                params: serde_json::to_string(&serde_json::json!({
+                    "root": true,
+                }))
+                .unwrap(),
+            },
+            responder: send_cap_bool,
+        })
+        .await
+        .expect("eth: capability oracle died!");
+    if !recv_cap_bool.await.unwrap_or(false) {
+        return Err(EthError::PermissionDenied);
+    }
+
+    // modify our providers and access settings based on config action
+    todo!()
+}
+
-/// Handle a local request.
+// async fn handle_local_request(
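The two todo!() arms in handle_eth_action above are where provider assignment will land; per the inline comment, the intent is to take the ActiveProviders entry for the request's chain id and cycle through candidates until one is usable. A hypothetical sketch of that selection step (not code from this commit), reusing activate_url_provider from above:

// Hypothetical helper: walk this chain's URL providers in order, lazily
// activating their pubsub connections, and return the first usable index.
async fn first_usable_url_provider(aps: &mut ActiveProviders) -> Option<usize> {
    for (i, url_provider) in aps.urls.iter_mut().enumerate() {
        if url_provider.pubsub.is_some() {
            return Some(i);
        }
        // not yet connected: try to activate, skip this provider on failure
        if activate_url_provider(url_provider).await.is_ok() {
            return Some(i);
        }
    }
    None
}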
@@ -396,79 +524,55 @@ async fn handle_subscription_stream(
     rsvp: Option<Address>,
     send_to_loop: MessageSender,
 ) -> Result<(), EthError> {
-    match rx.recv().await {
-        Err(e) => {
-            let error = Err(EthError::SubscriptionClosed(sub_id))?;
-            let _ = send_to_loop
-                .send(KernelMessage {
-                    id: rand::random(),
-                    source: Address {
-                        node: our,
-                        process: ETH_PROCESS_ID.clone(),
-                    },
-                    target: target.clone(),
-                    rsvp: rsvp.clone(),
-                    message: Message::Request(Request {
-                        inherit: false,
-                        expects_response: None,
-                        body: serde_json::to_vec(&EthSubResult::Err(EthSubError {
-                            id: sub_id,
-                            error: e.to_string(),
-                        }))
-                        .unwrap(),
-                        metadata: None,
-                        capabilities: vec![],
-                    }),
-                    lazy_load_blob: None,
-                })
-                .await
-                .unwrap();
-        }
-        Ok(value) => {
-            let event: SubscriptionResult = serde_json::from_str(value.get()).map_err(|_| {
-                EthError::RpcError("eth: failed to deserialize subscription result".to_string())
-            })?;
-            send_to_loop
-                .send(KernelMessage {
-                    id: rand::random(),
-                    source: Address {
-                        node: our,
-                        process: ETH_PROCESS_ID.clone(),
-                    },
-                    target: target.clone(),
-                    rsvp: rsvp.clone(),
-                    message: Message::Request(Request {
-                        inherit: false,
-                        expects_response: None,
-                        body: serde_json::to_vec(&EthSubResult::Ok(EthSub {
-                            id: sub_id,
-                            result: event,
-                        }))
-                        .unwrap(),
-                        metadata: None,
-                        capabilities: vec![],
-                    }),
-                    lazy_load_blob: None,
-                })
-                .await
-                .unwrap();
+    loop {
+        match rx.recv().await {
+            Err(e) => {
+                return Err(EthError::SubscriptionClosed(sub_id));
+            }
+            Ok(value) => {
+                let result: SubscriptionResult =
+                    serde_json::from_str(value.get()).map_err(|_| {
+                        EthError::RpcError(
+                            "eth: failed to deserialize subscription result".to_string(),
+                        )
+                    })?;
+                send_to_loop
+                    .send(KernelMessage {
+                        id: rand::random(),
+                        source: Address {
+                            node: our.clone(),
+                            process: ETH_PROCESS_ID.clone(),
+                        },
+                        target: target.clone(),
+                        rsvp: rsvp.clone(),
+                        message: Message::Request(Request {
+                            inherit: false,
+                            expects_response: None,
+                            body: serde_json::to_vec(&EthSubResult::Ok(EthSub {
+                                id: sub_id,
+                                result,
+                            }))
+                            .unwrap(),
+                            metadata: None,
+                            capabilities: vec![],
+                        }),
+                        lazy_load_blob: None,
+                    })
+                    .await
+                    .unwrap();
+            }
         }
     }
+    Err(EthError::SubscriptionClosed(sub_id))
 }
 
-fn make_error_message(our_node: String, km: KernelMessage, error: EthError) -> KernelMessage {
-    let source = km.rsvp.unwrap_or_else(|| Address {
-        node: our_node.clone(),
-        process: km.source.process.clone(),
-    });
+fn make_error_message(our: &str, id: u64, target: Address, error: EthError) -> KernelMessage {
     KernelMessage {
-        id: km.id,
+        id,
         source: Address {
-            node: our_node,
+            node: our.to_string(),
             process: ETH_PROCESS_ID.clone(),
         },
-        target: source,
+        target,
         rsvp: None,
         message: Message::Response((
             Response {
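A readability note on the access check in handle_eth_action: the nested ifs reduce to a single predicate, where the denylist always wins, then `public` grants access, and otherwise the caller must be on the allow list. An equivalent flattened form for reference:

fn remote_access_allowed(settings: &AccessSettings, source_node: &str) -> bool {
    !settings.deny.contains(source_node)
        && (settings.public || settings.allow.contains(source_node))
}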
@@ -554,6 +554,7 @@ async fn main() {
         eth_provider_config,
         kernel_message_sender.clone(),
         eth_provider_receiver,
+        caps_oracle_sender.clone(),
         print_sender.clone(),
     ));
     #[cfg(feature = "simulation-mode")]
@@ -1,7 +1,9 @@
 use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult};
 use serde::{Deserialize, Serialize};
+use std::collections::HashSet;
 
-/// The Action and Request type that can be made to eth:distro:sys.
+/// The Action and Request type that can be made to eth:distro:sys. Any process with messaging
+/// capabilities can send this action to the eth provider.
 ///
 /// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
 #[derive(Debug, Serialize, Deserialize)]
@@ -10,6 +12,7 @@ pub enum EthAction {
     /// Logs come in as alloy_rpc_types::pubsub::SubscriptionResults
     SubscribeLogs {
         sub_id: u64,
+        chain_id: u64,
         kind: SubscriptionKind,
         params: Params,
     },
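With the new chain_id field, a subscriber names the target chain explicitly. A process-side sketch (messaging plumbing omitted; the serde_json framing follows the doc comment above):

use alloy_rpc_types::pubsub::{Params, SubscriptionKind};

let action = EthAction::SubscribeLogs {
    sub_id: 1,
    chain_id: 10, // e.g. the chain-10 entries in the default configs
    kind: SubscriptionKind::Logs,
    params: Params::None,
};
// this byte vector becomes the body of a Request to eth:distro:sys
let body: Vec<u8> = serde_json::to_vec(&action).unwrap();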
@@ -17,22 +20,25 @@ pub enum EthAction {
     UnsubscribeLogs(u64),
     /// Raw request. Used by kinode_process_lib.
     Request {
+        chain_id: u64,
         method: String,
         params: serde_json::Value,
     },
 }
 
 /// Incoming Result type for subscription updates or errors that processes will receive.
+/// Can deserialize all incoming requests from eth:distro:sys to this type.
+///
 /// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
 pub type EthSubResult = Result<EthSub, EthSubError>;
 
-/// Incoming Request type for subscription updates.
+/// Incoming Request type for successful subscription updates.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct EthSub {
     pub id: u64,
     pub result: SubscriptionResult,
 }
 
 /// Incoming Request for subscription errors that processes will receive.
 /// If your subscription is closed unexpectedly, you will receive this.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct EthSubError {
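On the receiving end, every subscription update or error arriving from eth:distro:sys can be funneled through EthSubResult, per the doc comment above. A sketch (the EthSubError fields match their construction in the provider code earlier in this diff):

match serde_json::from_slice::<EthSubResult>(&body) {
    Ok(Ok(EthSub { id, result })) => {
        // live SubscriptionResult for subscription `id`
    }
    Ok(Err(EthSubError { id, error })) => {
        // subscription `id` closed unexpectedly; consider resubscribing
    }
    Err(_) => { /* body was not a subscription message */ }
}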
@@ -61,11 +67,52 @@ pub enum EthError {
     /// Invalid method
     InvalidMethod(String),
     /// Permission denied
-    PermissionDenied(String),
+    PermissionDenied,
     /// Internal RPC error
     RpcError(String),
 }
 
+/// The action type used for configuring eth:distro:sys. Only processes which have the "root"
+/// capability from eth:distro:sys can successfully send this action.
+///
+/// NOTE: changes to config will not be persisted between boots, they must be saved in .env
+/// to be reflected between boots. TODO: can change this
+#[derive(Debug, Serialize, Deserialize)]
+pub enum EthConfigAction {
+    /// Add a new provider to the list of providers.
+    AddProvider(ProviderConfig),
+    /// Remove a provider from the list of providers.
+    /// The tuple is (chain_id, node_id/rpc_url).
+    RemoveProvider((u64, String)),
+    /// make our provider public
+    SetPublic,
+    /// make our provider not-public
+    SetPrivate,
+    /// add node to whitelist on a provider
+    AllowNode(String),
+    /// remove node from whitelist on a provider
+    UnallowNode(String),
+    /// add node to blacklist on a provider
+    DenyNode(String),
+    /// remove node from blacklist on a provider
+    UndenyNode(String),
+    /// Set the list of providers to a new list.
+    /// Replaces all existing saved provider configs.
+    SetProviders(SavedConfigs),
+    /// Get the list of providers as a [`SavedConfigs`] object.
+    GetProviders,
+}
+
+/// Response type from an [`EthConfigAction`] request.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum EthConfigResponse {
+    Ok,
+    /// Response from a GetProviders request.
+    Providers(SavedConfigs),
+    /// Permission denied due to missing capability
+    PermissionDenied,
+}
+
 //
 // Internal types
 //
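Runtime reconfiguration is then an ordinary request body, gated by the "root" capability described above. A hypothetical example (the Sepolia WebSocket URL is a placeholder):

let action = EthConfigAction::AddProvider(ProviderConfig {
    chain_id: 11155111,
    usable: true,
    trusted: false,
    provider: NodeOrRpcUrl::RpcUrl("wss://sepolia.example.org".to_string()),
});
let body = serde_json::to_vec(&action).unwrap();
// the reply should deserialize to EthConfigResponse::Ok or PermissionDenied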
@@ -103,13 +150,19 @@ pub fn to_static_str(method: &str) -> Option<&'static str> {
     }
 }
 
+/// Settings for our ETH provider
+pub struct AccessSettings {
+    pub public: bool,           // whether or not other nodes can access through us
+    pub allow: HashSet<String>, // whitelist for access (only used if public == false)
+    pub deny: HashSet<String>,  // blacklist for access (always used)
+}
+
 pub type SavedConfigs = Vec<ProviderConfig>;
 
 /// Provider config. Can currently be a node or a ws provider instance.
 #[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct ProviderConfig {
     pub chain_id: u64,
     pub usable: bool,
     pub trusted: bool,
     pub provider: NodeOrRpcUrl,
 }
@@ -119,3 +172,12 @@ pub enum NodeOrRpcUrl {
     Node(crate::core::KnsUpdate),
     RpcUrl(String),
 }
+
+impl std::cmp::PartialEq<str> for NodeOrRpcUrl {
+    fn eq(&self, other: &str) -> bool {
+        match self {
+            NodeOrRpcUrl::Node(kns) => kns.name == other,
+            NodeOrRpcUrl::RpcUrl(url) => url == other,
+        }
+    }
+}
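The PartialEq<str> impl appears intended to let config handling match a saved provider against RemoveProvider's single string, whether it names a node or an RPC URL. A hedged sketch of such a removal:

// remove any saved config on this chain whose provider matches `target`,
// whether `target` is a node name or an RPC URL
fn remove_matching(configs: &mut SavedConfigs, chain_id: u64, target: &str) {
    configs.retain(|c| !(c.chain_id == chain_id && c.provider == *target));
}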