This commit is contained in:
dr-frmr 2023-10-06 00:12:31 -04:00
parent 617a0e4ea5
commit 5b35a9b29b
No known key found for this signature in database
38 changed files with 661 additions and 692 deletions

3
.gitignore vendored
View File

@ -1,9 +1,12 @@
target/
.vscode
.app-signing
.DS_Store
*.swp
*.swo
*.zip
/home
modules/**/pkg/*.wasm
modules/**/wit
target.wasm
world

1
Cargo.lock generated
View File

@ -4867,6 +4867,7 @@ dependencies = [
"tokio-tungstenite 0.20.0",
"url",
"uuid 1.4.1",
"walkdir",
"warp",
"wasmtime",
"wasmtime-wasi",

View File

@ -5,6 +5,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[build-dependencies]
zip = "0.6"
walkdir = "2.4"
[dependencies]
aes-gcm = "0.10.2"
anyhow = "1.0.71"

View File

@ -23,7 +23,7 @@ mkdir -p "$target_path/target/wasm32-unknown-unknown/release" || { echo "Command
# Build the module using Cargo
cargo +nightly build \
$release_flag \
$debug_flag \
--no-default-features \
--manifest-path="$target_path/Cargo.toml" \
--target "wasm32-wasi" || {

View File

@ -1,5 +1,8 @@
use std::process::Command;
use std::{fs, io};
use std::{
fs, io,
io::{Read, Write},
};
fn run_command(cmd: &mut Command) -> io::Result<()> {
let status = cmd.status()?;
@ -29,6 +32,7 @@ fn main() {
];
for name in WASI_APPS {
println!("cargo:rerun-if-changed=modules/{}/src", name);
println!("cargo:rerun-if-changed=modules/{}/pkg/manifest.json", name);
}
let pwd = std::env::current_dir().unwrap();
@ -47,18 +51,19 @@ fn main() {
// Build wasm32-wasi apps.
for name in WASI_APPS {
// copy in the wit files
// remove old wit, if it existed
run_command(
Command::new("rm").args(&["-rf", &format!("{}/modules/{}/wit", pwd.display(), name)]),
)
.unwrap();
// copy in newly-made wit
run_command(Command::new("cp").args(&[
"-r",
"wit",
&format!("{}/modules/{}", pwd.display(), name),
]))
.unwrap();
// create target/bindings directory
fs::create_dir_all(&format!(
"{}/modules/{}/target/bindings/{}",
pwd.display(),
@ -66,6 +71,7 @@ fn main() {
name
))
.unwrap();
// copy newly-made target.wasm into target/bindings
run_command(Command::new("cp").args(&[
"target.wasm",
&format!(
@ -76,6 +82,7 @@ fn main() {
),
]))
.unwrap();
// copy newly-made world into target/bindings
run_command(Command::new("cp").args(&[
"world",
&format!(
@ -86,15 +93,7 @@ fn main() {
),
]))
.unwrap();
fs::create_dir_all(&format!(
"{}/modules/{}/target/wasm32-unknown-unknown/release",
pwd.display(),
name
))
.unwrap();
// build the module
// build the module targeting wasm32-wasi
run_command(Command::new("cargo").args(&[
"build",
"--release",
@ -108,8 +107,7 @@ fn main() {
"wasm32-wasi",
]))
.unwrap();
// adapt module to component with adaptor
// adapt module to component with adapter based on wasi_snapshot_preview1.wasm
run_command(Command::new("wasm-tools").args(&[
"component",
"new",
@ -130,8 +128,9 @@ fn main() {
&format!("{}/wasi_snapshot_preview1.wasm", pwd.display()),
]))
.unwrap();
// put wit into component & place where boot sequence expects to find it
// put wit into component & place final .wasm in /pkg
let pkg_folder = format!("{}/modules/{}/pkg/", pwd.display(), name);
let _ = run_command(Command::new("mkdir").args(&["-p", &pkg_folder]));
run_command(Command::new("wasm-tools").args(&[
"component",
"embed",
@ -145,13 +144,36 @@ fn main() {
name
),
"-o",
&format!(
"{}/modules/{}/target/wasm32-unknown-unknown/release/{}.wasm",
pwd.display(),
name,
name
),
&format!("{}/{}.wasm", pkg_folder, name),
]))
.unwrap();
// from the pkg folder, create a zip archive and save in target directory
let writer = std::fs::File::create(format!("{}/target/{}.zip", pwd.display(), name)).unwrap();
let options = zip::write::FileOptions::default()
.compression_method(zip::CompressionMethod::Stored) // or CompressionMethod::Deflated
.unix_permissions(0o755);
let mut zip = zip::ZipWriter::new(writer);
for entry in walkdir::WalkDir::new(&pkg_folder) {
let entry = entry.unwrap();
let path = entry.path();
let name = path
.strip_prefix(std::path::Path::new(&pkg_folder))
.unwrap();
// Write a directory or file to the ZIP archive
if path.is_file() {
zip.start_file(name.to_string_lossy().into_owned(), options)
.unwrap();
let mut file = std::fs::File::open(path).unwrap();
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).unwrap();
zip.write_all(&buffer).unwrap();
} else if name.as_os_str().len() != 0 {
zip.add_directory(name.to_string_lossy().into_owned(), options)
.unwrap();
}
}
zip.finish().unwrap();
}
}

View File

@ -1,67 +0,0 @@
#!/bin/bash
all=false
debug="--release"
# parse arguments (--all, --release)
for arg in "$@"; do
case "$arg" in
--all)
all=true
;;
--debug)
debug="--release"
;;
*)
echo "Error: Unrecognized argument: $arg"
exit 1
;;
esac
done
pwd=$(pwd)
# create target.wasm (compiled .wit) & world
wasm-tools component wit "${pwd}/wit/" -o target.wasm --wasm || {
echo "Command failed"
exit 1
}
# Run the second command and exit if it fails
touch "${pwd}/world" || {
echo "Command failed"
exit 1
}
# Build logic for an app
build_app() {
dir="$1"
release="$2"
# Check if it contains a Cargo.toml
if [ -f "$dir/Cargo.toml" ]; then
./build-app.sh "$dir" $release
elif [ -d "$dir" ]; then
# It's a directory. Check its subdirectories
for sub_dir in "$dir"/*; do
if [ -f "$sub_dir/Cargo.toml" ]; then
./build-app.sh "$sub_dir" $release
fi
done
fi
}
# if --all compile all apps
if $all; then
modules_dir="./modules"
for dir in "$modules_dir"/*; do
if [ "key_value" = "$dir" ]; then
continue
fi
build_app "$dir" "$release"
done
else
DIRS=($(git -C . status --porcelain | grep 'modules/' | sed -n 's|^.*modules/\([^/]*\)/.*$|\1|p' | sort -u))
for dir in "${DIRS[@]}"; do
build_app "./modules/$dir" "$release"
done
fi

View File

@ -0,0 +1,21 @@
[
{
"process_name": "app_tracker",
"process_wasm_path": "app_tracker.wasm",
"on_panic": "Restart",
"request_networking": true,
"request_messaging": [
"http_bindings",
"terminal",
"filesystem",
"http_server",
"http_client",
"encryptor",
"net",
"vfs",
"kernel",
"eth_rpc"
],
"grant_messaging": []
}
]

Binary file not shown.

Binary file not shown.

View File

@ -1,19 +0,0 @@
[
{
"process_id": "chess",
"process_wasm": "chess.wasm",
"on_panic": {
"on_panic": "Restart"
},
"networking": true,
"request_messaging": [
"http_bindings",
"encryptor"
],
"grant_messaging": [
"terminal",
"http_bindings",
"encryptor"
]
}
]

View File

@ -0,0 +1,13 @@
[
{
"process_name": "chess",
"process_wasm_path": "chess.wasm",
"on_panic": "Restart",
"request_networking": true,
"request_messaging": [
"http_bindings",
"encryptor"
],
"grant_messaging": []
}
]

View File

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use super::bindings::component::uq_process::types::*;
use super::bindings::{get_payload, send_request, Address, Payload};
use super::bindings::{Address, get_payload, Payload, SendError, send_request};
impl PartialEq for ProcessId {
fn eq(&self, other: &Self) -> bool {
@ -50,61 +50,24 @@ pub fn send_and_await_response(
}
pub fn get_state(our: String) -> Option<Payload> {
let _ = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::GetState).unwrap()),
None,
None,
5, // TODO evaluate timeout
);
get_payload()
match super::bindings::get_state() {
Some(bytes) => Some(Payload {
mime: None,
bytes,
}),
None => None,
}
}
pub fn set_state(our: String, bytes: Vec<u8>) {
send_request(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
&Request {
inherit: false,
expects_response: Some(5), // TODO evaluate timeout
ipc: Some(serde_json::to_string(&FsAction::SetState).unwrap()),
metadata: None,
},
None,
Some(&Payload { mime: None, bytes }),
);
super::bindings::set_state(&bytes);
}
pub fn await_set_state<T>(our: String, state: &T)
where
T: serde::Serialize,
{
// Request/Response stays local -> no SendError
let (_, response) = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::SetState).unwrap()),
None,
Some(&Payload {
mime: None,
bytes: bincode::serialize(state).unwrap(),
}),
5, // TODO evaluate timeout
)
.unwrap();
match response {
Message::Request(_) => panic!("got request from filesystem"),
Message::Response((response, _context)) => return,
}
super::bindings::set_state(&bincode::serialize(state).unwrap());
}
pub fn parse_message_ipc<T>(json_string: Option<String>) -> anyhow::Result<T>

View File

@ -0,0 +1,14 @@
[
{
"process_name": "homepage",
"process_wasm_path": "homepage.wasm",
"on_panic": "Restart",
"request_networking": false,
"request_messaging": [
"http_bindings",
"http_server",
"encryptor"
],
"grant_messaging": []
}
]

View File

@ -0,0 +1,17 @@
[
{
"process_name": "http_bindings",
"process_wasm_path": "http_bindings.wasm",
"on_panic": "Restart",
"request_networking": false,
"request_messaging": [
"http_server",
"http_client",
"encryptor",
"vfs"
],
"grant_messaging": [
"http_sever"
]
}
]

View File

@ -0,0 +1,13 @@
[
{
"process_name": "http_proxy",
"process_wasm_path": "http_proxy.wasm",
"on_panic": "Restart",
"request_networking": false,
"request_messaging": [
"http_bindings",
"encryptor"
],
"grant_messaging": []
}
]

View File

@ -86,6 +86,7 @@ fn handle_message (
"/key_value_worker.wasm",
&OnPanic::None, // TODO: notify us
&Capabilities::Some(vec![vfs_read, vfs_write]),
false, // not public
) else {
panic!("couldn't spawn"); // TODO
};

View File

@ -0,0 +1,12 @@
[
{
"process_name": "orgs",
"process_wasm_path": "orgs.wasm",
"on_panic": "Restart",
"request_networking": true,
"request_messaging": [
"http_bindings"
],
"grant_messaging": []
}
]

View File

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use super::bindings::component::uq_process::types::*;
use super::bindings::{get_payload, send_request, Address, Payload};
use super::bindings::{Address, get_payload, Payload, SendError, send_request};
impl PartialEq for ProcessId {
fn eq(&self, other: &Self) -> bool {
@ -50,61 +50,24 @@ pub fn send_and_await_response(
}
pub fn get_state(our: String) -> Option<Payload> {
let _ = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::GetState).unwrap()),
None,
None,
5, // TODO evaluate timeout
);
get_payload()
match super::bindings::get_state() {
Some(bytes) => Some(Payload {
mime: None,
bytes,
}),
None => None,
}
}
pub fn set_state(our: String, bytes: Vec<u8>) {
send_request(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
&Request {
inherit: false,
expects_response: Some(5), // TODO evaluate timeout
ipc: Some(serde_json::to_string(&FsAction::SetState).unwrap()),
metadata: None,
},
None,
Some(&Payload { mime: None, bytes }),
);
super::bindings::set_state(&bytes);
}
pub fn await_set_state<T>(our: String, state: &T)
where
T: serde::Serialize,
{
// Request/Response stays local -> no SendError
let (_, response) = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::SetState).unwrap()),
None,
Some(&Payload {
mime: None,
bytes: bincode::serialize(state).unwrap(),
}),
5, // TODO evaluate timeout
)
.unwrap();
match response {
Message::Request(_) => panic!("got request from filesystem"),
Message::Response((response, _context)) => return,
}
super::bindings::set_state(&bincode::serialize(state).unwrap());
}
pub fn parse_message_ipc<T>(json_string: Option<String>) -> anyhow::Result<T>

View File

@ -0,0 +1,17 @@
[
{
"process_name": "qns_indexer",
"process_wasm_path": "qns_indexer.wasm",
"on_panic": "Restart",
"request_networking": true,
"request_messaging": [
"net",
"http_bindings",
"eth_rpc"
],
"grant_messaging": [
"eth_rpc",
"filesystem"
]
}
]

View File

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use super::bindings::component::uq_process::types::*;
use super::bindings::{get_payload, send_request, Address, Payload};
use super::bindings::{Address, get_payload, Payload, SendError, send_request};
impl PartialEq for ProcessId {
fn eq(&self, other: &Self) -> bool {
@ -50,61 +50,24 @@ pub fn send_and_await_response(
}
pub fn get_state(our: String) -> Option<Payload> {
let _ = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::GetState).unwrap()),
None,
None,
5, // TODO evaluate timeout
);
get_payload()
match super::bindings::get_state() {
Some(bytes) => Some(Payload {
mime: None,
bytes,
}),
None => None,
}
}
pub fn set_state(our: String, bytes: Vec<u8>) {
send_request(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
&Request {
inherit: false,
expects_response: Some(5), // TODO evaluate timeout
ipc: Some(serde_json::to_string(&FsAction::SetState).unwrap()),
metadata: None,
},
None,
Some(&Payload { mime: None, bytes }),
);
super::bindings::set_state(&bytes);
}
pub fn await_set_state<T>(our: String, state: &T)
where
T: serde::Serialize,
{
// Request/Response stays local -> no SendError
let (_, response) = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::SetState).unwrap()),
None,
Some(&Payload {
mime: None,
bytes: bincode::serialize(state).unwrap(),
}),
5, // TODO evaluate timeout
)
.unwrap();
match response {
Message::Request(_) => panic!("got request from filesystem"),
Message::Response((response, _context)) => return,
}
super::bindings::set_state(&bincode::serialize(state).unwrap());
}
pub fn parse_message_ipc<T>(json_string: Option<String>) -> anyhow::Result<T>

View File

@ -0,0 +1,12 @@
[
{
"process_name": "rpc",
"process_wasm_path": "rpc.wasm",
"on_panic": "Restart",
"request_networking": false,
"request_messaging": [
"http_bindings"
],
"grant_messaging": []
}
]

Binary file not shown.

View File

@ -1,14 +0,0 @@
[
{
"process_id": "terminal",
"process_wasm": "terminal.wasm",
"on_panic": {
"on_panic": "Restart"
},
"networking": true,
"request_messaging": [
"net"
],
"grant_messaging": "all"
}
]

View File

@ -1 +0,0 @@
hello

View File

@ -0,0 +1,14 @@
[
{
"process_name": "terminal",
"process_wasm_path": "terminal.wasm",
"on_panic": "Restart",
"request_networking": true,
"request_messaging": [
"net"
],
"grant_messaging": [
"all"
]
}
]

View File

@ -1,7 +1,7 @@
cargo_component_bindings::generate!();
mod process_lib;
struct Component;
use bindings::{component::uq_process::types::*, Guest, print_to_terminal, receive, send_request};
use bindings::{component::uq_process::types::*, print_to_terminal, receive, send_request, Guest};
fn parse_command(our_name: &str, line: String) {
let (head, tail) = line.split_once(" ").unwrap_or((&line, ""));
@ -82,9 +82,7 @@ impl Guest for Component {
print_to_terminal(1, &format!("terminal: start"));
loop {
let (source, message) = match receive() {
Ok((source, message)) => {
(source, message)
}
Ok((source, message)) => (source, message),
Err((error, _context)) => {
print_to_terminal(0, &format!("net error: {:?}!", error.kind));
continue;
@ -96,8 +94,7 @@ impl Guest for Component {
ipc,
..
}) => {
if our.node != source.node
|| our.process != source.process {
if our.node != source.node || our.process != source.process {
continue;
}
let Some(command) = ipc else {
@ -105,7 +102,7 @@ impl Guest for Component {
};
parse_command(&our.node, command);
}
_ => continue
_ => continue,
}
}
}

View File

@ -1,142 +0,0 @@
use serde::{Deserialize, Serialize};
use super::bindings::component::uq_process::types::*;
use super::bindings::{get_payload, send_request, Address, Payload};
impl PartialEq for ProcessId {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(ProcessId::Id(i1), ProcessId::Id(i2)) => i1 == i2,
(ProcessId::Name(s1), ProcessId::Name(s2)) => s1 == s2,
_ => false,
}
}
}
impl PartialEq<&str> for ProcessId {
fn eq(&self, other: &&str) -> bool {
match self {
ProcessId::Id(_) => false,
ProcessId::Name(s) => s == other,
}
}
}
impl PartialEq<u64> for ProcessId {
fn eq(&self, other: &u64) -> bool {
match self {
ProcessId::Id(i) => i == other,
ProcessId::Name(_) => false,
}
}
}
pub fn send_and_await_response(
target: &Address,
inherit: bool,
ipc: Option<Json>,
metadata: Option<Json>,
payload: Option<&Payload>,
timeout: u64,
) -> Result<(Address, Message), SendError> {
super::bindings::send_and_await_response(
target,
&Request {
inherit,
expects_response: Some(timeout),
ipc,
metadata,
},
payload,
)
}
pub fn get_state(our: String) -> Option<Payload> {
let _ = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::GetState).unwrap()),
None,
None,
5, // TODO evaluate timeout
);
get_payload()
}
pub fn set_state(our: String, bytes: Vec<u8>) {
send_request(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
&Request {
inherit: false,
expects_response: Some(5), // TODO evaluate timeout
ipc: Some(serde_json::to_string(&FsAction::SetState).unwrap()),
metadata: None,
},
None,
Some(&Payload { mime: None, bytes }),
);
}
pub fn await_set_state<T>(our: String, state: &T)
where
T: serde::Serialize,
{
// Request/Response stays local -> no SendError
let (_, response) = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::SetState).unwrap()),
None,
Some(&Payload {
mime: None,
bytes: bincode::serialize(state).unwrap(),
}),
5, // TODO evaluate timeout
)
.unwrap();
match response {
Message::Request(_) => panic!("got request from filesystem"),
Message::Response((response, _context)) => return,
}
}
pub fn parse_message_ipc<T>(json_string: Option<String>) -> anyhow::Result<T>
where
for<'a> T: serde::Deserialize<'a>,
{
let parsed: T = serde_json::from_str(
json_string
.ok_or(anyhow::anyhow!("json payload empty"))?
.as_str(),
)?;
Ok(parsed)
}
// move these to better place!
#[derive(Serialize, Deserialize, Debug)]
pub enum FsAction {
Write,
Replace(u128),
Append(Option<u128>),
Read(u128),
ReadChunk(ReadChunkRequest),
Delete(u128),
Length(u128),
// process state management
GetState,
SetState,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ReadChunkRequest {
pub file_uuid: u128,
pub start: u64,
pub length: u64,
}

View File

@ -0,0 +1 @@
../../../src/process_lib.rs

View File

@ -3,6 +3,7 @@ use crate::types::*;
/// log structured filesystem
use anyhow::Result;
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::Read;
use std::sync::Arc;
use tokio::fs;
use tokio::sync::oneshot::{Receiver, Sender};
@ -20,9 +21,9 @@ pub async fn load_fs(
// load/create fs directory, manifest + log if none.
let fs_directory_path_str = format!("{}/fs", &home_directory_path);
let new_boot = create_dir_if_dne(&fs_directory_path_str)
.await
.expect("failed creating fs dir!");
if let Err(e) = fs::create_dir_all(&fs_directory_path_str).await {
panic!("failed creating fs dir! {:?}", e);
}
let fs_directory_path: std::path::PathBuf =
fs::canonicalize(fs_directory_path_str).await.unwrap();
@ -67,26 +68,23 @@ pub async fn load_fs(
// get current processes' wasm_bytes handles. GetState(kernel)
match manifest.read(&kernel_process_id, None, None).await {
Err(_) => {
// first time!
// bootstrap filesystem
bootstrap(
&our_name,
&kernel_process_id,
&mut process_map,
&mut manifest,
&vfs_message_sender,
)
.await
.expect("fresh bootstrap failed!");
}
Ok(bytes) => {
process_map = bincode::deserialize(&bytes).expect("state map deserialization error!");
println!("found persisted processes: {:?}\r", process_map.keys());
}
}
if new_boot {
// bootstrap filesystem
let _ = bootstrap(
&our_name,
&kernel_process_id,
&mut process_map,
&mut manifest,
&vfs_message_sender,
)
.await
.expect("fresh bootstrap failed!");
}
Ok((process_map, manifest))
}
@ -105,85 +103,16 @@ async fn bootstrap(
manifest: &mut Manifest,
vfs_message_sender: &MessageSender,
) -> Result<()> {
let packages: Vec<zip::ZipArchive<std::io::Cursor<Vec<u8>>>> = get_zipped_packages().await;
for package in packages {
// for each file in package.zip, recursively through all dirs, send a newfile KM to VFS
let mut stack = Vec::new();
stack.push(package);
while let Some(mut package) = stack.pop() {
for i in 0..package.len() {
let mut file = package.by_index(i).unwrap();
if file.name().ends_with('/') {
let new_package =
zip::ZipArchive::new(std::io::Cursor::new(file.into_inner())).unwrap();
stack.push(new_package);
} else {
let file_path = file.sanitized_name();
let mut file_content = Vec::new();
file.read_to_end(&mut file_content).unwrap();
let km = KernelMessage::NewFile {
path: file_path,
content: file_content,
};
vfs_message_sender.send(km).await.unwrap();
}
}
}
// get and read manifest.json
// for each process-entry in manifest.json:
for entry in process_manifest {
// save in process map
let hash: [u8; 32] = hash_bytes(&wasm_bytes);
if let Some(id) = manifest.get_uuid_by_hash(&hash).await {
let entry =
process_map
.entry(ProcessId::Name(process_name))
.or_insert(PersistedProcess {
wasm_bytes_handle: id,
on_panic: OnPanic::Restart,
capabilities: HashSet::new(),
});
entry.capabilities.extend(special_capabilities.clone());
entry.wasm_bytes_handle = id;
} else {
// FsAction::Write
let file = FileIdentifier::new_uuid();
let _ = manifest.write(&file, &wasm_bytes).await;
let id = file.to_uuid().unwrap();
let entry =
process_map
.entry(ProcessId::Name(process_name))
.or_insert(PersistedProcess {
wasm_bytes_handle: id,
on_panic: OnPanic::Restart,
capabilities: HashSet::new(),
});
entry.capabilities.extend(special_capabilities.clone());
entry.wasm_bytes_handle = id;
}
// spawn the requested capabilities
// spawn the granted capabilities
}
}
const RUNTIME_MODULES: [&str; 8] = [
"filesystem",
"http_server",
"http_client",
"encryptor",
"net",
"vfs",
"kernel",
"eth_rpc",
println!("bootstrapping node...\r");
const RUNTIME_MODULES: [(&str, bool); 8] = [
("filesystem", false),
("http_server", true), // TODO evaluate
("http_client", false),
("encryptor", false),
("net", false),
("vfs", false),
("kernel", false),
("eth_rpc", true), // TODO evaluate
];
let mut runtime_caps: HashSet<Capability> = HashSet::new();
@ -191,7 +120,7 @@ async fn bootstrap(
runtime_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::Name(runtime_module.into()),
process: ProcessId::Name(runtime_module.0.into()),
},
params: "\"messaging\"".into(),
});
@ -207,15 +136,182 @@ async fn bootstrap(
// finally, save runtime modules in state map as well, somewhat fakely
for runtime_module in RUNTIME_MODULES {
let entry = process_map
.entry(ProcessId::Name(runtime_module.into()))
process_map
.entry(ProcessId::Name(runtime_module.0.into()))
.or_insert(PersistedProcess {
wasm_bytes_handle: 0,
on_panic: OnPanic::Restart,
capabilities: runtime_caps.clone(),
public: runtime_module.1,
});
}
let packages: Vec<(String, zip::ZipArchive<std::io::Cursor<Vec<u8>>>)> =
get_zipped_packages().await;
// need to grant all caps at the end, after process_map has been filled in!
let mut caps_to_grant = Vec::<(ProcessId, Capability)>::new();
for (package_name, mut package) in packages {
println!("fs: handling package {package_name}...\r");
// create a new package in VFS
vfs_message_sender
.send(KernelMessage {
id: 0,
source: Address {
node: our_name.to_string(),
process: ProcessId::Name("filesystem".into()),
},
target: Address {
node: our_name.to_string(),
process: ProcessId::Name("vfs".into()),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: Some(
serde_json::to_string::<VfsRequest>(&VfsRequest::New {
identifier: package_name.clone(),
})
.unwrap(),
),
metadata: None,
}),
payload: None,
signed_capabilities: None,
})
.await
.unwrap();
// for each file in package.zip, recursively through all dirs, send a newfile KM to VFS
for i in 0..package.len() {
let mut file = package.by_index(i).unwrap();
if file.is_file() {
let file_path = file
.enclosed_name()
.expect("fs: name error reading package.zip")
.to_owned();
println!("fs: found file {}...\r", file_path.display());
let mut file_content = Vec::new();
file.read_to_end(&mut file_content).unwrap();
vfs_message_sender
.send(KernelMessage {
id: 0,
source: Address {
node: our_name.to_string(),
process: ProcessId::Name("filesystem".into()),
},
target: Address {
node: our_name.to_string(),
process: ProcessId::Name("vfs".into()),
},
rsvp: None,
message: Message::Request(Request {
inherit: false,
expects_response: None,
ipc: Some(
serde_json::to_string::<VfsRequest>(&VfsRequest::Add {
identifier: package_name.clone(),
full_path: file_path.to_string_lossy().to_string(),
entry_type: AddEntryType::NewFile,
})
.unwrap(),
),
metadata: None,
}),
payload: Some(Payload {
mime: None,
bytes: file_content,
}),
signed_capabilities: None,
})
.await
.unwrap();
}
}
// get and read manifest.json
let Ok(mut package_manifest_zip) = package.by_name("manifest.json") else {
println!("fs: missing manifest for package {}, skipping", package_name);
continue;
};
let mut manifest_content = Vec::new();
package_manifest_zip
.read_to_end(&mut manifest_content)
.unwrap();
drop(package_manifest_zip);
let package_manifest = String::from_utf8(manifest_content)?;
let package_manifest = serde_json::from_str::<Vec<PackageManifestEntry>>(&package_manifest)
.expect("fs: manifest parse error");
// for each process-entry in manifest.json:
for mut entry in package_manifest {
let wasm_bytes = &mut Vec::new();
package
.by_name(&format!("{}", entry.process_wasm_path))
.expect("fs: no wasm found in package!")
.read_to_end(wasm_bytes)
.unwrap();
// spawn the requested capabilities
// remember: out of thin air, because this is the root distro
let mut requested_caps = HashSet::new();
entry.request_messaging.push(entry.process_name.clone());
for process_name in entry.request_messaging {
requested_caps.insert(Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::Name(process_name),
},
params: "\"messaging\"".into(),
});
}
let mut public_process = false;
// queue the granted capabilities
for process_name in entry.grant_messaging {
if process_name == "all" {
public_process = true;
continue;
}
caps_to_grant.push((
ProcessId::Name(process_name),
Capability {
issuer: Address {
node: our_name.to_string(),
process: ProcessId::Name(entry.process_name.clone()),
},
params: "\"messaging\"".into(),
},
));
}
// save in process map
let file = FileIdentifier::new_uuid();
manifest.write(&file, &wasm_bytes).await.unwrap();
let wasm_bytes_handle = file.to_uuid().unwrap();
process_map.insert(
ProcessId::Name(entry.process_name),
PersistedProcess {
wasm_bytes_handle,
on_panic: entry.on_panic,
capabilities: requested_caps,
public: public_process,
},
);
}
}
// grant queued capabilities from all packages
for (to, cap) in caps_to_grant {
let Some(proc) = process_map.get_mut(&to) else {
continue
};
proc.capabilities.insert(cap);
}
// save kernel process state. FsAction::SetState(kernel)
let serialized_process_map =
bincode::serialize(&process_map).expect("state map serialization error!");
@ -229,24 +325,26 @@ async fn bootstrap(
Ok(())
}
/// go into /modules folder and get all
async fn get_zipped_packages() -> Vec<zip::ZipArchive<std::io::Cursor<Vec<u8>>>> {
let modules_path = std::path::Path::new("modules");
/// go into /target folder and get all .zip package files
async fn get_zipped_packages() -> Vec<(String, zip::ZipArchive<std::io::Cursor<Vec<u8>>>)> {
println!("fs: reading distro packages...\r");
let target_path = std::path::Path::new("target");
let mut packages = Vec::new();
if let Ok(mut entries) = fs::read_dir(modules_path).await {
if let Ok(mut entries) = fs::read_dir(target_path).await {
while let Ok(Some(entry)) = entries.next_entry().await {
// get a file named package.zip
if let Some(pkg) = entry.file_name().to_str() {
if pkg == "package.zip" {
// read the file
if let Ok(bytes) = fs::read(entry.path()).await {
// extract the zip
if let Ok(zip) = zip::ZipArchive::new(std::io::Cursor::new(bytes)) {
// add to list of packages
packages.push(zip);
}
if entry.file_name().to_string_lossy().ends_with(".zip") {
let package_name = entry
.file_name()
.to_string_lossy()
.trim_end_matches(".zip")
.to_string();
if let Ok(bytes) = fs::read(entry.path()).await {
if let Ok(zip) = zip::ZipArchive::new(std::io::Cursor::new(bytes)) {
// add to list of packages
println!("fs: found package: {}\r", package_name);
packages.push((package_name, zip));
}
}
}
@ -256,39 +354,6 @@ async fn get_zipped_packages() -> Vec<zip::ZipArchive<std::io::Cursor<Vec<u8>>>>
return packages;
}
async fn get_processes_from_directories() -> Vec<(String, Vec<u8>)> {
let mut processes = Vec::new();
// Get the path to the /modules directory
let modules_path = std::path::Path::new("modules");
// Read the /modules directory
if let Ok(mut entries) = fs::read_dir(modules_path).await {
// Loop through the entries in the directory
while let Ok(Some(entry)) = entries.next_entry().await {
// If the entry is a directory, add its name to the list of processes
if let Ok(metadata) = entry.metadata().await {
if metadata.is_dir() {
if let Some(name) = entry.file_name().to_str() {
// Get the path to the wasm file for the process
let wasm_path = format!(
"modules/{}/target/wasm32-unknown-unknown/release/{}.wasm",
name, name
);
// Read the wasm file
if let Ok(wasm_bytes) = fs::read(wasm_path).await {
// Add the process name and wasm bytes to the list of processes
processes.push((name.to_string(), wasm_bytes));
}
}
}
}
}
}
processes
}
pub async fn fs_sender(
our_name: String,
manifest: Manifest,
@ -355,7 +420,7 @@ pub async fn fs_sender(
.await
{
send_to_loop
.send(make_error_message(our_name.clone(), km.id, km.source.clone(), e))
.send(make_error_message(our_name.clone(), &km, e))
.await
.unwrap();
}
@ -569,14 +634,14 @@ async fn handle_request(
(FsResponse::Length(length), None)
}
// process state handlers
FsAction::SetState => {
FsAction::SetState(process_id) => {
let Some(ref payload) = payload else {
return Err(FsError::BadBytes {
action: "SetState".into(),
});
};
let file = FileIdentifier::Process(source.process.clone());
let file = FileIdentifier::Process(process_id);
match manifest.write(&file, &payload.bytes).await {
Ok(_) => (),
Err(e) => {
@ -589,14 +654,14 @@ async fn handle_request(
(FsResponse::SetState, None)
}
FsAction::DeleteState => {
let file = FileIdentifier::Process(source.process.clone());
FsAction::DeleteState(process_id) => {
let file = FileIdentifier::Process(process_id);
manifest.delete(&file).await?;
(FsResponse::Delete(0), None)
}
FsAction::GetState => {
let file = FileIdentifier::Process(source.process.clone());
FsAction::GetState(process_id) => {
let file = FileIdentifier::Process(process_id);
match manifest.read(&file, None, None).await {
Err(e) => return Err(e),
@ -612,8 +677,11 @@ async fn handle_request(
node: our_name.clone(),
process: ProcessId::Name("filesystem".into()),
},
target: source.clone(),
rsvp,
target: match rsvp {
None => source,
Some(rsvp) => rsvp,
},
rsvp: None,
message: Message::Response((
Response {
ipc: Some(
@ -623,10 +691,13 @@ async fn handle_request(
},
None,
)),
payload: Some(Payload {
mime: None,
bytes: bytes.unwrap_or_default(),
}),
payload: match bytes {
Some(bytes) => Some(Payload {
mime: None,
bytes,
}),
None => None,
},
signed_capabilities: None,
};
@ -649,29 +720,17 @@ pub fn hash_bytes(bytes: &[u8]) -> [u8; 32] {
hasher.finalize().into()
}
// returns bool: if dir is new
async fn create_dir_if_dne(path: &str) -> Result<bool, FsError> {
if let Err(_) = fs::read_dir(&path).await {
match fs::create_dir_all(&path).await {
Ok(_) => Ok(true),
Err(e) => Err(FsError::CreateInitialDirError {
path: path.into(),
error: format!("{}", e),
}),
}
} else {
Ok(false)
}
}
fn make_error_message(our_name: String, id: u64, target: Address, error: FsError) -> KernelMessage {
fn make_error_message(our_name: String, km: &KernelMessage, error: FsError) -> KernelMessage {
KernelMessage {
id,
id: km.id,
source: Address {
node: our_name.clone(),
process: ProcessId::Name("fileystem".into()),
},
target,
target: match &km.rsvp {
None => km.source.clone(),
Some(rsvp) => rsvp.clone(),
},
rsvp: None,
message: Message::Response((
Response {

View File

@ -192,16 +192,129 @@ impl UqProcessImports for ProcessWasi {
// Ok(())
}
/// create a message from the *kernel* to the filesystem,
/// asking it to fetch the current state saved under this process
async fn get_state(&mut self) -> Result<Option<Vec<u8>>> {
unimplemented!()
let old_last_payload = self.process.last_payload.clone();
let res = match send_and_await_response(
self,
Some(t::Address {
node: self.process.metadata.our.node.clone(),
process: t::ProcessId::Name("kernel".into()),
}),
wit::Address {
node: self.process.metadata.our.node.clone(),
process: wit::ProcessId::Name("filesystem".into()),
},
wit::Request {
inherit: false,
expects_response: Some(5),
ipc: Some(
serde_json::to_string(&t::FsAction::GetState(
self.process.metadata.our.process.clone(),
))
.unwrap(),
),
metadata: None,
},
None,
)
.await
{
Ok(Ok(_resp)) => {
// basically assuming filesystem responding properly here
match &self.process.last_payload {
None => Ok(None),
Some(payload) => Ok(Some(payload.bytes.clone())),
}
}
_ => Ok(None),
};
self.process.last_payload = old_last_payload;
return res;
}
async fn get_state(&mut self, bytes: Vec<u8>) -> Result<()> {
unimplemented!()
/// create a message from the *kernel* to the filesystem,
/// asking it to replace the current state saved under
/// this process with these bytes
async fn set_state(&mut self, bytes: Vec<u8>) -> Result<()> {
let old_last_payload = self.process.last_payload.clone();
let res = match send_and_await_response(
self,
Some(t::Address {
node: self.process.metadata.our.node.clone(),
process: t::ProcessId::Name("kernel".into()),
}),
wit::Address {
node: self.process.metadata.our.node.clone(),
process: wit::ProcessId::Name("filesystem".into()),
},
wit::Request {
inherit: false,
expects_response: Some(5),
ipc: Some(
serde_json::to_string(&t::FsAction::SetState(
self.process.metadata.our.process.clone(),
))
.unwrap(),
),
metadata: None,
},
Some(Payload { mime: None, bytes }),
)
.await
{
Ok(Ok(_resp)) => {
// basically assuming filesystem responding properly here
Ok(())
}
_ => Err(anyhow::anyhow!(
"filesystem did not respond properly to SetState!!"
)),
};
self.process.last_payload = old_last_payload;
return res;
}
/// create a message from the *kernel* to the filesystem,
/// asking it to delete the current state saved under this process
async fn clear_state(&mut self) -> Result<()> {
unimplemented!()
let old_last_payload = self.process.last_payload.clone();
let res = match send_and_await_response(
self,
Some(t::Address {
node: self.process.metadata.our.node.clone(),
process: t::ProcessId::Name("kernel".into()),
}),
wit::Address {
node: self.process.metadata.our.node.clone(),
process: wit::ProcessId::Name("filesystem".into()),
},
wit::Request {
inherit: false,
expects_response: Some(5),
ipc: Some(
serde_json::to_string(&t::FsAction::DeleteState(
self.process.metadata.our.process.clone(),
))
.unwrap(),
),
metadata: None,
},
None,
)
.await
{
Ok(Ok(_resp)) => {
// basically assuming filesystem responding properly here
Ok(())
}
_ => Err(anyhow::anyhow!(
"filesystem did not respond properly to ClearState!!"
)),
};
self.process.last_payload = old_last_payload;
return res;
}
async fn spawn(
@ -211,6 +324,7 @@ impl UqProcessImports for ProcessWasi {
full_path: String,
on_panic: wit::OnPanic,
capabilities: wit::Capabilities,
public: bool,
) -> Result<Option<wit::ProcessId>> {
let vfs_address = wit::Address {
node: self.process.metadata.our.node.clone(),
@ -218,6 +332,7 @@ impl UqProcessImports for ProcessWasi {
};
let (_, hash_response) = send_and_await_response(
self,
None,
vfs_address.clone(),
wit::Request {
inherit: false,
@ -246,6 +361,7 @@ impl UqProcessImports for ProcessWasi {
let _ = send_and_await_response(
self,
None,
vfs_address,
wit::Request {
inherit: false,
@ -324,6 +440,7 @@ impl UqProcessImports for ProcessWasi {
})
.collect(),
},
public,
})
.unwrap(),
),
@ -519,7 +636,7 @@ impl UqProcessImports for ProcessWasi {
) -> Result<()> {
let id = self
.process
.handle_request(target, request, context, payload)
.handle_request(None, target, request, context, payload)
.await;
match id {
Ok(_id) => Ok(()),
@ -539,7 +656,7 @@ impl UqProcessImports for ProcessWasi {
for request in requests {
let id = self
.process
.handle_request(request.0, request.1, request.2, request.3)
.handle_request(None, request.0, request.1, request.2, request.3)
.await;
match id {
Ok(_id) => continue,
@ -564,12 +681,13 @@ impl UqProcessImports for ProcessWasi {
request: wit::Request,
payload: Option<wit::Payload>,
) -> Result<Result<(wit::Address, wit::Message), wit::SendError>> {
send_and_await_response(self, target, request, payload).await
send_and_await_response(self, None, target, request, payload).await
}
}
async fn send_and_await_response(
process: &mut ProcessWasi,
source: Option<t::Address>,
target: wit::Address,
request: wit::Request,
payload: Option<wit::Payload>,
@ -582,7 +700,7 @@ async fn send_and_await_response(
}
let id = process
.process
.handle_request(target, request, None, payload)
.handle_request(source, target, request, None, payload)
.await;
match id {
Ok(id) => match process.process.get_specific_message_for_process(id).await {
@ -730,12 +848,16 @@ impl Process {
/// will only fail if process does not have capability to send to target.
async fn handle_request(
&mut self,
fake_source: Option<t::Address>, // only used when kernel steps in to get/set state
target: wit::Address,
request: wit::Request,
new_context: Option<wit::Context>,
payload: Option<wit::Payload>,
) -> Result<u64> {
let source = self.metadata.our.clone();
let source = match &fake_source {
Some(_) => fake_source.unwrap(),
None => self.metadata.our.clone(),
};
// if request chooses to inherit context, match id to prompting_message
// otherwise, id is generated randomly
let request_id: u64 = if request.inherit
@ -768,7 +890,8 @@ impl Process {
&self.prompting_message,
) {
// this request expects response, so receives any response
(_, Some(_), _) => Some(source),
// make sure to use the real source, not a fake injected-by-kernel source
(_, Some(_), _) => Some(self.metadata.our.clone()),
// this request inherits, so response will be routed to prompting message
(true, None, Some(ref prompt)) => prompt.rsvp.clone(),
// this request doesn't inherit, and doesn't itself want a response
@ -861,7 +984,12 @@ async fn persist_state(
message: t::Message::Request(t::Request {
inherit: true,
expects_response: Some(5), // TODO evaluate
ipc: Some(serde_json::to_string(&t::FsAction::SetState).unwrap()),
ipc: Some(
serde_json::to_string(&t::FsAction::SetState(t::ProcessId::Name(
"kernel".into(),
)))
.unwrap(),
),
metadata: None,
}),
payload: Some(t::Payload { mime: None, bytes }),
@ -883,10 +1011,6 @@ async fn make_process_loop(
caps_oracle: t::CapMessageSender,
engine: &Engine,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
let our = metadata.our.clone();
let wasm_bytes_handle = metadata.wasm_bytes_handle.clone();
let on_panic = metadata.on_panic.clone();
// let dir = std::env::current_dir().unwrap();
let dir = cap_std::fs::Dir::open_ambient_dir(home_directory_path, cap_std::ambient_authority())
.unwrap();
@ -927,7 +1051,7 @@ async fn make_process_loop(
ProcessWasi {
process: Process {
keypair: keypair.clone(),
metadata,
metadata: metadata.clone(),
recv_in_process,
send_to_loop: send_to_loop.clone(),
send_to_terminal: send_to_terminal.clone(),
@ -953,7 +1077,7 @@ async fn make_process_loop(
verbosity: 0,
content: format!(
"mk: process {:?} failed to instantiate: {:?}",
our.process, e,
metadata.our.process, e,
),
})
.await;
@ -963,7 +1087,7 @@ async fn make_process_loop(
// the process will run until it returns from init()
let is_error = match bindings
.call_init(&mut store, &en_wit_address(our.clone()))
.call_init(&mut store, &en_wit_address(metadata.our.clone()))
.await
{
Ok(()) => false,
@ -971,7 +1095,10 @@ async fn make_process_loop(
let _ = send_to_terminal
.send(t::Printout {
verbosity: 0,
content: format!("mk: process {:?} ended with error:", our.process,),
content: format!(
"mk: process {:?} ended with error:",
metadata.our.process,
),
})
.await;
for line in format!("{:?}", e).lines() {
@ -988,19 +1115,19 @@ async fn make_process_loop(
// the process has completed, perform cleanup
let our_kernel = t::Address {
node: our.node.clone(),
node: metadata.our.node.clone(),
process: t::ProcessId::Name("kernel".into()),
};
if is_error {
// fulfill the designated OnPanic behavior
match on_panic {
match metadata.on_panic {
t::OnPanic::None => {}
// if restart, tell ourselves to init the app again, with same capabilities
t::OnPanic::Restart => {
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = caps_oracle.send(t::CapMessage::GetAll {
on: our.process.clone(),
on: metadata.our.process.clone(),
responder: tx,
});
let initial_capabilities = rx
@ -1028,13 +1155,14 @@ async fn make_process_loop(
expects_response: None,
ipc: Some(
serde_json::to_string(&t::KernelCommand::StartProcess {
name: match &our.process {
name: match &metadata.our.process {
t::ProcessId::Name(name) => Some(name.into()),
t::ProcessId::Id(_) => None,
},
wasm_bytes_handle,
on_panic,
wasm_bytes_handle: metadata.wasm_bytes_handle,
on_panic: metadata.on_panic,
initial_capabilities,
public: metadata.public,
})
.unwrap(),
),
@ -1053,7 +1181,7 @@ async fn make_process_loop(
send_to_loop
.send(t::KernelMessage {
id: rand::random(),
source: our.clone(),
source: metadata.our.clone(),
target: address,
rsvp: None,
message: t::Message::Request(request),
@ -1078,8 +1206,10 @@ async fn make_process_loop(
inherit: false,
expects_response: None,
ipc: Some(
serde_json::to_string(&t::KernelCommand::KillProcess(our.process.clone()))
.unwrap(),
serde_json::to_string(&t::KernelCommand::KillProcess(
metadata.our.process.clone(),
))
.unwrap(),
),
metadata: None,
}),
@ -1136,6 +1266,7 @@ async fn handle_kernel_request(
wasm_bytes_handle,
on_panic,
initial_capabilities,
public,
} => {
let Some(ref payload) = km.payload else {
send_to_terminal
@ -1186,6 +1317,7 @@ async fn handle_kernel_request(
wasm_bytes_handle,
on_panic,
capabilities: valid_capabilities,
public,
},
reboot: false,
},
@ -1464,6 +1596,7 @@ async fn start_process(
},
wasm_bytes_handle: process_metadata.persisted.wasm_bytes_handle.clone(),
on_panic: process_metadata.persisted.on_panic.clone(),
public: process_metadata.persisted.public,
};
process_handles.insert(
process_id.clone(),
@ -1483,14 +1616,7 @@ async fn start_process(
),
);
process_map.insert(
process_id,
t::PersistedProcess {
wasm_bytes_handle: process_metadata.persisted.wasm_bytes_handle,
on_panic: process_metadata.persisted.on_panic,
capabilities: process_metadata.persisted.capabilities,
},
);
process_map.insert(process_id, process_metadata.persisted);
if !process_metadata.reboot {
// if new, persist

View File

@ -255,6 +255,13 @@ async fn main() {
let Ok(onchain_id) = contract.ws(node_id).call().await else {
panic!("error: RPC endpoint failed to fetch our node_id");
};
print_sender
.send(Printout {
verbosity: 0,
content: "established connection to Sepolia RPC".to_string(),
})
.await
.unwrap();
// double check that routers match on-chain information
let namehashed_routers: Vec<[u8; 32]> = routers
.clone()
@ -365,13 +372,6 @@ async fn main() {
file_key.to_vec(),
)
};
// load in fs.
let _ = print_sender
.send(Printout {
verbosity: 0,
content: "bootstrapping fs...".to_string(),
})
.await;
let (kernel_process_map, manifest) = filesystem::load_fs(
our.name.clone(),
@ -384,12 +384,6 @@ async fn main() {
.expect("fs load failed!");
let _ = kill_tx.send(true);
let _ = print_sender
.send(Printout {
verbosity: 0,
content: format!("{} now online", our.name),
})
.await;
let _ = print_sender
.send(Printout {
verbosity: 0,

View File

@ -50,61 +50,24 @@ pub fn send_and_await_response(
}
pub fn get_state(our: String) -> Option<Payload> {
let _ = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::GetState).unwrap()),
None,
None,
5, // TODO evaluate timeout
);
get_payload()
match super::bindings::get_state() {
Some(bytes) => Some(Payload {
mime: None,
bytes,
}),
None => None,
}
}
pub fn set_state(our: String, bytes: Vec<u8>) {
send_request(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
&Request {
inherit: false,
expects_response: Some(5), // TODO evaluate timeout
ipc: Some(serde_json::to_string(&FsAction::SetState).unwrap()),
metadata: None,
},
None,
Some(&Payload { mime: None, bytes }),
);
super::bindings::set_state(&bytes);
}
pub fn await_set_state<T>(our: String, state: &T)
where
T: serde::Serialize,
{
// Request/Response stays local -> no SendError
let (_, response) = send_and_await_response(
&Address {
node: our,
process: ProcessId::Name("filesystem".to_string()),
},
false,
Some(serde_json::to_string(&FsAction::SetState).unwrap()),
None,
Some(&Payload {
mime: None,
bytes: bincode::serialize(state).unwrap(),
}),
5, // TODO evaluate timeout
)
.unwrap();
match response {
Message::Request(_) => panic!("got request from filesystem"),
Message::Response((response, _context)) => return,
}
super::bindings::set_state(&bincode::serialize(state).unwrap());
}
pub fn parse_message_ipc<T>(json_string: Option<String>) -> anyhow::Result<T>

View File

@ -182,6 +182,7 @@ pub struct ProcessMetadata {
pub our: Address,
pub wasm_bytes_handle: u128,
pub on_panic: OnPanic,
pub public: bool,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -253,6 +254,7 @@ pub enum KernelCommand {
wasm_bytes_handle: u128,
on_panic: OnPanic,
initial_capabilities: HashSet<SignedCapability>,
public: bool,
},
KillProcess(ProcessId), // this is extrajudicial killing: we might lose messages!
// kernel only
@ -305,6 +307,7 @@ pub struct PersistedProcess {
// pub full_path: String,
pub on_panic: OnPanic,
pub capabilities: HashSet<Capability>,
pub public: bool, // marks if a process allows messages from any process
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -323,6 +326,16 @@ pub struct ProcessContext {
// filesystem.rs types
//
#[derive(Debug, Serialize, Deserialize)]
pub struct PackageManifestEntry {
pub process_name: String,
pub process_wasm_path: String,
pub on_panic: OnPanic,
pub request_networking: bool,
pub request_messaging: Vec<String>,
pub grant_messaging: Vec<String>, // special logic for the string "all"
}
#[derive(Serialize, Deserialize, Debug)]
pub enum FsAction {
Write,
@ -334,9 +347,9 @@ pub enum FsAction {
Delete(u128),
Length(u128),
SetLength((u128, u64)),
GetState,
SetState,
DeleteState,
GetState(ProcessId),
SetState(ProcessId),
DeleteState(ProcessId),
}
#[derive(Serialize, Deserialize, Debug)]

View File

@ -148,7 +148,10 @@ async fn persist_state(our_node: String, send_to_loop: &MessageSender, state: &I
message: Message::Request(Request {
inherit: true,
expects_response: Some(5), // TODO evaluate
ipc: Some(serde_json::to_string(&FsAction::SetState).unwrap()),
ipc: Some(
serde_json::to_string(&FsAction::SetState(ProcessId::Name("vfs".into())))
.unwrap(),
),
metadata: None,
}),
payload: Some(Payload {
@ -181,7 +184,10 @@ async fn load_state_from_reboot(
message: Message::Request(Request {
inherit: true,
expects_response: Some(5), // TODO evaluate
ipc: Some(serde_json::to_string(&FsAction::GetState).unwrap()),
ipc: Some(
serde_json::to_string(&FsAction::GetState(ProcessId::Name("vfs".into())))
.unwrap(),
),
metadata: None,
}),
payload: None,

View File

@ -131,7 +131,7 @@ world uq-process {
import clear-state: func()
import spawn: func(id: process-id, %package: string, full-path: string, on-panic: on-panic, capabilities: capabilities) ->
import spawn: func(id: process-id, %package: string, full-path: string, on-panic: on-panic, capabilities: capabilities, public: bool) ->
option<process-id>
// capabilities management