0.3.0: fix kernel API to separate process init and process run

This commit is contained in:
dr-frmr 2023-11-28 17:33:49 -05:00
parent 9f554e55e9
commit 45605958f3
No known key found for this signature in database
10 changed files with 1140 additions and 918 deletions

2
Cargo.lock generated
View File

@ -5093,7 +5093,7 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]] [[package]]
name = "uqbar" name = "uqbar"
version = "0.1.0" version = "0.3.0"
dependencies = [ dependencies = [
"aes-gcm 0.10.2", "aes-gcm 0.10.2",
"anyhow", "anyhow",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "uqbar" name = "uqbar"
version = "0.1.0" version = "0.3.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -10,7 +10,7 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]] [[package]]
name = "app_store" name = "app_store"
version = "0.1.0" version = "0.2.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bincode", "bincode",
@ -46,6 +46,12 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "bytes"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
@ -87,6 +93,21 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
dependencies = [
"percent-encoding",
]
[[package]] [[package]]
name = "generic-array" name = "generic-array"
version = "0.14.7" version = "0.14.7"
@ -123,12 +144,33 @@ dependencies = [
"unicode-segmentation", "unicode-segmentation",
] ]
[[package]]
name = "http"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]] [[package]]
name = "id-arena" name = "id-arena"
version = "2.2.1" version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005" checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005"
[[package]]
name = "idna"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.1.0" version = "2.1.0"
@ -164,6 +206,12 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "percent-encoding"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.17" version = "0.2.17"
@ -298,18 +346,68 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "thiserror"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.50"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]] [[package]]
name = "typenum" name = "typenum"
version = "1.17.0" version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unicode-bidi"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.12" version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "unicode-normalization"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
dependencies = [
"tinyvec",
]
[[package]] [[package]]
name = "unicode-segmentation" name = "unicode-segmentation"
version = "1.10.1" version = "1.10.1"
@ -324,16 +422,31 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]] [[package]]
name = "uqbar_process_lib" name = "uqbar_process_lib"
version = "0.2.0" version = "0.3.0"
source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=e53c124#e53c124ec95ef99c06d201d4d08dada8ec691d29" source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=955badd#955badd96647c215d6de956fbeedd8a92ee2f343"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bincode", "bincode",
"http",
"rand", "rand",
"serde", "serde",
"serde_json",
"thiserror",
"url",
"wit-bindgen", "wit-bindgen",
] ]
[[package]]
name = "url"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
]
[[package]] [[package]]
name = "version_check" name = "version_check"
version = "0.9.4" version = "0.9.4"

View File

@ -1,6 +1,6 @@
[package] [package]
name = "app_store" name = "app_store"
version = "0.1.0" version = "0.2.0"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -17,7 +17,7 @@ rand = "0.8"
serde = {version = "1.0", features = ["derive"] } serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sha2 = "0.10.8" sha2 = "0.10.8"
uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "e53c124" } uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "955badd" }
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "5390bab780733f1660d14c254ec985df2816bf1d" } wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "5390bab780733f1660d14c254ec985df2816bf1d" }
[lib] [lib]

View File

