This commit is contained in:
dr-frmr 2024-02-22 12:12:33 -03:00
parent 6206ad50bf
commit 31309454e9
No known key found for this signature in database
7 changed files with 136 additions and 59 deletions

24
Cargo.lock generated
View File

@ -407,7 +407,7 @@ dependencies = [
"alloy-sol-types",
"anyhow",
"bincode",
"kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=3232423)",
"kinode_process_lib 0.6.0",
"rand 0.8.5",
"serde",
"serde_json",
@ -2631,6 +2631,26 @@ dependencies = [
"lib",
]
[[package]]
name = "kinode_process_lib"
version = "0.6.0"
dependencies = [
"alloy-json-rpc",
"alloy-primitives",
"alloy-rpc-types",
"alloy-transport",
"anyhow",
"bincode",
"http 1.0.0",
"mime_guess",
"rand 0.8.5",
"serde",
"serde_json",
"thiserror",
"url",
"wit-bindgen",
]
[[package]]
name = "kinode_process_lib"
version = "0.6.0"
@ -2724,7 +2744,7 @@ dependencies = [
"anyhow",
"bincode",
"hex",
"kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=3232423)",
"kinode_process_lib 0.6.0",
"rmp-serde",
"serde",
"serde_json",

View File

@ -9,7 +9,8 @@ alloy-primitives = "0.6.2"
alloy-sol-types = "0.6.2"
anyhow = "1.0"
bincode = "1.3.3"
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" }
# kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" }
kinode_process_lib = { path = "../../../../../process_lib" }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@ -10,7 +10,8 @@ alloy-primitives = "0.6.2"
alloy-sol-types = "0.6.2"
bincode = "1.3.3"
hex = "0.4.3"
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" }
# kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "3232423" }
kinode_process_lib = { path = "../../../../../process_lib" }
rmp-serde = "1.1.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@ -130,7 +130,7 @@ impl Guest for Component {
fn main(our: Address, mut state: State) -> anyhow::Result<()> {
// first, await a message from the kernel which will contain the
// contract address for the KNS version we want to track.
// chain ID and contract address for the KNS version we want to track.
let mut contract_address: Option<String> = None;
loop {
let Ok(Message::Request { source, body, .. }) = await_message() else {

View File

@ -5,6 +5,7 @@ use alloy_rpc_types::pubsub::SubscriptionResult;
use alloy_transport_ws::WsConnect;
use anyhow::Result;
use dashmap::DashMap;
use futures::Future;
use lib::types::core::*;
use lib::types::eth::*;
use serde::{Deserialize, Serialize};
@ -17,17 +18,20 @@ use url::Url;
/// mapping of chain id to ordered lists of providers
type Providers = Arc<DashMap<u64, ActiveProviders>>;
#[derive(Debug)]
struct ActiveProviders {
pub urls: Vec<UrlProvider>,
pub nodes: Vec<NodeProvider>,
}
#[derive(Debug)]
struct UrlProvider {
pub trusted: bool,
pub url: String,
pub pubsub: Option<Provider<PubSubFrontend>>,
}
#[derive(Debug)]
struct NodeProvider {
pub trusted: bool,
pub name: String,
@ -36,6 +40,7 @@ struct NodeProvider {
/// existing subscriptions held by local processes
type ActiveSubscriptions = Arc<DashMap<ProcessId, HashMap<u64, ActiveSub>>>;
#[derive(Debug)]
enum ActiveSub {
Local(JoinHandle<()>),
Remote(String), // name of node providing this subscription for us
@ -112,6 +117,8 @@ pub async fn provider(
ap.add_provider_config(entry);
}
println!("providers: {providers:?}\r");
// handles of longrunning subscriptions.
let mut active_subscriptions: ActiveSubscriptions = Arc::new(DashMap::new());
@ -149,6 +156,7 @@ async fn handle_message(
providers: &mut Providers,
active_subscriptions: &mut ActiveSubscriptions,
) -> Result<(), EthError> {
println!("provider: handle_message\r");
match &km.message {
Message::Response(_) => handle_passthrough_response(our, send_to_loop, km).await,
Message::Request(req) => {
@ -187,6 +195,7 @@ async fn handle_passthrough_response(
send_to_loop: &MessageSender,
km: KernelMessage,
) -> Result<(), EthError> {
println!("provider: handle_passthrough_response\r");
send_to_loop
.send(KernelMessage {
id: rand::random(),
@ -213,6 +222,7 @@ async fn handle_eth_action(
providers: &mut Providers,
active_subscriptions: &mut ActiveSubscriptions,
) -> Result<(), EthError> {
println!("provider: handle_eth_action\r");
// check our access settings if the request is from a remote node
if km.source.node != our {
if !access_settings.deny.contains(&km.source.node) {
@ -245,6 +255,7 @@ async fn handle_eth_action(
send_to_loop.clone(),
eth_action,
providers.clone(),
active_subscriptions.clone(),
)));
let mut subs = active_subscriptions
.entry(km.source.process)
@ -279,6 +290,7 @@ async fn handle_eth_action(
}
/// spawned as a task
/// cleans itself up when the subscription is closed or fails.
async fn create_new_subscription(
our: String,
km_id: u64,
@ -287,7 +299,76 @@ async fn create_new_subscription(
send_to_loop: MessageSender,
eth_action: EthAction,
providers: Providers,
active_subscriptions: ActiveSubscriptions,
) {
println!("provider: create_new_subscription\r");
match build_subscription(
our.clone(),
km_id,
target.clone(),
rsvp.clone(),
send_to_loop.clone(),
&eth_action,
providers,
)
.await
{
Ok(future) => {
// send a response to the target that the subscription was successful
send_to_loop
.send(KernelMessage {
id: km_id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: rsvp.clone(),
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&EthResponse::Ok).unwrap(),
metadata: None,
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
})
.await
.expect("eth: sender died!");
// await the subscription error and kill it if so
if let Err(e) = future.await {
send_to_loop
.send(make_error_message(&our, km_id, target.clone(), e))
.await
.expect("eth: kernel sender died!");
}
}
Err(e) => {
send_to_loop
.send(make_error_message(&our, km_id, target.clone(), e))
.await
.expect("eth: kernel sender died!");
}
}
active_subscriptions
.entry(target.process)
.and_modify(|sub_map| {
sub_map.remove(&km_id);
});
}
async fn build_subscription(
our: String,
km_id: u64,
target: Address,
rsvp: Option<Address>,
send_to_loop: MessageSender,
eth_action: &EthAction,
providers: Providers,
) -> Result<impl Future<Output = Result<(), EthError>>, EthError> {
println!("provider: build_subscription\r");
let EthAction::SubscribeLogs {
sub_id,
chain_id,
@ -295,19 +376,12 @@ async fn create_new_subscription(
params,
} = eth_action
else {
return;
return Err(EthError::InvalidMethod(
"eth: only accepts subscribe logs requests".to_string(),
));
};
let Some(aps) = providers.get_mut(&chain_id) else {
send_to_loop
.send(make_error_message(
&our,
sub_id,
target,
EthError::NoRpcForChain,
))
.await
.expect("eth: kernel sender died!");
return;
return Err(EthError::NoRpcForChain);
};
// first, try any url providers we have for this chain,
// then if we have none or they all fail, go to node providers.
@ -322,31 +396,21 @@ async fn create_new_subscription(
.await
{
let rx = pubsub.inner().get_raw_subscription(id).await;
if let Err(e) =
handle_subscription_stream(&our, sub_id, rx, &target, &rsvp, &send_to_loop)
.await
{
send_to_loop
.send(make_error_message(&our, sub_id, target, e))
.await
.expect("eth: kernel sender died!");
}
return;
return Ok(maintain_subscription(
our,
*sub_id,
rx,
target,
rsvp,
send_to_loop,
));
}
}
}
for node_provider in &aps.nodes {
// todo
}
send_to_loop
.send(make_error_message(
&our,
sub_id,
target,
EthError::NoRpcForChain,
))
.await
.expect("eth: kernel sender died!");
return Err(EthError::NoRpcForChain);
}
async fn handle_eth_config_action(
@ -357,6 +421,7 @@ async fn handle_eth_config_action(
eth_config_action: EthConfigAction,
providers: &mut Providers,
) -> Result<(), EthError> {
println!("provider: handle_eth_config_action\r");
if km.source.node != our {
return Err(EthError::PermissionDenied);
}
@ -611,14 +676,15 @@ async fn handle_eth_config_action(
/// Executed as a long-lived task. The JoinHandle is stored in the `connections` map.
/// This task is responsible for connecting to the ETH RPC provider and streaming logs
/// for a specific subscription made by a process.
async fn handle_subscription_stream(
our: &str,
async fn maintain_subscription(
our: String,
sub_id: u64,
mut rx: RawSubscription,
target: &Address,
rsvp: &Option<Address>,
send_to_loop: &MessageSender,
target: Address,
rsvp: Option<Address>,
send_to_loop: MessageSender,
) -> Result<(), EthError> {
println!("provider: maintain_subscription\r");
loop {
match rx.recv().await {
Err(e) => {
@ -660,9 +726,10 @@ async fn handle_subscription_stream(
}
}
fn make_error_message(our: &str, id: u64, target: Address, error: EthError) -> KernelMessage {
fn make_error_message(our: &str, km_id: u64, target: Address, error: EthError) -> KernelMessage {
println!("provider: make_error_message\r");
KernelMessage {
id,
id: km_id,
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),

View File

@ -548,7 +548,6 @@ async fn main() {
timer_service_receiver,
print_sender.clone(),
));
#[cfg(not(feature = "simulation-mode"))]
tasks.spawn(eth::provider::provider(
our.name.clone(),
eth_provider_config,
@ -557,17 +556,6 @@ async fn main() {
caps_oracle_sender.clone(),
print_sender.clone(),
));
#[cfg(feature = "simulation-mode")]
if let Some(ref rpc_url) = rpc_url {
tasks.spawn(eth::provider::provider(
our.name.clone(),
eth_provider,
public,
kernel_message_sender.clone(),
eth_provider_receiver,
print_sender.clone(),
));
}
tasks.spawn(vfs::vfs(
our.name.clone(),
kernel_message_sender.clone(),

View File

@ -391,11 +391,11 @@ async fn bootstrap(
for (package_metadata, mut package) in packages.clone() {
let package_name = package_metadata.properties.package_name.as_str();
// special case tester: only load it in if in simulation mode
if package_name == "tester" {
#[cfg(not(feature = "simulation-mode"))]
continue;
}
// // special case tester: only load it in if in simulation mode
// if package_name == "tester" {
// #[cfg(not(feature = "simulation-mode"))]
// continue;
// }
println!("fs: handling package {package_name}...\r");
let package_publisher = package_metadata.properties.publisher.as_str();