eth: passthrough through local

This commit is contained in:
bitful-pannul 2024-02-14 13:39:00 -03:00
parent a1373d2f96
commit 2eea02e34a
2 changed files with 211 additions and 76 deletions

View File

@ -1,3 +1,4 @@
use alloy_providers::provider::Provider;
use alloy_pubsub::{PubSubFrontend, RawSubscription};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types::pubsub::SubscriptionResult;
@ -6,6 +7,7 @@ use anyhow::Result;
use dashmap::DashMap;
use lib::types::core::*;
use lib::types::eth::*;
use std::str::FromStr;
use std::sync::Arc;
use tokio::task::JoinHandle;
use url::Url;
@ -48,7 +50,7 @@ pub async fn provider(
};
let client = ClientBuilder::default().ws(connector).await?;
Some(alloy_providers::provider::Provider::new_with_client(client))
Some(Provider::new_with_client(client))
} else {
None
};
@ -59,10 +61,6 @@ pub async fn provider(
let connections: DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>> = DashMap::new();
let connections = Arc::new(connections);
// passthrough responses
let responses: DashMap<u64, (u64, ProcessId)> = DashMap::new();
let responses = Arc::new(responses);
// add whitelist, logic in provider middleware?
let public = Arc::new(public);
@ -72,17 +70,15 @@ pub async fn provider(
let send_to_loop = send_to_loop.clone();
let provider = provider.clone();
let connections = connections.clone();
let responses = responses.clone();
let public = public.clone();
tokio::spawn(async move {
if let Err(e) = handle_request(
if let Err(e) = handle_message(
&our,
&km,
&send_to_loop,
provider.clone(),
connections.clone(),
responses.clone(),
public.clone(),
)
.await
@ -96,13 +92,171 @@ pub async fn provider(
Err(anyhow::anyhow!("eth: fatal: message receiver closed!"))
}
async fn handle_request(
async fn handle_message(
our: &str,
km: &KernelMessage,
send_to_loop: &MessageSender,
provider: Arc<Option<alloy_providers::provider::Provider<PubSubFrontend>>>,
provider: Arc<Option<Provider<PubSubFrontend>>>,
connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
public: Arc<bool>,
) -> Result<(), EthError> {
match &km.message {
Message::Request(req) => {
if km.source.node == our {
if let Some(provider) = provider.as_ref() {
handle_local_request(our, km, send_to_loop, provider, connections, public)
.await?
} else {
// we have no provider, let's send this request to someone who has one.
let request = KernelMessage {
id: km.id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: "jugodenaranja.os".to_string(),
process: ETH_PROCESS_ID.clone(),
},
rsvp: Some(km.source.clone()),
message: Message::Request(req.clone()),
lazy_load_blob: None,
};
let _ = send_to_loop.send(request).await;
}
} else {
// either someone asking us for rpc, or we are passing through a sub event.
handle_remote_request(our, km, send_to_loop, provider, connections, public).await?
}
}
Message::Response(_) => {
// handle passthrough responses, send to rsvp.
if km.source.process == ProcessId::from_str("eth:distro:sys").unwrap() {
if let Some(rsvp) = &km.rsvp {
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: rsvp.clone(),
rsvp: None,
message: km.message.clone(),
lazy_load_blob: None,
})
.await;
}
}
}
}
Ok(())
}
async fn handle_local_request(
our: &str,
km: &KernelMessage,
send_to_loop: &MessageSender,
provider: &Provider<PubSubFrontend>,
connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
public: Arc<bool>,
) -> Result<(), EthError> {
let Message::Request(req) = &km.message else {
return Err(EthError::InvalidMethod(
"eth: only accepts requests".to_string(),
));
};
let action = serde_json::from_slice::<EthAction>(&req.body).map_err(|e| {
EthError::InvalidMethod(format!("eth: failed to deserialize request: {:?}", e))
})?;
// we might want some of these in payloads.. sub items?
let return_body: EthResponse = match action {
EthAction::SubscribeLogs {
sub_id,
kind,
params,
} => {
let sub_id = (km.target.process.clone(), sub_id);
let kind = serde_json::to_value(&kind).unwrap();
let params = serde_json::to_value(&params).unwrap();
let id = provider
.inner()
.prepare("eth_subscribe", [kind, params])
.await
.map_err(|e| EthError::TransportError(e.to_string()))?;
let rx = provider.inner().get_raw_subscription(id).await;
let handle = tokio::spawn(handle_subscription_stream(
our.to_string(),
sub_id.1.clone(),
rx,
km.source.clone(),
km.rsvp.clone(),
send_to_loop.clone(),
));
connections.insert(sub_id, handle);
EthResponse::Ok
}
EthAction::UnsubscribeLogs(sub_id) => {
let sub_id = (km.target.process.clone(), sub_id);
let handle = connections
.remove(&sub_id)
.ok_or(EthError::SubscriptionNotFound)?;
handle.1.abort();
EthResponse::Ok
}
EthAction::Request { method, params } => {
let method = to_static_str(&method).ok_or(EthError::InvalidMethod(method))?;
let response: serde_json::Value = provider
.inner()
.prepare(method, params)
.await
.map_err(|e| EthError::TransportError(e.to_string()))?;
println!("got a normal request! ");
EthResponse::Response { value: response }
}
};
let response = KernelMessage {
id: km.id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: km.source.clone(),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&return_body).unwrap(),
metadata: req.metadata.clone(),
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
};
let _ = send_to_loop.send(response).await;
Ok(())
}
// here we are either processing another nodes request.
// or we are passing through an ethSub Request..
async fn handle_remote_request(
our: &str,
km: &KernelMessage,
send_to_loop: &MessageSender,
provider: Arc<Option<Provider<PubSubFrontend>>>,
connections: Arc<DashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>>,
responses: Arc<DashMap<u64, (u64, ProcessId)>>,
public: Arc<bool>,
) -> Result<(), EthError> {
let Message::Request(req) = &km.message else {
@ -112,15 +266,17 @@ async fn handle_request(
};
if let Some(provider) = provider.as_ref() {
// we need some sort of agreement perhaps on rpc providing.
// even with an agreement, fake ethsubevents could be sent to us.
// light clients could verify blocks perhaps...
if !*public {
return Err(EthError::PermissionDenied("not on the list.".to_string()));
}
let action = serde_json::from_slice::<EthAction>(&req.body).map_err(|e| {
EthError::InvalidMethod(format!("eth: failed to deserialize request: {:?}", e))
})?;
if !*public && km.source.node != our {
return Err(EthError::PermissionDenied("not on the list.".to_string()));
}
// we might want some of these in payloads.. sub items?
let return_body: EthResponse = match action {
EthAction::SubscribeLogs {
sub_id,
@ -138,17 +294,13 @@ async fn handle_request(
.await
.map_err(|e| EthError::TransportError(e.to_string()))?;
let target = km.rsvp.clone().unwrap_or_else(|| Address {
node: our.to_string(),
process: km.source.process.clone(),
});
let rx = provider.inner().get_raw_subscription(id).await;
let handle = tokio::spawn(handle_subscription_stream(
our.to_string(),
sub_id.1.clone(),
rx,
target,
km.target.clone(),
km.rsvp.clone(),
send_to_loop.clone(),
));
@ -177,58 +329,48 @@ async fn handle_request(
}
};
// todo: fix km.clone() and metadata.clone()
if let Some(target) = km.clone().rsvp.or_else(|| {
req.expects_response.map(|_| Address {
node: our.to_string(),
process: km.source.process.clone(),
})
}) {
let response = KernelMessage {
id: km.id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&return_body).unwrap(),
metadata: req.metadata.clone(),
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
};
// Send the response, handling potential errors appropriately
let _ = send_to_loop.send(response).await;
};
} else {
// passthrough
// if node == our, forward to provider
// hoping that rsvp can fix the rest.
let request = KernelMessage {
let response = KernelMessage {
id: km.id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: Address {
node: "jugodenaranja.os".to_string(),
process: ETH_PROCESS_ID.clone(),
},
rsvp: Some(km.source.clone()),
message: Message::Request(req.clone()),
target: km.source.clone(),
rsvp: km.rsvp.clone(),
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&return_body).unwrap(),
metadata: req.metadata.clone(),
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
};
let _ = send_to_loop.send(request).await;
let _ = send_to_loop.send(response).await;
} else {
// We do not have a provider, this is a reply for a request made by us.
if let Ok(eth_sub) = serde_json::from_slice::<EthSub>(&req.body) {
// forward...
if let Some(target) = km.rsvp.clone() {
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target,
rsvp: None,
message: Message::Request(req.clone()),
lazy_load_blob: None,
})
.await;
}
}
}
Ok(())
}
@ -240,15 +382,15 @@ async fn handle_subscription_stream(
sub_id: u64,
mut rx: RawSubscription,
target: Address,
rsvp: Option<Address>,
send_to_loop: MessageSender,
) -> Result<(), EthError> {
match rx.recv().await {
Err(e) => {
println!("got an error from the subscription stream: {:?}", e);
// TODO should we stop the subscription here?
// return Err(EthError::TransportError??(format!("{:?}", e)));
return Err(EthError::SubscriptionClosed(sub_id))?;
}
Ok(value) => {
// this should not return in case of one failed event?
let event: SubscriptionResult = serde_json::from_str(value.get()).map_err(|_| {
EthError::RpcError("eth: failed to deserialize subscription result".to_string())
})?;
@ -260,7 +402,7 @@ async fn handle_subscription_stream(
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
rsvp: rsvp.clone(),
message: Message::Request(Request {
inherit: false,
expects_response: None,

View File

@ -394,13 +394,6 @@ async fn main() {
Ok(contents) => serde_json::from_str(&contents).unwrap(),
Err(_) => {
let routers: Vec<KnsUpdate> = serde_json::from_str(DEFAULT_ROUTERS).unwrap();
// should we dump them into a .routers file?
// fs::write(
// format!("{}/.routers", home_directory_path),
// serde_json::to_string(&routers).unwrap(),
// )
// .await
// .unwrap();
routers
}
};