From edd613062a8b784135dda809febbb30223cf01bc Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Mon, 3 Jun 2024 22:17:10 -0600 Subject: [PATCH] linux watcher (#12615) fixes https://github.com/zed-industries/zed/issues/12297 fixes https://github.com/zed-industries/zed/issues/11345 Release Notes: - N/A --------- Co-authored-by: Max --- crates/assistant/src/assistant_panel.rs | 2 +- crates/extension/src/extension_store.rs | 2 +- crates/fs/src/fs.rs | 221 ++++++++++++++++-------- crates/settings/src/settings_file.rs | 2 +- crates/worktree/src/worktree.rs | 78 ++++----- crates/zed/src/main.rs | 6 +- 6 files changed, 186 insertions(+), 125 deletions(-) diff --git a/crates/assistant/src/assistant_panel.rs b/crates/assistant/src/assistant_panel.rs index 70165a7d63..b73d2a89c6 100644 --- a/crates/assistant/src/assistant_panel.rs +++ b/crates/assistant/src/assistant_panel.rs @@ -155,7 +155,7 @@ impl AssistantPanel { cx.new_view::(|cx| { const CONVERSATION_WATCH_DURATION: Duration = Duration::from_millis(100); let _watch_saved_conversations = cx.spawn(move |this, mut cx| async move { - let mut events = fs + let (mut events, _) = fs .watch(&CONVERSATIONS_DIR, CONVERSATION_WATCH_DURATION) .await; while events.next().await.is_some() { diff --git a/crates/extension/src/extension_store.rs b/crates/extension/src/extension_store.rs index b33e0addaa..a4aa6bfafd 100644 --- a/crates/extension/src/extension_store.rs +++ b/crates/extension/src/extension_store.rs @@ -351,7 +351,7 @@ impl ExtensionStore { let reload_tx = this.reload_tx.clone(); let installed_dir = this.installed_dir.clone(); async move { - let mut paths = fs.watch(&installed_dir, FS_WATCH_LATENCY).await; + let (mut paths, _) = fs.watch(&installed_dir, FS_WATCH_LATENCY).await; while let Some(paths) = paths.next().await { for path in paths { let Ok(event_path) = path.strip_prefix(&installed_dir) else { diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index 7bfefd695d..6e4cfcaa74 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -36,6 +36,11 @@ use smol::io::AsyncReadExt; #[cfg(any(test, feature = "test-support"))] use std::ffi::OsStr; +pub trait Watcher: Send + Sync { + fn add(&self, path: &Path) -> Result<()>; + fn remove(&self, path: &Path) -> Result<()>; +} + #[async_trait::async_trait] pub trait Fs: Send + Sync { async fn create_dir(&self, path: &Path) -> Result<()>; @@ -79,7 +84,10 @@ pub trait Fs: Send + Sync { &self, path: &Path, latency: Duration, - ) -> Pin>>>; + ) -> ( + Pin>>>, + Arc, + ); fn open_repo(&self, abs_dot_git: &Path) -> Option>; fn is_fake(&self) -> bool; @@ -126,6 +134,13 @@ pub struct RealFs { git_binary_path: Option, } +pub struct RealWatcher { + #[cfg(target_os = "linux")] + root_path: PathBuf, + #[cfg(target_os = "linux")] + fs_watcher: parking_lot::Mutex, +} + impl RealFs { pub fn new( git_hosting_provider_registry: Arc, @@ -409,7 +424,10 @@ impl Fs for RealFs { &self, path: &Path, latency: Duration, - ) -> Pin>>> { + ) -> ( + Pin>>>, + Arc, + ) { use fsevent::EventStream; let (tx, rx) = smol::channel::unbounded(); @@ -421,22 +439,76 @@ impl Fs for RealFs { }); }); - Box::pin(rx.chain(futures::stream::once(async move { - drop(handle); - vec![] - }))) + ( + Box::pin(rx.chain(futures::stream::once(async move { + drop(handle); + vec![] + }))), + Arc::new(RealWatcher {}), + ) } - #[cfg(not(target_os = "macos"))] + #[cfg(target_os = "linux")] async fn watch( &self, path: &Path, _latency: Duration, - ) -> Pin>>> { - use notify::{event::EventKind, event::ModifyKind, Watcher}; - // todo(linux): This spawns two threads, while the macOS impl - // only spawns one. Can we use a OnceLock or some such to make - // this better + ) -> ( + Pin>>>, + Arc, + ) { + let (tx, rx) = smol::channel::unbounded(); + + let file_watcher = notify::recommended_watcher({ + let tx = tx.clone(); + move |event: Result| { + if let Some(event) = event.log_err() { + tx.try_send(event.paths).ok(); + } + } + }) + .expect("Could not start file watcher"); + + let watcher = Arc::new(RealWatcher { + root_path: path.to_path_buf(), + fs_watcher: parking_lot::Mutex::new(file_watcher), + }); + + watcher.add(path).ok(); // Ignore "file doesn't exist error" and rely on parent watcher. + + // watch the parent dir so we can tell when settings.json is created + if let Some(parent) = path.parent() { + watcher.add(parent).log_err(); + } + + ( + Box::pin(rx.filter_map({ + let watcher = watcher.clone(); + move |mut paths| { + paths.retain(|path| path.starts_with(&watcher.root_path)); + async move { + if paths.is_empty() { + None + } else { + Some(paths) + } + } + } + })), + watcher, + ) + } + + #[cfg(target_os = "windows")] + async fn watch( + &self, + path: &Path, + _latency: Duration, + ) -> ( + Pin>>>, + Arc, + ) { + use notify::Watcher; let (tx, rx) = smol::channel::unbounded(); @@ -452,56 +524,15 @@ impl Fs for RealFs { file_watcher .watch(path, notify::RecursiveMode::Recursive) - .ok(); // It's ok if this fails, the parent watcher will add it. + .log_err(); - let mut parent_watcher = notify::recommended_watcher({ - let watched_path = path.to_path_buf(); - let tx = tx.clone(); - move |event: Result| { - if let Some(event) = event.ok() { - if event.paths.into_iter().any(|path| *path == watched_path) { - match event.kind { - EventKind::Modify(ev) => { - if matches!(ev, ModifyKind::Name(_)) { - file_watcher - .watch( - watched_path.as_path(), - notify::RecursiveMode::Recursive, - ) - .log_err(); - let _ = tx.try_send(vec![watched_path.clone()]).ok(); - } - } - EventKind::Create(_) => { - file_watcher - .watch(watched_path.as_path(), notify::RecursiveMode::Recursive) - .log_err(); - let _ = tx.try_send(vec![watched_path.clone()]).ok(); - } - EventKind::Remove(_) => { - file_watcher.unwatch(&watched_path).log_err(); - let _ = tx.try_send(vec![watched_path.clone()]).ok(); - } - _ => {} - } - } - } - } - }) - .expect("Could not start file watcher"); - - parent_watcher - .watch( - path.parent() - .expect("Watching root is probably not what you want"), - notify::RecursiveMode::NonRecursive, - ) - .expect("Could not start watcher on parent directory"); - - Box::pin(rx.chain(futures::stream::once(async move { - drop(parent_watcher); - vec![] - }))) + ( + Box::pin(rx.chain(futures::stream::once(async move { + drop(file_watcher); + vec![] + }))), + Arc::new(RealWatcher {}), + ) } fn open_repo(&self, dotgit_path: &Path) -> Option> { @@ -560,6 +591,36 @@ impl Fs for RealFs { } } +#[cfg(not(target_os = "linux"))] +impl Watcher for RealWatcher { + fn add(&self, _: &Path) -> Result<()> { + Ok(()) + } + + fn remove(&self, _: &Path) -> Result<()> { + Ok(()) + } +} + +#[cfg(target_os = "linux")] +impl Watcher for RealWatcher { + fn add(&self, path: &Path) -> Result<()> { + use notify::Watcher; + + self.fs_watcher + .lock() + .watch(path, notify::RecursiveMode::NonRecursive)?; + Ok(()) + } + + fn remove(&self, path: &Path) -> Result<()> { + use notify::Watcher; + + self.fs_watcher.lock().unwatch(path)?; + Ok(()) + } +} + #[cfg(any(test, feature = "test-support"))] pub struct FakeFs { // Use an unfair lock to ensure tests are deterministic. @@ -1073,6 +1134,20 @@ impl FakeFsEntry { } } +#[cfg(any(test, feature = "test-support"))] +struct FakeWatcher {} + +#[cfg(any(test, feature = "test-support"))] +impl Watcher for FakeWatcher { + fn add(&self, _: &Path) -> Result<()> { + Ok(()) + } + + fn remove(&self, _: &Path) -> Result<()> { + Ok(()) + } +} + #[cfg(any(test, feature = "test-support"))] #[async_trait::async_trait] impl Fs for FakeFs { @@ -1468,20 +1543,26 @@ impl Fs for FakeFs { &self, path: &Path, _: Duration, - ) -> Pin>>> { + ) -> ( + Pin>>>, + Arc, + ) { self.simulate_random_delay().await; let (tx, rx) = smol::channel::unbounded(); self.state.lock().event_txs.push(tx); let path = path.to_path_buf(); let executor = self.executor.clone(); - Box::pin(futures::StreamExt::filter(rx, move |events| { - let result = events.iter().any(|evt_path| evt_path.starts_with(&path)); - let executor = executor.clone(); - async move { - executor.simulate_random_delay().await; - result - } - })) + ( + Box::pin(futures::StreamExt::filter(rx, move |events| { + let result = events.iter().any(|evt_path| evt_path.starts_with(&path)); + let executor = executor.clone(); + async move { + executor.simulate_random_delay().await; + result + } + })), + Arc::new(FakeWatcher {}), + ) } fn open_repo(&self, abs_dot_git: &Path) -> Option> { diff --git a/crates/settings/src/settings_file.rs b/crates/settings/src/settings_file.rs index 6af02bbec6..d00a87f22a 100644 --- a/crates/settings/src/settings_file.rs +++ b/crates/settings/src/settings_file.rs @@ -38,7 +38,7 @@ pub fn watch_config_file( let (tx, rx) = mpsc::unbounded(); executor .spawn(async move { - let events = fs.watch(&path, Duration::from_millis(100)).await; + let (events, _) = fs.watch(&path, Duration::from_millis(100)).await; futures::pin_mut!(events); let contents = fs.load(&path).await.unwrap_or_default(); diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index ef93a7c7b4..6dca9aa898 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -8,15 +8,15 @@ use anyhow::{anyhow, Context as _, Result}; use client::{proto, Client}; use clock::ReplicaId; use collections::{HashMap, HashSet, VecDeque}; -use fs::Fs; use fs::{copy_recursive, RemoveOptions}; -use futures::stream::select; +use fs::{Fs, Watcher}; use futures::{ channel::{ mpsc::{self, UnboundedSender}, oneshot, }, select_biased, + stream::select, task::Poll, FutureExt as _, Stream, StreamExt, }; @@ -700,32 +700,42 @@ fn start_background_scan_tasks( let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded(); let background_scanner = cx.background_executor().spawn({ let abs_path = if cfg!(target_os = "windows") { - abs_path.canonicalize().unwrap_or_else(|_| abs_path.to_path_buf()) + abs_path + .canonicalize() + .unwrap_or_else(|_| abs_path.to_path_buf()) } else { abs_path.to_path_buf() }; let background = cx.background_executor().clone(); async move { - let events = fs.watch(&abs_path, FS_WATCH_LATENCY).await; + let (events, watcher) = fs.watch(&abs_path, FS_WATCH_LATENCY).await; let case_sensitive = fs.is_case_sensitive().await.unwrap_or_else(|e| { - log::error!( - "Failed to determine whether filesystem is case sensitive (falling back to true) due to error: {e:#}" - ); + log::error!("Failed to determine whether filesystem is case sensitive: {e:#}"); true }); - BackgroundScanner::new( - snapshot, - next_entry_id, + let mut scanner = BackgroundScanner { fs, - case_sensitive, - scan_states_tx, - background, + fs_case_sensitive: case_sensitive, + status_updates_tx: scan_states_tx, + executor: background, scan_requests_rx, path_prefixes_to_scan_rx, - ) - .run(events) - .await; + next_entry_id, + state: Mutex::new(BackgroundScannerState { + prev_snapshot: snapshot.snapshot.clone(), + snapshot, + scanned_dirs: Default::default(), + path_prefixes_to_scan: Default::default(), + paths_to_scan: Default::default(), + removed_entry_ids: Default::default(), + changed_paths: Default::default(), + }), + phase: BackgroundScannerPhase::InitialScan, + watcher, + }; + + scanner.run(events).await; } }); let scan_state_updater = cx.spawn(|this, mut cx| async move { @@ -3327,6 +3337,7 @@ struct BackgroundScanner { path_prefixes_to_scan_rx: channel::Receiver>, next_entry_id: Arc, phase: BackgroundScannerPhase, + watcher: Arc, } #[derive(PartialEq)] @@ -3337,38 +3348,6 @@ enum BackgroundScannerPhase { } impl BackgroundScanner { - #[allow(clippy::too_many_arguments)] - fn new( - snapshot: LocalSnapshot, - next_entry_id: Arc, - fs: Arc, - fs_case_sensitive: bool, - status_updates_tx: UnboundedSender, - executor: BackgroundExecutor, - scan_requests_rx: channel::Receiver, - path_prefixes_to_scan_rx: channel::Receiver>, - ) -> Self { - Self { - fs, - fs_case_sensitive, - status_updates_tx, - executor, - scan_requests_rx, - path_prefixes_to_scan_rx, - next_entry_id, - state: Mutex::new(BackgroundScannerState { - prev_snapshot: snapshot.snapshot.clone(), - snapshot, - scanned_dirs: Default::default(), - path_prefixes_to_scan: Default::default(), - paths_to_scan: Default::default(), - removed_entry_ids: Default::default(), - changed_paths: Default::default(), - }), - phase: BackgroundScannerPhase::InitialScan, - } - } - async fn run(&mut self, mut fs_events_rx: Pin>>>) { use futures::FutureExt as _; @@ -3396,7 +3375,7 @@ impl BackgroundScanner { if let Some(ancestor_dot_git) = self.fs.canonicalize(&ancestor_dot_git).await.log_err() { - let ancestor_git_events = + let (ancestor_git_events, _) = self.fs.watch(&ancestor_dot_git, FS_WATCH_LATENCY).await; fs_events_rx = select(fs_events_rx, ancestor_git_events).boxed(); @@ -3987,6 +3966,7 @@ impl BackgroundScanner { } state.populate_dir(&job.path, new_entries, new_ignore); + self.watcher.add(job.abs_path.as_ref()).log_err(); for new_job in new_jobs.into_iter().flatten() { job.scan_queue diff --git a/crates/zed/src/main.rs b/crates/zed/src/main.rs index fe93df3380..df2aa9b5cf 100644 --- a/crates/zed/src/main.rs +++ b/crates/zed/src/main.rs @@ -881,7 +881,7 @@ fn load_user_themes_in_background(fs: Arc, cx: &mut AppContext) { fn watch_themes(fs: Arc, cx: &mut AppContext) { use std::time::Duration; cx.spawn(|cx| async move { - let mut events = fs + let (mut events, _) = fs .watch(&paths::THEMES_DIR.clone(), Duration::from_millis(100)) .await; @@ -920,7 +920,7 @@ fn watch_languages(fs: Arc, languages: Arc, cx: &m }; cx.spawn(|_| async move { - let mut events = fs.watch(path.as_path(), Duration::from_millis(100)).await; + let (mut events, _) = fs.watch(path.as_path(), Duration::from_millis(100)).await; while let Some(event) = events.next().await { let has_language_file = event.iter().any(|path| { path.extension() @@ -954,7 +954,7 @@ fn watch_file_types(fs: Arc, cx: &mut AppContext) { }; cx.spawn(|cx| async move { - let mut events = fs.watch(path.as_path(), Duration::from_millis(100)).await; + let (mut events, _) = fs.watch(path.as_path(), Duration::from_millis(100)).await; while (events.next().await).is_some() { cx.update(|cx| { FileIcons::update_global(cx, |file_types, _cx| {