mirror of
https://github.com/uqbar-dao/nectar.git
synced 2025-01-03 14:17:20 +03:00
...spring cleaning
This commit is contained in:
parent
f31e74b2af
commit
cb22abc4dc
@ -129,7 +129,6 @@ In order to call a script with shorthand, a user may apply an *alias* using the
|
||||
alias <shorthand> <full_name>
|
||||
```
|
||||
Subsequent use of the shorthand will then be interpolated as the process ID.
|
||||
Aliases are not currently persisted between boots, although this may change.
|
||||
|
||||
A list of the other terminal scripts included in this distro:
|
||||
|
||||
|
@ -49,10 +49,14 @@ fn load_chess_state() -> ChessState {
|
||||
games,
|
||||
clients: HashSet::new(),
|
||||
},
|
||||
None => ChessState {
|
||||
None => {
|
||||
let state = ChessState {
|
||||
games: HashMap::new(),
|
||||
clients: HashSet::new(),
|
||||
},
|
||||
};
|
||||
save_chess_state(&state);
|
||||
state
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2,8 +2,8 @@ use anyhow::anyhow;
|
||||
use kinode_process_lib::kernel_types as kt;
|
||||
use kinode_process_lib::kinode::process::standard as wit;
|
||||
use kinode_process_lib::{
|
||||
call_init, get_blob, get_typed_state, our_capabilities, print_to_terminal, println, set_state,
|
||||
vfs, Address, Capability, ProcessId, Request,
|
||||
call_init, get_blob, get_typed_state, our_capabilities, println, set_state, vfs, Address,
|
||||
Capability, ProcessId, Request,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
@ -42,17 +42,15 @@ fn parse_command(state: &mut TerminalState, line: &str) -> anyhow::Result<()> {
|
||||
},
|
||||
};
|
||||
|
||||
match handle_run(&state.our, &process, args.to_string()) {
|
||||
Ok(_) => Ok(()), // TODO clean up process
|
||||
Err(e) => Err(anyhow!("failed to instantiate script: {}", e)),
|
||||
}
|
||||
handle_run(&state.our, &process, args.to_string())
|
||||
}
|
||||
|
||||
call_init!(init);
|
||||
fn init(our: Address) {
|
||||
let mut state: TerminalState = match get_typed_state(|bytes| Ok(bincode::deserialize(bytes)?)) {
|
||||
Some(s) => s,
|
||||
None => TerminalState {
|
||||
None => {
|
||||
let state = TerminalState {
|
||||
our,
|
||||
aliases: HashMap::from([
|
||||
(
|
||||
@ -104,7 +102,10 @@ fn init(our: Address) {
|
||||
ProcessId::new(Some("top"), "terminal", "sys"),
|
||||
),
|
||||
]),
|
||||
},
|
||||
};
|
||||
set_state(&bincode::serialize(&state).unwrap());
|
||||
state
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
@ -126,7 +127,7 @@ fn init(our: Address) {
|
||||
// checks for a request from a terminal script (different process, same package)
|
||||
} else if state.our.node == source.node && state.our.package() == source.package() {
|
||||
let Ok(action) = serde_json::from_slice::<TerminalAction>(&body) else {
|
||||
println!("failed to parse action from: {}", source);
|
||||
println!("failed to parse action from {source}");
|
||||
continue;
|
||||
};
|
||||
match action {
|
||||
@ -138,7 +139,7 @@ fn init(our: Address) {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("ignoring message from: {}", source);
|
||||
println!("ignoring message from {source}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -154,26 +155,16 @@ fn init(our: Address) {
|
||||
}
|
||||
|
||||
fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Result<()> {
|
||||
let wasm_path = format!("{}.wasm", process.process());
|
||||
let package = format!("{}:{}", process.package(), process.publisher());
|
||||
let drive_path = format!("/{}/pkg", package);
|
||||
let drive_path = format!("/{}:{}/pkg", process.package(), process.publisher());
|
||||
let Ok(entry) = get_entry(process) else {
|
||||
return Err(anyhow::anyhow!("script not in scripts.json file"));
|
||||
};
|
||||
let wasm_path = if wasm_path.starts_with("/") {
|
||||
wasm_path
|
||||
} else {
|
||||
format!("/{}", wasm_path)
|
||||
};
|
||||
let wasm_path = format!("{}{}", drive_path, wasm_path);
|
||||
// build initial caps
|
||||
let process_id = format!("{}:{}", rand::random::<u64>(), package); // all scripts are given random process IDs
|
||||
let Ok(parsed_new_process_id) = process_id.parse::<ProcessId>() else {
|
||||
return Err(anyhow::anyhow!("invalid process id!"));
|
||||
};
|
||||
let wasm_path = format!("{drive_path}/{}.wasm", process.process());
|
||||
|
||||
let _bytes_response = Request::new()
|
||||
.target(("our", "vfs", "distro", "sys"))
|
||||
// all scripts are given random process IDs
|
||||
let process_id = ProcessId::new(None, process.package(), process.publisher());
|
||||
|
||||
Request::to(("our", "vfs", "distro", "sys"))
|
||||
.body(serde_json::to_vec(&vfs::VfsRequest {
|
||||
path: wasm_path.clone(),
|
||||
action: vfs::VfsAction::Read,
|
||||
@ -191,7 +182,7 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul
|
||||
Capability {
|
||||
issuer: Address {
|
||||
node: our.node.clone(),
|
||||
process: parsed_new_process_id.clone(),
|
||||
process: process_id.clone(),
|
||||
},
|
||||
params: "\"messaging\"".into(),
|
||||
},
|
||||
@ -211,7 +202,7 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul
|
||||
Capability {
|
||||
issuer: Address {
|
||||
node: our.node.clone(),
|
||||
process: parsed_new_process_id.clone(),
|
||||
process: process_id.clone(),
|
||||
},
|
||||
params: params.to_string(),
|
||||
},
|
||||
@ -227,8 +218,7 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul
|
||||
}
|
||||
}
|
||||
for (process, cap) in granted_caps.into_iter() {
|
||||
Request::new()
|
||||
.target(("our", "kernel", "distro", "sys"))
|
||||
Request::to(("our", "kernel", "distro", "sys"))
|
||||
.body(serde_json::to_vec(&kt::KernelCommand::GrantCapabilities {
|
||||
target: process,
|
||||
capabilities: vec![kt::de_wit_capability(cap)],
|
||||
@ -237,10 +227,9 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul
|
||||
}
|
||||
// inherits the blob from the previous request, `_bytes_response`,
|
||||
// containing the wasm byte code of the process
|
||||
Request::new()
|
||||
.target(("our", "kernel", "distro", "sys"))
|
||||
Request::to(("our", "kernel", "distro", "sys"))
|
||||
.body(serde_json::to_vec(&kt::KernelCommand::InitializeProcess {
|
||||
id: parsed_new_process_id.clone(),
|
||||
id: process_id.clone(),
|
||||
wasm_bytes_handle: wasm_path.clone(),
|
||||
wit_version: entry.wit_version,
|
||||
on_exit: kt::OnExit::None,
|
||||
@ -305,42 +294,20 @@ fn handle_run(our: &Address, process: &ProcessId, args: String) -> anyhow::Resul
|
||||
requested_caps.push(kt::de_wit_capability(cap.clone()));
|
||||
}
|
||||
}
|
||||
print_to_terminal(
|
||||
3,
|
||||
&format!(
|
||||
"{}: Process {{\n wasm_bytes_handle: {},\n on_exit: {:?},\n public: {}\n capabilities: {}\n}}",
|
||||
parsed_new_process_id.clone(),
|
||||
wasm_path.clone(),
|
||||
kt::OnExit::None,
|
||||
entry.public,
|
||||
{
|
||||
let mut caps_string = "[".to_string();
|
||||
for cap in requested_caps.iter() {
|
||||
caps_string += &format!("\n {}({})", cap.issuer.to_string(), cap.params);
|
||||
}
|
||||
caps_string + "\n ]"
|
||||
},
|
||||
),
|
||||
);
|
||||
Request::new()
|
||||
.target(("our", "kernel", "distro", "sys"))
|
||||
Request::to(("our", "kernel", "distro", "sys"))
|
||||
.body(serde_json::to_vec(&kt::KernelCommand::GrantCapabilities {
|
||||
target: parsed_new_process_id.clone(),
|
||||
target: process_id.clone(),
|
||||
capabilities: requested_caps,
|
||||
})?)
|
||||
.send()?;
|
||||
let _ = Request::new()
|
||||
.target(("our", "kernel", "distro", "sys"))
|
||||
Request::to(("our", "kernel", "distro", "sys"))
|
||||
.body(serde_json::to_vec(&kt::KernelCommand::RunProcess(
|
||||
parsed_new_process_id.clone(),
|
||||
process_id.clone(),
|
||||
))?)
|
||||
.send_and_await_response(5)??;
|
||||
let req = Request::new()
|
||||
.target(("our", parsed_new_process_id))
|
||||
.body(args.into_bytes());
|
||||
|
||||
req.send().unwrap();
|
||||
|
||||
Request::to(("our", process_id))
|
||||
.body(args.into_bytes())
|
||||
.send()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -351,20 +318,15 @@ fn handle_alias_change(
|
||||
) -> anyhow::Result<()> {
|
||||
match process {
|
||||
Some(process) => {
|
||||
// first check to make sure the script is actually a script
|
||||
let Ok(_) = get_entry(&process) else {
|
||||
return Err(anyhow!("process {} not found", process));
|
||||
};
|
||||
|
||||
state.aliases.insert(alias.clone(), process.clone());
|
||||
println!("alias {} set to {}", alias, process);
|
||||
println!("alias {alias} set for {process}");
|
||||
state.aliases.insert(alias, process);
|
||||
}
|
||||
None => {
|
||||
if state.aliases.contains_key(&alias) {
|
||||
state.aliases.remove(&alias);
|
||||
println!("alias {} removed", alias);
|
||||
println!("alias {alias} removed");
|
||||
} else {
|
||||
println!("alias {} not found", alias);
|
||||
println!("alias {alias} not found");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -374,10 +336,9 @@ fn handle_alias_change(
|
||||
|
||||
fn get_entry(process: &ProcessId) -> anyhow::Result<kt::DotScriptsEntry> {
|
||||
let drive_path = format!("/{}:{}/pkg", process.package(), process.publisher());
|
||||
Request::new()
|
||||
.target(("our", "vfs", "distro", "sys"))
|
||||
Request::to(("our", "vfs", "distro", "sys"))
|
||||
.body(serde_json::to_vec(&vfs::VfsRequest {
|
||||
path: format!("{}/scripts.json", drive_path),
|
||||
path: format!("{drive_path}/scripts.json"),
|
||||
action: vfs::VfsAction::Read,
|
||||
})?)
|
||||
.send_and_await_response(5)??;
|
||||
|
@ -908,16 +908,7 @@ async fn check_for_root_cap(
|
||||
caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: process.clone(),
|
||||
cap: Capability {
|
||||
issuer: Address {
|
||||
node: our.to_string(),
|
||||
process: ETH_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
"root": true,
|
||||
}))
|
||||
.unwrap(),
|
||||
},
|
||||
cap: Capability::new((our, ETH_PROCESS_ID.clone()), "{\"root\":true}"),
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
.await
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,15 +1,15 @@
|
||||
use crate::KERNEL_PROCESS_ID;
|
||||
use lib::types::core as t;
|
||||
pub use lib::v0::ProcessV0;
|
||||
pub use lib::Process;
|
||||
use ring::signature;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::sync::Arc;
|
||||
use tokio::fs;
|
||||
use tokio::task::JoinHandle;
|
||||
use lib::{types::core as t, v0::ProcessV0, Process};
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
sync::Arc,
|
||||
};
|
||||
use tokio::{fs, task::JoinHandle};
|
||||
use wasi_common::sync::Dir;
|
||||
use wasmtime::component::{Component, Linker, ResourceTable as Table};
|
||||
use wasmtime::{Engine, Store};
|
||||
use wasmtime::{
|
||||
component::{Component, Linker, ResourceTable as Table},
|
||||
Engine, Store,
|
||||
};
|
||||
use wasmtime_wasi::{
|
||||
pipe::MemoryOutputPipe, DirPerms, FilePerms, WasiCtx, WasiCtxBuilder, WasiView,
|
||||
};
|
||||
@ -25,7 +25,7 @@ pub struct ProcessContext {
|
||||
|
||||
pub struct ProcessState {
|
||||
/// our node's networking keypair
|
||||
pub keypair: Arc<signature::Ed25519KeyPair>,
|
||||
pub keypair: Arc<ring::signature::Ed25519KeyPair>,
|
||||
/// information about ourself
|
||||
pub metadata: t::ProcessMetadata,
|
||||
/// pipe from which we get messages from the main event loop
|
||||
@ -83,29 +83,18 @@ impl WasiView for ProcessWasiV0 {
|
||||
}
|
||||
}
|
||||
|
||||
async fn make_component(
|
||||
engine: Engine,
|
||||
wasm_bytes: &[u8],
|
||||
async fn make_table_and_wasi(
|
||||
home_directory_path: String,
|
||||
process_state: ProcessState,
|
||||
) -> anyhow::Result<(Process, Store<ProcessWasi>, MemoryOutputPipe)> {
|
||||
let component = Component::new(&engine, wasm_bytes.to_vec())
|
||||
.expect("make_process_loop: couldn't read file");
|
||||
|
||||
let mut linker = Linker::new(&engine);
|
||||
Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap();
|
||||
|
||||
process_state: &ProcessState,
|
||||
) -> (Table, WasiCtx, MemoryOutputPipe) {
|
||||
let table = Table::new();
|
||||
let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE);
|
||||
|
||||
let our_process_id = process_state.metadata.our.process.clone();
|
||||
let send_to_terminal = process_state.send_to_terminal.clone();
|
||||
|
||||
let tmp_path = format!(
|
||||
"{}/vfs/{}:{}/tmp",
|
||||
home_directory_path,
|
||||
our_process_id.package(),
|
||||
our_process_id.publisher()
|
||||
process_state.metadata.our.process.package(),
|
||||
process_state.metadata.our.process.publisher()
|
||||
);
|
||||
|
||||
let mut wasi = WasiCtxBuilder::new();
|
||||
@ -130,10 +119,26 @@ async fn make_component(
|
||||
}
|
||||
}
|
||||
|
||||
let wasi = wasi.stderr(wasi_stderr.clone()).build();
|
||||
(table, wasi.stderr(wasi_stderr.clone()).build(), wasi_stderr)
|
||||
}
|
||||
|
||||
async fn make_component(
|
||||
engine: Engine,
|
||||
wasm_bytes: &[u8],
|
||||
home_directory_path: String,
|
||||
process_state: ProcessState,
|
||||
) -> anyhow::Result<(Process, Store<ProcessWasi>, MemoryOutputPipe)> {
|
||||
let component =
|
||||
Component::new(&engine, wasm_bytes.to_vec()).expect("make_component: couldn't read file");
|
||||
|
||||
let mut linker = Linker::new(&engine);
|
||||
Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap();
|
||||
let (table, wasi, wasi_stderr) = make_table_and_wasi(home_directory_path, &process_state).await;
|
||||
wasmtime_wasi::command::add_to_linker(&mut linker).unwrap();
|
||||
|
||||
let our_process_id = process_state.metadata.our.process.clone();
|
||||
let send_to_terminal = process_state.send_to_terminal.clone();
|
||||
|
||||
let mut store = Store::new(
|
||||
&engine,
|
||||
ProcessWasi {
|
||||
@ -147,14 +152,11 @@ async fn make_component(
|
||||
match Process::instantiate_async(&mut store, &component, &linker).await {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 0,
|
||||
content: format!(
|
||||
"mk: process {:?} failed to instantiate: {:?}",
|
||||
our_process_id, e,
|
||||
),
|
||||
})
|
||||
t::Printout::new(
|
||||
0,
|
||||
format!("kernel: process {our_process_id} failed to instantiate: {e:?}"),
|
||||
)
|
||||
.send(&send_to_terminal)
|
||||
.await;
|
||||
return Err(e);
|
||||
}
|
||||
@ -169,51 +171,17 @@ async fn make_component_v0(
|
||||
home_directory_path: String,
|
||||
process_state: ProcessState,
|
||||
) -> anyhow::Result<(ProcessV0, Store<ProcessWasiV0>, MemoryOutputPipe)> {
|
||||
let component = Component::new(&engine, wasm_bytes.to_vec())
|
||||
.expect("make_process_loop: couldn't read file");
|
||||
let component =
|
||||
Component::new(&engine, wasm_bytes.to_vec()).expect("make_component: couldn't read file");
|
||||
|
||||
let mut linker = Linker::new(&engine);
|
||||
ProcessV0::add_to_linker(&mut linker, |state: &mut ProcessWasiV0| state).unwrap();
|
||||
|
||||
let table = Table::new();
|
||||
let wasi_stderr = MemoryOutputPipe::new(STACK_TRACE_SIZE);
|
||||
let (table, wasi, wasi_stderr) = make_table_and_wasi(home_directory_path, &process_state).await;
|
||||
wasmtime_wasi::command::add_to_linker(&mut linker).unwrap();
|
||||
|
||||
let our_process_id = process_state.metadata.our.process.clone();
|
||||
let send_to_terminal = process_state.send_to_terminal.clone();
|
||||
|
||||
let tmp_path = format!(
|
||||
"{}/vfs/{}:{}/tmp",
|
||||
home_directory_path,
|
||||
our_process_id.package(),
|
||||
our_process_id.publisher()
|
||||
);
|
||||
|
||||
let mut wasi = WasiCtxBuilder::new();
|
||||
|
||||
// TODO make guarantees about this
|
||||
if let Ok(Ok(())) = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(5),
|
||||
fs::create_dir_all(&tmp_path),
|
||||
)
|
||||
.await
|
||||
{
|
||||
if let Ok(wasi_tempdir) =
|
||||
Dir::open_ambient_dir(tmp_path.clone(), wasi_common::sync::ambient_authority())
|
||||
{
|
||||
wasi.preopened_dir(
|
||||
wasi_tempdir,
|
||||
DirPerms::all(),
|
||||
FilePerms::all(),
|
||||
tmp_path.clone(),
|
||||
)
|
||||
.env("TEMP_DIR", tmp_path);
|
||||
}
|
||||
}
|
||||
|
||||
let wasi = wasi.stderr(wasi_stderr.clone()).build();
|
||||
|
||||
wasmtime_wasi::command::add_to_linker(&mut linker).unwrap();
|
||||
|
||||
let mut store = Store::new(
|
||||
&engine,
|
||||
ProcessWasiV0 {
|
||||
@ -227,14 +195,11 @@ async fn make_component_v0(
|
||||
match ProcessV0::instantiate_async(&mut store, &component, &linker).await {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 0,
|
||||
content: format!(
|
||||
"mk: process {:?} failed to instantiate: {:?}",
|
||||
our_process_id, e,
|
||||
),
|
||||
})
|
||||
t::Printout::new(
|
||||
0,
|
||||
format!("kernel: process {our_process_id} failed to instantiate: {e:?}"),
|
||||
)
|
||||
.send(&send_to_terminal)
|
||||
.await;
|
||||
return Err(e);
|
||||
}
|
||||
@ -245,7 +210,7 @@ async fn make_component_v0(
|
||||
|
||||
/// create a specific process, and generate a task that will run it.
|
||||
pub async fn make_process_loop(
|
||||
keypair: Arc<signature::Ed25519KeyPair>,
|
||||
keypair: Arc<ring::signature::Ed25519KeyPair>,
|
||||
metadata: t::ProcessMetadata,
|
||||
send_to_loop: t::MessageSender,
|
||||
send_to_terminal: t::PrintSender,
|
||||
@ -290,9 +255,12 @@ pub async fn make_process_loop(
|
||||
send_to_process.send(message).await?;
|
||||
}
|
||||
|
||||
let our = metadata.our.clone();
|
||||
let wit_version = metadata.wit_version.clone();
|
||||
|
||||
let process_state = ProcessState {
|
||||
keypair: keypair.clone(),
|
||||
metadata: metadata.clone(),
|
||||
keypair,
|
||||
metadata,
|
||||
recv_in_process,
|
||||
self_sender: send_to_process,
|
||||
send_to_loop: send_to_loop.clone(),
|
||||
@ -304,39 +272,27 @@ pub async fn make_process_loop(
|
||||
caps_oracle: caps_oracle.clone(),
|
||||
};
|
||||
|
||||
let metadata = match metadata.wit_version {
|
||||
let metadata = match wit_version {
|
||||
// assume missing version is oldest wit version
|
||||
None => {
|
||||
let (bindings, mut store, wasi_stderr) =
|
||||
make_component(engine, &wasm_bytes, home_directory_path, process_state).await?;
|
||||
|
||||
// the process will run until it returns from init() or crashes
|
||||
match bindings
|
||||
.call_init(&mut store, &metadata.our.to_string())
|
||||
.await
|
||||
{
|
||||
match bindings.call_init(&mut store, &our.to_string()).await {
|
||||
Ok(()) => {
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 1,
|
||||
content: format!(
|
||||
"process {} returned without error",
|
||||
metadata.our.process
|
||||
),
|
||||
})
|
||||
t::Printout::new(1, format!("process {our} returned without error"))
|
||||
.send(&send_to_terminal)
|
||||
.await;
|
||||
}
|
||||
Err(_) => {
|
||||
let stderr = wasi_stderr.contents().into();
|
||||
let stderr = String::from_utf8(stderr)?;
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 0,
|
||||
content: format!(
|
||||
"\x1b[38;5;196mprocess {} ended with error:\x1b[0m\n{}",
|
||||
metadata.our.process, stderr,
|
||||
),
|
||||
})
|
||||
t::Printout::new(
|
||||
0,
|
||||
format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{stderr}",),
|
||||
)
|
||||
.send(&send_to_terminal)
|
||||
.await;
|
||||
}
|
||||
};
|
||||
@ -351,32 +307,20 @@ pub async fn make_process_loop(
|
||||
make_component_v0(engine, &wasm_bytes, home_directory_path, process_state).await?;
|
||||
|
||||
// the process will run until it returns from init() or crashes
|
||||
match bindings
|
||||
.call_init(&mut store, &metadata.our.to_string())
|
||||
.await
|
||||
{
|
||||
match bindings.call_init(&mut store, &our.to_string()).await {
|
||||
Ok(()) => {
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 1,
|
||||
content: format!(
|
||||
"process {} returned without error",
|
||||
metadata.our.process
|
||||
),
|
||||
})
|
||||
t::Printout::new(1, format!("process {our} returned without error"))
|
||||
.send(&send_to_terminal)
|
||||
.await;
|
||||
}
|
||||
Err(_) => {
|
||||
let stderr = wasi_stderr.contents().into();
|
||||
let stderr = String::from_utf8(stderr)?;
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 0,
|
||||
content: format!(
|
||||
"\x1b[38;5;196mprocess {} ended with error:\x1b[0m\n{}",
|
||||
metadata.our.process, stderr,
|
||||
),
|
||||
})
|
||||
t::Printout::new(
|
||||
0,
|
||||
format!("\x1b[38;5;196mprocess {our} ended with error:\x1b[0m\n{stderr}",),
|
||||
)
|
||||
.send(&send_to_terminal)
|
||||
.await;
|
||||
}
|
||||
};
|
||||
@ -390,19 +334,14 @@ pub async fn make_process_loop(
|
||||
// the process has completed, time to perform cleanup
|
||||
//
|
||||
|
||||
let our_kernel = t::Address {
|
||||
node: metadata.our.node.clone(),
|
||||
process: KERNEL_PROCESS_ID.clone(),
|
||||
};
|
||||
|
||||
// get caps before killing
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
let _ = caps_oracle
|
||||
caps_oracle
|
||||
.send(t::CapMessage::GetAll {
|
||||
on: metadata.our.process.clone(),
|
||||
responder: tx,
|
||||
})
|
||||
.await;
|
||||
.await?;
|
||||
let initial_capabilities = rx
|
||||
.await?
|
||||
.iter()
|
||||
@ -412,16 +351,24 @@ pub async fn make_process_loop(
|
||||
})
|
||||
.collect();
|
||||
|
||||
t::Printout::new(
|
||||
1,
|
||||
format!(
|
||||
"process {} has OnExit behavior {}",
|
||||
metadata.our.process, metadata.on_exit
|
||||
),
|
||||
)
|
||||
.send(&send_to_terminal)
|
||||
.await;
|
||||
|
||||
// fulfill the designated OnExit behavior
|
||||
match metadata.on_exit {
|
||||
t::OnExit::None => {
|
||||
send_to_loop
|
||||
.send(t::KernelMessage {
|
||||
id: rand::random(),
|
||||
source: our_kernel.clone(),
|
||||
target: our_kernel.clone(),
|
||||
rsvp: None,
|
||||
message: t::Message::Request(t::Request {
|
||||
t::KernelMessage::builder()
|
||||
.id(rand::random())
|
||||
.source((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.target((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.message(t::Message::Request(t::Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&t::KernelCommand::KillProcess(
|
||||
@ -430,26 +377,20 @@ pub async fn make_process_loop(
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await?;
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 1,
|
||||
content: format!("process {} had no OnExit behavior", metadata.our.process),
|
||||
})
|
||||
}))
|
||||
.build()
|
||||
.unwrap()
|
||||
.send(&send_to_loop)
|
||||
.await;
|
||||
}
|
||||
// if restart, tell ourselves to init the app again, with same capabilities
|
||||
t::OnExit::Restart => {
|
||||
send_to_loop
|
||||
.send(t::KernelMessage {
|
||||
id: rand::random(),
|
||||
source: our_kernel.clone(),
|
||||
target: our_kernel.clone(),
|
||||
rsvp: None,
|
||||
message: t::Message::Request(t::Request {
|
||||
// kill
|
||||
t::KernelMessage::builder()
|
||||
.id(rand::random())
|
||||
.source((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.target((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.message(t::Message::Request(t::Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&t::KernelCommand::KillProcess(
|
||||
@ -458,26 +399,17 @@ pub async fn make_process_loop(
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await?;
|
||||
let _ = send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 1,
|
||||
content: format!(
|
||||
"firing OnExit::Restart for process {}",
|
||||
metadata.our.process
|
||||
),
|
||||
})
|
||||
}))
|
||||
.build()
|
||||
.unwrap()
|
||||
.send(&send_to_loop)
|
||||
.await;
|
||||
send_to_loop
|
||||
.send(t::KernelMessage {
|
||||
id: rand::random(),
|
||||
source: our_kernel.clone(),
|
||||
target: our_kernel.clone(),
|
||||
rsvp: None,
|
||||
message: t::Message::Request(t::Request {
|
||||
// then re-initialize
|
||||
t::KernelMessage::builder()
|
||||
.id(rand::random())
|
||||
.source((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.target((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.message(t::Message::Request(t::Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
|
||||
@ -491,20 +423,21 @@ pub async fn make_process_loop(
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: Some(t::LazyLoadBlob {
|
||||
}))
|
||||
.lazy_load_blob(Some(t::LazyLoadBlob {
|
||||
mime: None,
|
||||
bytes: wasm_bytes,
|
||||
}),
|
||||
})
|
||||
.await?;
|
||||
send_to_loop
|
||||
.send(t::KernelMessage {
|
||||
id: rand::random(),
|
||||
source: our_kernel.clone(),
|
||||
target: our_kernel.clone(),
|
||||
rsvp: None,
|
||||
message: t::Message::Request(t::Request {
|
||||
}))
|
||||
.build()
|
||||
.unwrap()
|
||||
.send(&send_to_loop)
|
||||
.await;
|
||||
// then run
|
||||
t::KernelMessage::builder()
|
||||
.id(rand::random())
|
||||
.source((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.target((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.message(t::Message::Request(t::Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&t::KernelCommand::RunProcess(
|
||||
@ -513,43 +446,33 @@ pub async fn make_process_loop(
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await?;
|
||||
}))
|
||||
.build()
|
||||
.unwrap()
|
||||
.send(&send_to_loop)
|
||||
.await;
|
||||
}
|
||||
// if requests, fire them
|
||||
// even in death, a process can only message processes it has capabilities for
|
||||
t::OnExit::Requests(requests) => {
|
||||
send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 1,
|
||||
content: format!(
|
||||
"firing OnExit::Requests for process {}",
|
||||
metadata.our.process
|
||||
),
|
||||
})
|
||||
.await?;
|
||||
for (address, mut request, blob) in requests {
|
||||
request.expects_response = None;
|
||||
send_to_loop
|
||||
.send(t::KernelMessage {
|
||||
id: rand::random(),
|
||||
source: metadata.our.clone(),
|
||||
target: address,
|
||||
rsvp: None,
|
||||
message: t::Message::Request(request),
|
||||
lazy_load_blob: blob,
|
||||
})
|
||||
.await?;
|
||||
t::KernelMessage::builder()
|
||||
.id(rand::random())
|
||||
.source(metadata.our.clone())
|
||||
.target(address)
|
||||
.message(t::Message::Request(request))
|
||||
.lazy_load_blob(blob)
|
||||
.build()
|
||||
.unwrap()
|
||||
.send(&send_to_loop)
|
||||
.await;
|
||||
}
|
||||
send_to_loop
|
||||
.send(t::KernelMessage {
|
||||
id: rand::random(),
|
||||
source: our_kernel.clone(),
|
||||
target: our_kernel.clone(),
|
||||
rsvp: None,
|
||||
message: t::Message::Request(t::Request {
|
||||
t::KernelMessage::builder()
|
||||
.id(rand::random())
|
||||
.source((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.target((&our.node, KERNEL_PROCESS_ID.clone()))
|
||||
.message(t::Message::Request(t::Request {
|
||||
inherit: false,
|
||||
expects_response: None,
|
||||
body: serde_json::to_vec(&t::KernelCommand::KillProcess(
|
||||
@ -558,18 +481,12 @@ pub async fn make_process_loop(
|
||||
.unwrap(),
|
||||
metadata: None,
|
||||
capabilities: vec![],
|
||||
}),
|
||||
lazy_load_blob: None,
|
||||
})
|
||||
.await?;
|
||||
}))
|
||||
.build()
|
||||
.unwrap()
|
||||
.send(&send_to_loop)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn print(sender: &t::PrintSender, verbosity: u8, content: String) {
|
||||
let _ = sender
|
||||
.send(t::Printout { verbosity, content })
|
||||
.await
|
||||
.expect("fatal: kernel terminal print pipe died!");
|
||||
}
|
||||
|
@ -1,18 +1,13 @@
|
||||
use crate::kernel::process;
|
||||
use anyhow::Result;
|
||||
use lib::core::{KERNEL_PROCESS_ID, VFS_PROCESS_ID};
|
||||
use lib::types::core::{self as t, STATE_PROCESS_ID};
|
||||
pub use lib::wit;
|
||||
pub use lib::wit::Host as StandardHost;
|
||||
use lib::types::core::{self as t, KERNEL_PROCESS_ID, STATE_PROCESS_ID, VFS_PROCESS_ID};
|
||||
use lib::wit;
|
||||
use lib::wit::Host as StandardHost;
|
||||
use ring::signature::{self, KeyPair};
|
||||
|
||||
async fn print_debug(proc: &process::ProcessState, content: &str) {
|
||||
let _ = proc
|
||||
.send_to_terminal
|
||||
.send(t::Printout {
|
||||
verbosity: 2,
|
||||
content: format!("{}: {}", proc.metadata.our.process, content),
|
||||
})
|
||||
t::Printout::new(2, format!("{}: {}", proc.metadata.our.process, content))
|
||||
.send(&proc.send_to_terminal)
|
||||
.await;
|
||||
}
|
||||
|
||||
@ -295,11 +290,12 @@ impl process::ProcessState {
|
||||
// 1. whether this request expects a response -- if so, rsvp = our address, always
|
||||
// 2. whether this request inherits -- if so, rsvp = prompting message's rsvp
|
||||
// 3. if neither, rsvp = None
|
||||
let kernel_message = t::KernelMessage {
|
||||
id: request_id,
|
||||
source,
|
||||
target: t::Address::de_wit(target),
|
||||
rsvp: match (
|
||||
t::KernelMessage::builder()
|
||||
.id(request_id)
|
||||
.source(source)
|
||||
.target(t::Address::de_wit(target))
|
||||
.rsvp(
|
||||
match (
|
||||
request.expects_response,
|
||||
request.inherit,
|
||||
&self.prompting_message,
|
||||
@ -315,14 +311,13 @@ impl process::ProcessState {
|
||||
}
|
||||
_ => None,
|
||||
},
|
||||
message: t::Message::Request(request),
|
||||
lazy_load_blob: blob,
|
||||
};
|
||||
|
||||
self.send_to_loop
|
||||
.send(kernel_message)
|
||||
.await
|
||||
.expect("fatal: kernel couldn't send request");
|
||||
)
|
||||
.message(t::Message::Request(request))
|
||||
.lazy_load_blob(blob)
|
||||
.build()
|
||||
.unwrap()
|
||||
.send(&self.send_to_loop)
|
||||
.await;
|
||||
|
||||
Ok(request_id)
|
||||
}
|
||||
@ -333,11 +328,11 @@ impl process::ProcessState {
|
||||
|
||||
// the process requires a prompting_message in order to issue a response
|
||||
let Some(ref prompting_message) = self.prompting_message else {
|
||||
process::print(
|
||||
&self.send_to_terminal,
|
||||
t::Printout::new(
|
||||
0,
|
||||
format!("kernel: need non-None prompting_message to handle Response {response:?}"),
|
||||
)
|
||||
.send(&self.send_to_terminal)
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
@ -377,21 +372,20 @@ impl process::ProcessState {
|
||||
};
|
||||
}
|
||||
|
||||
self.send_to_loop
|
||||
.send(t::KernelMessage {
|
||||
id,
|
||||
source: self.metadata.our.clone(),
|
||||
target,
|
||||
rsvp: None,
|
||||
message: t::Message::Response((
|
||||
t::KernelMessage::builder()
|
||||
.id(id)
|
||||
.source(self.metadata.our.clone())
|
||||
.target(target)
|
||||
.message(t::Message::Response((
|
||||
response,
|
||||
// the context will be set by the process receiving this Response.
|
||||
None,
|
||||
)),
|
||||
lazy_load_blob: blob,
|
||||
})
|
||||
.await
|
||||
.expect("fatal: kernel couldn't send response");
|
||||
)))
|
||||
.lazy_load_blob(blob)
|
||||
.build()
|
||||
.unwrap()
|
||||
.send(&self.send_to_loop)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@ -679,11 +673,8 @@ impl StandardHost for process::ProcessWasi {
|
||||
wit_version: self.process.metadata.wit_version,
|
||||
on_exit: t::OnExit::de_wit(on_exit),
|
||||
initial_capabilities: request_capabilities
|
||||
.iter()
|
||||
.map(|cap| t::Capability {
|
||||
issuer: t::Address::de_wit(cap.clone().issuer),
|
||||
params: cap.clone().params,
|
||||
})
|
||||
.into_iter()
|
||||
.map(|cap| t::de_wit_capability(cap).0)
|
||||
.collect(),
|
||||
public,
|
||||
})
|
||||
@ -716,7 +707,7 @@ impl StandardHost for process::ProcessWasi {
|
||||
},
|
||||
params: "\"messaging\"".into(),
|
||||
}],
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -767,7 +758,7 @@ impl StandardHost for process::ProcessWasi {
|
||||
issuer: self.process.metadata.our.clone(),
|
||||
params: "\"messaging\"".into(),
|
||||
}],
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -786,7 +777,7 @@ impl StandardHost for process::ProcessWasi {
|
||||
},
|
||||
params: "\"messaging\"".into(),
|
||||
}],
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -810,7 +801,7 @@ impl StandardHost for process::ProcessWasi {
|
||||
.iter()
|
||||
.map(|cap| t::de_wit_capability(cap.clone()).0)
|
||||
.collect(),
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await?;
|
||||
let _ = rx.await?;
|
||||
@ -828,7 +819,7 @@ impl StandardHost for process::ProcessWasi {
|
||||
.iter()
|
||||
.map(|cap| t::de_wit_capability(cap.clone()).0)
|
||||
.collect(),
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await?;
|
||||
let _ = rx.await?;
|
||||
@ -848,10 +839,7 @@ impl StandardHost for process::ProcessWasi {
|
||||
let caps = rx.await?;
|
||||
Ok(caps
|
||||
.into_iter()
|
||||
.map(|cap| wit::Capability {
|
||||
issuer: t::Address::en_wit(&cap.0.issuer),
|
||||
params: cap.0.params,
|
||||
})
|
||||
.map(|cap| t::en_wit_capability(cap))
|
||||
.collect())
|
||||
}
|
||||
|
||||
|
@ -1,9 +1,8 @@
|
||||
use crate::kernel::process;
|
||||
use anyhow::Result;
|
||||
use lib::core::{KERNEL_PROCESS_ID, VFS_PROCESS_ID};
|
||||
use lib::types::core::{self as t, STATE_PROCESS_ID};
|
||||
pub use lib::v0::wit;
|
||||
pub use lib::v0::wit::Host as StandardHost;
|
||||
use lib::types::core::{self as t, KERNEL_PROCESS_ID, STATE_PROCESS_ID, VFS_PROCESS_ID};
|
||||
use lib::v0::wit;
|
||||
use lib::v0::wit::Host as StandardHost;
|
||||
use ring::signature::{self, KeyPair};
|
||||
|
||||
async fn print_debug(proc: &process::ProcessState, content: &str) {
|
||||
@ -335,11 +334,11 @@ impl process::ProcessState {
|
||||
|
||||
// the process requires a prompting_message in order to issue a response
|
||||
let Some(ref prompting_message) = self.prompting_message else {
|
||||
process::print(
|
||||
&self.send_to_terminal,
|
||||
t::Printout::new(
|
||||
0,
|
||||
format!("kernel: need non-None prompting_message to handle Response {response:?}"),
|
||||
)
|
||||
.send(&self.send_to_terminal)
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
@ -685,11 +684,8 @@ impl StandardHost for process::ProcessWasiV0 {
|
||||
wit_version: self.process.metadata.wit_version,
|
||||
on_exit: t::OnExit::de_wit_v0(on_exit),
|
||||
initial_capabilities: request_capabilities
|
||||
.iter()
|
||||
.map(|cap| t::Capability {
|
||||
issuer: t::Address::de_wit_v0(cap.clone().issuer),
|
||||
params: cap.clone().params,
|
||||
})
|
||||
.into_iter()
|
||||
.map(|cap| t::de_wit_capability_v0(cap).0)
|
||||
.collect(),
|
||||
public,
|
||||
})
|
||||
@ -722,7 +718,7 @@ impl StandardHost for process::ProcessWasiV0 {
|
||||
},
|
||||
params: "\"messaging\"".into(),
|
||||
}],
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -773,7 +769,7 @@ impl StandardHost for process::ProcessWasiV0 {
|
||||
issuer: self.process.metadata.our.clone(),
|
||||
params: "\"messaging\"".into(),
|
||||
}],
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -792,7 +788,7 @@ impl StandardHost for process::ProcessWasiV0 {
|
||||
},
|
||||
params: "\"messaging\"".into(),
|
||||
}],
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
@ -816,7 +812,7 @@ impl StandardHost for process::ProcessWasiV0 {
|
||||
.iter()
|
||||
.map(|cap| t::de_wit_capability_v0(cap.clone()).0)
|
||||
.collect(),
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await?;
|
||||
let _ = rx.await?;
|
||||
@ -834,7 +830,7 @@ impl StandardHost for process::ProcessWasiV0 {
|
||||
.iter()
|
||||
.map(|cap| t::de_wit_capability_v0(cap.clone()).0)
|
||||
.collect(),
|
||||
responder: tx,
|
||||
responder: Some(tx),
|
||||
})
|
||||
.await?;
|
||||
let _ = rx.await?;
|
||||
@ -854,10 +850,7 @@ impl StandardHost for process::ProcessWasiV0 {
|
||||
let caps = rx.await?;
|
||||
Ok(caps
|
||||
.into_iter()
|
||||
.map(|cap| wit::Capability {
|
||||
issuer: t::Address::en_wit_v0(&cap.0.issuer),
|
||||
params: cap.0.params,
|
||||
})
|
||||
.map(|cap| t::en_wit_capability_v0(cap))
|
||||
.collect())
|
||||
}
|
||||
|
||||
|
@ -350,11 +350,11 @@ async fn check_caps(
|
||||
node: our_node.to_string(),
|
||||
process: KV_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
params: serde_json::json!({
|
||||
"kind": "write",
|
||||
"db": request.db.to_string(),
|
||||
}))
|
||||
.unwrap(),
|
||||
})
|
||||
.to_string(),
|
||||
},
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
@ -376,11 +376,11 @@ async fn check_caps(
|
||||
node: our_node.to_string(),
|
||||
process: KV_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
params: serde_json::json!({
|
||||
"kind": "read",
|
||||
"db": request.db.to_string(),
|
||||
}))
|
||||
.unwrap(),
|
||||
})
|
||||
.to_string(),
|
||||
},
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
@ -458,14 +458,14 @@ async fn add_capability(
|
||||
node: our_node.to_string(),
|
||||
process: KV_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({ "kind": kind, "db": db })).unwrap(),
|
||||
params: serde_json::json!({ "kind": kind, "db": db }).to_string(),
|
||||
};
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
send_to_caps_oracle
|
||||
.send(CapMessage::Add {
|
||||
on: source.process.clone(),
|
||||
caps: vec![cap],
|
||||
responder: send_cap_bool,
|
||||
responder: Some(send_cap_bool),
|
||||
})
|
||||
.await?;
|
||||
let _ = recv_cap_bool.await?;
|
||||
|
@ -355,17 +355,14 @@ async fn check_caps(
|
||||
send_to_caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: source.process.clone(),
|
||||
cap: Capability {
|
||||
issuer: Address {
|
||||
node: our_node.to_string(),
|
||||
process: SQLITE_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
cap: Capability::new(
|
||||
(our_node, SQLITE_PROCESS_ID.clone()),
|
||||
serde_json::json!({
|
||||
"kind": "write",
|
||||
"db": request.db.to_string(),
|
||||
}))
|
||||
.unwrap(),
|
||||
},
|
||||
})
|
||||
.to_string(),
|
||||
),
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
.await?;
|
||||
@ -381,17 +378,14 @@ async fn check_caps(
|
||||
send_to_caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: source.process.clone(),
|
||||
cap: Capability {
|
||||
issuer: Address {
|
||||
node: our_node.to_string(),
|
||||
process: SQLITE_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
cap: Capability::new(
|
||||
(our_node, SQLITE_PROCESS_ID.clone()),
|
||||
serde_json::json!({
|
||||
"kind": "read",
|
||||
"db": request.db.to_string(),
|
||||
}))
|
||||
.unwrap(),
|
||||
},
|
||||
})
|
||||
.to_string(),
|
||||
),
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
.await?;
|
||||
@ -477,14 +471,14 @@ async fn add_capability(
|
||||
node: our_node.to_string(),
|
||||
process: SQLITE_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({ "kind": kind, "db": db })).unwrap(),
|
||||
params: serde_json::json!({ "kind": kind, "db": db }).to_string(),
|
||||
};
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
send_to_caps_oracle
|
||||
.send(CapMessage::Add {
|
||||
on: source.process.clone(),
|
||||
caps: vec![cap],
|
||||
responder: send_cap_bool,
|
||||
responder: Some(send_cap_bool),
|
||||
})
|
||||
.await?;
|
||||
let _ = recv_cap_bool.await?;
|
||||
|
@ -36,7 +36,8 @@ pub async fn load_state(
|
||||
let kernel_id_vec = process_to_vec(KERNEL_PROCESS_ID.clone());
|
||||
match db.get(&kernel_id_vec) {
|
||||
Ok(Some(value)) => {
|
||||
process_map = bincode::deserialize::<ProcessMap>(&value).unwrap();
|
||||
process_map = bincode::deserialize::<ProcessMap>(&value)
|
||||
.expect("failed to deserialize kernel process map");
|
||||
// if our networking key changed, we need to re-sign all local caps
|
||||
process_map.iter_mut().for_each(|(_id, process)| {
|
||||
process.capabilities.iter_mut().for_each(|(cap, sig)| {
|
||||
@ -116,7 +117,6 @@ pub async fn state_sender(
|
||||
let our_node = our_node.clone();
|
||||
let db_clone = db.clone();
|
||||
let send_to_loop = send_to_loop.clone();
|
||||
let send_to_terminal = send_to_terminal.clone();
|
||||
let home_directory_path = home_directory_path.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
@ -129,9 +129,6 @@ pub async fn state_sender(
|
||||
handle_request(&our_node, km, db_clone, &send_to_loop, &home_directory_path)
|
||||
.await
|
||||
{
|
||||
Printout::new(1, format!("state: {e}"))
|
||||
.send(&send_to_terminal)
|
||||
.await;
|
||||
KernelMessage::builder()
|
||||
.id(km_id)
|
||||
.source((our_node.as_str(), STATE_PROCESS_ID.clone()))
|
||||
@ -559,11 +556,11 @@ async fn bootstrap(
|
||||
node: our_name.into(),
|
||||
process: VFS_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
params: serde_json::json!({
|
||||
"kind": "read",
|
||||
"drive": drive_path,
|
||||
}))
|
||||
.unwrap(),
|
||||
})
|
||||
.to_string(),
|
||||
};
|
||||
requested_caps.insert(read_cap.clone(), sign_cap(read_cap, keypair.clone()));
|
||||
let write_cap = Capability {
|
||||
@ -571,11 +568,11 @@ async fn bootstrap(
|
||||
node: our_name.into(),
|
||||
process: VFS_PROCESS_ID.clone(),
|
||||
},
|
||||
params: serde_json::to_string(&serde_json::json!({
|
||||
params: serde_json::json!({
|
||||
"kind": "write",
|
||||
"drive": drive_path,
|
||||
}))
|
||||
.unwrap(),
|
||||
})
|
||||
.to_string(),
|
||||
};
|
||||
requested_caps.insert(write_cap.clone(), sign_cap(write_cap, keypair.clone()));
|
||||
|
||||
|
@ -391,16 +391,29 @@ async fn handle_request(
|
||||
}
|
||||
VfsAction::Rename { new_path } => {
|
||||
let new_path = join_paths_safely(vfs_path, &new_path);
|
||||
fs::rename(&path, new_path).await?;
|
||||
fs::rename(&path, new_path)
|
||||
.await
|
||||
.map_err(|e| VfsError::IOError {
|
||||
error: e.to_string(),
|
||||
path: request.path,
|
||||
})?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::CopyFile { new_path } => {
|
||||
let new_path = join_paths_safely(vfs_path, &new_path);
|
||||
fs::copy(&path, new_path).await?;
|
||||
fs::copy(&path, new_path)
|
||||
.await
|
||||
.map_err(|e| VfsError::IOError {
|
||||
error: e.to_string(),
|
||||
path: request.path,
|
||||
})?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::Metadata => {
|
||||
let metadata = fs::metadata(&path).await?;
|
||||
let metadata = fs::metadata(&path).await.map_err(|e| VfsError::IOError {
|
||||
error: e.to_string(),
|
||||
path: request.path,
|
||||
})?;
|
||||
let file_type = get_file_type(&metadata);
|
||||
let meta = FileMetadata {
|
||||
len: metadata.len(),
|
||||
@ -411,13 +424,23 @@ async fn handle_request(
|
||||
VfsAction::Len => {
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let file = file.lock().await;
|
||||
let len = file.metadata().await?.len();
|
||||
let len = file
|
||||
.metadata()
|
||||
.await
|
||||
.map_err(|e| VfsError::IOError {
|
||||
error: e.to_string(),
|
||||
path: request.path,
|
||||
})?
|
||||
.len();
|
||||
(VfsResponse::Len(len), None)
|
||||
}
|
||||
VfsAction::SetLen(len) => {
|
||||
let file = open_file(open_files, path, false, false).await?;
|
||||
let file = open_file(open_files, &path, false, false).await?;
|
||||
let file = file.lock().await;
|
||||
file.set_len(len).await?;
|
||||
file.set_len(len).await.map_err(|e| VfsError::IOError {
|
||||
error: e.to_string(),
|
||||
path: request.path,
|
||||
})?;
|
||||
(VfsResponse::Ok, None)
|
||||
}
|
||||
VfsAction::Hash => {
|
||||
@ -464,7 +487,7 @@ async fn handle_request(
|
||||
let (is_file, is_dir, local_path, file_contents) = {
|
||||
let mut file = zip.by_index(i).map_err(|e| VfsError::IOError {
|
||||
error: e.to_string(),
|
||||
path: "".into(),
|
||||
path: request.path.clone(),
|
||||
})?;
|
||||
let is_file = file.is_file();
|
||||
let is_dir = file.is_dir();
|
||||
@ -754,20 +777,18 @@ async fn read_capability(
|
||||
send_to_caps_oracle: &CapMessageSender,
|
||||
) -> bool {
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
if let Err(_) = send_to_caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: source.process.clone(),
|
||||
cap: Capability {
|
||||
issuer: Address {
|
||||
node: our_node.to_string(),
|
||||
process: VFS_PROCESS_ID.clone(),
|
||||
},
|
||||
params: if root {
|
||||
"{{\"root\": true}}".to_string()
|
||||
let cap = Capability::new(
|
||||
(our_node, VFS_PROCESS_ID.clone()),
|
||||
if root {
|
||||
"{\"root\":true}".to_string()
|
||||
} else {
|
||||
format!("{{\"kind\": \"{kind}\", \"drive\": \"{drive}\"}}")
|
||||
},
|
||||
},
|
||||
);
|
||||
if let Err(_) = send_to_caps_oracle
|
||||
.send(CapMessage::Has {
|
||||
on: source.process.clone(),
|
||||
cap,
|
||||
responder: send_cap_bool,
|
||||
})
|
||||
.await
|
||||
@ -784,19 +805,16 @@ async fn add_capability(
|
||||
source: &Address,
|
||||
send_to_caps_oracle: &CapMessageSender,
|
||||
) -> Result<(), VfsError> {
|
||||
let cap = Capability {
|
||||
issuer: Address {
|
||||
node: our_node.to_string(),
|
||||
process: VFS_PROCESS_ID.clone(),
|
||||
},
|
||||
params: format!("{{\"kind\": \"{kind}\", \"drive\": \"{drive}\"}}"),
|
||||
};
|
||||
let cap = Capability::new(
|
||||
(our_node, VFS_PROCESS_ID.clone()),
|
||||
format!("{{\"kind\": \"{kind}\", \"drive\": \"{drive}\"}}"),
|
||||
);
|
||||
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
|
||||
send_to_caps_oracle
|
||||
.send(CapMessage::Add {
|
||||
on: source.process.clone(),
|
||||
caps: vec![cap],
|
||||
responder: send_cap_bool,
|
||||
responder: Some(send_cap_bool),
|
||||
})
|
||||
.await?;
|
||||
match recv_cap_bool.await? {
|
||||
|
135
lib/src/core.rs
135
lib/src/core.rs
@ -279,12 +279,13 @@ pub struct Address {
|
||||
}
|
||||
|
||||
impl Address {
|
||||
pub fn new<T>(node: &str, process: T) -> Address
|
||||
pub fn new<T, U>(node: T, process: U) -> Address
|
||||
where
|
||||
T: Into<ProcessId>,
|
||||
T: Into<String>,
|
||||
U: Into<ProcessId>,
|
||||
{
|
||||
Address {
|
||||
node: node.to_string(),
|
||||
node: node.into(),
|
||||
process: process.into(),
|
||||
}
|
||||
}
|
||||
@ -399,11 +400,12 @@ impl From<(&str, &str, &str, &str)> for Address {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<(&str, T)> for Address
|
||||
impl<T, U> From<(T, U)> for Address
|
||||
where
|
||||
T: Into<ProcessId>,
|
||||
T: Into<String>,
|
||||
U: Into<ProcessId>,
|
||||
{
|
||||
fn from(input: (&str, T)) -> Self {
|
||||
fn from(input: (T, U)) -> Self {
|
||||
Address::new(input.0, input.1)
|
||||
}
|
||||
}
|
||||
@ -468,21 +470,50 @@ pub enum Message {
|
||||
Response((Response, Option<Context>)),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Hash, Serialize, Deserialize)]
|
||||
pub struct Capability {
|
||||
pub issuer: Address,
|
||||
pub params: String, // JSON-string
|
||||
pub params: String,
|
||||
}
|
||||
|
||||
impl Eq for Capability {}
|
||||
|
||||
impl PartialEq for Capability {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
let self_json_params: serde_json::Value =
|
||||
serde_json::from_str(&self.params).unwrap_or_default();
|
||||
let other_json_params: serde_json::Value =
|
||||
serde_json::from_str(&other.params).unwrap_or_default();
|
||||
self.issuer == other.issuer && self_json_params == other_json_params
|
||||
}
|
||||
}
|
||||
|
||||
impl Capability {
|
||||
pub fn new<T, U>(issuer: T, params: U) -> Self
|
||||
where
|
||||
T: Into<Address>,
|
||||
U: Into<String>,
|
||||
{
|
||||
Capability {
|
||||
issuer: issuer.into(),
|
||||
params: params.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn messaging<T>(issuer: T) -> Self
|
||||
where
|
||||
T: Into<Address>,
|
||||
{
|
||||
Capability {
|
||||
issuer: issuer.into(),
|
||||
params: "\"messaging\"".into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Capability {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}({})",
|
||||
self.issuer,
|
||||
serde_json::from_str::<serde_json::Value>(&self.params)
|
||||
.unwrap_or(serde_json::json!("invalid JSON in capability"))
|
||||
)
|
||||
write!(f, "{}({})", self.issuer, self.params)
|
||||
}
|
||||
}
|
||||
|
||||
@ -597,6 +628,20 @@ impl OnExit {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for OnExit {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}",
|
||||
match self {
|
||||
OnExit::None => "None",
|
||||
OnExit::Restart => "Restart",
|
||||
OnExit::Requests(_) => "Requests",
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Message {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "{}", display_message(self, "\n "))
|
||||
@ -839,7 +884,7 @@ pub fn de_wit_capability(wit: wit::Capability) -> (Capability, Vec<u8>) {
|
||||
publisher_node: wit.issuer.process.publisher_node,
|
||||
},
|
||||
},
|
||||
params: wit.params,
|
||||
params: serde_json::from_str(&wit.params).unwrap_or_default(),
|
||||
},
|
||||
vec![],
|
||||
)
|
||||
@ -856,7 +901,7 @@ pub fn de_wit_capability_v0(wit: crate::v0::wit::Capability) -> (Capability, Vec
|
||||
publisher_node: wit.issuer.process.publisher_node,
|
||||
},
|
||||
},
|
||||
params: wit.params,
|
||||
params: serde_json::from_str(&wit.params).unwrap_or_default(),
|
||||
},
|
||||
vec![],
|
||||
)
|
||||
@ -865,14 +910,14 @@ pub fn de_wit_capability_v0(wit: crate::v0::wit::Capability) -> (Capability, Vec
|
||||
pub fn en_wit_capability(cap: (Capability, Vec<u8>)) -> wit::Capability {
|
||||
wit::Capability {
|
||||
issuer: cap.0.issuer.en_wit(),
|
||||
params: cap.0.params,
|
||||
params: cap.0.params.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn en_wit_capability_v0(cap: (Capability, Vec<u8>)) -> crate::v0::wit::Capability {
|
||||
crate::v0::wit::Capability {
|
||||
issuer: cap.0.issuer.en_wit_v0(),
|
||||
params: cap.0.params,
|
||||
params: cap.0.params.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -1347,13 +1392,13 @@ pub enum CapMessage {
|
||||
Add {
|
||||
on: ProcessId,
|
||||
caps: Vec<Capability>,
|
||||
responder: tokio::sync::oneshot::Sender<bool>,
|
||||
responder: Option<tokio::sync::oneshot::Sender<bool>>,
|
||||
},
|
||||
/// root delete: uncritically remove all `caps` from `on`
|
||||
Drop {
|
||||
on: ProcessId,
|
||||
caps: Vec<Capability>,
|
||||
responder: tokio::sync::oneshot::Sender<bool>,
|
||||
responder: Option<tokio::sync::oneshot::Sender<bool>>,
|
||||
},
|
||||
/// does `on` have `cap` in its store?
|
||||
Has {
|
||||
@ -1370,7 +1415,7 @@ pub enum CapMessage {
|
||||
/// Remove all caps issued by `on` from every process on the entire system
|
||||
RevokeAll {
|
||||
on: ProcessId,
|
||||
responder: tokio::sync::oneshot::Sender<bool>,
|
||||
responder: Option<tokio::sync::oneshot::Sender<bool>>,
|
||||
},
|
||||
/// before `on` sends a message, filter out any bogus caps it may have attached, sign any new
|
||||
/// caps it may have created, and retreive the signature for the caps in its store.
|
||||
@ -1492,19 +1537,19 @@ pub enum StateResponse {
|
||||
|
||||
#[derive(Error, Debug, Serialize, Deserialize)]
|
||||
pub enum StateError {
|
||||
#[error("kernel_state: rocksdb internal error: {error}")]
|
||||
#[error("rocksdb internal error: {error}")]
|
||||
RocksDBError { action: String, error: String },
|
||||
#[error("kernel_state: startup error")]
|
||||
#[error("startup error")]
|
||||
StartupError { action: String },
|
||||
#[error("kernel_state: bytes blob required for {action}")]
|
||||
#[error("bytes blob required for {action}")]
|
||||
BadBytes { action: String },
|
||||
#[error("kernel_state: bad request error: {error}")]
|
||||
#[error("bad request error: {error}")]
|
||||
BadRequest { error: String },
|
||||
#[error("kernel_state: Bad JSON blob: {error}")]
|
||||
#[error("Bad JSON blob: {error}")]
|
||||
BadJson { error: String },
|
||||
#[error("kernel_state: state not found for ProcessId {process_id}")]
|
||||
#[error("state not found for ProcessId {process_id}")]
|
||||
NotFound { process_id: ProcessId },
|
||||
#[error("kernel_state: IO error: {error}")]
|
||||
#[error("IO error: {error}")]
|
||||
IOError { error: String },
|
||||
}
|
||||
|
||||
@ -1601,23 +1646,23 @@ pub enum VfsResponse {
|
||||
|
||||
#[derive(Error, Debug, Serialize, Deserialize)]
|
||||
pub enum VfsError {
|
||||
#[error("vfs: No capability for action {action} at path {path}")]
|
||||
#[error("No capability for action {action} at path {path}")]
|
||||
NoCap { action: String, path: String },
|
||||
#[error("vfs: Bytes blob required for {action} at path {path}")]
|
||||
#[error("Bytes blob required for {action} at path {path}")]
|
||||
BadBytes { action: String, path: String },
|
||||
#[error("vfs: bad request error: {error}")]
|
||||
#[error("bad request error: {error}")]
|
||||
BadRequest { error: String },
|
||||
#[error("vfs: error parsing path: {path}: {error}")]
|
||||
#[error("error parsing path: {path}: {error}")]
|
||||
ParseError { error: String, path: String },
|
||||
#[error("vfs: IO error: {error}, at path {path}")]
|
||||
#[error("IO error: {error}, at path {path}")]
|
||||
IOError { error: String, path: String },
|
||||
#[error("vfs: kernel capability channel error: {error}")]
|
||||
#[error("kernel capability channel error: {error}")]
|
||||
CapChannelFail { error: String },
|
||||
#[error("vfs: Bad JSON blob: {error}")]
|
||||
#[error("Bad JSON blob: {error}")]
|
||||
BadJson { error: String },
|
||||
#[error("vfs: File not found at path {path}")]
|
||||
#[error("File not found at path {path}")]
|
||||
NotFound { path: String },
|
||||
#[error("vfs: Creating directory failed at path: {path}: {error}")]
|
||||
#[error("Creating directory failed at path: {path}: {error}")]
|
||||
CreateDirError { path: String, error: String },
|
||||
}
|
||||
|
||||
@ -1667,19 +1712,19 @@ pub enum KvResponse {
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Error)]
|
||||
pub enum KvError {
|
||||
#[error("kv: DbDoesNotExist")]
|
||||
#[error("DbDoesNotExist")]
|
||||
NoDb,
|
||||
#[error("kv: KeyNotFound")]
|
||||
#[error("KeyNotFound")]
|
||||
KeyNotFound,
|
||||
#[error("kv: no Tx found")]
|
||||
#[error("no Tx found")]
|
||||
NoTx,
|
||||
#[error("kv: No capability: {error}")]
|
||||
#[error("No capability: {error}")]
|
||||
NoCap { error: String },
|
||||
#[error("kv: rocksdb internal error: {error}")]
|
||||
#[error("rocksdb internal error: {error}")]
|
||||
RocksDBError { action: String, error: String },
|
||||
#[error("kv: input bytes/json/key error: {error}")]
|
||||
#[error("input bytes/json/key error: {error}")]
|
||||
InputError { error: String },
|
||||
#[error("kv: IO error: {error}")]
|
||||
#[error("IO error: {error}")]
|
||||
IOError { error: String },
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user