@ -5,7 +5,7 @@ use uqbar_process_lib::kernel_types as kt;
use uqbar_process_lib::uqbar::process::standard as wit; use uqbar_process_lib::uqbar::process::standard as wit;
use uqbar_process_lib::{ use uqbar_process_lib::{
get_capability, get_payload, get_typed_state, grant_messaging, println, receive, set_state, get_capability, get_payload, get_typed_state, grant_messaging, println, receive, set_state,
Address, Message, NodeId, PackageId, ProcessId, Request, Response, share_capability, Address, Message, NodeId, PackageId, ProcessId, Request, Response,
}; };
wit_bindgen::generate!({ wit_bindgen::generate!({
@ -165,11 +165,11 @@ impl Guest for Component {
// so that they can send us requests. // so that they can send us requests.
grant_messaging( grant_messaging(
&our, &our,
&Vec::from([ vec![
ProcessId::from_str("http_server:sys:uqbar").unwrap(), ProcessId::new(Some("http_server"), "sys", "uqbar"),
ProcessId::from_str("terminal:terminal:uqbar").unwrap(), ProcessId::new(Some("terminal"), "terminal", "uqbar"),
ProcessId::from_str("vfs:sys:uqbar").unwrap(), ProcessId::new(Some("vfs"), "sys", "uqbar"),
]), ],
); );
println!("{}: start", our.process); println!("{}: start", our.process);
@ -212,9 +212,7 @@ fn handle_message(
Ok(None) => return Ok(()), Ok(None) => return Ok(()),
Ok(Some(resp)) => { Ok(Some(resp)) => {
if req.expects_response.is_some() { if req.expects_response.is_some() {
Response::new() Response::new().ipc(serde_json::to_vec(&resp)?).send()?;
.ipc_bytes(serde_json::to_vec(&resp)?)
.send()?;
} }
} }
Err(err) => { Err(err) => {
@ -227,9 +225,7 @@ fn handle_message(
Ok(None) => return Ok(()), Ok(None) => return Ok(()),
Ok(Some(resp)) => { Ok(Some(resp)) => {
if req.expects_response.is_some() { if req.expects_response.is_some() {
Response::new() Response::new().ipc(serde_json::to_vec(&resp)?).send()?;
.ipc_bytes(serde_json::to_vec(&resp)?)
.send()?;
} }
} }
Err(err) => { Err(err) => {
@ -251,9 +247,9 @@ fn handle_message(
if state.requested_packages.remove(&package_id) { if state.requested_packages.remove(&package_id) {
// auto-take zip from payload and request ourself with New // auto-take zip from payload and request ourself with New
Request::new() Request::new()
.target(our.clone())? .target(our.clone())
.inherit(true) .inherit(true)
.ipc_bytes(serde_json::to_vec(&Req::LocalRequest( .ipc(serde_json::to_vec(&Req::LocalRequest(
LocalRequest::NewPackage { LocalRequest::NewPackage {
package: package_id, package: package_id,
mirror: true, mirror: true,
@ -322,18 +318,13 @@ fn handle_local_request(
} }
match request { match request {
LocalRequest::NewPackage { package, mirror } => { LocalRequest::NewPackage { package, mirror } => {
let vfs_address = Address {
node: our.node.clone(),
process: ProcessId::from_str("vfs:sys:uqbar")?,
};
let Some(mut payload) = get_payload() else { let Some(mut payload) = get_payload() else {
return Err(anyhow::anyhow!("no payload")); return Err(anyhow::anyhow!("no payload"));
}; };
Request::new() Request::new()
.target(vfs_address.clone())? .target(Address::from_str("our@vfs:sys:uqbar")?)
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest { .ipc(serde_json::to_vec(&kt::VfsRequest {
drive: package.to_string(), drive: package.to_string(),
action: kt::VfsAction::New, action: kt::VfsAction::New,
})?) })?)
@ -347,8 +338,8 @@ fn handle_local_request(
// add zip bytes // add zip bytes
payload.mime = Some("application/zip".to_string()); payload.mime = Some("application/zip".to_string());
Request::new() Request::new()
.target(vfs_address.clone())? .target(Address::from_str("our@vfs:sys:uqbar")?)
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest { .ipc(serde_json::to_vec(&kt::VfsRequest {
drive: package.to_string(), drive: package.to_string(),
action: kt::VfsAction::Add { action: kt::VfsAction::Add {
full_path: package.to_string(), full_path: package.to_string(),
@ -361,9 +352,9 @@ fn handle_local_request(
// save the zip file itself in VFS for sharing with other nodes // save the zip file itself in VFS for sharing with other nodes
// call it <package>.zip // call it <package>.zip
Request::new() Request::new()
.target(vfs_address.clone())? .target(Address::from_str("our@vfs:sys:uqbar")?)
.inherit(true) .inherit(true)
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest { .ipc(serde_json::to_vec(&kt::VfsRequest {
drive: package.to_string(), drive: package.to_string(),
action: kt::VfsAction::Add { action: kt::VfsAction::Add {
full_path: format!("/{}.zip", package.to_string()), full_path: format!("/{}.zip", package.to_string()),
@ -373,8 +364,8 @@ fn handle_local_request(
.payload(payload) .payload(payload)
.send_and_await_response(5)??; .send_and_await_response(5)??;
Request::new() Request::new()
.target(vfs_address.clone())? .target(Address::from_str("our@vfs:sys:uqbar")?)
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest { .ipc(serde_json::to_vec(&kt::VfsRequest {
drive: package.to_string(), drive: package.to_string(),
action: kt::VfsAction::GetEntry("/metadata.json".into()), action: kt::VfsAction::GetEntry("/metadata.json".into()),
})?) })?)
@ -408,9 +399,9 @@ fn handle_local_request(
install_from, install_from,
} => Ok(Some(Resp::DownloadResponse( } => Ok(Some(Resp::DownloadResponse(
match Request::new() match Request::new()
.target(Address::new(&install_from, our.process.clone())?)? .target(Address::new(install_from, our.process.clone()))
.inherit(true) .inherit(true)
.ipc_bytes(serde_json::to_vec(&RemoteRequest::Download( .ipc(serde_json::to_vec(&RemoteRequest::Download(
package.clone(), package.clone(),
))?) ))?)
.send_and_await_response(5) .send_and_await_response(5)
@ -430,13 +421,9 @@ fn handle_local_request(
}, },
))), ))),
LocalRequest::Install(package) => { LocalRequest::Install(package) => {
let vfs_address = Address {
node: our.node.clone(),
process: ProcessId::from_str("vfs:sys:uqbar")?,
};
Request::new() Request::new()
.target(Address::new(&our.node, "vfs:sys:uqbar")?)? .target(Address::from_str("our@vfs:sys:uqbar")?)
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest { .ipc(serde_json::to_vec(&kt::VfsRequest {
drive: package.to_string(), drive: package.to_string(),
action: kt::VfsAction::GetEntry("/manifest.json".into()), action: kt::VfsAction::GetEntry("/manifest.json".into()),
})?) })?)
@ -446,16 +433,43 @@ fn handle_local_request(
}; };
let manifest = String::from_utf8(payload.bytes)?; let manifest = String::from_utf8(payload.bytes)?;
let manifest = serde_json::from_str::<Vec<kt::PackageManifestEntry>>(&manifest)?; let manifest = serde_json::from_str::<Vec<kt::PackageManifestEntry>>(&manifest)?;
for entry in manifest { // always grant read/write to their drive, which we created for them
let Some(read_cap) = get_capability(
&Address::new(&our.node, ("vfs", "sys", "uqbar")),
&serde_json::to_string(&serde_json::json!({
"kind": "read",
"drive": package.to_string(),
}))?,
) else {
return Err(anyhow::anyhow!("app-store: no read cap"));
};
let Some(write_cap) = get_capability(
&Address::new(&our.node, ("vfs", "sys", "uqbar")),
&serde_json::to_string(&serde_json::json!({
"kind": "write",
"drive": package.to_string(),
}))?,
) else {
return Err(anyhow::anyhow!("app-store: no write cap"));
};
let Some(networking_cap) = get_capability(
&Address::new(&our.node, ("kernel", "sys", "uqbar")),
&"\"network\"".to_string(),
) else {
return Err(anyhow::anyhow!("app-store: no net cap"));
};
// first, for each process in manifest, initialize it
// then, once all have been initialized, grant them requested caps
// and finally start them.
for entry in &manifest {
let path = if entry.process_wasm_path.starts_with("/") { let path = if entry.process_wasm_path.starts_with("/") {
entry.process_wasm_path entry.process_wasm_path.clone()
} else { } else {
format!("/{}", entry.process_wasm_path) format!("/{}", entry.process_wasm_path)
}; };
let (_, hash_response) = Request::new() let (_, hash_response) = Request::new()
.target(Address::new(&our.node, "vfs:sys:uqbar")?)? .target(Address::from_str("our@vfs:sys:uqbar")?)
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest { .ipc(serde_json::to_vec(&kt::VfsRequest {
drive: package.to_string(), drive: package.to_string(),
action: kt::VfsAction::GetHash(path.clone()), action: kt::VfsAction::GetHash(path.clone()),
})?) })?)
@ -467,95 +481,93 @@ fn handle_local_request(
let kt::VfsResponse::GetHash(Some(hash)) = serde_json::from_slice(&ipc)? else { let kt::VfsResponse::GetHash(Some(hash)) = serde_json::from_slice(&ipc)? else {
return Err(anyhow::anyhow!("no hash in vfs")); return Err(anyhow::anyhow!("no hash in vfs"));
}; };
// build initial caps // build initial caps
let mut initial_capabilities: HashSet<kt::SignedCapability> = HashSet::new(); let mut initial_capabilities: HashSet<kt::SignedCapability> = HashSet::new();
if entry.request_networking { if entry.request_networking {
let Some(networking_cap) = get_capability( initial_capabilities.insert(kt::de_wit_signed_capability(networking_cap.clone()));
&Address {
node: our.node.clone(),
process: ProcessId::from_str("kernel:sys:uqbar")?,
},
&"\"network\"".to_string(),
) else {
return Err(anyhow::anyhow!("app-store: no net cap"));
};
initial_capabilities.insert(kt::de_wit_signed_capability(networking_cap));
} }
let Some(read_cap) = get_capability( initial_capabilities.insert(kt::de_wit_signed_capability(read_cap.clone()));
&vfs_address.clone(), initial_capabilities.insert(kt::de_wit_signed_capability(write_cap.clone()));
&serde_json::to_string(&serde_json::json!({
"kind": "read",
"drive": package.to_string(),
}))?,
) else {
return Err(anyhow::anyhow!("app-store: no read cap"));
};
initial_capabilities.insert(kt::de_wit_signed_capability(read_cap));
let Some(write_cap) = get_capability(
&vfs_address.clone(),
&serde_json::to_string(&serde_json::json!({
"kind": "write",
"drive": package.to_string(),
}))?,
) else {
return Err(anyhow::anyhow!("app-store: no write cap"));
};
initial_capabilities.insert(kt::de_wit_signed_capability(write_cap));
for process_name in &entry.request_messaging {
let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
// TODO handle arbitrary caps here
continue;
};
let Some(messaging_cap) = get_capability(
&Address {
node: our.node.clone(),
process: parsed_process_id.clone(),
},
&"\"messaging\"".into(),
) else {
println!("app-store: no cap for {} to give away!", process_name);
continue;
};
initial_capabilities.insert(kt::de_wit_signed_capability(messaging_cap));
}
let process_id = format!("{}:{}", entry.process_name, package.to_string()); let process_id = format!("{}:{}", entry.process_name, package.to_string());
let Ok(parsed_new_process_id) = ProcessId::from_str(&process_id) else { let Ok(parsed_new_process_id) = ProcessId::from_str(&process_id) else {
return Err(anyhow::anyhow!("app-store: invalid process id!")); return Err(anyhow::anyhow!("app-store: invalid process id!"));
}; };
// kill process if it already exists
Request::new() Request::new()
.target(Address::new(&our.node, "kernel:sys:uqbar")?)? .target(Address::from_str("our@kernel:sys:uqbar")?)
.ipc_bytes(serde_json::to_vec(&kt::KernelCommand::KillProcess( .ipc(serde_json::to_vec(&kt::KernelCommand::KillProcess(
kt::ProcessId::de_wit(parsed_new_process_id.clone()), parsed_new_process_id.clone(),
))?) ))?)
.send()?; .send()?;
// kernel start process takes bytes as payload + wasm_bytes_handle...
// reconsider perhaps
let (_, _bytes_response) = Request::new() let (_, _bytes_response) = Request::new()
.target(Address::new(&our.node, "vfs:sys:uqbar")?)? .target(Address::from_str("our@vfs:sys:uqbar")?)
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest { .ipc(serde_json::to_vec(&kt::VfsRequest {
drive: package.to_string(), drive: package.to_string(),
action: kt::VfsAction::GetEntry(path), action: kt::VfsAction::GetEntry(path),
})?) })?)
.send_and_await_response(5)??; .send_and_await_response(5)??;
let Some(payload) = get_payload() else {
return Err(anyhow::anyhow!("no wasm bytes payload."));
};
Request::new() Request::new()
.target(Address::new(&our.node, "kernel:sys:uqbar")?)? .target(Address::from_str("our@kernel:sys:uqbar")?)
.ipc_bytes(serde_json::to_vec(&kt::KernelCommand::StartProcess { .ipc(serde_json::to_vec(&kt::KernelCommand::InitializeProcess {
id: kt::ProcessId::de_wit(parsed_new_process_id), id: parsed_new_process_id,
wasm_bytes_handle: hash, wasm_bytes_handle: hash,
on_panic: entry.on_panic, on_panic: entry.on_panic.clone(),
initial_capabilities, initial_capabilities,
public: entry.public, public: entry.public,
})?) })?)
.payload(payload) .inherit(true)
.send_and_await_response(5)?;
}
for entry in &manifest {
let process_id = ProcessId::new(
Some(&entry.process_name),
package.package(),
package.publisher(),
);
if let Some(to_request) = &entry.request_messaging {
for process_name in to_request {
let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
// TODO handle arbitrary caps here
continue;
};
let Some(messaging_cap) = get_capability(
&Address {
node: our.node.clone(),
process: parsed_process_id.clone(),
},
&"\"messaging\"".into(),
) else {
println!("app-store: no cap for {} to give away!", process_name);
continue;
};
share_capability(&process_id, &messaging_cap);
}
}
if let Some(to_grant) = &entry.grant_messaging {
let Some(messaging_cap) = get_capability(
&Address {
node: our.node.clone(),
process: process_id.clone(),
},
&"\"messaging\"".into(),
) else {
println!("app-store: no cap for {} to give away!", process_id);
continue;
};
for process_name in to_grant {
let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
// TODO handle arbitrary caps here
continue;
};
share_capability(&parsed_process_id, &messaging_cap);
}
}
Request::new()
.target(Address::from_str("our@kernel:sys:uqbar")?)
.ipc(serde_json::to_vec(&kt::KernelCommand::RunProcess(
process_id,
))?)
.send_and_await_response(5)?; .send_and_await_response(5)?;
} }
Ok(Some(Resp::InstallResponse(InstallResponse::Success))) Ok(Some(Resp::InstallResponse(InstallResponse::Success)))
@ -588,8 +600,8 @@ fn handle_remote_request(
// get the .zip from VFS and attach as payload to response // get the .zip from VFS and attach as payload to response
let file_name = format!("/{}.zip", package.to_string()); let file_name = format!("/{}.zip", package.to_string());
Request::new() Request::new()
.target(Address::new(&our.node, "vfs:sys:uqbar")?)? .target(Address::from_str("our@vfs:sys:uqbar")?)
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest { .ipc(serde_json::to_vec(&kt::VfsRequest {
drive: package.to_string(), drive: package.to_string(),
action: kt::VfsAction::GetEntry(file_name.clone()), action: kt::VfsAction::GetEntry(file_name.clone()),
})?) })?)

View File

@ -313,15 +313,18 @@ async fn bootstrap(
"{}:{}:{}", "{}:{}:{}",
entry.process_name, package_name, package_publisher entry.process_name, package_name, package_publisher
); );
entry.request_messaging.push(our_process_id.clone()); entry.request_messaging = Some(entry.request_messaging.unwrap_or_default());
for process_name in &entry.request_messaging { if let Some(ref mut request_messaging) = entry.request_messaging {
requested_caps.insert(Capability { request_messaging.push(our_process_id.clone());
issuer: Address { for process_name in request_messaging {
node: our_name.to_string(), requested_caps.insert(Capability {
process: ProcessId::from_str(process_name).unwrap(), issuer: Address {
}, node: our_name.to_string(),
params: "\"messaging\"".into(), process: ProcessId::from_str(process_name).unwrap(),
}); },
params: "\"messaging\"".into(),
});
}
} }
if entry.request_networking { if entry.request_networking {

File diff suppressed because it is too large Load Diff

View File

@ -4,12 +4,7 @@ use crate::KERNEL_PROCESS_ID;
use anyhow::Result; use anyhow::Result;
use ring::signature; use ring::signature;
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::future::Future; use std::sync::Arc;
use std::pin::Pin;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
pub use uqbar::process::standard as wit; pub use uqbar::process::standard as wit;
pub use uqbar::process::standard::Host as StandardHost; pub use uqbar::process::standard::Host as StandardHost;
@ -377,52 +372,56 @@ impl ProcessState {
/// create a specific process, and generate a task that will run it. /// create a specific process, and generate a task that will run it.
pub async fn make_process_loop( pub async fn make_process_loop(
booted: Arc<AtomicBool>,
keypair: Arc<signature::Ed25519KeyPair>, keypair: Arc<signature::Ed25519KeyPair>,
metadata: t::ProcessMetadata, metadata: t::ProcessMetadata,
send_to_loop: t::MessageSender, send_to_loop: t::MessageSender,
send_to_terminal: t::PrintSender, send_to_terminal: t::PrintSender,
mut recv_in_process: ProcessMessageReceiver, mut recv_in_process: ProcessMessageReceiver,
send_to_process: ProcessMessageSender, send_to_process: ProcessMessageSender,
wasm_bytes: &Vec<u8>, wasm_bytes: Vec<u8>,
caps_oracle: t::CapMessageSender, caps_oracle: t::CapMessageSender,
engine: &Engine, engine: Engine,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> { ) -> Result<()> {
// before process can be instantiated, need to await booted message from kernel // before process can be instantiated, need to await 'run' message from kernel
if !booted.load(Ordering::Relaxed) { let mut pre_boot_queue = Vec::<Result<t::KernelMessage, t::WrappedSendError>>::new();
let mut pre_boot_queue = Vec::<Result<t::KernelMessage, t::WrappedSendError>>::new(); while let Some(message) = recv_in_process.recv().await {
while let Some(message) = recv_in_process.recv().await { match message {
match message { Err(_) => {
Err(_) => { pre_boot_queue.push(message);
pre_boot_queue.push(message); continue;
continue; }
} Ok(message) => {
Ok(message) => { if (message.source
if (message.source == t::Address {
== t::Address { node: metadata.our.node.clone(),
node: metadata.our.node.clone(), process: KERNEL_PROCESS_ID.clone(),
process: KERNEL_PROCESS_ID.clone(), })
}) && (message.message
&& (message.message == t::Message::Request(t::Request {
== t::Message::Request(t::Request { inherit: false,
inherit: false, expects_response: None,
expects_response: None, ipc: b"run".to_vec(),
ipc: "booted".as_bytes().to_vec(), metadata: None,
metadata: None, }))
})) {
{ break;
break;
}
pre_boot_queue.push(Ok(message));
} }
pre_boot_queue.push(Ok(message));
} }
} }
} }
// now that we've received the run message, we can send the pre-boot queue
for message in pre_boot_queue {
send_to_process
.send(message)
.await
.expect("make_process_loop: couldn't send message to process");
}
let component = let component =
Component::new(engine, wasm_bytes).expect("make_process_loop: couldn't read file"); Component::new(&engine, wasm_bytes).expect("make_process_loop: couldn't read file");
let mut linker = Linker::new(engine); let mut linker = Linker::new(&engine);
Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap(); Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap();
let table = Table::new(); let table = Table::new();
@ -431,7 +430,7 @@ pub async fn make_process_loop(
wasmtime_wasi::preview2::command::add_to_linker(&mut linker).unwrap(); wasmtime_wasi::preview2::command::add_to_linker(&mut linker).unwrap();
let mut store = Store::new( let mut store = Store::new(
engine, &engine,
ProcessWasi { ProcessWasi {
process: ProcessState { process: ProcessState {
keypair: keypair.clone(), keypair: keypair.clone(),
@ -452,158 +451,158 @@ pub async fn make_process_loop(
}, },
); );
Box::pin(async move { let (bindings, _bindings) =
let (bindings, _bindings) = match Process::instantiate_async(&mut store, &component, &linker).await {
match Process::instantiate_async(&mut store, &component, &linker).await { Ok(b) => b,
Ok(b) => b,
Err(e) => {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!(
"mk: process {:?} failed to instantiate: {:?}",
metadata.our.process, e,
),
})
.await;
return Err(e);
}
};
// the process will run until it returns from init()
let is_error = match bindings
.call_init(&mut store, &metadata.our.to_string())
.await
{
Ok(()) => {
let _ =
send_to_terminal
.send(t::Printout {
verbosity: 1,
content: format!(
"process {} returned without error",
metadata.our.process,
),
})
.await;
false
}
Err(e) => { Err(e) => {
let _ = send_to_terminal let _ = send_to_terminal
.send(t::Printout { .send(t::Printout {
verbosity: 0, verbosity: 0,
content: format!("process {:?} ended with error:", metadata.our.process,), content: format!(
"mk: process {:?} failed to instantiate: {:?}",
metadata.our.process, e,
),
}) })
.await; .await;
for line in format!("{:?}", e).lines() { return Err(e);
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: line.into(),
})
.await;
}
true
} }
}; };
// the process has completed, perform cleanup // the process will run until it returns from init()
let our_kernel = t::Address { let is_error = match bindings
node: metadata.our.node.clone(), .call_init(&mut store, &metadata.our.to_string())
process: KERNEL_PROCESS_ID.clone(), .await
}; {
Ok(()) => {
if is_error { let _ = send_to_terminal
// get caps before killing .send(t::Printout {
let (tx, rx) = tokio::sync::oneshot::channel(); verbosity: 1,
let _ = caps_oracle content: format!("process {} returned without error", metadata.our.process,),
.send(t::CapMessage::GetAll {
on: metadata.our.process.clone(),
responder: tx,
}) })
.await; .await;
let initial_capabilities = rx.await.unwrap().into_iter().collect(); false
}
// always send message to tell main kernel loop to remove handler Err(e) => {
send_to_loop let _ = send_to_terminal
.send(t::KernelMessage { .send(t::Printout {
id: rand::random(), verbosity: 0,
source: our_kernel.clone(), content: format!("process {:?} ended with error:", metadata.our.process,),
target: our_kernel.clone(),
rsvp: None,
message: t::Message::Request(t::Request {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&t::KernelCommand::KillProcess(
metadata.our.process.clone(),
))
.unwrap(),
metadata: None,
}),
payload: None,
signed_capabilities: None,
}) })
.await .await;
.expect("event loop: fatal: sender died"); for line in format!("{:?}", e).lines() {
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: line.into(),
})
.await;
}
true
}
};
// fulfill the designated OnPanic behavior // the process has completed, perform cleanup
match metadata.on_panic { let our_kernel = t::Address {
t::OnPanic::None => {} node: metadata.our.node.clone(),
// if restart, tell ourselves to init the app again, with same capabilities process: KERNEL_PROCESS_ID.clone(),
t::OnPanic::Restart => { };
send_to_loop
.send(t::KernelMessage { if is_error {
id: rand::random(), // get caps before killing
source: our_kernel.clone(), let (tx, rx) = tokio::sync::oneshot::channel();
target: our_kernel.clone(), let _ = caps_oracle
rsvp: None, .send(t::CapMessage::GetAll {
message: t::Message::Request(t::Request { on: metadata.our.process.clone(),
inherit: false, responder: tx,
expects_response: None, })
ipc: serde_json::to_vec(&t::KernelCommand::RebootProcess { .await;
process_id: metadata.our.process.clone(), let initial_capabilities = rx.await.unwrap().into_iter().collect();
persisted: t::PersistedProcess {
wasm_bytes_handle: metadata.wasm_bytes_handle, // always send message to tell main kernel loop to remove handler
on_panic: metadata.on_panic, send_to_loop
capabilities: initial_capabilities, .send(t::KernelMessage {
public: metadata.public, id: rand::random(),
}, source: our_kernel.clone(),
}) target: our_kernel.clone(),
.unwrap(), rsvp: None,
metadata: None, message: t::Message::Request(t::Request {
}), inherit: false,
payload: None, expects_response: None,
signed_capabilities: None, ipc: serde_json::to_vec(&t::KernelCommand::KillProcess(
metadata.our.process.clone(),
))
.unwrap(),
metadata: None,
}),
payload: None,
signed_capabilities: None,
})
.await
.expect("event loop: fatal: sender died");
// fulfill the designated OnPanic behavior
match metadata.on_panic {
t::OnPanic::None => {}
// if restart, tell ourselves to init the app again, with same capabilities
t::OnPanic::Restart => {
send_to_loop
.send(t::KernelMessage {
id: rand::random(),
source: our_kernel.clone(),
target: our_kernel.clone(),
rsvp: None,
message: t::Message::Request(t::Request {
inherit: false,
expects_response: None,
ipc: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
id: metadata.our.process.clone(),
wasm_bytes_handle: metadata.wasm_bytes_handle,
on_panic: metadata.on_panic,
initial_capabilities,
public: metadata.public,
})
.unwrap(),
metadata: None,
}),
payload: None,
signed_capabilities: None,
})
.await
.expect("event loop: fatal: sender died");
}
// if requests, fire them
// even in death, a process can only message processes it has capabilities for
t::OnPanic::Requests(requests) => {
for (address, mut request, payload) in requests {
request.expects_response = None;
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = caps_oracle
.send(t::CapMessage::Has {
on: metadata.our.process.clone(),
cap: t::Capability {
issuer: address.clone(),
params: "\"messaging\"".into(),
},
responder: tx,
}) })
.await .await;
.expect("event loop: fatal: sender died"); if let Ok(true) = rx.await {
} send_to_loop
// if requests, fire them .send(t::KernelMessage {
// even in death, a process can only message processes it has capabilities for id: rand::random(),
t::OnPanic::Requests(requests) => { source: metadata.our.clone(),
for (address, mut request, payload) in requests { target: address,
request.expects_response = None; rsvp: None,
if initial_capabilities.contains(&t::Capability { message: t::Message::Request(request),
issuer: address.clone(), payload,
params: "\"messaging\"".into(), signed_capabilities: None,
}) { })
send_to_loop .await
.send(t::KernelMessage { .expect("event loop: fatal: sender died");
id: rand::random(),
source: metadata.our.clone(),
target: address,
rsvp: None,
message: t::Message::Request(request),
payload,
signed_capabilities: None,
})
.await
.expect("event loop: fatal: sender died");
}
} }
} }
} }
} }
Ok(()) }
}) Ok(())
} }

View File

@ -256,7 +256,7 @@ impl StandardHost for process::ProcessWasi {
self.process.metadata.our.process.package(), self.process.metadata.our.process.package(),
self.process.metadata.our.process.publisher(), self.process.metadata.our.process.publisher(),
); );
let Ok(Ok((_, response))) = process::send_and_await_response( let Ok(Ok((_, _response))) = process::send_and_await_response(
self, self,
Some(t::Address { Some(t::Address {
node: self.process.metadata.our.node.clone(), node: self.process.metadata.our.node.clone(),
@ -269,11 +269,10 @@ impl StandardHost for process::ProcessWasi {
wit::Request { wit::Request {
inherit: false, inherit: false,
expects_response: Some(5), // TODO evaluate expects_response: Some(5), // TODO evaluate
ipc: serde_json::to_vec(&t::KernelCommand::StartProcess { ipc: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
id: new_process_id.clone(), id: new_process_id.clone(),
wasm_bytes_handle: hash, wasm_bytes_handle: hash,
on_panic: t::de_wit_on_panic(on_panic), on_panic: t::de_wit_on_panic(on_panic),
// TODO
initial_capabilities: match capabilities { initial_capabilities: match capabilities {
wit::Capabilities::None => HashSet::new(), wit::Capabilities::None => HashSet::new(),
wit::Capabilities::All => { wit::Capabilities::All => {
@ -286,20 +285,7 @@ impl StandardHost for process::ProcessWasi {
responder: tx, responder: tx,
}) })
.await; .await;
rx.await rx.await.unwrap()
.unwrap()
.into_iter()
.map(|cap| t::SignedCapability {
issuer: cap.issuer.clone(),
params: cap.params.clone(),
signature: self
.process
.keypair
.sign(&rmp_serde::to_vec(&cap).unwrap())
.as_ref()
.to_vec(),
})
.collect()
} }
wit::Capabilities::Some(caps) => caps wit::Capabilities::Some(caps) => caps
.into_iter() .into_iter()
@ -326,6 +312,32 @@ impl StandardHost for process::ProcessWasi {
self.process.last_payload = old_last_payload; self.process.last_payload = old_last_payload;
return Ok(Err(wit::SpawnError::NameTaken)); return Ok(Err(wit::SpawnError::NameTaken));
}; };
// finally, send the command to run the new process
let Ok(Ok((_, response))) = process::send_and_await_response(
self,
Some(t::Address {
node: self.process.metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.clone(),
}),
wit::Address {
node: self.process.metadata.our.node.clone(),
process: KERNEL_PROCESS_ID.en_wit(),
},
wit::Request {
inherit: false,
expects_response: Some(5), // TODO evaluate
ipc: serde_json::to_vec(&t::KernelCommand::RunProcess(new_process_id.clone()))
.unwrap(),
metadata: None,
},
None,
)
.await
else {
// reset payload to what it was
self.process.last_payload = old_last_payload;
return Ok(Err(wit::SpawnError::NameTaken));
};
// reset payload to what it was // reset payload to what it was
self.process.last_payload = old_last_payload; self.process.last_payload = old_last_payload;
let wit::Message::Response((wit::Response { ipc, .. }, _)) = response else { let wit::Message::Response((wit::Response { ipc, .. }, _)) = response else {
@ -389,14 +401,9 @@ impl StandardHost for process::ProcessWasi {
.unwrap() .unwrap()
.into_iter() .into_iter()
.map(|cap| wit::SignedCapability { .map(|cap| wit::SignedCapability {
issuer: cap.issuer.en_wit().to_owned(), issuer: cap.issuer.en_wit(),
params: cap.params.clone(), params: cap.params,
signature: self signature: cap.signature,
.process
.keypair
.sign(&rmp_serde::to_vec(&cap).unwrap())
.as_ref()
.to_vec(),
}) })
.collect()) .collect())
} }

View File

@ -31,13 +31,32 @@ pub type NodeId = String; // QNS domain name
/// the process name can be a random number, or a name chosen by the user. /// the process name can be a random number, or a name chosen by the user.
/// the formatting is as follows: /// the formatting is as follows:
/// `[process name]:[package name]:[node ID]` /// `[process name]:[package name]:[node ID]`
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] #[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct ProcessId { pub struct ProcessId {
process_name: String, process_name: String,
package_name: String, package_name: String,
publisher_node: NodeId, publisher_node: NodeId,
} }
impl Serialize for ProcessId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
format!("{}", self).serialize(serializer)
}
}
impl<'a> Deserialize<'a> for ProcessId {
fn deserialize<D>(deserializer: D) -> Result<ProcessId, D::Error>
where
D: serde::de::Deserializer<'a>,
{
let s = String::deserialize(deserializer)?;
ProcessId::from_str(&s).map_err(serde::de::Error::custom)
}
}
/// PackageId is like a ProcessId, but for a package. Only contains the name /// PackageId is like a ProcessId, but for a package. Only contains the name
/// of the package and the name of the publisher. /// of the package and the name of the publisher.
#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] #[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
@ -212,7 +231,7 @@ impl std::error::Error for ProcessIdParseError {
} }
} }
#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct Address { pub struct Address {
pub node: NodeId, pub node: NodeId,
pub process: ProcessId, pub process: ProcessId,
@ -228,7 +247,7 @@ impl Address {
process: process.into(), process: process.into(),
} }
} }
pub fn _from_str(input: &str) -> Result<Self, AddressParseError> { pub fn from_str(input: &str) -> Result<Self, AddressParseError> {
// split string on colons into 4 segments, // split string on colons into 4 segments,
// first one with @, next 3 with : // first one with @, next 3 with :
let mut name_rest = input.split('@'); let mut name_rest = input.split('@');
@ -282,6 +301,25 @@ impl Address {
} }
} }
impl Serialize for Address {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
format!("{}", self).serialize(serializer)
}
}
impl<'a> Deserialize<'a> for Address {
fn deserialize<D>(deserializer: D) -> Result<Address, D::Error>
where
D: serde::de::Deserializer<'a>,
{
let s = String::deserialize(deserializer)?;
Address::from_str(&s).map_err(serde::de::Error::custom)
}
}
impl From<(&str, &str, &str, &str)> for Address { impl From<(&str, &str, &str, &str)> for Address {
fn from(input: (&str, &str, &str, &str)) -> Self { fn from(input: (&str, &str, &str, &str)) -> Self {
Address::new(input.0, (input.1, input.2, input.3)) Address::new(input.0, (input.1, input.2, input.3))
@ -716,25 +754,46 @@ pub enum DebugCommand {
Step, Step,
} }
/// IPC format for requests sent to kernel runtime module
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum KernelCommand { pub enum KernelCommand {
/// RUNTIME ONLY: used to notify the kernel that booting is complete and
/// all processes have been loaded in from their persisted or bootstrapped state.
Booted, Booted,
StartProcess { /// Tell the kernel to install and prepare a new process for execution.
/// The process will not begin execution until the kernel receives a
/// `RunProcess` command with the same `id`.
///
/// The process that sends this command will be given messaging capabilities
/// for the new process if `public` is false.
InitializeProcess {
id: ProcessId, id: ProcessId,
wasm_bytes_handle: u128, wasm_bytes_handle: u128,
on_panic: OnPanic, on_panic: OnPanic,
initial_capabilities: HashSet<SignedCapability>, initial_capabilities: HashSet<SignedCapability>,
public: bool, public: bool,
}, },
KillProcess(ProcessId), // this is extrajudicial killing: we might lose messages! /// Tell the kernel to run a process that has already been installed.
// kernel only /// TODO: in the future, this command could be extended to allow for
RebootProcess { /// resource provision.
process_id: ProcessId, RunProcess(ProcessId),
persisted: PersistedProcess, /// Kill a running process immediately. This may result in the dropping / mishandling of messages!
}, KillProcess(ProcessId),
/// RUNTIME ONLY: notify the kernel that the runtime is shutting down and it
/// should gracefully stop and persist the running processes.
Shutdown, Shutdown,
} }
/// IPC format for all KernelCommand responses
#[derive(Debug, Serialize, Deserialize)]
pub enum KernelResponse {
InitializedProcess,
InitializeProcessError,
StartedProcess,
RunProcessError,
KilledProcess(ProcessId),
}
#[derive(Debug)] #[derive(Debug)]
pub enum CapMessage { pub enum CapMessage {
Add { Add {
@ -756,17 +815,10 @@ pub enum CapMessage {
}, },
GetAll { GetAll {
on: ProcessId, on: ProcessId,
responder: tokio::sync::oneshot::Sender<HashSet<Capability>>, responder: tokio::sync::oneshot::Sender<HashSet<SignedCapability>>,
}, },
} }
#[derive(Debug, Serialize, Deserialize)]
pub enum KernelResponse {
StartedProcess,
StartProcessError,
KilledProcess(ProcessId),
}
pub type ProcessMap = HashMap<ProcessId, PersistedProcess>; pub type ProcessMap = HashMap<ProcessId, PersistedProcess>;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -806,7 +858,8 @@ pub struct PackageManifestEntry {
pub process_wasm_path: String, pub process_wasm_path: String,
pub on_panic: OnPanic, pub on_panic: OnPanic,
pub request_networking: bool, pub request_networking: bool,
pub request_messaging: Vec<String>, pub request_messaging: Option<Vec<String>>,
pub grant_messaging: Option<Vec<String>>,
pub public: bool, pub public: bool,
} }