Merge pull request #456 from kinode-dao/v0.8.6

v0.8.6
This commit is contained in:
doria 2024-07-19 23:03:34 +09:00 committed by GitHub
commit 1953a73342
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 189 additions and 30 deletions

6
Cargo.lock generated
View File

@ -3203,7 +3203,7 @@ dependencies = [
[[package]]
name = "kinode"
version = "0.8.5"
version = "0.8.6"
dependencies = [
"aes-gcm",
"alloy",
@ -3259,7 +3259,7 @@ dependencies = [
[[package]]
name = "kinode_lib"
version = "0.8.5"
version = "0.8.6"
dependencies = [
"lib",
]
@ -3376,7 +3376,7 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
[[package]]
name = "lib"
version = "0.8.5"
version = "0.8.6"
dependencies = [
"alloy",
"kit",

View File

@ -1,7 +1,7 @@
[package]
name = "kinode_lib"
authors = ["KinodeDAO"]
version = "0.8.5"
version = "0.8.6"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"

View File

@ -1,7 +1,7 @@
[package]
name = "kinode"
authors = ["KinodeDAO"]
version = "0.8.5"
version = "0.8.6"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"

View File

@ -37,14 +37,14 @@ struct ActiveProviders {
pub nodes: Vec<NodeProvider>,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct UrlProvider {
pub trusted: bool,
pub url: String,
pub pubsub: Option<RootProvider<PubSubFrontend>>,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct NodeProvider {
/// NOT CURRENTLY USED
pub trusted: bool,
@ -581,8 +581,12 @@ async fn fulfill_request(
let Some(method) = to_static_str(&method) else {
return EthResponse::Err(EthError::InvalidMethod(method.to_string()));
};
let Some(mut aps) = providers.get_mut(&chain_id) else {
return EthResponse::Err(EthError::NoRpcForChain);
let mut urls = {
// in code block to drop providers lock asap to avoid deadlock
let Some(aps) = providers.get(&chain_id) else {
return EthResponse::Err(EthError::NoRpcForChain);
};
aps.urls.clone()
};
// first, try any url providers we have for this chain,
@ -590,7 +594,7 @@ async fn fulfill_request(
// finally, if no provider works, return an error.
// bump the successful provider to the front of the list for future requests
for (index, url_provider) in aps.urls.iter_mut().enumerate() {
for url_provider in urls.iter_mut() {
let pubsub = match &url_provider.pubsub {
Some(pubsub) => pubsub,
None => {
@ -613,8 +617,25 @@ async fn fulfill_request(
};
match pubsub.raw_request(method.into(), params.clone()).await {
Ok(value) => {
let successful_provider = aps.urls.remove(index);
aps.urls.insert(0, successful_provider);
let mut is_replacement_successful = true;
providers.entry(chain_id).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
) else {
is_replacement_successful = false;
return ();
};
aps.urls.remove(index);
aps.urls.insert(0, url_provider.clone());
});
if !is_replacement_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
return EthResponse::Response { value };
}
Err(rpc_error) => {
@ -631,11 +652,38 @@ async fn fulfill_request(
return EthResponse::Err(EthError::RpcError(err));
}
// this provider failed and needs to be reset
url_provider.pubsub = None;
let mut is_reset_successful = true;
providers.entry(chain_id).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
) else {
is_reset_successful = false;
return ();
};
let mut url = aps.urls.remove(index);
url.pubsub = None;
aps.urls.insert(index, url);
});
if !is_reset_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
}
}
}
for node_provider in &mut aps.nodes {
let nodes = {
// in code block to drop providers lock asap to avoid deadlock
let Some(aps) = providers.get(&chain_id) else {
return EthResponse::Err(EthError::NoRpcForChain);
};
aps.nodes.clone()
};
for node_provider in &nodes {
verbose_print(
print_tx,
&format!(
@ -656,7 +704,13 @@ async fn fulfill_request(
.await;
if let EthResponse::Err(e) = response {
if let EthError::RpcMalformedResponse = e {
node_provider.usable = false;
set_node_unusable(
&providers,
&chain_id,
&node_provider.kns_update.name,
print_tx,
)
.await;
}
} else {
return response;
@ -987,3 +1041,47 @@ async fn kernel_message<T: Serialize>(
})
.await;
}
fn find_index(vec: &Vec<&str>, item: &str) -> Option<usize> {
vec.iter().enumerate().find_map(
|(index, value)| {
if *value == item {
Some(index)
} else {
None
}
},
)
}
async fn set_node_unusable(
providers: &Providers,
chain_id: &u64,
node_name: &str,
print_tx: &PrintSender,
) -> bool {
let mut is_replacement_successful = true;
providers.entry(chain_id.clone()).and_modify(|aps| {
let Some(index) = find_index(
&aps.nodes
.iter()
.map(|n| n.kns_update.name.as_str())
.collect(),
&node_name,
) else {
is_replacement_successful = false;
return ();
};
let mut node = aps.nodes.remove(index);
node.usable = false;
aps.nodes.insert(index, node);
});
if !is_replacement_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
is_replacement_successful
}

View File

@ -180,15 +180,21 @@ async fn build_subscription(
else {
return Err(EthError::PermissionDenied); // will never hit
};
let Some(mut aps) = providers.get_mut(&chain_id) else {
return Err(EthError::NoRpcForChain);
let mut urls = {
// in code block to drop providers lock asap to avoid deadlock
let Some(aps) = providers.get(&chain_id) else {
return Err(EthError::NoRpcForChain);
};
aps.urls.clone()
};
let chain_id = chain_id.clone();
// first, try any url providers we have for this chain,
// then if we have none or they all fail, go to node providers.
// finally, if no provider works, return an error.
// bump the successful provider to the front of the list for future requests
for (index, url_provider) in aps.urls.iter_mut().enumerate() {
for url_provider in urls.iter_mut() {
let pubsub = match &url_provider.pubsub {
Some(pubsub) => pubsub,
None => {
@ -217,8 +223,25 @@ async fn build_subscription(
{
Ok(sub) => {
let rx = sub.into_raw();
let successful_provider = aps.urls.remove(index);
aps.urls.insert(0, successful_provider);
let mut is_replacement_successful = true;
providers.entry(chain_id).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
) else {
is_replacement_successful = false;
return ();
};
aps.urls.remove(index);
aps.urls.insert(0, url_provider.clone());
});
if !is_replacement_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
return Ok(Ok(rx));
}
Err(rpc_error) => {
@ -231,7 +254,26 @@ async fn build_subscription(
)
.await;
// this provider failed and needs to be reset
url_provider.pubsub = None;
let mut is_reset_successful = true;
providers.entry(chain_id).and_modify(|aps| {
let Some(index) = find_index(
&aps.urls.iter().map(|u| u.url.as_str()).collect(),
&url_provider.url,
) else {
is_reset_successful = false;
return ();
};
let mut url = aps.urls.remove(index);
url.pubsub = None;
aps.urls.insert(index, url);
});
if !is_reset_successful {
verbose_print(
print_tx,
&format!("eth: unexpectedly couldn't find provider to be modified"),
)
.await;
}
}
}
}
@ -241,7 +283,14 @@ async fn build_subscription(
// we need to create our own unique sub id because in the remote provider node,
// all subs will be identified under our process address.
let remote_sub_id = rand::random();
for node_provider in &mut aps.nodes {
let nodes = {
// in code block to drop providers lock asap to avoid deadlock
let Some(aps) = providers.get(&chain_id) else {
return Err(EthError::NoRpcForChain);
};
aps.nodes.clone()
};
for node_provider in &nodes {
verbose_print(
&print_tx,
&format!(
@ -283,11 +332,23 @@ async fn build_subscription(
}
EthResponse::Response { .. } => {
// the response to a SubscribeLogs request must be an 'ok'
node_provider.usable = false;
set_node_unusable(
&providers,
&chain_id,
&node_provider.kns_update.name,
print_tx,
)
.await;
}
EthResponse::Err(e) => {
if let EthError::RpcMalformedResponse = e {
node_provider.usable = false;
set_node_unusable(
&providers,
&chain_id,
&node_provider.kns_update.name,
print_tx,
)
.await;
}
}
}

View File

@ -203,14 +203,14 @@ async fn handle_request(
.query_map(rusqlite::params_from_iter(parameters.iter()), |row| {
let mut map = HashMap::new();
for (i, column_name) in column_names.iter().enumerate() {
let value: SqlValue = row.get(i)?;
let value: Option<SqlValue> = row.get(i)?;
let value_json = match value {
SqlValue::Integer(int) => serde_json::Value::Number(int.into()),
SqlValue::Real(real) => serde_json::Value::Number(
Some(SqlValue::Integer(int)) => serde_json::Value::Number(int.into()),
Some(SqlValue::Real(real)) => serde_json::Value::Number(
serde_json::Number::from_f64(real).unwrap(),
),
SqlValue::Text(text) => serde_json::Value::String(text),
SqlValue::Blob(blob) => {
Some(SqlValue::Text(text)) => serde_json::Value::String(text),
Some(SqlValue::Blob(blob)) => {
serde_json::Value::String(base64_standard.encode(blob))
} // or another representation if you prefer
_ => serde_json::Value::Null,

View File

@ -1,7 +1,7 @@
[package]
name = "lib"
authors = ["KinodeDAO"]
version = "0.8.5"
version = "0.8.6"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"