Merge branch 'next-release' into jf/eth-rpc

dr-frmr 2024-01-04 11:52:14 -03:00
commit f0fec1bf1c
No known key found for this signature in database
23 changed files with 962 additions and 223 deletions

View File

@ -384,7 +384,7 @@ fn handle_new_package(
.inherit(true)
.ipc(serde_json::to_vec(&kt::VfsRequest {
path: zip_path,
action: kt::VfsAction::ReWrite,
action: kt::VfsAction::Write,
})?)
.payload(payload)
.send_and_await_response(5)??;

View File

@ -3,13 +3,13 @@ use alloy_rpc_types::Log;
use alloy_sol_types::{sol, SolEvent};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::{Entry, HashMap};
use std::string::FromUtf8Error;
use std::str::FromStr;
use std::string::FromUtf8Error;
use uqbar_process_lib::eth::{EthAddress, SubscribeLogsRequest};
use uqbar_process_lib::{
await_message, get_typed_state, http, println, set_state,
Address, Message, Payload, Request, Response,
await_message, get_typed_state, http, println, set_state, Address, Message, Payload, Request,
Response,
};
use uqbar_process_lib::eth::{ EthAddress, SubscribeLogsRequest, };
wit_bindgen::generate!({
path: "../../../wit",
@ -66,7 +66,7 @@ impl QnsUpdate {
node: node.clone(),
..Default::default()
}
}
}
}
sol! {
@ -110,7 +110,6 @@ impl Guest for Component {
}
fn main(our: Address, mut state: State) -> anyhow::Result<()> {
// shove all state into net::net
Request::new()
.target((&our.node, "net", "sys", "uqbar"))
@ -120,7 +119,9 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
.send()?;
SubscribeLogsRequest::new()
.address(EthAddress::from_str("0x4C8D8d4A71cE21B4A16dAbf4593cDF30d79728F1")?)
.address(EthAddress::from_str(
"0x4C8D8d4A71cE21B4A16dAbf4593cDF30d79728F1",
)?)
.from_block(state.block - 1)
.events(vec![
"NodeRegistered(bytes32,bytes)",
@ -128,7 +129,8 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
"IpUpdate(bytes32,uint128)",
"WsUpdate(bytes32,uint16)",
"RoutingUpdate(bytes32,bytes32[])",
]).send()?;
])
.send()?;
http::bind_http_path("/node/:name", false, false)?;
@ -185,17 +187,17 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
match msg {
IndexerActions::EventSubscription(e) => {
state.block = e.clone().block_number.expect("expect").to::<u64>();
let node_id: alloy_primitives::FixedBytes<32> = e.topics[1];
let name = match state.names.entry(node_id.clone().to_string()) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => v.insert(get_name(&e))
Entry::Vacant(v) => v.insert(get_name(&e)),
};
let mut node = state.nodes
let mut node = state
.nodes
.entry(name.to_string())
.or_insert_with(|| QnsUpdate::new(name, &node_id.to_string()));
@ -204,11 +206,14 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
match e.topics[0].clone() {
KeyUpdate::SIGNATURE_HASH => {
node.public_key = KeyUpdate::abi_decode_data(&e.data, true)
.unwrap().0.to_string();
.unwrap()
.0
.to_string();
}
IpUpdate::SIGNATURE_HASH => {
let ip = IpUpdate::abi_decode_data(&e.data, true).unwrap().0;
node.ip = format!("{}.{}.{}.{}",
node.ip = format!(
"{}.{}.{}.{}",
(ip >> 24) & 0xFF,
(ip >> 16) & 0xFF,
(ip >> 8) & 0xFF,
@ -219,15 +224,20 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
node.port = WsUpdate::abi_decode_data(&e.data, true).unwrap().0;
}
RoutingUpdate::SIGNATURE_HASH => {
node.routers = RoutingUpdate::abi_decode_data(&e.data, true).unwrap().0
node.routers = RoutingUpdate::abi_decode_data(&e.data, true)
.unwrap()
.0
.iter()
.map(|r| r.to_string())
.collect::<Vec<String>>();
}
_ => { send = false; }
_ => {
send = false;
}
}
if send {
print_to_terminal(1, &format!("qns_indexer: sending ID to net: {:?}", node));
Request::new()
.target((&our.node, "net", "sys", "uqbar"))
.try_ipc(NetActions::QnsUpdate(node.clone()))?
@ -240,17 +250,15 @@ fn main(our: Address, mut state: State) -> anyhow::Result<()> {
}
fn get_name(log: &Log) -> String {
let decoded = NodeRegistered::abi_decode_data(&log.data, true).unwrap();
let name = match dnswire_decode(decoded.0.clone()) {
Ok(n) => { n }
Ok(n) => n,
Err(_) => {
println!("qns_indexer: failed to decode name: {:?}", decoded.0);
panic!("")
}
};
name
}
fn dnswire_decode(wire_format_bytes: Vec<u8>) -> Result<String, FromUtf8Error> {
@ -280,4 +288,3 @@ fn dnswire_decode(wire_format_bytes: Vec<u8>) -> Result<String, FromUtf8Error> {
Ok(name)
}
}

View File

@ -6,7 +6,8 @@
"request_networking": true,
"request_messaging": [
"net:sys:uqbar",
"http_client:sys:uqbar"
"http_client:sys:uqbar",
"kernel:sys:uqbar"
],
"public": true
}

View File

@ -1,24 +1,41 @@
use crate::http::types::*;
use crate::types::*;
use anyhow::Result;
use dashmap::DashMap;
use ethers_providers::StreamExt;
use futures::stream::{SplitSink, SplitStream};
use futures::SinkExt;
use http::header::{HeaderMap, HeaderName, HeaderValue};
use std::collections::HashMap;
use std::sync::Arc;
use tokio_tungstenite::tungstenite::{client::IntoClientRequest, Message as TungsteniteMessage};
use tokio_tungstenite::{connect_async, tungstenite};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
// Test http_client with these commands in the terminal
// !message our http_client {"method": "GET", "url": "https://jsonplaceholder.typicode.com/posts", "headers": {}}
// !message our http_client {"method": "POST", "url": "https://jsonplaceholder.typicode.com/posts", "headers": {"Content-Type": "application/json"}}
// !message our http_client {"method": "PUT", "url": "https://jsonplaceholder.typicode.com/posts", "headers": {"Content-Type": "application/json"}}
// Outgoing WebSocket connections are stored by the source process ID and the channel_id
type WebSocketId = (ProcessId, u32);
type WebSocketMap = DashMap<
WebSocketId,
SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, tungstenite::Message>,
>;
type WebSocketStreams = Arc<WebSocketMap>;
pub async fn http_client(
our_name: String,
send_to_loop: MessageSender,
mut recv_in_client: MessageReceiver,
_print_tx: PrintSender,
print_tx: PrintSender,
) -> Result<()> {
let client = reqwest::Client::new();
let our_name = Arc::new(our_name);
let ws_streams: WebSocketStreams = Arc::new(DashMap::new());
while let Some(KernelMessage {
id,
source,
@ -33,21 +50,298 @@ pub async fn http_client(
..
}) = recv_in_client.recv().await
{
tokio::spawn(handle_message(
our_name.clone(),
id,
rsvp.unwrap_or(source),
expects_response,
ipc,
payload,
client.clone(),
send_to_loop.clone(),
));
// First check if a WebSocketClientAction, otherwise assume it's an OutgoingHttpRequest
if let Ok(ws_action) = serde_json::from_slice::<WebSocketClientAction>(&ipc) {
let ws_streams_clone = Arc::clone(&ws_streams);
let _ = handle_websocket_action(
our_name.clone(),
id,
rsvp.unwrap_or(source),
expects_response,
ws_action,
payload,
ws_streams_clone,
send_to_loop.clone(),
print_tx.clone(),
)
.await;
} else {
tokio::spawn(handle_http_request(
our_name.clone(),
id,
rsvp.unwrap_or(source),
expects_response,
ipc,
payload,
client.clone(),
send_to_loop.clone(),
print_tx.clone(),
));
}
}
Err(anyhow::anyhow!("http_client: loop died"))
}
async fn handle_message(
async fn handle_websocket_action(
our: Arc<String>,
id: u64,
target: Address,
expects_response: Option<u64>,
ws_action: WebSocketClientAction,
payload: Option<Payload>,
ws_streams: WebSocketStreams,
send_to_loop: MessageSender,
print_tx: PrintSender,
) {
let (result, channel_id) = match ws_action {
WebSocketClientAction::Open {
url,
headers,
channel_id,
} => (
connect_websocket(
our.clone(),
id,
target.clone(),
&url,
headers,
channel_id,
ws_streams,
send_to_loop.clone(),
print_tx,
)
.await,
channel_id,
),
WebSocketClientAction::Push {
channel_id,
message_type,
} => (
send_ws_push(
target.clone(),
channel_id,
message_type,
payload,
ws_streams,
)
.await,
channel_id,
),
WebSocketClientAction::Close { channel_id } => (
close_ws_connection(target.clone(), channel_id, ws_streams, print_tx).await,
channel_id,
),
WebSocketClientAction::Response { .. } => (Ok(()), 0), // No-op
};
match result {
Ok(_) => {}
Err(e) => {
websocket_error_message(
our.clone(),
id,
target,
expects_response,
channel_id,
e,
send_to_loop,
)
.await;
}
}
}
async fn connect_websocket(
our: Arc<String>,
id: u64,
target: Address,
url: &str,
headers: HashMap<String, String>,
channel_id: u32,
ws_streams: WebSocketStreams,
send_to_loop: MessageSender,
print_tx: PrintSender,
) -> Result<(), WebSocketClientError> {
let print_tx_clone = print_tx.clone();
let Ok(url) = url::Url::parse(url) else {
return Err(WebSocketClientError::BadUrl {
url: url.to_string(),
});
};
let Ok(mut req) = url.clone().into_client_request() else {
return Err(WebSocketClientError::BadRequest {
req: "failed to parse url into client request".into(),
});
};
let req_headers = req.headers_mut();
for (key, value) in headers.clone() {
req_headers.insert(
HeaderName::from_bytes(key.as_bytes()).unwrap(),
HeaderValue::from_str(&value).unwrap(),
);
}
let ws_stream = match connect_async(req).await {
Ok((ws_stream, _)) => ws_stream,
Err(_) => {
return Err(WebSocketClientError::OpenFailed {
url: url.to_string(),
});
}
};
let (sink, stream) = ws_stream.split();
if let Some(mut sink) = ws_streams.get_mut(&(target.process.clone(), channel_id)) {
let _ = sink.close().await;
}
ws_streams.insert((target.process.clone(), channel_id), sink);
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: ProcessId::new(Some("http_client"), "sys", "uqbar"),
},
target: target.clone(),
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<WebSocketClientAction>(
&WebSocketClientAction::Response {
channel_id,
result: Ok(()),
},
)
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await;
tokio::spawn(listen_to_stream(
our.clone(),
id,
target.clone(),
channel_id,
stream,
ws_streams,
send_to_loop.clone(),
print_tx_clone,
));
Ok(())
}
async fn listen_to_stream(
our: Arc<String>,
id: u64,
target: Address,
channel_id: u32,
mut stream: SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>,
ws_streams: WebSocketStreams,
send_to_loop: MessageSender,
_print_tx: PrintSender,
) {
while let Some(message) = stream.next().await {
match message {
Ok(msg) => {
// Handle different types of messages here
match msg {
TungsteniteMessage::Text(text) => {
// send a Request to the target with the text as payload
handle_ws_message(
our.clone(),
id,
target.clone(),
WebSocketClientAction::Push {
channel_id,
message_type: WsMessageType::Text,
},
Some(Payload {
mime: Some("text/plain".into()),
bytes: text.into_bytes(),
}),
send_to_loop.clone(),
)
.await;
}
TungsteniteMessage::Binary(bytes) => {
// send a Request to the target with the binary as payload
handle_ws_message(
our.clone(),
id,
target.clone(),
WebSocketClientAction::Push {
channel_id,
message_type: WsMessageType::Binary,
},
Some(Payload {
mime: Some("application/octet-stream".into()),
bytes,
}),
send_to_loop.clone(),
)
.await;
}
TungsteniteMessage::Close(_) => {
// send a websocket close Request to the target
handle_ws_message(
our.clone(),
id,
target.clone(),
WebSocketClientAction::Close { channel_id },
None,
send_to_loop.clone(),
)
.await;
// remove the websocket from the map
ws_streams.remove(&(target.process.clone(), channel_id));
break;
}
_ => (), // Handle other message types as needed
}
}
Err(e) => {
println!("WebSocket Client Error ({}): {:?}", channel_id, e);
// The connection was closed/reset by the remote server, so we'll remove and close it
if let Some(mut ws_sink) = ws_streams.get_mut(&(target.process.clone(), channel_id))
{
// Close the stream. The stream is closed even on error.
let _ = ws_sink.close().await;
}
ws_streams.remove(&(target.process.clone(), channel_id));
handle_ws_message(
our.clone(),
id,
target.clone(),
WebSocketClientAction::Close { channel_id },
None,
send_to_loop.clone(),
)
.await;
break;
}
}
}
}
async fn handle_http_request(
our: Arc<String>,
id: u64,
target: Address,
@ -56,11 +350,12 @@ async fn handle_message(
body: Option<Payload>,
client: reqwest::Client,
send_to_loop: MessageSender,
print_tx: PrintSender,
) {
let req: OutgoingHttpRequest = match serde_json::from_slice(&json) {
Ok(req) => req,
Err(_e) => {
make_error_message(
http_error_message(
our,
id,
target,
@ -76,7 +371,7 @@ async fn handle_message(
};
let Ok(req_method) = http::Method::from_bytes(req.method.as_bytes()) else {
make_error_message(
http_error_message(
our,
id,
target,
@ -88,6 +383,13 @@ async fn handle_message(
return;
};
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("http_client: building {req_method} request to {}", req.url),
})
.await;
let mut request_builder = client.request(req_method, req.url);
if let Some(version) = req.version {
@ -98,7 +400,7 @@ async fn handle_message(
"HTTP/2.0" => request_builder.version(http::Version::HTTP_2),
"HTTP/3.0" => request_builder.version(http::Version::HTTP_3),
_ => {
make_error_message(
http_error_message(
our,
id,
target,
@ -120,7 +422,7 @@ async fn handle_message(
.headers(deserialize_headers(req.headers))
.build()
else {
make_error_message(
http_error_message(
our,
id,
target,
@ -136,6 +438,12 @@ async fn handle_message(
match client.execute(request).await {
Ok(response) => {
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("http_client: executed request, got response"),
})
.await;
let _ = send_to_loop
.send(KernelMessage {
id,
@ -168,7 +476,13 @@ async fn handle_message(
.await;
}
Err(e) => {
make_error_message(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("http_client: executed request but got error"),
})
.await;
http_error_message(
our,
id,
target,
@ -221,7 +535,7 @@ fn deserialize_headers(hashmap: HashMap<String, String>) -> HeaderMap {
header_map
}
async fn make_error_message(
async fn http_error_message(
our: Arc<String>,
id: u64,
target: Address,
@ -256,3 +570,144 @@ async fn make_error_message(
.await;
}
}
async fn websocket_error_message(
our: Arc<String>,
id: u64,
target: Address,
expects_response: Option<u64>,
channel_id: u32,
error: WebSocketClientError,
send_to_loop: MessageSender,
) {
if expects_response.is_some() {
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: ProcessId::new(Some("http_client"), "sys", "uqbar"),
},
target,
rsvp: None,
message: Message::Response((
Response {
inherit: false,
ipc: serde_json::to_vec::<WebSocketClientAction>(
&WebSocketClientAction::Response {
channel_id,
result: Err(error),
},
)
.unwrap(),
metadata: None,
},
None,
)),
payload: None,
signed_capabilities: None,
})
.await;
}
}
async fn send_ws_push(
target: Address,
channel_id: u32,
message_type: WsMessageType,
payload: Option<Payload>,
ws_streams: WebSocketStreams,
) -> Result<(), WebSocketClientError> {
let Some(mut ws_stream) = ws_streams.get_mut(&(target.process.clone(), channel_id)) else {
return Err(WebSocketClientError::BadRequest {
req: format!("channel_id {} not found", channel_id),
});
};
let result = match message_type {
WsMessageType::Text => {
let Some(payload) = payload else {
return Err(WebSocketClientError::BadRequest {
req: "no payload".into(),
});
};
let Ok(text) = String::from_utf8(payload.bytes) else {
return Err(WebSocketClientError::BadRequest {
req: "failed to convert payload to string".into(),
});
};
ws_stream.send(TungsteniteMessage::Text(text)).await
}
WsMessageType::Binary => {
let Some(payload) = payload else {
return Err(WebSocketClientError::BadRequest {
req: "no payload".into(),
});
};
ws_stream
.send(TungsteniteMessage::Binary(payload.bytes))
.await
}
WsMessageType::Ping => {
// send a Request to the target with the ping as payload
ws_stream.send(TungsteniteMessage::Ping(vec![])).await
}
WsMessageType::Pong => {
// send a Request to the target with the pong as payload
ws_stream.send(TungsteniteMessage::Pong(vec![])).await
}
};
match result {
Ok(_) => Ok(()),
Err(_) => Err(WebSocketClientError::PushFailed { channel_id }),
}
}
async fn close_ws_connection(
target: Address,
channel_id: u32,
ws_streams: WebSocketStreams,
_print_tx: PrintSender,
) -> Result<(), WebSocketClientError> {
let Some(mut ws_sink) = ws_streams.get_mut(&(target.process.clone(), channel_id)) else {
return Err(WebSocketClientError::CloseFailed { channel_id });
};
// Close the stream. The stream is closed even on error.
let _ = ws_sink.close().await;
Ok(())
}
async fn handle_ws_message(
our: Arc<String>,
id: u64,
target: Address,
action: WebSocketClientAction,
payload: Option<Payload>,
send_to_loop: MessageSender,
) {
let _ = send_to_loop
.send(KernelMessage {
id,
source: Address {
node: our.to_string(),
process: ProcessId::new(Some("http_client"), "sys", "uqbar"),
},
target,
rsvp: None,
message: Message::Request(Request {
inherit: false,
ipc: serde_json::to_vec::<WebSocketClientAction>(&action).unwrap(),
expects_response: None,
metadata: None,
}),
payload,
signed_capabilities: None,
})
.await;
}

View File

@ -276,10 +276,12 @@ async fn ws_handler(
print_tx: PrintSender,
) -> Result<impl warp::Reply, warp::Rejection> {
let original_path = normalize_path(path.as_str());
let _ = print_tx.send(Printout {
verbosity: 1,
content: format!("got ws request for {original_path}"),
});
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("http_server: got ws request for {original_path}"),
})
.await;
let serialized_headers = serialize_headers(&headers);
let ws_path_bindings = ws_path_bindings.read().await;
@ -294,7 +296,7 @@ async fn ws_handler(
.send(Printout {
verbosity: 1,
content: format!(
"got request for path {original_path} bound by subdomain {subdomain}"
"http_server: ws request for {original_path} bound by subdomain {subdomain}"
),
})
.await;
@ -361,7 +363,7 @@ async fn http_handler(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("got request for path {original_path}"),
content: format!("http_server: got request for path {original_path}"),
})
.await;
let id: u64 = rand::random();
@ -389,7 +391,9 @@ async fn http_handler(
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("redirecting request from {socket_addr:?} to login page"),
content: format!(
"http_server: redirecting request from {socket_addr:?} to login page"
),
})
.await;
return Ok(warp::http::Response::builder()
@ -411,7 +415,7 @@ async fn http_handler(
.send(Printout {
verbosity: 1,
content: format!(
"got request for path {original_path} bound by subdomain {subdomain}"
"http_server: request for {original_path} bound by subdomain {subdomain}"
),
})
.await;
@ -435,26 +439,28 @@ async fn http_handler(
return Ok(warp::reply::with_status(vec![], StatusCode::FORBIDDEN).into_response());
}
// if path has static content, serve it
if let Some(static_content) = &bound_path.static_content {
return Ok(warp::http::Response::builder()
.status(StatusCode::OK)
.header(
"Content-Type",
static_content
.mime
.as_ref()
.unwrap_or(&"text/plain".to_string()),
)
.body(static_content.bytes.clone())
.into_response());
// if path has static content and this is a GET request, serve it
if method == warp::http::Method::GET {
if let Some(static_content) = &bound_path.static_content {
return Ok(warp::http::Response::builder()
.status(StatusCode::OK)
.header(
"Content-Type",
static_content
.mime
.as_ref()
.unwrap_or(&"text/plain".to_string()),
)
.body(static_content.bytes.clone())
.into_response());
}
}
// RPC functionality: if path is /rpc:sys:uqbar/message,
// we extract message from base64 encoded bytes in data
// and send it to the correct app.
let message = if bound_path.app == "rpc:sys:uqbar" {
match handle_rpc_message(our, id, body).await {
match handle_rpc_message(our, id, body, print_tx).await {
Ok(message) => message,
Err(e) => {
return Ok(warp::reply::with_status(vec![], e).into_response());
@ -560,6 +566,7 @@ async fn handle_rpc_message(
our: Arc<String>,
id: u64,
body: warp::hyper::body::Bytes,
print_tx: PrintSender,
) -> Result<KernelMessage, StatusCode> {
let Ok(rpc_message) = serde_json::from_slice::<RpcMessage>(&body) else {
return Err(StatusCode::BAD_REQUEST);
@ -569,6 +576,13 @@ async fn handle_rpc_message(
return Err(StatusCode::BAD_REQUEST);
};
let _ = print_tx
.send(Printout {
verbosity: 2,
content: format!("http_server: passing on RPC message to {target_process}"),
})
.await;
let payload: Option<Payload> = match rpc_message.data {
None => None,
Some(b64_bytes) => match base64::decode(b64_bytes) {
@ -619,17 +633,18 @@ async fn maintain_websocket(
print_tx: PrintSender,
) {
let (mut write_stream, mut read_stream) = ws.split();
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("got new client websocket connection"),
})
.await;
let channel_id: u32 = rand::random();
let (ws_sender, mut ws_receiver) = tokio::sync::mpsc::channel(100);
ws_senders.insert(channel_id, (app.clone(), ws_sender));
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("http_server: new websocket connection to {app} with id {channel_id}"),
})
.await;
let _ = send_to_loop
.send(KernelMessage {
id: rand::random(),
@ -654,10 +669,6 @@ async fn maintain_websocket(
})
.await;
let _ = print_tx.send(Printout {
verbosity: 1,
content: format!("websocket channel {channel_id} opened"),
});
loop {
tokio::select! {
read = read_stream.next() => {
@ -721,6 +732,12 @@ async fn maintain_websocket(
}
}
}
let _ = print_tx
.send(Printout {
verbosity: 1,
content: format!("http_server: websocket connection {channel_id} closed"),
})
.await;
let stream = write_stream.reunite(read_stream).unwrap();
let _ = stream.close().await;
}

View File

@ -58,6 +58,28 @@ pub struct HttpResponse {
// BODY is stored in the payload, as bytes
}
/// WebSocket Client Request type that can be shared over WASM boundary to apps.
/// This is the one you send to the `http_client:sys:uqbar` service.
#[derive(Debug, Serialize, Deserialize)]
pub enum WebSocketClientAction {
Open {
url: String,
headers: HashMap<String, String>,
channel_id: u32,
},
Push {
channel_id: u32,
message_type: WsMessageType,
},
Close {
channel_id: u32,
},
Response {
channel_id: u32,
result: Result<(), WebSocketClientError>,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RpcResponseBody {
pub ipc: Vec<u8>,
@ -78,6 +100,20 @@ pub enum HttpClientError {
RequestFailed { error: String },
}
#[derive(Error, Debug, Serialize, Deserialize)]
pub enum WebSocketClientError {
#[error("websocket_client: request format incorrect: {}.", req)]
BadRequest { req: String },
#[error("websocket_client: url could not be parsed: {}", url)]
BadUrl { url: String },
#[error("websocket_client: failed to open connection {}", url)]
OpenFailed { url: String },
#[error("websocket_client: failed to send message {}", channel_id)]
PushFailed { channel_id: u32 },
#[error("websocket_client: failed to close connection {}", channel_id)]
CloseFailed { channel_id: u32 },
}
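For orientation, here is a minimal sketch (not part of this diff) of how a process might build the ipc bytes for the Open variant described above before sending a Request to `http_client:sys:uqbar`; the URL and channel_id are placeholder values, and only the WebSocketClientAction type and serde_json are assumed.
// Hypothetical sketch: serialize a WebSocketClientAction::Open into ipc bytes
// for a Request targeted at http_client:sys:uqbar. The url and channel_id are
// placeholder values, not taken from this commit.
fn open_ws_ipc() -> Vec<u8> {
    let action = WebSocketClientAction::Open {
        url: "wss://example.com/socket".to_string(),
        headers: std::collections::HashMap::new(),
        channel_id: 0,
    };
    serde_json::to_vec(&action).expect("JSON serialization of a plain enum")
}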
/// Request type sent to `http_server:sys:uqbar` in order to configure it.
/// You can also send [`type@HttpServerAction::WebSocketPush`], which
/// allows you to push messages across an existing open WebSocket connection.

View File

@ -17,6 +17,8 @@ mod standard_host;
const PROCESS_CHANNEL_CAPACITY: usize = 100;
const DEFAULT_WIT_VERSION: u32 = 0;
type ProcessMessageSender =
tokio::sync::mpsc::Sender<Result<t::KernelMessage, t::WrappedSendError>>;
type ProcessMessageReceiver =
@ -143,6 +145,7 @@ async fn handle_kernel_request(
t::KernelCommand::InitializeProcess {
id,
wasm_bytes_handle,
wit_version,
on_exit,
initial_capabilities,
public,
@ -249,6 +252,7 @@ async fn handle_kernel_request(
process_id: id,
persisted: t::PersistedProcess {
wasm_bytes_handle,
wit_version,
on_exit,
capabilities: valid_capabilities,
public,
@ -424,6 +428,48 @@ async fn handle_kernel_request(
.await
.expect("event loop: fatal: sender died");
}
t::KernelCommand::Debug(kind) => match kind {
t::KernelPrint::ProcessMap => {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!("kernel process map:\r\n{:#?}", process_map),
})
.await;
}
t::KernelPrint::Process(process_id) => {
let Some(proc) = process_map.get(&process_id) else {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!("kernel: no such running process {}", process_id),
})
.await;
return;
};
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!("kernel process info:\r\n{proc:#?}",),
})
.await;
}
t::KernelPrint::HasCap { on, cap } => {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!(
"process {} has cap:\r\n{}",
on,
process_map
.get(&on)
.map(|p| p.capabilities.contains(&cap))
.unwrap_or(false)
),
})
.await;
}
},
}
}
@ -573,6 +619,10 @@ async fn start_process(
process: id.clone(),
},
wasm_bytes_handle: process_metadata.persisted.wasm_bytes_handle.clone(),
wit_version: process_metadata
.persisted
.wit_version
.unwrap_or(DEFAULT_WIT_VERSION),
on_exit: process_metadata.persisted.on_exit.clone(),
public: process_metadata.persisted.public,
};

View File

@ -193,8 +193,20 @@ impl ProcessState {
}))
.await;
});
self.save_context(kernel_message.id, new_context, timeout_handle)
.await;
self.contexts.insert(
request_id,
(
t::ProcessContext {
prompting_message: if self.prompting_message.is_some() {
self.prompting_message.clone()
} else {
None
},
context: new_context,
},
timeout_handle,
),
);
}
self.send_to_loop
@ -244,29 +256,6 @@ impl ProcessState {
.expect("fatal: kernel couldn't send response");
}
/// save a context for a given request.
async fn save_context(
&mut self,
request_id: u64,
context: Option<t::Context>,
jh: tokio::task::JoinHandle<()>,
) {
self.contexts.insert(
request_id,
(
t::ProcessContext {
prompting_message: if self.prompting_message.is_some() {
self.prompting_message.clone()
} else {
None
},
context,
},
jh,
),
);
}
/// instead of ingesting latest, wait for a specific ID and queue all others
async fn get_specific_message_for_process(
&mut self,
@ -305,21 +294,24 @@ impl ProcessState {
res: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (context, km) = match res {
Ok(km) => match self.contexts.remove(&km.id) {
None => {
// TODO if this is a response, ignore it if we don't have outstanding context
Ok(km) => match &km.message {
t::Message::Request(_) => {
self.last_payload = km.payload.clone();
self.prompting_message = Some(km.clone());
(None, km)
}
Some((context, timeout_handle)) => {
timeout_handle.abort();
self.last_payload = km.payload.clone();
self.prompting_message = match context.prompting_message {
None => Some(km.clone()),
Some(prompting_message) => Some(prompting_message),
};
(context.context, km)
t::Message::Response(_) => {
if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) {
timeout_handle.abort();
self.last_payload = km.payload.clone();
self.prompting_message = match context.prompting_message {
None => Some(km.clone()),
Some(prompting_message) => Some(prompting_message),
};
(context.context, km)
} else {
(None, km)
}
}
},
Err(e) => match self.contexts.remove(&e.id) {
@ -332,13 +324,11 @@ impl ProcessState {
},
};
// note: the context in the KernelMessage is not actually the one we want:
// (in fact it should be None, possibly always)
// we need to get *our* context for this message id
Ok((
km.source.en_wit().to_owned(),
km.source.en_wit(),
match km.message {
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
@ -470,7 +460,7 @@ pub async fn make_process_loop(
Ok(()) => {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 2,
verbosity: 1,
content: format!("process {} returned without error", metadata.our.process,),
})
.await;
@ -501,7 +491,29 @@ pub async fn make_process_loop(
process: KERNEL_PROCESS_ID.clone(),
};
if is_error {
if !is_error {
// just remove handler
send_to_loop
.send(t::KernelMessage {
id: rand::random(),
source: our_kernel.clone(),
target: our_kernel.clone(),
rsvp: None,
message: t::Message::Request(t::Request {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&t::KernelCommand::KillProcess(
metadata.our.process.clone(),
))
.unwrap(),
metadata: None,
}),
payload: None,
signed_capabilities: None,
})
.await
.expect("event loop: fatal: sender died");
} else {
// get caps before killing
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = caps_oracle
@ -512,7 +524,7 @@ pub async fn make_process_loop(
.await;
let initial_capabilities = rx.await.unwrap().into_iter().collect();
// always send message to tell main kernel loop to remove handler
// send message to tell main kernel loop to remove handler
send_to_loop
.send(t::KernelMessage {
id: rand::random(),
@ -551,6 +563,7 @@ pub async fn make_process_loop(
ipc: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
id: metadata.our.process.clone(),
wasm_bytes_handle: metadata.wasm_bytes_handle,
wit_version: Some(metadata.wit_version),
on_exit: metadata.on_exit,
initial_capabilities,
public: metadata.public,

View File

@ -1,5 +1,6 @@
use crate::kernel::process;
use crate::kernel::process::uqbar::process::standard as wit;
use crate::kernel::process::StandardHost;
use crate::types as t;
use crate::types::STATE_PROCESS_ID;
use crate::KERNEL_PROCESS_ID;
@ -8,7 +9,15 @@ use anyhow::Result;
use ring::signature::{self, KeyPair};
use std::collections::HashSet;
use crate::kernel::process::StandardHost;
async fn print_debug(proc: &process::ProcessState, content: &str) {
let _ = proc
.send_to_terminal
.send(t::Printout {
verbosity: 2,
content: format!("{}: {}", proc.metadata.our.process, content),
})
.await;
}
///
/// create the process API. this is where the functions that a process can use live.
@ -19,15 +28,11 @@ impl StandardHost for process::ProcessWasi {
// system utils:
//
async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> {
match self
.process
self.process
.send_to_terminal
.send(t::Printout { verbosity, content })
.await
{
Ok(()) => Ok(()),
Err(e) => Err(anyhow::anyhow!("fatal: couldn't send to terminal: {:?}", e)),
}
.map_err(|e| anyhow::anyhow!("fatal: couldn't send to terminal: {e:?}"))
}
async fn get_eth_block(&mut self) -> Result<u64> {
@ -39,9 +44,10 @@ impl StandardHost for process::ProcessWasi {
// process management:
//
/// TODO critical: move to kernel logic to enable persistence of choice made here
async fn set_on_exit(&mut self, on_exit: wit::OnExit) -> Result<()> {
self.process.metadata.on_exit = t::OnExit::de_wit(on_exit);
print_debug(&self.process, "set new on-exit behavior").await;
Ok(())
}
@ -126,6 +132,7 @@ impl StandardHost for process::ProcessWasi {
)),
};
self.process.last_payload = old_last_payload;
print_debug(&self.process, "persisted state").await;
return res;
}
@ -165,6 +172,7 @@ impl StandardHost for process::ProcessWasi {
)),
};
self.process.last_payload = old_last_payload;
print_debug(&self.process, "cleared persisted state").await;
return res;
}
@ -249,6 +257,7 @@ impl StandardHost for process::ProcessWasi {
ipc: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
id: new_process_id.clone(),
wasm_bytes_handle: wasm_path,
wit_version: Some(self.process.metadata.wit_version),
on_exit: t::OnExit::de_wit(on_exit),
initial_capabilities: match capabilities {
wit::Capabilities::None => HashSet::new(),
@ -357,6 +366,7 @@ impl StandardHost for process::ProcessWasi {
.await
.unwrap();
let _ = rx.await.unwrap();
print_debug(&self.process, "spawned a new process").await;
Ok(Ok(new_process_id.en_wit().to_owned()))
}

View File

@ -137,7 +137,11 @@ async fn handle_request(
.await?;
let (ipc, bytes) = match &request.action {
KvAction::New => {
KvAction::Open => {
// handled in check_caps.
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
KvAction::RemoveDb => {
// handled in check_caps.
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
@ -389,7 +393,7 @@ async fn check_caps(
}
Ok(())
}
KvAction::New { .. } => {
KvAction::Open { .. } => {
if src_package_id != request.package_id {
return Err(KvError::NoCap {
error: request.action.to_string(),
@ -414,7 +418,7 @@ async fn check_caps(
.await?;
if open_kvs.contains_key(&(request.package_id.clone(), request.db.clone())) {
return Err(KvError::DbAlreadyExists);
return Ok(());
}
let db_path = format!(
@ -430,10 +434,25 @@ async fn check_caps(
open_kvs.insert((request.package_id.clone(), request.db.clone()), db);
Ok(())
}
KvAction::Backup { .. } => {
// caps
KvAction::RemoveDb { .. } => {
if src_package_id != request.package_id {
return Err(KvError::NoCap {
error: request.action.to_string(),
});
}
let db_path = format!(
"{}/{}/{}",
kv_path,
request.package_id.to_string(),
request.db.to_string()
);
open_kvs.remove(&(request.package_id.clone(), request.db.clone()));
fs::remove_dir_all(&db_path).await?;
Ok(())
}
KvAction::Backup { .. } => Ok(()),
}
}

View File

@ -124,7 +124,11 @@ pub async fn maintain_connection(
let mut conn = conn.write_stream.reunite(conn.read_stream).unwrap();
let _ = conn.close(None).await;
print_debug(&print_tx, &format!("net: connection with {peer_name} died")).await;
print_debug(
&print_tx,
&format!("net: connection with {peer_name} closed"),
)
.await;
peers.remove(&peer_name);
}

View File

@ -1,11 +1,12 @@
{
"files": {
"main.css": "/static/css/main.115771e3.css",
"main.js": "/static/js/main.9d27844d.js",
"main.css": "/static/css/main.12ff92b6.css",
"main.js": "/static/js/main.caec54fd.js",
"static/media/unknown.png": "/static/media/unknown.880d04d4611a45ab1001.png",
"index.html": "/index.html"
},
"entrypoints": [
"static/css/main.115771e3.css",
"static/js/main.9d27844d.js"
"static/css/main.12ff92b6.css",
"static/js/main.caec54fd.js"
]
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

After: 14 KiB image (not shown)

View File

@ -157,9 +157,12 @@ async fn handle_request(
.await?;
let (ipc, bytes) = match request.action {
SqliteAction::New => {
SqliteAction::Open => {
// handled in check_caps
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
SqliteAction::RemoveDb => {
// handled in check_caps
//
(serde_json::to_vec(&SqliteResponse::Ok).unwrap(), None)
}
SqliteAction::Read { query } => {
@ -409,7 +412,7 @@ async fn check_caps(
}
Ok(())
}
SqliteAction::New => {
SqliteAction::Open => {
if src_package_id != request.package_id {
return Err(SqliteError::NoCap {
error: request.action.to_string(),
@ -434,7 +437,7 @@ async fn check_caps(
.await?;
if open_dbs.contains_key(&(request.package_id.clone(), request.db.clone())) {
return Err(SqliteError::DbAlreadyExists);
return Ok(());
}
let db_path = format!(
@ -456,6 +459,24 @@ async fn check_caps(
);
Ok(())
}
SqliteAction::RemoveDb => {
if src_package_id != request.package_id {
return Err(SqliteError::NoCap {
error: request.action.to_string(),
});
}
let db_path = format!(
"{}/{}/{}",
sqlite_path,
request.package_id.to_string(),
request.db.to_string()
);
open_dbs.remove(&(request.package_id.clone(), request.db.clone()));
fs::remove_dir_all(&db_path).await?;
Ok(())
}
SqliteAction::Backup => {
// flushing WALs for backup
Ok(())

View File

@ -331,6 +331,7 @@ async fn bootstrap(
.entry(ProcessId::from_str("kernel:sys:uqbar").unwrap())
.or_insert(PersistedProcess {
wasm_bytes_handle: "".into(),
wit_version: None,
on_exit: OnExit::Restart,
capabilities: runtime_caps.clone(),
public: false,
@ -339,6 +340,7 @@ async fn bootstrap(
.entry(ProcessId::from_str("net:sys:uqbar").unwrap())
.or_insert(PersistedProcess {
wasm_bytes_handle: "".into(),
wit_version: None,
on_exit: OnExit::Restart,
capabilities: runtime_caps.clone(),
public: false,
@ -348,6 +350,7 @@ async fn bootstrap(
.entry(runtime_module.0)
.or_insert(PersistedProcess {
wasm_bytes_handle: "".into(),
wit_version: None,
on_exit: OnExit::Restart,
capabilities: runtime_caps.clone(),
public: runtime_module.2,
@ -538,6 +541,7 @@ async fn bootstrap(
ProcessId::new(Some(&entry.process_name), package_name, package_publisher),
PersistedProcess {
wasm_bytes_handle,
wit_version: None,
on_exit: entry.on_exit,
capabilities: requested_caps,
public: public_process,

View File

@ -66,6 +66,10 @@ pub async fn timer_service(
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
continue
}
let _ = print_tx.send(Printout {
verbosity: 1,
content: format!("set timer to pop in {}ms", timer_millis),
}).await;
if !timer_map.contains(pop_time) {
timer_tasks.spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;

View File

@ -736,6 +736,7 @@ pub struct IdentityTransaction {
pub struct ProcessMetadata {
pub our: Address,
pub wasm_bytes_handle: String,
pub wit_version: u32,
pub on_exit: OnExit,
pub public: bool,
}
@ -812,6 +813,7 @@ pub enum KernelCommand {
InitializeProcess {
id: ProcessId,
wasm_bytes_handle: String,
wit_version: Option<u32>,
on_exit: OnExit,
initial_capabilities: HashSet<SignedCapability>,
public: bool,
@ -825,6 +827,15 @@ pub enum KernelCommand {
/// RUNTIME ONLY: notify the kernel that the runtime is shutting down and it
/// should gracefully stop and persist the running processes.
Shutdown,
/// Ask kernel to produce debugging information
Debug(KernelPrint),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum KernelPrint {
ProcessMap,
Process(ProcessId),
HasCap { on: ProcessId, cap: Capability },
}
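As a rough illustration (not part of this diff), the new Debug command could be exercised by serializing it into the ipc of a request addressed to the kernel; only the KernelCommand and KernelPrint types above plus serde_json are assumed.
// Hypothetical sketch: ipc bytes asking the kernel to dump its process map via
// the new KernelCommand::Debug variant. Assumes serde_json and the enums above.
fn debug_process_map_ipc() -> Vec<u8> {
    serde_json::to_vec(&KernelCommand::Debug(KernelPrint::ProcessMap))
        .expect("JSON serialization of a plain enum")
}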
/// IPC format for all KernelCommand responses
@ -867,6 +878,7 @@ pub type ProcessMap = HashMap<ProcessId, PersistedProcess>;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedProcess {
pub wasm_bytes_handle: String,
pub wit_version: Option<u32>,
pub on_exit: OnExit,
pub capabilities: HashSet<Capability>,
pub public: bool, // marks if a process allows messages from any process
@ -967,12 +979,10 @@ pub enum VfsAction {
CreateDir,
CreateDirAll,
CreateFile,
OpenFile,
OpenFile { create: bool },
CloseFile,
WriteAll,
Write,
ReWrite,
WriteAt(u64),
WriteAt,
Append,
SyncAll,
Read,
@ -980,13 +990,14 @@ pub enum VfsAction {
ReadToEnd,
ReadExact(u64),
ReadToString,
Seek(SeekFrom),
Seek { seek_from: SeekFrom },
RemoveFile,
RemoveDir,
RemoveDirAll,
Rename(String),
Rename { new_path: String },
Metadata,
AddZip,
CopyFile { new_path: String },
Len,
SetLen(u64),
Hash,
@ -1024,6 +1035,7 @@ pub enum VfsResponse {
Ok,
Err(VfsError),
Read,
SeekFrom(u64),
ReadDir(Vec<DirEntry>),
ReadToString(String),
Metadata(FileMetadata),
@ -1079,7 +1091,8 @@ pub struct KvRequest {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum KvAction {
New,
Open,
RemoveDb,
Set { key: Vec<u8>, tx_id: Option<u64> },
Delete { key: Vec<u8>, tx_id: Option<u64> },
Get { key: Vec<u8> },
@ -1100,8 +1113,6 @@ pub enum KvResponse {
pub enum KvError {
#[error("kv: DbDoesNotExist")]
NoDb,
#[error("kv: DbAlreadyExists")]
DbAlreadyExists,
#[error("kv: KeyNotFound")]
KeyNotFound,
#[error("kv: no Tx found")]
@ -1125,7 +1136,8 @@ pub struct SqliteRequest {
#[derive(Debug, Serialize, Deserialize)]
pub enum SqliteAction {
New,
Open,
RemoveDb,
Write {
statement: String,
tx_id: Option<u64>,
@ -1162,8 +1174,6 @@ pub enum SqlValue {
pub enum SqliteError {
#[error("sqlite: DbDoesNotExist")]
NoDb,
#[error("sqlite: DbAlreadyExists")]
DbAlreadyExists,
#[error("sqlite: NoTx")]
NoTx,
#[error("sqlite: No capability: {error}")]

View File

@ -158,27 +158,31 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::CreateFile => {
let _file = open_file(open_files.clone(), path, true).await?;
// create truncates any file that might've existed before
open_files.remove(&path);
let _file = open_file(open_files.clone(), path, true, true).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::OpenFile => {
let _file = open_file(open_files.clone(), path, false).await?;
VfsAction::OpenFile { create } => {
// open file opens an existing file, or creates a new one if create is true
let _file = open_file(open_files.clone(), path, create, false).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::CloseFile => {
// removes file from scope, resets file_handle and cursor.
open_files.remove(&path);
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::WriteAll => {
// should expect open file.
VfsAction::WriteAt => {
// doesn't create a file, writes at exact cursor.
let Some(payload) = payload else {
return Err(VfsError::BadRequest {
error: "payload needs to exist for WriteAll".into(),
});
};
let file = open_file(open_files.clone(), path, false).await?;
let file = open_file(open_files.clone(), path, false, false).await?;
let mut file = file.lock().await;
file.write_all(&payload.bytes).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
@ -189,45 +193,19 @@ async fn handle_request(
error: "payload needs to exist for Write".into(),
});
};
let file = open_file(open_files.clone(), path, true).await?;
open_files.remove(&path);
let file = open_file(open_files.clone(), path, true, true).await?;
let mut file = file.lock().await;
file.write_all(&payload.bytes).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::ReWrite => {
let Some(payload) = payload else {
return Err(VfsError::BadRequest {
error: "payload needs to exist for Write".into(),
});
};
let file = open_file(open_files.clone(), path, true).await?;
let mut file = file.lock().await;
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&payload.bytes).await?;
file.set_len(payload.bytes.len() as u64).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::WriteAt(offset) => {
let Some(payload) = payload else {
return Err(VfsError::BadRequest {
error: "payload needs to exist for WriteAt".into(),
});
};
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
file.seek(SeekFrom::Start(offset)).await?;
file.write_all(&payload.bytes).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Append => {
let Some(payload) = payload else {
return Err(VfsError::BadRequest {
error: "payload needs to exist for Append".into(),
});
};
let file = open_file(open_files.clone(), path, false).await?;
let file = open_file(open_files.clone(), path, false, false).await?;
let mut file = file.lock().await;
file.seek(SeekFrom::End(0)).await?;
file.write_all(&payload.bytes).await?;
@ -235,13 +213,13 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::SyncAll => {
let file = open_file(open_files.clone(), path, false).await?;
let file = open_file(open_files.clone(), path, false, false).await?;
let file = file.lock().await;
file.sync_all().await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Read => {
let file = open_file(open_files.clone(), path.clone(), false).await?;
let file = open_file(open_files.clone(), path.clone(), false, false).await?;
let mut file = file.lock().await;
let mut contents = Vec::new();
file.seek(SeekFrom::Start(0)).await?;
@ -253,7 +231,7 @@ async fn handle_request(
)
}
VfsAction::ReadToEnd => {
let file = open_file(open_files.clone(), path.clone(), false).await?;
let file = open_file(open_files.clone(), path.clone(), false, false).await?;
let mut file = file.lock().await;
let mut contents = Vec::new();
@ -264,6 +242,16 @@ async fn handle_request(
Some(contents),
)
}
VfsAction::ReadExact(length) => {
let file = open_file(open_files.clone(), path, false, false).await?;
let mut file = file.lock().await;
let mut contents = vec![0; length as usize];
file.read_exact(&mut contents).await?;
(
serde_json::to_vec(&VfsResponse::Read).unwrap(),
Some(contents),
)
}
VfsAction::ReadDir => {
let mut dir = fs::read_dir(path).await?;
let mut entries = Vec::new();
@ -284,18 +272,8 @@ async fn handle_request(
None,
)
}
VfsAction::ReadExact(length) => {
let file = open_file(open_files.clone(), path, false).await?;
let mut file = file.lock().await;
let mut contents = vec![0; length as usize];
file.read_exact(&mut contents).await?;
(
serde_json::to_vec(&VfsResponse::Read).unwrap(),
Some(contents),
)
}
VfsAction::ReadToString => {
let file = open_file(open_files.clone(), path, false).await?;
let file = open_file(open_files.clone(), path, false, false).await?;
let mut file = file.lock().await;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
@ -304,8 +282,8 @@ async fn handle_request(
None,
)
}
VfsAction::Seek(seek_from) => {
let file = open_file(open_files.clone(), path, false).await?;
VfsAction::Seek { seek_from } => {
let file = open_file(open_files.clone(), path, false, false).await?;
let mut file = file.lock().await;
// same type, rust tingz
let seek_from = match seek_from {
@ -313,8 +291,11 @@ async fn handle_request(
crate::types::SeekFrom::End(offset) => std::io::SeekFrom::End(offset),
crate::types::SeekFrom::Current(offset) => std::io::SeekFrom::Current(offset),
};
file.seek(seek_from).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
let response = file.seek(seek_from).await?;
(
serde_json::to_vec(&VfsResponse::SeekFrom(response)).unwrap(),
None,
)
}
VfsAction::RemoveFile => {
fs::remove_file(path).await?;
@ -328,15 +309,16 @@ async fn handle_request(
fs::remove_dir_all(path).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Rename(new_path) => {
// doublecheck permission weirdness, sanitize new path
VfsAction::Rename { new_path } => {
fs::rename(path, new_path).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::CopyFile { new_path } => {
fs::copy(path, new_path).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Metadata => {
let file = open_file(open_files.clone(), path, false).await?;
let file = file.lock().await;
let metadata = file.metadata().await?;
let metadata = fs::metadata(&path).await?;
let file_type = get_file_type(&metadata);
let meta = FileMetadata {
@ -350,19 +332,19 @@ async fn handle_request(
)
}
VfsAction::Len => {
let file = open_file(open_files.clone(), path, false).await?;
let file = open_file(open_files.clone(), path, false, false).await?;
let file = file.lock().await;
let len = file.metadata().await?.len();
(serde_json::to_vec(&VfsResponse::Len(len)).unwrap(), None)
}
VfsAction::SetLen(len) => {
let file = open_file(open_files.clone(), path, false).await?;
let file = open_file(open_files.clone(), path, false, false).await?;
let file = file.lock().await;
file.set_len(len).await?;
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Hash => {
let file = open_file(open_files.clone(), path, false).await?;
let file = open_file(open_files.clone(), path, false, false).await?;
let mut file = file.lock().await;
file.seek(SeekFrom::Start(0)).await?;
let mut hasher = blake3::Hasher::new();
@ -410,18 +392,19 @@ async fn handle_request(
// Before any `.await`s are called since ZipFile is not
// Send and so does not play nicely with await
let (is_file, is_dir, local_path, file_contents) = {
let mut file = zip.by_index(i).unwrap();
let mut file = zip.by_index(i)?;
let is_file = file.is_file();
let is_dir = file.is_dir();
let mut file_contents = Vec::new();
if is_file {
file.read_to_end(&mut file_contents).unwrap();
file.read_to_end(&mut file_contents)?;
};
let local_path = path.join(file.name());
(is_file, is_dir, local_path, file_contents)
};
if is_file {
let file = open_file(open_files.clone(), local_path, true).await?;
open_files.remove(&local_path);
let file = open_file(open_files.clone(), local_path, true, true).await?;
let mut file = file.lock().await;
file.seek(SeekFrom::Start(0)).await?;
@ -522,6 +505,7 @@ async fn open_file<P: AsRef<Path>>(
open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>>,
path: P,
create: bool,
truncate: bool,
) -> Result<Arc<Mutex<fs::File>>, VfsError> {
let path = path.as_ref().to_path_buf();
Ok(match open_files.get(&path) {
@ -532,6 +516,7 @@ async fn open_file<P: AsRef<Path>>(
.read(true)
.write(true)
.create(create)
.truncate(truncate)
.open(&path)
.await
.map_err(|e| VfsError::IOError {
@ -581,24 +566,44 @@ async fn check_caps(
VfsAction::CreateDir
| VfsAction::CreateDirAll
| VfsAction::CreateFile
| VfsAction::OpenFile
| VfsAction::OpenFile { .. }
| VfsAction::CloseFile
| VfsAction::WriteAll
| VfsAction::Write
| VfsAction::ReWrite
| VfsAction::WriteAt(_)
| VfsAction::WriteAt
| VfsAction::Append
| VfsAction::SyncAll
| VfsAction::RemoveFile
| VfsAction::RemoveDir
| VfsAction::RemoveDirAll
| VfsAction::Rename(_)
| VfsAction::AddZip
| VfsAction::SetLen(_) => {
if src_package_id == package_id {
return Ok(());
}
if !has_root_cap {
if has_root_cap {
return Ok(());
}
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "write",
"drive": drive,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await?;
let has_cap = recv_cap_bool.await?;
if !has_cap {
return Err(VfsError::NoCap {
action: request.action.to_string(),
path: path.display().to_string(),
@ -611,7 +616,7 @@ async fn check_caps(
| VfsAction::ReadExact(_)
| VfsAction::ReadToEnd
| VfsAction::ReadToString
| VfsAction::Seek(_)
| VfsAction::Seek { .. }
| VfsAction::Hash
| VfsAction::Metadata
| VfsAction::Len => {
@ -648,6 +653,79 @@ async fn check_caps(
}
Ok(())
}
VfsAction::CopyFile { new_path } | VfsAction::Rename { new_path } => {
// these have 2 paths to validate
if has_root_cap {
return Ok(());
}
let (new_package_id, new_drive, _rest) = parse_package_and_drive(&new_path).await?;
let new_drive = format!("/{}/{}", new_package_id, new_drive);
// if both new and old path are within the package_id path, ok
if (src_package_id == package_id) && (src_package_id == new_package_id) {
return Ok(());
}
// otherwise check write caps.
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "write",
"drive": drive,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await?;
let has_cap = recv_cap_bool.await?;
if !has_cap {
return Err(VfsError::NoCap {
action: request.action.to_string(),
path: path.display().to_string(),
});
}
// if they're within the same drive, no need for 2 caps checks
if new_drive == drive {
return Ok(());
}
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.clone(),
process: VFS_PROCESS_ID.clone(),
},
params: serde_json::to_string(&serde_json::json!({
"kind": "write",
"drive": new_drive,
}))
.unwrap(),
},
responder: send_cap_bool,
})
.await?;
let has_cap = recv_cap_bool.await?;
if !has_cap {
return Err(VfsError::NoCap {
action: request.action.to_string(),
path: path.display().to_string(),
});
}
Ok(())
}
VfsAction::CreateDrive => {
if src_package_id != package_id {
if !has_root_cap {
@ -757,6 +835,15 @@ impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for VfsError {
}
}
impl From<zip::result::ZipError> for VfsError {
fn from(err: zip::result::ZipError) -> Self {
VfsError::IOError {
error: err.to_string(),
path: "".into(),
}
}
}
impl From<std::io::Error> for VfsError {
fn from(err: std::io::Error) -> Self {
VfsError::IOError {