Merge pull request #234 from kinode-dao/da/drop

Da/drop
This commit is contained in:
tadad 2024-02-08 14:42:53 -06:00 committed by GitHub
commit 1eb4824c94
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 387 additions and 182 deletions

19
Cargo.lock generated
View File

@ -3206,6 +3206,23 @@ dependencies = [
"wit-bindgen", "wit-bindgen",
] ]
[[package]]
name = "kinode_process_lib"
version = "0.6.0"
source = "git+https://github.com/kinode-dao/process_lib?tag=v0.6.0-alpha#4c49368e1945c041dfcabbe15734322012239d25"
dependencies = [
"anyhow",
"bincode",
"http 1.0.0",
"mime_guess",
"rand 0.8.5",
"serde",
"serde_json",
"thiserror",
"url",
"wit-bindgen",
]
[[package]] [[package]]
name = "kit" name = "kit"
version = "0.1.0" version = "0.1.0"
@ -5535,7 +5552,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bincode", "bincode",
"kinode_process_lib 0.5.9 (git+https://github.com/kinode-dao/process_lib?tag=v0.5.9-alpha)", "kinode_process_lib 0.6.0",
"rand 0.8.5", "rand 0.8.5",
"regex", "regex",
"serde", "serde",

View File

@ -18,7 +18,7 @@ If you have questions, join the [Kinode discord](https://discord.gg/TCgdca5Bjt)
```bash ```bash
# Clone the repo. # Clone the repo.
git clone https://github.com/kinode-dao/kinode.git git clone git@github.com:kinode-dao/kinode.git
# Get some stuff so we can build Wasm. # Get some stuff so we can build Wasm.

View File

@ -2,7 +2,6 @@ use kinode_process_lib::{
await_next_request_body, call_init, println, Address, ProcessId, Request, await_next_request_body, call_init, println, Address, ProcessId, Request,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json;
wit_bindgen::generate!({ wit_bindgen::generate!({
path: "wit", path: "wit",
@ -12,10 +11,12 @@ wit_bindgen::generate!({
}, },
}); });
#[derive(Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct EditAliases { enum TerminalAction {
EditAlias {
alias: String, alias: String,
process: Option<ProcessId>, process: Option<ProcessId>,
},
} }
call_init!(init); call_init!(init);
@ -27,6 +28,12 @@ fn init(_our: Address) {
}; };
let line = String::from_utf8(args).unwrap_or("alias: error".into()); let line = String::from_utf8(args).unwrap_or("alias: error".into());
if line.is_empty() {
println!("Change alias for a process");
println!("\x1b[1mUsage:\x1b[0m alias <alias_name> <process_id>");
return;
}
let (alias, process) = line.split_once(" ").unwrap_or((&line, "")); let (alias, process) = line.split_once(" ").unwrap_or((&line, ""));
if alias.is_empty() { if alias.is_empty() {
@ -38,13 +45,11 @@ fn init(_our: Address) {
let _ = Request::new() let _ = Request::new()
.target(("our", "terminal", "terminal", "sys")) .target(("our", "terminal", "terminal", "sys"))
.body( .body(
json!(EditAliases { serde_json::to_vec(&TerminalAction::EditAlias {
alias: alias.to_string(), alias: alias.to_string(),
process: None process: None,
}) })
.to_string() .unwrap(),
.as_bytes()
.to_vec(),
) )
.send(); .send();
} else { } else {
@ -53,13 +58,11 @@ fn init(_our: Address) {
let _ = Request::new() let _ = Request::new()
.target(("our", "terminal", "terminal", "sys")) .target(("our", "terminal", "terminal", "sys"))
.body( .body(
json!(EditAliases { serde_json::to_vec(&TerminalAction::EditAlias {
alias: alias.to_string(), alias: alias.to_string(),
process: Some(process) process: Some(process),
}) })
.to_string() .unwrap(),
.as_bytes()
.to_vec(),
) )
.send(); .send();
} }

View File

@ -19,10 +19,16 @@ fn init(_our: Address) {
}; };
let Ok(file_path) = String::from_utf8(args) else { let Ok(file_path) = String::from_utf8(args) else {
println!("bad file path"); println!("cat: bad args, aborting");
return; return;
}; };
if file_path.is_empty() {
println!("Print the contents of a file to the terminal");
println!("\x1b[1mUsage:\x1b[0m cat <file_path>");
return;
}
Request::new() Request::new()
.target(("our", "vfs", "distro", "sys")) .target(("our", "vfs", "distro", "sys"))
.body( .body(

View File

@ -19,6 +19,11 @@ fn init(our: Address) {
}; };
let tail = String::from_utf8(args).unwrap(); let tail = String::from_utf8(args).unwrap();
if tail.is_empty() {
println!("Send a Message to another node's terminal");
println!("\x1b[1mUsage:\x1b[0m hi <node_id> <message>");
return;
}
let (node_id, message) = match tail.split_once(" ") { let (node_id, message) = match tail.split_once(" ") {
Some((s, t)) => (s, t), Some((s, t)) => (s, t),

View File

@ -18,6 +18,11 @@ fn init(_our: Address) {
return; return;
}; };
let body_string = String::from_utf8(body).unwrap(); let body_string = String::from_utf8(body).unwrap();
if body_string.is_empty() {
println!("Send a Request to a Process");
println!("\x1b[1mUsage:\x1b[0m m <target> <body> [-a <await_time>]");
return;
}
let re = Regex::new(r#"'[^']*'|\S+"#).unwrap(); let re = Regex::new(r#"'[^']*'|\S+"#).unwrap();
let mut args: Vec<String> = re let mut args: Vec<String> = re

View File

@ -7,7 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
bincode = "1.3.3" bincode = "1.3.3"
kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.5.9-alpha" } kinode_process_lib = { git = "https://github.com/kinode-dao/process_lib", tag = "v0.6.0-alpha" }
rand = "0.8" rand = "0.8"
regex = "1.10.3" regex = "1.10.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View File

@ -3,7 +3,7 @@ use kinode_process_lib::kernel_types as kt;
use kinode_process_lib::kinode::process::standard as wit; use kinode_process_lib::kinode::process::standard as wit;
use kinode_process_lib::{ use kinode_process_lib::{
get_blob, get_typed_state, our_capabilities, print_to_terminal, println, set_state, vfs, get_blob, get_typed_state, our_capabilities, print_to_terminal, println, set_state, vfs,
Address, Capability, PackageId, ProcessId, Request, Address, Capability, ProcessId, Request,
}; };
use regex::Regex; use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -18,9 +18,11 @@ wit_bindgen::generate!({
}); });
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct EditAliases { enum TerminalAction {
EditAlias {
alias: String, alias: String,
process: Option<ProcessId>, process: Option<ProcessId>,
},
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -67,9 +69,7 @@ fn parse_command(state: &mut TerminalState, line: &str) -> anyhow::Result<()> {
None => (args.to_string(), None), None => (args.to_string(), None),
}; };
let wasm_path = format!("{}.wasm", process.process()); match handle_run(&state.our, &process, pipe.0, pipe.1) {
let package = PackageId::new(process.package(), process.publisher());
match handle_run(&state.our, &package, wasm_path, pipe.0, pipe.1) {
Ok(_) => Ok(()), // TODO clean up process Ok(_) => Ok(()), // TODO clean up process
Err(e) => Err(anyhow!("failed to instantiate script: {}", e)), Err(e) => Err(anyhow!("failed to instantiate script: {}", e)),
} }
@ -130,33 +130,23 @@ impl Guest for Component {
Ok(()) => continue, Ok(()) => continue,
Err(e) => println!("terminal: {e}"), Err(e) => println!("terminal: {e}"),
} }
} else if state.our.node == source.node { } else if state.our.node == source.node
let Ok(edit_aliases) = serde_json::from_slice::<EditAliases>(&body) else { && state.our.package() == source.package()
println!("terminal: invalid action!"); {
let Ok(action) = serde_json::from_slice::<TerminalAction>(&body) else {
println!("terminal: failed to parse action from: {}", source);
continue; continue;
}; };
match action {
match edit_aliases.process { TerminalAction::EditAlias { alias, process } => {
Some(process) => { match handle_alias_change(&mut state, alias, process) {
state Ok(()) => continue,
.aliases Err(e) => println!("terminal: {e}"),
.insert(edit_aliases.alias.clone(), process.clone()); };
println!(
"terminal: alias {} set to {}",
edit_aliases.alias, process
);
} }
None => {
state.aliases.remove(&edit_aliases.alias);
println!("terminal: alias {} removed", edit_aliases.alias);
}
}
if let Ok(new_state) = bincode::serialize(&state) {
set_state(&new_state);
} else {
println!("terminal: failed to serialize state!");
} }
} else { } else {
println!("terminal: ignoring message from: {}", source);
continue; continue;
} }
} }
@ -174,28 +164,14 @@ impl Guest for Component {
fn handle_run( fn handle_run(
our: &Address, our: &Address,
package: &PackageId, process: &ProcessId,
wasm_path: String,
args: String, args: String,
pipe: Option<(String, u64)>, pipe: Option<(String, u64)>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let wasm_path = format!("{}.wasm", process.process());
let package = format!("{}:{}", process.package(), process.publisher());
let drive_path = format!("/{}/pkg", package); let drive_path = format!("/{}/pkg", package);
Request::new() let Ok(entry) = get_entry(process) else {
.target(("our", "vfs", "distro", "sys"))
.body(serde_json::to_vec(&vfs::VfsRequest {
path: format!("{}/scripts.json", drive_path),
action: vfs::VfsAction::Read,
})?)
.send_and_await_response(5)??;
let Some(blob) = get_blob() else {
return Err(anyhow::anyhow!(
"couldn't find /{}/pkg/scripts.json",
package
));
};
let dot_scripts = String::from_utf8(blob.bytes)?;
let dot_scripts = serde_json::from_str::<HashMap<String, kt::DotScriptsEntry>>(&dot_scripts)?;
let Some(entry) = dot_scripts.get(&wasm_path) else {
return Err(anyhow::anyhow!("script not in scripts.json file")); return Err(anyhow::anyhow!("script not in scripts.json file"));
}; };
let wasm_path = if wasm_path.starts_with("/") { let wasm_path = if wasm_path.starts_with("/") {
@ -207,7 +183,7 @@ fn handle_run(
// build initial caps // build initial caps
let process_id = format!("{}:{}", rand::random::<u64>(), package); // all scripts are given random process IDs let process_id = format!("{}:{}", rand::random::<u64>(), package); // all scripts are given random process IDs
let Ok(parsed_new_process_id) = process_id.parse::<ProcessId>() else { let Ok(parsed_new_process_id) = process_id.parse::<ProcessId>() else {
return Err(anyhow::anyhow!("app store: invalid process id!")); return Err(anyhow::anyhow!("terminal: invalid process id!"));
}; };
let _bytes_response = Request::new() let _bytes_response = Request::new()
@ -217,13 +193,69 @@ fn handle_run(
action: vfs::VfsAction::Read, action: vfs::VfsAction::Read,
})?) })?)
.send_and_await_response(5)??; .send_and_await_response(5)??;
// process the caps we are going to grant to other processes
let mut granted_caps: Vec<(ProcessId, Capability)> = vec![];
if let Some(to_grant) = &entry.grant_capabilities {
for value in to_grant {
match value {
serde_json::Value::String(process_name) => {
if let Ok(parsed_process_id) = process_name.parse::<ProcessId>() {
granted_caps.push((
parsed_process_id,
Capability {
issuer: Address {
node: our.node.clone(),
process: parsed_new_process_id.clone(),
},
params: "\"messaging\"".into(),
},
));
}
}
serde_json::Value::Object(map) => {
if let Some(process_name) = map.get("process") {
if let Ok(parsed_process_id) = process_name
.as_str()
.unwrap_or_default()
.parse::<ProcessId>()
{
if let Some(params) = map.get("params") {
granted_caps.push((
parsed_process_id,
Capability {
issuer: Address {
node: our.node.clone(),
process: parsed_new_process_id.clone(),
},
params: params.to_string(),
},
));
}
}
}
}
_ => {
continue;
}
}
}
}
for (process, cap) in granted_caps.into_iter() {
Request::new()
.target(("our", "kernel", "distro", "sys"))
.body(serde_json::to_vec(&kt::KernelCommand::GrantCapabilities {
target: process,
capabilities: vec![kt::de_wit_capability(cap)],
})?)
.send()?;
}
Request::new() Request::new()
.target(("our", "kernel", "distro", "sys")) .target(("our", "kernel", "distro", "sys"))
.body(serde_json::to_vec(&kt::KernelCommand::InitializeProcess { .body(serde_json::to_vec(&kt::KernelCommand::InitializeProcess {
id: parsed_new_process_id.clone(), id: parsed_new_process_id.clone(),
wasm_bytes_handle: wasm_path.clone(), wasm_bytes_handle: wasm_path.clone(),
wit_version: None, wit_version: None,
on_exit: kt::OnExit::None, // TODO this should send a message back to runner:script:sys so that it can Drop capabilities on_exit: kt::OnExit::None,
initial_capabilities: HashSet::new(), initial_capabilities: HashSet::new(),
public: entry.public, public: entry.public,
})?) })?)
@ -269,6 +301,11 @@ fn handle_run(
} }
} }
} }
// always give it the cap to message the terminal back
requested_caps.push(kt::de_wit_capability(Capability {
issuer: our.clone(),
params: "\"messaging\"".to_string(),
}));
if entry.request_networking { if entry.request_networking {
requested_caps.push(kt::de_wit_capability(Capability { requested_caps.push(kt::de_wit_capability(Capability {
issuer: Address::new(&our.node, ("kernel", "distro", "sys")), issuer: Address::new(&our.node, ("kernel", "distro", "sys")),
@ -305,63 +342,6 @@ fn handle_run(
capabilities: requested_caps, capabilities: requested_caps,
})?) })?)
.send()?; .send()?;
if let Some(to_grant) = &entry.grant_capabilities {
for value in to_grant {
match value {
serde_json::Value::String(process_name) => {
if let Ok(parsed_process_id) = process_name.parse::<ProcessId>() {
let _ = Request::new()
.target(("our", "kernel", "distro", "sys"))
.body(
serde_json::to_vec(&kt::KernelCommand::GrantCapabilities {
target: parsed_process_id,
capabilities: vec![kt::Capability {
issuer: Address {
node: our.node.clone(),
process: parsed_new_process_id.clone(),
},
params: "\"messaging\"".into(),
}],
})
.unwrap(),
)
.send()?;
}
}
serde_json::Value::Object(map) => {
if let Some(process_name) = map.get("process") {
if let Ok(parsed_process_id) = process_name
.as_str()
.unwrap_or_default()
.parse::<ProcessId>()
{
if let Some(params) = map.get("params") {
let _ = Request::new()
.target(("our", "kernel", "distro", "sys"))
.body(
serde_json::to_vec(&kt::KernelCommand::GrantCapabilities {
target: parsed_process_id,
capabilities: vec![kt::Capability {
issuer: Address {
node: our.node.clone(),
process: parsed_new_process_id.clone(),
},
params: params.to_string(),
}],
})
.unwrap(),
)
.send()?;
}
}
}
}
_ => {
continue;
}
}
}
}
let _ = Request::new() let _ = Request::new()
.target(("our", "kernel", "distro", "sys")) .target(("our", "kernel", "distro", "sys"))
.body(serde_json::to_vec(&kt::KernelCommand::RunProcess( .body(serde_json::to_vec(&kt::KernelCommand::RunProcess(
@ -396,3 +376,54 @@ fn handle_run(
Ok(()) Ok(())
} }
fn handle_alias_change(
state: &mut TerminalState,
alias: String,
process: Option<ProcessId>,
) -> anyhow::Result<()> {
match process {
Some(process) => {
// first check to make sure the script is actually a script
let Ok(_) = get_entry(&process) else {
return Err(anyhow!("terminal: process {} not found", process));
};
state.aliases.insert(alias.clone(), process.clone());
println!("terminal: alias {} set to {}", alias, process);
}
None => {
if state.aliases.contains_key(&alias) {
state.aliases.remove(&alias);
println!("terminal: alias {} removed", alias);
} else {
println!("terminal: alias {} not found", alias);
}
}
}
set_state(&bincode::serialize(&state)?);
Ok(())
}
fn get_entry(process: &ProcessId) -> anyhow::Result<kt::DotScriptsEntry> {
let drive_path = format!("/{}:{}/pkg", process.package(), process.publisher());
Request::new()
.target(("our", "vfs", "distro", "sys"))
.body(serde_json::to_vec(&vfs::VfsRequest {
path: format!("{}/scripts.json", drive_path),
action: vfs::VfsAction::Read,
})?)
.send_and_await_response(5)??;
let Some(blob) = get_blob() else {
return Err(anyhow::anyhow!(
"couldn't find /{}/pkg/scripts.json",
process.package()
));
};
let dot_scripts = String::from_utf8(blob.bytes)?;
let dot_scripts = serde_json::from_str::<HashMap<String, kt::DotScriptsEntry>>(&dot_scripts)?;
let Some(entry) = dot_scripts.get(&format!("{}.wasm", process.process())) else {
return Err(anyhow::anyhow!("script not in scripts.json file"));
};
Ok(entry.clone())
}

View File

@ -86,6 +86,7 @@ async fn handle_kernel_request(
senders: &mut Senders, senders: &mut Senders,
process_handles: &mut ProcessHandles, process_handles: &mut ProcessHandles,
process_map: &mut t::ProcessMap, process_map: &mut t::ProcessMap,
reverse_cap_index: &mut t::ReverseCapIndex,
caps_oracle: t::CapMessageSender, caps_oracle: t::CapMessageSender,
engine: &Engine, engine: &Engine,
) { ) {
@ -334,7 +335,37 @@ async fn handle_kernel_request(
) )
}) })
.collect(); .collect();
entry.capabilities.extend(signed_caps); entry.capabilities.extend(signed_caps.clone());
// add these to reverse cap index
for (cap, _) in &signed_caps {
reverse_cap_index
.entry(cap.clone().issuer.process)
.or_insert_with(HashMap::new)
.entry(target.clone())
.or_insert_with(Vec::new)
.push(cap.clone());
}
let _ = persist_state(&our_name, &send_to_loop, process_map).await;
}
t::KernelCommand::DropCapabilities {
target,
capabilities,
} => {
let Some(entry) = process_map.get_mut(&target) else {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 1,
content: format!(
"kernel: no such process {:?} to DropCapabilities",
target
),
})
.await;
return;
};
for cap in capabilities {
entry.capabilities.remove(&cap);
}
let _ = persist_state(&our_name, &send_to_loop, process_map).await; let _ = persist_state(&our_name, &send_to_loop, process_map).await;
} }
// send 'run' message to a process that's already been initialized // send 'run' message to a process that's already been initialized
@ -423,7 +454,14 @@ async fn handle_kernel_request(
t::KernelCommand::KillProcess(process_id) => { t::KernelCommand::KillProcess(process_id) => {
// brutal and savage killing: aborting the task. // brutal and savage killing: aborting the task.
// do not do this to a process if you don't want to risk // do not do this to a process if you don't want to risk
// dropped messages / un-replied-to-requests // dropped messages / un-replied-to-requests / revoked caps
caps_oracle
.send(t::CapMessage::RevokeAll {
on: process_id.clone(),
responder: tokio::sync::oneshot::channel().0,
})
.await
.expect("event loop: fatal: sender died");
let _ = senders.remove(&process_id); let _ = senders.remove(&process_id);
let process_handle = match process_handles.remove(&process_id) { let process_handle = match process_handles.remove(&process_id) {
Some(ph) => ph, Some(ph) => ph,
@ -622,6 +660,7 @@ pub async fn kernel(
our: t::Identity, our: t::Identity,
keypair: Arc<signature::Ed25519KeyPair>, keypair: Arc<signature::Ed25519KeyPair>,
mut process_map: t::ProcessMap, mut process_map: t::ProcessMap,
mut reverse_cap_index: t::ReverseCapIndex,
caps_oracle_sender: t::CapMessageSender, caps_oracle_sender: t::CapMessageSender,
mut caps_oracle_receiver: t::CapMessageReceiver, mut caps_oracle_receiver: t::CapMessageReceiver,
send_to_loop: t::MessageSender, send_to_loop: t::MessageSender,
@ -1004,6 +1043,7 @@ pub async fn kernel(
&mut senders, &mut senders,
&mut process_handles, &mut process_handles,
&mut process_map, &mut process_map,
&mut reverse_cap_index,
caps_oracle_sender.clone(), caps_oracle_sender.clone(),
&engine, &engine,
).await; ).await;
@ -1060,17 +1100,28 @@ pub async fn kernel(
cap.clone(), cap.clone(),
keypair.sign(&rmp_serde::to_vec(&cap).unwrap()).as_ref().to_vec() keypair.sign(&rmp_serde::to_vec(&cap).unwrap()).as_ref().to_vec()
)).collect(); )).collect();
entry.capabilities.extend(signed_caps); entry.capabilities.extend(signed_caps.clone());
// now we have to insert all caps into the reverse cap index
for (cap, _) in &signed_caps {
reverse_cap_index
.entry(cap.clone().issuer.process)
.or_insert_with(HashMap::new)
.entry(on.clone())
.or_insert_with(Vec::new)
.push(cap.clone());
}
let _ = persist_state(&our.name, &send_to_loop, &process_map).await; let _ = persist_state(&our.name, &send_to_loop, &process_map).await;
let _ = responder.send(true); let _ = responder.send(true);
}, },
t::CapMessage::_Drop { on, cap, responder } => { t::CapMessage::Drop { on, caps, responder } => {
// remove cap from process map // remove cap from process map
let Some(entry) = process_map.get_mut(&on) else { let Some(entry) = process_map.get_mut(&on) else {
let _ = responder.send(false); let _ = responder.send(false);
continue; continue;
}; };
for cap in &caps {
entry.capabilities.remove(&cap); entry.capabilities.remove(&cap);
}
let _ = persist_state(&our.name, &send_to_loop, &process_map).await; let _ = persist_state(&our.name, &send_to_loop, &process_map).await;
let _ = responder.send(true); let _ = responder.send(true);
}, },
@ -1092,6 +1143,22 @@ pub async fn kernel(
} }
); );
}, },
t::CapMessage::RevokeAll { on, responder } => {
let Some(granter) = reverse_cap_index.get(&on) else {
let _ = responder.send(true);
continue;
};
for (grantee, caps) in granter {
let Some(entry) = process_map.get_mut(&grantee) else {
continue;
};
for cap in caps {
entry.capabilities.remove(&cap);
}
}
let _ = persist_state(&our.name, &send_to_loop, &process_map).await;
let _ = responder.send(true);
}
t::CapMessage::FilterCaps { on, caps, responder } => { t::CapMessage::FilterCaps { on, caps, responder } => {
let _ = responder.send( let _ = responder.send(
match process_map.get(&on) { match process_map.get(&on) {

View File

@ -597,7 +597,9 @@ pub async fn make_process_loop(
}) })
.collect(); .collect();
// send message to tell main kernel loop to remove handler // fulfill the designated OnExit behavior
match metadata.on_exit {
t::OnExit::None => {
send_to_loop send_to_loop
.send(t::KernelMessage { .send(t::KernelMessage {
id: rand::random(), id: rand::random(),
@ -617,10 +619,6 @@ pub async fn make_process_loop(
lazy_load_blob: None, lazy_load_blob: None,
}) })
.await?; .await?;
// fulfill the designated OnExit behavior
match metadata.on_exit {
t::OnExit::None => {
let _ = send_to_terminal let _ = send_to_terminal
.send(t::Printout { .send(t::Printout {
verbosity: 1, verbosity: 1,
@ -630,6 +628,25 @@ pub async fn make_process_loop(
} }
// if restart, tell ourselves to init the app again, with same capabilities // if restart, tell ourselves to init the app again, with same capabilities
t::OnExit::Restart => { t::OnExit::Restart => {
send_to_loop
.send(t::KernelMessage {
id: rand::random(),
source: our_kernel.clone(),
target: our_kernel.clone(),
rsvp: None,
message: t::Message::Request(t::Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&t::KernelCommand::KillProcess(
metadata.our.process.clone(),
))
.unwrap(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await?;
if is_error { if is_error {
let _ = send_to_terminal let _ = send_to_terminal
.send(t::Printout { .send(t::Printout {
@ -712,18 +729,6 @@ pub async fn make_process_loop(
.await?; .await?;
for (address, mut request, blob) in requests { for (address, mut request, blob) in requests {
request.expects_response = None; request.expects_response = None;
let (tx, rx) = tokio::sync::oneshot::channel();
caps_oracle
.send(t::CapMessage::Has {
on: metadata.our.process.clone(),
cap: t::Capability {
issuer: address.clone(),
params: "\"messaging\"".into(),
},
responder: tx,
})
.await?;
if let Ok(true) = rx.await {
send_to_loop send_to_loop
.send(t::KernelMessage { .send(t::KernelMessage {
id: rand::random(), id: rand::random(),
@ -735,7 +740,25 @@ pub async fn make_process_loop(
}) })
.await?; .await?;
} }
} send_to_loop
.send(t::KernelMessage {
id: rand::random(),
source: our_kernel.clone(),
target: our_kernel.clone(),
rsvp: None,
message: t::Message::Request(t::Request {
inherit: false,
expects_response: None,
body: serde_json::to_vec(&t::KernelCommand::KillProcess(
metadata.our.process.clone(),
))
.unwrap(),
metadata: None,
capabilities: vec![],
}),
lazy_load_blob: None,
})
.await?;
} }
} }
Ok(()) Ok(())

View File

@ -398,6 +398,25 @@ impl StandardHost for process::ProcessWasi {
Ok(()) Ok(())
} }
// TODO 0.6.0
// async fn drop_capabilities(&mut self, caps: Vec<wit::Capability>) -> Result<()> {
// let (tx, rx) = tokio::sync::oneshot::channel();
// let _ = self
// .process
// .caps_oracle
// .send(t::CapMessage::Drop {
// on: self.process.metadata.our.process.clone(),
// caps: caps
// .iter()
// .map(|cap| t::de_wit_capability(cap.clone()).0)
// .collect(),
// responder: tx,
// })
// .await?;
// let _ = rx.await?;
// Ok(())
// }
async fn our_capabilities(&mut self) -> Result<Vec<wit::Capability>> { async fn our_capabilities(&mut self) -> Result<Vec<wit::Capability>> {
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self let _ = self

View File

@ -431,7 +431,7 @@ async fn main() {
*/ */
let networking_keypair_arc = Arc::new(decoded_keyfile.networking_keypair); let networking_keypair_arc = Arc::new(decoded_keyfile.networking_keypair);
let (kernel_process_map, db) = state::load_state( let (kernel_process_map, db, reverse_cap_index) = state::load_state(
our.name.clone(), our.name.clone(),
networking_keypair_arc.clone(), networking_keypair_arc.clone(),
home_directory_path.clone(), home_directory_path.clone(),
@ -445,6 +445,7 @@ async fn main() {
our.clone(), our.clone(),
networking_keypair_arc.clone(), networking_keypair_arc.clone(),
kernel_process_map.clone(), kernel_process_map.clone(),
reverse_cap_index,
caps_oracle_sender.clone(), caps_oracle_sender.clone(),
caps_oracle_receiver, caps_oracle_receiver,
kernel_message_sender.clone(), kernel_message_sender.clone(),

View File

@ -19,7 +19,7 @@ pub async fn load_state(
keypair: Arc<signature::Ed25519KeyPair>, keypair: Arc<signature::Ed25519KeyPair>,
home_directory_path: String, home_directory_path: String,
runtime_extensions: Vec<(ProcessId, MessageSender, bool)>, runtime_extensions: Vec<(ProcessId, MessageSender, bool)>,
) -> Result<(ProcessMap, DB), StateError> { ) -> Result<(ProcessMap, DB, ReverseCapIndex), StateError> {
let state_path = format!("{}/kernel", &home_directory_path); let state_path = format!("{}/kernel", &home_directory_path);
if let Err(e) = fs::create_dir_all(&state_path).await { if let Err(e) = fs::create_dir_all(&state_path).await {
@ -37,6 +37,7 @@ pub async fn load_state(
// let cf_descriptor = ColumnFamilyDescriptor::new(cf_name, Options::default()); // let cf_descriptor = ColumnFamilyDescriptor::new(cf_name, Options::default());
let db = DB::open_default(state_path).unwrap(); let db = DB::open_default(state_path).unwrap();
let mut process_map: ProcessMap = HashMap::new(); let mut process_map: ProcessMap = HashMap::new();
let mut reverse_cap_index: ReverseCapIndex = HashMap::new();
let kernel_id = process_to_vec(KERNEL_PROCESS_ID.clone()); let kernel_id = process_to_vec(KERNEL_PROCESS_ID.clone());
match db.get(&kernel_id) { match db.get(&kernel_id) {
@ -73,11 +74,12 @@ pub async fn load_state(
home_directory_path.clone(), home_directory_path.clone(),
runtime_extensions.clone(), runtime_extensions.clone(),
&mut process_map, &mut process_map,
&mut reverse_cap_index,
) )
.await .await
.unwrap(); .unwrap();
Ok((process_map, db)) Ok((process_map, db, reverse_cap_index))
} }
pub async fn state_sender( pub async fn state_sender(
@ -307,6 +309,7 @@ async fn bootstrap(
home_directory_path: String, home_directory_path: String,
runtime_extensions: Vec<(ProcessId, MessageSender, bool)>, runtime_extensions: Vec<(ProcessId, MessageSender, bool)>,
process_map: &mut ProcessMap, process_map: &mut ProcessMap,
reverse_cap_index: &mut ReverseCapIndex,
) -> Result<()> { ) -> Result<()> {
// println!("bootstrapping node...\r"); // println!("bootstrapping node...\r");
@ -699,7 +702,13 @@ async fn bootstrap(
}; };
process process
.capabilities .capabilities
.insert(cap.clone(), sign_cap(cap, keypair.clone())); .insert(cap.clone(), sign_cap(cap.clone(), keypair.clone()));
reverse_cap_index
.entry(cap.clone().issuer.process)
.or_insert_with(HashMap::new)
.entry(our_process_id.parse().unwrap())
.or_insert_with(Vec::new)
.push(cap);
} }
} }
} }
@ -717,9 +726,16 @@ async fn bootstrap(
}, },
params: params.to_string(), params: params.to_string(),
}; };
process process.capabilities.insert(
.capabilities cap.clone(),
.insert(cap.clone(), sign_cap(cap, keypair.clone())); sign_cap(cap.clone(), keypair.clone()),
);
reverse_cap_index
.entry(cap.clone().issuer.process)
.or_insert_with(HashMap::new)
.entry(our_process_id.parse().unwrap())
.or_insert_with(Vec::new)
.push(cap);
} }
} }
} }

View File

@ -928,6 +928,11 @@ pub enum KernelCommand {
target: ProcessId, target: ProcessId,
capabilities: Vec<Capability>, capabilities: Vec<Capability>,
}, },
/// Drop capabilities. Does nothing if process doesn't have these caps
DropCapabilities {
target: ProcessId,
capabilities: Vec<Capability>,
},
/// Tell the kernel to run a process that has already been installed. /// Tell the kernel to run a process that has already been installed.
/// TODO: in the future, this command could be extended to allow for /// TODO: in the future, this command could be extended to allow for
/// resource provision. /// resource provision.
@ -966,10 +971,10 @@ pub enum CapMessage {
caps: Vec<Capability>, caps: Vec<Capability>,
responder: tokio::sync::oneshot::Sender<bool>, responder: tokio::sync::oneshot::Sender<bool>,
}, },
_Drop { /// root delete: uncritically remove all `caps` from `on`
// not used yet! Drop {
on: ProcessId, on: ProcessId,
cap: Capability, caps: Vec<Capability>,
responder: tokio::sync::oneshot::Sender<bool>, responder: tokio::sync::oneshot::Sender<bool>,
}, },
/// does `on` have `cap` in its store? /// does `on` have `cap` in its store?
@ -984,6 +989,11 @@ pub enum CapMessage {
on: ProcessId, on: ProcessId,
responder: tokio::sync::oneshot::Sender<Vec<(Capability, Vec<u8>)>>, responder: tokio::sync::oneshot::Sender<Vec<(Capability, Vec<u8>)>>,
}, },
/// Remove all caps issued by `on` from every process on the entire system
RevokeAll {
on: ProcessId,
responder: tokio::sync::oneshot::Sender<bool>,
},
/// before `on` sends a message, filter out any bogus caps it may have attached, sign any new /// before `on` sends a message, filter out any bogus caps it may have attached, sign any new
/// caps it may have created, and retreive the signature for the caps in its store. /// caps it may have created, and retreive the signature for the caps in its store.
FilterCaps { FilterCaps {
@ -993,6 +1003,8 @@ pub enum CapMessage {
}, },
} }
pub type ReverseCapIndex = HashMap<ProcessId, HashMap<ProcessId, Vec<Capability>>>;
pub type ProcessMap = HashMap<ProcessId, PersistedProcess>; pub type ProcessMap = HashMap<ProcessId, PersistedProcess>;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]