revert task queue changes

This commit is contained in:
dr-frmr 2024-06-05 18:50:05 -06:00
parent d31c65c8df
commit abcdc798fc
No known key found for this signature in database

View File

@ -1,5 +1,5 @@
use dashmap::DashMap;
// use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::io::prelude::*;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
@ -18,6 +18,7 @@ pub async fn vfs(
send_to_caps_oracle: CapMessageSender,
home_directory_path: String,
) -> anyhow::Result<()> {
let our_node = Arc::new(our_node);
let vfs_path = format!("{home_directory_path}/vfs");
if let Err(e) = fs::create_dir_all(&vfs_path).await {
@ -27,11 +28,11 @@ pub async fn vfs(
let open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>> = Arc::new(DashMap::new());
// let mut process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
// HashMap::new();
let mut process_queues: HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>> =
HashMap::new();
while let Some(km) = recv_from_loop.recv().await {
if our_node != km.source.node {
if *our_node != km.source.node {
let _ = send_to_terminal.send(Printout {
verbosity: 1,
content: format!(
@ -42,27 +43,27 @@ pub async fn vfs(
continue;
}
// let queue = process_queues
// .entry(km.source.process.clone())
// .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
// .clone();
let queue = process_queues
.entry(km.source.process.clone())
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
.clone();
// {
// let mut queue_lock = queue.lock().await;
// queue_lock.push_back(km.clone());
// }
{
let mut queue_lock = queue.lock().await;
queue_lock.push_back(km.clone());
}
// // clone Arcs
// let our_node = our_node.clone();
// let send_to_caps_oracle = send_to_caps_oracle.clone();
// let send_to_terminal = send_to_terminal.clone();
// let send_to_loop = send_to_loop.clone();
// let open_files = open_files.clone();
// let vfs_path = vfs_path.clone();
// clone Arcs
let our_node = our_node.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone();
let send_to_terminal = send_to_terminal.clone();
let send_to_loop = send_to_loop.clone();
let open_files = open_files.clone();
let vfs_path = vfs_path.clone();
// tokio::spawn(async move {
// let mut queue_lock = queue.lock().await;
// if let Some(km) = queue_lock.pop_front() {
tokio::spawn(async move {
let mut queue_lock = queue.lock().await;
if let Some(km) = queue_lock.pop_front() {
let (km_id, km_source) = (km.id.clone(), km.source.clone());
if let Err(e) = handle_request(
@ -77,11 +78,16 @@ pub async fn vfs(
.await
{
let _ = send_to_loop
.send(make_error_message(our_node.clone(), km_id, km_source, e))
.send(make_error_message(
our_node.to_string(),
km_id,
km_source,
e,
))
.await;
}
// }
// });
}
});
}
Ok(())
}
@ -95,19 +101,12 @@ async fn handle_request(
send_to_caps_oracle: &CapMessageSender,
vfs_path: &PathBuf,
) -> Result<(), VfsError> {
let KernelMessage {
id,
source,
message,
lazy_load_blob: blob,
..
} = km.clone();
let Message::Request(Request {
body,
expects_response,
metadata,
..
}) = message.clone()
}) = km.message
else {
return Err(VfsError::BadRequest {
error: "not a request".into(),
@ -129,7 +128,7 @@ async fn handle_request(
let (send_cap_bool, recv_cap_bool) = tokio::sync::oneshot::channel();
send_to_caps_oracle
.send(CapMessage::Has {
on: source.process.clone(),
on: km.source.process.clone(),
cap: Capability {
issuer: Address {
node: our_node.to_string(),
@ -161,12 +160,12 @@ async fn handle_request(
}
let response = KernelMessage {
id,
id: km.id,
source: Address {
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),
},
target: source,
target: km.source,
rsvp: None,
message: Message::Response((
Response {
@ -196,10 +195,10 @@ async fn handle_request(
let drive = format!("/{}/{}", package_id, drive);
let path = PathBuf::from(request.path.clone());
if km.source.process != *KERNEL_PROCESS_ID {
if &km.source.process != &*KERNEL_PROCESS_ID {
check_caps(
our_node,
source.clone(),
km.source.clone(),
send_to_caps_oracle.clone(),
&request,
path.clone(),
@ -251,7 +250,7 @@ async fn handle_request(
}
VfsAction::WriteAll => {
// doesn't create a file, writes at exact cursor.
let Some(blob) = blob else {
let Some(blob) = km.lazy_load_blob else {
return Err(VfsError::BadRequest {
error: "blob needs to exist for WriteAll".into(),
});
@ -262,7 +261,7 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Write => {
let Some(blob) = blob else {
let Some(blob) = km.lazy_load_blob else {
return Err(VfsError::BadRequest {
error: "blob needs to exist for Write".into(),
});
@ -271,7 +270,7 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Ok).unwrap(), None)
}
VfsAction::Append => {
let Some(blob) = blob else {
let Some(blob) = km.lazy_load_blob else {
return Err(VfsError::BadRequest {
error: "blob needs to exist for Append".into(),
});
@ -430,7 +429,7 @@ async fn handle_request(
(serde_json::to_vec(&VfsResponse::Hash(hash)).unwrap(), None)
}
VfsAction::AddZip => {
let Some(blob) = blob else {
let Some(blob) = km.lazy_load_blob else {
return Err(VfsError::BadRequest {
error: "blob needs to exist for AddZip".into(),
});
@ -485,11 +484,11 @@ async fn handle_request(
if let Some(target) = km.rsvp.or_else(|| {
expects_response.map(|_| Address {
node: our_node.to_string(),
process: source.process.clone(),
process: km.source.process.clone(),
})
}) {
let response = KernelMessage {
id,
id: km.id,
source: Address {
node: our_node.to_string(),
process: VFS_PROCESS_ID.clone(),