Mirror of https://github.com/uqbar-dao/nectar.git, synced 2024-11-23 03:44:04 +03:00
Merge pull request #76 from uqbar-dao/dr/kernel-manage-execution
kernel: adjust API to enable execution management
Commit: 3a2d631906
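The changes below split the kernel's old single-step process start into an explicit initialize/run sequence, driven over the same Request/ipc interface. A condensed sketch of that sequence, assembled only from calls that appear in the app-store diff further down (bindings such as parsed_new_process_id, hash, entry, and initial_capabilities are produced by the surrounding code shown there; this is illustrative, not a standalone program):

    // 1. InitializeProcess: hand the kernel the wasm handle, capabilities, and panic policy.
    Request::new()
        .target(Address::from_str("our@kernel:sys:uqbar")?)
        .ipc(serde_json::to_vec(&kt::KernelCommand::InitializeProcess {
            id: parsed_new_process_id.clone(),
            wasm_bytes_handle: hash,
            on_panic: entry.on_panic.clone(),
            initial_capabilities,
            public: entry.public,
        })?)
        .inherit(true) // forward the wasm payload fetched from VFS by the previous request
        .send_and_await_response(5)?;

    // 2. RunProcess: only after initialization (and any capability grants) does the
    //    process actually begin executing.
    Request::new()
        .target(Address::from_str("our@kernel:sys:uqbar")?)
        .ipc(serde_json::to_vec(&kt::KernelCommand::RunProcess(
            parsed_new_process_id,
        ))?)
        .send_and_await_response(5)?;

On the kernel side (src/kernel/process.rs, new in this commit), make_process_loop queues any messages that arrive before a "run" request from the kernel and only instantiates the wasm component once that request is seen.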
2 Cargo.lock (generated)
@@ -5093,7 +5093,7 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
 
 [[package]]
 name = "uqbar"
-version = "0.1.0"
+version = "0.3.0"
 dependencies = [
  "aes-gcm 0.10.2",
  "anyhow",
Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "uqbar"
-version = "0.1.0"
+version = "0.3.0"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
119 modules/app_store/app_store/Cargo.lock (generated)
@@ -10,7 +10,7 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
 
 [[package]]
 name = "app_store"
-version = "0.1.0"
+version = "0.2.0"
 dependencies = [
  "anyhow",
  "bincode",
@@ -46,6 +46,12 @@ dependencies = [
  "generic-array",
 ]
 
+[[package]]
+name = "bytes"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
+
 [[package]]
 name = "cfg-if"
 version = "1.0.0"
@@ -87,6 +93,21 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
 
+[[package]]
+name = "fnv"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+
+[[package]]
+name = "form_urlencoded"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
+dependencies = [
+ "percent-encoding",
+]
+
 [[package]]
 name = "generic-array"
 version = "0.14.7"
@@ -123,12 +144,33 @@ dependencies = [
  "unicode-segmentation",
 ]
 
+[[package]]
+name = "http"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
 [[package]]
 name = "id-arena"
 version = "2.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005"
 
+[[package]]
+name = "idna"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
+dependencies = [
+ "unicode-bidi",
+ "unicode-normalization",
+]
+
 [[package]]
 name = "indexmap"
 version = "2.1.0"
@@ -164,6 +206,12 @@ version = "0.4.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
 
+[[package]]
+name = "percent-encoding"
+version = "2.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
+
 [[package]]
 name = "ppv-lite86"
 version = "0.2.17"
@@ -298,18 +346,68 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "thiserror"
+version = "1.0.50"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2"
+dependencies = [
+ "thiserror-impl",
+]
+
+[[package]]
+name = "thiserror-impl"
+version = "1.0.50"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "tinyvec"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
+dependencies = [
+ "tinyvec_macros",
+]
+
+[[package]]
+name = "tinyvec_macros"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
+
 [[package]]
 name = "typenum"
 version = "1.17.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
 
+[[package]]
+name = "unicode-bidi"
+version = "0.3.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
+
 [[package]]
 name = "unicode-ident"
 version = "1.0.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
 
+[[package]]
+name = "unicode-normalization"
+version = "0.1.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
+dependencies = [
+ "tinyvec",
+]
+
 [[package]]
 name = "unicode-segmentation"
 version = "1.10.1"
@@ -324,16 +422,31 @@ checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
 
 [[package]]
 name = "uqbar_process_lib"
-version = "0.2.0"
-source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=e53c124#e53c124ec95ef99c06d201d4d08dada8ec691d29"
+version = "0.3.0"
+source = "git+ssh://git@github.com/uqbar-dao/process_lib.git?rev=955badd#955badd96647c215d6de956fbeedd8a92ee2f343"
 dependencies = [
  "anyhow",
  "bincode",
+ "http",
  "rand",
  "serde",
+ "serde_json",
+ "thiserror",
+ "url",
  "wit-bindgen",
 ]
 
+[[package]]
+name = "url"
+version = "2.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
+dependencies = [
+ "form_urlencoded",
+ "idna",
+ "percent-encoding",
+]
+
 [[package]]
 name = "version_check"
 version = "0.9.4"
modules/app_store/app_store/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "app_store"
-version = "0.1.0"
+version = "0.2.0"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -17,7 +17,7 @@ rand = "0.8"
 serde = {version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 sha2 = "0.10.8"
-uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "e53c124" }
+uqbar_process_lib = { git = "ssh://git@github.com/uqbar-dao/process_lib.git", rev = "955badd" }
 wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "5390bab780733f1660d14c254ec985df2816bf1d" }
 
 [lib]
modules/app_store/app_store/src/lib.rs
@@ -5,7 +5,7 @@ use uqbar_process_lib::kernel_types as kt;
 use uqbar_process_lib::uqbar::process::standard as wit;
 use uqbar_process_lib::{
     get_capability, get_payload, get_typed_state, grant_messaging, println, receive, set_state,
-    Address, Message, NodeId, PackageId, ProcessId, Request, Response,
+    share_capability, Address, Message, NodeId, PackageId, ProcessId, Request, Response,
 };
 
 wit_bindgen::generate!({
@@ -165,11 +165,11 @@ impl Guest for Component {
     // so that they can send us requests.
     grant_messaging(
         &our,
-        &Vec::from([
-            ProcessId::from_str("http_server:sys:uqbar").unwrap(),
-            ProcessId::from_str("terminal:terminal:uqbar").unwrap(),
-            ProcessId::from_str("vfs:sys:uqbar").unwrap(),
-        ]),
+        vec![
+            ProcessId::new(Some("http_server"), "sys", "uqbar"),
+            ProcessId::new(Some("terminal"), "terminal", "uqbar"),
+            ProcessId::new(Some("vfs"), "sys", "uqbar"),
+        ],
     );
     println!("{}: start", our.process);
 
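The new process_lib revision builds ProcessIds from their parts instead of parsing and unwrapping a "name:package:publisher" string. Both spellings below appear in the hunk above and name the same process; the sketch only restates them side by side:

    let http_server = ProcessId::from_str("http_server:sys:uqbar").unwrap(); // old
    let http_server = ProcessId::new(Some("http_server"), "sys", "uqbar");   // new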
@@ -212,9 +212,7 @@ fn handle_message(
             Ok(None) => return Ok(()),
             Ok(Some(resp)) => {
                 if req.expects_response.is_some() {
-                    Response::new()
-                        .ipc_bytes(serde_json::to_vec(&resp)?)
-                        .send()?;
+                    Response::new().ipc(serde_json::to_vec(&resp)?).send()?;
                 }
             }
             Err(err) => {
@@ -227,9 +225,7 @@ fn handle_message(
             Ok(None) => return Ok(()),
             Ok(Some(resp)) => {
                 if req.expects_response.is_some() {
-                    Response::new()
-                        .ipc_bytes(serde_json::to_vec(&resp)?)
-                        .send()?;
+                    Response::new().ipc(serde_json::to_vec(&resp)?).send()?;
                 }
             }
             Err(err) => {
@@ -251,9 +247,9 @@ fn handle_message(
             if state.requested_packages.remove(&package_id) {
                 // auto-take zip from payload and request ourself with New
                 Request::new()
-                    .target(our.clone())?
+                    .target(our.clone())
                     .inherit(true)
-                    .ipc_bytes(serde_json::to_vec(&Req::LocalRequest(
+                    .ipc(serde_json::to_vec(&Req::LocalRequest(
                         LocalRequest::NewPackage {
                             package: package_id,
                             mirror: true,
|
|||||||
}
|
}
|
||||||
match request {
|
match request {
|
||||||
LocalRequest::NewPackage { package, mirror } => {
|
LocalRequest::NewPackage { package, mirror } => {
|
||||||
let vfs_address = Address {
|
|
||||||
node: our.node.clone(),
|
|
||||||
process: ProcessId::from_str("vfs:sys:uqbar")?,
|
|
||||||
};
|
|
||||||
|
|
||||||
let Some(mut payload) = get_payload() else {
|
let Some(mut payload) = get_payload() else {
|
||||||
return Err(anyhow::anyhow!("no payload"));
|
return Err(anyhow::anyhow!("no payload"));
|
||||||
};
|
};
|
||||||
|
|
||||||
Request::new()
|
Request::new()
|
||||||
.target(vfs_address.clone())?
|
.target(Address::from_str("our@vfs:sys:uqbar")?)
|
||||||
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest {
|
.ipc(serde_json::to_vec(&kt::VfsRequest {
|
||||||
drive: package.to_string(),
|
drive: package.to_string(),
|
||||||
action: kt::VfsAction::New,
|
action: kt::VfsAction::New,
|
||||||
})?)
|
})?)
|
||||||
@@ -347,8 +338,8 @@ fn handle_local_request(
             // add zip bytes
             payload.mime = Some("application/zip".to_string());
             Request::new()
-                .target(vfs_address.clone())?
-                .ipc_bytes(serde_json::to_vec(&kt::VfsRequest {
+                .target(Address::from_str("our@vfs:sys:uqbar")?)
+                .ipc(serde_json::to_vec(&kt::VfsRequest {
                     drive: package.to_string(),
                     action: kt::VfsAction::Add {
                         full_path: package.to_string(),
|
|||||||
// save the zip file itself in VFS for sharing with other nodes
|
// save the zip file itself in VFS for sharing with other nodes
|
||||||
// call it <package>.zip
|
// call it <package>.zip
|
||||||
Request::new()
|
Request::new()
|
||||||
.target(vfs_address.clone())?
|
.target(Address::from_str("our@vfs:sys:uqbar")?)
|
||||||
.inherit(true)
|
.inherit(true)
|
||||||
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest {
|
.ipc(serde_json::to_vec(&kt::VfsRequest {
|
||||||
drive: package.to_string(),
|
drive: package.to_string(),
|
||||||
action: kt::VfsAction::Add {
|
action: kt::VfsAction::Add {
|
||||||
full_path: format!("/{}.zip", package.to_string()),
|
full_path: format!("/{}.zip", package.to_string()),
|
||||||
@ -373,8 +364,8 @@ fn handle_local_request(
|
|||||||
.payload(payload)
|
.payload(payload)
|
||||||
.send_and_await_response(5)??;
|
.send_and_await_response(5)??;
|
||||||
Request::new()
|
Request::new()
|
||||||
.target(vfs_address.clone())?
|
.target(Address::from_str("our@vfs:sys:uqbar")?)
|
||||||
.ipc_bytes(serde_json::to_vec(&kt::VfsRequest {
|
.ipc(serde_json::to_vec(&kt::VfsRequest {
|
||||||
drive: package.to_string(),
|
drive: package.to_string(),
|
||||||
action: kt::VfsAction::GetEntry("/metadata.json".into()),
|
action: kt::VfsAction::GetEntry("/metadata.json".into()),
|
||||||
})?)
|
})?)
|
||||||
@@ -408,9 +399,9 @@ fn handle_local_request(
             install_from,
         } => Ok(Some(Resp::DownloadResponse(
             match Request::new()
-                .target(Address::new(&install_from, our.process.clone())?)?
+                .target(Address::new(install_from, our.process.clone()))
                 .inherit(true)
-                .ipc_bytes(serde_json::to_vec(&RemoteRequest::Download(
+                .ipc(serde_json::to_vec(&RemoteRequest::Download(
                     package.clone(),
                 ))?)
                 .send_and_await_response(5)
@@ -430,13 +421,9 @@ fn handle_local_request(
             },
         ))),
         LocalRequest::Install(package) => {
-            let vfs_address = Address {
-                node: our.node.clone(),
-                process: ProcessId::from_str("vfs:sys:uqbar")?,
-            };
             Request::new()
-                .target(Address::new(&our.node, "vfs:sys:uqbar")?)?
-                .ipc_bytes(serde_json::to_vec(&kt::VfsRequest {
+                .target(Address::from_str("our@vfs:sys:uqbar")?)
+                .ipc(serde_json::to_vec(&kt::VfsRequest {
                     drive: package.to_string(),
                     action: kt::VfsAction::GetEntry("/manifest.json".into()),
                 })?)
@@ -446,16 +433,43 @@ fn handle_local_request(
             };
             let manifest = String::from_utf8(payload.bytes)?;
             let manifest = serde_json::from_str::<Vec<kt::PackageManifestEntry>>(&manifest)?;
-            for entry in manifest {
+            // always grant read/write to their drive, which we created for them
+            let Some(read_cap) = get_capability(
+                &Address::new(&our.node, ("vfs", "sys", "uqbar")),
+                &serde_json::to_string(&serde_json::json!({
+                    "kind": "read",
+                    "drive": package.to_string(),
+                }))?,
+            ) else {
+                return Err(anyhow::anyhow!("app-store: no read cap"));
+            };
+            let Some(write_cap) = get_capability(
+                &Address::new(&our.node, ("vfs", "sys", "uqbar")),
+                &serde_json::to_string(&serde_json::json!({
+                    "kind": "write",
+                    "drive": package.to_string(),
+                }))?,
+            ) else {
+                return Err(anyhow::anyhow!("app-store: no write cap"));
+            };
+            let Some(networking_cap) = get_capability(
+                &Address::new(&our.node, ("kernel", "sys", "uqbar")),
+                &"\"network\"".to_string(),
+            ) else {
+                return Err(anyhow::anyhow!("app-store: no net cap"));
+            };
+            // first, for each process in manifest, initialize it
+            // then, once all have been initialized, grant them requested caps
+            // and finally start them.
+            for entry in &manifest {
                 let path = if entry.process_wasm_path.starts_with("/") {
-                    entry.process_wasm_path
+                    entry.process_wasm_path.clone()
                 } else {
                     format!("/{}", entry.process_wasm_path)
                 };
 
                 let (_, hash_response) = Request::new()
-                    .target(Address::new(&our.node, "vfs:sys:uqbar")?)?
-                    .ipc_bytes(serde_json::to_vec(&kt::VfsRequest {
+                    .target(Address::from_str("our@vfs:sys:uqbar")?)
+                    .ipc(serde_json::to_vec(&kt::VfsRequest {
                         drive: package.to_string(),
                         action: kt::VfsAction::GetHash(path.clone()),
                     })?)
@@ -467,95 +481,93 @@ fn handle_local_request(
                 let kt::VfsResponse::GetHash(Some(hash)) = serde_json::from_slice(&ipc)? else {
                     return Err(anyhow::anyhow!("no hash in vfs"));
                 };
 
                 // build initial caps
                 let mut initial_capabilities: HashSet<kt::SignedCapability> = HashSet::new();
                 if entry.request_networking {
-                    let Some(networking_cap) = get_capability(
-                        &Address {
-                            node: our.node.clone(),
-                            process: ProcessId::from_str("kernel:sys:uqbar")?,
-                        },
-                        &"\"network\"".to_string(),
-                    ) else {
-                        return Err(anyhow::anyhow!("app-store: no net cap"));
-                    };
-                    initial_capabilities.insert(kt::de_wit_signed_capability(networking_cap));
+                    initial_capabilities.insert(kt::de_wit_signed_capability(networking_cap.clone()));
                 }
-                let Some(read_cap) = get_capability(
-                    &vfs_address.clone(),
-                    &serde_json::to_string(&serde_json::json!({
-                        "kind": "read",
-                        "drive": package.to_string(),
-                    }))?,
-                ) else {
-                    return Err(anyhow::anyhow!("app-store: no read cap"));
-                };
-                initial_capabilities.insert(kt::de_wit_signed_capability(read_cap));
-                let Some(write_cap) = get_capability(
-                    &vfs_address.clone(),
-                    &serde_json::to_string(&serde_json::json!({
-                        "kind": "write",
-                        "drive": package.to_string(),
-                    }))?,
-                ) else {
-                    return Err(anyhow::anyhow!("app-store: no write cap"));
-                };
-                initial_capabilities.insert(kt::de_wit_signed_capability(write_cap));
-
-                for process_name in &entry.request_messaging {
-                    let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
-                        // TODO handle arbitrary caps here
-                        continue;
-                    };
-                    let Some(messaging_cap) = get_capability(
-                        &Address {
-                            node: our.node.clone(),
-                            process: parsed_process_id.clone(),
-                        },
-                        &"\"messaging\"".into(),
-                    ) else {
-                        println!("app-store: no cap for {} to give away!", process_name);
-                        continue;
-                    };
-                    initial_capabilities.insert(kt::de_wit_signed_capability(messaging_cap));
-                }
+                initial_capabilities.insert(kt::de_wit_signed_capability(read_cap.clone()));
+                initial_capabilities.insert(kt::de_wit_signed_capability(write_cap.clone()));
 
                 let process_id = format!("{}:{}", entry.process_name, package.to_string());
                 let Ok(parsed_new_process_id) = ProcessId::from_str(&process_id) else {
                     return Err(anyhow::anyhow!("app-store: invalid process id!"));
                 };
+                // kill process if it already exists
                 Request::new()
-                    .target(Address::new(&our.node, "kernel:sys:uqbar")?)?
-                    .ipc_bytes(serde_json::to_vec(&kt::KernelCommand::KillProcess(
-                        kt::ProcessId::de_wit(parsed_new_process_id.clone()),
+                    .target(Address::from_str("our@kernel:sys:uqbar")?)
+                    .ipc(serde_json::to_vec(&kt::KernelCommand::KillProcess(
+                        parsed_new_process_id.clone(),
                     ))?)
                     .send()?;
 
-                // kernel start process takes bytes as payload + wasm_bytes_handle...
-                // reconsider perhaps
                 let (_, _bytes_response) = Request::new()
-                    .target(Address::new(&our.node, "vfs:sys:uqbar")?)?
-                    .ipc_bytes(serde_json::to_vec(&kt::VfsRequest {
+                    .target(Address::from_str("our@vfs:sys:uqbar")?)
+                    .ipc(serde_json::to_vec(&kt::VfsRequest {
                         drive: package.to_string(),
                         action: kt::VfsAction::GetEntry(path),
                     })?)
                     .send_and_await_response(5)??;
 
-                let Some(payload) = get_payload() else {
-                    return Err(anyhow::anyhow!("no wasm bytes payload."));
-                };
-
                 Request::new()
-                    .target(Address::new(&our.node, "kernel:sys:uqbar")?)?
-                    .ipc_bytes(serde_json::to_vec(&kt::KernelCommand::StartProcess {
-                        id: kt::ProcessId::de_wit(parsed_new_process_id),
+                    .target(Address::from_str("our@kernel:sys:uqbar")?)
+                    .ipc(serde_json::to_vec(&kt::KernelCommand::InitializeProcess {
+                        id: parsed_new_process_id,
                         wasm_bytes_handle: hash,
-                        on_panic: entry.on_panic,
+                        on_panic: entry.on_panic.clone(),
                         initial_capabilities,
                         public: entry.public,
                     })?)
-                    .payload(payload)
+                    .inherit(true)
+                    .send_and_await_response(5)?;
+            }
+            for entry in &manifest {
+                let process_id = ProcessId::new(
+                    Some(&entry.process_name),
+                    package.package(),
+                    package.publisher(),
+                );
+                if let Some(to_request) = &entry.request_messaging {
+                    for process_name in to_request {
+                        let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
+                            // TODO handle arbitrary caps here
+                            continue;
+                        };
+                        let Some(messaging_cap) = get_capability(
+                            &Address {
+                                node: our.node.clone(),
+                                process: parsed_process_id.clone(),
+                            },
+                            &"\"messaging\"".into(),
+                        ) else {
+                            println!("app-store: no cap for {} to give away!", process_name);
+                            continue;
+                        };
+                        share_capability(&process_id, &messaging_cap);
+                    }
+                }
+                if let Some(to_grant) = &entry.grant_messaging {
+                    let Some(messaging_cap) = get_capability(
+                        &Address {
+                            node: our.node.clone(),
+                            process: process_id.clone(),
+                        },
+                        &"\"messaging\"".into(),
+                    ) else {
+                        println!("app-store: no cap for {} to give away!", process_id);
+                        continue;
+                    };
+                    for process_name in to_grant {
+                        let Ok(parsed_process_id) = ProcessId::from_str(&process_name) else {
+                            // TODO handle arbitrary caps here
+                            continue;
+                        };
+                        share_capability(&parsed_process_id, &messaging_cap);
+                    }
+                }
+                Request::new()
+                    .target(Address::from_str("our@kernel:sys:uqbar")?)
+                    .ipc(serde_json::to_vec(&kt::KernelCommand::RunProcess(
+                        process_id,
+                    ))?)
                     .send_and_await_response(5)?;
             }
             Ok(Some(Resp::InstallResponse(InstallResponse::Success)))
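Taken together, the install path now runs in two passes over the manifest, so every process is initialized before any of them is started. A condensed skeleton of the code above, with the details elided:

    for entry in &manifest {
        // fetch the wasm hash and bytes from VFS, then
        // KernelCommand::KillProcess + KernelCommand::InitializeProcess
    }
    for entry in &manifest {
        // share_capability(...) for requested and granted messaging caps, then
        // KernelCommand::RunProcess
    }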
@@ -588,8 +600,8 @@ fn handle_remote_request(
             // get the .zip from VFS and attach as payload to response
             let file_name = format!("/{}.zip", package.to_string());
             Request::new()
-                .target(Address::new(&our.node, "vfs:sys:uqbar")?)?
-                .ipc_bytes(serde_json::to_vec(&kt::VfsRequest {
+                .target(Address::from_str("our@vfs:sys:uqbar")?)
+                .ipc(serde_json::to_vec(&kt::VfsRequest {
                     drive: package.to_string(),
                     action: kt::VfsAction::GetEntry(file_name.clone()),
                 })?)
@@ -313,15 +313,18 @@ async fn bootstrap(
                 "{}:{}:{}",
                 entry.process_name, package_name, package_publisher
             );
-            entry.request_messaging.push(our_process_id.clone());
-            for process_name in &entry.request_messaging {
-                requested_caps.insert(Capability {
-                    issuer: Address {
-                        node: our_name.to_string(),
-                        process: ProcessId::from_str(process_name).unwrap(),
-                    },
-                    params: "\"messaging\"".into(),
-                });
+            entry.request_messaging = Some(entry.request_messaging.unwrap_or_default());
+            if let Some(ref mut request_messaging) = entry.request_messaging {
+                request_messaging.push(our_process_id.clone());
+                for process_name in request_messaging {
+                    requested_caps.insert(Capability {
+                        issuer: Address {
+                            node: our_name.to_string(),
+                            process: ProcessId::from_str(process_name).unwrap(),
+                        },
+                        params: "\"messaging\"".into(),
+                    });
+                }
             }
 
             if entry.request_networking {
2422 src/kernel/mod.rs (file diff suppressed because it is too large)

608 src/kernel/process.rs (new file)
@@ -0,0 +1,608 @@
use crate::kernel::{ProcessMessageReceiver, ProcessMessageSender};
use crate::types as t;
use crate::KERNEL_PROCESS_ID;
use anyhow::Result;
use ring::signature;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::task::JoinHandle;
pub use uqbar::process::standard as wit;
pub use uqbar::process::standard::Host as StandardHost;
use wasmtime::component::*;
use wasmtime::{Engine, Store};
use wasmtime_wasi::preview2::{Table, WasiCtx, WasiCtxBuilder, WasiView};

bindgen!({
    path: "wit",
    world: "process",
    async: true,
});

pub struct ProcessState {
    pub keypair: Arc<signature::Ed25519KeyPair>,
    pub metadata: t::ProcessMetadata,
    pub recv_in_process: ProcessMessageReceiver,
    pub self_sender: ProcessMessageSender,
    pub send_to_loop: t::MessageSender,
    pub send_to_terminal: t::PrintSender,
    pub prompting_message: Option<t::KernelMessage>,
    pub last_payload: Option<t::Payload>,
    pub contexts: HashMap<u64, (t::ProcessContext, JoinHandle<()>)>,
    pub message_queue: VecDeque<Result<t::KernelMessage, t::WrappedSendError>>,
    pub caps_oracle: t::CapMessageSender,
    pub next_message_caps: Option<Vec<t::SignedCapability>>,
}

pub struct ProcessWasi {
    pub process: ProcessState,
    table: Table,
    wasi: WasiCtx,
}

impl WasiView for ProcessWasi {
    fn table(&self) -> &Table {
        &self.table
    }
    fn table_mut(&mut self) -> &mut Table {
        &mut self.table
    }
    fn ctx(&self) -> &WasiCtx {
        &self.wasi
    }
    fn ctx_mut(&mut self) -> &mut WasiCtx {
        &mut self.wasi
    }
}

pub async fn send_and_await_response(
    process: &mut ProcessWasi,
    source: Option<t::Address>,
    target: wit::Address,
    request: wit::Request,
    payload: Option<wit::Payload>,
) -> Result<Result<(wit::Address, wit::Message), wit::SendError>> {
    if request.expects_response.is_none() {
        return Err(anyhow::anyhow!(
            "kernel: got invalid send_and_await_response() Request from {:?}: must expect response",
            process.process.metadata.our.process
        ));
    }
    let id = process
        .process
        .handle_request(source, target, request, None, payload)
        .await;
    match id {
        Ok(id) => match process.process.get_specific_message_for_process(id).await {
            Ok((address, wit::Message::Response(response))) => {
                Ok(Ok((address, wit::Message::Response(response))))
            }
            Ok((_address, wit::Message::Request(_))) => Err(anyhow::anyhow!(
                "fatal: received Request instead of Response"
            )),
            Err((net_err, _context)) => Ok(Err(net_err)),
        },
        Err(e) => Err(e),
    }
}

impl ProcessState {
    /// Ingest latest message directed to this process, and mark it as the prompting message.
    /// If there is no message in the queue, wait async until one is received.
    /// The message will only be saved as the prompting-message if it's a Request.
    pub async fn get_next_message_for_process(
        &mut self,
    ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
        let res = match self.message_queue.pop_front() {
            Some(message_from_queue) => message_from_queue,
            None => self.recv_in_process.recv().await.unwrap(),
        };
        self.kernel_message_to_process_receive(res)
    }

    /// takes Request generated by a process and sends it to the main event loop.
    /// will only fail if process does not have capability to send to target.
    /// if the request has a timeout (expects response), start a task to track
    /// that timeout and return timeout error if it expires.
    pub async fn handle_request(
        &mut self,
        fake_source: Option<t::Address>, // only used when kernel steps in to get/set state
        target: wit::Address,
        request: wit::Request,
        new_context: Option<wit::Context>,
        payload: Option<wit::Payload>,
    ) -> Result<u64> {
        let source = match &fake_source {
            Some(_) => fake_source.unwrap(),
            None => self.metadata.our.clone(),
        };
        // if request chooses to inherit context, match id to prompting_message
        // otherwise, id is generated randomly
        let request_id: u64 = if request.inherit
            && request.expects_response.is_none()
            && self.prompting_message.is_some()
        {
            self.prompting_message.as_ref().unwrap().id
        } else {
            loop {
                let id = rand::random();
                if !self.contexts.contains_key(&id) {
                    break id;
                }
            }
        };

        let payload = match payload {
            Some(p) => Some(t::Payload {
                mime: p.mime,
                bytes: p.bytes,
            }),
            None => match request.inherit {
                true => self.last_payload.clone(),
                false => None,
            },
        };

        // rsvp is set if there was a Request expecting Response
        // followed by inheriting Request(s) not expecting Response;
        // this is done such that the ultimate request handler knows that,
        // in fact, a Response *is* expected.
        // could also be None if entire chain of Requests are
        // not expecting Response
        let kernel_message = t::KernelMessage {
            id: request_id,
            source: source.clone(),
            target: t::Address::de_wit(target.clone()),
            rsvp: match (
                request.inherit,
                request.expects_response,
                &self.prompting_message,
            ) {
                // this request expects response, so receives any response
                // make sure to use the real source, not a fake injected-by-kernel source
                (_, Some(_), _) => Some(self.metadata.our.clone()),
                // this request inherits, so response will be routed to prompting message
                (true, None, Some(ref prompt)) => prompt.rsvp.clone(),
                // this request doesn't inherit, and doesn't itself want a response
                (false, None, _) => None,
                // no rsvp because neither prompting message nor this request wants a response
                (_, None, None) => None,
            },
            message: t::Message::Request(t::de_wit_request(request.clone())),
            payload: payload.clone(),
            signed_capabilities: None,
        };

        // modify the process' context map as needed.
        // if there is a prompting message, we need to store the ultimate
        // even if there is no new context string.
        // TODO optimize this significantly
        if let Some(timeout_secs) = request.expects_response {
            let self_sender = self.self_sender.clone();
            let timeout_handle = tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
                let _ = self_sender
                    .send(Err(t::WrappedSendError {
                        id: request_id,
                        source: t::Address::de_wit(target.clone()), // TODO check this
                        error: t::SendError {
                            kind: t::SendErrorKind::Timeout,
                            target: t::Address::de_wit(target),
                            message: t::Message::Request(t::de_wit_request(request.clone())),
                            payload,
                        },
                    }))
                    .await;
            });
            self.save_context(kernel_message.id, new_context, timeout_handle)
                .await;
        }

        self.send_to_loop
            .send(kernel_message)
            .await
            .expect("fatal: kernel couldn't send request");

        Ok(request_id)
    }

    /// takes Response generated by a process and sends it to the main event loop.
    pub async fn send_response(&mut self, response: wit::Response, payload: Option<wit::Payload>) {
        let (id, target) = match self.make_response_id_target().await {
            Some(r) => r,
            None => {
                let _ = self
                    .send_to_terminal
                    .send(t::Printout {
                        verbosity: 1,
                        content: format!("kernel: dropping Response {:?}", response),
                    })
                    .await;
                return;
            }
        };

        let payload = match response.inherit {
            true => self.last_payload.clone(),
            false => t::de_wit_payload(payload),
        };

        self.send_to_loop
            .send(t::KernelMessage {
                id,
                source: self.metadata.our.clone(),
                target,
                rsvp: None,
                message: t::Message::Response((
                    t::de_wit_response(response),
                    // the context will be set by the process receiving this Response.
                    None,
                )),
                payload,
                signed_capabilities: None,
            })
            .await
            .expect("fatal: kernel couldn't send response");
    }

    /// save a context for a given request.
    async fn save_context(
        &mut self,
        request_id: u64,
        context: Option<t::Context>,
        jh: tokio::task::JoinHandle<()>,
    ) {
        self.contexts.insert(
            request_id,
            (
                t::ProcessContext {
                    prompting_message: if self.prompting_message.is_some() {
                        self.prompting_message.clone()
                    } else {
                        None
                    },
                    context,
                },
                jh,
            ),
        );
    }

    /// instead of ingesting latest, wait for a specific ID and queue all others
    async fn get_specific_message_for_process(
        &mut self,
        awaited_message_id: u64,
    ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
        // first, check if the awaited message is already in the queue and handle if so
        for (i, message) in self.message_queue.iter().enumerate() {
            match message {
                Ok(ref km) if km.id == awaited_message_id => {
                    let km = self.message_queue.remove(i).unwrap();
                    return self.kernel_message_to_process_receive(km.clone());
                }
                _ => continue,
            }
        }
        // next, wait for the awaited message to arrive
        loop {
            let res = self.recv_in_process.recv().await.unwrap();
            match res {
                Ok(ref km) if km.id == awaited_message_id => {
                    return self.kernel_message_to_process_receive(Ok(km.clone()))
                }
                Ok(km) => self.message_queue.push_back(Ok(km)),
                Err(e) if e.id == awaited_message_id => {
                    return self.kernel_message_to_process_receive(Err(e))
                }
                Err(e) => self.message_queue.push_back(Err(e)),
            }
        }
    }

    /// convert a message from the main event loop into a result for the process to receive
    /// if the message is a response or error, get context if we have one
    fn kernel_message_to_process_receive(
        &mut self,
        res: Result<t::KernelMessage, t::WrappedSendError>,
    ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
        let (context, km) = match res {
            Ok(km) => match self.contexts.remove(&km.id) {
                None => {
                    // TODO if this a response, ignore it if we don't have outstanding context
                    self.last_payload = km.payload.clone();
                    self.prompting_message = Some(km.clone());
                    (None, km)
                }
                Some((context, timeout_handle)) => {
                    timeout_handle.abort();
                    self.last_payload = km.payload.clone();
                    self.prompting_message = match context.prompting_message {
                        None => Some(km.clone()),
                        Some(prompting_message) => Some(prompting_message),
                    };
                    (context.context, km)
                }
            },
            Err(e) => match self.contexts.remove(&e.id) {
                None => return Err((t::en_wit_send_error(e.error), None)),
                Some((context, timeout_handle)) => {
                    timeout_handle.abort();
                    self.prompting_message = context.prompting_message;
                    return Err((t::en_wit_send_error(e.error), context.context));
                }
            },
        };

        // note: the context in the KernelMessage is not actually the one we want:
        // (in fact it should be None, possibly always)
        // we need to get *our* context for this message id
        Ok((
            km.source.en_wit().to_owned(),
            match km.message {
                t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
                t::Message::Response((response, _context)) => {
                    wit::Message::Response((t::en_wit_response(response), context))
                }
            },
        ))
    }

    /// Given the current process state, return the id and target that
    /// a response it emits should have. This takes into
    /// account the `rsvp` of the prompting message, if any.
    async fn make_response_id_target(&self) -> Option<(u64, t::Address)> {
        let Some(ref prompting_message) = self.prompting_message else {
            println!("need non-None prompting_message to handle Response");
            return None;
        };
        match &prompting_message.rsvp {
            None => {
                let _ = self
                    .send_to_terminal
                    .send(t::Printout {
                        verbosity: 1,
                        content: "kernel: prompting_message has no rsvp".into(),
                    })
                    .await;
                None
            }
            Some(address) => Some((prompting_message.id, address.clone())),
        }
    }
}

/// create a specific process, and generate a task that will run it.
pub async fn make_process_loop(
    keypair: Arc<signature::Ed25519KeyPair>,
    metadata: t::ProcessMetadata,
    send_to_loop: t::MessageSender,
    send_to_terminal: t::PrintSender,
    mut recv_in_process: ProcessMessageReceiver,
    send_to_process: ProcessMessageSender,
    wasm_bytes: Vec<u8>,
    caps_oracle: t::CapMessageSender,
    engine: Engine,
) -> Result<()> {
    // before process can be instantiated, need to await 'run' message from kernel
    let mut pre_boot_queue = Vec::<Result<t::KernelMessage, t::WrappedSendError>>::new();
    while let Some(message) = recv_in_process.recv().await {
        match message {
            Err(_) => {
                pre_boot_queue.push(message);
                continue;
            }
            Ok(message) => {
                if (message.source
                    == t::Address {
                        node: metadata.our.node.clone(),
                        process: KERNEL_PROCESS_ID.clone(),
                    })
                    && (message.message
                        == t::Message::Request(t::Request {
                            inherit: false,
                            expects_response: None,
                            ipc: b"run".to_vec(),
                            metadata: None,
                        }))
                {
                    break;
                }
                pre_boot_queue.push(Ok(message));
            }
        }
    }
    // now that we've received the run message, we can send the pre-boot queue
    for message in pre_boot_queue {
        send_to_process
            .send(message)
            .await
            .expect("make_process_loop: couldn't send message to process");
    }

    let component =
        Component::new(&engine, wasm_bytes).expect("make_process_loop: couldn't read file");

    let mut linker = Linker::new(&engine);
    Process::add_to_linker(&mut linker, |state: &mut ProcessWasi| state).unwrap();

    let table = Table::new();
    let wasi = WasiCtxBuilder::new().build();

    wasmtime_wasi::preview2::command::add_to_linker(&mut linker).unwrap();

    let mut store = Store::new(
        &engine,
        ProcessWasi {
            process: ProcessState {
                keypair: keypair.clone(),
                metadata: metadata.clone(),
                recv_in_process,
                self_sender: send_to_process,
                send_to_loop: send_to_loop.clone(),
                send_to_terminal: send_to_terminal.clone(),
                prompting_message: None,
                last_payload: None,
                contexts: HashMap::new(),
                message_queue: VecDeque::new(),
                caps_oracle: caps_oracle.clone(),
                next_message_caps: None,
            },
            table,
            wasi,
        },
    );

    let (bindings, _bindings) =
        match Process::instantiate_async(&mut store, &component, &linker).await {
            Ok(b) => b,
            Err(e) => {
                let _ = send_to_terminal
                    .send(t::Printout {
                        verbosity: 0,
                        content: format!(
                            "mk: process {:?} failed to instantiate: {:?}",
                            metadata.our.process, e,
                        ),
                    })
                    .await;
                return Err(e);
            }
        };

    // the process will run until it returns from init()
    let is_error = match bindings
        .call_init(&mut store, &metadata.our.to_string())
        .await
    {
        Ok(()) => {
            let _ = send_to_terminal
                .send(t::Printout {
                    verbosity: 1,
                    content: format!("process {} returned without error", metadata.our.process,),
                })
                .await;
            false
        }
        Err(e) => {
            let _ = send_to_terminal
                .send(t::Printout {
                    verbosity: 0,
                    content: format!("process {:?} ended with error:", metadata.our.process,),
                })
                .await;
            for line in format!("{:?}", e).lines() {
                let _ = send_to_terminal
                    .send(t::Printout {
                        verbosity: 0,
                        content: line.into(),
                    })
                    .await;
            }
            true
        }
    };

    // the process has completed, perform cleanup
    let our_kernel = t::Address {
        node: metadata.our.node.clone(),
        process: KERNEL_PROCESS_ID.clone(),
    };

    if is_error {
        // get caps before killing
        let (tx, rx) = tokio::sync::oneshot::channel();
        let _ = caps_oracle
            .send(t::CapMessage::GetAll {
                on: metadata.our.process.clone(),
                responder: tx,
            })
            .await;
        let initial_capabilities = rx.await.unwrap().into_iter().collect();

        // always send message to tell main kernel loop to remove handler
        send_to_loop
            .send(t::KernelMessage {
                id: rand::random(),
                source: our_kernel.clone(),
                target: our_kernel.clone(),
                rsvp: None,
                message: t::Message::Request(t::Request {
                    inherit: false,
                    expects_response: None,
                    ipc: serde_json::to_vec(&t::KernelCommand::KillProcess(
                        metadata.our.process.clone(),
                    ))
                    .unwrap(),
                    metadata: None,
                }),
                payload: None,
                signed_capabilities: None,
            })
            .await
            .expect("event loop: fatal: sender died");

        // fulfill the designated OnPanic behavior
        match metadata.on_panic {
            t::OnPanic::None => {}
            // if restart, tell ourselves to init the app again, with same capabilities
            t::OnPanic::Restart => {
                send_to_loop
                    .send(t::KernelMessage {
                        id: rand::random(),
                        source: our_kernel.clone(),
                        target: our_kernel.clone(),
                        rsvp: None,
                        message: t::Message::Request(t::Request {
                            inherit: false,
                            expects_response: None,
                            ipc: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
                                id: metadata.our.process.clone(),
                                wasm_bytes_handle: metadata.wasm_bytes_handle,
                                on_panic: metadata.on_panic,
                                initial_capabilities,
                                public: metadata.public,
                            })
                            .unwrap(),
                            metadata: None,
                        }),
                        payload: None,
                        signed_capabilities: None,
                    })
                    .await
                    .expect("event loop: fatal: sender died");
            }
            // if requests, fire them
            // even in death, a process can only message processes it has capabilities for
            t::OnPanic::Requests(requests) => {
                for (address, mut request, payload) in requests {
                    request.expects_response = None;
                    let (tx, rx) = tokio::sync::oneshot::channel();
                    let _ = caps_oracle
                        .send(t::CapMessage::Has {
                            on: metadata.our.process.clone(),
                            cap: t::Capability {
                                issuer: address.clone(),
                                params: "\"messaging\"".into(),
                            },
                            responder: tx,
                        })
                        .await;
                    if let Ok(true) = rx.await {
                        send_to_loop
                            .send(t::KernelMessage {
                                id: rand::random(),
                                source: metadata.our.clone(),
                                target: address,
                                rsvp: None,
                                message: t::Message::Request(request),
                                payload,
                                signed_capabilities: None,
                            })
                            .await
                            .expect("event loop: fatal: sender died");
                    }
                }
            }
        }
    }
    Ok(())
}
652 src/kernel/standard_host.rs (new file)
@@ -0,0 +1,652 @@
use crate::kernel::process;
use crate::kernel::process::uqbar::process::standard as wit;
use crate::types as t;
use crate::FILESYSTEM_PROCESS_ID;
use crate::KERNEL_PROCESS_ID;
use crate::VFS_PROCESS_ID;
use anyhow::Result;
use ring::signature::{self, KeyPair};
use std::collections::HashSet;

use crate::kernel::process::StandardHost;

///
/// create the process API. this is where the functions that a process can use live.
///
#[async_trait::async_trait]
impl StandardHost for process::ProcessWasi {
    //
    // system utils:
    //
    async fn print_to_terminal(&mut self, verbosity: u8, content: String) -> Result<()> {
        match self
            .process
            .send_to_terminal
            .send(t::Printout { verbosity, content })
            .await
        {
            Ok(()) => Ok(()),
            Err(e) => Err(anyhow::anyhow!("fatal: couldn't send to terminal: {:?}", e)),
        }
    }

    async fn get_eth_block(&mut self) -> Result<u64> {
        // TODO connect to eth RPC
        unimplemented!()
    }

    //
    // process management:
    //

    /// TODO critical: move to kernel logic to enable persistence of choice made here
    async fn set_on_panic(&mut self, on_panic: wit::OnPanic) -> Result<()> {
        self.process.metadata.on_panic = t::de_wit_on_panic(on_panic);
        Ok(())
    }

    /// create a message from the *kernel* to the filesystem,
    /// asking it to fetch the current state saved under this process
    async fn get_state(&mut self) -> Result<Option<Vec<u8>>> {
        let old_last_payload = self.process.last_payload.clone();
        let res = match process::send_and_await_response(
            self,
            Some(t::Address {
                node: self.process.metadata.our.node.clone(),
                process: KERNEL_PROCESS_ID.clone(),
            }),
            wit::Address {
                node: self.process.metadata.our.node.clone(),
                process: FILESYSTEM_PROCESS_ID.en_wit(),
            },
            wit::Request {
                inherit: false,
                expects_response: Some(5),
                ipc: serde_json::to_vec(&t::FsAction::GetState(
                    self.process.metadata.our.process.clone(),
                ))
                .unwrap(),
                metadata: Some(self.process.metadata.our.process.to_string()),
            },
            None,
        )
        .await
        {
            Ok(Ok(_resp)) => {
                // basically assuming filesystem responding properly here
                match &self.process.last_payload {
                    None => Ok(None),
                    Some(payload) => Ok(Some(payload.bytes.clone())),
                }
            }
            _ => Ok(None),
        };
        self.process.last_payload = old_last_payload;
        return res;
    }

    /// create a message from the *kernel* to the filesystem,
    /// asking it to replace the current state saved under
    /// this process with these bytes
    async fn set_state(&mut self, bytes: Vec<u8>) -> Result<()> {
        let old_last_payload = self.process.last_payload.clone();
        let res = match process::send_and_await_response(
            self,
            Some(t::Address {
                node: self.process.metadata.our.node.clone(),
                process: KERNEL_PROCESS_ID.clone(),
            }),
            wit::Address {
                node: self.process.metadata.our.node.clone(),
                process: FILESYSTEM_PROCESS_ID.en_wit(),
            },
            wit::Request {
                inherit: false,
                expects_response: Some(5),
                ipc: serde_json::to_vec(&t::FsAction::SetState(
                    self.process.metadata.our.process.clone(),
                ))
                .unwrap(),
                metadata: Some(self.process.metadata.our.process.to_string()),
            },
            Some(wit::Payload { mime: None, bytes }),
        )
        .await
        {
            Ok(Ok(_resp)) => {
                // basically assuming filesystem responding properly here
                Ok(())
            }
            _ => Err(anyhow::anyhow!(
                "filesystem did not respond properly to SetState!!"
            )),
        };
        self.process.last_payload = old_last_payload;
        return res;
    }

    /// create a message from the *kernel* to the filesystem,
    /// asking it to delete the current state saved under this process
    async fn clear_state(&mut self) -> Result<()> {
        let old_last_payload = self.process.last_payload.clone();
        let res = match process::send_and_await_response(
            self,
            Some(t::Address {
                node: self.process.metadata.our.node.clone(),
                process: KERNEL_PROCESS_ID.clone(),
            }),
            wit::Address {
                node: self.process.metadata.our.node.clone(),
                process: FILESYSTEM_PROCESS_ID.en_wit(),
            },
            wit::Request {
                inherit: false,
                expects_response: Some(5),
                ipc: serde_json::to_vec(&t::FsAction::DeleteState(
                    self.process.metadata.our.process.clone(),
                ))
                .unwrap(),
                metadata: None,
            },
            None,
        )
        .await
        {
            Ok(Ok(_resp)) => {
                // basically assuming filesystem responding properly here
                Ok(())
            }
            _ => Err(anyhow::anyhow!(
                "filesystem did not respond properly to ClearState!!"
            )),
        };
        self.process.last_payload = old_last_payload;
        return res;
    }

    /// shortcut to spawn a new process. the child process will automatically
    /// be able to send messages to the parent process, and vice versa.
    /// the .wasm file for the process must already be in VFS.
    async fn spawn(
        &mut self,
        name: Option<String>,
        wasm_path: String, // must be located within package's drive
        on_panic: wit::OnPanic,
        capabilities: wit::Capabilities,
        public: bool,
    ) -> Result<Result<wit::ProcessId, wit::SpawnError>> {
        // save existing payload to restore later
        let old_last_payload = self.process.last_payload.clone();
        let vfs_address = wit::Address {
            node: self.process.metadata.our.node.clone(),
            process: VFS_PROCESS_ID.en_wit(),
        };
        let our_drive_name = [
            self.process.metadata.our.process.package(),
            self.process.metadata.our.process.publisher(),
        ]
        .join(":");
        let Ok(Ok((_, hash_response))) = process::send_and_await_response(
            self,
            None,
            vfs_address.clone(),
            wit::Request {
                inherit: false,
                expects_response: Some(5),
                ipc: serde_json::to_vec(&t::VfsRequest {
                    drive: our_drive_name.clone(),
                    action: t::VfsAction::GetHash(wasm_path.clone()),
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
metadata: None,
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
else {
|
||||||
|
println!("spawn: GetHash fail");
|
||||||
|
// reset payload to what it was
|
||||||
|
self.process.last_payload = old_last_payload;
|
||||||
|
return Ok(Err(wit::SpawnError::NoFileAtPath));
|
||||||
|
};
|
||||||
|
let wit::Message::Response((wit::Response { ipc, .. }, _)) = hash_response else {
|
||||||
|
// reset payload to what it was
|
||||||
|
self.process.last_payload = old_last_payload;
|
||||||
|
return Ok(Err(wit::SpawnError::NoFileAtPath));
|
||||||
|
};
|
||||||
|
let t::VfsResponse::GetHash(Some(hash)) = serde_json::from_slice(&ipc).unwrap() else {
|
||||||
|
// reset payload to what it was
|
||||||
|
self.process.last_payload = old_last_payload;
|
||||||
|
return Ok(Err(wit::SpawnError::NoFileAtPath));
|
||||||
|
};
|
||||||
|
let Ok(Ok(_)) = process::send_and_await_response(
|
||||||
|
self,
|
||||||
|
None,
|
||||||
|
vfs_address,
|
||||||
|
wit::Request {
|
||||||
|
inherit: false,
|
||||||
|
expects_response: Some(5),
|
||||||
|
ipc: serde_json::to_vec(&t::VfsRequest {
|
||||||
|
drive: our_drive_name,
|
||||||
|
action: t::VfsAction::GetEntry(wasm_path.clone()),
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
metadata: None,
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
else {
|
||||||
|
// reset payload to what it was
|
||||||
|
self.process.last_payload = old_last_payload;
|
||||||
|
return Ok(Err(wit::SpawnError::NoFileAtPath));
|
||||||
|
};
|
||||||
|
let Some(t::Payload { mime: _, ref bytes }) = self.process.last_payload else {
|
||||||
|
// reset payload to what it was
|
||||||
|
self.process.last_payload = old_last_payload;
|
||||||
|
return Ok(Err(wit::SpawnError::NoFileAtPath));
|
||||||
|
};
|
||||||
|
|
||||||
|
let name = match name {
|
||||||
|
Some(name) => name,
|
||||||
|
None => rand::random::<u64>().to_string(),
|
||||||
|
};
|
||||||
|
let new_process_id = t::ProcessId::new(
|
||||||
|
Some(&name),
|
||||||
|
self.process.metadata.our.process.package(),
|
||||||
|
self.process.metadata.our.process.publisher(),
|
||||||
|
);
|
||||||
|
let Ok(Ok((_, _response))) = process::send_and_await_response(
|
||||||
|
self,
|
||||||
|
Some(t::Address {
|
||||||
|
node: self.process.metadata.our.node.clone(),
|
||||||
|
process: KERNEL_PROCESS_ID.clone(),
|
||||||
|
}),
|
||||||
|
wit::Address {
|
||||||
|
node: self.process.metadata.our.node.clone(),
|
||||||
|
process: KERNEL_PROCESS_ID.en_wit(),
|
||||||
|
},
|
||||||
|
wit::Request {
|
||||||
|
inherit: false,
|
||||||
|
expects_response: Some(5), // TODO evaluate
|
||||||
|
ipc: serde_json::to_vec(&t::KernelCommand::InitializeProcess {
|
||||||
|
id: new_process_id.clone(),
|
||||||
|
wasm_bytes_handle: hash,
|
||||||
|
on_panic: t::de_wit_on_panic(on_panic),
|
||||||
|
initial_capabilities: match capabilities {
|
||||||
|
wit::Capabilities::None => HashSet::new(),
|
||||||
|
wit::Capabilities::All => {
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
let _ = self
|
||||||
|
.process
|
||||||
|
.caps_oracle
|
||||||
|
.send(t::CapMessage::GetAll {
|
||||||
|
on: self.process.metadata.our.process.clone(),
|
||||||
|
responder: tx,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
rx.await.unwrap()
|
||||||
|
}
|
||||||
|
wit::Capabilities::Some(caps) => caps
|
||||||
|
.into_iter()
|
||||||
|
.map(|cap| t::SignedCapability {
|
||||||
|
issuer: t::Address::de_wit(cap.issuer),
|
||||||
|
params: cap.params,
|
||||||
|
signature: cap.signature,
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
},
|
||||||
|
public,
|
||||||
|
})
|
||||||
|
.unwrap(),
|
||||||
|
metadata: None,
|
||||||
|
},
|
||||||
|
Some(wit::Payload {
|
||||||
|
mime: None,
|
||||||
|
bytes: bytes.to_vec(),
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
else {
|
||||||
|
// reset payload to what it was
|
||||||
|
self.process.last_payload = old_last_payload;
|
||||||
|
return Ok(Err(wit::SpawnError::NameTaken));
|
||||||
|
};
|
||||||
|
// finally, send the command to run the new process
|
||||||
|
let Ok(Ok((_, response))) = process::send_and_await_response(
|
||||||
|
self,
|
||||||
|
Some(t::Address {
|
||||||
|
node: self.process.metadata.our.node.clone(),
|
||||||
|
process: KERNEL_PROCESS_ID.clone(),
|
||||||
|
}),
|
||||||
|
wit::Address {
|
||||||
|
node: self.process.metadata.our.node.clone(),
|
||||||
|
process: KERNEL_PROCESS_ID.en_wit(),
|
||||||
|
},
|
||||||
|
wit::Request {
|
||||||
|
inherit: false,
|
||||||
|
expects_response: Some(5), // TODO evaluate
|
||||||
|
ipc: serde_json::to_vec(&t::KernelCommand::RunProcess(new_process_id.clone()))
|
||||||
|
.unwrap(),
|
||||||
|
metadata: None,
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
else {
|
||||||
|
// reset payload to what it was
|
||||||
|
self.process.last_payload = old_last_payload;
|
||||||
|
return Ok(Err(wit::SpawnError::NameTaken));
|
||||||
|
};
|
||||||
|
// reset payload to what it was
|
||||||
|
self.process.last_payload = old_last_payload;
|
||||||
|
let wit::Message::Response((wit::Response { ipc, .. }, _)) = response else {
|
||||||
|
return Ok(Err(wit::SpawnError::NoFileAtPath));
|
||||||
|
};
|
||||||
|
let t::KernelResponse::StartedProcess = serde_json::from_slice(&ipc).unwrap() else {
|
||||||
|
return Ok(Err(wit::SpawnError::NoFileAtPath));
|
||||||
|
};
|
||||||
|
// child processes are always able to Message parent
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
self.process
|
||||||
|
.caps_oracle
|
||||||
|
.send(t::CapMessage::Add {
|
||||||
|
on: new_process_id.clone(),
|
||||||
|
cap: t::Capability {
|
||||||
|
issuer: self.process.metadata.our.clone(),
|
||||||
|
params: "\"messaging\"".into(),
|
||||||
|
},
|
||||||
|
responder: tx,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let _ = rx.await.unwrap();
|
||||||
|
|
||||||
|
// parent process is always able to Message child
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
self.process
|
||||||
|
.caps_oracle
|
||||||
|
.send(t::CapMessage::Add {
|
||||||
|
on: self.process.metadata.our.process.clone(),
|
||||||
|
cap: t::Capability {
|
||||||
|
issuer: t::Address {
|
||||||
|
node: self.process.metadata.our.node.clone(),
|
||||||
|
process: new_process_id.clone(),
|
||||||
|
},
|
||||||
|
params: "\"messaging\"".into(),
|
||||||
|
},
|
||||||
|
responder: tx,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let _ = rx.await.unwrap();
|
||||||
|
Ok(Ok(new_process_id.en_wit().to_owned()))
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// capabilities management
|
||||||
|
//
|
||||||
|
async fn get_capabilities(&mut self) -> Result<Vec<wit::SignedCapability>> {
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
let _ = self
|
||||||
|
.process
|
||||||
|
.caps_oracle
|
||||||
|
.send(t::CapMessage::GetAll {
|
||||||
|
on: self.process.metadata.our.process.clone(),
|
||||||
|
responder: tx,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
Ok(rx
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_iter()
|
||||||
|
.map(|cap| wit::SignedCapability {
|
||||||
|
issuer: cap.issuer.en_wit(),
|
||||||
|
params: cap.params,
|
||||||
|
signature: cap.signature,
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_capability(
|
||||||
|
&mut self,
|
||||||
|
issuer: wit::Address,
|
||||||
|
params: String,
|
||||||
|
) -> Result<Option<wit::SignedCapability>> {
|
||||||
|
let cap = t::Capability {
|
||||||
|
issuer: t::Address::de_wit(issuer),
|
||||||
|
params,
|
||||||
|
};
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
let _ = self
|
||||||
|
.process
|
||||||
|
.caps_oracle
|
||||||
|
.send(t::CapMessage::Has {
|
||||||
|
on: self.process.metadata.our.process.clone(),
|
||||||
|
cap: cap.clone(),
|
||||||
|
responder: tx,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
if rx.await.unwrap() {
|
||||||
|
let sig = self
|
||||||
|
.process
|
||||||
|
.keypair
|
||||||
|
.sign(&rmp_serde::to_vec(&cap).unwrap_or_default());
|
||||||
|
return Ok(Some(wit::SignedCapability {
|
||||||
|
issuer: cap.issuer.en_wit().to_owned(),
|
||||||
|
params: cap.params.clone(),
|
||||||
|
signature: sig.as_ref().to_vec(),
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn attach_capability(&mut self, capability: wit::SignedCapability) -> Result<()> {
|
||||||
|
match self.process.next_message_caps {
|
||||||
|
None => {
|
||||||
|
self.process.next_message_caps =
|
||||||
|
Some(vec![t::de_wit_signed_capability(capability)]);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Some(ref mut v) => {
|
||||||
|
v.push(t::de_wit_signed_capability(capability));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save_capabilities(&mut self, capabilities: Vec<wit::SignedCapability>) -> Result<()> {
|
||||||
|
let pk = signature::UnparsedPublicKey::new(
|
||||||
|
&signature::ED25519,
|
||||||
|
self.process.keypair.public_key(),
|
||||||
|
);
|
||||||
|
for signed_cap in capabilities {
|
||||||
|
// validate our signature!
|
||||||
|
let cap = t::Capability {
|
||||||
|
issuer: t::Address::de_wit(signed_cap.issuer),
|
||||||
|
params: signed_cap.params,
|
||||||
|
};
|
||||||
|
pk.verify(
|
||||||
|
&rmp_serde::to_vec(&cap).unwrap_or_default(),
|
||||||
|
&signed_cap.signature,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
let _ = self
|
||||||
|
.process
|
||||||
|
.caps_oracle
|
||||||
|
.send(t::CapMessage::Add {
|
||||||
|
on: self.process.metadata.our.process.clone(),
|
||||||
|
cap: cap.clone(),
|
||||||
|
responder: tx,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let _ = rx.await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn has_capability(&mut self, params: String) -> Result<bool> {
|
||||||
|
if self.process.prompting_message.is_none() {
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"kernel: has_capability() called with no prompting_message"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let prompt = self.process.prompting_message.as_ref().unwrap();
|
||||||
|
if prompt.source.node == self.process.metadata.our.node {
|
||||||
|
// if local, need to ask them
|
||||||
|
let cap = t::Capability {
|
||||||
|
issuer: self.process.metadata.our.clone(),
|
||||||
|
params,
|
||||||
|
};
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
let _ = self
|
||||||
|
.process
|
||||||
|
.caps_oracle
|
||||||
|
.send(t::CapMessage::Has {
|
||||||
|
on: prompt.source.process.clone(),
|
||||||
|
cap,
|
||||||
|
responder: tx,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
Ok(rx.await.unwrap_or(false))
|
||||||
|
} else {
|
||||||
|
// if remote, just check prompting_message
|
||||||
|
if prompt.signed_capabilities.is_none() {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
for cap in prompt.signed_capabilities.as_ref().unwrap() {
|
||||||
|
if cap.issuer == self.process.metadata.our && cap.params == params {
|
||||||
|
return Ok(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// generate a new cap with this process as the issuer and send to caps oracle
|
||||||
|
async fn create_capability(&mut self, to: wit::ProcessId, params: String) -> Result<()> {
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
let _ = self
|
||||||
|
.process
|
||||||
|
.caps_oracle
|
||||||
|
.send(t::CapMessage::Add {
|
||||||
|
on: t::ProcessId::de_wit(to),
|
||||||
|
cap: t::Capability {
|
||||||
|
issuer: self.process.metadata.our.clone(),
|
||||||
|
params,
|
||||||
|
},
|
||||||
|
responder: tx,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let _ = rx.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn share_capability(
|
||||||
|
&mut self,
|
||||||
|
to: wit::ProcessId,
|
||||||
|
signed_cap: wit::SignedCapability,
|
||||||
|
) -> Result<()> {
|
||||||
|
let pk = signature::UnparsedPublicKey::new(
|
||||||
|
&signature::ED25519,
|
||||||
|
self.process.keypair.public_key(),
|
||||||
|
);
|
||||||
|
let cap = t::Capability {
|
||||||
|
issuer: t::Address::de_wit(signed_cap.issuer),
|
||||||
|
params: signed_cap.params,
|
||||||
|
};
|
||||||
|
pk.verify(
|
||||||
|
&rmp_serde::to_vec(&cap).unwrap_or_default(),
|
||||||
|
&signed_cap.signature,
|
||||||
|
)?;
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
let _ = self
|
||||||
|
.process
|
||||||
|
.caps_oracle
|
||||||
|
.send(t::CapMessage::Add {
|
||||||
|
on: t::ProcessId::de_wit(to),
|
||||||
|
cap,
|
||||||
|
responder: tx,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let _ = rx.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
//
|
||||||
|
// message I/O:
|
||||||
|
//
|
||||||
|
|
||||||
|
/// from a process: receive the next incoming message. will wait async until a message is received.
|
||||||
|
/// the incoming message can be a Request or a Response, or an Error of the Network variety.
|
||||||
|
async fn receive(
|
||||||
|
&mut self,
|
||||||
|
) -> Result<Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)>> {
|
||||||
|
Ok(self.process.get_next_message_for_process().await)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// from a process: grab the payload part of the current prompting message.
|
||||||
|
/// if the prompting message did not have a payload, will return None.
|
||||||
|
/// will also return None if there is no prompting message.
|
||||||
|
async fn get_payload(&mut self) -> Result<Option<wit::Payload>> {
|
||||||
|
Ok(t::en_wit_payload(self.process.last_payload.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_request(
|
||||||
|
&mut self,
|
||||||
|
target: wit::Address,
|
||||||
|
request: wit::Request,
|
||||||
|
context: Option<wit::Context>,
|
||||||
|
payload: Option<wit::Payload>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let id = self
|
||||||
|
.process
|
||||||
|
.handle_request(None, target, request, context, payload)
|
||||||
|
.await;
|
||||||
|
match id {
|
||||||
|
Ok(_id) => Ok(()),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_requests(
|
||||||
|
&mut self,
|
||||||
|
requests: Vec<(
|
||||||
|
wit::Address,
|
||||||
|
wit::Request,
|
||||||
|
Option<wit::Context>,
|
||||||
|
Option<wit::Payload>,
|
||||||
|
)>,
|
||||||
|
) -> Result<()> {
|
||||||
|
for request in requests {
|
||||||
|
let id = self
|
||||||
|
.process
|
||||||
|
.handle_request(None, request.0, request.1, request.2, request.3)
|
||||||
|
.await;
|
||||||
|
match id {
|
||||||
|
Ok(_id) => continue,
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_response(
|
||||||
|
&mut self,
|
||||||
|
response: wit::Response,
|
||||||
|
payload: Option<wit::Payload>,
|
||||||
|
) -> Result<()> {
|
||||||
|
self.process.send_response(response, payload).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_and_await_response(
|
||||||
|
&mut self,
|
||||||
|
target: wit::Address,
|
||||||
|
request: wit::Request,
|
||||||
|
payload: Option<wit::Payload>,
|
||||||
|
) -> Result<Result<(wit::Address, wit::Message), wit::SendError>> {
|
||||||
|
process::send_and_await_response(self, None, target, request, payload).await
|
||||||
|
}
|
||||||
|
}
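
The spawn host function above is the first consumer of the new two-step kernel API: it sends KernelCommand::InitializeProcess to install the child, then KernelCommand::RunProcess to start it. Below is a minimal sketch of building those two IPC payloads, assuming the KernelCommand and related types from src/types.rs shown further down; the helper name and the crate::types module path are illustrative, not part of this commit.

use std::collections::HashSet;

use crate::types::{KernelCommand, OnPanic, ProcessId, SignedCapability};

/// Build the two IPC payloads a caller sends to the kernel: one to install
/// a process, one to start it. Serialization mirrors spawn() above, which
/// uses serde_json for kernel IPC.
fn init_then_run_ipc(
    id: ProcessId,
    wasm_bytes_handle: u128,
    on_panic: OnPanic,
    initial_capabilities: HashSet<SignedCapability>,
    public: bool,
) -> (Vec<u8>, Vec<u8>) {
    // step 1: install the process, but do not start executing it yet
    let init = serde_json::to_vec(&KernelCommand::InitializeProcess {
        id: id.clone(),
        wasm_bytes_handle,
        on_panic,
        initial_capabilities,
        public,
    })
    .expect("KernelCommand is serializable");
    // step 2: once the kernel acknowledges initialization, tell it to run the process
    let run = serde_json::to_vec(&KernelCommand::RunProcess(id))
        .expect("KernelCommand is serializable");
    (init, run)
}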
93  src/types.rs

@@ -1,4 +1,4 @@
-use crate::kernel::uqbar::process::standard as wit;
+use crate::kernel::process::wit;
 use ring::signature;
 use serde::{Deserialize, Serialize};
 use std::collections::{HashMap, HashSet};
@@ -31,13 +31,32 @@ pub type NodeId = String; // QNS domain name
 /// the process name can be a random number, or a name chosen by the user.
 /// the formatting is as follows:
 /// `[process name]:[package name]:[node ID]`
-#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
+#[derive(Clone, Debug, Eq, PartialEq, Hash)]
 pub struct ProcessId {
     process_name: String,
     package_name: String,
     publisher_node: NodeId,
 }
+
+impl Serialize for ProcessId {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::ser::Serializer,
+    {
+        format!("{}", self).serialize(serializer)
+    }
+}
+
+impl<'a> Deserialize<'a> for ProcessId {
+    fn deserialize<D>(deserializer: D) -> Result<ProcessId, D::Error>
+    where
+        D: serde::de::Deserializer<'a>,
+    {
+        let s = String::deserialize(deserializer)?;
+        ProcessId::from_str(&s).map_err(serde::de::Error::custom)
+    }
+}
 
 /// PackageId is like a ProcessId, but for a package. Only contains the name
 /// of the package and the name of the publisher.
 #[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
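
With the manual Serialize/Deserialize impls above, a ProcessId is carried over the wire as its display string rather than as a struct. A brief sketch of the round trip, assuming the `name:package:publisher` string format documented above; the concrete values and the crate::types module path are illustrative.

use crate::types::ProcessId;

fn process_id_roundtrip() -> anyhow::Result<()> {
    let id = ProcessId::new(Some("my-process"), "my-package", "publisher.uq");
    // serializes as a plain JSON string, e.g. "my-process:my-package:publisher.uq"
    let json = serde_json::to_string(&id)?;
    let back: ProcessId = serde_json::from_str(&json)?;
    assert_eq!(id, back);
    Ok(())
}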
@@ -212,7 +231,7 @@ impl std::error::Error for ProcessIdParseError {
     }
 }
 
-#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
 pub struct Address {
     pub node: NodeId,
     pub process: ProcessId,
@@ -228,7 +247,7 @@ impl Address {
             process: process.into(),
         }
     }
-    pub fn _from_str(input: &str) -> Result<Self, AddressParseError> {
+    pub fn from_str(input: &str) -> Result<Self, AddressParseError> {
        // split string on colons into 4 segments,
        // first one with @, next 3 with :
        let mut name_rest = input.split('@');
@@ -282,6 +301,25 @@ impl Address {
     }
 }
+
+impl Serialize for Address {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::ser::Serializer,
+    {
+        format!("{}", self).serialize(serializer)
+    }
+}
+
+impl<'a> Deserialize<'a> for Address {
+    fn deserialize<D>(deserializer: D) -> Result<Address, D::Error>
+    where
+        D: serde::de::Deserializer<'a>,
+    {
+        let s = String::deserialize(deserializer)?;
+        Address::from_str(&s).map_err(serde::de::Error::custom)
+    }
+}
 
 impl From<(&str, &str, &str, &str)> for Address {
     fn from(input: (&str, &str, &str, &str)) -> Self {
         Address::new(input.0, (input.1, input.2, input.3))
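
Address gets the same treatment: it now serializes as its string form, which from_str above parses as a node name, an `@`, and then the three colon-separated ProcessId segments. A short sketch under the same assumptions as the ProcessId example; all values shown are illustrative.

use crate::types::{Address, ProcessId};

fn address_roundtrip() -> anyhow::Result<()> {
    // expected wire form, e.g. "my-node.uq@my-process:my-package:publisher.uq"
    let addr = Address {
        node: "my-node.uq".to_string(),
        process: ProcessId::new(Some("my-process"), "my-package", "publisher.uq"),
    };
    let json = serde_json::to_string(&addr)?;
    let back: Address = serde_json::from_str(&json)?;
    assert_eq!(addr, back);
    Ok(())
}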
@@ -716,25 +754,46 @@ pub enum DebugCommand {
     Step,
 }
 
+/// IPC format for requests sent to kernel runtime module
 #[derive(Debug, Serialize, Deserialize)]
 pub enum KernelCommand {
+    /// RUNTIME ONLY: used to notify the kernel that booting is complete and
+    /// all processes have been loaded in from their persisted or bootstrapped state.
     Booted,
-    StartProcess {
+    /// Tell the kernel to install and prepare a new process for execution.
+    /// The process will not begin execution until the kernel receives a
+    /// `RunProcess` command with the same `id`.
+    ///
+    /// The process that sends this command will be given messaging capabilities
+    /// for the new process if `public` is false.
+    InitializeProcess {
         id: ProcessId,
         wasm_bytes_handle: u128,
         on_panic: OnPanic,
         initial_capabilities: HashSet<SignedCapability>,
         public: bool,
     },
-    KillProcess(ProcessId), // this is extrajudicial killing: we might lose messages!
-    // kernel only
-    RebootProcess {
-        process_id: ProcessId,
-        persisted: PersistedProcess,
-    },
+    /// Tell the kernel to run a process that has already been installed.
+    /// TODO: in the future, this command could be extended to allow for
+    /// resource provision.
+    RunProcess(ProcessId),
+    /// Kill a running process immediately. This may result in the dropping / mishandling of messages!
+    KillProcess(ProcessId),
+    /// RUNTIME ONLY: notify the kernel that the runtime is shutting down and it
+    /// should gracefully stop and persist the running processes.
     Shutdown,
 }
 
+/// IPC format for all KernelCommand responses
+#[derive(Debug, Serialize, Deserialize)]
+pub enum KernelResponse {
+    InitializedProcess,
+    InitializeProcessError,
+    StartedProcess,
+    RunProcessError,
+    KilledProcess(ProcessId),
+}
+
 #[derive(Debug)]
 pub enum CapMessage {
     Add {
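
Each KernelCommand now has a typed acknowledgement in KernelResponse, which is what spawn() checks for after sending RunProcess. A small sketch of decoding that acknowledgement from a response's ipc bytes; the function name and the strict "anything else is failure" mapping are illustrative choices, not part of this commit.

use crate::types::KernelResponse;

/// Interpret the kernel's reply to a RunProcess command.
fn run_process_succeeded(ipc: &[u8]) -> bool {
    match serde_json::from_slice::<KernelResponse>(ipc) {
        Ok(KernelResponse::StartedProcess) => true,
        // RunProcessError, any other variant, or undecodable bytes all count as failure
        _ => false,
    }
}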
@@ -756,17 +815,10 @@ pub enum CapMessage {
     },
     GetAll {
         on: ProcessId,
-        responder: tokio::sync::oneshot::Sender<HashSet<Capability>>,
+        responder: tokio::sync::oneshot::Sender<HashSet<SignedCapability>>,
     },
 }
 
-#[derive(Debug, Serialize, Deserialize)]
-pub enum KernelResponse {
-    StartedProcess,
-    StartProcessError,
-    KilledProcess(ProcessId),
-}
-
 pub type ProcessMap = HashMap<ProcessId, PersistedProcess>;
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -806,7 +858,8 @@ pub struct PackageManifestEntry {
     pub process_wasm_path: String,
     pub on_panic: OnPanic,
     pub request_networking: bool,
-    pub request_messaging: Vec<String>,
+    pub request_messaging: Option<Vec<String>>,
+    pub grant_messaging: Option<Vec<String>>,
     pub public: bool,
 }