Merge pull request #215 from kinode-dao/da/pipe

Da/pipe
This commit is contained in:
doria 2024-01-29 21:21:18 -03:00 committed by GitHub
commit 93e007ced8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 110 additions and 18 deletions

View File

@ -1,5 +1,5 @@
use kinode_process_lib::{ use kinode_process_lib::{
await_next_request_body, call_init, get_blob, println, vfs, Address, Request, await_next_request_body, call_init, get_blob, println, vfs, Address, Request, Response,
}; };
wit_bindgen::generate!({ wit_bindgen::generate!({
@ -39,8 +39,5 @@ fn init(_our: Address) {
println!("no file found at {}", file_path); println!("no file found at {}", file_path);
return; return;
}; };
println!( let _ = Response::new().body(blob.bytes).send();
"{}",
String::from_utf8(blob.bytes).unwrap_or("could not stringify file".to_string())
);
} }

View File

@ -1,4 +1,4 @@
use kinode_process_lib::{await_next_request_body, call_init, println, Address}; use kinode_process_lib::{await_next_request_body, call_init, println, Address, Response};
wit_bindgen::generate!({ wit_bindgen::generate!({
path: "../../../wit", path: "../../../wit",
@ -16,8 +16,10 @@ fn init(_our: Address) {
return; return;
}; };
println!( let _ = Response::new()
"{}", .body(format!(
String::from_utf8(args).unwrap_or("echo: error".into()) "{}",
); String::from_utf8(args).unwrap_or("echo: error".into())
))
.send();
} }

View File

@ -1,5 +1,5 @@
use clap::{Arg, Command}; use clap::{Arg, Command};
use kinode_process_lib::{await_next_request_body, call_init, println, Address, Request}; use kinode_process_lib::{await_next_request_body, call_init, println, Address, Request, Response};
use regex::Regex; use regex::Regex;
wit_bindgen::generate!({ wit_bindgen::generate!({
@ -73,7 +73,7 @@ fn init(_our: Address) {
println!("m: awaiting response for {}s", s); println!("m: awaiting response for {}s", s);
match req.send_and_await_response(*s).unwrap() { match req.send_and_await_response(*s).unwrap() {
Ok(res) => { Ok(res) => {
println!("m: {:?}", res); let _ = Response::new().body(res.body()).send();
} }
Err(e) => { Err(e) => {
println!("m: SendError: {:?}", e.kind); println!("m: SendError: {:?}", e.kind);

View File

@ -2,6 +2,15 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "aho-corasick"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.75" version = "1.0.75"
@ -161,6 +170,12 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "memchr"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149"
[[package]] [[package]]
name = "mime" name = "mime"
version = "0.3.17" version = "0.3.17"
@ -237,6 +252,35 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "regex"
version = "1.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.16" version = "1.0.16"
@ -314,6 +358,7 @@ dependencies = [
"bincode", "bincode",
"kinode_process_lib", "kinode_process_lib",
"rand", "rand",
"regex",
"serde", "serde",
"serde_json", "serde_json",
"wit-bindgen", "wit-bindgen",

View File

@ -15,6 +15,7 @@ anyhow = "1.0"
bincode = "1.3.3" bincode = "1.3.3"
kinode_process_lib = { git = "https://github.com/uqbar-dao/process_lib.git", rev = "329c7a8" } kinode_process_lib = { git = "https://github.com/uqbar-dao/process_lib.git", rev = "329c7a8" }
rand = "0.8" rand = "0.8"
regex = "1.10.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" } wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "efcc759" }

View File

@ -5,6 +5,7 @@ use kinode_process_lib::{
get_blob, get_capability, get_typed_state, our_capabilities, println, set_state, vfs, Address, get_blob, get_capability, get_typed_state, our_capabilities, println, set_state, vfs, Address,
Capability, PackageId, ProcessId, Request, Capability, PackageId, ProcessId, Request,
}; };
use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -43,9 +44,32 @@ fn parse_command(state: &mut TerminalState, line: &str) -> anyhow::Result<()> {
}, },
}; };
let re = Regex::new(r"(.*?)\|(\d+)\s*(.*)").unwrap();
let pipe = match re.captures(args) {
Some(caps) => {
let parsed_args = caps
.get(1)
.map_or("", |m| m.as_str())
.trim_end()
.to_string();
let time_str = caps.get(2).map_or("", |m| m.as_str());
let time: u64 = time_str.parse().unwrap_or(0);
let pipe = caps
.get(3)
.map_or("", |m| m.as_str())
.trim_start()
.to_string();
(parsed_args, Some((pipe, time)))
}
None => (args.to_string(), None),
};
let wasm_path = format!("{}.wasm", process.process()); let wasm_path = format!("{}.wasm", process.process());
let package = PackageId::new(process.package(), process.publisher()); let package = PackageId::new(process.package(), process.publisher());
match handle_run(&state.our, &package, wasm_path, args.to_string()) { match handle_run(&state.our, &package, wasm_path, pipe.0, pipe.1) {
Ok(_) => Ok(()), // TODO clean up process Ok(_) => Ok(()), // TODO clean up process
Err(e) => Err(anyhow!("failed to instantiate script: {}", e)), Err(e) => Err(anyhow!("failed to instantiate script: {}", e)),
} }
@ -138,9 +162,9 @@ impl Guest for Component {
} }
wit::Message::Response((wit::Response { body, .. }, _)) => { wit::Message::Response((wit::Response { body, .. }, _)) => {
if let Ok(txt) = std::str::from_utf8(&body) { if let Ok(txt) = std::str::from_utf8(&body) {
println!("response from {source}: {txt}"); println!("{txt}");
} else { } else {
println!("response from {source}: {body:?}"); println!("{body:?}");
} }
} }
} }
@ -153,6 +177,7 @@ fn handle_run(
package: &PackageId, package: &PackageId,
wasm_path: String, wasm_path: String,
args: String, args: String,
pipe: Option<(String, u64)>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let drive_path = format!("/{}/pkg", package); let drive_path = format!("/{}/pkg", package);
Request::new() Request::new()
@ -330,9 +355,31 @@ fn handle_run(
parsed_new_process_id.clone(), parsed_new_process_id.clone(),
))?) ))?)
.send_and_await_response(5)??; .send_and_await_response(5)??;
let _ = Request::new() let req = Request::new()
.target(("our", parsed_new_process_id)) .target(("our", parsed_new_process_id))
.body(args.into_bytes()) .body(args.into_bytes());
.send();
let Some(pipe) = pipe else {
req.send().unwrap();
return Ok(());
};
let Ok(res) = req.clone().send_and_await_response(pipe.1).unwrap() else {
return Err(anyhow::anyhow!("script timed out"));
};
let _ = Request::new()
.target(our)
.body(
format!(
"{} {}",
pipe.0,
String::from_utf8(res.body().to_vec()).unwrap()
)
.into_bytes()
.to_vec(),
)
.send()?;
Ok(()) Ok(())
} }