eth: wip other methods & apI

This commit is contained in:
bitful-pannul 2024-02-02 14:40:49 -03:00
parent 55c55ab556
commit e7363393bf
6 changed files with 852 additions and 888 deletions

1440
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -20,7 +20,7 @@ simulation-mode = []
[dependencies]
aes-gcm = "0.10.2"
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy" }
alloy-primitives = { git = "https://github.com/alloy-rs/core.git" }
alloy-primitives = "0.6.2"
alloy-providers = { git = "https://github.com/alloy-rs/alloy.git" }
alloy-network = { git = "https://github.com/alloy-rs/alloy.git" }
alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy.git", features = [

View File

@ -44,6 +44,30 @@ pub enum EthAction {
GetBlockNumber,
}
//TEMP
/// Potential EthResponse type.
/// Can encapsulate all methods in their own response type,
/// or return generic result which can be parsed later..
#[derive(Debug, Serialize, Deserialize)]
pub enum EthResponse {
// another possible strat, just return RpcResult<T, E, ErrResp>,
// then try deserializing on the process_lib side.
Ok,
//Err(EthError),
Sub(SubscriptionResult),
GetLogs(Vec<Log>),
// GetBlockNumber(u64),
// GetBalance(U256),
// GetGasPrice(U256),
// Call(Vec<u8>), // alloy_primimtives::Bytes deserialization..
// GetTransactionCount(U256),
// GetBlockByNumber(Option<Block>),
// GetBlockByHash(Option<Block>),
// // raw json vs enum type vs into T?
// RawRequest(serde_json::Value),
// SendRawTransaction(Vec<u8>), // alloy_primitives::TxHash deserialization..
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct State {
// what contract this state pertains to
@ -270,10 +294,60 @@ fn handle_eth_message(
pending_requests: &mut BTreeMap<u64, Vec<IndexerRequests>>,
body: &[u8],
) -> anyhow::Result<()> {
let Ok(log) = serde_json::from_slice::<Log>(body) else {
let Ok(res) = serde_json::from_slice::<EthResponse>(body) else {
return Err(anyhow::anyhow!("kns_indexer: got invalid message"));
};
match res {
EthResponse::GetLogs(logs) => {
for log in logs {
handle_log(our, state, &log)?;
}
}
EthResponse::Sub(result) => match result {
SubscriptionResult::Log(log) => {
handle_log(our, state, &log)?;
}
_ => {}
},
_ => {}
}
// check the pending_requests btreemap to see if there are any requests that
// can be handled now that the state block has been updated
let mut blocks_to_remove = vec![];
for (block, requests) in pending_requests.iter() {
if *block <= state.block {
for request in requests.iter() {
match request {
IndexerRequests::NamehashToName { hash, .. } => {
Response::new()
.body(serde_json::to_vec(&state.names.get(hash))?)
.send()
.unwrap();
}
IndexerRequests::NodeInfo { name, .. } => {
Response::new()
.body(serde_json::to_vec(&state.nodes.get(name))?)
.send()
.unwrap();
}
}
}
blocks_to_remove.push(*block);
} else {
break;
}
}
for block in blocks_to_remove.iter() {
pending_requests.remove(block);
}
set_state(&bincode::serialize(state)?);
Ok(())
}
fn handle_log(our: &Address, state: &mut State, log: &Log) -> anyhow::Result<()> {
state.block = log.block_number.expect("expect").to::<u64>();
let node_id: alloy_primitives::FixedBytes<32> = log.topics[1];
@ -349,37 +423,6 @@ fn handle_eth_message(
.try_body(NetActions::KnsUpdate(node.clone()))?
.send()?;
}
// check the pending_requests btreemap to see if there are any requests that
// can be handled now that the state block has been updated
let mut blocks_to_remove = vec![];
for (block, requests) in pending_requests.iter() {
if *block <= state.block {
for request in requests.iter() {
match request {
IndexerRequests::NamehashToName { hash, .. } => {
Response::new()
.body(serde_json::to_vec(&state.names.get(hash))?)
.send()
.unwrap();
}
IndexerRequests::NodeInfo { name, .. } => {
Response::new()
.body(serde_json::to_vec(&state.nodes.get(name))?)
.send()
.unwrap();
}
}
}
blocks_to_remove.push(*block);
} else {
break;
}
}
for block in blocks_to_remove.iter() {
pending_requests.remove(block);
}
set_state(&bincode::serialize(state)?);
Ok(())
}

View File

@ -1,13 +1,15 @@
use crate::eth::types::*;
use crate::types::*;
use alloy_primitives::{Bytes, U256};
use alloy_primitives::{Address as EthAddress, Bytes, U256};
use alloy_providers::provider::TempProvider;
use alloy_pubsub::RawSubscription;
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult};
use alloy_rpc_types::{BlockNumberOrTag, Filter};
use alloy_transport_ws::WsConnect;
use anyhow::Result;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use url::Url;
@ -58,6 +60,8 @@ pub async fn provider(
ws_provider_subscriptions: HashMap::new(),
};
// turn into dashmap so we can share across threads
while let Some(km) = recv_in_client.recv().await {
// this module only handles requests, ignores all responses
let Message::Request(req) = &km.message else {
@ -140,7 +144,7 @@ async fn handle_request(
.await
.unwrap();
let rx = connections.provider.inner().get_watcher(id).await;
let rx = connections.provider.inner().get_raw_subscription(id).await;
let handle = tokio::spawn(handle_subscription_stream(
our.clone(),
rx,
@ -162,7 +166,7 @@ async fn handle_request(
Ok(())
}
EthAction::GetBlockNumber => {
let block_number = connections.provider.get_block_number().await.unwrap();
let vc: Vec<u8> = Vec::new();
Ok(())
}
@ -174,28 +178,95 @@ async fn handle_request(
.map_err(|e| EthError::ProviderError(format!("{:?}", e)))?;
// TEMP, will change.
for log in logs {
send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Request(Request {
send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&log).unwrap(),
body: serde_json::to_vec(&EthResponse::GetLogs(logs)).unwrap(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await
.unwrap();
}
},
None,
)),
lazy_load_blob: None,
})
.await
.unwrap();
Ok(())
}
EthAction::GetGasPrice => {
let gas_price = connections
.provider
.get_gas_price()
.await
.map_err(|e| EthError::ProviderError(format!("{:?}", e)))?;
send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&gas_price).unwrap(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await
.unwrap();
Ok(())
}
EthAction::GetBalance { address, tag } => {
let address = EthAddress::from_str(&address)
.map_err(|e| EthError::ProviderError(format!("{:?}", e)))?;
let balance = connections
.provider
.get_balance(address, tag)
.await
.map_err(|e| EthError::ProviderError(format!("{:?}", e)))?;
send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: ETH_PROCESS_ID.clone(),
},
target: target.clone(),
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&balance).unwrap(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await
.unwrap();
Ok(())
}
_ => {
println!("eth: unhandled action: {:?}", action);
// will be handled soon.
Ok(())
}
@ -207,7 +278,7 @@ async fn handle_request(
/// for a specific subscription made by a process.
async fn handle_subscription_stream(
our: Arc<String>,
mut rx: tokio::sync::broadcast::Receiver<Box<serde_json::value::RawValue>>,
mut rx: RawSubscription,
target: Address,
send_to_loop: MessageSender,
) -> Result<(), EthError> {
@ -231,7 +302,7 @@ async fn handle_subscription_stream(
message: Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&event).unwrap(),
body: serde_json::to_vec(&EthResponse::Sub(event)).unwrap(),
metadata: None,
capabilities: vec![],
}),

View File

@ -1,10 +1,10 @@
use crate::types::ProcessId;
use alloy_primitives::{Address, ChainId, U256};
use alloy_primitives::{Address, BlockHash, Bytes, ChainId, TxHash, B256, U256};
use alloy_providers::provider::Provider;
use alloy_pubsub::{PubSubConnect, PubSubFrontend};
use alloy_rpc_client::ClientBuilder;
use alloy_pubsub::PubSubFrontend;
use alloy_rpc_types::pubsub::{Params, SubscriptionKind, SubscriptionResult};
use alloy_rpc_types::{Filter, Log};
use alloy_rpc_types::{Block, BlockId, BlockNumberOrTag, CallRequest, Filter, Log};
use alloy_transport::RpcResult;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::task::JoinHandle;
@ -16,6 +16,7 @@ use tokio::task::JoinHandle;
#[derive(Debug, Serialize, Deserialize)]
pub enum EthAction {
/// Subscribe to logs with a custom filter. ID is to be used to unsubscribe.
/// Logs come in as alloy_rpc_types::pubsub::SubscriptionResults
SubscribeLogs {
sub_id: u64,
kind: SubscriptionKind,
@ -24,9 +25,64 @@ pub enum EthAction {
/// Kill a SubscribeLogs subscription of a given ID, to stop getting updates.
UnsubscribeLogs(u64),
/// get_logs
GetLogs { filter: Filter },
/// Vec<Log> or loop through?
GetLogs {
filter: Filter,
},
/// get_block_number
GetBlockNumber,
/// eth_getBalance
GetBalance {
address: String, // alloy_primitives::Address deserialization..
tag: Option<BlockId>,
},
GetGasPrice,
Call {
tx: CallRequest,
tag: BlockNumberOrTag,
},
GetTransactionCount {
address: String, // alloy_primitives::Address deserialization..
tag: Option<BlockNumberOrTag>,
},
GetBlockByNumber {
block: BlockId,
full_tx: bool,
},
GetBlockByHash {
hash: Vec<u8>, // alloy_primitives::BlockHash deserialization..
full_tx: bool,
},
RawRequest {
method: String,
params: Params,
},
SendRawTransaction {
tx: Vec<u8>, // alloy_primitives::Bytes deserialization..
},
}
/// Potential EthResponse type.
/// Can encapsulate all methods in their own response type,
/// or return generic result which can be parsed later..
#[derive(Debug, Serialize, Deserialize)]
pub enum EthResponse {
// another possible strat, just return RpcResult<T, E, ErrResp>,
// then try deserializing on the process_lib side.
Ok,
Err(EthError),
Sub(SubscriptionResult),
GetLogs(Vec<Log>),
GetBlockNumber(u64),
GetBalance(U256),
GetGasPrice(U256),
Call(Vec<u8>), // alloy_primimtives::Bytes deserialization..
GetTransactionCount(U256),
GetBlockByNumber(Option<Block>),
GetBlockByHash(Option<Block>),
// raw json vs enum type vs into T?
RawRequest(serde_json::Value),
SendRawTransaction(Vec<u8>), // alloy_primitives::TxHash deserialization..
}
/// The Response type which a process will get from requesting with an [`EthAction`] will be
@ -57,7 +113,7 @@ pub enum EthSubEvent {
/// Primary state object of the `eth` module
pub struct RpcConnections {
// todo generics when they work properly: pub struct RpcConnections<T>
// todo generics when they work properly: pub struct RpcConnections<T>, where T: Transport
pub provider: Provider<PubSubFrontend>,
pub ws_provider_subscriptions: HashMap<(ProcessId, u64), JoinHandle<Result<(), EthError>>>,
}

View File

@ -267,10 +267,10 @@ async fn get_unencrypted_info(keyfile: Option<Vec<u8>>) -> Result<impl Reply, Re
}
};
Ok(warp::reply::with_status(
Ok(warp::reply::json(&UnencryptedIdentity {
warp::reply::json(&UnencryptedIdentity {
name,
allowed_routers,
})),
}),
StatusCode::OK,
)
.into_response())