Avoid holding worktree lock for a long time while updating large repos' git status (#12266)

Fixes https://github.com/zed-industries/zed/issues/9575
Fixes https://github.com/zed-industries/zed/issues/4294

### Problem

When a large git repository's `.git` folder changes (due to a `git
commit`, `git reset`, etc.), Zed needs to recompute the git status for
every file in that repository. Part of computing the git status is
the *unstaged* part - the comparison between the content of the file on
disk and the version in the git index. In a large git repository like
`chromium` or `linux`, this is inherently pretty slow.
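
To make that cost concrete, here is a minimal sketch of the unstaged half of
the query. Zed performs it through its `GitRepository` trait (see the
`unstaged_status` calls in the diff below); this sketch instead uses the
`git2` crate directly, so the function and option choices here are
illustrative rather than Zed's actual code:

```rust
use git2::{Repository, Status, StatusOptions};

/// List paths whose working-tree content differs from the git index.
/// Every entry forces a content comparison, which is why the cost
/// scales with the size of the repository.
fn unstaged_paths(repo_path: &str) -> Result<Vec<String>, git2::Error> {
    let repo = Repository::open(repo_path)?;
    let mut opts = StatusOptions::new();
    // Include untracked files, skip ignored ones.
    opts.include_untracked(true).include_ignored(false);
    let statuses = repo.statuses(Some(&mut opts))?;
    Ok(statuses
        .iter()
        // Keep only the working-tree-vs-index ("unstaged") half.
        .filter(|entry| {
            entry
                .status()
                .intersects(Status::WT_NEW | Status::WT_MODIFIED | Status::WT_DELETED)
        })
        .filter_map(|entry| entry.path().map(str::to_owned))
        .collect())
}
```

The staged (index-vs-HEAD) half is cheaper, and as the diff shows, Zed
computes it up front for the whole repository via `staged_statuses`.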

Previously, we computed all of these git statuses at once, and held a lock
on our `BackgroundScanner`'s state for the entire time. On my laptop, in
the `linux` repo, this would often take around 13 seconds.

When opening a file, Zed always refreshes the metadata for that file in
its in-memory snapshot of the worktree. This is normally very fast, but if
another task is holding a lock on the `BackgroundScanner`'s state, the
refresh blocks until the lock is released.

### Solution

I've restructured how Zed handles git statuses, so that when a git
repository is updated, we recompute files' git statuses in fixed-size
batches. In between these batches, the `BackgroundScanner` is free to
perform other work, so file operations coming from the main thread
remain responsive.
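
In outline, the pattern looks like the following sketch. The `FileEntry` and
`compute_git_status` stand-ins are hypothetical; the real change drains
`UpdateGitStatusesJob`s from a channel, as the diff below shows, but the
locking discipline is the same:

```rust
use std::sync::{Arc, Mutex};

const BATCH_SIZE: usize = 100; // mirrors GIT_STATUS_UPDATE_BATCH_SIZE

#[derive(Clone)]
struct FileEntry {
    path: std::path::PathBuf,
}

fn compute_git_status(_entry: &FileEntry) {
    // Expensive: compares the file's content against the git index.
}

fn update_statuses_in_batches(state: &Mutex<Vec<FileEntry>>) {
    let mut start = 0;
    loop {
        // Hold the lock only long enough to copy out one batch...
        let batch: Vec<FileEntry> = {
            let entries = state.lock().unwrap();
            entries.iter().skip(start).take(BATCH_SIZE).cloned().collect()
        };
        if batch.is_empty() {
            break;
        }
        // ...then do the slow git work with the lock released, so file
        // operations from the main thread can acquire it in between.
        for entry in &batch {
            compute_git_status(entry);
        }
        start += batch.len();
    }
}

fn main() {
    let state = Arc::new(Mutex::new(vec![
        FileEntry { path: "src/a.rs".into() },
        FileEntry { path: "src/b.rs".into() },
    ]));
    update_statuses_in_batches(&state);
}
```

In the actual change, the worker tasks also answer `scan_requests_rx`
between batches (see the `select_biased!` loop in the diff), which is what
keeps file opens responsive.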

Release Notes:

- Fixed a bug that caused long delays in opening files right after
performing a commit in very large git repositories.

Commit f7a86967fd (parent 800c1ba916) by Max Brunsfeld, 2024-05-24 17:41:35 -07:00
5 changed files with 384 additions and 450 deletions

Cargo.lock (generated)

@ -12799,6 +12799,7 @@ dependencies = [
"client",
"clock",
"collections",
"env_logger",
"fs",
"futures 0.3.28",
"fuzzy",


@ -11014,7 +11014,7 @@ async fn search_ignored_entry(
}
} else if !fs_metadata.is_symlink {
if !query.file_matches(Some(&ignored_abs_path))
|| snapshot.is_path_excluded(ignored_entry.path.to_path_buf())
|| snapshot.is_path_excluded(&ignored_entry.path)
{
continue;
}


@ -52,6 +52,7 @@ util.workspace = true
[dev-dependencies]
clock = {workspace = true, features = ["test-support"]}
collections = { workspace = true, features = ["test-support"] }
env_logger.workspace = true
git2.workspace = true
gpui = {workspace = true, features = ["test-support"]}
http.workspace = true


@ -45,6 +45,7 @@ use postage::{
use serde::Serialize;
use settings::{Settings, SettingsLocation, SettingsStore};
use smol::channel::{self, Sender};
use std::time::Instant;
use std::{
any::Any,
cmp::{self, Ordering},
@ -76,6 +77,8 @@ pub const FS_WATCH_LATENCY: Duration = Duration::from_millis(100);
#[cfg(not(feature = "test-support"))]
pub const FS_WATCH_LATENCY: Duration = Duration::from_millis(100);
const GIT_STATUS_UPDATE_BATCH_SIZE: usize = 100;
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)]
pub struct WorktreeId(usize);
@ -1549,7 +1552,7 @@ impl LocalWorktree {
old_path: Option<Arc<Path>>,
cx: &mut ModelContext<Worktree>,
) -> Task<Result<Option<Entry>>> {
if self.is_path_excluded(path.to_path_buf()) {
if self.is_path_excluded(&path) {
return Task::ready(Ok(None));
}
let paths = if let Some(old_path) = old_path.as_ref() {
@ -1557,9 +1560,11 @@ impl LocalWorktree {
} else {
vec![path.clone()]
};
let t0 = Instant::now();
let mut refresh = self.refresh_entries_for_paths(paths);
cx.spawn(move |this, mut cx| async move {
refresh.recv().await;
log::trace!("refreshed entry {path:?} in {:?}", t0.elapsed());
let new_entry = this.update(&mut cx, |this, _| {
this.entry_for_path(path)
.cloned()
@ -2015,12 +2020,16 @@ impl Snapshot {
) -> Traversal {
let mut cursor = self.entries_by_path.cursor();
cursor.seek(&TraversalTarget::Path(path), Bias::Left, &());
Traversal {
let mut traversal = Traversal {
cursor,
include_files,
include_dirs,
include_ignored,
};
if traversal.end_offset() == traversal.start_offset() {
traversal.next();
}
traversal
}
pub fn files(&self, include_ignored: bool, start: usize) -> Traversal {
@ -2174,31 +2183,6 @@ impl Snapshot {
}
}
pub fn descendent_entries<'a>(
&'a self,
include_dirs: bool,
include_ignored: bool,
parent_path: &'a Path,
) -> DescendentEntriesIter<'a> {
let mut cursor = self.entries_by_path.cursor();
cursor.seek(&TraversalTarget::Path(parent_path), Bias::Left, &());
let mut traversal = Traversal {
cursor,
include_files: true,
include_dirs,
include_ignored,
};
if traversal.end_offset() == traversal.start_offset() {
traversal.advance();
}
DescendentEntriesIter {
traversal,
parent_path,
}
}
pub fn root_entry(&self) -> Option<&Entry> {
self.entry_for_path("")
}
@ -2538,19 +2522,12 @@ impl LocalSnapshot {
})
}
pub fn is_path_excluded(&self, mut path: PathBuf) -> bool {
loop {
if self
.file_scan_exclusions
pub fn is_path_excluded(&self, path: &Path) -> bool {
path.ancestors().any(|path| {
self.file_scan_exclusions
.iter()
.any(|exclude_matcher| exclude_matcher.is_match(&path))
{
return true;
}
if !path.pop() {
return false;
}
}
})
}
}
@ -2578,11 +2555,11 @@ impl BackgroundScannerState {
if let Some((repo_entry, repo)) = self.snapshot.repo_for_path(&path) {
if let Some(workdir_path) = repo_entry.work_directory(&self.snapshot) {
if let Ok(repo_path) = repo_entry.relativize(&self.snapshot, &path) {
containing_repository = Some((
workdir_path,
repo.repo_ptr.clone(),
repo.repo_ptr.lock().staged_statuses(&repo_path),
));
containing_repository = Some(ScanJobContainingRepository {
work_directory: workdir_path,
repository: repo.repo_ptr.clone(),
staged_statuses: repo.repo_ptr.lock().staged_statuses(&repo_path),
});
}
}
}
@ -2722,92 +2699,11 @@ impl BackgroundScannerState {
self.snapshot.check_invariants(false);
}
fn reload_repositories(&mut self, dot_git_dirs_to_reload: &HashSet<PathBuf>, fs: &dyn Fs) {
let scan_id = self.snapshot.scan_id;
for dot_git_dir in dot_git_dirs_to_reload {
// If there is already a repository for this .git directory, reload
// the status for all of its files.
let repository = self
.snapshot
.git_repositories
.iter()
.find_map(|(entry_id, repo)| {
(repo.git_dir_path.as_ref() == dot_git_dir).then(|| (*entry_id, repo.clone()))
});
match repository {
None => {
self.build_git_repository(Arc::from(dot_git_dir.as_path()), fs);
}
Some((entry_id, repository)) => {
if repository.git_dir_scan_id == scan_id {
continue;
}
let Some(work_dir) = self
.snapshot
.entry_for_id(entry_id)
.map(|entry| RepositoryWorkDirectory(entry.path.clone()))
else {
continue;
};
log::info!("reload git repository {dot_git_dir:?}");
let repository = repository.repo_ptr.lock();
let branch = repository.branch_name();
repository.reload_index();
self.snapshot
.git_repositories
.update(&entry_id, |entry| entry.git_dir_scan_id = scan_id);
self.snapshot
.snapshot
.repository_entries
.update(&work_dir, |entry| entry.branch = branch.map(Into::into));
self.update_git_statuses(&work_dir, &*repository);
}
}
}
// Remove any git repositories whose .git entry no longer exists.
let snapshot = &mut self.snapshot;
let mut ids_to_preserve = HashSet::default();
for (&work_directory_id, entry) in snapshot.git_repositories.iter() {
let exists_in_snapshot = snapshot
.entry_for_id(work_directory_id)
.map_or(false, |entry| {
snapshot.entry_for_path(entry.path.join(*DOT_GIT)).is_some()
});
if exists_in_snapshot {
ids_to_preserve.insert(work_directory_id);
} else {
let git_dir_abs_path = snapshot.abs_path().join(&entry.git_dir_path);
let git_dir_excluded = snapshot.is_path_excluded(entry.git_dir_path.to_path_buf());
if git_dir_excluded
&& !matches!(smol::block_on(fs.metadata(&git_dir_abs_path)), Ok(None))
{
ids_to_preserve.insert(work_directory_id);
}
}
}
snapshot
.git_repositories
.retain(|work_directory_id, _| ids_to_preserve.contains(work_directory_id));
snapshot
.repository_entries
.retain(|_, entry| ids_to_preserve.contains(&entry.work_directory.0));
}
fn build_git_repository(
&mut self,
dot_git_path: Arc<Path>,
fs: &dyn Fs,
) -> Option<(
RepositoryWorkDirectory,
Arc<Mutex<dyn GitRepository>>,
TreeMap<RepoPath, GitFileStatus>,
)> {
) -> Option<(RepositoryWorkDirectory, Arc<Mutex<dyn GitRepository>>)> {
let work_dir_path: Arc<Path> = match dot_git_path.parent() {
Some(parent_dir) => {
// Guard against repositories inside the repository metadata
@ -2842,11 +2738,7 @@ impl BackgroundScannerState {
dot_git_path: Arc<Path>,
location_in_repo: Option<Arc<Path>>,
fs: &dyn Fs,
) -> Option<(
RepositoryWorkDirectory,
Arc<Mutex<dyn GitRepository>>,
TreeMap<RepoPath, GitFileStatus>,
)> {
) -> Option<(RepositoryWorkDirectory, Arc<Mutex<dyn GitRepository>>)> {
let work_dir_id = self
.snapshot
.entry_for_path(work_dir_path.clone())
@ -2857,22 +2749,17 @@ impl BackgroundScannerState {
}
let abs_path = self.snapshot.abs_path.join(&dot_git_path);
let repository = fs.open_repo(abs_path.as_path())?;
let repository = fs.open_repo(&abs_path)?;
let work_directory = RepositoryWorkDirectory(work_dir_path.clone());
let repo_lock = repository.lock();
self.snapshot.repository_entries.insert(
work_directory.clone(),
RepositoryEntry {
work_directory: work_dir_id.into(),
branch: repo_lock.branch_name().map(Into::into),
branch: repository.lock().branch_name().map(Into::into),
location_in_repo,
},
);
let staged_statuses = self.update_git_statuses(&work_directory, &*repo_lock);
drop(repo_lock);
self.snapshot.git_repositories.insert(
work_dir_id,
LocalRepositoryEntry {
@ -2882,47 +2769,7 @@ impl BackgroundScannerState {
},
);
Some((work_directory, repository, staged_statuses))
}
fn update_git_statuses(
&mut self,
work_directory: &RepositoryWorkDirectory,
repo: &dyn GitRepository,
) -> TreeMap<RepoPath, GitFileStatus> {
let repo_entry = self.snapshot.repository_entries.get(work_directory);
let staged_statuses = repo.staged_statuses(Path::new(""));
let mut changes = vec![];
let mut edits = vec![];
for mut entry in self
.snapshot
.descendent_entries(false, false, &work_directory.0)
.cloned()
{
let repo_path =
repo_entry.map(|repo_entry| repo_entry.relativize(&self.snapshot, &entry.path));
let Some(Ok(repo_path)) = repo_path else {
continue;
};
let Some(mtime) = entry.mtime else {
continue;
};
let git_file_status = combine_git_statuses(
staged_statuses.get(&repo_path).copied(),
repo.unstaged_status(&repo_path, mtime),
);
if entry.git_status != git_file_status {
entry.git_status = git_file_status;
changes.push(entry.path.clone());
edits.push(Edit::Insert(entry));
}
}
self.snapshot.entries_by_path.edit(edits, &());
util::extend_sorted(&mut self.changed_paths, changes, usize::MAX, Ord::cmp);
staged_statuses
Some((work_directory, repository))
}
}
@ -3522,9 +3369,10 @@ impl BackgroundScanner {
async fn run(&mut self, mut fs_events_rx: Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>) {
use futures::FutureExt as _;
// Populate ignores above the root.
// If the worktree root does not contain a git repository, then find
// the git repository in an ancestor directory. Find any gitignore files
// in ancestor directories.
let root_abs_path = self.state.lock().snapshot.abs_path.clone();
let mut external_git_repo = None;
for (index, ancestor) in root_abs_path.ancestors().enumerate() {
if index != 0 {
if let Ok(ignore) =
@ -3537,10 +3385,29 @@ impl BackgroundScanner {
.insert(ancestor.into(), (ignore.into(), false));
}
}
if ancestor.join(&*DOT_GIT).is_dir() {
let ancestor_dot_git = ancestor.join(&*DOT_GIT);
if ancestor_dot_git.is_dir() {
if index != 0 {
external_git_repo = Some(ancestor.to_path_buf());
// We canonicalize, since the FS events use the canonicalized path.
if let Some(ancestor_dot_git) =
self.fs.canonicalize(&ancestor_dot_git).await.log_err()
{
let ancestor_git_events =
self.fs.watch(&ancestor_dot_git, FS_WATCH_LATENCY).await;
fs_events_rx = select(fs_events_rx, ancestor_git_events).boxed();
// We associate the external git repo with our root folder and
// also mark where in the git repo the root folder is located.
self.state.lock().build_git_repository_for_path(
Path::new("").into(),
ancestor_dot_git.into(),
Some(root_abs_path.strip_prefix(ancestor).unwrap().into()),
self.fs.as_ref(),
);
};
}
// Reached root of git repository.
break;
}
@ -3570,41 +3437,6 @@ impl BackgroundScanner {
state.snapshot.completed_scan_id = state.snapshot.scan_id;
}
// If we don't have a git repository at the root, we check whether we found an external
// git repository.
if self.state.lock().snapshot.root_git_entry().is_none() {
if let Some(external_git_repo) = external_git_repo {
let root_abs_path = self.state.lock().snapshot.abs_path.clone();
let external_dot_git = external_git_repo.join(&*DOT_GIT);
// We canonicalize, since the FS events use the canonicalized path.
let dot_git_canonical_path =
self.fs.canonicalize(&external_dot_git).await.log_err();
let location_in_repo = root_abs_path.strip_prefix(external_git_repo).log_err();
if let Some((dot_git_canonical_path, location_in_repo)) =
dot_git_canonical_path.zip(location_in_repo)
{
// We associate the external git repo with our root folder and
// also mark where in the git repo the root folder is located.
self.state.lock().build_git_repository_for_path(
Arc::from(Path::new("")),
dot_git_canonical_path.clone().into(),
Some(location_in_repo.into()),
self.fs.as_ref(),
);
let external_events = self
.fs
.watch(&dot_git_canonical_path, FS_WATCH_LATENCY)
.await;
fs_events_rx = select(fs_events_rx, external_events).boxed()
};
}
}
self.send_status_update(false, None);
// Process any FS events that occurred while performing the initial scan.
@ -3710,7 +3542,7 @@ impl BackgroundScanner {
};
let mut relative_paths = Vec::with_capacity(abs_paths.len());
let mut dot_git_paths_to_reload = HashSet::default();
let mut dot_git_paths = Vec::new();
abs_paths.sort_unstable();
abs_paths.dedup_by(|a, b| a.starts_with(&b));
abs_paths.retain(|abs_path| {
@ -3723,10 +3555,11 @@ impl BackgroundScanner {
{
let dot_git_path = dot_git_dir
.strip_prefix(&root_canonical_path)
.ok()
.map(|path| path.to_path_buf())
.unwrap_or_else(|| dot_git_dir.to_path_buf());
dot_git_paths_to_reload.insert(dot_git_path.to_path_buf());
.unwrap_or(dot_git_dir)
.to_path_buf();
if !dot_git_paths.contains(&dot_git_path) {
dot_git_paths.push(dot_git_path);
}
is_git_related = true;
}
@ -3756,7 +3589,7 @@ impl BackgroundScanner {
return false;
}
if snapshot.is_path_excluded(relative_path.to_path_buf()) {
if snapshot.is_path_excluded(&relative_path) {
if !is_git_related {
log::debug!("ignoring FS event for excluded path {relative_path:?}");
}
@ -3768,14 +3601,9 @@ impl BackgroundScanner {
}
});
if dot_git_paths_to_reload.is_empty() && relative_paths.is_empty() {
return;
}
if !relative_paths.is_empty() {
let (scan_job_tx, scan_job_rx) = channel::unbounded();
if !relative_paths.is_empty() || !dot_git_paths.is_empty() {
log::debug!("received fs events {:?}", relative_paths);
let (scan_job_tx, scan_job_rx) = channel::unbounded();
self.reload_entries_for_paths(
root_path,
root_canonical_path,
@ -3784,23 +3612,17 @@ impl BackgroundScanner {
Some(scan_job_tx.clone()),
)
.await;
drop(scan_job_tx);
self.scan_dirs(false, scan_job_rx).await;
}
let (scan_job_tx, scan_job_rx) = channel::unbounded();
self.update_ignore_statuses(scan_job_tx).await;
self.scan_dirs(false, scan_job_rx).await;
if !dot_git_paths.is_empty() {
self.update_git_repositories(dot_git_paths).await;
}
{
let mut state = self.state.lock();
if !dot_git_paths_to_reload.is_empty() {
if relative_paths.is_empty() {
state.snapshot.scan_id += 1;
}
log::debug!("reloading repositories: {dot_git_paths_to_reload:?}");
state.reload_repositories(&dot_git_paths_to_reload, self.fs.as_ref());
}
state.snapshot.completed_scan_id = state.snapshot.scan_id;
for (_, entry_id) in mem::take(&mut state.removed_entry_ids) {
state.scanned_dirs.remove(&entry_id);
@ -3932,43 +3754,74 @@ impl BackgroundScanner {
async fn scan_dir(&self, job: &ScanJob) -> Result<()> {
let root_abs_path;
let mut ignore_stack;
let mut new_ignore;
let root_char_bag;
let next_entry_id;
{
let state = self.state.lock();
let snapshot = &state.snapshot;
root_abs_path = snapshot.abs_path().clone();
if snapshot.is_path_excluded(job.path.to_path_buf()) {
let snapshot = &self.state.lock().snapshot;
if snapshot.is_path_excluded(&job.path) {
log::error!("skipping excluded directory {:?}", job.path);
return Ok(());
}
log::debug!("scanning directory {:?}", job.path);
ignore_stack = job.ignore_stack.clone();
new_ignore = None;
root_abs_path = snapshot.abs_path().clone();
root_char_bag = snapshot.root_char_bag;
next_entry_id = self.next_entry_id.clone();
drop(state);
}
let mut dotgit_path = None;
let next_entry_id = self.next_entry_id.clone();
let mut ignore_stack = job.ignore_stack.clone();
let mut containing_repository = job.containing_repository.clone();
let mut new_ignore = None;
let mut root_canonical_path = None;
let mut new_entries: Vec<Entry> = Vec::new();
let mut new_jobs: Vec<Option<ScanJob>> = Vec::new();
let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
while let Some(child_abs_path) = child_paths.next().await {
let child_abs_path: Arc<Path> = match child_abs_path {
Ok(child_abs_path) => child_abs_path.into(),
Err(error) => {
log::error!("error processing entry {:?}", error);
continue;
let mut child_paths = self
.fs
.read_dir(&job.abs_path)
.await?
.filter_map(|entry| async {
match entry {
Ok(entry) => Some(entry),
Err(error) => {
log::error!("error processing entry {:?}", error);
None
}
}
};
})
.collect::<Vec<_>>()
.await;
// Ensure .git and gitignore files are processed first.
let mut ixs_to_move_to_front = Vec::new();
for (ix, child_abs_path) in child_paths.iter().enumerate() {
let filename = child_abs_path.file_name().unwrap();
if filename == *DOT_GIT {
ixs_to_move_to_front.insert(0, ix);
} else if filename == *GITIGNORE {
ixs_to_move_to_front.push(ix);
}
}
for (dest_ix, src_ix) in ixs_to_move_to_front.into_iter().enumerate() {
child_paths.swap(dest_ix, src_ix);
}
for child_abs_path in child_paths {
let child_abs_path: Arc<Path> = child_abs_path.into();
let child_name = child_abs_path.file_name().unwrap();
let child_path: Arc<Path> = job.path.join(child_name).into();
// If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
if child_name == *GITIGNORE {
if child_name == *DOT_GIT {
if let Some((work_directory, repository)) = self
.state
.lock()
.build_git_repository(child_path.clone(), self.fs.as_ref())
{
let staged_statuses = repository.lock().staged_statuses(Path::new(""));
containing_repository = Some(ScanJobContainingRepository {
work_directory,
repository,
staged_statuses,
});
}
} else if child_name == *GITIGNORE {
match build_gitignore(&child_abs_path, self.fs.as_ref()).await {
Ok(ignore) => {
let ignore = Arc::new(ignore);
@ -3983,42 +3836,15 @@ impl BackgroundScanner {
);
}
}
// Update ignore status of any child entries we've already processed to reflect the
// ignore file in the current directory. Because `.gitignore` starts with a `.`,
// such entries should rarely be numerous. Update the ignore stack associated with any
// new jobs as well.
let mut new_jobs = new_jobs.iter_mut();
for entry in &mut new_entries {
let entry_abs_path = root_abs_path.join(&entry.path);
entry.is_ignored =
ignore_stack.is_abs_path_ignored(&entry_abs_path, entry.is_dir());
if entry.is_dir() {
if let Some(job) = new_jobs.next().expect("missing scan job for entry") {
job.ignore_stack = if entry.is_ignored {
IgnoreStack::all()
} else {
ignore_stack.clone()
};
}
}
}
}
// If we find a .git, we'll need to load the repository.
else if child_name == *DOT_GIT {
dotgit_path = Some(child_path.clone());
}
{
let relative_path = job.path.join(child_name);
let mut state = self.state.lock();
if state.snapshot.is_path_excluded(relative_path.clone()) {
log::debug!("skipping excluded child entry {relative_path:?}");
state.remove_path(&relative_path);
if state.snapshot.is_path_excluded(&child_path) {
log::debug!("skipping excluded child entry {child_path:?}");
state.remove_path(&child_path);
continue;
}
drop(state);
}
let child_metadata = match self.fs.metadata(&child_abs_path).await {
@ -4091,21 +3917,19 @@ impl BackgroundScanner {
},
ancestor_inodes,
scan_queue: job.scan_queue.clone(),
containing_repository: job.containing_repository.clone(),
containing_repository: containing_repository.clone(),
}));
}
} else {
child_entry.is_ignored = ignore_stack.is_abs_path_ignored(&child_abs_path, false);
if !child_entry.is_ignored {
if let Some((repository_dir, repository, staged_statuses)) =
&job.containing_repository
{
if let Ok(repo_path) = child_entry.path.strip_prefix(&repository_dir.0) {
if let Some(repo) = &containing_repository {
if let Ok(repo_path) = child_entry.path.strip_prefix(&repo.work_directory) {
if let Some(mtime) = child_entry.mtime {
let repo_path = RepoPath(repo_path.into());
child_entry.git_status = combine_git_statuses(
staged_statuses.get(&repo_path).copied(),
repository.lock().unstaged_status(&repo_path, mtime),
repo.staged_statuses.get(&repo_path).copied(),
repo.repository.lock().unstaged_status(&repo_path, mtime),
);
}
}
@ -4145,14 +3969,7 @@ impl BackgroundScanner {
state.populate_dir(&job.path, new_entries, new_ignore);
let repository =
dotgit_path.and_then(|path| state.build_git_repository(path, self.fs.as_ref()));
for mut new_job in new_jobs.into_iter().flatten() {
if let Some(containing_repository) = &repository {
new_job.containing_repository = Some(containing_repository.clone());
}
for new_job in new_jobs.into_iter().flatten() {
job.scan_queue
.try_send(new_job)
.expect("channel is unbounded");
@ -4458,6 +4275,237 @@ impl BackgroundScanner {
state.snapshot.entries_by_id.edit(entries_by_id_edits, &());
}
async fn update_git_repositories(&self, dot_git_paths: Vec<PathBuf>) {
log::debug!("reloading repositories: {dot_git_paths:?}");
let (update_job_tx, update_job_rx) = channel::unbounded();
{
let mut state = self.state.lock();
let scan_id = state.snapshot.scan_id;
for dot_git_dir in dot_git_paths {
let existing_repository_entry =
state
.snapshot
.git_repositories
.iter()
.find_map(|(entry_id, repo)| {
(repo.git_dir_path.as_ref() == dot_git_dir)
.then(|| (*entry_id, repo.clone()))
});
let (work_dir, repository) = match existing_repository_entry {
None => {
match state.build_git_repository(dot_git_dir.into(), self.fs.as_ref()) {
Some(output) => output,
None => continue,
}
}
Some((entry_id, repository)) => {
if repository.git_dir_scan_id == scan_id {
continue;
}
let Some(work_dir) = state
.snapshot
.entry_for_id(entry_id)
.map(|entry| RepositoryWorkDirectory(entry.path.clone()))
else {
continue;
};
log::info!("reload git repository {dot_git_dir:?}");
let repo = repository.repo_ptr.lock();
let branch = repo.branch_name();
repo.reload_index();
state
.snapshot
.git_repositories
.update(&entry_id, |entry| entry.git_dir_scan_id = scan_id);
state
.snapshot
.snapshot
.repository_entries
.update(&work_dir, |entry| entry.branch = branch.map(Into::into));
(work_dir, repository.repo_ptr.clone())
}
};
let staged_statuses = repository.lock().staged_statuses(Path::new(""));
let mut files =
state
.snapshot
.traverse_from_path(true, false, false, work_dir.0.as_ref());
let mut start_path = work_dir.0.clone();
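// Enqueue this repository's files in fixed-size batches. The worker
// tasks spawned below drain these jobs, answering scan requests in
// between batches.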
while start_path.starts_with(&work_dir.0) {
files.advance_by(GIT_STATUS_UPDATE_BATCH_SIZE);
let end_path = files.entry().map(|e| e.path.clone());
smol::block_on(update_job_tx.send(UpdateGitStatusesJob {
start_path: start_path.clone(),
end_path: end_path.clone(),
containing_repository: ScanJobContainingRepository {
work_directory: work_dir.clone(),
repository: repository.clone(),
staged_statuses: staged_statuses.clone(),
},
}))
.unwrap();
if let Some(end_path) = end_path {
start_path = end_path;
} else {
break;
}
}
}
// Remove any git repositories whose .git entry no longer exists.
let snapshot = &mut state.snapshot;
let mut ids_to_preserve = HashSet::default();
for (&work_directory_id, entry) in snapshot.git_repositories.iter() {
let exists_in_snapshot = snapshot
.entry_for_id(work_directory_id)
.map_or(false, |entry| {
snapshot.entry_for_path(entry.path.join(*DOT_GIT)).is_some()
});
if exists_in_snapshot {
ids_to_preserve.insert(work_directory_id);
} else {
let git_dir_abs_path = snapshot.abs_path().join(&entry.git_dir_path);
let git_dir_excluded = snapshot.is_path_excluded(&entry.git_dir_path);
if git_dir_excluded
&& !matches!(
smol::block_on(self.fs.metadata(&git_dir_abs_path)),
Ok(None)
)
{
ids_to_preserve.insert(work_directory_id);
}
}
}
snapshot
.git_repositories
.retain(|work_directory_id, _| ids_to_preserve.contains(work_directory_id));
snapshot
.repository_entries
.retain(|_, entry| ids_to_preserve.contains(&entry.work_directory.0));
}
drop(update_job_tx);
self.executor
.scoped(|scope| {
// Git status updates are currently not very parallelizable,
// because they need to lock the git repository. Limit the number
// of workers so that they don't just contend on that lock.
for _ in 0..self.executor.num_cpus().min(3) {
scope.spawn(async {
let mut entries = Vec::with_capacity(GIT_STATUS_UPDATE_BATCH_SIZE);
loop {
select_biased! {
// Process any path refresh requests before moving on to process
// the queue of git status updates.
request = self.scan_requests_rx.recv().fuse() => {
let Ok(request) = request else { break };
if !self.process_scan_request(request, true).await {
return;
}
}
// Process git status updates in batches.
job = update_job_rx.recv().fuse() => {
let Ok(job) = job else { break };
self.update_git_statuses(job, &mut entries);
}
}
}
});
}
})
.await;
}
/// Update the git statuses for a given batch of entries.
fn update_git_statuses(&self, job: UpdateGitStatusesJob, entries: &mut Vec<Entry>) {
let t0 = Instant::now();
let repo_work_dir = &job.containing_repository.work_directory;
let state = self.state.lock();
let Some(repo_entry) = state
.snapshot
.repository_entries
.get(&repo_work_dir)
.cloned()
else {
return;
};
// Retrieve a batch of entries for this job, and then release the state lock.
entries.clear();
for entry in state
.snapshot
.traverse_from_path(true, false, false, &job.start_path)
{
if job
.end_path
.as_ref()
.map_or(false, |end| &entry.path >= end)
|| !entry.path.starts_with(&repo_work_dir)
{
break;
}
entries.push(entry.clone());
}
drop(state);
// Determine which entries in this batch have changed their git status.
let mut edits = vec![];
for entry in entries.iter() {
let Ok(repo_path) = entry.path.strip_prefix(&repo_work_dir) else {
continue;
};
let Some(mtime) = entry.mtime else {
continue;
};
let repo_path = RepoPath(if let Some(location) = &repo_entry.location_in_repo {
location.join(repo_path)
} else {
repo_path.to_path_buf()
});
let git_status = combine_git_statuses(
job.containing_repository
.staged_statuses
.get(&repo_path)
.copied(),
job.containing_repository
.repository
.lock()
.unstaged_status(&repo_path, mtime),
);
if entry.git_status != git_status {
let mut entry = entry.clone();
entry.git_status = git_status;
edits.push(Edit::Insert(entry));
}
}
// Apply the git status changes.
let mut state = self.state.lock();
let path_changes = edits.iter().map(|edit| {
if let Edit::Insert(entry) = edit {
entry.path.clone()
} else {
unreachable!()
}
});
util::extend_sorted(&mut state.changed_paths, path_changes, usize::MAX, Ord::cmp);
state.snapshot.entries_by_path.edit(edits, &());
log::trace!(
"refreshed git status of {} entries starting with {} in {:?}",
entries.len(),
job.start_path.display(),
t0.elapsed()
);
}
fn build_change_set(
&self,
old_snapshot: &Snapshot,
@ -4607,11 +4655,14 @@ struct ScanJob {
scan_queue: Sender<ScanJob>,
ancestor_inodes: TreeSet<u64>,
is_external: bool,
containing_repository: Option<(
RepositoryWorkDirectory,
Arc<Mutex<dyn GitRepository>>,
TreeMap<RepoPath, GitFileStatus>,
)>,
containing_repository: Option<ScanJobContainingRepository>,
}
#[derive(Clone)]
struct ScanJobContainingRepository {
work_directory: RepositoryWorkDirectory,
repository: Arc<Mutex<dyn GitRepository>>,
staged_statuses: TreeMap<RepoPath, GitFileStatus>,
}
struct UpdateIgnoreStatusJob {
@ -4621,6 +4672,12 @@ struct UpdateIgnoreStatusJob {
scan_queue: Sender<ScanJob>,
}
struct UpdateGitStatusesJob {
start_path: Arc<Path>,
end_path: Option<Arc<Path>>,
containing_repository: ScanJobContainingRepository,
}
pub trait WorktreeModelHandle {
#[cfg(any(test, feature = "test-support"))]
fn flush_fs_events<'a>(
@ -4829,9 +4886,13 @@ pub struct Traversal<'a> {
impl<'a> Traversal<'a> {
pub fn advance(&mut self) -> bool {
self.advance_by(1)
}
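/// Advance the traversal by `count` visible entries. Used to step
/// through a repository's files in `GIT_STATUS_UPDATE_BATCH_SIZE` chunks.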
pub fn advance_by(&mut self, count: usize) -> bool {
self.cursor.seek_forward(
&TraversalTarget::Count {
count: self.end_offset() + 1,
count: self.end_offset() + count,
include_dirs: self.include_dirs,
include_files: self.include_files,
include_ignored: self.include_ignored,
@ -4953,25 +5014,6 @@ impl<'a> Iterator for ChildEntriesIter<'a> {
}
}
pub struct DescendentEntriesIter<'a> {
parent_path: &'a Path,
traversal: Traversal<'a>,
}
impl<'a> Iterator for DescendentEntriesIter<'a> {
type Item = &'a Entry;
fn next(&mut self) -> Option<Self::Item> {
if let Some(item) = self.traversal.entry() {
if item.path.starts_with(&self.parent_path) {
self.traversal.advance();
return Some(item);
}
}
None
}
}
impl<'a> From<&'a Entry> for proto::Entry {
fn from(entry: &'a Entry) -> Self {
Self {


@ -15,13 +15,7 @@ use pretty_assertions::assert_eq;
use rand::prelude::*;
use serde_json::json;
use settings::{Settings, SettingsStore};
use std::{
env,
fmt::Write,
mem,
path::{Path, PathBuf},
sync::Arc,
};
use std::{env, fmt::Write, mem, path::Path, sync::Arc};
use util::{test::temp_tree, ResultExt};
#[gpui::test]
@ -80,114 +74,6 @@ async fn test_traversal(cx: &mut TestAppContext) {
})
}
#[gpui::test]
async fn test_descendent_entries(cx: &mut TestAppContext) {
init_test(cx);
let fs = FakeFs::new(cx.background_executor.clone());
fs.insert_tree(
"/root",
json!({
"a": "",
"b": {
"c": {
"d": ""
},
"e": {}
},
"f": "",
"g": {
"h": {}
},
"i": {
"j": {
"k": ""
},
"l": {
}
},
".gitignore": "i/j\n",
}),
)
.await;
let tree = Worktree::local(
build_client(cx),
Path::new("/root"),
true,
fs,
Default::default(),
&mut cx.to_async(),
)
.await
.unwrap();
cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
.await;
tree.read_with(cx, |tree, _| {
assert_eq!(
tree.descendent_entries(false, false, Path::new("b"))
.map(|entry| entry.path.as_ref())
.collect::<Vec<_>>(),
vec![Path::new("b/c/d"),]
);
assert_eq!(
tree.descendent_entries(true, false, Path::new("b"))
.map(|entry| entry.path.as_ref())
.collect::<Vec<_>>(),
vec![
Path::new("b"),
Path::new("b/c"),
Path::new("b/c/d"),
Path::new("b/e"),
]
);
assert_eq!(
tree.descendent_entries(false, false, Path::new("g"))
.map(|entry| entry.path.as_ref())
.collect::<Vec<_>>(),
Vec::<PathBuf>::new()
);
assert_eq!(
tree.descendent_entries(true, false, Path::new("g"))
.map(|entry| entry.path.as_ref())
.collect::<Vec<_>>(),
vec![Path::new("g"), Path::new("g/h"),]
);
});
// Expand gitignored directory.
tree.read_with(cx, |tree, _| {
tree.as_local()
.unwrap()
.refresh_entries_for_paths(vec![Path::new("i/j").into()])
})
.recv()
.await;
tree.read_with(cx, |tree, _| {
assert_eq!(
tree.descendent_entries(false, false, Path::new("i"))
.map(|entry| entry.path.as_ref())
.collect::<Vec<_>>(),
Vec::<PathBuf>::new()
);
assert_eq!(
tree.descendent_entries(false, true, Path::new("i"))
.map(|entry| entry.path.as_ref())
.collect::<Vec<_>>(),
vec![Path::new("i/j/k")]
);
assert_eq!(
tree.descendent_entries(true, false, Path::new("i"))
.map(|entry| entry.path.as_ref())
.collect::<Vec<_>>(),
vec![Path::new("i"), Path::new("i/l"),]
);
})
}
#[gpui::test(iterations = 10)]
async fn test_circular_symlinks(cx: &mut TestAppContext) {
init_test(cx);
@ -2704,6 +2590,10 @@ fn check_worktree_entries(
}
fn init_test(cx: &mut gpui::TestAppContext) {
if std::env::var("RUST_LOG").is_ok() {
env_logger::try_init().ok();
}
cx.update(|cx| {
let settings_store = SettingsStore::test(cx);
cx.set_global(settings_store);