println macro replace working, fixing up other apps

This commit is contained in:
dr-frmr 2023-11-05 23:03:06 -05:00
parent 56be17145a
commit 679c4b975c
No known key found for this signature in database
7 changed files with 570 additions and 698 deletions

View File

@ -1,14 +1,13 @@
use serde::{Deserialize, Serialize};
use sha2::Digest;
use std::collections::{HashMap, HashSet};
use uqbar_process_lib::component::uq_process::api::*;
use uqbar_process_lib::component::uq_process::types::NodeId;
use uqbar_process_lib::*;
use uqbar_process_lib::kernel_types as kt;
use uqbar_process_lib::uqbar::process::standard as wit;
use uqbar_process_lib::{NodeId, Address, ProcessId, Request};
wit_bindgen::generate!({
path: "../../../wit",
world: "uq-process",
path: "../../wit",
world: "process",
exports: {
world: Component,
},

View File

@ -11,24 +11,18 @@ opt-level = "s"
lto = true
[dependencies]
cargo-component-bindings = { git = "https://github.com/bytecodealliance/cargo-component" }
serde_json = "1.0"
serde = {version = "1.0", features = ["derive"] }
wit-bindgen = { version = "0.11.0", default_features = false }
thiserror = "1.0.43"
anyhow = "1.0"
alloy-sol-types = "0.3.2"
hex = "0.4.3"
alloy-primitives = "0.3.3"
alloy-sol-types = "0.3.2"
bincode = "1.3.3"
hex = "0.4.3"
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", version = "0.13.0" }
uqbar_process_lib = { path = "../../process_lib" }
[lib]
crate-type = ["cdylib"]
[package.metadata.component]
package = "component:uq-process"
[package.metadata.component.target]
path = "wit"
[package.metadata.component.dependencies]
package = "uqbar:process"

View File

@ -1,17 +1,20 @@
cargo_component_bindings::generate!();
use alloy_primitives::FixedBytes;
use alloy_sol_types::{sol, SolEvent};
use bindings::component::uq_process::types::*;
use bindings::{print_to_terminal, receive, send_request, send_response, UqProcess};
use hex;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::string::FromUtf8Error;
use uqbar_process_lib::uqbar::process::standard as wit;
use uqbar_process_lib::{Address, ProcessId, Request, Response};
#[allow(dead_code)]
mod process_lib;
wit_bindgen::generate!({
path: "../../wit",
world: "process",
exports: {
world: Component,
},
});
struct Component;
@ -98,8 +101,26 @@ fn subscribe_to_qns(from_block: u64) -> Vec<u8> {
.to_vec()
}
fn serialize_state(state: &State) -> anyhow::Result<Vec<u8>> {
Ok(bincode::serialize(state)?)
}
fn deserialize_state(bytes: &[u8]) -> anyhow::Result<State> {
Ok(bincode::deserialize(bytes)?)
}
fn serialize_message(message: &NetActions) -> anyhow::Result<Vec<u8>> {
Ok(serde_json::to_vec(message)?)
}
fn deserialize_message(bytes: &[u8]) -> anyhow::Result<NetActions> {
Ok(serde_json::from_slice(bytes)?)
}
impl UqProcess for Component {
fn init(our: Address) {
fn init(our: String) {
let our = Address::from_str(&our).unwrap();
let mut state: State = State {
names: HashMap::new(),
nodes: HashMap::new(),
@ -107,247 +128,202 @@ impl UqProcess for Component {
};
// if we have state, load it in
match process_lib::get_state::<State>() {
match get_typed_state::<State>(deserialize_state) {
Some(s) => {
state = s;
}
None => {}
}
bindings::print_to_terminal(
print_to_terminal(
0,
&format!("qns_indexer: starting at block {}", state.block),
);
// shove all state into net::net
send_request(
&Address {
node: our.node.clone(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
&Request {
inherit: false,
expects_response: None,
metadata: None,
ipc: serde_json::to_vec(&NetActions::QnsBatchUpdate(
state.nodes.values().cloned().collect::<Vec<_>>(),
))
.unwrap(),
},
None,
None,
);
match main(our, state) {
Ok(_) => {}
Err(e) => {
print_to_terminal(0, &format!("qns_indexer: ended with error: {:?}", e));
}
}
}
}
let _ = send_request(
&Address {
node: our.node.clone(),
process: ProcessId::from_str("eth_rpc:sys:uqbar").unwrap(),
},
&Request {
inherit: false, // TODO what
expects_response: Some(5), // TODO evaluate
metadata: None,
// -1 because there could be other events in the last processed block
ipc: subscribe_to_qns(state.block - 1),
},
None,
None,
);
fn main(our: Address, state: State) -> anyhow::Result<()> {
// shove all state into net::net
Request::new()
.target(Address::new(our.node, "net:sys:uqbar")?)?
.ipc(
&NetActions::QnsBatchUpdate(state.nodes.values().cloned().collect::<Vec<_>>()),
serialize_message,
)?
.send()?;
let http_server_address = ProcessId::from_str("http_server:sys:uqbar").unwrap();
Request::new()
.target(Address::new(our.node, "eth_rpc:sys:uqbar")?)?
.ipc_bytes(subscribe_to_qns(state.block - 1))?
.expects_response(5)
.send()?;
let _register_endpoint = send_request(
&Address {
node: our.node.clone(),
process: http_server_address.clone(),
},
&Request {
inherit: false,
expects_response: None,
metadata: None,
ipc: json!({
"BindPath": {
"path": "/node/:name",
"authenticated": false,
"local_only": false
}
})
.to_string()
.as_bytes()
.to_vec(),
},
None,
None,
);
let http_server_address = ProcessId::from_str("http_server:sys:uqbar").unwrap();
loop {
let Ok((source, message)) = receive() else {
print_to_terminal(0, "qns_indexer: got network error");
continue;
};
let Message::Request(request) = message else {
// TODO we should store the subscription ID for eth_rpc
// incase we want to cancel/reset it
// print_to_terminal(0, "qns_indexer: got response");
continue;
};
Request::new()
.target(Address::new(our.node, http_server_address)?)?
.ipc_bytes(
json!({
"BindPath": {
"path": "/node/:name",
"authenticated": false,
"local_only": false
}
})
.to_string()
.as_bytes(),
)?
.send()?;
if source.process == http_server_address {
if let Ok(ipc_json) =
serde_json::from_slice::<serde_json::Value>(&request.ipc)
{
if ipc_json["path"].as_str().unwrap_or_default() == "/node/:name" {
if let Some(name) = ipc_json["url_params"]["name"].as_str() {
if let Some(node) = state.nodes.get(name) {
send_response(
&Response {
inherit: false,
ipc: serde_json::json!({
"status": 200,
"headers": {
"Content-Type": "application/json",
},
})
.to_string()
.as_bytes()
.to_vec(),
metadata: None,
},
Some(&Payload {
loop {
let Ok((source, message)) = receive() else {
print_to_terminal(0, "qns_indexer: got network error");
continue;
};
let Message::Request(request) = message else {
// TODO we should store the subscription ID for eth_rpc
// incase we want to cancel/reset it
// print_to_terminal(0, "qns_indexer: got response");
continue;
};
if source.process == http_server_address {
if let Ok(ipc_json) = serde_json::from_slice::<serde_json::Value>(&request.ipc) {
if ipc_json["path"].as_str().unwrap_or_default() == "/node/:name" {
if let Some(name) = ipc_json["url_params"]["name"].as_str() {
if let Some(node) = state.nodes.get(name) {
Response::new()
.ipc_bytes(
serde_json::json!({
"status": 200,
"headers": {
"Content-Type": "application/json",
},
})
.payload(&Payload {
mime: Some("application/json".to_string()),
bytes: serde_json::to_string(&node)
.unwrap()
.as_bytes()
.to_vec(),
}),
);
continue;
}
})?,
)
.send()?;
continue;
}
}
}
send_response(
&Response {
inherit: false,
ipc: serde_json::json!({
"status": 404,
"headers": {
"Content-Type": "application/json",
},
})
.to_string()
.as_bytes()
.to_vec(),
metadata: None,
},
Some(&Payload {
}
Response::new()
.ipc_bytes(
serde_json::json!({
"status": 404,
"headers": {
"Content-Type": "application/json",
},
})
.payload(&Payload {
mime: Some("application/json".to_string()),
bytes: "Not Found".to_string().as_bytes().to_vec(),
}),
);
continue;
}
})?,
)
.send()?;
continue;
}
let Ok(msg) = serde_json::from_slice::<AllActions>(&request.ipc) else {
print_to_terminal(0, "qns_indexer: got invalid message");
continue;
};
let Ok(msg) = serde_json::from_slice::<AllActions>(&request.ipc) else {
print_to_terminal(0, "qns_indexer: got invalid message");
continue;
};
match msg {
// Probably more message types later...maybe not...
AllActions::EventSubscription(e) => {
state.block = hex_to_u64(&e.block_number).unwrap();
match decode_hex(&e.topics[0].clone()) {
NodeRegistered::SIGNATURE_HASH => {
// bindings::print_to_terminal(0, format!("qns_indexer: got NodeRegistered event: {:?}", e).as_str());
match msg {
// Probably more message types later...maybe not...
AllActions::EventSubscription(e) => {
state.block = hex_to_u64(&e.block_number).unwrap();
match decode_hex(&e.topics[0].clone()) {
NodeRegistered::SIGNATURE_HASH => {
// print_to_terminal(0, format!("qns_indexer: got NodeRegistered event: {:?}", e).as_str());
let node = &e.topics[1];
let decoded =
NodeRegistered::decode_data(&decode_hex_to_vec(&e.data), true)
.unwrap();
let Ok(name) = dnswire_decode(decoded.0.clone()) else {
bindings::print_to_terminal(
1,
&format!("qns_indexer: failed to decode name: {:?}", decoded.0),
);
continue;
};
state.names.insert(node.to_string(), name);
}
WsChanged::SIGNATURE_HASH => {
let node = &e.topics[1];
let decoded =
WsChanged::decode_data(&decode_hex_to_vec(&e.data), true).unwrap();
let public_key = hex::encode(decoded.0);
let ip = decoded.1;
let port = decoded.2;
let routers_raw = decoded.3;
let routers: Vec<String> = routers_raw
.iter()
.map(|r| {
let key = hex::encode(r);
match state.names.get(&key) {
Some(name) => name.clone(),
None => format!("0x{}", key), // TODO it should actually just panic here
}
})
.collect::<Vec<String>>();
let Some(name) = state.names.get(node) else {
bindings::print_to_terminal(0, &format!("qns_indexer: failed to find name for node during WsChanged: {:?}", node));
continue;
};
let update = QnsUpdate {
name: name.clone(),
owner: "0x".to_string(), // TODO or get rid of
node: node.clone(),
public_key: format!("0x{}", public_key),
ip: format!(
"{}.{}.{}.{}",
(ip >> 24) & 0xFF,
(ip >> 16) & 0xFF,
(ip >> 8) & 0xFF,
ip & 0xFF
),
port,
routers,
};
state.nodes.insert(name.clone(), update.clone());
send_request(
&Address {
node: our.node.clone(),
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
&Request {
inherit: false,
expects_response: None,
metadata: None,
ipc: serde_json::to_vec(&NetActions::QnsUpdate(update.clone()))
.unwrap(),
},
None,
None,
let node = &e.topics[1];
let decoded =
NodeRegistered::decode_data(&decode_hex_to_vec(&e.data), true).unwrap();
let Ok(name) = dnswire_decode(decoded.0.clone()) else {
print_to_terminal(
1,
&format!("qns_indexer: failed to decode name: {:?}", decoded.0),
);
}
event => {
bindings::print_to_terminal(
0,
format!("qns_indexer: got unknown event: {:?}", event).as_str(),
);
}
continue;
};
state.names.insert(node.to_string(), name);
}
WsChanged::SIGNATURE_HASH => {
let node = &e.topics[1];
let decoded =
WsChanged::decode_data(&decode_hex_to_vec(&e.data), true).unwrap();
let public_key = hex::encode(decoded.0);
let ip = decoded.1;
let port = decoded.2;
let routers_raw = decoded.3;
let routers: Vec<String> = routers_raw
.iter()
.map(|r| {
let key = hex::encode(r);
match state.names.get(&key) {
Some(name) => name.clone(),
None => format!("0x{}", key), // TODO it should actually just panic here
}
})
.collect::<Vec<String>>();
let Some(name) = state.names.get(node) else {
print_to_terminal(0, &format!("qns_indexer: failed to find name for node during WsChanged: {:?}", node));
continue;
};
let update = QnsUpdate {
name: name.clone(),
owner: "0x".to_string(), // TODO or get rid of
node: node.clone(),
public_key: format!("0x{}", public_key),
ip: format!(
"{}.{}.{}.{}",
(ip >> 24) & 0xFF,
(ip >> 16) & 0xFF,
(ip >> 8) & 0xFF,
ip & 0xFF
),
port,
routers,
};
state.nodes.insert(name.clone(), update.clone());
Request::new()
.target(Address::new(our.node, "net:sys:uqbar")?)?
.ipc(&NetActions::QnsUpdate(update.clone()), serialize_message)?
.send()?;
}
event => {
print_to_terminal(
0,
format!("qns_indexer: got unknown event: {:?}", event).as_str(),
);
}
}
}
process_lib::set_state::<State>(&state);
}
set_typed_state::<State>(&state, serialize_state);
}
}
// helpers
// TODO these probably exist somewhere in alloy...not sure where though.
fn decode_hex(s: &str) -> FixedBytes<32> {

View File

@ -1 +0,0 @@
../../../src/process_lib.rs

View File

@ -1,4 +1,6 @@
use uqbar_process_lib::uqbar::process::standard::*;
use anyhow::anyhow;
use uqbar_process_lib::uqbar::process::standard as wit;
use uqbar_process_lib::{Address, ProcessId, Request, println};
wit_bindgen::generate!({
path: "../../wit",
@ -8,99 +10,65 @@ wit_bindgen::generate!({
},
});
struct Component;
fn serialize_message(message: &&str) -> anyhow::Result<Vec<u8>> {
Ok(serde_json::to_vec(message)?)
}
fn parse_command(our_name: &str, line: &str) {
fn parse_command(our_name: &str, line: &str) -> anyhow::Result<()> {
let (head, tail) = line.split_once(" ").unwrap_or((&line, ""));
match head {
"" | " " => {}
"" | " " => return Ok(()),
"!hi" => {
let (target, message) = match tail.split_once(" ") {
let (node_id, message) = match tail.split_once(" ") {
Some((s, t)) => (s, t),
None => {
print_to_terminal(0, &format!("invalid command: \"{}\"", line));
return;
}
None => return Err(anyhow!("invalid command: \"{line}\"")),
};
send_request(
&Address {
node: if target == "our" {
our_name.into()
} else {
target.into()
},
process: ProcessId::from_str("net:sys:uqbar").unwrap(),
},
&Request {
inherit: false,
expects_response: Some(5),
ipc: message.into(),
metadata: None,
},
None,
None,
);
let node_id = if node_id == "our" { our_name } else { node_id };
Request::new()
.target(Address::new(node_id, "net:sys:uqbar").unwrap())?
.ipc(&message, serialize_message)?
.expects_response(5)
.send()?;
Ok(())
}
"!message" => {
let (target_node, tail) = match tail.split_once(" ") {
let (node_id, tail) = match tail.split_once(" ") {
Some((s, t)) => (s, t),
None => {
print_to_terminal(0, &format!("invalid command: \"{}\"", line));
return;
}
None => return Err(anyhow!("invalid command: \"{line}\"")),
};
let (target_process, ipc) = match tail.split_once(" ") {
Some((a, p)) => (a, p),
None => {
print_to_terminal(0, &format!("invalid command: \"{}\"", line));
return;
}
None => return Err(anyhow!("invalid command: \"{line}\"")),
};
// TODO: why does this work but using the API below does not?
// Is it related to passing json in rather than a Serialize type?
//
send_request(
&Address {
node: if target_node == "our" {
our_name.into()
} else {
target_node.into()
},
process: ProcessId::from_str(target_process).unwrap_or_else(|_| {
ProcessId::from_str(&format!("{}:sys:uqbar", target_process)).unwrap()
}),
},
&Request {
inherit: false,
expects_response: None,
ipc: ipc.into(),
metadata: None,
},
None,
None,
);
}
_ => {
print_to_terminal(0, &format!("invalid command: \"{line}\""));
let node_id = if node_id == "our" { our_name } else { node_id };
let process = ProcessId::from_str(target_process).unwrap_or_else(|_| {
ProcessId::from_str(&format!("{}:sys:uqbar", target_process)).unwrap()
});
Request::new()
.target(Address::new(node_id, process).unwrap())?
.ipc(&ipc, serialize_message)?
.send()?;
Ok(())
}
_ => return Err(anyhow!("invalid command: \"{line}\"")),
}
}
struct Component;
impl Guest for Component {
fn init(our: String) {
let our = Address::from_str(&our).unwrap();
assert_eq!(our.process.to_string(), "terminal:terminal:uqbar");
print_to_terminal(1, &format!("terminal: start"));
println!("terminal: start");
loop {
let (source, message) = match receive() {
let (source, message) = match wit::receive() {
Ok((source, message)) => (source, message),
Err((error, _context)) => {
print_to_terminal(0, &format!("net error: {:?}!", error.kind));
println!("terminal: net error: {:?}!", error.kind);
continue;
}
};
match message {
Message::Request(Request {
wit::Message::Request(wit::Request {
expects_response,
ipc,
..
@ -108,11 +76,14 @@ impl Guest for Component {
if our.node != source.node || our.process != source.process {
continue;
}
parse_command(&our.node, std::str::from_utf8(&ipc).unwrap_or_default());
match parse_command(&our.node, std::str::from_utf8(&ipc).unwrap_or_default()) {
Ok(()) => continue,
Err(e) => println!("terminal: {e}"),
}
}
Message::Response((Response { ipc, metadata, .. }, _)) => {
wit::Message::Response((wit::Response { ipc, metadata, .. }, _)) => {
if let Ok(txt) = std::str::from_utf8(&ipc) {
print_to_terminal(0, &format!("net response: {}", txt));
println!("terminal: net response: {txt}");
}
}
}

View File

@ -1,5 +1,5 @@
#![allow(dead_code)]
use crate::uqbar::process::standard::*;
use crate::uqbar::process::standard as wit;
pub use crate::uqbar::process::standard::*;
/// Uqbar process standard library for Rust compiled to WASM
/// Must be used in context of bindings generated by uqbar.wit
use serde::{Deserialize, Serialize};
@ -12,15 +12,15 @@ wit_bindgen::generate!({
pub mod kernel_types;
/// Override the println! macro to print to the terminal
/// TODO make this work
// macro_rules! println {
// () => {
// $print_to_terminal(0, "\n");
// };
// ($($arg:tt)*) => {
// $print_to_terminal(0, &format!($($arg)*));
// };
// }
#[macro_export]
macro_rules! println {
() => {
$crate::print_to_terminal(0, "\n");
};
($($arg:tt)*) => {{
$crate::print_to_terminal(0, &format!($($arg)*));
}};
}
/// PackageId is like a ProcessId, but for a package. Only contains the name
/// of the package and the name of the publisher.
@ -121,6 +121,22 @@ impl ProcessId {
}
}
pub trait IntoProcessId {
fn into_process_id(self) -> Result<ProcessId, ProcessIdParseError>;
}
impl IntoProcessId for ProcessId {
fn into_process_id(self) -> Result<ProcessId, ProcessIdParseError> {
Ok(self)
}
}
impl IntoProcessId for &str {
fn into_process_id(self) -> Result<ProcessId, ProcessIdParseError> {
ProcessId::from_str(self)
}
}
impl std::fmt::Display for ProcessId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
@ -181,6 +197,12 @@ impl std::error::Error for ProcessIdParseError {
/// Address is defined in the wit bindings, but constructors and methods here.
impl Address {
pub fn new<T: IntoProcessId>(node: &str, process: T) -> Result<Address, ProcessIdParseError> {
Ok(Address {
node: node.to_string(),
process: process.into_process_id()?,
})
}
pub fn from_str(input: &str) -> Result<Self, AddressParseError> {
// split string on colons into 4 segments,
// first one with @, next 3 with :
@ -222,6 +244,22 @@ impl Address {
}
}
pub trait IntoAddress {
fn into_address(self) -> Result<Address, AddressParseError>;
}
impl IntoAddress for Address {
fn into_address(self) -> Result<Address, AddressParseError> {
Ok(self)
}
}
impl IntoAddress for &str {
fn into_address(self) -> Result<Address, AddressParseError> {
Address::from_str(self)
}
}
#[derive(Debug)]
pub enum AddressParseError {
TooManyColons,
@ -229,6 +267,30 @@ pub enum AddressParseError {
MissingField,
}
impl std::fmt::Display for AddressParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
AddressParseError::TooManyColons => "Too many colons in ProcessId string",
AddressParseError::MissingNodeId => "Node ID missing",
AddressParseError::MissingField => "Missing field in ProcessId string",
}
)
}
}
impl std::error::Error for AddressParseError {
fn description(&self) -> &str {
match self {
AddressParseError::TooManyColons => "Too many colons in ProcessId string",
AddressParseError::MissingNodeId => "Node ID missing",
AddressParseError::MissingField => "Missing field in ProcessId string",
}
}
}
///
/// Here, we define wrappers over the wit bindings to make them easier to use.
/// This library prescribes the use of IPC and metadata types serialized and
@ -237,100 +299,278 @@ pub enum AddressParseError {
/// For payloads, we use bincode to serialize and deserialize to bytes.
///
pub fn send_typed_request<T1>(
target: &Address,
inherit_payload_and_context: bool,
ipc: &T1,
metadata: Option<String>,
payload: Option<&Payload>,
pub struct Request {
target: Option<Address>,
inherit: bool,
timeout: Option<u64>,
) -> anyhow::Result<()>
where
T1: serde::Serialize,
{
crate::send_request(
target,
&Request {
inherit: inherit_payload_and_context,
expects_response: timeout,
ipc: serde_json::to_vec(ipc)?,
metadata,
},
None,
payload,
);
Ok(())
}
pub fn send_typed_response<T1>(
inherit_payload: bool,
ipc: &T1,
ipc: Option<Vec<u8>>,
metadata: Option<String>,
payload: Option<&Payload>, // will overwrite inherit flag if both are set
) -> anyhow::Result<()>
where
T1: serde::Serialize,
{
crate::send_response(
&Response {
inherit: inherit_payload,
ipc: serde_json::to_vec(ipc)?,
metadata,
},
payload,
);
Ok(())
payload: Option<Payload>,
context: Option<Vec<u8>>,
}
pub fn send_and_await_typed_response<T1>(
target: &Address,
inherit_payload_and_context: bool,
ipc: &T1,
impl Request {
pub fn new() -> Self {
Request {
target: None,
inherit: false,
timeout: None,
ipc: None,
metadata: None,
payload: None,
context: None,
}
}
pub fn target<T: IntoAddress>(mut self, target: T) -> Result<Self, AddressParseError> {
self.target = Some(target.into_address()?);
Ok(self)
}
pub fn inherit(mut self, inherit: bool) -> Self {
self.inherit = inherit;
self
}
pub fn expects_response(mut self, timeout: u64) -> Self {
self.timeout = Some(timeout);
self
}
pub fn ipc_bytes(mut self, ipc: Vec<u8>) -> Self {
self.ipc = Some(ipc);
self
}
pub fn ipc<T, F>(mut self, ipc: &T, serializer: F) -> anyhow::Result<Self>
where
F: Fn(&T) -> anyhow::Result<Vec<u8>>,
{
self.ipc = Some(serializer(ipc)?);
Ok(self)
}
pub fn metadata(mut self, metadata: Option<String>) -> Self {
self.metadata = metadata;
self
}
pub fn payload(mut self, payload: Option<Payload>) -> Self {
self.payload = payload;
self
}
pub fn payload_mime(mut self, mime: String) -> Self {
if self.payload.is_none() {
self.payload = Some(Payload {
mime: Some(mime),
bytes: vec![],
});
self
} else {
self.payload = Some(Payload {
mime: Some(mime),
bytes: self.payload.unwrap().bytes,
});
self
}
}
pub fn payload_bytes(mut self, bytes: Vec<u8>) -> Self {
if self.payload.is_none() {
self.payload = Some(Payload { mime: None, bytes });
self
} else {
self.payload = Some(Payload {
mime: self.payload.unwrap().mime,
bytes,
});
self
}
}
pub fn context_bytes(mut self, context: Option<Vec<u8>>) -> Self {
self.context = context;
self
}
pub fn context<T, F>(mut self, context: &T, serializer: F) -> anyhow::Result<Self>
where
F: Fn(&T) -> anyhow::Result<Vec<u8>>,
{
self.context = Some(serializer(context)?);
Ok(self)
}
pub fn send(self) -> anyhow::Result<()> {
if let (Some(target), Some(ipc)) = (self.target, self.ipc) {
crate::send_request(
&target,
&wit::Request {
inherit: self.inherit,
expects_response: self.timeout,
ipc: serde_json::to_vec(&ipc)?,
metadata: self.metadata,
},
self.context.as_ref(),
self.payload.as_ref(),
);
Ok(())
} else {
Err(anyhow::anyhow!("missing fields"))
}
}
pub fn send_and_await_response(self) -> anyhow::Result<Result<(Address, Message), SendError>> {
if let (Some(target), Some(ipc)) = (self.target, self.ipc) {
Ok(crate::send_and_await_response(
&target,
&wit::Request {
inherit: self.inherit,
expects_response: self.timeout,
ipc: serde_json::to_vec(&ipc)?,
metadata: self.metadata,
},
self.payload.as_ref(),
))
} else {
Err(anyhow::anyhow!("missing fields"))
}
}
}
pub struct Response {
inherit: bool,
ipc: Option<Vec<u8>>,
metadata: Option<String>,
payload: Option<&Payload>,
timeout: u64,
) -> anyhow::Result<Result<(Address, Message), SendError>>
where
T1: serde::Serialize,
{
let res = crate::send_and_await_response(
target,
&Request {
inherit: inherit_payload_and_context,
expects_response: Some(timeout),
ipc: serde_json::to_vec(ipc)?,
metadata,
},
payload,
);
Ok(res)
payload: Option<Payload>,
}
pub fn get_typed_payload<T: serde::de::DeserializeOwned>() -> Option<T> {
impl Response {
pub fn new() -> Self {
Response {
inherit: false,
ipc: None,
metadata: None,
payload: None,
}
}
pub fn inherit(mut self, inherit: bool) -> Self {
self.inherit = inherit;
self
}
pub fn ipc_bytes(mut self, ipc: Vec<u8>) -> Self {
self.ipc = Some(ipc);
self
}
pub fn ipc<T, F>(mut self, ipc: &T, serializer: F) -> anyhow::Result<Self>
where
F: Fn(&T) -> anyhow::Result<Vec<u8>>,
{
self.ipc = Some(serializer(ipc)?);
Ok(self)
}
pub fn metadata(mut self, metadata: Option<String>) -> Self {
self.metadata = metadata;
self
}
pub fn payload(mut self, payload: Option<Payload>) -> Self {
self.payload = payload;
self
}
pub fn payload_mime(mut self, mime: String) -> Self {
if self.payload.is_none() {
self.payload = Some(Payload {
mime: Some(mime),
bytes: vec![],
});
self
} else {
self.payload = Some(Payload {
mime: Some(mime),
bytes: self.payload.unwrap().bytes,
});
self
}
}
pub fn payload_bytes(mut self, bytes: Vec<u8>) -> Self {
if self.payload.is_none() {
self.payload = Some(Payload { mime: None, bytes });
self
} else {
self.payload = Some(Payload {
mime: self.payload.unwrap().mime,
bytes,
});
self
}
}
pub fn send(self) -> anyhow::Result<()> {
if let Some(ipc) = self.ipc {
crate::send_response(
&wit::Response {
inherit: self.inherit,
ipc: serde_json::to_vec(&ipc)?,
metadata: self.metadata,
},
self.payload.as_ref(),
);
Ok(())
} else {
Err(anyhow::anyhow!("missing IPC"))
}
}
}
pub fn make_payload<T, F>(payload: &T, serializer: F) -> anyhow::Result<Payload>
where
F: Fn(&T) -> anyhow::Result<Vec<u8>>,
{
Ok(Payload {
mime: None,
bytes: serializer(payload)?,
})
}
pub fn get_typed_payload<T, F>(deserializer: F) -> Option<T>
where
F: Fn(&[u8]) -> anyhow::Result<T>,
{
match crate::get_payload() {
Some(payload) => match bincode::deserialize::<T>(&payload.bytes) {
Ok(bytes) => Some(bytes),
Some(payload) => match deserializer(&payload.bytes) {
Ok(thing) => Some(thing),
Err(_) => None,
},
None => None,
}
}
pub fn get_typed_state<T: serde::de::DeserializeOwned>() -> Option<T> {
match crate::get_state() {
Some(bytes) => match bincode::deserialize::<T>(&bytes) {
Ok(state) => Some(state),
Err(_) => None,
},
None => None,
}
}
pub fn set_typed_state<T>(state: &T) -> anyhow::Result<()>
pub fn get_typed_state<T, F>(deserializer: F) -> Option<T>
where
T: serde::Serialize,
F: Fn(&[u8]) -> anyhow::Result<T>,
{
crate::set_state(&bincode::serialize(state)?);
match crate::get_state() {
Some(bytes) => match deserializer(&bytes) {
Ok(thing) => Some(thing),
Err(_) => None,
},
None => None,
}
}
pub fn set_typed_state<T, F>(state: &T, serializer: F) -> anyhow::Result<()>
where
F: Fn(&T) -> anyhow::Result<Vec<u8>>,
{
crate::set_state(&serializer(state)?);
Ok(())
}

View File

@ -1,307 +0,0 @@
use serde::{Deserialize, Serialize};
use super::bindings::component::uq_process::types::*;
use super::bindings::{get_capability, share_capability, Address, Payload, ProcessId, SendError};
#[derive(Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct PackageId {
pub package_name: String,
pub publisher_node: String,
}
impl PackageId {
pub fn new(package_name: &str, publisher_node: &str) -> Self {
PackageId {
package_name: package_name.into(),
publisher_node: publisher_node.into(),
}
}
pub fn from_str(input: &str) -> Result<Self, ProcessIdParseError> {
// split string on colons into 2 segments
let mut segments = input.split(':');
let package_name = segments
.next()
.ok_or(ProcessIdParseError::MissingField)?
.to_string();
let publisher_node = segments
.next()
.ok_or(ProcessIdParseError::MissingField)?
.to_string();
if segments.next().is_some() {
return Err(ProcessIdParseError::TooManyColons);
}
Ok(PackageId {
package_name,
publisher_node,
})
}
pub fn to_string(&self) -> String {
[self.package_name.as_str(), self.publisher_node.as_str()].join(":")
}
pub fn package(&self) -> &str {
&self.package_name
}
pub fn publisher_node(&self) -> &str {
&self.publisher_node
}
}
#[allow(dead_code)]
impl ProcessId {
/// generates a random u64 number if process_name is not declared
pub fn new(process_name: &str, package_name: &str, publisher_node: &str) -> Self {
ProcessId {
process_name: process_name.into(),
package_name: package_name.into(),
publisher_node: publisher_node.into(),
}
}
pub fn from_str(input: &str) -> Result<Self, ProcessIdParseError> {
// split string on colons into 3 segments
let mut segments = input.split(':');
let process_name = segments
.next()
.ok_or(ProcessIdParseError::MissingField)?
.to_string();
let package_name = segments
.next()
.ok_or(ProcessIdParseError::MissingField)?
.to_string();
let publisher_node = segments
.next()
.ok_or(ProcessIdParseError::MissingField)?
.to_string();
if segments.next().is_some() {
return Err(ProcessIdParseError::TooManyColons);
}
Ok(ProcessId {
process_name,
package_name,
publisher_node,
})
}
pub fn to_string(&self) -> String {
[
self.process_name.as_str(),
self.package_name.as_str(),
self.publisher_node.as_str(),
]
.join(":")
}
pub fn process(&self) -> &str {
&self.process_name
}
pub fn package(&self) -> &str {
&self.package_name
}
pub fn publisher_node(&self) -> &str {
&self.publisher_node
}
}
impl std::fmt::Display for ProcessId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}:{}:{}",
self.process_name, self.package_name, self.publisher_node
)
}
}
impl PartialEq for ProcessId {
fn eq(&self, other: &Self) -> bool {
self.process_name == other.process_name
&& self.package_name == other.package_name
&& self.publisher_node == other.publisher_node
}
}
impl PartialEq<&str> for ProcessId {
fn eq(&self, other: &&str) -> bool {
&self.to_string() == other
}
}
impl PartialEq<ProcessId> for &str {
fn eq(&self, other: &ProcessId) -> bool {
self == &other.to_string()
}
}
#[derive(Debug)]
pub enum ProcessIdParseError {
TooManyColons,
MissingField,
}
impl std::fmt::Display for ProcessIdParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
ProcessIdParseError::TooManyColons => "Too many colons in ProcessId string",
ProcessIdParseError::MissingField => "Missing field in ProcessId string",
}
)
}
}
impl std::error::Error for ProcessIdParseError {
fn description(&self) -> &str {
match self {
ProcessIdParseError::TooManyColons => "Too many colons in ProcessId string",
ProcessIdParseError::MissingField => "Missing field in ProcessId string",
}
}
}
impl Address {
pub fn from_str(input: &str) -> Result<Self, AddressParseError> {
// split string on colons into 4 segments,
// first one with @, next 3 with :
let mut name_rest = input.split('@');
let node = name_rest
.next()
.ok_or(AddressParseError::MissingField)?
.to_string();
let mut segments = name_rest
.next()
.ok_or(AddressParseError::MissingNodeId)?
.split(':');
let process_name = segments
.next()
.ok_or(AddressParseError::MissingField)?
.to_string();
let package_name = segments
.next()
.ok_or(AddressParseError::MissingField)?
.to_string();
let publisher_node = segments
.next()
.ok_or(AddressParseError::MissingField)?
.to_string();
if segments.next().is_some() {
return Err(AddressParseError::TooManyColons);
}
Ok(Address {
node,
process: ProcessId {
process_name,
package_name,
publisher_node,
},
})
}
pub fn to_string(&self) -> String {
[self.node.as_str(), &self.process.to_string()].join("@")
}
}
#[derive(Debug)]
pub enum AddressParseError {
TooManyColons,
MissingNodeId,
MissingField,
}
pub fn send_and_await_response(
target: &Address,
inherit: bool,
ipc: Vec<u8>,
metadata: Option<Json>,
payload: Option<&Payload>,
timeout: u64,
) -> Result<(Address, Message), SendError> {
super::bindings::send_and_await_response(
target,
&Request {
inherit,
expects_response: Some(timeout),
ipc,
metadata,
},
payload,
)
}
pub fn send_request(
target: &Address,
inherit: bool,
ipc: Vec<u8>,
metadata: Option<Json>,
context: Option<&Vec<u8>>,
payload: Option<&Payload>,
) {
super::bindings::send_request(
target,
&Request {
inherit,
expects_response: None,
ipc,
metadata,
},
context,
payload,
)
}
pub fn get_state<T: serde::de::DeserializeOwned>() -> Option<T> {
match super::bindings::get_state() {
Some(bytes) => match bincode::deserialize::<T>(&bytes) {
Ok(state) => Some(state),
Err(_) => None,
},
None => None,
}
}
pub fn set_state<T>(state: &T)
where
T: serde::Serialize,
{
super::bindings::set_state(&bincode::serialize(state).unwrap());
}
pub fn parse_message_ipc<T>(json_bytes: &[u8]) -> anyhow::Result<T>
where
for<'a> T: serde::Deserialize<'a>,
{
let parsed: T = serde_json::from_slice(json_bytes)?;
Ok(parsed)
}
pub fn grant_messaging(our: &Address, grant_to: &Vec<ProcessId>) {
let Some(our_messaging_cap) = get_capability(
our,
&"\"messaging\"".into()
) else {
panic!("missing self-messaging cap!")
};
for process in grant_to {
share_capability(&process, &our_messaging_cap);
}
}
// move these to better place!
#[derive(Serialize, Deserialize, Debug)]
pub enum FsAction {
Write,
Replace(u128),
Append(Option<u128>),
Read(u128),
ReadChunk(ReadChunkRequest),
Delete(u128),
Length(u128),
// process state management
GetState,
SetState,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ReadChunkRequest {
pub file_uuid: u128,
pub start: u64,
pub length: u64,
}