From f7a86967fd488e72f8166c74bb5fb2cbc9cd8953 Mon Sep 17 00:00:00 2001
From: Max Brunsfeld
Date: Fri, 24 May 2024 17:41:35 -0700
Subject: [PATCH] Avoid holding worktree lock for a long time while updating
 large repos' git status (#12266)

Fixes https://github.com/zed-industries/zed/issues/9575
Fixes https://github.com/zed-industries/zed/issues/4294

### Problem

When a large git repository's `.git` folder changes (due to a `git commit`,
`git reset`, etc.), Zed needs to recompute the git status for every file in
that git repository. Part of computing the git status is the *unstaged* part:
the comparison between the content of the file and the version in the git
index. In a large git repository like `chromium` or `linux`, this is
inherently pretty slow.

Previously, we performed this git status computation all at once, and held a
lock on our `BackgroundScanner`'s state for the entire time. On my laptop, in
the `linux` repo, this would often take around 13 seconds.

When opening a file, Zed always refreshes the metadata for that file in its
in-memory snapshot of the worktree. This is normally very fast, but if
another task is holding a lock on the `BackgroundScanner`, it blocks.

### Solution

I've restructured how Zed handles git statuses, so that when a git repository
is updated, we recompute files' git statuses in fixed-size batches. In
between these batches, the `BackgroundScanner` is free to perform other work,
so that file operations coming from the main thread will still be responsive.

Release Notes:

- Fixed a bug that caused long delays in opening files right after
  performing a commit in very large git repositories.
---
 Cargo.lock                            |   1 +
 crates/project/src/project.rs         |   2 +-
 crates/worktree/Cargo.toml            |   1 +
 crates/worktree/src/worktree.rs       | 710 ++++++++++++++------------
 crates/worktree/src/worktree_tests.rs | 120 +----
 5 files changed, 384 insertions(+), 450 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9844fb5b1f..2c4170048b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12799,6 +12799,7 @@ dependencies = [
  "client",
  "clock",
  "collections",
+ "env_logger",
  "fs",
  "futures 0.3.28",
  "fuzzy",
diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs
index 34c8dec394..0ba691472c 100644
--- a/crates/project/src/project.rs
+++ b/crates/project/src/project.rs
@@ -11014,7 +11014,7 @@ async fn search_ignored_entry(
             }
         } else if !fs_metadata.is_symlink {
             if !query.file_matches(Some(&ignored_abs_path))
-                || snapshot.is_path_excluded(ignored_entry.path.to_path_buf())
+                || snapshot.is_path_excluded(&ignored_entry.path)
             {
                 continue;
             }
diff --git a/crates/worktree/Cargo.toml b/crates/worktree/Cargo.toml
index 89bb3e847e..c590c41577 100644
--- a/crates/worktree/Cargo.toml
+++ b/crates/worktree/Cargo.toml
@@ -52,6 +52,7 @@ util.workspace = true
 [dev-dependencies]
 clock = {workspace = true, features = ["test-support"]}
 collections = { workspace = true, features = ["test-support"] }
+env_logger.workspace = true
 git2.workspace = true
 gpui = {workspace = true, features = ["test-support"]}
 http.workspace = true
diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs
index 6ecb79eb79..ccef0ab290 100644
--- a/crates/worktree/src/worktree.rs
+++ b/crates/worktree/src/worktree.rs
@@ -45,6 +45,7 @@ use postage::{
 use serde::Serialize;
 use settings::{Settings, SettingsLocation, SettingsStore};
 use smol::channel::{self, Sender};
+use std::time::Instant;
 use std::{
     any::Any,
     cmp::{self, Ordering},
@@ -76,6 +77,8 @@ pub const FS_WATCH_LATENCY: Duration = Duration::from_millis(100);
 #[cfg(not(feature = "test-support"))]
 pub const FS_WATCH_LATENCY: Duration = Duration::from_millis(100);
 
+const GIT_STATUS_UPDATE_BATCH_SIZE: usize = 100;
+
 #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)]
 pub struct WorktreeId(usize);
 
@@ -1549,7 +1552,7 @@ impl LocalWorktree {
         old_path: Option<Arc<Path>>,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<Result<Option<Entry>>> {
-        if self.is_path_excluded(path.to_path_buf()) {
+        if self.is_path_excluded(&path) {
             return Task::ready(Ok(None));
         }
         let paths = if let Some(old_path) = old_path.as_ref() {
@@ -1557,9 +1560,11 @@ impl LocalWorktree {
         } else {
             vec![path.clone()]
         };
+        let t0 = Instant::now();
         let mut refresh = self.refresh_entries_for_paths(paths);
         cx.spawn(move |this, mut cx| async move {
             refresh.recv().await;
+            log::trace!("refreshed entry {path:?} in {:?}", t0.elapsed());
             let new_entry = this.update(&mut cx, |this, _| {
                 this.entry_for_path(path)
                     .cloned()
@@ -2015,12 +2020,16 @@ impl Snapshot {
     ) -> Traversal {
         let mut cursor = self.entries_by_path.cursor();
         cursor.seek(&TraversalTarget::Path(path), Bias::Left, &());
-        Traversal {
+        let mut traversal = Traversal {
             cursor,
             include_files,
             include_dirs,
             include_ignored,
+        };
+        if traversal.end_offset() == traversal.start_offset() {
+            traversal.next();
         }
+        traversal
     }
 
     pub fn files(&self, include_ignored: bool, start: usize) -> Traversal {
@@ -2174,31 +2183,6 @@ impl Snapshot {
         }
     }
 
-    pub fn descendent_entries<'a>(
-        &'a self,
-        include_dirs: bool,
-        include_ignored: bool,
-        parent_path: &'a Path,
-    ) -> DescendentEntriesIter<'a> {
-        let mut cursor = self.entries_by_path.cursor();
-        cursor.seek(&TraversalTarget::Path(parent_path), Bias::Left, &());
-        let mut traversal = Traversal {
-            cursor,
-            include_files: true,
-            include_dirs,
-            include_ignored,
-        };
-
-        if traversal.end_offset() == traversal.start_offset() {
-            traversal.advance();
-        }
-
-        DescendentEntriesIter {
-            traversal,
-            parent_path,
-        }
-    }
-
     pub fn root_entry(&self) -> Option<&Entry> {
         self.entry_for_path("")
     }
@@ -2538,19 +2522,12 @@ impl LocalSnapshot {
         })
     }
 
-    pub fn is_path_excluded(&self, mut path: PathBuf) -> bool {
-        loop {
-            if self
-                .file_scan_exclusions
+    pub fn is_path_excluded(&self, path: &Path) -> bool {
+        path.ancestors().any(|path| {
+            self.file_scan_exclusions
                 .iter()
                 .any(|exclude_matcher| exclude_matcher.is_match(&path))
-            {
-                return true;
-            }
-            if !path.pop() {
-                return false;
-            }
-        }
+        })
     }
 }
 
@@ -2578,11 +2555,11 @@ impl BackgroundScannerState {
         if let Some((repo_entry, repo)) = self.snapshot.repo_for_path(&path) {
             if let Some(workdir_path) = repo_entry.work_directory(&self.snapshot) {
                 if let Ok(repo_path) = repo_entry.relativize(&self.snapshot, &path) {
-                    containing_repository = Some((
-                        workdir_path,
-                        repo.repo_ptr.clone(),
-                        repo.repo_ptr.lock().staged_statuses(&repo_path),
-                    ));
+                    containing_repository = Some(ScanJobContainingRepository {
+                        work_directory: workdir_path,
+                        repository: repo.repo_ptr.clone(),
+                        staged_statuses: repo.repo_ptr.lock().staged_statuses(&repo_path),
+                    });
                 }
             }
         }
@@ -2722,92 +2699,11 @@ impl BackgroundScannerState {
         self.snapshot.check_invariants(false);
     }
 
-    fn reload_repositories(&mut self, dot_git_dirs_to_reload: &HashSet<PathBuf>, fs: &dyn Fs) {
-        let scan_id = self.snapshot.scan_id;
-
-        for dot_git_dir in dot_git_dirs_to_reload {
-            // If there is already a repository for this .git directory, reload
-            // the status for all of its files.
-            let repository = self
-                .snapshot
-                .git_repositories
-                .iter()
-                .find_map(|(entry_id, repo)| {
-                    (repo.git_dir_path.as_ref() == dot_git_dir).then(|| (*entry_id, repo.clone()))
-                });
-            match repository {
-                None => {
-                    self.build_git_repository(Arc::from(dot_git_dir.as_path()), fs);
-                }
-                Some((entry_id, repository)) => {
-                    if repository.git_dir_scan_id == scan_id {
-                        continue;
-                    }
-                    let Some(work_dir) = self
-                        .snapshot
-                        .entry_for_id(entry_id)
-                        .map(|entry| RepositoryWorkDirectory(entry.path.clone()))
-                    else {
-                        continue;
-                    };
-
-                    log::info!("reload git repository {dot_git_dir:?}");
-                    let repository = repository.repo_ptr.lock();
-                    let branch = repository.branch_name();
-                    repository.reload_index();
-
-                    self.snapshot
-                        .git_repositories
-                        .update(&entry_id, |entry| entry.git_dir_scan_id = scan_id);
-                    self.snapshot
-                        .snapshot
-                        .repository_entries
-                        .update(&work_dir, |entry| entry.branch = branch.map(Into::into));
-
-                    self.update_git_statuses(&work_dir, &*repository);
-                }
-            }
-        }
-
-        // Remove any git repositories whose .git entry no longer exists.
-        let snapshot = &mut self.snapshot;
-        let mut ids_to_preserve = HashSet::default();
-        for (&work_directory_id, entry) in snapshot.git_repositories.iter() {
-            let exists_in_snapshot = snapshot
-                .entry_for_id(work_directory_id)
-                .map_or(false, |entry| {
-                    snapshot.entry_for_path(entry.path.join(*DOT_GIT)).is_some()
-                });
-            if exists_in_snapshot {
-                ids_to_preserve.insert(work_directory_id);
-            } else {
-                let git_dir_abs_path = snapshot.abs_path().join(&entry.git_dir_path);
-                let git_dir_excluded = snapshot.is_path_excluded(entry.git_dir_path.to_path_buf());
-                if git_dir_excluded
-                    && !matches!(smol::block_on(fs.metadata(&git_dir_abs_path)), Ok(None))
-                {
-                    ids_to_preserve.insert(work_directory_id);
-                }
-            }
-        }
-
-        snapshot
-            .git_repositories
-            .retain(|work_directory_id, _| ids_to_preserve.contains(work_directory_id));
-        snapshot
-            .repository_entries
-            .retain(|_, entry| ids_to_preserve.contains(&entry.work_directory.0));
-    }
-
     fn build_git_repository(
         &mut self,
         dot_git_path: Arc<Path>,
         fs: &dyn Fs,
-    ) -> Option<(
-        RepositoryWorkDirectory,
-        Arc<Mutex<dyn GitRepository>>,
-        TreeMap<RepoPath, GitFileStatus>,
-    )> {
+    ) -> Option<(RepositoryWorkDirectory, Arc<Mutex<dyn GitRepository>>)> {
         let work_dir_path: Arc<Path> = match dot_git_path.parent() {
             Some(parent_dir) => {
                 // Guard against repositories inside the repository metadata
@@ -2842,11 +2738,7 @@ impl BackgroundScannerState {
         dot_git_path: Arc<Path>,
         location_in_repo: Option<Arc<Path>>,
         fs: &dyn Fs,
-    ) -> Option<(
-        RepositoryWorkDirectory,
-        Arc<Mutex<dyn GitRepository>>,
-        TreeMap<RepoPath, GitFileStatus>,
-    )> {
+    ) -> Option<(RepositoryWorkDirectory, Arc<Mutex<dyn GitRepository>>)> {
         let work_dir_id = self
             .snapshot
             .entry_for_path(work_dir_path.clone())
@@ -2857,22 +2749,17 @@ impl BackgroundScannerState {
         }
 
         let abs_path = self.snapshot.abs_path.join(&dot_git_path);
-        let repository = fs.open_repo(abs_path.as_path())?;
+        let repository = fs.open_repo(&abs_path)?;
         let work_directory = RepositoryWorkDirectory(work_dir_path.clone());
 
-        let repo_lock = repository.lock();
-
         self.snapshot.repository_entries.insert(
             work_directory.clone(),
             RepositoryEntry {
                 work_directory: work_dir_id.into(),
-                branch: repo_lock.branch_name().map(Into::into),
+                branch: repository.lock().branch_name().map(Into::into),
                 location_in_repo,
             },
         );
-
-        let staged_statuses = self.update_git_statuses(&work_directory, &*repo_lock);
-        drop(repo_lock);
-
         self.snapshot.git_repositories.insert(
             work_dir_id,
             LocalRepositoryEntry {
@@ -2882,47 +2769,7 @@ impl BackgroundScannerState {
             },
         );
 
-        Some((work_directory, repository, staged_statuses))
+        Some((work_directory, repository))
     }
-
-    fn update_git_statuses(
-        &mut self,
-        work_directory: &RepositoryWorkDirectory,
-        repo: &dyn GitRepository,
-    ) -> TreeMap<RepoPath, GitFileStatus> {
-        let repo_entry = self.snapshot.repository_entries.get(work_directory);
-        let staged_statuses = repo.staged_statuses(Path::new(""));
-
-        let mut changes = vec![];
-        let mut edits = vec![];
-
-        for mut entry in self
-            .snapshot
-            .descendent_entries(false, false, &work_directory.0)
-            .cloned()
-        {
-            let repo_path =
-                repo_entry.map(|repo_entry| repo_entry.relativize(&self.snapshot, &entry.path));
-            let Some(Ok(repo_path)) = repo_path else {
-                continue;
-            };
-            let Some(mtime) = entry.mtime else {
-                continue;
-            };
-            let git_file_status = combine_git_statuses(
-                staged_statuses.get(&repo_path).copied(),
-                repo.unstaged_status(&repo_path, mtime),
-            );
-            if entry.git_status != git_file_status {
-                entry.git_status = git_file_status;
-                changes.push(entry.path.clone());
-                edits.push(Edit::Insert(entry));
-            }
-        }
-
-        self.snapshot.entries_by_path.edit(edits, &());
-        util::extend_sorted(&mut self.changed_paths, changes, usize::MAX, Ord::cmp);
-        staged_statuses
-    }
 }
 
@@ -3522,9 +3369,10 @@ impl BackgroundScanner {
     async fn run(&mut self, mut fs_events_rx: Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>) {
         use futures::FutureExt as _;
 
-        // Populate ignores above the root.
+        // If the worktree root does not contain a git repository, then find
+        // the git repository in an ancestor directory. Find any gitignore files
+        // in ancestor directories.
         let root_abs_path = self.state.lock().snapshot.abs_path.clone();
-        let mut external_git_repo = None;
         for (index, ancestor) in root_abs_path.ancestors().enumerate() {
             if index != 0 {
                 if let Ok(ignore) =
                     build_gitignore(&ancestor.join(&*GITIGNORE), self.fs.as_ref()).await
                 {
                     self.state
                         .lock()
                         .snapshot
                         .ignores_by_parent_abs_path
                         .insert(ancestor.into(), (ignore.into(), false));
                 }
             }
-            if ancestor.join(&*DOT_GIT).is_dir() {
+
+            let ancestor_dot_git = ancestor.join(&*DOT_GIT);
+            if ancestor_dot_git.is_dir() {
                 if index != 0 {
-                    external_git_repo = Some(ancestor.to_path_buf());
+                    // We canonicalize, since the FS events use the canonicalized path.
+                    if let Some(ancestor_dot_git) =
+                        self.fs.canonicalize(&ancestor_dot_git).await.log_err()
+                    {
+                        let ancestor_git_events =
+                            self.fs.watch(&ancestor_dot_git, FS_WATCH_LATENCY).await;
+                        fs_events_rx = select(fs_events_rx, ancestor_git_events).boxed();
+
+                        // We associate the external git repo with our root folder and
+                        // also mark where in the git repo the root folder is located.
+                        self.state.lock().build_git_repository_for_path(
+                            Path::new("").into(),
+                            ancestor_dot_git.into(),
+                            Some(root_abs_path.strip_prefix(ancestor).unwrap().into()),
+                            self.fs.as_ref(),
+                        );
+                    };
                 }
+                // Reached root of git repository.
                 break;
             }
 
@@ -3570,41 +3437,6 @@
             state.snapshot.completed_scan_id = state.snapshot.scan_id;
         }
 
-        // If we don't have a git repository at the root, we check whether we found an external
-        // git repository.
-        if self.state.lock().snapshot.root_git_entry().is_none() {
-            if let Some(external_git_repo) = external_git_repo {
-                let root_abs_path = self.state.lock().snapshot.abs_path.clone();
-                let external_dot_git = external_git_repo.join(&*DOT_GIT);
-
-                // We canonicalize, since the FS events use the canonicalized path.
-                let dot_git_canonical_path =
-                    self.fs.canonicalize(&external_dot_git).await.log_err();
-                let location_in_repo = root_abs_path.strip_prefix(external_git_repo).log_err();
-
-                if let Some((dot_git_canonical_path, location_in_repo)) =
-                    dot_git_canonical_path.zip(location_in_repo)
-                {
-                    // We associate the external git repo with our root folder and
-                    // also mark where in the git repo the root folder is located.
-
-                    self.state.lock().build_git_repository_for_path(
-                        Arc::from(Path::new("")),
-                        dot_git_canonical_path.clone().into(),
-                        Some(location_in_repo.into()),
-                        self.fs.as_ref(),
-                    );
-
-                    let external_events = self
-                        .fs
-                        .watch(&dot_git_canonical_path, FS_WATCH_LATENCY)
-                        .await;
-
-                    fs_events_rx = select(fs_events_rx, external_events).boxed()
-                };
-            }
-        }
-
         self.send_status_update(false, None);
 
         // Process any FS events that occurred while performing the initial scan.
@@ -3710,7 +3542,7 @@ impl BackgroundScanner {
         };
 
         let mut relative_paths = Vec::with_capacity(abs_paths.len());
-        let mut dot_git_paths_to_reload = HashSet::default();
+        let mut dot_git_paths = Vec::new();
         abs_paths.sort_unstable();
         abs_paths.dedup_by(|a, b| a.starts_with(&b));
         abs_paths.retain(|abs_path| {
             {
                 let dot_git_path = dot_git_dir
                     .strip_prefix(&root_canonical_path)
-                    .ok()
-                    .map(|path| path.to_path_buf())
-                    .unwrap_or_else(|| dot_git_dir.to_path_buf());
-                dot_git_paths_to_reload.insert(dot_git_path.to_path_buf());
+                    .unwrap_or(dot_git_dir)
+                    .to_path_buf();
+                if !dot_git_paths.contains(&dot_git_path) {
+                    dot_git_paths.push(dot_git_path);
+                }
                 is_git_related = true;
             }
 
@@ -3756,7 +3589,7 @@ impl BackgroundScanner {
                 return false;
             }
 
-            if snapshot.is_path_excluded(relative_path.to_path_buf()) {
+            if snapshot.is_path_excluded(&relative_path) {
                 if !is_git_related {
                     log::debug!("ignoring FS event for excluded path {relative_path:?}");
                 }
             }
         });
 
-        if dot_git_paths_to_reload.is_empty() && relative_paths.is_empty() {
-            return;
-        }
-
-        if !relative_paths.is_empty() {
+        let (scan_job_tx, scan_job_rx) = channel::unbounded();
+        if !relative_paths.is_empty() || !dot_git_paths.is_empty() {
             log::debug!("received fs events {:?}", relative_paths);
-
-            let (scan_job_tx, scan_job_rx) = channel::unbounded();
             self.reload_entries_for_paths(
                 root_path,
                 root_canonical_path,
                 relative_paths,
                 Some(scan_job_tx.clone()),
             )
             .await;
-            drop(scan_job_tx);
-            self.scan_dirs(false, scan_job_rx).await;
+        }
 
-            let (scan_job_tx, scan_job_rx) = channel::unbounded();
-            self.update_ignore_statuses(scan_job_tx).await;
-            self.scan_dirs(false, scan_job_rx).await;
+        self.update_ignore_statuses(scan_job_tx).await;
+        self.scan_dirs(false, scan_job_rx).await;
+
+        if !dot_git_paths.is_empty() {
+            self.update_git_repositories(dot_git_paths).await;
+        }
 
         {
             let mut state = self.state.lock();
-            if !dot_git_paths_to_reload.is_empty() {
-                if relative_paths.is_empty() {
-                    state.snapshot.scan_id += 1;
-                }
-                log::debug!("reloading repositories: {dot_git_paths_to_reload:?}");
-                state.reload_repositories(&dot_git_paths_to_reload, self.fs.as_ref());
-            }
             state.snapshot.completed_scan_id = state.snapshot.scan_id;
             for (_, entry_id) in mem::take(&mut state.removed_entry_ids) {
                 state.scanned_dirs.remove(&entry_id);
@@ -3932,43 +3754,74 @@ impl BackgroundScanner {
     async fn scan_dir(&self, job: &ScanJob) -> Result<()> {
         let root_abs_path;
-        let mut ignore_stack;
-        let mut new_ignore;
         let root_char_bag;
-        let next_entry_id;
         {
-            let state = self.state.lock();
-            let snapshot = &state.snapshot;
-            root_abs_path = snapshot.abs_path().clone();
-            if snapshot.is_path_excluded(job.path.to_path_buf()) {
+            let snapshot = &self.state.lock().snapshot;
+            if snapshot.is_path_excluded(&job.path) {
                 log::error!("skipping excluded directory {:?}", job.path);
                 return Ok(());
             }
             log::debug!("scanning directory {:?}", job.path);
-            ignore_stack = job.ignore_stack.clone();
-            new_ignore = None;
+            root_abs_path = snapshot.abs_path().clone();
             root_char_bag = snapshot.root_char_bag;
-            next_entry_id = self.next_entry_id.clone();
-            drop(state);
         }
 
-        let mut dotgit_path = None;
+        let next_entry_id = self.next_entry_id.clone();
+        let mut ignore_stack = job.ignore_stack.clone();
+        let mut containing_repository = job.containing_repository.clone();
+        let mut new_ignore = None;
         let mut root_canonical_path = None;
         let mut new_entries: Vec<Entry> = Vec::new();
         let mut new_jobs: Vec<Option<ScanJob>> = Vec::new();
-        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
-        while let Some(child_abs_path) = child_paths.next().await {
-            let child_abs_path: Arc<Path> = match child_abs_path {
-                Ok(child_abs_path) => child_abs_path.into(),
-                Err(error) => {
-                    log::error!("error processing entry {:?}", error);
-                    continue;
+        let mut child_paths = self
+            .fs
+            .read_dir(&job.abs_path)
+            .await?
+            .filter_map(|entry| async {
+                match entry {
+                    Ok(entry) => Some(entry),
+                    Err(error) => {
+                        log::error!("error processing entry {:?}", error);
+                        None
+                    }
                 }
-            };
+            })
+            .collect::<Vec<_>>()
+            .await;
+
+        // Ensure .git and .gitignore files are processed first.
+        let mut ixs_to_move_to_front = Vec::new();
+        for (ix, child_abs_path) in child_paths.iter().enumerate() {
+            let filename = child_abs_path.file_name().unwrap();
+            if filename == *DOT_GIT {
+                ixs_to_move_to_front.insert(0, ix);
+            } else if filename == *GITIGNORE {
+                ixs_to_move_to_front.push(ix);
+            }
+        }
+        for (dest_ix, src_ix) in ixs_to_move_to_front.into_iter().enumerate() {
+            child_paths.swap(dest_ix, src_ix);
+        }
+
+        for child_abs_path in child_paths {
+            let child_abs_path: Arc<Path> = child_abs_path.into();
             let child_name = child_abs_path.file_name().unwrap();
             let child_path: Arc<Path> = job.path.join(child_name).into();
-            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
-            if child_name == *GITIGNORE {
+
+            if child_name == *DOT_GIT {
+                if let Some((work_directory, repository)) = self
+                    .state
+                    .lock()
+                    .build_git_repository(child_path.clone(), self.fs.as_ref())
+                {
+                    let staged_statuses = repository.lock().staged_statuses(Path::new(""));
+                    containing_repository = Some(ScanJobContainingRepository {
+                        work_directory,
+                        repository,
+                        staged_statuses,
+                    });
+                }
+            } else if child_name == *GITIGNORE {
                 match build_gitignore(&child_abs_path, self.fs.as_ref()).await {
                     Ok(ignore) => {
                         let ignore = Arc::new(ignore);
@@ -3983,42 +3836,15 @@ impl BackgroundScanner {
                         );
                     }
                 }
-
-                // Update ignore status of any child entries we've already processed to reflect the
-                // ignore file in the current directory. Because `.gitignore` starts with a `.`,
-                // there should rarely be too numerous. Update the ignore stack associated with any
-                // new jobs as well.
-                let mut new_jobs = new_jobs.iter_mut();
-                for entry in &mut new_entries {
-                    let entry_abs_path = root_abs_path.join(&entry.path);
-                    entry.is_ignored =
-                        ignore_stack.is_abs_path_ignored(&entry_abs_path, entry.is_dir());
-
-                    if entry.is_dir() {
-                        if let Some(job) = new_jobs.next().expect("missing scan job for entry") {
-                            job.ignore_stack = if entry.is_ignored {
-                                IgnoreStack::all()
-                            } else {
-                                ignore_stack.clone()
-                            };
-                        }
-                    }
-                }
-            }
-            // If we find a .git, we'll need to load the repository.
-            else if child_name == *DOT_GIT {
-                dotgit_path = Some(child_path.clone());
             }
 
             {
-                let relative_path = job.path.join(child_name);
                 let mut state = self.state.lock();
-                if state.snapshot.is_path_excluded(relative_path.clone()) {
-                    log::debug!("skipping excluded child entry {relative_path:?}");
-                    state.remove_path(&relative_path);
+                if state.snapshot.is_path_excluded(&child_path) {
+                    log::debug!("skipping excluded child entry {child_path:?}");
+                    state.remove_path(&child_path);
                     continue;
                 }
-                drop(state);
             }
 
             let child_metadata = match self.fs.metadata(&child_abs_path).await {
@@ -4091,21 +3917,19 @@
                         },
                         ancestor_inodes,
                         scan_queue: job.scan_queue.clone(),
-                        containing_repository: job.containing_repository.clone(),
+                        containing_repository: containing_repository.clone(),
                     }));
                 }
             } else {
                 child_entry.is_ignored = ignore_stack.is_abs_path_ignored(&child_abs_path, false);
                 if !child_entry.is_ignored {
-                    if let Some((repository_dir, repository, staged_statuses)) =
-                        &job.containing_repository
-                    {
-                        if let Ok(repo_path) = child_entry.path.strip_prefix(&repository_dir.0) {
+                    if let Some(repo) = &containing_repository {
+                        if let Ok(repo_path) = child_entry.path.strip_prefix(&repo.work_directory) {
                             if let Some(mtime) = child_entry.mtime {
                                 let repo_path = RepoPath(repo_path.into());
                                 child_entry.git_status = combine_git_statuses(
-                                    staged_statuses.get(&repo_path).copied(),
-                                    repository.lock().unstaged_status(&repo_path, mtime),
+                                    repo.staged_statuses.get(&repo_path).copied(),
+                                    repo.repository.lock().unstaged_status(&repo_path, mtime),
                                 );
                             }
                         }
@@ -4145,14 +3969,7 @@
 
         state.populate_dir(&job.path, new_entries, new_ignore);
 
-        let repository =
-            dotgit_path.and_then(|path| state.build_git_repository(path, self.fs.as_ref()));
-
-        for mut new_job in new_jobs.into_iter().flatten() {
-            if let Some(containing_repository) = &repository {
-                new_job.containing_repository = Some(containing_repository.clone());
-            }
-
+        for new_job in new_jobs.into_iter().flatten() {
             job.scan_queue
                 .try_send(new_job)
                 .expect("channel is unbounded");
@@ -4458,6 +4275,237 @@ impl BackgroundScanner {
         state.snapshot.entries_by_id.edit(entries_by_id_edits, &());
     }
 
+    async fn update_git_repositories(&self, dot_git_paths: Vec<PathBuf>) {
+        log::debug!("reloading repositories: {dot_git_paths:?}");
+
+        let (update_job_tx, update_job_rx) = channel::unbounded();
+        {
+            let mut state = self.state.lock();
+            let scan_id = state.snapshot.scan_id;
+            for dot_git_dir in dot_git_paths {
+                let existing_repository_entry =
+                    state
+                        .snapshot
+                        .git_repositories
+                        .iter()
+                        .find_map(|(entry_id, repo)| {
+                            (repo.git_dir_path.as_ref() == dot_git_dir)
+                                .then(|| (*entry_id, repo.clone()))
+                        });
+
+                let (work_dir, repository) = match existing_repository_entry {
+                    None => {
+                        match state.build_git_repository(dot_git_dir.into(), self.fs.as_ref()) {
+                            Some(output) => output,
+                            None => continue,
+                        }
+                    }
+                    Some((entry_id, repository)) => {
+                        if repository.git_dir_scan_id == scan_id {
+                            continue;
+                        }
+                        let Some(work_dir) = state
+                            .snapshot
+                            .entry_for_id(entry_id)
+                            .map(|entry| RepositoryWorkDirectory(entry.path.clone()))
+                        else {
+                            continue;
+                        };
+
+                        log::info!("reload git repository {dot_git_dir:?}");
+                        let repo = repository.repo_ptr.lock();
+                        let branch = repo.branch_name();
+                        repo.reload_index();
+
+                        state
+                            .snapshot
+                            .git_repositories
+                            .update(&entry_id, |entry| entry.git_dir_scan_id = scan_id);
+                        state
+                            .snapshot
+                            .snapshot
+                            .repository_entries
+                            .update(&work_dir, |entry| entry.branch = branch.map(Into::into));
+                        (work_dir, repository.repo_ptr.clone())
+                    }
+                };
+
+                let staged_statuses = repository.lock().staged_statuses(Path::new(""));
+                let mut files =
+                    state
+                        .snapshot
+                        .traverse_from_path(true, false, false, work_dir.0.as_ref());
+                let mut start_path = work_dir.0.clone();
+                while start_path.starts_with(&work_dir.0) {
+                    files.advance_by(GIT_STATUS_UPDATE_BATCH_SIZE);
+                    let end_path = files.entry().map(|e| e.path.clone());
+                    smol::block_on(update_job_tx.send(UpdateGitStatusesJob {
+                        start_path: start_path.clone(),
+                        end_path: end_path.clone(),
+                        containing_repository: ScanJobContainingRepository {
+                            work_directory: work_dir.clone(),
+                            repository: repository.clone(),
+                            staged_statuses: staged_statuses.clone(),
+                        },
+                    }))
+                    .unwrap();
+                    if let Some(end_path) = end_path {
+                        start_path = end_path;
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            // Remove any git repositories whose .git entry no longer exists.
+            let snapshot = &mut state.snapshot;
+            let mut ids_to_preserve = HashSet::default();
+            for (&work_directory_id, entry) in snapshot.git_repositories.iter() {
+                let exists_in_snapshot = snapshot
+                    .entry_for_id(work_directory_id)
+                    .map_or(false, |entry| {
+                        snapshot.entry_for_path(entry.path.join(*DOT_GIT)).is_some()
+                    });
+                if exists_in_snapshot {
+                    ids_to_preserve.insert(work_directory_id);
+                } else {
+                    let git_dir_abs_path = snapshot.abs_path().join(&entry.git_dir_path);
+                    let git_dir_excluded = snapshot.is_path_excluded(&entry.git_dir_path);
+                    if git_dir_excluded
+                        && !matches!(
+                            smol::block_on(self.fs.metadata(&git_dir_abs_path)),
+                            Ok(None)
+                        )
+                    {
+                        ids_to_preserve.insert(work_directory_id);
+                    }
+                }
+            }
+
+            snapshot
+                .git_repositories
+                .retain(|work_directory_id, _| ids_to_preserve.contains(work_directory_id));
+            snapshot
+                .repository_entries
+                .retain(|_, entry| ids_to_preserve.contains(&entry.work_directory.0));
+        }
+        drop(update_job_tx);
+
+        self.executor
+            .scoped(|scope| {
+                // Git status updates are currently not very parallelizable,
+                // because they need to lock the git repository. Limit the number
+                // of workers so that we don't tie up CPUs with tasks that would
+                // mostly just block on that lock.
+                for _ in 0..self.executor.num_cpus().min(3) {
+                    scope.spawn(async {
+                        let mut entries = Vec::with_capacity(GIT_STATUS_UPDATE_BATCH_SIZE);
+                        loop {
+                            select_biased! {
+                                // Process any path refresh requests before moving on to process
+                                // the queue of git status updates.
+                                request = self.scan_requests_rx.recv().fuse() => {
+                                    let Ok(request) = request else { break };
+                                    if !self.process_scan_request(request, true).await {
+                                        return;
+                                    }
+                                }
+
+                                // Process git status updates in batches.
+                                job = update_job_rx.recv().fuse() => {
+                                    let Ok(job) = job else { break };
+                                    self.update_git_statuses(job, &mut entries);
+                                }
+                            }
+                        }
+                    });
+                }
+            })
+            .await;
+    }
+
+    /// Update the git statuses for a given batch of entries.
+    fn update_git_statuses(&self, job: UpdateGitStatusesJob, entries: &mut Vec<Entry>) {
+        let t0 = Instant::now();
+        let repo_work_dir = &job.containing_repository.work_directory;
+        let state = self.state.lock();
+        let Some(repo_entry) = state
+            .snapshot
+            .repository_entries
+            .get(&repo_work_dir)
+            .cloned()
+        else {
+            return;
+        };
+
+        // Retrieve a batch of entries for this job, and then release the state lock.
+        entries.clear();
+        for entry in state
+            .snapshot
+            .traverse_from_path(true, false, false, &job.start_path)
+        {
+            if job
+                .end_path
+                .as_ref()
+                .map_or(false, |end| &entry.path >= end)
+                || !entry.path.starts_with(&repo_work_dir)
+            {
+                break;
+            }
+            entries.push(entry.clone());
+        }
+        drop(state);
+
+        // Determine which entries in this batch have changed their git status.
+        let mut edits = vec![];
+        for entry in entries.iter() {
+            let Ok(repo_path) = entry.path.strip_prefix(&repo_work_dir) else {
+                continue;
+            };
+            let Some(mtime) = entry.mtime else {
+                continue;
+            };
+            let repo_path = RepoPath(if let Some(location) = &repo_entry.location_in_repo {
+                location.join(repo_path)
+            } else {
+                repo_path.to_path_buf()
+            });
+            let git_status = combine_git_statuses(
+                job.containing_repository
+                    .staged_statuses
+                    .get(&repo_path)
+                    .copied(),
+                job.containing_repository
                    .repository
+                    .lock()
+                    .unstaged_status(&repo_path, mtime),
+            );
+            if entry.git_status != git_status {
+                let mut entry = entry.clone();
+                entry.git_status = git_status;
+                edits.push(Edit::Insert(entry));
+            }
+        }
+
+        // Apply the git status changes.
+        let mut state = self.state.lock();
+        let path_changes = edits.iter().map(|edit| {
+            if let Edit::Insert(entry) = edit {
+                entry.path.clone()
+            } else {
+                unreachable!()
+            }
+        });
+        util::extend_sorted(&mut state.changed_paths, path_changes, usize::MAX, Ord::cmp);
+        state.snapshot.entries_by_path.edit(edits, &());
+
+        log::trace!(
+            "refreshed git status of {} entries starting with {} in {:?}",
+            entries.len(),
+            job.start_path.display(),
+            t0.elapsed()
+        );
+    }
+
     fn build_change_set(
         &self,
         old_snapshot: &Snapshot,
@@ -4607,11 +4655,14 @@ struct ScanJob {
     scan_queue: Sender<ScanJob>,
     ancestor_inodes: TreeSet<u64>,
     is_external: bool,
-    containing_repository: Option<(
-        RepositoryWorkDirectory,
-        Arc<Mutex<dyn GitRepository>>,
-        TreeMap<RepoPath, GitFileStatus>,
-    )>,
+    containing_repository: Option<ScanJobContainingRepository>,
+}
+
+#[derive(Clone)]
+struct ScanJobContainingRepository {
+    work_directory: RepositoryWorkDirectory,
+    repository: Arc<Mutex<dyn GitRepository>>,
+    staged_statuses: TreeMap<RepoPath, GitFileStatus>,
 }
 
 struct UpdateIgnoreStatusJob {
     abs_path: Arc<Path>,
     ignore_stack: Arc<IgnoreStack>,
     scan_queue: Sender<ScanJob>,
 }
 
+struct UpdateGitStatusesJob {
+    start_path: Arc<Path>,
+    end_path: Option<Arc<Path>>,
+    containing_repository: ScanJobContainingRepository,
+}
+
 pub trait WorktreeModelHandle {
     #[cfg(any(test, feature = "test-support"))]
     fn flush_fs_events<'a>(
@@ -4829,9 +4886,13 @@ pub struct Traversal<'a> {
 impl<'a> Traversal<'a> {
     pub fn advance(&mut self) -> bool {
+        self.advance_by(1)
+    }
+
+    pub fn advance_by(&mut self, count: usize) -> bool {
         self.cursor.seek_forward(
             &TraversalTarget::Count {
-                count: self.end_offset() + 1,
+                count: self.end_offset() + count,
                 include_dirs: self.include_dirs,
                 include_files: self.include_files,
                 include_ignored: self.include_ignored,
@@ -4953,25 +5014,6 @@ impl<'a> Iterator for ChildEntriesIter<'a> {
     }
 }
 
-pub struct DescendentEntriesIter<'a> {
-    parent_path: &'a Path,
-    traversal: Traversal<'a>,
-}
-
-impl<'a> Iterator for DescendentEntriesIter<'a> {
-    type Item = &'a Entry;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if let Some(item) = self.traversal.entry() {
-            if item.path.starts_with(&self.parent_path) {
-                self.traversal.advance();
-                return Some(item);
-            }
-        }
-        None
-    }
-}
-
 impl<'a> From<&'a Entry> for proto::Entry {
     fn from(entry: &'a Entry) -> Self {
         Self {
diff --git a/crates/worktree/src/worktree_tests.rs b/crates/worktree/src/worktree_tests.rs
index 12282fed15..9f040da857 100644
--- a/crates/worktree/src/worktree_tests.rs
+++ b/crates/worktree/src/worktree_tests.rs
@@ -15,13 +15,7 @@ use pretty_assertions::assert_eq;
 use rand::prelude::*;
 use serde_json::json;
 use settings::{Settings, SettingsStore};
-use std::{
-    env,
-    fmt::Write,
-    mem,
-    path::{Path, PathBuf},
-    sync::Arc,
-};
+use std::{env, fmt::Write, mem, path::Path, sync::Arc};
 use util::{test::temp_tree, ResultExt};
 
 #[gpui::test]
@@ -80,114 +74,6 @@ async fn test_traversal(cx: &mut TestAppContext) {
     })
 }
 
-#[gpui::test]
-async fn test_descendent_entries(cx: &mut TestAppContext) {
-    init_test(cx);
-    let fs = FakeFs::new(cx.background_executor.clone());
-    fs.insert_tree(
-        "/root",
-        json!({
-            "a": "",
-            "b": {
-                "c": {
-                    "d": ""
-                },
-                "e": {}
-            },
-            "f": "",
-            "g": {
-                "h": {}
-            },
-            "i": {
-                "j": {
-                    "k": ""
-                },
-                "l": {
-
-                }
-            },
-            ".gitignore": "i/j\n",
-        }),
-    )
-    .await;
-
-    let tree = Worktree::local(
-        build_client(cx),
-        Path::new("/root"),
-        true,
-        fs,
-        Default::default(),
-        &mut cx.to_async(),
-    )
-    .await
-    .unwrap();
-    cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
-        .await;
-
-    tree.read_with(cx, |tree, _| {
-        assert_eq!(
-            tree.descendent_entries(false, false, Path::new("b"))
-                .map(|entry| entry.path.as_ref())
-                .collect::<Vec<_>>(),
-            vec![Path::new("b/c/d"),]
-        );
-        assert_eq!(
-            tree.descendent_entries(true, false, Path::new("b"))
-                .map(|entry| entry.path.as_ref())
-                .collect::<Vec<_>>(),
-            vec![
-                Path::new("b"),
-                Path::new("b/c"),
-                Path::new("b/c/d"),
-                Path::new("b/e"),
-            ]
-        );
-
-        assert_eq!(
-            tree.descendent_entries(false, false, Path::new("g"))
-                .map(|entry| entry.path.as_ref())
-                .collect::<Vec<_>>(),
-            Vec::<PathBuf>::new()
-        );
-        assert_eq!(
-            tree.descendent_entries(true, false, Path::new("g"))
-                .map(|entry| entry.path.as_ref())
-                .collect::<Vec<_>>(),
-            vec![Path::new("g"), Path::new("g/h"),]
-        );
-    });
-
-    // Expand gitignored directory.
-    tree.read_with(cx, |tree, _| {
-        tree.as_local()
-            .unwrap()
-            .refresh_entries_for_paths(vec![Path::new("i/j").into()])
-    })
-    .recv()
-    .await;
-
-    tree.read_with(cx, |tree, _| {
-        assert_eq!(
-            tree.descendent_entries(false, false, Path::new("i"))
-                .map(|entry| entry.path.as_ref())
-                .collect::<Vec<_>>(),
-            Vec::<PathBuf>::new()
-        );
-        assert_eq!(
-            tree.descendent_entries(false, true, Path::new("i"))
-                .map(|entry| entry.path.as_ref())
-                .collect::<Vec<_>>(),
-            vec![Path::new("i/j/k")]
-        );
-        assert_eq!(
-            tree.descendent_entries(true, false, Path::new("i"))
-                .map(|entry| entry.path.as_ref())
-                .collect::<Vec<_>>(),
-            vec![Path::new("i"), Path::new("i/l"),]
-        );
-    })
-}
-
 #[gpui::test(iterations = 10)]
 async fn test_circular_symlinks(cx: &mut TestAppContext) {
     init_test(cx);
@@ -2704,6 +2590,10 @@ fn check_worktree_entries(
 }
 
 fn init_test(cx: &mut gpui::TestAppContext) {
+    if std::env::var("RUST_LOG").is_ok() {
+        env_logger::try_init().ok();
+    }
+
     cx.update(|cx| {
         let settings_store = SettingsStore::test(cx);
         cx.set_global(settings_store);
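
A note on the approach, for readers outside the Zed codebase: the core of this fix is a general pattern — perform slow work over a large lock-guarded structure in fixed-size batches, holding the lock only long enough to copy a batch out and to write results back. The sketch below is a minimal, self-contained illustration of that pattern using only plain `std` threads and a `Mutex`, rather than Zed's `BackgroundScanner`, executor, and `smol` channels. All names in it (`Entry`, `BATCH_SIZE`, the placeholder `'M'` status) are illustrative stand-ins, not code from this patch.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Analogous in spirit to GIT_STATUS_UPDATE_BATCH_SIZE in the patch.
const BATCH_SIZE: usize = 100;

#[derive(Clone)]
struct Entry {
    path: String,
    status: Option<char>, // placeholder for a real git status
}

fn main() {
    // A shared "snapshot" guarded by a single lock, standing in for the
    // BackgroundScanner's state.
    let state: Arc<Mutex<Vec<Entry>>> = Arc::new(Mutex::new(
        (0..1000)
            .map(|i| Entry { path: format!("file-{i}"), status: None })
            .collect(),
    ));

    let worker = {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            let mut start = 0;
            loop {
                // Hold the lock only long enough to copy out one batch.
                let batch: Vec<Entry> = {
                    let state = state.lock().unwrap();
                    state.iter().skip(start).take(BATCH_SIZE).cloned().collect()
                };
                if batch.is_empty() {
                    break;
                }

                // Do the slow work without holding the lock. (In the patch,
                // this is the comparison of file contents against the index.)
                let updated: Vec<(usize, char)> = batch
                    .iter()
                    .enumerate()
                    .map(|(i, _entry)| (start + i, 'M'))
                    .collect();

                // Re-take the lock briefly to write the results back.
                let mut state = state.lock().unwrap();
                for (ix, status) in updated {
                    state[ix].status = Some(status);
                }
                start += BATCH_SIZE;
            }
        })
    };

    // Meanwhile, other threads can still take the lock between batches —
    // which is what keeps file operations responsive in the fixed version.
    {
        let state = state.lock().unwrap();
        println!("lock acquired mid-update; {} entries", state.len());
    }

    worker.join().unwrap();

    let state = state.lock().unwrap();
    println!("first entry: {} -> {:?}", state[0].path, state[0].status);
}
```

The patch applies the same shape in `update_git_repositories` and `update_git_statuses`: each `UpdateGitStatusesJob` covers a `GIT_STATUS_UPDATE_BATCH_SIZE`-sized slice of the worktree, the scanner's state lock is dropped while the slow `unstaged_status` comparisons run, and the lock is re-taken only to apply the resulting edits.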