diff --git a/modules/rpc/src/kernel_types.rs b/modules/rpc/src/kernel_types.rs new file mode 120000 index 00000000..8311791c --- /dev/null +++ b/modules/rpc/src/kernel_types.rs @@ -0,0 +1 @@ +../../../src/kernel_types.rs \ No newline at end of file diff --git a/modules/rpc/src/lib.rs b/modules/rpc/src/lib.rs index e58a511a..7ccd2dbf 100644 --- a/modules/rpc/src/lib.rs +++ b/modules/rpc/src/lib.rs @@ -5,11 +5,13 @@ use bindings::{ get_capabilities, get_capability, get_payload, print_to_terminal, receive, send_and_await_response, send_request, send_requests, send_response, Guest, }; +use kernel_types::Capability; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; extern crate base64; +mod kernel_types; mod process_lib; struct Component; @@ -19,7 +21,7 @@ struct RpcMessage { pub node: String, pub process: String, pub inherit: Option, - pub expects_response: Option, // always false? + pub expects_response: Option, pub ipc: Option, pub metadata: Option, pub context: Option, @@ -27,6 +29,14 @@ struct RpcMessage { pub data: Option, } +#[derive(Debug, Serialize, Deserialize)] +struct StartProcess { + pub node: Option, + pub process: String, + pub capabilities: Option>, // list of (process, params) for the caps + pub wasm: String, +} + #[derive(Debug, Serialize, Deserialize)] struct CapabilitiesTransfer { pub destination_node: String, @@ -36,6 +46,16 @@ struct CapabilitiesTransfer { pub params: String, } +#[derive(Debug, Deserialize)] +struct WriteFileId { + Write: u128, +} + +#[derive(Debug, Deserialize)] +struct WriteFileResult { + Ok: WriteFileId, +} + // curl http://localhost:8080/rpc/message -H 'content-type: application/json' -d '{"node": "hosted", "process": "vfs", "inherit": false, "expects_response": null, "ipc": "{\"New\": {\"identifier\": \"foo\"}}", "metadata": null, "context": null, "mime": null, "data": null}' fn send_http_response(status: u16, headers: HashMap, payload_bytes: Vec) { @@ -59,10 +79,6 @@ fn send_http_response(status: u16, headers: HashMap, payload_byt const RPC_PAGE: &str = include_str!("rpc.html"); -fn binary_encoded_string_to_bytes(s: &str) -> Vec { - s.chars().map(|c| c as u8).collect() -} - impl Guest for Component { fn init(our: Address) { print_to_terminal(0, "RPC: start"); @@ -72,25 +88,8 @@ impl Guest for Component { process: ProcessId::Name("http_bindings".to_string()), }; - // , option> let http_endpoint_binding_requests: [(Address, Request, Option, Option); - 3] = [ - // ( - // bindings_address.clone(), - // Request { - // inherit: false, - // expects_response: None, - // ipc: Some(json!({ - // "action": "bind-app", - // "path": "/rpc", - // "app": "rpc", - // "local_only": true, - // }).to_string()), - // metadata: None, - // }, - // None, - // None - // ), + 4] = [ ( bindings_address.clone(), Request { @@ -110,6 +109,25 @@ impl Guest for Component { None, None, ), + ( + bindings_address.clone(), + Request { + inherit: false, + expects_response: None, + ipc: Some( + json!({ + "action": "bind-app", + "path": "/rpc/start-process", + "app": "rpc", + "local_only": true, + }) + .to_string(), + ), + metadata: None, + }, + None, + None, + ), ( bindings_address.clone(), Request { @@ -289,11 +307,11 @@ impl Guest for Component { Err(_) => None, }; - let caps = get_capabilities(); - print_to_terminal( - 0, - format!("rpc: got capabilities {:?}", caps).as_str(), - ); + // let caps = get_capabilities(); + // print_to_terminal( + // 0, + // format!("rpc: got capabilities {:?}", caps).as_str(), + // ); let result = send_and_await_response( &Address { @@ -311,8 +329,7 @@ impl Guest for Component { match result { Ok((_source, message)) => { - let Message::Response((response, _context)) = message - else { + let Message::Response((response, _context)) = message else { print_to_terminal( 1, "rpc: got unexpected response to message", @@ -328,13 +345,13 @@ impl Guest for Component { continue; }; - let (mime, data) = match get_payload() { - Some(p) => { - let mime = match p.mime { - Some(mime) => mime, - None => "application/octet-stream".to_string(), - }; - let bytes = p.bytes; + let (mime, data) = match get_payload() { + Some(p) => { + let mime = match p.mime { + Some(mime) => mime, + None => "application/octet-stream".to_string(), + }; + let bytes = p.bytes; (mime, base64::encode(bytes)) } @@ -355,7 +372,239 @@ impl Guest for Component { send_http_response(200, default_headers.clone(), body); continue; } - Err(error) => { + Err(_) => { + print_to_terminal(1, "rpc: error coming back"); + send_http_response( + 500, + default_headers.clone(), + "Network Error".to_string().as_bytes().to_vec(), + ); + continue; + } + } + } + "/rpc/start-process" => { + let Some(payload) = get_payload() else { + print_to_terminal(1, "rpc: no bytes in payload, skipping..."); + send_http_response( + 400, + default_headers.clone(), + "No payload".to_string().as_bytes().to_vec(), + ); + continue; + }; + + let body_json: StartProcess = + match serde_json::from_slice::(&payload.bytes) { + Ok(v) => v, + Err(_) => { + print_to_terminal( + 1, + &format!( + "rpc: JSON is not valid StartProcess: {:?}", + serde_json::from_slice::( + &payload.bytes + ) + ), + ); + send_http_response( + 400, + default_headers.clone(), + "JSON is not valid StartProcess" + .to_string() + .as_bytes() + .to_vec(), + ); + continue; + } + }; + + let payload = match base64::decode(&body_json.wasm) { + Ok(bytes) => Some(Payload { + mime: Some("bytes".to_string()), + bytes, + }), + Err(_) => None, + }; + + let node = match body_json.node { + Some(node) => node, + None => our.node.clone(), + }; + + // let caps = get_capabilities(); + // print_to_terminal( + // 0, + // format!("rpc: got capabilities {:?}", caps).as_str(), + // ); + + let write_wasm_result = send_and_await_response( + &Address { + node: node.clone(), + process: ProcessId::Name("filesystem".to_string()), + }, + &Request { + inherit: false, + expects_response: Some(5), + ipc: Some( + json!({ + "Write": None::, + }) + .to_string(), + ), + metadata: None, + }, + payload.as_ref(), + ); + + match write_wasm_result { + Ok((_source, message)) => { + let Message::Response((response, _context)) = message else { + print_to_terminal( + 1, + "rpc: got unexpected response to message", + ); + send_http_response( + 500, + default_headers, + "Invalid Internal Response" + .to_string() + .as_bytes() + .to_vec(), + ); + continue; + }; + + let wasm_bytes_handle = match response.ipc { + Some(ipc) => { + match serde_json::from_str::(&ipc) { + Ok(result) => result.Ok.Write, + Err(_) => { + send_http_response( + 500, + default_headers.clone(), + "Write Error" + .to_string() + .as_bytes() + .to_vec(), + ); + continue; + } + } + } + None => { + send_http_response( + 500, + default_headers.clone(), + "Write Error".to_string().as_bytes().to_vec(), + ); + continue; + } + }; + + let mut capabilities: HashSet = HashSet::new(); + + match body_json.capabilities { + Some(caps) => { + for cap in caps { + capabilities.insert(Capability { + issuer: kernel_types::Address { + node: node.clone(), + process: kernel_types::ProcessId::Name( + cap.0, + ), + }, + params: cap.1, + }); + } + } + None => (), + }; + + let stop_process_command = + kernel_types::KernelCommand::KillProcess( + kernel_types::ProcessId::Name( + body_json.process.clone(), + ), + ); + + send_request( + &Address { + node: node.clone(), + process: ProcessId::Name("kernel".to_string()), + }, + &Request { + inherit: false, + expects_response: Some(5), + ipc: Some( + serde_json::to_string(&stop_process_command) + .unwrap(), + ), + metadata: None, + }, + None, + None, + ); + + let start_process_command = + kernel_types::KernelCommand::StartProcess { + name: Some(body_json.process), + wasm_bytes_handle, + on_panic: kernel_types::OnPanic::Restart, + initial_capabilities: capabilities, + }; + + let ipc = match serde_json::to_string(&start_process_command) { + Ok(ipc) => ipc, + Err(_) => { + print_to_terminal( + 1, + "rpc: failed to serialize StartProcess command", + ); + send_http_response( + 500, + default_headers.clone(), + "Internal Error".to_string().as_bytes().to_vec(), + ); + continue; + } + }; + + let start_wasm_result = send_and_await_response( + &Address { + node, + process: ProcessId::Name("kernel".to_string()), + }, + &Request { + inherit: false, + expects_response: Some(5), + ipc: Some(ipc), + metadata: None, + }, + None, + ); + + match start_wasm_result { + Ok((_source, _message)) => { + send_http_response( + 200, + default_headers.clone(), + "Success".to_string().as_bytes().to_vec(), + ); + continue; + } + Err(_) => { + print_to_terminal(1, "rpc: error coming back"); + send_http_response( + 500, + default_headers.clone(), + "Network Error".to_string().as_bytes().to_vec(), + ); + continue; + } + } + } + Err(_) => { print_to_terminal(1, "rpc: error coming back"); send_http_response( 500, @@ -404,18 +653,23 @@ impl Guest for Component { } }; - print_to_terminal( - 0, - format!("rpc: node {:?}", body_json.node).as_str(), - ); - print_to_terminal( - 0, - format!("rpc: process {:?}", body_json.process).as_str(), - ); - print_to_terminal( - 0, - format!("rpc: params {:?}", body_json.params).as_str(), - ); + // print_to_terminal( + // 0, + // format!("rpc: node {:?}", body_json.node).as_str(), + // ); + // print_to_terminal( + // 0, + // format!("rpc: process {:?}", body_json.process).as_str(), + // ); + // print_to_terminal( + // 0, + // format!("rpc: params {:?}", body_json.params).as_str(), + // ); + // // let caps = get_capabilities(); + // print_to_terminal( + // 0, + // format!("rpc: got capabilities {:?}", caps).as_str(), + // ); let capability = get_capability( &Address { diff --git a/modules/rpc/src/upload-and-start.sh b/modules/rpc/src/upload-and-start.sh new file mode 100755 index 00000000..f4d90e03 --- /dev/null +++ b/modules/rpc/src/upload-and-start.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +# Use this script to upload a .wasm file and start the process through rpc. + +# Check if there are enough parameters provided. +if [ "$#" -ne 4 ]; then + echo "Usage: $0 " + exit 1 +fi + +URL="$1" +NODE="$2" +PROCESS="$3" +WASM_FILE="$4" + +# Put the payload in a temporary file (.wasm is too large otherwise). +JSON_PAYLOAD=$(echo -n '{"node": "'"$NODE"'", "process": "'"$PROCESS"'", "capabilities": [["http_bindings", "{\"messaging\": \"{\"Name\":\"http_bindings\"}\"}"]], "wasm": "'"$(base64 < "$WASM_FILE")"'"}') +echo -n "$JSON_PAYLOAD" > /tmp/temp_payload.json + +# Upload the wasm file and start the process through rpc. +OUTPUT=$(curl -s "$URL/rpc/start-process" -H 'content-type: application/json' --data-binary @/tmp/temp_payload.json) + +echo $OUTPUT + +rm /tmp/temp_payload.json diff --git a/src/kernel/mod.rs b/src/kernel/mod.rs index 5eed7528..ac54f314 100644 --- a/src/kernel/mod.rs +++ b/src/kernel/mod.rs @@ -248,6 +248,39 @@ impl UqProcessImports for ProcessWasi { signed_capabilities: None, }) .await?; + + // child processes are always able to Message parent + let _ = self + .process + .caps_oracle + .send(t::CapMessage::Add { + on: de_wit_process_id(id.clone()), + cap: t::Capability { + issuer: self.process.metadata.our.clone(), + params: serde_json::to_string(&serde_json::json!({ + "messaging": self.process.metadata.our.process.clone(), + })) + .unwrap(), + }, + }) + .unwrap(); + + // parent process is always able to Message child + let _ = self + .process + .caps_oracle + .send(t::CapMessage::Add { + on: self.process.metadata.our.process.clone(), + cap: t::Capability { + issuer: self.process.metadata.our.clone(), + params: serde_json::to_string(&serde_json::json!({ + "messaging": de_wit_process_id(id.clone()), + })) + .unwrap(), + }, + }) + .unwrap(); + unimplemented!() //Ok(Some(id)) } diff --git a/src/kernel_types.rs b/src/kernel_types.rs index 2ab169e1..a5938921 100644 --- a/src/kernel_types.rs +++ b/src/kernel_types.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use serde::{Deserialize, Serialize}; use super::bindings::component::uq_process::types as wit; @@ -42,7 +44,7 @@ impl PartialEq for ProcessId { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)] pub struct Address { pub node: String, pub process: ProcessId, @@ -74,7 +76,7 @@ pub enum Message { Response((Response, Option)), } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)] pub struct Capability { pub issuer: Address, pub params: String, // JSON-string @@ -108,6 +110,35 @@ pub enum OnPanic { Requests(Vec<(Address, Request)>), } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PersistedProcess { + pub wasm_bytes_handle: u128, + pub on_panic: OnPanic, + pub capabilities: HashSet, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum KernelCommand { + StartProcess { + name: Option, + wasm_bytes_handle: u128, + on_panic: OnPanic, + initial_capabilities: HashSet, + }, + KillProcess(ProcessId), // this is extrajudicial killing: we might lose messages! + RebootProcess { + // kernel only + process_id: ProcessId, + persisted: PersistedProcess, + }, + Shutdown, + // capabilities creation + GrantCapability { + to_process: ProcessId, + params: String, // JSON-string + }, +} + // // conversions between wit types and kernel types (annoying) //