WIP stuck on bizarre alloy bug

This commit is contained in:
dr-frmr 2024-02-26 01:42:38 -03:00
parent 989191037f
commit 365ba8ca70
No known key found for this signature in database
12 changed files with 294 additions and 104 deletions

125
Cargo.lock generated
View File

@ -93,7 +93,7 @@ checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
[[package]]
name = "alloy-eips"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@ -112,13 +112,24 @@ dependencies = [
"thiserror",
]
[[package]]
name = "alloy-json-rpc"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-primitives",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "alloy-network"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-eips",
"alloy-json-rpc",
"alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-primitives",
"alloy-rlp",
"serde",
@ -149,14 +160,14 @@ dependencies = [
[[package]]
name = "alloy-providers"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-network",
"alloy-primitives",
"alloy-rpc-client",
"alloy-rpc-trace-types",
"alloy-rpc-types",
"alloy-transport",
"alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-transport 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-transport-http",
"async-trait",
"auto_impl",
@ -168,11 +179,11 @@ dependencies = [
[[package]]
name = "alloy-pubsub"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-json-rpc",
"alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-primitives",
"alloy-transport",
"alloy-transport 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"bimap",
"futures",
"serde",
@ -207,12 +218,12 @@ dependencies = [
[[package]]
name = "alloy-rpc-client"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-json-rpc",
"alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-primitives",
"alloy-pubsub",
"alloy-transport",
"alloy-transport 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-transport-http",
"alloy-transport-ws",
"futures",
@ -228,10 +239,10 @@ dependencies = [
[[package]]
name = "alloy-rpc-trace-types"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-primitives",
"alloy-rpc-types",
"alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"serde",
"serde_json",
]
@ -249,6 +260,19 @@ dependencies = [
"thiserror",
]
[[package]]
name = "alloy-rpc-types"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-primitives",
"alloy-rlp",
"itertools 0.12.1",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "alloy-sol-macro"
version = "0.6.3"
@ -284,7 +308,7 @@ name = "alloy-transport"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2"
dependencies = [
"alloy-json-rpc",
"alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=098ad56)",
"base64 0.21.7",
"serde",
"serde_json",
@ -295,13 +319,30 @@ dependencies = [
"wasm-bindgen-futures",
]
[[package]]
name = "alloy-transport"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"base64 0.21.7",
"futures-util",
"serde",
"serde_json",
"thiserror",
"tokio",
"tower",
"url",
"wasm-bindgen-futures",
]
[[package]]
name = "alloy-transport-http"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-json-rpc",
"alloy-transport",
"alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-transport 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"reqwest",
"serde_json",
"tower",
@ -311,10 +352,10 @@ dependencies = [
[[package]]
name = "alloy-transport-ws"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=098ad56#098ad5657d55bbc5fe9469ede2a9ca79def738f2"
source = "git+https://github.com/alloy-rs/alloy?rev=6f8ebb4#6f8ebb45afca1a201a11d421ec46db0f7a1d8d08"
dependencies = [
"alloy-pubsub",
"alloy-transport",
"alloy-transport 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"futures",
"http 0.2.11",
"serde_json",
@ -407,7 +448,7 @@ dependencies = [
"alloy-sol-types",
"anyhow",
"bincode",
"kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=7b6fd6e)",
"kinode_process_lib 0.6.0",
"rand 0.8.5",
"serde",
"serde_json",
@ -1967,7 +2008,7 @@ dependencies = [
name = "get_block"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=7b6fd6e)",
"kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=567ba78)",
"serde",
"serde_json",
"wit-bindgen",
@ -2563,7 +2604,7 @@ dependencies = [
"alloy-providers",
"alloy-pubsub",
"alloy-rpc-client",
"alloy-rpc-types",
"alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-transport-ws",
"anyhow",
"async-trait",
@ -2631,6 +2672,26 @@ dependencies = [
"lib",
]
[[package]]
name = "kinode_process_lib"
version = "0.6.0"
dependencies = [
"alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-primitives",
"alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"alloy-transport 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"anyhow",
"bincode",
"http 1.0.0",
"mime_guess",
"rand 0.8.5",
"serde",
"serde_json",
"thiserror",
"url",
"wit-bindgen",
]
[[package]]
name = "kinode_process_lib"
version = "0.6.0"
@ -2653,10 +2714,10 @@ name = "kinode_process_lib"
version = "0.6.0"
source = "git+https://github.com/kinode-dao/process_lib?rev=3232423#323242399efdcdad02e7f31bb6a9cc5eec048610"
dependencies = [
"alloy-json-rpc",
"alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=098ad56)",
"alloy-primitives",
"alloy-rpc-types",
"alloy-transport",
"alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=098ad56)",
"alloy-transport 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=098ad56)",
"anyhow",
"bincode",
"http 1.0.0",
@ -2672,12 +2733,12 @@ dependencies = [
[[package]]
name = "kinode_process_lib"
version = "0.6.0"
source = "git+https://github.com/kinode-dao/process_lib?rev=7b6fd6e#7b6fd6ee160299514fee30b315a2a53fbfb434d7"
source = "git+https://github.com/kinode-dao/process_lib?rev=567ba78#567ba7830ba387625e668c61f252f98ff849d6eb"
dependencies = [
"alloy-json-rpc",
"alloy-json-rpc 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=098ad56)",
"alloy-primitives",
"alloy-rpc-types",
"alloy-transport",
"alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=098ad56)",
"alloy-transport 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=098ad56)",
"anyhow",
"bincode",
"http 1.0.0",
@ -2745,7 +2806,7 @@ dependencies = [
"anyhow",
"bincode",
"hex",
"kinode_process_lib 0.6.0 (git+https://github.com/kinode-dao/process_lib?rev=7b6fd6e)",
"kinode_process_lib 0.6.0",
"rmp-serde",
"serde",
"serde_json",
@ -2775,7 +2836,7 @@ name = "lib"
version = "0.6.0"
dependencies = [
"alloy-pubsub",
"alloy-rpc-types",
"alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=6f8ebb4)",
"lazy_static",
"rand 0.8.5",
"reqwest",

View File

@ -26,11 +26,11 @@ simulation-mode = []
[dependencies]
aes-gcm = "0.10.2"
alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56", features = ["ws"]}
alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
alloy-providers = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" }
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" }
alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4", features = ["ws"]}
alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" }
alloy-providers = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" }
anyhow = "1.0.71"
async-trait = "0.1.71"
base64 = "0.13"

View File

@ -13,7 +13,7 @@
"chain_id": 11155111,
"trusted": false,
"provider": {
"RpcUrl": "wss://eth-sepolia.g.alchemy.com/v2/a4bRKYnvC0uT2l1rzVDAvldH3OPKQnKm"
"RpcUrl": "wss://eth-sepolia.g.alchemy.com/v2/iZZorIE5O93pUSAqyB3INvdxJZ8od_ro"
},
"public": false,
"allow": [],

View File

@ -9,7 +9,8 @@ alloy-primitives = "0.6.2"
alloy-sol-types = "0.6.2"
anyhow = "1.0"
bincode = "1.3.3"
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "7b6fd6e" }
# kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "567ba78" }
kinode_process_lib = { path = "../../../../../process_lib" }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@ -1,6 +1,5 @@
use kinode_process_lib::eth::{
get_logs, subscribe, unsubscribe, Address as EthAddress, EthSub, EthSubResult, Filter,
SubscriptionResult,
Address as EthAddress, EthSub, EthSubResult, Filter, Log, SubscriptionResult,
};
use kinode_process_lib::http::{bind_http_path, serve_ui, HttpServerRequest};
use kinode_process_lib::kernel_types as kt;
@ -72,6 +71,32 @@ pub enum Resp {
FTWorkerResult(FTWorkerResult),
}
fn fetch_logs(eth_provider: &eth::Provider, filter: &Filter) -> Vec<Log> {
loop {
match eth_provider.get_logs(filter) {
Ok(res) => return res,
Err(_) => {
println!("app store: failed to fetch logs! trying again in 5s...");
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
}
}
}
fn subscribe_to_logs(eth_provider: &eth::Provider, filter: Filter) {
loop {
match eth_provider.subscribe(1, filter.clone()) {
Ok(()) => break,
Err(_) => {
println!("app store: failed to subscribe to chain! trying again in 5s...");
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
}
}
}
call_init!(init);
fn init(our: Address) {
println!("{}: started", our.package());
@ -110,25 +135,26 @@ fn init(our: Address) {
state.contract_address
);
// create new provider for sepolia with request-timeout of 60s
// can change, log requests can take quite a long time.
let eth_provider = eth::Provider::new(CHAIN_ID, 30);
let mut requested_packages: HashMap<PackageId, RequestedPackage> = HashMap::new();
// get past logs, subscribe to new ones.
let filter = Filter::new()
.address(EthAddress::from_str(&state.contract_address).unwrap())
.from_block(state.last_saved_block - 1)
.events(EVENTS);
.event(EVENTS[0])
.event(EVENTS[1])
.event(EVENTS[2]);
let logs = get_logs(CHAIN_ID, &filter);
if let Ok(logs) = logs {
for log in logs {
if let Err(e) = state.ingest_listings_contract_event(&our, log) {
println!("app store: error ingesting log: {e:?}");
};
}
for log in fetch_logs(&eth_provider, &filter) {
if let Err(e) = state.ingest_listings_contract_event(&our, log) {
println!("app store: error ingesting log: {e:?}");
};
}
subscribe(1, CHAIN_ID, filter).unwrap();
subscribe_to_logs(&eth_provider, filter);
loop {
match await_message() {
@ -137,8 +163,13 @@ fn init(our: Address) {
println!("app store: got network error: {send_error}");
}
Ok(message) => {
if let Err(e) = handle_message(&our, &mut state, &mut requested_packages, &message)
{
if let Err(e) = handle_message(
&our,
&mut state,
&eth_provider,
&mut requested_packages,
&message,
) {
println!("app store: error handling message: {:?}", e)
}
}
@ -153,6 +184,7 @@ fn init(our: Address) {
fn handle_message(
our: &Address,
mut state: &mut State,
eth_provider: &eth::Provider,
mut requested_packages: &mut HashMap<PackageId, RequestedPackage>,
message: &Message,
) -> anyhow::Result<()> {
@ -167,8 +199,13 @@ fn handle_message(
if our.node != source.node {
return Err(anyhow::anyhow!("local request from non-local node"));
}
let resp =
handle_local_request(&our, &local_request, &mut state, &mut requested_packages);
let resp = handle_local_request(
&our,
&local_request,
&mut state,
eth_provider,
&mut requested_packages,
);
if expects_response.is_some() {
Response::new().body(serde_json::to_vec(&resp)?).send()?;
}
@ -272,6 +309,7 @@ fn handle_local_request(
our: &Address,
request: &LocalRequest,
state: &mut State,
eth_provider: &eth::Provider,
requested_packages: &mut HashMap<PackageId, RequestedPackage>,
) -> LocalResponse {
match request {
@ -341,24 +379,23 @@ fn handle_local_request(
LocalRequest::RebuildIndex => {
*state = State::new(CONTRACT_ADDRESS.to_string()).unwrap();
// kill our old subscription and build a new one.
unsubscribe(1).unwrap();
eth_provider
.unsubscribe(1)
.expect("app_store: failed to unsub from eth events!");
let filter = Filter::new()
.address(EthAddress::from_str(&state.contract_address).unwrap())
.from_block(state.last_saved_block - 1)
.events(EVENTS);
.event(EVENTS[0])
.event(EVENTS[1])
.event(EVENTS[2]);
let logs = get_logs(CHAIN_ID, &filter);
if let Ok(logs) = logs {
for log in logs {
if let Err(e) = state.ingest_listings_contract_event(our, log) {
println!("app store: error ingesting log: {e:?}");
};
}
for log in fetch_logs(&eth_provider, &filter) {
if let Err(e) = state.ingest_listings_contract_event(our, log) {
println!("app store: error ingesting log: {e:?}");
};
}
subscribe(1, CHAIN_ID, filter).unwrap();
subscribe_to_logs(&eth_provider, filter);
LocalResponse::RebuiltIndex
}
}

View File

@ -5,7 +5,7 @@ edition = "2021"
[dependencies]
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "7b6fd6e" }
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "567ba78" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "21a46c7" }

View File

@ -1,5 +1,5 @@
use kinode_process_lib::{
await_next_request_body, call_init, eth::get_block_number, println, Address,
await_next_request_body, call_init, println, Address, eth,
};
wit_bindgen::generate!({
@ -24,12 +24,15 @@ fn init(_our: Address) {
.parse::<u64>()
.unwrap_or(1);
match get_block_number(chain_id) {
// request timeout of 5s
let provider = eth::Provider::new(chain_id, 5);
match provider.get_block_number() {
Ok(block_number) => {
println!("latest block number: {block_number}");
}
Err(e) => {
println!("get_block: failed to get block number: {}", e);
println!("failed to get block number: {e:?}");
}
}
}

View File

@ -10,7 +10,8 @@ alloy-primitives = "0.6.2"
alloy-sol-types = "0.6.2"
bincode = "1.3.3"
hex = "0.4.3"
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "7b6fd6e" }
# kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", rev = "567ba78" }
kinode_process_lib = { path = "../../../../../process_lib" }
rmp-serde = "1.1.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View File

@ -1,10 +1,9 @@
use alloy_sol_types::{sol, SolEvent};
use kinode_process_lib::{
await_message,
eth::{
get_block_number, get_logs, subscribe, Address as EthAddress, BlockNumberOrTag, EthSub,
EthSubResult, Filter, Log, SubscriptionResult,
Address as EthAddress, BlockNumberOrTag, EthSub, EthSubResult, Filter, Log, Provider,
SubscriptionResult,
},
get_typed_state, print_to_terminal, println, set_state, Address, Message, Request, Response,
};
@ -100,6 +99,19 @@ sol! {
event RoutingUpdate(bytes32 indexed node, bytes32[] routers);
}
fn subscribe_to_logs(eth_provider: &Provider, filter: Filter) {
loop {
match eth_provider.subscribe(1, filter.clone()) {
Ok(()) => break,
Err(_) => {
println!("kns_indexer: failed to subscribe to chain! trying again in 5s...");
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
}
}
}
struct Component;
impl Guest for Component {
fn init(our: String) {
@ -173,19 +185,32 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
.address(state.contract_address.parse::<EthAddress>().unwrap())
.to_block(BlockNumberOrTag::Latest)
.from_block(state.block - 1)
.events(vec![
"NodeRegistered(bytes32,bytes)",
"KeyUpdate(bytes32,bytes32)",
"IpUpdate(bytes32,uint128)",
"WsUpdate(bytes32,uint16)",
"RoutingUpdate(bytes32,bytes32[])",
]);
.event("NodeRegistered(bytes32,bytes)")
.event("KeyUpdate(bytes32,bytes32)")
.event("IpUpdate(bytes32,uint128)")
.event("WsUpdate(bytes32,uint16)")
.event("RoutingUpdate(bytes32,bytes32[])");
// 60s timeout -- these calls can take a long time
// if they do time out, we try them again
let eth_provider = Provider::new(state.chain_id, 20);
// if block in state is < current_block, get logs from that part.
if state.block < get_block_number(state.chain_id)? {
let logs = get_logs(state.chain_id, &filter)?;
for log in logs {
handle_log(&our, &mut state, &log)?;
if state.block < eth_provider.get_block_number().unwrap_or(u64::MAX) {
loop {
match eth_provider.get_logs(&filter) {
Ok(logs) => {
for log in logs {
handle_log(&our, &mut state, &log)?;
}
break;
}
Err(_) => {
println!("kns_indexer: failed to fetch logs! trying again in 5s...");
std::thread::sleep(std::time::Duration::from_secs(5));
continue;
}
}
}
}
// shove all state into net::net
@ -198,7 +223,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
set_state(&bincode::serialize(&state)?);
subscribe(1, state.chain_id, filter.clone())?;
subscribe_to_logs(&eth_provider, filter.clone());
let mut pending_requests: BTreeMap<u64, Vec<IndexerRequests>> = BTreeMap::new();
@ -214,7 +239,14 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
};
if source.process == "eth:distro:sys" {
handle_eth_message(&our, &mut state, &mut pending_requests, &body, &filter)?;
handle_eth_message(
&our,
&mut state,
&eth_provider,
&mut pending_requests,
&body,
&filter,
)?;
} else {
let Ok(request) = serde_json::from_slice::<IndexerRequests>(&body) else {
println!("kns_indexer: got invalid message");
@ -254,6 +286,7 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
fn handle_eth_message(
our: &Address,
state: &mut State,
eth_provider: &Provider,
pending_requests: &mut BTreeMap<u64, Vec<IndexerRequests>>,
body: &[u8],
filter: &Filter,
@ -270,7 +303,7 @@ fn handle_eth_message(
}
Err(e) => {
println!("kns_indexer: got sub error, resubscribing.. {:?}", e.error);
subscribe(1, state.chain_id, filter.clone())?;
subscribe_to_logs(&eth_provider, filter.clone());
}
}

View File

@ -79,8 +79,15 @@ async fn activate_url_provider(provider: &mut UrlProvider) -> Result<()> {
url: provider.url.to_string(),
auth: None,
};
let client = ClientBuilder::default().ws(connector).await?;
println!("here1\r");
let client = tokio::time::timeout(
std::time::Duration::from_secs(10),
ClientBuilder::default().ws(connector),
)
.await??;
println!("here2\r");
provider.pubsub = Some(Provider::new_with_client(client));
println!("here3\r");
Ok(())
}
_ => Err(anyhow::anyhow!(
@ -160,6 +167,7 @@ async fn handle_message(
match &km.message {
Message::Response(_) => handle_passthrough_response(our, send_to_loop, km).await,
Message::Request(req) => {
let timeout = *req.expects_response.as_ref().unwrap_or(&60); // TODO make this a config
if let Ok(eth_action) = serde_json::from_slice(&req.body) {
// these can be from remote or local processes
return handle_eth_action(
@ -167,6 +175,7 @@ async fn handle_message(
access_settings,
send_to_loop,
km,
timeout,
eth_action,
providers,
active_subscriptions,
@ -218,6 +227,7 @@ async fn handle_eth_action(
access_settings: &mut AccessSettings,
send_to_loop: &MessageSender,
km: KernelMessage,
timeout: u64,
eth_action: EthAction,
providers: &mut Providers,
active_subscriptions: &mut ActiveSubscriptions,
@ -242,7 +252,21 @@ async fn handle_eth_action(
// before returning an error.
match eth_action {
EthAction::SubscribeLogs { sub_id, .. } => {
let new_sub = ActiveSub::Local(tokio::spawn(create_new_subscription(
// let new_sub = ActiveSub::Local(tokio::spawn(create_new_subscription(
// our.to_string(),
// km.id,
// km.source.clone(),
// km.rsvp,
// send_to_loop.clone(),
// eth_action,
// providers.clone(),
// active_subscriptions.clone(),
// )));
// let mut subs = active_subscriptions
// .entry(km.source.process)
// .or_insert(HashMap::new());
// subs.insert(sub_id, new_sub);
create_new_subscription(
our.to_string(),
km.id,
km.source.clone(),
@ -251,11 +275,8 @@ async fn handle_eth_action(
eth_action,
providers.clone(),
active_subscriptions.clone(),
)));
let mut subs = active_subscriptions
.entry(km.source.process)
.or_insert(HashMap::new());
subs.insert(sub_id, new_sub);
)
.await
}
EthAction::UnsubscribeLogs(sub_id) => {
active_subscriptions
@ -274,21 +295,22 @@ async fn handle_eth_action(
});
}
EthAction::Request { .. } => {
tokio::spawn(fulfill_request(
fulfill_request(
our.to_string(),
km.id,
km.source.clone(),
km.rsvp,
timeout,
send_to_loop.clone(),
eth_action,
providers.clone(),
));
)
.await;
}
}
Ok(())
}
/// spawned as a task
/// cleans itself up when the subscription is closed or fails.
async fn create_new_subscription(
our: String,
@ -475,6 +497,7 @@ async fn fulfill_request(
km_id: u64,
target: Address,
rsvp: Option<Address>,
timeout: u64,
send_to_loop: MessageSender,
eth_action: EthAction,
providers: Providers,
@ -524,7 +547,34 @@ async fn fulfill_request(
}
}
};
let response = pubsub.inner().prepare(method, params.clone()).await;
println!("here5\r");
let connector = WsConnect {
url: url_provider.url.to_string(),
auth: None,
};
let client = tokio::time::timeout(
std::time::Duration::from_secs(10),
ClientBuilder::default().ws(connector),
)
.await.unwrap().unwrap();
println!("here6\r");
let provider = Provider::new_with_client(client);
println!("method: {method:?}\r");
println!("params: {params:?}\r");
let response = provider.inner().prepare(method, params.clone()).await;
println!("res: {response:?}\r");
// let Ok(response) = tokio::time::timeout(
// std::time::Duration::from_secs(timeout),
// pubsub.inner().prepare(method, params.clone()),
// )
// .await
// else {
// println!("what the FUCK\r");
// // this provider failed and needs to be reset
// url_provider.pubsub = None;
// continue;
// };
println!("here6\r");
if let Ok(value) = response {
send_to_loop
.send(KernelMessage {

View File

@ -14,8 +14,8 @@ license = "Apache-2.0"
reqwest = { version = "0.11.22", features = ["blocking"] }
[dependencies]
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "098ad56" }
alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" }
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "6f8ebb4" }
lazy_static = "1.4.0"
rand = "0.8.4"
ring = "0.16.20"

View File

@ -47,7 +47,7 @@ pub struct EthSubError {
}
/// The Response type which a process will get from requesting with an [`EthAction`] will be
/// of the form `Result<(), EthError>`, serialized and deserialized using `serde_json::to_vec`
/// of this type, serialized and deserialized using `serde_json::to_vec`
/// and `serde_json::from_slice`.
#[derive(Debug, Serialize, Deserialize)]
pub enum EthResponse {
@ -68,10 +68,14 @@ pub enum EthError {
SubscriptionNotFound,
/// Invalid method
InvalidMethod(String),
/// Invalid params
InvalidParams,
/// Permission denied
PermissionDenied,
/// Internal RPC error
RpcError(String),
/// RPC timed out
RpcTimeout,
}
/// The action type used for configuring eth:distro:sys. Only processes which have the "root"