From d12b8c3945e8af4a5540417943e8a92c0362ccc4 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 31 May 2024 09:10:09 -0700 Subject: [PATCH] Simplify and improve concurrency of git status updates (#12513) The quest for responsiveness in large git repos continues. This is a follow-up to https://github.com/zed-industries/zed/pull/12444 Release Notes: - N/A --- crates/worktree/src/worktree.rs | 177 ++++++++++++++------------------ 1 file changed, 76 insertions(+), 101 deletions(-) diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index 2a0336cbba..ef93a7c7b4 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -78,8 +78,6 @@ pub const FS_WATCH_LATENCY: Duration = Duration::from_millis(100); #[cfg(not(feature = "test-support"))] pub const FS_WATCH_LATENCY: Duration = Duration::from_millis(100); -const GIT_STATUS_UPDATE_BATCH_SIZE: usize = 1024; - #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)] pub struct WorktreeId(usize); @@ -4293,7 +4291,7 @@ impl BackgroundScanner { async fn update_git_repositories(&self, dot_git_paths: Vec) { log::debug!("reloading repositories: {dot_git_paths:?}"); - let (update_job_tx, update_job_rx) = channel::unbounded(); + let mut repo_updates = Vec::new(); { let mut state = self.state.lock(); let scan_id = state.snapshot.scan_id; @@ -4308,7 +4306,7 @@ impl BackgroundScanner { .then(|| (*entry_id, repo.clone())) }); - let (work_dir, repository) = match existing_repository_entry { + let (work_directory, repository) = match existing_repository_entry { None => { match state.build_git_repository(dot_git_dir.into(), self.fs.as_ref()) { Some(output) => output, @@ -4327,7 +4325,6 @@ impl BackgroundScanner { continue; }; - log::info!("reload git repository {dot_git_dir:?}"); let repo = &repository.repo_ptr; let branch = repo.branch_name(); repo.reload_index(); @@ -4345,41 +4342,16 @@ impl BackgroundScanner { } }; - let statuses = repository - .statuses(Path::new("")) - .log_err() - .unwrap_or_default(); - let entries = state.snapshot.entries_by_path.clone(); - let location_in_repo = state - .snapshot - .repository_entries - .get(&work_dir) - .and_then(|repo| repo.location_in_repo.clone()); - let mut files = - state + repo_updates.push(UpdateGitStatusesJob { + location_in_repo: state .snapshot - .traverse_from_path(true, false, false, work_dir.0.as_ref()); - let mut start_path = work_dir.0.clone(); - while start_path.starts_with(&work_dir.0) { - files.advance_by(GIT_STATUS_UPDATE_BATCH_SIZE); - let end_path = files.entry().map(|e| e.path.clone()); - smol::block_on(update_job_tx.send(UpdateGitStatusesJob { - start_path: start_path.clone(), - end_path: end_path.clone(), - entries: entries.clone(), - location_in_repo: location_in_repo.clone(), - containing_repository: ScanJobContainingRepository { - work_directory: work_dir.clone(), - statuses: statuses.clone(), - }, - })) - .unwrap(); - if let Some(end_path) = end_path { - start_path = end_path; - } else { - break; - } - } + .repository_entries + .get(&work_directory) + .and_then(|repo| repo.location_in_repo.clone()) + .clone(), + work_directory, + repository, + }); } // Remove any git repositories whose .git entry no longer exists. @@ -4414,87 +4386,92 @@ impl BackgroundScanner { .repository_entries .retain(|_, entry| ids_to_preserve.contains(&entry.work_directory.0)); } - drop(update_job_tx); + let (mut updates_done_tx, mut updates_done_rx) = barrier::channel(); self.executor .scoped(|scope| { - for _ in 0..self.executor.num_cpus() { - scope.spawn(async { - loop { - select_biased! { - // Process any path refresh requests before moving on to process - // the queue of git statuses. - request = self.scan_requests_rx.recv().fuse() => { - let Ok(request) = request else { break }; - if !self.process_scan_request(request, true).await { - return; - } - } + scope.spawn(async { + for repo_update in repo_updates { + self.update_git_statuses(repo_update); + } + updates_done_tx.blocking_send(()).ok(); + }); - // Process git status updates in batches. - job = update_job_rx.recv().fuse() => { - let Ok(job) = job else { break }; - self.update_git_statuses(job); + scope.spawn(async { + loop { + select_biased! { + // Process any path refresh requests before moving on to process + // the queue of git statuses. + request = self.scan_requests_rx.recv().fuse() => { + let Ok(request) = request else { break }; + if !self.process_scan_request(request, true).await { + return; } } + _ = updates_done_rx.recv().fuse() => break, } - }); - } + } + }); }) .await; } /// Update the git statuses for a given batch of entries. fn update_git_statuses(&self, job: UpdateGitStatusesJob) { - // Determine which entries in this batch have changed their git status. + log::trace!("updating git statuses for repo {:?}", job.work_directory.0); let t0 = Instant::now(); - let mut edits = Vec::new(); - for entry in Traversal::new(&job.entries, true, false, false, &job.start_path) { - if job - .end_path - .as_ref() - .map_or(false, |end| &entry.path >= end) - { + let Some(statuses) = job.repository.statuses(Path::new("")).log_err() else { + return; + }; + log::trace!( + "computed git statuses for repo {:?} in {:?}", + job.work_directory.0, + t0.elapsed() + ); + + let t0 = Instant::now(); + let mut changes = Vec::new(); + let snapshot = self.state.lock().snapshot.snapshot.clone(); + for file in snapshot.traverse_from_path(true, false, false, job.work_directory.0.as_ref()) { + let Ok(repo_path) = file.path.strip_prefix(&job.work_directory.0) else { break; - } - let Ok(repo_path) = entry - .path - .strip_prefix(&job.containing_repository.work_directory) - else { - continue; }; - let repo_path = RepoPath(if let Some(location) = &job.location_in_repo { - location.join(repo_path) + let git_status = if let Some(location) = &job.location_in_repo { + statuses.get(&location.join(repo_path)) } else { - repo_path.to_path_buf() - }); - let git_status = job.containing_repository.statuses.get(&repo_path); - if entry.git_status != git_status { - let mut entry = entry.clone(); + statuses.get(&repo_path) + }; + if file.git_status != git_status { + let mut entry = file.clone(); entry.git_status = git_status; - edits.push(Edit::Insert(entry)); + changes.push((entry.path, git_status)); } } + let mut state = self.state.lock(); + let edits = changes + .iter() + .filter_map(|(path, git_status)| { + let entry = state.snapshot.entry_for_path(path)?.clone(); + Some(Edit::Insert(Entry { + git_status: *git_status, + ..entry.clone() + })) + }) + .collect(); + // Apply the git status changes. - if edits.len() > 0 { - let mut state = self.state.lock(); - let path_changes = edits.iter().map(|edit| { - if let Edit::Insert(entry) = edit { - entry.path.clone() - } else { - unreachable!() - } - }); - util::extend_sorted(&mut state.changed_paths, path_changes, usize::MAX, Ord::cmp); - state.snapshot.entries_by_path.edit(edits, &()); - } - + util::extend_sorted( + &mut state.changed_paths, + changes.iter().map(|p| p.0.clone()), + usize::MAX, + Ord::cmp, + ); + state.snapshot.entries_by_path.edit(edits, &()); log::trace!( - "refreshed git status of entries starting with {} in {:?}", - // entries.len(), - job.start_path.display(), - t0.elapsed() + "applied git status updates for repo {:?} in {:?}", + job.work_directory.0, + t0.elapsed(), ); } @@ -4664,11 +4641,9 @@ struct UpdateIgnoreStatusJob { } struct UpdateGitStatusesJob { - entries: SumTree, - start_path: Arc, - end_path: Option>, - containing_repository: ScanJobContainingRepository, + work_directory: RepositoryWorkDirectory, location_in_repo: Option>, + repository: Arc, } pub trait WorktreeModelHandle {