Merge pull request #2371 from zed-industries/refresh-entry-delay

Restructure background scanner to handle refresh requests even while scanning directories
This commit is contained in:
Max Brunsfeld 2023-04-13 12:44:04 -07:00 committed by GitHub
commit c13914bda1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 332 additions and 383 deletions

View File

@ -2183,7 +2183,7 @@ async fn test_apply_code_actions_with_commands(cx: &mut gpui::TestAppContext) {
});
}
#[gpui::test]
#[gpui::test(iterations = 10)]
async fn test_save_file(cx: &mut gpui::TestAppContext) {
let fs = FakeFs::new(cx.background());
fs.insert_tree(

View File

@ -12,7 +12,9 @@ use futures::{
mpsc::{self, UnboundedSender},
oneshot,
},
select_biased, Stream, StreamExt,
select_biased,
task::Poll,
Stream, StreamExt,
};
use fuzzy::CharBag;
use git::{DOT_GIT, GITIGNORE};
@ -41,11 +43,11 @@ use std::{
mem,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Arc,
},
task::Poll,
time::{Duration, SystemTime},
};
use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet};
@ -154,20 +156,12 @@ impl DerefMut for LocalSnapshot {
}
enum ScanState {
/// The worktree is performing its initial scan of the filesystem.
Initializing {
snapshot: LocalSnapshot,
barrier: Option<barrier::Sender>,
},
Initialized {
snapshot: LocalSnapshot,
},
/// The worktree is updating in response to filesystem events.
Updating,
Started,
Updated {
snapshot: LocalSnapshot,
changes: HashMap<Arc<Path>, PathChange>,
barrier: Option<barrier::Sender>,
scanning: bool,
},
}
@ -244,9 +238,24 @@ impl Worktree {
cx.spawn_weak(|this, mut cx| async move {
while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) {
this.update(&mut cx, |this, cx| {
this.as_local_mut()
.unwrap()
.background_scanner_updated(state, cx);
let this = this.as_local_mut().unwrap();
match state {
ScanState::Started => {
*this.is_scanning.0.borrow_mut() = true;
}
ScanState::Updated {
snapshot,
changes,
barrier,
scanning,
} => {
*this.is_scanning.0.borrow_mut() = scanning;
this.set_snapshot(snapshot, cx);
cx.emit(Event::UpdatedEntries(changes));
drop(barrier);
}
}
cx.notify();
});
}
})
@ -258,9 +267,15 @@ impl Worktree {
let background = cx.background().clone();
async move {
let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
BackgroundScanner::new(snapshot, scan_states_tx, fs, background)
.run(events, path_changes_rx)
.await;
BackgroundScanner::new(
snapshot,
fs,
scan_states_tx,
background,
path_changes_rx,
)
.run(events)
.await;
}
});
@ -533,38 +548,6 @@ impl LocalWorktree {
Ok(updated)
}
fn background_scanner_updated(
&mut self,
scan_state: ScanState,
cx: &mut ModelContext<Worktree>,
) {
match scan_state {
ScanState::Initializing { snapshot, barrier } => {
*self.is_scanning.0.borrow_mut() = true;
self.set_snapshot(snapshot, cx);
drop(barrier);
}
ScanState::Initialized { snapshot } => {
*self.is_scanning.0.borrow_mut() = false;
self.set_snapshot(snapshot, cx);
}
ScanState::Updating => {
*self.is_scanning.0.borrow_mut() = true;
}
ScanState::Updated {
snapshot,
changes,
barrier,
} => {
*self.is_scanning.0.borrow_mut() = false;
cx.emit(Event::UpdatedEntries(changes));
self.set_snapshot(snapshot, cx);
drop(barrier);
}
}
cx.notify();
}
fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) {
let updated_repos = Self::changed_repos(
&self.snapshot.git_repositories,
@ -1337,14 +1320,6 @@ impl Snapshot {
&self.root_name
}
pub fn scan_started(&mut self) {
self.scan_id += 1;
}
pub fn scan_completed(&mut self) {
self.completed_scan_id = self.scan_id;
}
pub fn scan_id(&self) -> usize {
self.scan_id
}
@ -1539,17 +1514,20 @@ impl LocalSnapshot {
return;
};
match parent_entry.kind {
EntryKind::PendingDir => {
parent_entry.kind = EntryKind::Dir;
}
EntryKind::Dir => {}
_ => return,
}
if let Some(ignore) = ignore {
self.ignores_by_parent_abs_path.insert(
self.abs_path.join(&parent_path).into(),
(ignore, self.scan_id),
);
}
if matches!(parent_entry.kind, EntryKind::PendingDir) {
parent_entry.kind = EntryKind::Dir;
} else {
unreachable!();
}
if parent_path.file_name() == Some(&DOT_GIT) {
let abs_path = self.abs_path.join(&parent_path);
@ -2135,53 +2113,47 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
}
struct BackgroundScanner {
fs: Arc<dyn Fs>,
snapshot: Mutex<LocalSnapshot>,
notify: UnboundedSender<ScanState>,
fs: Arc<dyn Fs>,
status_updates_tx: UnboundedSender<ScanState>,
executor: Arc<executor::Background>,
refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
prev_state: Mutex<(Snapshot, Vec<Arc<Path>>)>,
finished_initial_scan: bool,
}
impl BackgroundScanner {
fn new(
snapshot: LocalSnapshot,
notify: UnboundedSender<ScanState>,
fs: Arc<dyn Fs>,
status_updates_tx: UnboundedSender<ScanState>,
executor: Arc<executor::Background>,
refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
) -> Self {
Self {
fs,
snapshot: Mutex::new(snapshot),
notify,
status_updates_tx,
executor,
refresh_requests_rx,
prev_state: Mutex::new((snapshot.snapshot.clone(), Vec::new())),
snapshot: Mutex::new(snapshot),
finished_initial_scan: false,
}
}
fn abs_path(&self) -> Arc<Path> {
self.snapshot.lock().abs_path.clone()
}
async fn run(
self,
events_rx: impl Stream<Item = Vec<fsevent::Event>>,
mut changed_paths: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
&mut self,
mut events_rx: Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>,
) {
use futures::FutureExt as _;
// Retrieve the basic properties of the root node.
let root_char_bag;
let root_abs_path;
let root_inode;
let root_is_dir;
let next_entry_id;
{
let mut snapshot = self.snapshot.lock();
snapshot.scan_started();
root_char_bag = snapshot.root_char_bag;
root_abs_path = snapshot.abs_path.clone();
root_inode = snapshot.root_entry().map(|e| e.inode);
root_is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir());
next_entry_id = snapshot.next_entry_id.clone();
}
let (root_abs_path, root_inode) = {
let snapshot = self.snapshot.lock();
(
snapshot.abs_path.clone(),
snapshot.root_entry().map(|e| e.inode),
)
};
// Populate ignores above the root.
let ignore_stack;
@ -2205,198 +2177,220 @@ impl BackgroundScanner {
}
};
if root_is_dir {
let mut ancestor_inodes = TreeSet::default();
if let Some(root_inode) = root_inode {
ancestor_inodes.insert(root_inode);
// Perform an initial scan of the directory.
let (scan_job_tx, scan_job_rx) = channel::unbounded();
smol::block_on(scan_job_tx.send(ScanJob {
abs_path: root_abs_path,
path: Arc::from(Path::new("")),
ignore_stack,
ancestor_inodes: TreeSet::from_ordered_entries(root_inode),
scan_queue: scan_job_tx.clone(),
}))
.unwrap();
drop(scan_job_tx);
self.scan_dirs(true, scan_job_rx).await;
self.send_status_update(false, None);
// Process any any FS events that occurred while performing the initial scan.
// For these events, update events cannot be as precise, because we didn't
// have the previous state loaded yet.
if let Poll::Ready(Some(events)) = futures::poll!(events_rx.next()) {
let mut paths = events.into_iter().map(|e| e.path).collect::<Vec<_>>();
while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) {
paths.extend(more_events.into_iter().map(|e| e.path));
}
let (tx, rx) = channel::unbounded();
self.executor
.block(tx.send(ScanJob {
abs_path: root_abs_path.to_path_buf(),
path: Arc::from(Path::new("")),
ignore_stack,
ancestor_inodes,
scan_queue: tx.clone(),
}))
.unwrap();
drop(tx);
let progress_update_count = AtomicUsize::new(0);
self.executor
.scoped(|scope| {
for _ in 0..self.executor.num_cpus() {
scope.spawn(async {
let mut last_progress_update_count = 0;
let progress_update_timer = self.pause_between_progress_updates().fuse();
futures::pin_mut!(progress_update_timer);
loop {
select_biased! {
// Send periodic progress updates to the worktree. Use an atomic counter
// to ensure that only one of the workers sends a progress update after
// the update interval elapses.
_ = progress_update_timer => {
match progress_update_count.compare_exchange(
last_progress_update_count,
last_progress_update_count + 1,
SeqCst,
SeqCst
) {
Ok(_) => {
last_progress_update_count += 1;
if self
.notify
.unbounded_send(ScanState::Initializing {
snapshot: self.snapshot.lock().clone(),
barrier: None,
})
.is_err()
{
break;
}
}
Err(current_count) => last_progress_update_count = current_count,
}
progress_update_timer.set(self.pause_between_progress_updates().fuse());
}
// Refresh any paths requested by the main thread.
job = changed_paths.recv().fuse() => {
let Ok((abs_paths, barrier)) = job else { break };
self.update_entries_for_paths(abs_paths, None).await;
if self
.notify
.unbounded_send(ScanState::Initializing {
snapshot: self.snapshot.lock().clone(),
barrier: Some(barrier),
})
.is_err()
{
break;
}
}
// Recursively load directories from the file system.
job = rx.recv().fuse() => {
let Ok(job) = job else { break };
if let Err(err) = self
.scan_dir(root_char_bag, next_entry_id.clone(), &job)
.await
{
log::error!("error scanning {:?}: {}", job.abs_path, err);
}
}
}
}
});
}
})
.await;
self.process_events(paths).await;
self.send_status_update(false, None);
}
self.snapshot.lock().scan_completed();
self.finished_initial_scan = true;
// Continue processing events until the worktree is dropped.
loop {
select_biased! {
// Process any path refresh requests from the worktree. Prioritize
// these before handling changes reported by the filesystem.
request = self.refresh_requests_rx.recv().fuse() => {
let Ok((paths, barrier)) = request else { break };
self.reload_entries_for_paths(paths, None).await;
if !self.send_status_update(false, Some(barrier)) {
break;
}
}
events = events_rx.next().fuse() => {
let Some(events) = events else { break };
let mut paths = events.into_iter().map(|e| e.path).collect::<Vec<_>>();
while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) {
paths.extend(more_events.into_iter().map(|e| e.path));
}
self.process_events(paths).await;
self.send_status_update(false, None);
}
}
}
}
async fn process_events(&mut self, paths: Vec<PathBuf>) {
use futures::FutureExt as _;
let (scan_job_tx, scan_job_rx) = channel::unbounded();
if let Some(mut paths) = self
.reload_entries_for_paths(paths, Some(scan_job_tx.clone()))
.await
{
paths.sort_unstable();
util::extend_sorted(&mut self.prev_state.lock().1, paths, usize::MAX, Ord::cmp);
}
drop(scan_job_tx);
self.scan_dirs(false, scan_job_rx).await;
let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
let snapshot = self.update_ignore_statuses(ignore_queue_tx);
self.executor
.scoped(|scope| {
for _ in 0..self.executor.num_cpus() {
scope.spawn(async {
loop {
select_biased! {
// Process any path refresh requests before moving on to process
// the queue of ignore statuses.
request = self.refresh_requests_rx.recv().fuse() => {
let Ok((paths, barrier)) = request else { break };
self.reload_entries_for_paths(paths, None).await;
if !self.send_status_update(false, Some(barrier)) {
return;
}
}
// Recursively process directories whose ignores have changed.
job = ignore_queue_rx.recv().fuse() => {
let Ok(job) = job else { break };
self.update_ignore_status(job, &snapshot).await;
}
}
}
});
}
})
.await;
let mut snapshot = self.snapshot.lock();
let mut git_repositories = mem::take(&mut snapshot.git_repositories);
git_repositories.retain(|repo| snapshot.entry_for_path(&repo.git_dir_path).is_some());
snapshot.git_repositories = git_repositories;
snapshot.removed_entry_ids.clear();
snapshot.completed_scan_id = snapshot.scan_id;
}
async fn scan_dirs(
&self,
enable_progress_updates: bool,
scan_jobs_rx: channel::Receiver<ScanJob>,
) {
use futures::FutureExt as _;
if self
.notify
.unbounded_send(ScanState::Initialized {
snapshot: self.snapshot.lock().clone(),
})
.status_updates_tx
.unbounded_send(ScanState::Started)
.is_err()
{
return;
}
// Process any events that occurred while performing the initial scan. These
// events can't be reported as precisely, because there is no snapshot of the
// worktree before they occurred.
futures::pin_mut!(events_rx);
if let Poll::Ready(Some(mut events)) = futures::poll!(events_rx.next()) {
while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) {
events.extend(additional_events);
}
let abs_paths = events.into_iter().map(|e| e.path).collect();
if self.notify.unbounded_send(ScanState::Updating).is_err() {
return;
}
if let Some(changes) = self.process_events(abs_paths, true).await {
if self
.notify
.unbounded_send(ScanState::Updated {
snapshot: self.snapshot.lock().clone(),
changes,
barrier: None,
})
.is_err()
{
return;
}
} else {
return;
}
}
let progress_update_count = AtomicUsize::new(0);
self.executor
.scoped(|scope| {
for _ in 0..self.executor.num_cpus() {
scope.spawn(async {
let mut last_progress_update_count = 0;
let progress_update_timer = self.progress_timer(enable_progress_updates).fuse();
futures::pin_mut!(progress_update_timer);
// Continue processing events until the worktree is dropped.
loop {
let barrier;
let abs_paths;
select_biased! {
request = changed_paths.next().fuse() => {
let Some((paths, b)) = request else { break };
abs_paths = paths;
barrier = Some(b);
}
events = events_rx.next().fuse() => {
let Some(events) = events else { break };
abs_paths = events.into_iter().map(|e| e.path).collect();
barrier = None;
}
}
loop {
select_biased! {
// Process any path refresh requests before moving on to process
// the scan queue, so that user operations are prioritized.
request = self.refresh_requests_rx.recv().fuse() => {
let Ok((paths, barrier)) = request else { break };
self.reload_entries_for_paths(paths, None).await;
if !self.send_status_update(false, Some(barrier)) {
return;
}
}
if self.notify.unbounded_send(ScanState::Updating).is_err() {
return;
}
if let Some(changes) = self.process_events(abs_paths, false).await {
if self
.notify
.unbounded_send(ScanState::Updated {
snapshot: self.snapshot.lock().clone(),
changes,
barrier,
// Send periodic progress updates to the worktree. Use an atomic counter
// to ensure that only one of the workers sends a progress update after
// the update interval elapses.
_ = progress_update_timer => {
match progress_update_count.compare_exchange(
last_progress_update_count,
last_progress_update_count + 1,
SeqCst,
SeqCst
) {
Ok(_) => {
last_progress_update_count += 1;
self.send_status_update(true, None);
}
Err(count) => {
last_progress_update_count = count;
}
}
progress_update_timer.set(self.progress_timer(enable_progress_updates).fuse());
}
// Recursively load directories from the file system.
job = scan_jobs_rx.recv().fuse() => {
let Ok(job) = job else { break };
if let Err(err) = self.scan_dir(&job).await {
if job.path.as_ref() != Path::new("") {
log::error!("error scanning directory {:?}: {}", job.abs_path, err);
}
}
}
}
}
})
.is_err()
{
return;
}
} else {
return;
}
}
})
.await;
}
async fn pause_between_progress_updates(&self) {
#[cfg(any(test, feature = "test-support"))]
if self.fs.is_fake() {
return self.executor.simulate_random_delay().await;
}
smol::Timer::after(Duration::from_millis(100)).await;
fn send_status_update(&self, scanning: bool, barrier: Option<barrier::Sender>) -> bool {
let mut prev_state = self.prev_state.lock();
let snapshot = self.snapshot.lock().clone();
let mut old_snapshot = snapshot.snapshot.clone();
mem::swap(&mut old_snapshot, &mut prev_state.0);
let changed_paths = mem::take(&mut prev_state.1);
let changes = self.build_change_set(&old_snapshot, &snapshot.snapshot, changed_paths);
self.status_updates_tx
.unbounded_send(ScanState::Updated {
snapshot,
changes,
scanning,
barrier,
})
.is_ok()
}
async fn scan_dir(
&self,
root_char_bag: CharBag,
next_entry_id: Arc<AtomicUsize>,
job: &ScanJob,
) -> Result<()> {
async fn scan_dir(&self, job: &ScanJob) -> Result<()> {
let mut new_entries: Vec<Entry> = Vec::new();
let mut new_jobs: Vec<Option<ScanJob>> = Vec::new();
let mut ignore_stack = job.ignore_stack.clone();
let mut new_ignore = None;
let (root_abs_path, root_char_bag, next_entry_id) = {
let snapshot = self.snapshot.lock();
(
snapshot.abs_path().clone(),
snapshot.root_char_bag,
snapshot.next_entry_id.clone(),
)
};
let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
while let Some(child_abs_path) = child_paths.next().await {
let child_abs_path = match child_abs_path {
Ok(child_abs_path) => child_abs_path,
let child_abs_path: Arc<Path> = match child_abs_path {
Ok(child_abs_path) => child_abs_path.into(),
Err(error) => {
log::error!("error processing entry {:?}", error);
continue;
@ -2419,8 +2413,7 @@ impl BackgroundScanner {
match build_gitignore(&child_abs_path, self.fs.as_ref()).await {
Ok(ignore) => {
let ignore = Arc::new(ignore);
ignore_stack =
ignore_stack.append(job.abs_path.as_path().into(), ignore.clone());
ignore_stack = ignore_stack.append(job.abs_path.clone(), ignore.clone());
new_ignore = Some(ignore);
}
Err(error) => {
@ -2438,7 +2431,7 @@ impl BackgroundScanner {
// new jobs as well.
let mut new_jobs = new_jobs.iter_mut();
for entry in &mut new_entries {
let entry_abs_path = self.abs_path().join(&entry.path);
let entry_abs_path = root_abs_path.join(&entry.path);
entry.is_ignored =
ignore_stack.is_abs_path_ignored(&entry_abs_path, entry.is_dir());
@ -2507,69 +2500,18 @@ impl BackgroundScanner {
Ok(())
}
async fn process_events(
&self,
abs_paths: Vec<PathBuf>,
received_before_initialized: bool,
) -> Option<HashMap<Arc<Path>, PathChange>> {
let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
let prev_snapshot = {
let mut snapshot = self.snapshot.lock();
snapshot.scan_started();
snapshot.clone()
};
let event_paths = self
.update_entries_for_paths(abs_paths, Some(scan_queue_tx))
.await?;
// Scan any directories that were created as part of this event batch.
self.executor
.scoped(|scope| {
for _ in 0..self.executor.num_cpus() {
scope.spawn(async {
while let Ok(job) = scan_queue_rx.recv().await {
if let Err(err) = self
.scan_dir(
prev_snapshot.root_char_bag,
prev_snapshot.next_entry_id.clone(),
&job,
)
.await
{
log::error!("error scanning {:?}: {}", job.abs_path, err);
}
}
});
}
})
.await;
// Attempt to detect renames only over a single batch of file-system events.
self.snapshot.lock().removed_entry_ids.clear();
self.update_ignore_statuses().await;
self.update_git_repositories();
let changes = self.build_change_set(
prev_snapshot.snapshot,
event_paths,
received_before_initialized,
);
self.snapshot.lock().scan_completed();
Some(changes)
}
async fn update_entries_for_paths(
async fn reload_entries_for_paths(
&self,
mut abs_paths: Vec<PathBuf>,
scan_queue_tx: Option<Sender<ScanJob>>,
) -> Option<Vec<Arc<Path>>> {
let doing_recursive_update = scan_queue_tx.is_some();
abs_paths.sort_unstable();
abs_paths.dedup_by(|a, b| a.starts_with(&b));
let root_abs_path = self.snapshot.lock().abs_path.clone();
let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.ok()?;
let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.log_err()?;
let metadata = futures::future::join_all(
abs_paths
.iter()
@ -2579,29 +2521,35 @@ impl BackgroundScanner {
.await;
let mut snapshot = self.snapshot.lock();
if scan_queue_tx.is_some() {
for abs_path in &abs_paths {
if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) {
snapshot.remove_path(path);
}
if snapshot.completed_scan_id == snapshot.scan_id {
snapshot.scan_id += 1;
if !doing_recursive_update {
snapshot.completed_scan_id = snapshot.scan_id;
}
}
let mut event_paths = Vec::with_capacity(abs_paths.len());
for (abs_path, metadata) in abs_paths.into_iter().zip(metadata.into_iter()) {
let path: Arc<Path> = match abs_path.strip_prefix(&root_canonical_path) {
Ok(path) => Arc::from(path.to_path_buf()),
Err(_) => {
log::error!(
"unexpected event {:?} for root path {:?}",
abs_path,
root_canonical_path
);
continue;
// Remove any entries for paths that no longer exist or are being recursively
// refreshed. Do this before adding any new entries, so that renames can be
// detected regardless of the order of the paths.
let mut event_paths = Vec::<Arc<Path>>::with_capacity(abs_paths.len());
for (abs_path, metadata) in abs_paths.iter().zip(metadata.iter()) {
if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) {
if matches!(metadata, Ok(None)) || doing_recursive_update {
snapshot.remove_path(path);
}
};
event_paths.push(path.clone());
let abs_path = root_abs_path.join(&path);
event_paths.push(path.into());
} else {
log::error!(
"unexpected event {:?} for root path {:?}",
abs_path,
root_canonical_path
);
}
}
for (path, metadata) in event_paths.iter().cloned().zip(metadata.into_iter()) {
let abs_path: Arc<Path> = root_abs_path.join(&path).into();
match metadata {
Ok(Some(metadata)) => {
@ -2626,15 +2574,14 @@ impl BackgroundScanner {
let mut ancestor_inodes = snapshot.ancestor_inodes_for_path(&path);
if metadata.is_dir && !ancestor_inodes.contains(&metadata.inode) {
ancestor_inodes.insert(metadata.inode);
self.executor
.block(scan_queue_tx.send(ScanJob {
abs_path,
path,
ignore_stack,
ancestor_inodes,
scan_queue: scan_queue_tx.clone(),
}))
.unwrap();
smol::block_on(scan_queue_tx.send(ScanJob {
abs_path,
path,
ignore_stack,
ancestor_inodes,
scan_queue: scan_queue_tx.clone(),
}))
.unwrap();
}
}
}
@ -2649,7 +2596,10 @@ impl BackgroundScanner {
Some(event_paths)
}
async fn update_ignore_statuses(&self) {
fn update_ignore_statuses(
&self,
ignore_queue_tx: Sender<UpdateIgnoreStatusJob>,
) -> LocalSnapshot {
let mut snapshot = self.snapshot.lock().clone();
let mut ignores_to_update = Vec::new();
let mut ignores_to_delete = Vec::new();
@ -2674,7 +2624,6 @@ impl BackgroundScanner {
.remove(&parent_abs_path);
}
let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
ignores_to_update.sort_unstable();
let mut ignores_to_update = ignores_to_update.into_iter().peekable();
while let Some(parent_abs_path) = ignores_to_update.next() {
@ -2686,35 +2635,15 @@ impl BackgroundScanner {
}
let ignore_stack = snapshot.ignore_stack_for_abs_path(&parent_abs_path, true);
ignore_queue_tx
.send(UpdateIgnoreStatusJob {
abs_path: parent_abs_path,
ignore_stack,
ignore_queue: ignore_queue_tx.clone(),
})
.await
.unwrap();
smol::block_on(ignore_queue_tx.send(UpdateIgnoreStatusJob {
abs_path: parent_abs_path,
ignore_stack,
ignore_queue: ignore_queue_tx.clone(),
}))
.unwrap();
}
drop(ignore_queue_tx);
self.executor
.scoped(|scope| {
for _ in 0..self.executor.num_cpus() {
scope.spawn(async {
while let Ok(job) = ignore_queue_rx.recv().await {
self.update_ignore_status(job, &snapshot).await;
}
});
}
})
.await;
}
fn update_git_repositories(&self) {
let mut snapshot = self.snapshot.lock();
let mut git_repositories = mem::take(&mut snapshot.git_repositories);
git_repositories.retain(|repo| snapshot.entry_for_path(&repo.git_dir_path).is_some());
snapshot.git_repositories = git_repositories;
snapshot
}
async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &LocalSnapshot) {
@ -2728,7 +2657,7 @@ impl BackgroundScanner {
let path = job.abs_path.strip_prefix(&snapshot.abs_path).unwrap();
for mut entry in snapshot.child_entries(path).cloned() {
let was_ignored = entry.is_ignored;
let abs_path = self.abs_path().join(&entry.path);
let abs_path = snapshot.abs_path().join(&entry.path);
entry.is_ignored = ignore_stack.is_abs_path_ignored(&abs_path, entry.is_dir());
if entry.is_dir() {
let child_ignore_stack = if entry.is_ignored {
@ -2762,16 +2691,16 @@ impl BackgroundScanner {
fn build_change_set(
&self,
old_snapshot: Snapshot,
old_snapshot: &Snapshot,
new_snapshot: &Snapshot,
event_paths: Vec<Arc<Path>>,
received_before_initialized: bool,
) -> HashMap<Arc<Path>, PathChange> {
use PathChange::{Added, AddedOrUpdated, Removed, Updated};
let new_snapshot = self.snapshot.lock();
let mut changes = HashMap::default();
let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>();
let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>();
let received_before_initialized = !self.finished_initial_scan;
for path in event_paths {
let path = PathKey(path);
@ -2799,9 +2728,9 @@ impl BackgroundScanner {
// If the worktree was not fully initialized when this event was generated,
// we can't know whether this entry was added during the scan or whether
// it was merely updated.
changes.insert(old_entry.path.clone(), AddedOrUpdated);
changes.insert(new_entry.path.clone(), AddedOrUpdated);
} else if old_entry.mtime != new_entry.mtime {
changes.insert(old_entry.path.clone(), Updated);
changes.insert(new_entry.path.clone(), Updated);
}
old_paths.next(&());
new_paths.next(&());
@ -2826,6 +2755,19 @@ impl BackgroundScanner {
}
changes
}
async fn progress_timer(&self, running: bool) {
if !running {
return futures::future::pending().await;
}
#[cfg(any(test, feature = "test-support"))]
if self.fs.is_fake() {
return self.executor.simulate_random_delay().await;
}
smol::Timer::after(Duration::from_millis(100)).await;
}
}
fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
@ -2839,7 +2781,7 @@ fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
}
struct ScanJob {
abs_path: PathBuf,
abs_path: Arc<Path>,
path: Arc<Path>,
ignore_stack: Arc<IgnoreStack>,
scan_queue: Sender<ScanJob>,
@ -3524,7 +3466,7 @@ mod tests {
let fs = FakeFs::new(cx.background());
fs.insert_tree(
"/a",
"/root",
json!({
"b": {},
"c": {},
@ -3535,7 +3477,7 @@ mod tests {
let tree = Worktree::local(
client,
"/a".as_ref(),
"/root".as_ref(),
true,
fs,
Default::default(),
@ -3555,6 +3497,7 @@ mod tests {
assert!(entry.is_dir());
cx.foreground().run_until_parked();
tree.read_with(cx, |tree, _| {
assert_eq!(tree.entry_for_path("a/e").unwrap().kind, EntryKind::Dir);
});

View File

@ -154,6 +154,12 @@ impl<K> TreeSet<K>
where
K: Clone + Debug + Default + Ord,
{
pub fn from_ordered_entries(entries: impl IntoIterator<Item = K>) -> Self {
Self(TreeMap::from_ordered_entries(
entries.into_iter().map(|key| (key, ())),
))
}
pub fn insert(&mut self, key: K) {
self.0.insert(key, ());
}