diff --git a/kinode/packages/terminal/cat/src/lib.rs b/kinode/packages/terminal/cat/src/lib.rs index 8193998b..5fb508ee 100644 --- a/kinode/packages/terminal/cat/src/lib.rs +++ b/kinode/packages/terminal/cat/src/lib.rs @@ -41,5 +41,8 @@ fn init(_our: Address) { println!("no file found at {}", file_path); return; }; - println!("{}", String::from_utf8(blob.bytes).unwrap()); + match String::from_utf8(blob.bytes) { + Ok(s) => println!("{s}"), + Err(_e) => println!("error: file at {file_path} could not be parsed as utf-8 string!"), + } } diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index 9a99be25..e8142826 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -1,5 +1,5 @@ use dashmap::DashMap; -use std::collections::{HashMap, VecDeque}; +// use std::collections::{HashMap, VecDeque}; use std::io::prelude::*; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; @@ -18,7 +18,7 @@ pub async fn vfs( send_to_caps_oracle: CapMessageSender, home_directory_path: String, ) -> anyhow::Result<()> { - let vfs_path = format!("{}/vfs", &home_directory_path); + let vfs_path = format!("{home_directory_path}/vfs"); if let Err(e) = fs::create_dir_all(&vfs_path).await { panic!("failed creating vfs dir! {:?}", e); @@ -27,70 +27,73 @@ pub async fn vfs( let open_files: Arc>>> = Arc::new(DashMap::new()); - let mut process_queues: HashMap>>> = - HashMap::new(); + // let mut process_queues: HashMap>>> = + // HashMap::new(); - loop { - let Some(km) = recv_from_loop.recv().await else { - continue; - }; - if our_node.clone() != km.source.node { - println!( - "vfs: request must come from our_node={}, got: {}", - our_node, km.source.node, - ); + while let Some(km) = recv_from_loop.recv().await { + if our_node != km.source.node { + let _ = send_to_terminal.send(Printout { + verbosity: 1, + content: format!( + "vfs: got request from {}, but requests must come from our node {our_node}\r", + km.source.node, + ), + }); continue; } - let queue = process_queues - .entry(km.source.process.clone()) - .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) - .clone(); + // let queue = process_queues + // .entry(km.source.process.clone()) + // .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) + // .clone(); + // { + // let mut queue_lock = queue.lock().await; + // queue_lock.push_back(km.clone()); + // } + + // // clone Arcs + // let our_node = our_node.clone(); + // let send_to_caps_oracle = send_to_caps_oracle.clone(); + // let send_to_terminal = send_to_terminal.clone(); + // let send_to_loop = send_to_loop.clone(); + // let open_files = open_files.clone(); + // let vfs_path = vfs_path.clone(); + + // tokio::spawn(async move { + // let mut queue_lock = queue.lock().await; + // if let Some(km) = queue_lock.pop_front() { + let (km_id, km_source) = (km.id.clone(), km.source.clone()); + + if let Err(e) = handle_request( + &our_node, + km, + open_files.clone(), + &send_to_loop, + &send_to_terminal, + &send_to_caps_oracle, + &vfs_path, + ) + .await { - let mut queue_lock = queue.lock().await; - queue_lock.push_back(km.clone()); + let _ = send_to_loop + .send(make_error_message(our_node.clone(), km_id, km_source, e)) + .await; } - - // clone Arcs - let our_node = our_node.clone(); - let send_to_caps_oracle = send_to_caps_oracle.clone(); - let send_to_terminal = send_to_terminal.clone(); - let send_to_loop = send_to_loop.clone(); - let open_files = open_files.clone(); - let vfs_path = vfs_path.clone(); - - tokio::spawn(async move { - let mut queue_lock = queue.lock().await; - if let Some(km) = queue_lock.pop_front() { - if let Err(e) = handle_request( - our_node.clone(), - km.clone(), - open_files.clone(), - send_to_loop.clone(), - send_to_terminal.clone(), - send_to_caps_oracle.clone(), - vfs_path.clone(), - ) - .await - { - let _ = send_to_loop - .send(make_error_message(our_node.clone(), km.id, km.source, e)) - .await; - } - } - }); + // } + // }); } + Ok(()) } async fn handle_request( - our_node: String, + our_node: &str, km: KernelMessage, open_files: Arc>>>, - send_to_loop: MessageSender, - send_to_terminal: PrintSender, - send_to_caps_oracle: CapMessageSender, - vfs_path: PathBuf, + send_to_loop: &MessageSender, + send_to_terminal: &PrintSender, + send_to_caps_oracle: &CapMessageSender, + vfs_path: &PathBuf, ) -> Result<(), VfsError> { let KernelMessage { id, @@ -114,7 +117,6 @@ async fn handle_request( let request: VfsRequest = match serde_json::from_slice(&body) { Ok(r) => r, Err(e) => { - println!("vfs: got invalid Request: {}", e); return Err(VfsError::BadJson { error: e.to_string(), }); @@ -130,7 +132,7 @@ async fn handle_request( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ @@ -161,7 +163,7 @@ async fn handle_request( let response = KernelMessage { id, source: Address { - node: our_node.clone(), + node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, target: source, @@ -196,7 +198,7 @@ async fn handle_request( if km.source.process != *KERNEL_PROCESS_ID { check_caps( - our_node.clone(), + our_node, source.clone(), send_to_caps_oracle.clone(), &request, @@ -470,7 +472,6 @@ async fn handle_request( } else if is_dir { fs::create_dir_all(local_path).await?; } else { - println!("vfs: zip with non-file non-dir"); return Err(VfsError::CreateDirError { path: path.display().to_string(), error: "vfs: zip with non-file non-dir".into(), @@ -483,14 +484,14 @@ async fn handle_request( if let Some(target) = km.rsvp.or_else(|| { expects_response.map(|_| Address { - node: our_node.clone(), + node: our_node.to_string(), process: source.process.clone(), }) }) { let response = KernelMessage { id, source: Address { - node: our_node.clone(), + node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, target, @@ -512,7 +513,6 @@ async fn handle_request( let _ = send_to_loop.send(response).await; } else { - println!("vfs: not sending response: "); send_to_terminal .send(Printout { verbosity: 2, @@ -538,11 +538,7 @@ async fn parse_package_and_drive( let normalized_path = normalize_path(&joined_path); if !normalized_path.starts_with(vfs_path) { return Err(VfsError::BadRequest { - error: format!( - "input path tries to escape parent vfs directory: {:?}", - path - ) - .into(), + error: format!("input path tries to escape parent vfs directory: {path}"), })?; } @@ -550,11 +546,7 @@ async fn parse_package_and_drive( let path = normalized_path .strip_prefix(vfs_path) .map_err(|_| VfsError::BadRequest { - error: format!( - "input path tries to escape parent vfs directory: {:?}", - path - ) - .into(), + error: format!("input path tries to escape parent vfs directory: {path}"), })? .display() .to_string(); @@ -567,7 +559,7 @@ async fn parse_package_and_drive( if parts.len() < 2 { return Err(VfsError::ParseError { error: "malformed path".into(), - path: path.to_string(), + path, }); } @@ -576,7 +568,7 @@ async fn parse_package_and_drive( Err(e) => { return Err(VfsError::ParseError { error: e.to_string(), - path: path.to_string(), + path, }) } }; @@ -617,7 +609,7 @@ async fn open_file>( } async fn check_caps( - our_node: String, + our_node: &str, source: Address, mut send_to_caps_oracle: CapMessageSender, request: &VfsRequest, @@ -635,7 +627,7 @@ async fn check_caps( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ @@ -676,7 +668,7 @@ async fn check_caps( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ @@ -718,7 +710,7 @@ async fn check_caps( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ @@ -761,7 +753,7 @@ async fn check_caps( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ @@ -792,7 +784,7 @@ async fn check_caps( on: source.process.clone(), cap: Capability { issuer: Address { - node: our_node.clone(), + node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, params: serde_json::to_string(&serde_json::json!({ diff --git a/lib/src/core.rs b/lib/src/core.rs index abe202e9..3f8ce517 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -252,15 +252,22 @@ pub enum ProcessIdParseError { impl std::fmt::Display for ProcessIdParseError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.to_string()) + write!( + f, + "{}", + match self { + ProcessIdParseError::TooManyColons => "Too many colons", + ProcessIdParseError::MissingField => "Missing field", + } + ) } } impl std::error::Error for ProcessIdParseError { fn description(&self) -> &str { match self { - ProcessIdParseError::TooManyColons => "Too many colons in ProcessId string", - ProcessIdParseError::MissingField => "Missing field in ProcessId string", + ProcessIdParseError::TooManyColons => "Too many colons", + ProcessIdParseError::MissingField => "Missing field", } } } @@ -1514,7 +1521,7 @@ pub enum VfsError { BadBytes { action: String, path: String }, #[error("vfs: bad request error: {error}")] BadRequest { error: String }, - #[error("vfs: error parsing path: {path}, error: {error}")] + #[error("vfs: error parsing path: {path}: {error}")] ParseError { error: String, path: String }, #[error("vfs: IO error: {error}, at path {path}")] IOError { error: String, path: String },