diff --git a/kinode/src/vfs.rs b/kinode/src/vfs.rs index e8142826..e46e08c7 100644 --- a/kinode/src/vfs.rs +++ b/kinode/src/vfs.rs @@ -1,5 +1,5 @@ use dashmap::DashMap; -// use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::io::prelude::*; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; @@ -18,6 +18,7 @@ pub async fn vfs( send_to_caps_oracle: CapMessageSender, home_directory_path: String, ) -> anyhow::Result<()> { + let our_node = Arc::new(our_node); let vfs_path = format!("{home_directory_path}/vfs"); if let Err(e) = fs::create_dir_all(&vfs_path).await { @@ -27,11 +28,11 @@ pub async fn vfs( let open_files: Arc>>> = Arc::new(DashMap::new()); - // let mut process_queues: HashMap>>> = - // HashMap::new(); + let mut process_queues: HashMap>>> = + HashMap::new(); while let Some(km) = recv_from_loop.recv().await { - if our_node != km.source.node { + if *our_node != km.source.node { let _ = send_to_terminal.send(Printout { verbosity: 1, content: format!( @@ -42,46 +43,51 @@ pub async fn vfs( continue; } - // let queue = process_queues - // .entry(km.source.process.clone()) - // .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) - // .clone(); + let queue = process_queues + .entry(km.source.process.clone()) + .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) + .clone(); - // { - // let mut queue_lock = queue.lock().await; - // queue_lock.push_back(km.clone()); - // } - - // // clone Arcs - // let our_node = our_node.clone(); - // let send_to_caps_oracle = send_to_caps_oracle.clone(); - // let send_to_terminal = send_to_terminal.clone(); - // let send_to_loop = send_to_loop.clone(); - // let open_files = open_files.clone(); - // let vfs_path = vfs_path.clone(); - - // tokio::spawn(async move { - // let mut queue_lock = queue.lock().await; - // if let Some(km) = queue_lock.pop_front() { - let (km_id, km_source) = (km.id.clone(), km.source.clone()); - - if let Err(e) = handle_request( - &our_node, - km, - open_files.clone(), - &send_to_loop, - &send_to_terminal, - &send_to_caps_oracle, - &vfs_path, - ) - .await { - let _ = send_to_loop - .send(make_error_message(our_node.clone(), km_id, km_source, e)) - .await; + let mut queue_lock = queue.lock().await; + queue_lock.push_back(km.clone()); } - // } - // }); + + // clone Arcs + let our_node = our_node.clone(); + let send_to_caps_oracle = send_to_caps_oracle.clone(); + let send_to_terminal = send_to_terminal.clone(); + let send_to_loop = send_to_loop.clone(); + let open_files = open_files.clone(); + let vfs_path = vfs_path.clone(); + + tokio::spawn(async move { + let mut queue_lock = queue.lock().await; + if let Some(km) = queue_lock.pop_front() { + let (km_id, km_source) = (km.id.clone(), km.source.clone()); + + if let Err(e) = handle_request( + &our_node, + km, + open_files.clone(), + &send_to_loop, + &send_to_terminal, + &send_to_caps_oracle, + &vfs_path, + ) + .await + { + let _ = send_to_loop + .send(make_error_message( + our_node.to_string(), + km_id, + km_source, + e, + )) + .await; + } + } + }); } Ok(()) } @@ -95,19 +101,12 @@ async fn handle_request( send_to_caps_oracle: &CapMessageSender, vfs_path: &PathBuf, ) -> Result<(), VfsError> { - let KernelMessage { - id, - source, - message, - lazy_load_blob: blob, - .. - } = km.clone(); let Message::Request(Request { body, expects_response, metadata, .. - }) = message.clone() + }) = km.message else { return Err(VfsError::BadRequest { error: "not a request".into(), @@ -129,7 +128,7 @@ async fn handle_request( let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel(); send_to_caps_oracle .send(CapMessage::Has { - on: source.process.clone(), + on: km.source.process.clone(), cap: Capability { issuer: Address { node: our_node.to_string(), @@ -161,12 +160,12 @@ async fn handle_request( } let response = KernelMessage { - id, + id: km.id, source: Address { node: our_node.to_string(), process: VFS_PROCESS_ID.clone(), }, - target: source, + target: km.source, rsvp: None, message: Message::Response(( Response { @@ -196,10 +195,10 @@ async fn handle_request( let drive = format!("/{}/{}", package_id, drive); let path = PathBuf::from(request.path.clone()); - if km.source.process != *KERNEL_PROCESS_ID { + if &km.source.process != &*KERNEL_PROCESS_ID { check_caps( our_node, - source.clone(), + km.source.clone(), send_to_caps_oracle.clone(), &request, path.clone(), @@ -251,7 +250,7 @@ async fn handle_request( } VfsAction::WriteAll => { // doesn't create a file, writes at exact cursor. - let Some(blob) = blob else { + let Some(blob) = km.lazy_load_blob else { return Err(VfsError::BadRequest { error: "blob needs to exist for WriteAll".into(), }); @@ -262,7 +261,7 @@ async fn handle_request( (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) } VfsAction::Write => { - let Some(blob) = blob else { + let Some(blob) = km.lazy_load_blob else { return Err(VfsError::BadRequest { error: "blob needs to exist for Write".into(), }); @@ -271,7 +270,7 @@ async fn handle_request( (serde_json::to_vec(&VfsResponse::Ok).unwrap(), None) } VfsAction::Append => { - let Some(blob) = blob else { + let Some(blob) = km.lazy_load_blob else { return Err(VfsError::BadRequest { error: "blob needs to exist for Append".into(), }); @@ -430,7 +429,7 @@ async fn handle_request( (serde_json::to_vec(&VfsResponse::Hash(hash)).unwrap(), None) } VfsAction::AddZip => { - let Some(blob) = blob else { + let Some(blob) = km.lazy_load_blob else { return Err(VfsError::BadRequest { error: "blob needs to exist for AddZip".into(), }); @@ -485,11 +484,11 @@ async fn handle_request( if let Some(target) = km.rsvp.or_else(|| { expects_response.map(|_| Address { node: our_node.to_string(), - process: source.process.clone(), + process: km.source.process.clone(), }) }) { let response = KernelMessage { - id, + id: km.id, source: Address { node: our_node.to_string(), process: VFS_PROCESS_ID.clone(),