diff --git a/src/vfs.rs b/src/vfs.rs index 2041018b..d9155550 100644 --- a/src/vfs.rs +++ b/src/vfs.rs @@ -26,14 +26,13 @@ pub async fn vfs( let open_files: Arc>>> = Arc::new(DashMap::new()); - let process_queues = Arc::new(Mutex::new( - HashMap::>::new(), - )); - // note: queues should be based on drive, not process + let process_queues: Arc>>>>> = + Arc::new(Mutex::new(HashMap::new())); + loop { tokio::select! { Some(km) = recv_from_loop.recv() => { - if our_node != km.source.node { + if our_node.clone() != km.source.node { println!( "vfs: request must come from our_node={}, got: {}", our_node, @@ -42,7 +41,20 @@ pub async fn vfs( continue; } - // clone arcs for thread + let queue = { + let mut process_lock = process_queues.lock().await; + process_lock + .entry(km.source.process.clone()) + .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) + .clone() + }; + + { + let mut queue_lock = queue.lock().await; + queue_lock.push_back(km.clone()); + } + + // clone Arcs let our_node = our_node.clone(); let send_to_caps_oracle = send_to_caps_oracle.clone(); let send_to_terminal = send_to_terminal.clone(); @@ -50,21 +62,9 @@ pub async fn vfs( let open_files = open_files.clone(); let vfs_path = vfs_path.clone(); - let mut process_lock = process_queues.lock().await; - - if let Some(queue) = process_lock.get_mut(&km.source.process) { - queue.push_back(km.clone()); - } else { - let mut new_queue = VecDeque::new(); - new_queue.push_back(km.clone()); - process_lock.insert(km.source.process.clone(), new_queue); - } - - let process_queues_clone = process_queues.clone(); - tokio::spawn(async move { - let mut process_lock = process_queues_clone.lock().await; - if let Some(km) = process_lock.get_mut(&km.source.process).and_then(|q| q.pop_front()) { + let mut queue_lock = queue.lock().await; + if let Some(km) = queue_lock.pop_front() { if let Err(e) = handle_request( our_node.clone(), km.clone(), @@ -81,12 +81,6 @@ pub async fn vfs( .await; } } - // Remove the process entry if no more tasks are left - if let Some(queue) = process_lock.get(&km.source.process) { - if queue.is_empty() { - process_lock.remove(&km.source.process); - } - } }); } }