RPC: start a process (#6)

This commit is contained in:
Will Galebach 2023-10-04 08:29:54 +01:00 committed by GitHub
parent ad5cc1a7ff
commit fd1437ecc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 397 additions and 53 deletions

View File

@ -0,0 +1 @@
../../../src/kernel_types.rs

View File

@ -5,11 +5,13 @@ use bindings::{
get_capabilities, get_capability, get_payload, print_to_terminal, receive,
send_and_await_response, send_request, send_requests, send_response, Guest,
};
use kernel_types::Capability;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
extern crate base64;
mod kernel_types;
mod process_lib;
struct Component;
@ -19,7 +21,7 @@ struct RpcMessage {
pub node: String,
pub process: String,
pub inherit: Option<bool>,
pub expects_response: Option<u64>, // always false?
pub expects_response: Option<u64>,
pub ipc: Option<String>,
pub metadata: Option<String>,
pub context: Option<String>,
@ -27,6 +29,14 @@ struct RpcMessage {
pub data: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct StartProcess {
pub node: Option<String>,
pub process: String,
pub capabilities: Option<Vec<(String, String)>>, // list of (process, params) for the caps
pub wasm: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct CapabilitiesTransfer {
pub destination_node: String,
@ -36,6 +46,16 @@ struct CapabilitiesTransfer {
pub params: String,
}
#[derive(Debug, Deserialize)]
struct WriteFileId {
Write: u128,
}
#[derive(Debug, Deserialize)]
struct WriteFileResult {
Ok: WriteFileId,
}
// curl http://localhost:8080/rpc/message -H 'content-type: application/json' -d '{"node": "hosted", "process": "vfs", "inherit": false, "expects_response": null, "ipc": "{\"New\": {\"identifier\": \"foo\"}}", "metadata": null, "context": null, "mime": null, "data": null}'
fn send_http_response(status: u16, headers: HashMap<String, String>, payload_bytes: Vec<u8>) {
@ -59,10 +79,6 @@ fn send_http_response(status: u16, headers: HashMap<String, String>, payload_byt
const RPC_PAGE: &str = include_str!("rpc.html");
fn binary_encoded_string_to_bytes(s: &str) -> Vec<u8> {
s.chars().map(|c| c as u8).collect()
}
impl Guest for Component {
fn init(our: Address) {
print_to_terminal(0, "RPC: start");
@ -72,25 +88,8 @@ impl Guest for Component {
process: ProcessId::Name("http_bindings".to_string()),
};
// <address, request, option<context>, option<payload>>
let http_endpoint_binding_requests: [(Address, Request, Option<Context>, Option<Payload>);
3] = [
// (
// bindings_address.clone(),
// Request {
// inherit: false,
// expects_response: None,
// ipc: Some(json!({
// "action": "bind-app",
// "path": "/rpc",
// "app": "rpc",
// "local_only": true,
// }).to_string()),
// metadata: None,
// },
// None,
// None
// ),
4] = [
(
bindings_address.clone(),
Request {
@ -110,6 +109,25 @@ impl Guest for Component {
None,
None,
),
(
bindings_address.clone(),
Request {
inherit: false,
expects_response: None,
ipc: Some(
json!({
"action": "bind-app",
"path": "/rpc/start-process",
"app": "rpc",
"local_only": true,
})
.to_string(),
),
metadata: None,
},
None,
None,
),
(
bindings_address.clone(),
Request {
@ -289,11 +307,11 @@ impl Guest for Component {
Err(_) => None,
};
let caps = get_capabilities();
print_to_terminal(
0,
format!("rpc: got capabilities {:?}", caps).as_str(),
);
// let caps = get_capabilities();
// print_to_terminal(
// 0,
// format!("rpc: got capabilities {:?}", caps).as_str(),
// );
let result = send_and_await_response(
&Address {
@ -311,8 +329,7 @@ impl Guest for Component {
match result {
Ok((_source, message)) => {
let Message::Response((response, _context)) = message
else {
let Message::Response((response, _context)) = message else {
print_to_terminal(
1,
"rpc: got unexpected response to message",
@ -328,13 +345,13 @@ impl Guest for Component {
continue;
};
let (mime, data) = match get_payload() {
Some(p) => {
let mime = match p.mime {
Some(mime) => mime,
None => "application/octet-stream".to_string(),
};
let bytes = p.bytes;
let (mime, data) = match get_payload() {
Some(p) => {
let mime = match p.mime {
Some(mime) => mime,
None => "application/octet-stream".to_string(),
};
let bytes = p.bytes;
(mime, base64::encode(bytes))
}
@ -355,7 +372,239 @@ impl Guest for Component {
send_http_response(200, default_headers.clone(), body);
continue;
}
Err(error) => {
Err(_) => {
print_to_terminal(1, "rpc: error coming back");
send_http_response(
500,
default_headers.clone(),
"Network Error".to_string().as_bytes().to_vec(),
);
continue;
}
}
}
"/rpc/start-process" => {
let Some(payload) = get_payload() else {
print_to_terminal(1, "rpc: no bytes in payload, skipping...");
send_http_response(
400,
default_headers.clone(),
"No payload".to_string().as_bytes().to_vec(),
);
continue;
};
let body_json: StartProcess =
match serde_json::from_slice::<StartProcess>(&payload.bytes) {
Ok(v) => v,
Err(_) => {
print_to_terminal(
1,
&format!(
"rpc: JSON is not valid StartProcess: {:?}",
serde_json::from_slice::<serde_json::Value>(
&payload.bytes
)
),
);
send_http_response(
400,
default_headers.clone(),
"JSON is not valid StartProcess"
.to_string()
.as_bytes()
.to_vec(),
);
continue;
}
};
let payload = match base64::decode(&body_json.wasm) {
Ok(bytes) => Some(Payload {
mime: Some("bytes".to_string()),
bytes,
}),
Err(_) => None,
};
let node = match body_json.node {
Some(node) => node,
None => our.node.clone(),
};
// let caps = get_capabilities();
// print_to_terminal(
// 0,
// format!("rpc: got capabilities {:?}", caps).as_str(),
// );
let write_wasm_result = send_and_await_response(
&Address {
node: node.clone(),
process: ProcessId::Name("filesystem".to_string()),
},
&Request {
inherit: false,
expects_response: Some(5),
ipc: Some(
json!({
"Write": None::<String>,
})
.to_string(),
),
metadata: None,
},
payload.as_ref(),
);
match write_wasm_result {
Ok((_source, message)) => {
let Message::Response((response, _context)) = message else {
print_to_terminal(
1,
"rpc: got unexpected response to message",
);
send_http_response(
500,
default_headers,
"Invalid Internal Response"
.to_string()
.as_bytes()
.to_vec(),
);
continue;
};
let wasm_bytes_handle = match response.ipc {
Some(ipc) => {
match serde_json::from_str::<WriteFileResult>(&ipc) {
Ok(result) => result.Ok.Write,
Err(_) => {
send_http_response(
500,
default_headers.clone(),
"Write Error"
.to_string()
.as_bytes()
.to_vec(),
);
continue;
}
}
}
None => {
send_http_response(
500,
default_headers.clone(),
"Write Error".to_string().as_bytes().to_vec(),
);
continue;
}
};
let mut capabilities: HashSet<Capability> = HashSet::new();
match body_json.capabilities {
Some(caps) => {
for cap in caps {
capabilities.insert(Capability {
issuer: kernel_types::Address {
node: node.clone(),
process: kernel_types::ProcessId::Name(
cap.0,
),
},
params: cap.1,
});
}
}
None => (),
};
let stop_process_command =
kernel_types::KernelCommand::KillProcess(
kernel_types::ProcessId::Name(
body_json.process.clone(),
),
);
send_request(
&Address {
node: node.clone(),
process: ProcessId::Name("kernel".to_string()),
},
&Request {
inherit: false,
expects_response: Some(5),
ipc: Some(
serde_json::to_string(&stop_process_command)
.unwrap(),
),
metadata: None,
},
None,
None,
);
let start_process_command =
kernel_types::KernelCommand::StartProcess {
name: Some(body_json.process),
wasm_bytes_handle,
on_panic: kernel_types::OnPanic::Restart,
initial_capabilities: capabilities,
};
let ipc = match serde_json::to_string(&start_process_command) {
Ok(ipc) => ipc,
Err(_) => {
print_to_terminal(
1,
"rpc: failed to serialize StartProcess command",
);
send_http_response(
500,
default_headers.clone(),
"Internal Error".to_string().as_bytes().to_vec(),
);
continue;
}
};
let start_wasm_result = send_and_await_response(
&Address {
node,
process: ProcessId::Name("kernel".to_string()),
},
&Request {
inherit: false,
expects_response: Some(5),
ipc: Some(ipc),
metadata: None,
},
None,
);
match start_wasm_result {
Ok((_source, _message)) => {
send_http_response(
200,
default_headers.clone(),
"Success".to_string().as_bytes().to_vec(),
);
continue;
}
Err(_) => {
print_to_terminal(1, "rpc: error coming back");
send_http_response(
500,
default_headers.clone(),
"Network Error".to_string().as_bytes().to_vec(),
);
continue;
}
}
}
Err(_) => {
print_to_terminal(1, "rpc: error coming back");
send_http_response(
500,
@ -404,18 +653,23 @@ impl Guest for Component {
}
};
print_to_terminal(
0,
format!("rpc: node {:?}", body_json.node).as_str(),
);
print_to_terminal(
0,
format!("rpc: process {:?}", body_json.process).as_str(),
);
print_to_terminal(
0,
format!("rpc: params {:?}", body_json.params).as_str(),
);
// print_to_terminal(
// 0,
// format!("rpc: node {:?}", body_json.node).as_str(),
// );
// print_to_terminal(
// 0,
// format!("rpc: process {:?}", body_json.process).as_str(),
// );
// print_to_terminal(
// 0,
// format!("rpc: params {:?}", body_json.params).as_str(),
// );
// // let caps = get_capabilities();
// print_to_terminal(
// 0,
// format!("rpc: got capabilities {:?}", caps).as_str(),
// );
let capability = get_capability(
&Address {

View File

@ -0,0 +1,25 @@
#!/bin/bash
# Use this script to upload a .wasm file and start the process through rpc.
# Check if there are enough parameters provided.
if [ "$#" -ne 4 ]; then
echo "Usage: $0 <url> <node-id> <process> <wasm-file>"
exit 1
fi
URL="$1"
NODE="$2"
PROCESS="$3"
WASM_FILE="$4"
# Put the payload in a temporary file (.wasm is too large otherwise).
JSON_PAYLOAD=$(echo -n '{"node": "'"$NODE"'", "process": "'"$PROCESS"'", "capabilities": [["http_bindings", "{\"messaging\": \"{\"Name\":\"http_bindings\"}\"}"]], "wasm": "'"$(base64 < "$WASM_FILE")"'"}')
echo -n "$JSON_PAYLOAD" > /tmp/temp_payload.json
# Upload the wasm file and start the process through rpc.
OUTPUT=$(curl -s "$URL/rpc/start-process" -H 'content-type: application/json' --data-binary @/tmp/temp_payload.json)
echo $OUTPUT
rm /tmp/temp_payload.json

View File

@ -248,6 +248,39 @@ impl UqProcessImports for ProcessWasi {
signed_capabilities: None,
})
.await?;
// child processes are always able to Message parent
let _ = self
.process
.caps_oracle
.send(t::CapMessage::Add {
on: de_wit_process_id(id.clone()),
cap: t::Capability {
issuer: self.process.metadata.our.clone(),
params: serde_json::to_string(&serde_json::json!({
"messaging": self.process.metadata.our.process.clone(),
}))
.unwrap(),
},
})
.unwrap();
// parent process is always able to Message child
let _ = self
.process
.caps_oracle
.send(t::CapMessage::Add {
on: self.process.metadata.our.process.clone(),
cap: t::Capability {
issuer: self.process.metadata.our.clone(),
params: serde_json::to_string(&serde_json::json!({
"messaging": de_wit_process_id(id.clone()),
}))
.unwrap(),
},
})
.unwrap();
unimplemented!()
//Ok(Some(id))
}

View File

@ -1,3 +1,5 @@
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use super::bindings::component::uq_process::types as wit;
@ -42,7 +44,7 @@ impl PartialEq<u64> for ProcessId {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
pub struct Address {
pub node: String,
pub process: ProcessId,
@ -74,7 +76,7 @@ pub enum Message {
Response((Response, Option<Context>)),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
pub struct Capability {
pub issuer: Address,
pub params: String, // JSON-string
@ -108,6 +110,35 @@ pub enum OnPanic {
Requests(Vec<(Address, Request)>),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PersistedProcess {
pub wasm_bytes_handle: u128,
pub on_panic: OnPanic,
pub capabilities: HashSet<Capability>,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum KernelCommand {
StartProcess {
name: Option<String>,
wasm_bytes_handle: u128,
on_panic: OnPanic,
initial_capabilities: HashSet<Capability>,
},
KillProcess(ProcessId), // this is extrajudicial killing: we might lose messages!
RebootProcess {
// kernel only
process_id: ProcessId,
persisted: PersistedProcess,
},
Shutdown,
// capabilities creation
GrantCapability {
to_process: ProcessId,
params: String, // JSON-string
},
}
//
// conversions between wit types and kernel types (annoying)
//