From b10b0dbd757dd2537c65867786808d96d6597d64 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 21 Mar 2023 11:21:58 -0700 Subject: [PATCH] Only mutate background snapshot in the background scanner --- crates/project/src/worktree.rs | 284 +++++++++++++++------------------ 1 file changed, 131 insertions(+), 153 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index afce1b5abe..656268a02c 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -9,7 +9,7 @@ use fs::LineEnding; use fs::{repository::GitRepository, Fs}; use futures::{ channel::{ - mpsc::{self, UnboundedSender}, + mpsc::{self, UnboundedReceiver, UnboundedSender}, oneshot, }, Stream, StreamExt, @@ -29,6 +29,7 @@ use language::{ Buffer, DiagnosticEntry, PointUtf16, Rope, RopeFingerprint, Unclipped, }; use parking_lot::Mutex; +use postage::barrier; use postage::{ prelude::{Sink as _, Stream as _}, watch, @@ -55,7 +56,6 @@ use util::{ResultExt, TryFutureExt}; #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)] pub struct WorktreeId(usize); -#[allow(clippy::large_enum_variant)] pub enum Worktree { Local(LocalWorktree), Remote(RemoteWorktree), @@ -63,7 +63,7 @@ pub enum Worktree { pub struct LocalWorktree { snapshot: LocalSnapshot, - background_snapshot: Arc>, + path_changes_tx: mpsc::UnboundedSender<(Vec, barrier::Sender)>, is_scanning: (watch::Sender, watch::Receiver), _background_scanner_task: Task<()>, share: Option, @@ -156,14 +156,17 @@ impl DerefMut for LocalSnapshot { } } -#[derive(Clone, Debug)] enum ScanState { /// The worktree is performing its initial scan of the filesystem. Initializing(LocalSnapshot), Initialized(LocalSnapshot), /// The worktree is updating in response to filesystem events. Updating, - Updated(LocalSnapshot, HashMap, PathChange>), + Updated { + snapshot: LocalSnapshot, + changes: HashMap, PathChange>, + barrier: Option, + }, Err(Arc), } @@ -234,8 +237,8 @@ impl Worktree { ); } + let (path_changes_tx, path_changes_rx) = mpsc::unbounded(); let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded(); - let background_snapshot = Arc::new(Mutex::new(snapshot.clone())); cx.spawn_weak(|this, mut cx| async move { while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) { @@ -250,21 +253,21 @@ impl Worktree { let background_scanner_task = cx.background().spawn({ let fs = fs.clone(); - let background_snapshot = background_snapshot.clone(); + let snapshot = snapshot.clone(); let background = cx.background().clone(); async move { let events = fs.watch(&abs_path, Duration::from_millis(100)).await; - BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background) - .run(events) + BackgroundScanner::new(snapshot, scan_states_tx, fs, background) + .run(events, path_changes_rx) .await; } }); Worktree::Local(LocalWorktree { snapshot, - background_snapshot, is_scanning: watch::channel_with(true), share: None, + path_changes_tx, _background_scanner_task: background_scanner_task, diagnostics: Default::default(), diagnostic_summaries: Default::default(), @@ -546,10 +549,15 @@ impl LocalWorktree { ScanState::Updating => { *self.is_scanning.0.borrow_mut() = true; } - ScanState::Updated(new_snapshot, changes) => { + ScanState::Updated { + snapshot: new_snapshot, + changes, + barrier, + } => { *self.is_scanning.0.borrow_mut() = false; cx.emit(Event::UpdatedEntries(changes)); self.set_snapshot(new_snapshot, cx); + drop(barrier); } ScanState::Err(error) => { *self.is_scanning.0.borrow_mut() = false; @@ -660,9 +668,7 @@ impl LocalWorktree { // Eagerly populate the snapshot with an updated entry for the loaded file let entry = this .update(&mut cx, |this, cx| { - this.as_local() - .unwrap() - .refresh_entry(path, abs_path, None, cx) + this.as_local().unwrap().refresh_entry(path, None, cx) }) .await?; @@ -780,10 +786,13 @@ impl LocalWorktree { cx: &mut ModelContext, ) -> Option>> { let entry = self.entry_for_id(entry_id)?.clone(); - let abs_path = self.absolutize(&entry.path); + let path = entry.path.clone(); + let abs_path = self.absolutize(&path); + let (tx, mut rx) = barrier::channel(); + let delete = cx.background().spawn({ + let abs_path = abs_path.clone(); let fs = self.fs.clone(); - let abs_path = abs_path; async move { if entry.is_file() { fs.remove_file(&abs_path, Default::default()).await @@ -802,17 +811,14 @@ impl LocalWorktree { Some(cx.spawn(|this, mut cx| async move { delete.await?; - this.update(&mut cx, |this, cx| { - let this = this.as_local_mut().unwrap(); - - this.background_snapshot.lock().delete_entry(entry_id); - - if let Some(path) = this.snapshot.delete_entry(entry_id) { - cx.emit(Event::UpdatedEntries( - [(path, PathChange::Removed)].into_iter().collect(), - )); - } + this.update(&mut cx, |this, _| { + this.as_local_mut() + .unwrap() + .path_changes_tx + .unbounded_send((vec![abs_path], tx)) + .unwrap(); }); + rx.recv().await; Ok(()) })) } @@ -826,29 +832,21 @@ impl LocalWorktree { let old_path = self.entry_for_id(entry_id)?.path.clone(); let new_path = new_path.into(); let abs_old_path = self.absolutize(&old_path); - let abs_new_path = self.absolutize(new_path.as_ref()); - let rename = cx.background().spawn({ - let fs = self.fs.clone(); - let abs_new_path = abs_new_path.clone(); - async move { - fs.rename(&abs_old_path, &abs_new_path, Default::default()) - .await - } + let abs_new_path = self.absolutize(&new_path); + let fs = self.fs.clone(); + let rename = cx.background().spawn(async move { + fs.rename(&abs_old_path, &abs_new_path, Default::default()) + .await }); Some(cx.spawn(|this, mut cx| async move { rename.await?; - let entry = this - .update(&mut cx, |this, cx| { - this.as_local_mut().unwrap().refresh_entry( - new_path.clone(), - abs_new_path, - Some(old_path), - cx, - ) - }) - .await?; - Ok(entry) + this.update(&mut cx, |this, cx| { + this.as_local_mut() + .unwrap() + .refresh_entry(new_path.clone(), Some(old_path), cx) + }) + .await })) } @@ -862,33 +860,25 @@ impl LocalWorktree { let new_path = new_path.into(); let abs_old_path = self.absolutize(&old_path); let abs_new_path = self.absolutize(&new_path); - let copy = cx.background().spawn({ - let fs = self.fs.clone(); - let abs_new_path = abs_new_path.clone(); - async move { - copy_recursive( - fs.as_ref(), - &abs_old_path, - &abs_new_path, - Default::default(), - ) - .await - } + let fs = self.fs.clone(); + let copy = cx.background().spawn(async move { + copy_recursive( + fs.as_ref(), + &abs_old_path, + &abs_new_path, + Default::default(), + ) + .await }); Some(cx.spawn(|this, mut cx| async move { copy.await?; - let entry = this - .update(&mut cx, |this, cx| { - this.as_local_mut().unwrap().refresh_entry( - new_path.clone(), - abs_new_path, - None, - cx, - ) - }) - .await?; - Ok(entry) + this.update(&mut cx, |this, cx| { + this.as_local_mut() + .unwrap() + .refresh_entry(new_path.clone(), None, cx) + }) + .await })) } @@ -900,90 +890,51 @@ impl LocalWorktree { ) -> Task> { let path = path.into(); let abs_path = self.absolutize(&path); - let write = cx.background().spawn({ - let fs = self.fs.clone(); - let abs_path = abs_path.clone(); - async move { - if let Some((text, line_ending)) = text_if_file { - fs.save(&abs_path, &text, line_ending).await - } else { - fs.create_dir(&abs_path).await - } + let fs = self.fs.clone(); + let write = cx.background().spawn(async move { + if let Some((text, line_ending)) = text_if_file { + fs.save(&abs_path, &text, line_ending).await + } else { + fs.create_dir(&abs_path).await } }); cx.spawn(|this, mut cx| async move { write.await?; - let entry = this - .update(&mut cx, |this, cx| { - this.as_local_mut() - .unwrap() - .refresh_entry(path, abs_path, None, cx) - }) - .await?; - Ok(entry) + this.update(&mut cx, |this, cx| { + this.as_local_mut().unwrap().refresh_entry(path, None, cx) + }) + .await }) } fn refresh_entry( &self, path: Arc, - abs_path: PathBuf, old_path: Option>, cx: &mut ModelContext, ) -> Task> { let fs = self.fs.clone(); - let root_char_bag = self.snapshot.root_char_bag; - let next_entry_id = self.snapshot.next_entry_id.clone(); - cx.spawn_weak(|this, mut cx| async move { - let metadata = fs - .metadata(&abs_path) - .await? - .ok_or_else(|| anyhow!("could not read saved file metadata"))?; - let this = this - .upgrade(&cx) - .ok_or_else(|| anyhow!("worktree was dropped"))?; - this.update(&mut cx, |this, cx| { - let this = this.as_local_mut().unwrap(); - let mut entry = Entry::new(path, &metadata, &next_entry_id, root_char_bag); - entry.is_ignored = this - .snapshot - .ignore_stack_for_abs_path(&abs_path, entry.is_dir()) - .is_abs_path_ignored(&abs_path, entry.is_dir()); + let abs_path = self.abs_path.clone(); + let path_changes_tx = self.path_changes_tx.clone(); + cx.spawn_weak(move |this, mut cx| async move { + let abs_path = fs.canonicalize(&abs_path).await?; + let paths = if let Some(old_path) = old_path { + vec![abs_path.join(&path), abs_path.join(&old_path)] + } else { + vec![abs_path.join(&path)] + }; - { - let mut snapshot = this.background_snapshot.lock(); - snapshot.scan_started(); - if let Some(old_path) = &old_path { - snapshot.remove_path(old_path); - } - snapshot.insert_entry(entry.clone(), fs.as_ref()); - snapshot.scan_completed(); - } - - let mut changes = HashMap::default(); - - this.snapshot.scan_started(); - if let Some(old_path) = &old_path { - this.snapshot.remove_path(old_path); - changes.insert(old_path.clone(), PathChange::Removed); - } - let exists = this.snapshot.entry_for_path(&entry.path).is_some(); - let inserted_entry = this.snapshot.insert_entry(entry, fs.as_ref()); - changes.insert( - inserted_entry.path.clone(), - if exists { - PathChange::Updated - } else { - PathChange::Added - }, - ); - this.snapshot.scan_completed(); - - eprintln!("refreshed {:?}", changes); - cx.emit(Event::UpdatedEntries(changes)); - Ok(inserted_entry) - }) + let (tx, mut rx) = barrier::channel(); + path_changes_tx.unbounded_send((paths, tx)).unwrap(); + rx.recv().await; + this.upgrade(&cx) + .ok_or_else(|| anyhow!("worktree was dropped"))? + .update(&mut cx, |this, _| { + this.entry_for_path(path) + .cloned() + .ok_or_else(|| anyhow!("failed to read path after update")) + }) }) } @@ -2188,14 +2139,14 @@ struct BackgroundScanner { impl BackgroundScanner { fn new( - snapshot: Arc>, + snapshot: LocalSnapshot, notify: UnboundedSender, fs: Arc, executor: Arc, ) -> Self { Self { fs, - snapshot, + snapshot: Arc::new(Mutex::new(snapshot)), notify, executor, changes: Default::default(), @@ -2206,7 +2157,13 @@ impl BackgroundScanner { self.snapshot.lock().abs_path.clone() } - async fn run(mut self, events_rx: impl Stream>) { + async fn run( + mut self, + events_rx: impl Stream>, + mut changed_paths: UnboundedReceiver<(Vec, barrier::Sender)>, + ) { + use futures::{select_biased, FutureExt as _}; + // While performing the initial scan, send a new snapshot to the main // thread on a recurring interval. let initializing_task = self.executor.spawn({ @@ -2260,7 +2217,7 @@ impl BackgroundScanner { // Process any events that occurred while performing the initial scan. These // events can't be reported as precisely, because there is no snapshot of the // worktree before they occurred. - if let Some(mut events) = events_rx.next().await { + if let Poll::Ready(Some(mut events)) = futures::poll!(events_rx.next()) { while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) { events.extend(additional_events); } @@ -2272,10 +2229,11 @@ impl BackgroundScanner { } if self .notify - .unbounded_send(ScanState::Updated( - self.snapshot.lock().clone(), - mem::take(&mut self.changes), - )) + .unbounded_send(ScanState::Updated { + snapshot: self.snapshot.lock().clone(), + changes: mem::take(&mut self.changes), + barrier: None, + }) .is_err() { return; @@ -2283,10 +2241,29 @@ impl BackgroundScanner { } // Continue processing events until the worktree is dropped. - while let Some(mut events) = events_rx.next().await { - while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) { - events.extend(additional_events); + loop { + let events; + let barrier; + select_biased! { + request = changed_paths.next().fuse() => { + let Some((paths, b)) = request else { break; }; + events = paths + .into_iter() + .map(|path| fsevent::Event { + path, + event_id: 0, + flags: fsevent::StreamFlags::NONE + }) + .collect::>(); + barrier = Some(b); + } + e = events_rx.next().fuse() => { + let Some(e) = e else { break; }; + events = e; + barrier = None; + } } + if self.notify.unbounded_send(ScanState::Updating).is_err() { return; } @@ -2295,10 +2272,11 @@ impl BackgroundScanner { } if self .notify - .unbounded_send(ScanState::Updated( - self.snapshot.lock().clone(), - mem::take(&mut self.changes), - )) + .unbounded_send(ScanState::Updated { + snapshot: self.snapshot.lock().clone(), + changes: mem::take(&mut self.changes), + barrier, + }) .is_err() { return; @@ -3132,7 +3110,7 @@ mod tests { let tree = Worktree::local( client, - Arc::from(Path::new("/root")), + Path::new("/root"), true, fs, Default::default(), @@ -3193,7 +3171,7 @@ mod tests { let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); let tree = Worktree::local( client, - Arc::from(Path::new("/root")), + Path::new("/root"), true, fs.clone(), Default::default(),