diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 95d49da1df..b4bcba24db 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -2183,7 +2183,7 @@ async fn test_apply_code_actions_with_commands(cx: &mut gpui::TestAppContext) { }); } -#[gpui::test] +#[gpui::test(iterations = 10)] async fn test_save_file(cx: &mut gpui::TestAppContext) { let fs = FakeFs::new(cx.background()); fs.insert_tree( diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 1bc15257a0..3459bd7e5d 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -12,7 +12,9 @@ use futures::{ mpsc::{self, UnboundedSender}, oneshot, }, - select_biased, Stream, StreamExt, + select_biased, + task::Poll, + Stream, StreamExt, }; use fuzzy::CharBag; use git::{DOT_GIT, GITIGNORE}; @@ -41,11 +43,11 @@ use std::{ mem, ops::{Deref, DerefMut}, path::{Path, PathBuf}, + pin::Pin, sync::{ atomic::{AtomicUsize, Ordering::SeqCst}, Arc, }, - task::Poll, time::{Duration, SystemTime}, }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet}; @@ -154,20 +156,12 @@ impl DerefMut for LocalSnapshot { } enum ScanState { - /// The worktree is performing its initial scan of the filesystem. - Initializing { - snapshot: LocalSnapshot, - barrier: Option, - }, - Initialized { - snapshot: LocalSnapshot, - }, - /// The worktree is updating in response to filesystem events. - Updating, + Started, Updated { snapshot: LocalSnapshot, changes: HashMap, PathChange>, barrier: Option, + scanning: bool, }, } @@ -244,9 +238,24 @@ impl Worktree { cx.spawn_weak(|this, mut cx| async move { while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) { this.update(&mut cx, |this, cx| { - this.as_local_mut() - .unwrap() - .background_scanner_updated(state, cx); + let this = this.as_local_mut().unwrap(); + match state { + ScanState::Started => { + *this.is_scanning.0.borrow_mut() = true; + } + ScanState::Updated { + snapshot, + changes, + barrier, + scanning, + } => { + *this.is_scanning.0.borrow_mut() = scanning; + this.set_snapshot(snapshot, cx); + cx.emit(Event::UpdatedEntries(changes)); + drop(barrier); + } + } + cx.notify(); }); } }) @@ -258,9 +267,15 @@ impl Worktree { let background = cx.background().clone(); async move { let events = fs.watch(&abs_path, Duration::from_millis(100)).await; - BackgroundScanner::new(snapshot, scan_states_tx, fs, background) - .run(events, path_changes_rx) - .await; + BackgroundScanner::new( + snapshot, + fs, + scan_states_tx, + background, + path_changes_rx, + ) + .run(events) + .await; } }); @@ -533,38 +548,6 @@ impl LocalWorktree { Ok(updated) } - fn background_scanner_updated( - &mut self, - scan_state: ScanState, - cx: &mut ModelContext, - ) { - match scan_state { - ScanState::Initializing { snapshot, barrier } => { - *self.is_scanning.0.borrow_mut() = true; - self.set_snapshot(snapshot, cx); - drop(barrier); - } - ScanState::Initialized { snapshot } => { - *self.is_scanning.0.borrow_mut() = false; - self.set_snapshot(snapshot, cx); - } - ScanState::Updating => { - *self.is_scanning.0.borrow_mut() = true; - } - ScanState::Updated { - snapshot, - changes, - barrier, - } => { - *self.is_scanning.0.borrow_mut() = false; - cx.emit(Event::UpdatedEntries(changes)); - self.set_snapshot(snapshot, cx); - drop(barrier); - } - } - cx.notify(); - } - fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext) { let updated_repos = Self::changed_repos( &self.snapshot.git_repositories, @@ -1337,14 +1320,6 @@ impl Snapshot { &self.root_name } - pub fn scan_started(&mut self) { - self.scan_id += 1; - } - - pub fn scan_completed(&mut self) { - self.completed_scan_id = self.scan_id; - } - pub fn scan_id(&self) -> usize { self.scan_id } @@ -1539,17 +1514,20 @@ impl LocalSnapshot { return; }; + match parent_entry.kind { + EntryKind::PendingDir => { + parent_entry.kind = EntryKind::Dir; + } + EntryKind::Dir => {} + _ => return, + } + if let Some(ignore) = ignore { self.ignores_by_parent_abs_path.insert( self.abs_path.join(&parent_path).into(), (ignore, self.scan_id), ); } - if matches!(parent_entry.kind, EntryKind::PendingDir) { - parent_entry.kind = EntryKind::Dir; - } else { - unreachable!(); - } if parent_path.file_name() == Some(&DOT_GIT) { let abs_path = self.abs_path.join(&parent_path); @@ -2135,53 +2113,47 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey { } struct BackgroundScanner { - fs: Arc, snapshot: Mutex, - notify: UnboundedSender, + fs: Arc, + status_updates_tx: UnboundedSender, executor: Arc, + refresh_requests_rx: channel::Receiver<(Vec, barrier::Sender)>, + prev_state: Mutex<(Snapshot, Vec>)>, + finished_initial_scan: bool, } impl BackgroundScanner { fn new( snapshot: LocalSnapshot, - notify: UnboundedSender, fs: Arc, + status_updates_tx: UnboundedSender, executor: Arc, + refresh_requests_rx: channel::Receiver<(Vec, barrier::Sender)>, ) -> Self { Self { fs, - snapshot: Mutex::new(snapshot), - notify, + status_updates_tx, executor, + refresh_requests_rx, + prev_state: Mutex::new((snapshot.snapshot.clone(), Vec::new())), + snapshot: Mutex::new(snapshot), + finished_initial_scan: false, } } - fn abs_path(&self) -> Arc { - self.snapshot.lock().abs_path.clone() - } - async fn run( - self, - events_rx: impl Stream>, - mut changed_paths: channel::Receiver<(Vec, barrier::Sender)>, + &mut self, + mut events_rx: Pin>>>, ) { use futures::FutureExt as _; - // Retrieve the basic properties of the root node. - let root_char_bag; - let root_abs_path; - let root_inode; - let root_is_dir; - let next_entry_id; - { - let mut snapshot = self.snapshot.lock(); - snapshot.scan_started(); - root_char_bag = snapshot.root_char_bag; - root_abs_path = snapshot.abs_path.clone(); - root_inode = snapshot.root_entry().map(|e| e.inode); - root_is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir()); - next_entry_id = snapshot.next_entry_id.clone(); - } + let (root_abs_path, root_inode) = { + let snapshot = self.snapshot.lock(); + ( + snapshot.abs_path.clone(), + snapshot.root_entry().map(|e| e.inode), + ) + }; // Populate ignores above the root. let ignore_stack; @@ -2205,198 +2177,220 @@ impl BackgroundScanner { } }; - if root_is_dir { - let mut ancestor_inodes = TreeSet::default(); - if let Some(root_inode) = root_inode { - ancestor_inodes.insert(root_inode); + // Perform an initial scan of the directory. + let (scan_job_tx, scan_job_rx) = channel::unbounded(); + smol::block_on(scan_job_tx.send(ScanJob { + abs_path: root_abs_path, + path: Arc::from(Path::new("")), + ignore_stack, + ancestor_inodes: TreeSet::from_ordered_entries(root_inode), + scan_queue: scan_job_tx.clone(), + })) + .unwrap(); + drop(scan_job_tx); + self.scan_dirs(true, scan_job_rx).await; + self.send_status_update(false, None); + + // Process any any FS events that occurred while performing the initial scan. + // For these events, update events cannot be as precise, because we didn't + // have the previous state loaded yet. + if let Poll::Ready(Some(events)) = futures::poll!(events_rx.next()) { + let mut paths = events.into_iter().map(|e| e.path).collect::>(); + while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) { + paths.extend(more_events.into_iter().map(|e| e.path)); } - - let (tx, rx) = channel::unbounded(); - self.executor - .block(tx.send(ScanJob { - abs_path: root_abs_path.to_path_buf(), - path: Arc::from(Path::new("")), - ignore_stack, - ancestor_inodes, - scan_queue: tx.clone(), - })) - .unwrap(); - drop(tx); - - let progress_update_count = AtomicUsize::new(0); - self.executor - .scoped(|scope| { - for _ in 0..self.executor.num_cpus() { - scope.spawn(async { - let mut last_progress_update_count = 0; - let progress_update_timer = self.pause_between_progress_updates().fuse(); - futures::pin_mut!(progress_update_timer); - loop { - select_biased! { - // Send periodic progress updates to the worktree. Use an atomic counter - // to ensure that only one of the workers sends a progress update after - // the update interval elapses. - _ = progress_update_timer => { - match progress_update_count.compare_exchange( - last_progress_update_count, - last_progress_update_count + 1, - SeqCst, - SeqCst - ) { - Ok(_) => { - last_progress_update_count += 1; - if self - .notify - .unbounded_send(ScanState::Initializing { - snapshot: self.snapshot.lock().clone(), - barrier: None, - }) - .is_err() - { - break; - } - } - Err(current_count) => last_progress_update_count = current_count, - } - progress_update_timer.set(self.pause_between_progress_updates().fuse()); - } - - // Refresh any paths requested by the main thread. - job = changed_paths.recv().fuse() => { - let Ok((abs_paths, barrier)) = job else { break }; - self.update_entries_for_paths(abs_paths, None).await; - if self - .notify - .unbounded_send(ScanState::Initializing { - snapshot: self.snapshot.lock().clone(), - barrier: Some(barrier), - }) - .is_err() - { - break; - } - } - - // Recursively load directories from the file system. - job = rx.recv().fuse() => { - let Ok(job) = job else { break }; - if let Err(err) = self - .scan_dir(root_char_bag, next_entry_id.clone(), &job) - .await - { - log::error!("error scanning {:?}: {}", job.abs_path, err); - } - } - } - } - }); - } - }) - .await; + self.process_events(paths).await; + self.send_status_update(false, None); } - self.snapshot.lock().scan_completed(); + self.finished_initial_scan = true; + + // Continue processing events until the worktree is dropped. + loop { + select_biased! { + // Process any path refresh requests from the worktree. Prioritize + // these before handling changes reported by the filesystem. + request = self.refresh_requests_rx.recv().fuse() => { + let Ok((paths, barrier)) = request else { break }; + self.reload_entries_for_paths(paths, None).await; + if !self.send_status_update(false, Some(barrier)) { + break; + } + } + + events = events_rx.next().fuse() => { + let Some(events) = events else { break }; + let mut paths = events.into_iter().map(|e| e.path).collect::>(); + while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) { + paths.extend(more_events.into_iter().map(|e| e.path)); + } + self.process_events(paths).await; + self.send_status_update(false, None); + } + } + } + } + + async fn process_events(&mut self, paths: Vec) { + use futures::FutureExt as _; + + let (scan_job_tx, scan_job_rx) = channel::unbounded(); + if let Some(mut paths) = self + .reload_entries_for_paths(paths, Some(scan_job_tx.clone())) + .await + { + paths.sort_unstable(); + util::extend_sorted(&mut self.prev_state.lock().1, paths, usize::MAX, Ord::cmp); + } + drop(scan_job_tx); + self.scan_dirs(false, scan_job_rx).await; + + let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded(); + let snapshot = self.update_ignore_statuses(ignore_queue_tx); + self.executor + .scoped(|scope| { + for _ in 0..self.executor.num_cpus() { + scope.spawn(async { + loop { + select_biased! { + // Process any path refresh requests before moving on to process + // the queue of ignore statuses. + request = self.refresh_requests_rx.recv().fuse() => { + let Ok((paths, barrier)) = request else { break }; + self.reload_entries_for_paths(paths, None).await; + if !self.send_status_update(false, Some(barrier)) { + return; + } + } + + // Recursively process directories whose ignores have changed. + job = ignore_queue_rx.recv().fuse() => { + let Ok(job) = job else { break }; + self.update_ignore_status(job, &snapshot).await; + } + } + } + }); + } + }) + .await; + + let mut snapshot = self.snapshot.lock(); + let mut git_repositories = mem::take(&mut snapshot.git_repositories); + git_repositories.retain(|repo| snapshot.entry_for_path(&repo.git_dir_path).is_some()); + snapshot.git_repositories = git_repositories; + snapshot.removed_entry_ids.clear(); + snapshot.completed_scan_id = snapshot.scan_id; + } + + async fn scan_dirs( + &self, + enable_progress_updates: bool, + scan_jobs_rx: channel::Receiver, + ) { + use futures::FutureExt as _; if self - .notify - .unbounded_send(ScanState::Initialized { - snapshot: self.snapshot.lock().clone(), - }) + .status_updates_tx + .unbounded_send(ScanState::Started) .is_err() { return; } - // Process any events that occurred while performing the initial scan. These - // events can't be reported as precisely, because there is no snapshot of the - // worktree before they occurred. - futures::pin_mut!(events_rx); - if let Poll::Ready(Some(mut events)) = futures::poll!(events_rx.next()) { - while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) { - events.extend(additional_events); - } - let abs_paths = events.into_iter().map(|e| e.path).collect(); - if self.notify.unbounded_send(ScanState::Updating).is_err() { - return; - } - if let Some(changes) = self.process_events(abs_paths, true).await { - if self - .notify - .unbounded_send(ScanState::Updated { - snapshot: self.snapshot.lock().clone(), - changes, - barrier: None, - }) - .is_err() - { - return; - } - } else { - return; - } - } + let progress_update_count = AtomicUsize::new(0); + self.executor + .scoped(|scope| { + for _ in 0..self.executor.num_cpus() { + scope.spawn(async { + let mut last_progress_update_count = 0; + let progress_update_timer = self.progress_timer(enable_progress_updates).fuse(); + futures::pin_mut!(progress_update_timer); - // Continue processing events until the worktree is dropped. - loop { - let barrier; - let abs_paths; - select_biased! { - request = changed_paths.next().fuse() => { - let Some((paths, b)) = request else { break }; - abs_paths = paths; - barrier = Some(b); - } - events = events_rx.next().fuse() => { - let Some(events) = events else { break }; - abs_paths = events.into_iter().map(|e| e.path).collect(); - barrier = None; - } - } + loop { + select_biased! { + // Process any path refresh requests before moving on to process + // the scan queue, so that user operations are prioritized. + request = self.refresh_requests_rx.recv().fuse() => { + let Ok((paths, barrier)) = request else { break }; + self.reload_entries_for_paths(paths, None).await; + if !self.send_status_update(false, Some(barrier)) { + return; + } + } - if self.notify.unbounded_send(ScanState::Updating).is_err() { - return; - } - if let Some(changes) = self.process_events(abs_paths, false).await { - if self - .notify - .unbounded_send(ScanState::Updated { - snapshot: self.snapshot.lock().clone(), - changes, - barrier, + // Send periodic progress updates to the worktree. Use an atomic counter + // to ensure that only one of the workers sends a progress update after + // the update interval elapses. + _ = progress_update_timer => { + match progress_update_count.compare_exchange( + last_progress_update_count, + last_progress_update_count + 1, + SeqCst, + SeqCst + ) { + Ok(_) => { + last_progress_update_count += 1; + self.send_status_update(true, None); + } + Err(count) => { + last_progress_update_count = count; + } + } + progress_update_timer.set(self.progress_timer(enable_progress_updates).fuse()); + } + + // Recursively load directories from the file system. + job = scan_jobs_rx.recv().fuse() => { + let Ok(job) = job else { break }; + if let Err(err) = self.scan_dir(&job).await { + if job.path.as_ref() != Path::new("") { + log::error!("error scanning directory {:?}: {}", job.abs_path, err); + } + } + } + } + } }) - .is_err() - { - return; } - } else { - return; - } - } + }) + .await; } - async fn pause_between_progress_updates(&self) { - #[cfg(any(test, feature = "test-support"))] - if self.fs.is_fake() { - return self.executor.simulate_random_delay().await; - } - smol::Timer::after(Duration::from_millis(100)).await; + fn send_status_update(&self, scanning: bool, barrier: Option) -> bool { + let mut prev_state = self.prev_state.lock(); + let snapshot = self.snapshot.lock().clone(); + let mut old_snapshot = snapshot.snapshot.clone(); + mem::swap(&mut old_snapshot, &mut prev_state.0); + let changed_paths = mem::take(&mut prev_state.1); + let changes = self.build_change_set(&old_snapshot, &snapshot.snapshot, changed_paths); + self.status_updates_tx + .unbounded_send(ScanState::Updated { + snapshot, + changes, + scanning, + barrier, + }) + .is_ok() } - async fn scan_dir( - &self, - root_char_bag: CharBag, - next_entry_id: Arc, - job: &ScanJob, - ) -> Result<()> { + async fn scan_dir(&self, job: &ScanJob) -> Result<()> { let mut new_entries: Vec = Vec::new(); let mut new_jobs: Vec> = Vec::new(); let mut ignore_stack = job.ignore_stack.clone(); let mut new_ignore = None; - + let (root_abs_path, root_char_bag, next_entry_id) = { + let snapshot = self.snapshot.lock(); + ( + snapshot.abs_path().clone(), + snapshot.root_char_bag, + snapshot.next_entry_id.clone(), + ) + }; let mut child_paths = self.fs.read_dir(&job.abs_path).await?; while let Some(child_abs_path) = child_paths.next().await { - let child_abs_path = match child_abs_path { - Ok(child_abs_path) => child_abs_path, + let child_abs_path: Arc = match child_abs_path { + Ok(child_abs_path) => child_abs_path.into(), Err(error) => { log::error!("error processing entry {:?}", error); continue; @@ -2419,8 +2413,7 @@ impl BackgroundScanner { match build_gitignore(&child_abs_path, self.fs.as_ref()).await { Ok(ignore) => { let ignore = Arc::new(ignore); - ignore_stack = - ignore_stack.append(job.abs_path.as_path().into(), ignore.clone()); + ignore_stack = ignore_stack.append(job.abs_path.clone(), ignore.clone()); new_ignore = Some(ignore); } Err(error) => { @@ -2438,7 +2431,7 @@ impl BackgroundScanner { // new jobs as well. let mut new_jobs = new_jobs.iter_mut(); for entry in &mut new_entries { - let entry_abs_path = self.abs_path().join(&entry.path); + let entry_abs_path = root_abs_path.join(&entry.path); entry.is_ignored = ignore_stack.is_abs_path_ignored(&entry_abs_path, entry.is_dir()); @@ -2507,69 +2500,18 @@ impl BackgroundScanner { Ok(()) } - async fn process_events( - &self, - abs_paths: Vec, - received_before_initialized: bool, - ) -> Option, PathChange>> { - let (scan_queue_tx, scan_queue_rx) = channel::unbounded(); - - let prev_snapshot = { - let mut snapshot = self.snapshot.lock(); - snapshot.scan_started(); - snapshot.clone() - }; - - let event_paths = self - .update_entries_for_paths(abs_paths, Some(scan_queue_tx)) - .await?; - - // Scan any directories that were created as part of this event batch. - self.executor - .scoped(|scope| { - for _ in 0..self.executor.num_cpus() { - scope.spawn(async { - while let Ok(job) = scan_queue_rx.recv().await { - if let Err(err) = self - .scan_dir( - prev_snapshot.root_char_bag, - prev_snapshot.next_entry_id.clone(), - &job, - ) - .await - { - log::error!("error scanning {:?}: {}", job.abs_path, err); - } - } - }); - } - }) - .await; - - // Attempt to detect renames only over a single batch of file-system events. - self.snapshot.lock().removed_entry_ids.clear(); - - self.update_ignore_statuses().await; - self.update_git_repositories(); - let changes = self.build_change_set( - prev_snapshot.snapshot, - event_paths, - received_before_initialized, - ); - self.snapshot.lock().scan_completed(); - Some(changes) - } - - async fn update_entries_for_paths( + async fn reload_entries_for_paths( &self, mut abs_paths: Vec, scan_queue_tx: Option>, ) -> Option>> { + let doing_recursive_update = scan_queue_tx.is_some(); + abs_paths.sort_unstable(); abs_paths.dedup_by(|a, b| a.starts_with(&b)); let root_abs_path = self.snapshot.lock().abs_path.clone(); - let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.ok()?; + let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.log_err()?; let metadata = futures::future::join_all( abs_paths .iter() @@ -2579,29 +2521,35 @@ impl BackgroundScanner { .await; let mut snapshot = self.snapshot.lock(); - if scan_queue_tx.is_some() { - for abs_path in &abs_paths { - if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) { - snapshot.remove_path(path); - } + + if snapshot.completed_scan_id == snapshot.scan_id { + snapshot.scan_id += 1; + if !doing_recursive_update { + snapshot.completed_scan_id = snapshot.scan_id; } } - let mut event_paths = Vec::with_capacity(abs_paths.len()); - for (abs_path, metadata) in abs_paths.into_iter().zip(metadata.into_iter()) { - let path: Arc = match abs_path.strip_prefix(&root_canonical_path) { - Ok(path) => Arc::from(path.to_path_buf()), - Err(_) => { - log::error!( - "unexpected event {:?} for root path {:?}", - abs_path, - root_canonical_path - ); - continue; + // Remove any entries for paths that no longer exist or are being recursively + // refreshed. Do this before adding any new entries, so that renames can be + // detected regardless of the order of the paths. + let mut event_paths = Vec::>::with_capacity(abs_paths.len()); + for (abs_path, metadata) in abs_paths.iter().zip(metadata.iter()) { + if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) { + if matches!(metadata, Ok(None)) || doing_recursive_update { + snapshot.remove_path(path); } - }; - event_paths.push(path.clone()); - let abs_path = root_abs_path.join(&path); + event_paths.push(path.into()); + } else { + log::error!( + "unexpected event {:?} for root path {:?}", + abs_path, + root_canonical_path + ); + } + } + + for (path, metadata) in event_paths.iter().cloned().zip(metadata.into_iter()) { + let abs_path: Arc = root_abs_path.join(&path).into(); match metadata { Ok(Some(metadata)) => { @@ -2626,15 +2574,14 @@ impl BackgroundScanner { let mut ancestor_inodes = snapshot.ancestor_inodes_for_path(&path); if metadata.is_dir && !ancestor_inodes.contains(&metadata.inode) { ancestor_inodes.insert(metadata.inode); - self.executor - .block(scan_queue_tx.send(ScanJob { - abs_path, - path, - ignore_stack, - ancestor_inodes, - scan_queue: scan_queue_tx.clone(), - })) - .unwrap(); + smol::block_on(scan_queue_tx.send(ScanJob { + abs_path, + path, + ignore_stack, + ancestor_inodes, + scan_queue: scan_queue_tx.clone(), + })) + .unwrap(); } } } @@ -2649,7 +2596,10 @@ impl BackgroundScanner { Some(event_paths) } - async fn update_ignore_statuses(&self) { + fn update_ignore_statuses( + &self, + ignore_queue_tx: Sender, + ) -> LocalSnapshot { let mut snapshot = self.snapshot.lock().clone(); let mut ignores_to_update = Vec::new(); let mut ignores_to_delete = Vec::new(); @@ -2674,7 +2624,6 @@ impl BackgroundScanner { .remove(&parent_abs_path); } - let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded(); ignores_to_update.sort_unstable(); let mut ignores_to_update = ignores_to_update.into_iter().peekable(); while let Some(parent_abs_path) = ignores_to_update.next() { @@ -2686,35 +2635,15 @@ impl BackgroundScanner { } let ignore_stack = snapshot.ignore_stack_for_abs_path(&parent_abs_path, true); - ignore_queue_tx - .send(UpdateIgnoreStatusJob { - abs_path: parent_abs_path, - ignore_stack, - ignore_queue: ignore_queue_tx.clone(), - }) - .await - .unwrap(); + smol::block_on(ignore_queue_tx.send(UpdateIgnoreStatusJob { + abs_path: parent_abs_path, + ignore_stack, + ignore_queue: ignore_queue_tx.clone(), + })) + .unwrap(); } - drop(ignore_queue_tx); - self.executor - .scoped(|scope| { - for _ in 0..self.executor.num_cpus() { - scope.spawn(async { - while let Ok(job) = ignore_queue_rx.recv().await { - self.update_ignore_status(job, &snapshot).await; - } - }); - } - }) - .await; - } - - fn update_git_repositories(&self) { - let mut snapshot = self.snapshot.lock(); - let mut git_repositories = mem::take(&mut snapshot.git_repositories); - git_repositories.retain(|repo| snapshot.entry_for_path(&repo.git_dir_path).is_some()); - snapshot.git_repositories = git_repositories; + snapshot } async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &LocalSnapshot) { @@ -2728,7 +2657,7 @@ impl BackgroundScanner { let path = job.abs_path.strip_prefix(&snapshot.abs_path).unwrap(); for mut entry in snapshot.child_entries(path).cloned() { let was_ignored = entry.is_ignored; - let abs_path = self.abs_path().join(&entry.path); + let abs_path = snapshot.abs_path().join(&entry.path); entry.is_ignored = ignore_stack.is_abs_path_ignored(&abs_path, entry.is_dir()); if entry.is_dir() { let child_ignore_stack = if entry.is_ignored { @@ -2762,16 +2691,16 @@ impl BackgroundScanner { fn build_change_set( &self, - old_snapshot: Snapshot, + old_snapshot: &Snapshot, + new_snapshot: &Snapshot, event_paths: Vec>, - received_before_initialized: bool, ) -> HashMap, PathChange> { use PathChange::{Added, AddedOrUpdated, Removed, Updated}; - let new_snapshot = self.snapshot.lock(); let mut changes = HashMap::default(); let mut old_paths = old_snapshot.entries_by_path.cursor::(); let mut new_paths = new_snapshot.entries_by_path.cursor::(); + let received_before_initialized = !self.finished_initial_scan; for path in event_paths { let path = PathKey(path); @@ -2799,9 +2728,9 @@ impl BackgroundScanner { // If the worktree was not fully initialized when this event was generated, // we can't know whether this entry was added during the scan or whether // it was merely updated. - changes.insert(old_entry.path.clone(), AddedOrUpdated); + changes.insert(new_entry.path.clone(), AddedOrUpdated); } else if old_entry.mtime != new_entry.mtime { - changes.insert(old_entry.path.clone(), Updated); + changes.insert(new_entry.path.clone(), Updated); } old_paths.next(&()); new_paths.next(&()); @@ -2826,6 +2755,19 @@ impl BackgroundScanner { } changes } + + async fn progress_timer(&self, running: bool) { + if !running { + return futures::future::pending().await; + } + + #[cfg(any(test, feature = "test-support"))] + if self.fs.is_fake() { + return self.executor.simulate_random_delay().await; + } + + smol::Timer::after(Duration::from_millis(100)).await; + } } fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag { @@ -2839,7 +2781,7 @@ fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag { } struct ScanJob { - abs_path: PathBuf, + abs_path: Arc, path: Arc, ignore_stack: Arc, scan_queue: Sender, @@ -3524,7 +3466,7 @@ mod tests { let fs = FakeFs::new(cx.background()); fs.insert_tree( - "/a", + "/root", json!({ "b": {}, "c": {}, @@ -3535,7 +3477,7 @@ mod tests { let tree = Worktree::local( client, - "/a".as_ref(), + "/root".as_ref(), true, fs, Default::default(), @@ -3555,6 +3497,7 @@ mod tests { assert!(entry.is_dir()); cx.foreground().run_until_parked(); + tree.read_with(cx, |tree, _| { assert_eq!(tree.entry_for_path("a/e").unwrap().kind, EntryKind::Dir); }); diff --git a/crates/sum_tree/src/tree_map.rs b/crates/sum_tree/src/tree_map.rs index 112366cdf5..0778cc5294 100644 --- a/crates/sum_tree/src/tree_map.rs +++ b/crates/sum_tree/src/tree_map.rs @@ -154,6 +154,12 @@ impl TreeSet where K: Clone + Debug + Default + Ord, { + pub fn from_ordered_entries(entries: impl IntoIterator) -> Self { + Self(TreeMap::from_ordered_entries( + entries.into_iter().map(|key| (key, ())), + )) + } + pub fn insert(&mut self, key: K) { self.0.insert(key, ()); }