little fixes

This commit is contained in:
dr-frmr 2024-02-28 19:24:37 -03:00
parent 3299e3135b
commit 1b012d515f
No known key found for this signature in database
4 changed files with 78 additions and 59 deletions

View File

@ -1,4 +1,14 @@
[
{
"chain_id": 1,
"trusted": false,
"provider": {
"RpcUrl": "wss://ethereum.publicnode.com"
},
"public": false,
"allow": [],
"deny": []
},
{
"chain_id": 11155111,
"trusted": true,
@ -13,9 +23,9 @@
"ip": "",
"port": 0,
"routers": [
"default-router-1.os",
"default-router-2.os",
"default-router-3.os"
"0xb35eb347deb896bc3fb6132a07fca1601f83462385ed11e835c24c33ba4ef73d",
"0xd827ae579fafa604af79fbed977e8abe048497f10885c6473dfd343a3b7b4458",
"0x96e36331c8f0882f2c0c46c13b15d812def04fe8606d503bc0e2be39db26486a"
]
}
}

View File

@ -205,7 +205,7 @@ async fn handle_message(
Ok(())
}
Message::Request(req) => {
let timeout = *req.expects_response.as_ref().unwrap_or(&60); // TODO make this a config
let timeout = *req.expects_response.as_ref().unwrap_or(&600); // TODO make this a config
let Ok(req) = serde_json::from_slice::<IncomingReq>(&req.body) else {
return Err(EthError::MalformedRequest);
};
@ -278,17 +278,17 @@ async fn handle_eth_action(
response_channels: &mut ResponseChannels,
) -> Result<(), EthError> {
println!("provider: handle_eth_action: {eth_action:?}\r");
println!("access settings: {access_settings:?}\r");
// check our access settings if the request is from a remote node
if km.source.node != our {
if !access_settings.deny.contains(&km.source.node) {
if !access_settings.public {
if !access_settings.allow.contains(&km.source.node) {
return Err(EthError::PermissionDenied);
}
}
} else {
if access_settings.deny.contains(&km.source.node) {
return Err(EthError::PermissionDenied);
}
if !access_settings.public {
if !access_settings.allow.contains(&km.source.node) {
return Err(EthError::PermissionDenied);
}
}
}
// for each incoming action, we need to assign a provider from our map
@ -330,7 +330,7 @@ async fn handle_eth_action(
},
None,
true,
Some(60), // TODO
Some(600), // TODO
serde_json::to_vec(&eth_action).unwrap(),
send_to_loop,
)
@ -346,36 +346,36 @@ async fn handle_eth_action(
let send_to_loop = send_to_loop.clone();
let providers = providers.clone();
let response_channels = response_channels.clone();
tokio::spawn(async move {
let res = tokio::time::timeout(
std::time::Duration::from_secs(timeout),
fulfill_request(&our, km.id, &send_to_loop, eth_action, providers, receiver),
)
.await;
match res {
Ok(Ok(response)) => {
kernel_message(
&our,
km.id,
km.source,
km.rsvp,
false,
None,
response,
&send_to_loop,
)
.await;
}
Ok(Err(e)) => {
error_message(&our, km.id, km.source, e, &send_to_loop).await;
}
Err(_) => {
error_message(&our, km.id, km.source, EthError::RpcTimeout, &send_to_loop)
.await;
}
// tokio::spawn(async move {
let res = tokio::time::timeout(
std::time::Duration::from_secs(timeout),
fulfill_request(&our, km.id, &send_to_loop, eth_action, providers, receiver),
)
.await;
match res {
Ok(Ok(response)) => {
kernel_message(
&our,
km.id,
km.source,
km.rsvp,
false,
None,
response,
&send_to_loop,
)
.await;
}
response_channels.remove(&km.id);
});
Ok(Err(e)) => {
error_message(&our, km.id, km.source, e, &send_to_loop).await;
}
Err(_) => {
error_message(&our, km.id, km.source, EthError::RpcTimeout, &send_to_loop)
.await;
}
}
response_channels.remove(&km.id);
// });
}
}
Ok(())
@ -538,7 +538,7 @@ async fn build_subscription(
},
None,
true,
Some(60), // TODO
Some(600), // TODO
serde_json::to_vec(&eth_action).unwrap(),
&send_to_loop,
)
@ -664,7 +664,7 @@ async fn fulfill_request(
return Ok(EthResponse::Response { value });
}
for node_provider in &mut aps.nodes {
if !node_provider.usable {
if !node_provider.usable || node_provider.name == our {
continue;
}
// in order, forward the request to each node provider
@ -678,8 +678,8 @@ async fn fulfill_request(
},
None,
true,
Some(60), // TODO
serde_json::to_vec(&eth_action).unwrap(),
Some(600), // TODO
eth_action.clone(),
&send_to_loop,
)
.await;
@ -743,8 +743,10 @@ async fn handle_eth_config_action(
.await
.expect("eth: capability oracle died!");
if !recv_cap_bool.await.unwrap_or(false) {
println!("eth: capability oracle denied request, no cap\r");
return EthConfigResponse::PermissionDenied;
}
println!("cap valid\r");
// modify our providers and access settings based on config action
match eth_config_action {
@ -763,9 +765,11 @@ async fn handle_eth_config_action(
}
}
EthConfigAction::SetPublic => {
println!("set public\r");
access_settings.public = true;
}
EthConfigAction::SetPrivate => {
println!("set private\r");
access_settings.public = false;
}
EthConfigAction::AllowNode(node) => {

View File

@ -453,13 +453,6 @@ async fn handle_kernel_request(
// brutal and savage killing: aborting the task.
// do not do this to a process if you don't want to risk
// dropped messages / un-replied-to-requests / revoked caps
caps_oracle
.send(t::CapMessage::RevokeAll {
on: process_id.clone(),
responder: tokio::sync::oneshot::channel().0,
})
.await
.expect("event loop: fatal: sender died");
let _ = senders.remove(&process_id);
let process_handle = match process_handles.remove(&process_id) {
Some(ph) => ph,
@ -481,7 +474,13 @@ async fn handle_kernel_request(
.await;
process_handle.abort();
process_map.remove(&process_id);
let _ = persist_state(&our_name, &send_to_loop, process_map).await;
caps_oracle
.send(t::CapMessage::RevokeAll {
on: process_id.clone(),
responder: tokio::sync::oneshot::channel().0,
})
.await
.expect("event loop: fatal: sender died");
if request.expects_response.is_none() {
return;
}
@ -1116,6 +1115,12 @@ pub async fn kernel(
},
// capabilities oracle: handles all requests to add, drop, and check capabilities
Some(cap_message) = caps_oracle_receiver.recv() => {
let _ = send_to_terminal.send(
t::Printout {
verbosity: 3,
content: format!("{cap_message:?}")
}
).await;
match cap_message {
t::CapMessage::Add { on, caps, responder } => {
// insert cap in process map
@ -1173,16 +1178,16 @@ pub async fn kernel(
},
t::CapMessage::RevokeAll { on, responder } => {
let Some(granter) = reverse_cap_index.get(&on) else {
let _ = persist_state(&our.name, &send_to_loop, &process_map).await;
let _ = responder.send(true);
continue;
};
for (grantee, caps) in granter {
let Some(entry) = process_map.get_mut(&grantee) else {
continue;
if let Some(entry) = process_map.get_mut(&grantee) {
for cap in caps {
entry.capabilities.remove(&cap);
}
};
for cap in caps {
entry.capabilities.remove(&cap);
}
}
let _ = persist_state(&our.name, &send_to_loop, &process_map).await;
let _ = responder.send(true);

View File

@ -6,7 +6,7 @@ use std::collections::HashSet;
/// capabilities can send this action to the eth provider.
///
/// Will be serialized and deserialized using `serde_json::to_vec` and `serde_json::from_slice`.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum EthAction {
/// Subscribe to logs with a custom filter. ID is to be used to unsubscribe.
/// Logs come in as alloy_rpc_types::pubsub::SubscriptionResults