vfs: better mutex queue

This commit is contained in:
bitful-pannul 2023-12-15 13:43:52 -03:00
parent 66793fc84b
commit ee4aa1d282

View File

@ -26,14 +26,13 @@ pub async fn vfs(
let open_files: Arc<DashMap<PathBuf, Arc<Mutex<fs::File>>>> = Arc::new(DashMap::new());
let process_queues = Arc::new(Mutex::new(
HashMap::<ProcessId, VecDeque<KernelMessage>>::new(),
));
// note: queues should be based on drive, not process
let process_queues: Arc<Mutex<HashMap<ProcessId, Arc<Mutex<VecDeque<KernelMessage>>>>>> =
Arc::new(Mutex::new(HashMap::new()));
loop {
tokio::select! {
Some(km) = recv_from_loop.recv() => {
if our_node != km.source.node {
if our_node.clone() != km.source.node {
println!(
"vfs: request must come from our_node={}, got: {}",
our_node,
@ -42,7 +41,20 @@ pub async fn vfs(
continue;
}
// clone arcs for thread
let queue = {
let mut process_lock = process_queues.lock().await;
process_lock
.entry(km.source.process.clone())
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
.clone()
};
{
let mut queue_lock = queue.lock().await;
queue_lock.push_back(km.clone());
}
// clone Arcs
let our_node = our_node.clone();
let send_to_caps_oracle = send_to_caps_oracle.clone();
let send_to_terminal = send_to_terminal.clone();
@ -50,21 +62,9 @@ pub async fn vfs(
let open_files = open_files.clone();
let vfs_path = vfs_path.clone();
let mut process_lock = process_queues.lock().await;
if let Some(queue) = process_lock.get_mut(&km.source.process) {
queue.push_back(km.clone());
} else {
let mut new_queue = VecDeque::new();
new_queue.push_back(km.clone());
process_lock.insert(km.source.process.clone(), new_queue);
}
let process_queues_clone = process_queues.clone();
tokio::spawn(async move {
let mut process_lock = process_queues_clone.lock().await;
if let Some(km) = process_lock.get_mut(&km.source.process).and_then(|q| q.pop_front()) {
let mut queue_lock = queue.lock().await;
if let Some(km) = queue_lock.pop_front() {
if let Err(e) = handle_request(
our_node.clone(),
km.clone(),
@ -81,12 +81,6 @@ pub async fn vfs(
.await;
}
}
// Remove the process entry if no more tasks are left
if let Some(queue) = process_lock.get(&km.source.process) {
if queue.is_empty() {
process_lock.remove(&km.source.process);
}
}
});
}
}