mirror of
https://github.com/uqbar-dao/nectar.git
synced 2024-12-23 00:21:38 +03:00
ethsub result type
This commit is contained in:
parent
9d9ea85d2f
commit
4ef0d654ea
@ -9,7 +9,7 @@ alloy-primitives = "0.6.2"
|
||||
alloy-sol-types = "0.6.2"
|
||||
anyhow = "1.0"
|
||||
bincode = "1.3.3"
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "9231881" }
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "ad17eaa" }
|
||||
rand = "0.8"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
|
@ -1,5 +1,6 @@
|
||||
use kinode_process_lib::eth::{
|
||||
get_logs, subscribe, unsubscribe, Address as EthAddress, EthSub, Filter, SubscriptionResult,
|
||||
get_logs, subscribe, unsubscribe, Address as EthAddress, EthSub, EthSubResult, Filter,
|
||||
SubscriptionResult,
|
||||
};
|
||||
use kinode_process_lib::http::{bind_http_path, serve_ui, HttpServerRequest};
|
||||
use kinode_process_lib::kernel_types as kt;
|
||||
@ -58,7 +59,7 @@ pub enum Req {
|
||||
RemoteRequest(RemoteRequest),
|
||||
FTWorkerCommand(FTWorkerCommand),
|
||||
FTWorkerResult(FTWorkerResult),
|
||||
Eth(EthSub),
|
||||
Eth(EthSubResult),
|
||||
Http(HttpServerRequest),
|
||||
}
|
||||
|
||||
@ -184,11 +185,13 @@ fn handle_message(
|
||||
Req::FTWorkerResult(r) => {
|
||||
println!("app store: got weird ft_worker result: {r:?}");
|
||||
}
|
||||
Req::Eth(sub) => {
|
||||
Req::Eth(eth_result) => {
|
||||
if source.node() != our.node() || source.process != "eth:distro:sys" {
|
||||
return Err(anyhow::anyhow!("eth sub event from weird addr: {source}"));
|
||||
}
|
||||
handle_eth_sub_event(our, &mut state, sub.result)?;
|
||||
if let Ok(EthSub { result, .. }) = eth_result {
|
||||
handle_eth_sub_event(our, &mut state, result)?;
|
||||
}
|
||||
}
|
||||
Req::Http(incoming) => {
|
||||
if source.node() != our.node()
|
||||
|
@ -10,7 +10,7 @@ alloy-primitives = "0.6.2"
|
||||
alloy-sol-types = "0.6.2"
|
||||
bincode = "1.3.3"
|
||||
hex = "0.4.3"
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "9231881" }
|
||||
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "ad17eaa" }
|
||||
rmp-serde = "1.1.2"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
|
@ -4,7 +4,7 @@ use kinode_process_lib::{
|
||||
await_message,
|
||||
eth::{
|
||||
get_block_number, get_logs, subscribe, Address as EthAddress, BlockNumberOrTag, EthSub,
|
||||
Filter, Log, SubscriptionResult,
|
||||
EthSubResult, Filter, Log, SubscriptionResult,
|
||||
},
|
||||
get_typed_state, print_to_terminal, println, set_state, Address, Message, Request, Response,
|
||||
};
|
||||
@ -253,12 +253,14 @@ fn handle_eth_message(
|
||||
pending_requests: &mut BTreeMap<u64, Vec<IndexerRequests>>,
|
||||
body: &[u8],
|
||||
) -> anyhow::Result<()> {
|
||||
let Ok(res) = serde_json::from_slice::<EthSub>(body) else {
|
||||
let Ok(eth_result) = serde_json::from_slice::<EthSubResult>(body) else {
|
||||
return Err(anyhow::anyhow!("kns_indexer: got invalid message"));
|
||||
};
|
||||
|
||||
if let SubscriptionResult::Log(log) = res.result {
|
||||
handle_log(our, state, &log)?;
|
||||
if let Ok(EthSub { result, .. }) = eth_result {
|
||||
if let SubscriptionResult::Log(log) = result {
|
||||
handle_log(our, state, &log)?;
|
||||
}
|
||||
}
|
||||
|
||||
// check the pending_requests btreemap to see if there are any requests that
|
||||
|
@ -243,28 +243,29 @@ async fn handle_local_request(
|
||||
EthResponse::Response { value: response }
|
||||
}
|
||||
};
|
||||
|
||||
let response = KernelMessage {
|
||||
id: km.id,
|
||||
source: Address {
|
||||
node: our.to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: km.source.clone(),
|
||||
rsvp: None,
|
||||
message: Message::Response((
|
||||
Response {
|
||||
inherit: false,
|
||||
body: serde_json::to_vec(&return_body).unwrap(),
|
||||
metadata: req.metadata.clone(),
|
||||
capabilities: vec![],
|
||||
},
|
||||
None,
|
||||
)),
|
||||
lazy_load_blob: None,
|
||||
};
|
||||
|
||||
let _ = send_to_loop.send(response).await;
|
||||
if let Some(_) = req.expects_response {
|
||||
let _ = send_to_loop
|
||||
.send(KernelMessage {
|
||||
id: km.id,
|
||||
source: Address {
|
||||
node: our.to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: km.source.clone(),
|
||||
rsvp: km.rsvp.clone(),
|
||||
message: Message::Response((
|
||||
Response {
|
||||
inherit: false,
|
||||
body: serde_json::to_vec(&return_body).unwrap(),
|
||||
metadata: req.metadata.clone(),
|
||||
capabilities: vec![],
|
||||
},
|
||||
None,
|
||||
)),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -407,10 +408,33 @@ async fn handle_subscription_stream(
|
||||
) -> Result<(), EthError> {
|
||||
match rx.recv().await {
|
||||
Err(e) => {
|
||||
return Err(EthError::SubscriptionClosed(sub_id))?;
|
||||
let error = Err(EthError::SubscriptionClosed(sub_id))?;
|
||||
let _ = send_to_loop
|
||||
.send(KernelMessage {
|
||||
id: rand::random(),
|
||||
source: Address {
|
||||
node: our,
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
target: target.clone(),
|
||||
rsvp: rsvp.clone(),
|
||||
message: Message::Request(Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&EthSubResult::Err(EthSubError {
|
||||
id: sub_id,
|
||||
error: e.to_string(),
|
||||
}))
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
Ok(value) => {
|
||||
// this should not return in case of one failed event?
|
||||
let event: SubscriptionResult = serde_json::from_str(value.get()).map_err(|_| {
|
||||
EthError::RpcError("eth: failed to deserialize subscription result".to_string())
|
||||
})?;
|
||||
@ -426,10 +450,10 @@ async fn handle_subscription_stream(
|
||||
message: Message::Request(Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&EthSub {
|
||||
body: serde_json::to_vec(&EthSubResult::Ok(EthSub {
|
||||
id: sub_id,
|
||||
result: event,
|
||||
})
|
||||
}))
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
@ -443,7 +467,6 @@ async fn handle_subscription_stream(
|
||||
Err(EthError::SubscriptionClosed(sub_id))
|
||||
}
|
||||
|
||||
// todo, always send errors or no? general runtime question for other modules too.
|
||||
fn make_error_message(our_node: String, km: KernelMessage, error: EthError) -> KernelMessage {
|
||||
let source = km.rsvp.unwrap_or_else(|| Address {
|
||||
node: our_node.clone(),
|
||||
|
@ -22,13 +22,24 @@ pub enum EthAction {
|
||||
},
|
||||
}
|
||||
|
||||
/// Incoming Request for subscription updates that processes will receive.
|
||||
/// Incoming Result type for subscription updates or errors that processes will receive.
|
||||
pub type EthSubResult = Result<EthSub, EthSubError>;
|
||||
|
||||
/// Incoming Request type for subscription updates.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct EthSub {
|
||||
pub id: u64,
|
||||
pub result: SubscriptionResult,
|
||||
}
|
||||
|
||||
/// Incoming Request for subscription errors that processes will receive.
|
||||
/// If your subscription is closed unexpectedly, you will receive this.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct EthSubError {
|
||||
pub id: u64,
|
||||
pub error: String,
|
||||
}
|
||||
|
||||
/// The Response type which a process will get from requesting with an [`EthAction`] will be
|
||||
/// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec`
|
||||
/// and `serde_json::from_slice`.
|
||||
|
Loading…
Reference in New Issue
Block a user