Merge branch 'develop' into hf/app_store-always-respond-with-error

This commit is contained in:
bitful-pannul 2024-09-10 17:15:35 +03:00
commit f704eb9521
12 changed files with 330 additions and 334 deletions

94
Cargo.lock generated
View File

@ -78,7 +78,7 @@ name = "alias"
version = "0.1.0"
dependencies = [
"anyhow",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"wit-bindgen",
@ -983,7 +983,7 @@ dependencies = [
"alloy-sol-types",
"anyhow",
"bincode",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"process_macros",
"rand 0.8.5",
"serde",
@ -1354,7 +1354,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"url",
@ -1517,7 +1517,7 @@ name = "cat"
version = "0.1.0"
dependencies = [
"anyhow",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"wit-bindgen",
@ -1581,7 +1581,7 @@ dependencies = [
"alloy-sol-types",
"anyhow",
"bincode",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"process_macros",
"rand 0.8.5",
"serde",
@ -1600,7 +1600,7 @@ version = "0.2.1"
dependencies = [
"anyhow",
"bincode",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"pleco",
"serde",
"serde_json",
@ -2330,12 +2330,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "docs"
version = "0.1.0"
dependencies = [
"anyhow",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?rev=db4a6c1)",
"process_macros",
"serde",
"serde_json",
"wit-bindgen",
]
[[package]]
name = "download"
version = "0.1.0"
dependencies = [
"anyhow",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"process_macros",
"serde",
"serde_json",
@ -2384,7 +2396,7 @@ dependencies = [
name = "echo"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"wit-bindgen",
]
@ -2767,7 +2779,7 @@ dependencies = [
name = "get_block"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"wit-bindgen",
@ -2830,7 +2842,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
name = "globe"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"url",
@ -2957,7 +2969,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
name = "help"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"wit-bindgen",
]
@ -2986,7 +2998,7 @@ checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46"
name = "hi"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"wit-bindgen",
@ -3007,7 +3019,7 @@ version = "0.1.1"
dependencies = [
"anyhow",
"bincode",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"wit-bindgen",
@ -3335,7 +3347,7 @@ name = "install"
version = "0.1.0"
dependencies = [
"anyhow",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"process_macros",
"serde",
"serde_json",
@ -3512,7 +3524,7 @@ name = "kfetch"
version = "0.1.0"
dependencies = [
"anyhow",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"rmp-serde",
"serde",
"serde_json",
@ -3524,7 +3536,7 @@ name = "kill"
version = "0.1.0"
dependencies = [
"anyhow",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"wit-bindgen",
@ -3532,7 +3544,7 @@ dependencies = [
[[package]]
name = "kinode"
version = "0.9.2"
version = "0.9.3"
dependencies = [
"aes-gcm",
"alloy 0.2.1",
@ -3591,7 +3603,7 @@ dependencies = [
[[package]]
name = "kinode_lib"
version = "0.9.1"
version = "0.9.3"
dependencies = [
"lib",
]
@ -3640,6 +3652,28 @@ dependencies = [
"wit-bindgen",
]
[[package]]
name = "kinode_process_lib"
version = "0.9.0"
source = "git+https://github.com/kinode-dao/process_lib?rev=db4a6c1#db4a6c19043807dbbfaeb517561db8931b4a3dd6"
dependencies = [
"alloy 0.1.4",
"alloy-primitives",
"alloy-sol-macro",
"alloy-sol-types",
"anyhow",
"bincode",
"http 1.1.0",
"mime_guess",
"rand 0.8.5",
"rmp-serde",
"serde",
"serde_json",
"thiserror",
"url",
"wit-bindgen",
]
[[package]]
name = "kinode_process_lib"
version = "0.9.1"
@ -3734,7 +3768,7 @@ dependencies = [
"alloy-sol-types",
"anyhow",
"hex",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"rmp-serde",
"serde",
"serde_json",
@ -3762,7 +3796,7 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
[[package]]
name = "lib"
version = "0.9.2"
version = "0.9.3"
dependencies = [
"alloy 0.2.1",
"kit 0.6.8",
@ -3950,7 +3984,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"regex",
"serde",
"serde_json",
@ -4111,7 +4145,7 @@ dependencies = [
name = "net_diagnostics"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"rmp-serde",
"serde",
"wit-bindgen",
@ -4424,7 +4458,7 @@ dependencies = [
name = "peer"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"rmp-serde",
"serde",
"wit-bindgen",
@ -4434,7 +4468,7 @@ dependencies = [
name = "peers"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"rmp-serde",
"serde",
"wit-bindgen",
@ -5449,7 +5483,7 @@ dependencies = [
"anyhow",
"base64 0.22.1",
"bincode",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"rmp-serde",
"serde",
"serde_json",
@ -5667,7 +5701,7 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
name = "state"
version = "0.1.0"
dependencies = [
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"wit-bindgen",
@ -5861,7 +5895,7 @@ version = "0.1.1"
dependencies = [
"anyhow",
"bincode",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"rand 0.8.5",
"regex",
"serde",
@ -5875,7 +5909,7 @@ version = "0.1.1"
dependencies = [
"anyhow",
"bincode",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"process_macros",
"serde",
"serde_json",
@ -6132,7 +6166,7 @@ version = "0.2.0"
dependencies = [
"anyhow",
"clap",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"serde",
"serde_json",
"wit-bindgen",
@ -6449,7 +6483,7 @@ name = "uninstall"
version = "0.1.0"
dependencies = [
"anyhow",
"kinode_process_lib 0.9.0",
"kinode_process_lib 0.9.0 (git+https://github.com/kinode-dao/process_lib?tag=v0.9.0)",
"process_macros",
"serde",
"serde_json",

View File

@ -1,7 +1,7 @@
[package]
name = "kinode_lib"
authors = ["KinodeDAO"]
version = "0.9.1"
version = "0.9.3"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"
@ -17,6 +17,7 @@ members = [
"kinode/packages/app_store/app_store", "kinode/packages/app_store/ft_worker",
"kinode/packages/app_store/download", "kinode/packages/app_store/install", "kinode/packages/app_store/uninstall", "kinode/packages/app_store/downloads", "kinode/packages/app_store/chain",
"kinode/packages/chess/chess",
"kinode/packages/docs/docs",
"kinode/packages/homepage/homepage",
"kinode/packages/kino_updates/blog", "kinode/packages/kino_updates/globe",
"kinode/packages/kns_indexer/kns_indexer", "kinode/packages/kns_indexer/get_block", "kinode/packages/kns_indexer/state",

View File

@ -1,7 +1,7 @@
[package]
name = "kinode"
authors = ["KinodeDAO"]
version = "0.9.2"
version = "0.9.3"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"

View File

@ -198,7 +198,11 @@ fn main() -> anyhow::Result<()> {
}
}
get_kinode_book(&packages_dir)?;
if std::env::var("SKIP_BOOK").is_ok() {
p!("skipping book build");
} else {
get_kinode_book(&packages_dir)?;
}
output_reruns(&packages_dir);

View File

@ -1,8 +1,5 @@
use kinode_process_lib::{
await_message, call_init,
homepage::add_to_homepage,
http::server::{HttpBindingConfig, HttpServer},
println, Address,
await_message, call_init, homepage, http, println, vfs, Address, LazyLoadBlob,
};
wit_bindgen::generate!({
@ -16,17 +13,82 @@ call_init!(init);
fn init(our: Address) {
println!("begin");
let mut server = HttpServer::new(5);
let mut server = http::server::HttpServer::new(5);
// Serve the docs book dynamically from /docs:docs:sys/
server
.serve_ui(&our, "ui", vec!["/"], HttpBindingConfig::default())
.unwrap();
.bind_http_path("/", http::server::HttpBindingConfig::default())
.expect("failed to bind /");
add_to_homepage("Docs", Some(ICON), Some("index.html"), None);
homepage::add_to_homepage("Docs", Some(ICON), Some("index.html"), None);
loop {
match await_message() {
Err(send_error) => println!("got SendError: {send_error}"),
Ok(ref _message) => println!("got message"),
Ok(ref message) => {
// handle http requests
// no need to validate source since capabilities limit to vfs/http_server
let Ok(request) = server.parse_request(message.body()) else {
continue;
};
server.handle_request(
request,
|incoming| {
// client frontend sent an HTTP request, process it and
// return an HTTP response
// these functions can reuse the logic from handle_local_request
// after converting the request into the appropriate format!
match incoming.method().unwrap_or_default() {
http::Method::GET => {
// serve the page they requested
match vfs::File::new(
format!(
"{}/pkg/ui{}",
our.package_id(),
incoming.path().unwrap_or_default()
),
5,
)
.read()
{
Ok(file) => {
let mime_type = format!(
"text/{}",
incoming
.path()
.unwrap_or_default()
.split('.')
.last()
.unwrap_or("plain")
);
(
http::server::HttpResponse::new(http::StatusCode::OK)
.header("Content-Type", mime_type),
Some(LazyLoadBlob::new(None::<String>, file)),
)
}
Err(e) => (
http::server::HttpResponse::new(
http::StatusCode::NOT_FOUND,
)
.header("Content-Type", "text/html"),
Some(LazyLoadBlob::new(None::<String>, e.to_string())),
),
}
}
_ => (
http::server::HttpResponse::new(
http::StatusCode::METHOD_NOT_ALLOWED,
),
None,
),
}
},
|_channel_id, _message_type, _message| {
// client frontend sent a websocket message, ignore
},
)
}
}
}
}

View File

@ -1209,7 +1209,7 @@ async fn kernel_message<T: Serialize>(
// its an Err: handle
match e {
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
panic!("(eth) kernel message sender: receiver closed");
return;
}
tokio::sync::mpsc::error::TrySendError::Full(_) => {
// TODO: implement backpressure

View File

@ -70,10 +70,9 @@ pub async fn create_new_subscription(
let send_to_loop = send_to_loop.clone();
let print_tx = print_tx.clone();
let active_subscriptions = active_subscriptions.clone();
let providers = providers.clone();
let (close_sender, close_receiver) = tokio::sync::mpsc::channel(1);
match maybe_raw_sub {
Ok((rx, chain_id)) => {
Ok((rx, _chain_id)) => {
subs.insert(
sub_id,
// this is a local sub, as in, we connect to the rpc endpoint
@ -89,10 +88,7 @@ pub async fn create_new_subscription(
&rsvp,
&send_to_loop,
&active_subscriptions,
chain_id,
&providers,
close_receiver,
&print_tx,
)
.await;
let Err(e) = r else {
@ -381,15 +377,11 @@ async fn maintain_local_subscription(
rsvp: &Option<Address>,
send_to_loop: &MessageSender,
active_subscriptions: &ActiveSubscriptions,
chain_id: u64,
providers: &Providers,
mut close_receiver: tokio::sync::mpsc::Receiver<bool>,
print_tx: &PrintSender,
) -> Result<(), EthSubError> {
let e = loop {
tokio::select! {
_ = close_receiver.recv() => {
//unsubscribe(rx, &chain_id, providers, print_tx).await;
return Ok(());
},
value = rx.recv() => {
@ -420,39 +412,13 @@ async fn maintain_local_subscription(
.and_modify(|sub_map| {
sub_map.remove(&sub_id);
});
//unsubscribe(rx, &chain_id, providers, print_tx).await;
Err(EthSubError {
id: sub_id,
error: format!("subscription ({target}) closed unexpectedly {e}"),
})
}
async fn unsubscribe(
rx: RawSubscription,
chain_id: &u64,
providers: &Providers,
print_tx: &PrintSender,
) {
let alloy_sub_id = rx.local_id();
let alloy_sub_id = alloy_sub_id.clone().into();
let Some(chain_providers) = providers.get_mut(chain_id) else {
return; //?
};
for url in chain_providers.urls.iter() {
let Some(pubsub) = url.pubsub.as_ref() else {
continue;
};
if let Err(err) = pubsub.unsubscribe(alloy_sub_id) {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("unsubscribe from ETH RPC failed: {err:?}"),
})
.await;
}
}
}
/// handle the subscription updates from a remote provider,
/// and also perform keepalive checks on that provider.
/// current keepalive is 30s, this can be adjusted as desired

View File

@ -1,21 +1,28 @@
use crate::http::server_types::*;
use crate::http::utils::*;
use crate::http::server_types::{
HttpResponse, HttpServerAction, HttpServerError, HttpServerRequest, IncomingHttpRequest,
MessageType, RpcResponseBody, WsMessageType,
};
use crate::http::utils;
use crate::keygen;
use anyhow::Result;
use base64::{engine::general_purpose::STANDARD as base64_standard, Engine};
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use http::uri::Authority;
use lib::types::core::*;
use lib::types::core::{
Address, KernelCommand, KernelMessage, LazyLoadBlob, LoginInfo, Message, MessageReceiver,
MessageSender, PrintSender, Printout, ProcessId, Request, Response, HTTP_SERVER_PROCESS_ID,
};
use route_recognizer::Router;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock;
use warp::http::{header::HeaderValue, StatusCode};
use warp::ws::{WebSocket, Ws};
use warp::{Filter, Reply};
use warp::{
http::{header::HeaderValue, StatusCode},
ws::{WebSocket, Ws},
Filter, Reply,
};
#[cfg(not(feature = "simulation-mode"))]
const HTTP_SELF_IMPOSED_TIMEOUT: u64 = 15;
@ -180,39 +187,37 @@ pub async fn http_server(
mut recv_in_server: MessageReceiver,
send_to_loop: MessageSender,
print_tx: PrintSender,
) -> Result<()> {
let our_name = Arc::new(our_name);
let encoded_keyfile = Arc::new(encoded_keyfile);
let jwt_secret_bytes = Arc::new(jwt_secret_bytes);
) -> anyhow::Result<()> {
let http_response_senders: HttpResponseSenders = Arc::new(DashMap::new());
let ws_senders: WebSocketSenders = Arc::new(DashMap::new());
let path = format!("/rpc:distro:sys/message");
// add RPC path
let mut bindings_map: Router<BoundPath> = Router::new();
let rpc_bound_path = BoundPath {
app: Some(ProcessId::new(Some("rpc"), "distro", "sys")),
path: path.clone(),
secure_subdomain: None,
authenticated: false,
local_only: true,
static_content: None,
};
bindings_map.add(&path, rpc_bound_path);
let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map));
// ws path bindings
// add local-only RPC path
bindings_map.add(
"/rpc:distro:sys/message",
BoundPath {
app: Some(ProcessId::new(Some("rpc"), "distro", "sys")),
path: "/rpc:distro:sys/message".to_string(),
secure_subdomain: None,
authenticated: false,
local_only: true,
static_content: None,
},
);
let path_bindings: PathBindings = Arc::new(RwLock::new(bindings_map));
let ws_path_bindings: WsPathBindings = Arc::new(RwLock::new(Router::new()));
tokio::spawn(serve(
our_name.clone(),
Arc::new(our_name),
our_port,
http_response_senders.clone(),
path_bindings.clone(),
ws_path_bindings.clone(),
ws_senders.clone(),
encoded_keyfile.clone(),
jwt_secret_bytes.clone(),
Arc::new(encoded_keyfile),
Arc::new(jwt_secret_bytes),
send_to_loop.clone(),
print_tx.clone(),
));
@ -246,17 +251,14 @@ async fn serve(
send_to_loop: MessageSender,
print_tx: PrintSender,
) {
let _ = print_tx
.send(Printout {
verbosity: 0,
content: format!("http_server: running on port {our_port}"),
})
Printout::new(0, format!("http_server: running on port {our_port}"))
.send(&print_tx)
.await;
// filter to receive websockets
let cloned_msg_tx = send_to_loop.clone();
let cloned_our = our.clone();
let cloned_jwt_secret_bytes = jwt_secret_bytes.clone();
let cloned_msg_tx = send_to_loop.clone();
let cloned_print_tx = print_tx.clone();
let ws_route = warp::ws()
.and(warp::addr::remote())
@ -394,22 +396,19 @@ async fn ws_handler(
send_to_loop: MessageSender,
print_tx: PrintSender,
) -> Result<impl warp::Reply, warp::Rejection> {
let original_path = normalize_path(path.as_str()).to_string();
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!("http_server: got ws request for {original_path}"),
})
let original_path = utils::normalize_path(path.as_str());
Printout::new(2, format!("http_server: ws request for {original_path}"))
.send(&print_tx)
.await;
let serialized_headers = serialize_headers(&headers);
let ws_path_bindings = ws_path_bindings.read().await;
let serialized_headers = utils::serialize_headers(&headers);
let Ok(route) = ws_path_bindings.recognize(&original_path) else {
let ws_path_bindings = ws_path_bindings.read().await;
let Ok(route) = ws_path_bindings.recognize(original_path) else {
return Err(warp::reject::not_found());
};
let bound_path = route.handler();
let Some(app) = bound_path.app.clone() else {
return Err(warp::reject::not_found());
};
@ -420,14 +419,6 @@ async fn ws_handler(
};
if let Some(ref subdomain) = bound_path.secure_subdomain {
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!(
"http_server: ws request for {original_path} bound by subdomain {subdomain}"
),
})
.await;
// assert that host matches what this app wants it to be
let host = match host {
Some(host) => host,
@ -436,12 +427,12 @@ async fn ws_handler(
// parse out subdomain from host (there can only be one)
let request_subdomain = host.host().split('.').next().unwrap_or("");
if request_subdomain != subdomain
|| !auth_cookie_valid(&our, Some(&app), auth_token, &jwt_secret_bytes)
|| !utils::auth_cookie_valid(&our, Some(&app), auth_token, &jwt_secret_bytes)
{
return Err(warp::reject::not_found());
}
} else {
if !auth_cookie_valid(&our, None, auth_token, &jwt_secret_bytes) {
if !utils::auth_cookie_valid(&our, None, auth_token, &jwt_secret_bytes) {
return Err(warp::reject::not_found());
}
}
@ -501,25 +492,30 @@ async fn http_handler(
print_tx: PrintSender,
login_html: Arc<String>,
) -> Result<impl warp::Reply, warp::Rejection> {
// trim trailing "/"
let original_path = normalize_path(path.as_str());
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!("http_server: got request for path {original_path}"),
})
let original_path = utils::normalize_path(path.as_str());
let base_path = original_path.split('/').skip(1).next().unwrap_or("");
Printout::new(2, format!("http_server: request for {original_path}"))
.send(&print_tx)
.await;
let id: u64 = rand::random();
let serialized_headers = serialize_headers(&headers);
let path_bindings = path_bindings.read().await;
let Ok(route) = path_bindings.recognize(&original_path) else {
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!("http_server: no route found for {original_path}"),
})
.await;
let id: u64 = rand::random();
let serialized_headers = utils::serialize_headers(&headers);
let path_bindings = path_bindings.read().await;
let route = if let Ok(route) = path_bindings.recognize(&original_path) {
route
} else if let Ok(base_route) = path_bindings.recognize(base_path) {
// if the specific path isn't found, try the base path which should
// be just the process ID. use the base path configuration to handle
// paths that have not been specifically bound by that process.
base_route
} else {
Printout::new(
2,
format!("http_server: no route found for {original_path}"),
)
.send(&print_tx)
.await;
return Ok(warp::reply::with_status(vec![], StatusCode::NOT_FOUND).into_response());
};
let bound_path = route.handler();
@ -532,14 +528,6 @@ async fn http_handler(
if bound_path.authenticated {
if let Some(ref subdomain) = bound_path.secure_subdomain {
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!(
"http_server: request for {original_path} bound by subdomain {subdomain}"
),
})
.await;
let request_subdomain = host.host().split('.').next().unwrap_or("");
// assert that host matches what this app wants it to be
if request_subdomain.is_empty() {
@ -579,7 +567,7 @@ async fn http_handler(
.body(vec![])
.into_response());
}
if !auth_cookie_valid(
if !utils::auth_cookie_valid(
&our,
Some(&app),
serialized_headers.get("cookie").unwrap_or(&"".to_string()),
@ -592,7 +580,7 @@ async fn http_handler(
.into_response());
}
} else {
if !auth_cookie_valid(
if !utils::auth_cookie_valid(
&our,
None,
serialized_headers.get("cookie").unwrap_or(&"".to_string()),
@ -694,29 +682,14 @@ async fn http_handler(
drop(path_bindings);
if is_fire_and_forget {
match send_to_loop.send(message).await {
Ok(_) => {}
Err(_) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR)
.into_response(),
);
}
}
message.send(&send_to_loop).await;
return Ok(warp::reply::with_status(vec![], StatusCode::OK).into_response());
}
let (response_sender, response_receiver) = tokio::sync::oneshot::channel();
http_response_senders.insert(id, (original_path.to_string(), response_sender));
match send_to_loop.send(message).await {
Ok(_) => {}
Err(_) => {
return Ok(
warp::reply::with_status(vec![], StatusCode::INTERNAL_SERVER_ERROR).into_response(),
);
}
}
message.send(&send_to_loop).await;
let timeout_duration = tokio::time::Duration::from_secs(HTTP_SELF_IMPOSED_TIMEOUT);
let result = tokio::time::timeout(timeout_duration, response_receiver).await;
@ -743,7 +716,7 @@ async fn http_handler(
// Merge the deserialized headers into the existing headers
let existing_headers = response.headers_mut();
for (header_name, header_value) in deserialize_headers(http_response.headers).iter() {
for (header_name, header_value) in utils::deserialize_headers(http_response.headers).iter() {
if header_name == "set-cookie" || header_name == "Set-Cookie" {
if let Ok(cookie) = header_value.to_str() {
let cookie_headers: Vec<&str> = cookie
@ -768,7 +741,7 @@ async fn handle_rpc_message(
body: warp::hyper::body::Bytes,
print_tx: PrintSender,
) -> Result<(KernelMessage, bool), StatusCode> {
let Ok(rpc_message) = serde_json::from_slice::<RpcMessage>(&body) else {
let Ok(rpc_message) = serde_json::from_slice::<utils::RpcMessage>(&body) else {
return Err(StatusCode::BAD_REQUEST);
};
@ -776,12 +749,12 @@ async fn handle_rpc_message(
return Err(StatusCode::BAD_REQUEST);
};
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!("http_server: passing on RPC message to {target_process}"),
})
.await;
Printout::new(
2,
format!("http_server: passing on RPC message to {target_process}"),
)
.send(&print_tx)
.await;
let blob: Option<LazyLoadBlob> = match rpc_message.data {
None => None,
@ -801,19 +774,12 @@ async fn handle_rpc_message(
Ok((
KernelMessage {
id,
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: rpc_message.node.unwrap_or(our.to_string()),
process: target_process,
},
source: Address::new(&*our, HTTP_SERVER_PROCESS_ID.clone()),
target: Address::new(rpc_message.node.unwrap_or(our.to_string()), target_process),
rsvp,
message: Message::Request(Request {
inherit: false,
expects_response: rpc_message.expects_response.clone(),
//expects_response: Some(15), // NB: no effect on runtime
expects_response: rpc_message.expects_response,
body: match rpc_message.body {
Some(body_string) => body_string.into_bytes(),
None => Vec::new(),
@ -836,14 +802,8 @@ fn make_websocket_message(
) -> Option<KernelMessage> {
Some(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.to_string(),
process: app,
},
source: Address::new(&our, HTTP_SERVER_PROCESS_ID.clone()),
target: Address::new(&our, app),
rsvp: None,
message: Message::Request(Request {
inherit: false,
@ -937,14 +897,8 @@ fn make_ext_websocket_message(
Some(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.to_string(),
process: app,
},
source: Address::new(&our, HTTP_SERVER_PROCESS_ID.clone()),
target: Address::new(&our, app),
rsvp: None,
message,
lazy_load_blob: blob,
@ -968,35 +922,28 @@ async fn maintain_websocket(
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
ws_senders.insert(channel_id, (app.clone(), ws_sender));
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!("http_server: new websocket connection to {app} with id {channel_id}"),
})
.await;
Printout::new(
2,
format!("http_server: new websocket connection to {app} with id {channel_id}"),
)
.send(&print_tx)
.await;
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: our.to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: our.clone().to_string(),
process: app.clone(),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&HttpServerRequest::WebSocketOpen { path, channel_id })
.unwrap(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
KernelMessage::builder()
.id(rand::random())
.source(Address::new(&*our, HTTP_SERVER_PROCESS_ID.clone()))
.target(Address::new(&*our, app.clone()))
.message(Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&HttpServerRequest::WebSocketOpen { path, channel_id })
.unwrap(),
metadata: None,
capabilities: vec![],
}))
.build()
.unwrap()
.send(&send_to_loop)
.await;
let make_ws_message = if extension {
@ -1049,12 +996,12 @@ async fn maintain_websocket(
}
}
}
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!("http_server: websocket connection {channel_id} closed"),
})
.await;
Printout::new(
2,
format!("http_server: websocket connection {channel_id} closed"),
)
.send(&print_tx)
.await;
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
}
@ -1066,34 +1013,20 @@ async fn websocket_close(
send_to_loop: &MessageSender,
) {
ws_senders.remove(&channel_id);
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
source: Address {
node: "our".to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
},
target: Address {
node: "our".to_string(),
process,
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&HttpServerRequest::WebSocketClose(channel_id)).unwrap(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: Some(LazyLoadBlob {
mime: None,
bytes: serde_json::to_vec(&RpcResponseBody {
body: Vec::new(),
lazy_load_blob: None,
})
.unwrap(),
}),
})
KernelMessage::builder()
.id(rand::random())
.source(Address::new("our", HTTP_SERVER_PROCESS_ID.clone()))
.target(Address::new("our", process))
.message(Message::Request(Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&HttpServerRequest::WebSocketClose(channel_id)).unwrap(),
metadata: None,
capabilities: vec![],
}))
.build()
.unwrap()
.send(send_to_loop)
.await;
}
@ -1180,25 +1113,24 @@ async fn handle_app_message(
local_only,
cache,
} => {
let path = format_path_with_process(&km.source.process, &path);
let path = utils::format_path_with_process(&km.source.process, &path);
let mut path_bindings = path_bindings.write().await;
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!(
"http: binding {path}, {}, {}, {}",
if authenticated {
"authenticated"
} else {
"unauthenticated"
},
if local_only { "local only" } else { "open" },
if cache { "cached" } else { "dynamic" },
),
})
.await;
Printout::new(
2,
format!(
"http: binding {path}, {}, {}, {}",
if authenticated {
"authenticated"
} else {
"unauthenticated"
},
if local_only { "local only" } else { "open" },
if cache { "cached" } else { "dynamic" },
),
)
.send(&print_tx)
.await;
if !cache {
// trim trailing "/"
path_bindings.add(
&path,
BoundPath {
@ -1221,7 +1153,6 @@ async fn handle_app_message(
.await;
return;
};
// trim trailing "/"
path_bindings.add(
&path,
BoundPath {
@ -1236,18 +1167,18 @@ async fn handle_app_message(
}
}
HttpServerAction::SecureBind { path, cache } => {
let path = format_path_with_process(&km.source.process, &path);
let subdomain = generate_secure_subdomain(&km.source.process);
let path = utils::format_path_with_process(&km.source.process, &path);
let subdomain = utils::generate_secure_subdomain(&km.source.process);
let mut path_bindings = path_bindings.write().await;
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!(
"http: binding subdomain {subdomain} with path {path}, {}",
if cache { "cached" } else { "dynamic" },
),
})
.await;
Printout::new(
2,
format!(
"http: binding subdomain {subdomain} with path {path}, {}",
if cache { "cached" } else { "dynamic" },
),
)
.send(&print_tx)
.await;
if !cache {
path_bindings.add(
&path,
@ -1271,7 +1202,6 @@ async fn handle_app_message(
.await;
return;
};
// trim trailing "/"
path_bindings.add(
&path,
BoundPath {
@ -1286,7 +1216,7 @@ async fn handle_app_message(
}
}
HttpServerAction::Unbind { path } => {
let path = format_path_with_process(&km.source.process, &path);
let path = utils::format_path_with_process(&km.source.process, &path);
let mut path_bindings = path_bindings.write().await;
path_bindings.add(
&path,
@ -1306,7 +1236,7 @@ async fn handle_app_message(
encrypted,
extension,
} => {
let path = format_path_with_process(&km.source.process, &path);
let path = utils::format_path_with_process(&km.source.process, &path);
let mut ws_path_bindings = ws_path_bindings.write().await;
ws_path_bindings.add(
&path,
@ -1324,8 +1254,8 @@ async fn handle_app_message(
encrypted,
extension,
} => {
let path = format_path_with_process(&km.source.process, &path);
let subdomain = generate_secure_subdomain(&km.source.process);
let path = utils::format_path_with_process(&km.source.process, &path);
let subdomain = utils::generate_secure_subdomain(&km.source.process);
let mut ws_path_bindings = ws_path_bindings.write().await;
ws_path_bindings.add(
&path,
@ -1339,7 +1269,7 @@ async fn handle_app_message(
);
}
HttpServerAction::WebSocketUnbind { mut path } => {
let path = format_path_with_process(&km.source.process, &path);
let path = utils::format_path_with_process(&km.source.process, &path);
let mut ws_path_bindings = ws_path_bindings.write().await;
ws_path_bindings.add(
&path,
@ -1448,25 +1378,21 @@ pub async fn send_action_response(
send_to_loop: &MessageSender,
result: Result<(), HttpServerError>,
) {
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: "our".to_string(),
process: HTTP_SERVER_PROCESS_ID.clone(),
KernelMessage::builder()
.id(id)
.source(Address::new("our", HTTP_SERVER_PROCESS_ID.clone()))
.target(target)
.message(Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&result).unwrap(),
metadata: None,
capabilities: vec![],
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
body: serde_json::to_vec(&result).unwrap(),
metadata: None,
capabilities: vec![],
},
None,
)),
lazy_load_blob: None,
})
None,
)))
.build()
.unwrap()
.send(send_to_loop)
.await;
}

View File

@ -1,13 +1,12 @@
use hmac::{Hmac, Mac};
use jwt::VerifyWithKey;
use lib::{core::ProcessId, types::http_server};
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use std::collections::HashMap;
use tokio::net::TcpListener;
use warp::http::{header::HeaderName, header::HeaderValue, HeaderMap};
use lib::{core::ProcessId, types::http_server::*};
#[derive(Serialize, Deserialize)]
pub struct RpcMessage {
pub node: Option<String>,
@ -27,7 +26,7 @@ pub fn _verify_auth_token(auth_token: &str, jwt_secret: &[u8]) -> Result<String,
return Err(jwt::Error::Format);
};
let claims: Result<JwtClaims, jwt::Error> = auth_token.verify_with_key(&secret);
let claims: Result<http_server::JwtClaims, jwt::Error> = auth_token.verify_with_key(&secret);
match claims {
Ok(data) => Ok(data.username),
@ -66,7 +65,7 @@ pub fn auth_cookie_valid(
return false;
};
let claims: Result<JwtClaims, _> = auth_token.verify_with_key(&secret);
let claims: Result<http_server::JwtClaims, _> = auth_token.verify_with_key(&secret);
match claims {
Ok(data) => data.username == our_node && data.subdomain == subdomain.map(|s| s.to_string()),

View File

@ -52,7 +52,7 @@ pub async fn terminal(
) -> anyhow::Result<()> {
let (stdout, _maybe_raw_mode) = utils::startup(&our, version, is_detached)?;
let (win_cols, win_rows) = crossterm::terminal::size().expect("terminal: couldn't fetch size");
let (win_cols, win_rows) = crossterm::terminal::size().unwrap_or_else(|_| (0, 0));
let current_line = format!("{} > ", our.name);
let prompt_len: usize = our.name.len() + 3;

View File

@ -36,7 +36,7 @@ pub fn startup(
crossterm::terminal::SetTitle(format!("kinode {}", our.name))
)?;
let (win_cols, _) = crossterm::terminal::size().expect("terminal: couldn't fetch size");
let (win_cols, _) = crossterm::terminal::size().unwrap_or_else(|_| (0, 0));
// print initial splash screen, large if there's room, small otherwise
if win_cols >= 90 {
@ -323,5 +323,9 @@ pub fn truncate_in_place(
if end > s.len() {
return s.to_string();
}
prompt.to_string() + &s[(prompt_len + line_col - cursor_col as usize)..end]
let start = prompt_len + line_col - cursor_col as usize;
if start >= end {
return prompt.to_string();
}
prompt.to_string() + &s[start..end]
}

View File

@ -1,7 +1,7 @@
[package]
name = "lib"
authors = ["KinodeDAO"]
version = "0.9.2"
version = "0.9.3"
edition = "2021"
description = "A general-purpose sovereign cloud computing platform"
homepage = "https://kinode.org"