linux watcher (#12615)

fixes https://github.com/zed-industries/zed/issues/12297
fixes https://github.com/zed-industries/zed/issues/11345

Release Notes:

- N/A

---------

Co-authored-by: Max <max@zed.dev>
This commit is contained in:
Conrad Irwin 2024-06-03 22:17:10 -06:00 committed by GitHub
parent 3cd6719b30
commit edd613062a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 186 additions and 125 deletions

View File

@ -155,7 +155,7 @@ impl AssistantPanel {
cx.new_view::<Self>(|cx| {
const CONVERSATION_WATCH_DURATION: Duration = Duration::from_millis(100);
let _watch_saved_conversations = cx.spawn(move |this, mut cx| async move {
let mut events = fs
let (mut events, _) = fs
.watch(&CONVERSATIONS_DIR, CONVERSATION_WATCH_DURATION)
.await;
while events.next().await.is_some() {

View File

@ -351,7 +351,7 @@ impl ExtensionStore {
let reload_tx = this.reload_tx.clone();
let installed_dir = this.installed_dir.clone();
async move {
let mut paths = fs.watch(&installed_dir, FS_WATCH_LATENCY).await;
let (mut paths, _) = fs.watch(&installed_dir, FS_WATCH_LATENCY).await;
while let Some(paths) = paths.next().await {
for path in paths {
let Ok(event_path) = path.strip_prefix(&installed_dir) else {

View File

@ -36,6 +36,11 @@ use smol::io::AsyncReadExt;
#[cfg(any(test, feature = "test-support"))]
use std::ffi::OsStr;
pub trait Watcher: Send + Sync {
fn add(&self, path: &Path) -> Result<()>;
fn remove(&self, path: &Path) -> Result<()>;
}
#[async_trait::async_trait]
pub trait Fs: Send + Sync {
async fn create_dir(&self, path: &Path) -> Result<()>;
@ -79,7 +84,10 @@ pub trait Fs: Send + Sync {
&self,
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>;
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Arc<dyn Watcher>,
);
fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<dyn GitRepository>>;
fn is_fake(&self) -> bool;
@ -126,6 +134,13 @@ pub struct RealFs {
git_binary_path: Option<PathBuf>,
}
pub struct RealWatcher {
#[cfg(target_os = "linux")]
root_path: PathBuf,
#[cfg(target_os = "linux")]
fs_watcher: parking_lot::Mutex<notify::INotifyWatcher>,
}
impl RealFs {
pub fn new(
git_hosting_provider_registry: Arc<GitHostingProviderRegistry>,
@ -409,7 +424,10 @@ impl Fs for RealFs {
&self,
path: &Path,
latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>> {
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Arc<dyn Watcher>,
) {
use fsevent::EventStream;
let (tx, rx) = smol::channel::unbounded();
@ -421,22 +439,76 @@ impl Fs for RealFs {
});
});
Box::pin(rx.chain(futures::stream::once(async move {
drop(handle);
vec![]
})))
(
Box::pin(rx.chain(futures::stream::once(async move {
drop(handle);
vec![]
}))),
Arc::new(RealWatcher {}),
)
}
#[cfg(not(target_os = "macos"))]
#[cfg(target_os = "linux")]
async fn watch(
&self,
path: &Path,
_latency: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>> {
use notify::{event::EventKind, event::ModifyKind, Watcher};
// todo(linux): This spawns two threads, while the macOS impl
// only spawns one. Can we use a OnceLock or some such to make
// this better
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Arc<dyn Watcher>,
) {
let (tx, rx) = smol::channel::unbounded();
let file_watcher = notify::recommended_watcher({
let tx = tx.clone();
move |event: Result<notify::Event, _>| {
if let Some(event) = event.log_err() {
tx.try_send(event.paths).ok();
}
}
})
.expect("Could not start file watcher");
let watcher = Arc::new(RealWatcher {
root_path: path.to_path_buf(),
fs_watcher: parking_lot::Mutex::new(file_watcher),
});
watcher.add(path).ok(); // Ignore "file doesn't exist error" and rely on parent watcher.
// watch the parent dir so we can tell when settings.json is created
if let Some(parent) = path.parent() {
watcher.add(parent).log_err();
}
(
Box::pin(rx.filter_map({
let watcher = watcher.clone();
move |mut paths| {
paths.retain(|path| path.starts_with(&watcher.root_path));
async move {
if paths.is_empty() {
None
} else {
Some(paths)
}
}
}
})),
watcher,
)
}
#[cfg(target_os = "windows")]
async fn watch(
&self,
path: &Path,
_latency: Duration,
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Arc<dyn Watcher>,
) {
use notify::Watcher;
let (tx, rx) = smol::channel::unbounded();
@ -452,56 +524,15 @@ impl Fs for RealFs {
file_watcher
.watch(path, notify::RecursiveMode::Recursive)
.ok(); // It's ok if this fails, the parent watcher will add it.
.log_err();
let mut parent_watcher = notify::recommended_watcher({
let watched_path = path.to_path_buf();
let tx = tx.clone();
move |event: Result<notify::Event, _>| {
if let Some(event) = event.ok() {
if event.paths.into_iter().any(|path| *path == watched_path) {
match event.kind {
EventKind::Modify(ev) => {
if matches!(ev, ModifyKind::Name(_)) {
file_watcher
.watch(
watched_path.as_path(),
notify::RecursiveMode::Recursive,
)
.log_err();
let _ = tx.try_send(vec![watched_path.clone()]).ok();
}
}
EventKind::Create(_) => {
file_watcher
.watch(watched_path.as_path(), notify::RecursiveMode::Recursive)
.log_err();
let _ = tx.try_send(vec![watched_path.clone()]).ok();
}
EventKind::Remove(_) => {
file_watcher.unwatch(&watched_path).log_err();
let _ = tx.try_send(vec![watched_path.clone()]).ok();
}
_ => {}
}
}
}
}
})
.expect("Could not start file watcher");
parent_watcher
.watch(
path.parent()
.expect("Watching root is probably not what you want"),
notify::RecursiveMode::NonRecursive,
)
.expect("Could not start watcher on parent directory");
Box::pin(rx.chain(futures::stream::once(async move {
drop(parent_watcher);
vec![]
})))
(
Box::pin(rx.chain(futures::stream::once(async move {
drop(file_watcher);
vec![]
}))),
Arc::new(RealWatcher {}),
)
}
fn open_repo(&self, dotgit_path: &Path) -> Option<Arc<dyn GitRepository>> {
@ -560,6 +591,36 @@ impl Fs for RealFs {
}
}
#[cfg(not(target_os = "linux"))]
impl Watcher for RealWatcher {
fn add(&self, _: &Path) -> Result<()> {
Ok(())
}
fn remove(&self, _: &Path) -> Result<()> {
Ok(())
}
}
#[cfg(target_os = "linux")]
impl Watcher for RealWatcher {
fn add(&self, path: &Path) -> Result<()> {
use notify::Watcher;
self.fs_watcher
.lock()
.watch(path, notify::RecursiveMode::NonRecursive)?;
Ok(())
}
fn remove(&self, path: &Path) -> Result<()> {
use notify::Watcher;
self.fs_watcher.lock().unwatch(path)?;
Ok(())
}
}
#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
// Use an unfair lock to ensure tests are deterministic.
@ -1073,6 +1134,20 @@ impl FakeFsEntry {
}
}
#[cfg(any(test, feature = "test-support"))]
struct FakeWatcher {}
#[cfg(any(test, feature = "test-support"))]
impl Watcher for FakeWatcher {
fn add(&self, _: &Path) -> Result<()> {
Ok(())
}
fn remove(&self, _: &Path) -> Result<()> {
Ok(())
}
}
#[cfg(any(test, feature = "test-support"))]
#[async_trait::async_trait]
impl Fs for FakeFs {
@ -1468,20 +1543,26 @@ impl Fs for FakeFs {
&self,
path: &Path,
_: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>> {
) -> (
Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>,
Arc<dyn Watcher>,
) {
self.simulate_random_delay().await;
let (tx, rx) = smol::channel::unbounded();
self.state.lock().event_txs.push(tx);
let path = path.to_path_buf();
let executor = self.executor.clone();
Box::pin(futures::StreamExt::filter(rx, move |events| {
let result = events.iter().any(|evt_path| evt_path.starts_with(&path));
let executor = executor.clone();
async move {
executor.simulate_random_delay().await;
result
}
}))
(
Box::pin(futures::StreamExt::filter(rx, move |events| {
let result = events.iter().any(|evt_path| evt_path.starts_with(&path));
let executor = executor.clone();
async move {
executor.simulate_random_delay().await;
result
}
})),
Arc::new(FakeWatcher {}),
)
}
fn open_repo(&self, abs_dot_git: &Path) -> Option<Arc<dyn GitRepository>> {

View File

@ -38,7 +38,7 @@ pub fn watch_config_file(
let (tx, rx) = mpsc::unbounded();
executor
.spawn(async move {
let events = fs.watch(&path, Duration::from_millis(100)).await;
let (events, _) = fs.watch(&path, Duration::from_millis(100)).await;
futures::pin_mut!(events);
let contents = fs.load(&path).await.unwrap_or_default();

View File

@ -8,15 +8,15 @@ use anyhow::{anyhow, Context as _, Result};
use client::{proto, Client};
use clock::ReplicaId;
use collections::{HashMap, HashSet, VecDeque};
use fs::Fs;
use fs::{copy_recursive, RemoveOptions};
use futures::stream::select;
use fs::{Fs, Watcher};
use futures::{
channel::{
mpsc::{self, UnboundedSender},
oneshot,
},
select_biased,
stream::select,
task::Poll,
FutureExt as _, Stream, StreamExt,
};
@ -700,32 +700,42 @@ fn start_background_scan_tasks(
let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded();
let background_scanner = cx.background_executor().spawn({
let abs_path = if cfg!(target_os = "windows") {
abs_path.canonicalize().unwrap_or_else(|_| abs_path.to_path_buf())
abs_path
.canonicalize()
.unwrap_or_else(|_| abs_path.to_path_buf())
} else {
abs_path.to_path_buf()
};
let background = cx.background_executor().clone();
async move {
let events = fs.watch(&abs_path, FS_WATCH_LATENCY).await;
let (events, watcher) = fs.watch(&abs_path, FS_WATCH_LATENCY).await;
let case_sensitive = fs.is_case_sensitive().await.unwrap_or_else(|e| {
log::error!(
"Failed to determine whether filesystem is case sensitive (falling back to true) due to error: {e:#}"
);
log::error!("Failed to determine whether filesystem is case sensitive: {e:#}");
true
});
BackgroundScanner::new(
snapshot,
next_entry_id,
let mut scanner = BackgroundScanner {
fs,
case_sensitive,
scan_states_tx,
background,
fs_case_sensitive: case_sensitive,
status_updates_tx: scan_states_tx,
executor: background,
scan_requests_rx,
path_prefixes_to_scan_rx,
)
.run(events)
.await;
next_entry_id,
state: Mutex::new(BackgroundScannerState {
prev_snapshot: snapshot.snapshot.clone(),
snapshot,
scanned_dirs: Default::default(),
path_prefixes_to_scan: Default::default(),
paths_to_scan: Default::default(),
removed_entry_ids: Default::default(),
changed_paths: Default::default(),
}),
phase: BackgroundScannerPhase::InitialScan,
watcher,
};
scanner.run(events).await;
}
});
let scan_state_updater = cx.spawn(|this, mut cx| async move {
@ -3327,6 +3337,7 @@ struct BackgroundScanner {
path_prefixes_to_scan_rx: channel::Receiver<Arc<Path>>,
next_entry_id: Arc<AtomicUsize>,
phase: BackgroundScannerPhase,
watcher: Arc<dyn Watcher>,
}
#[derive(PartialEq)]
@ -3337,38 +3348,6 @@ enum BackgroundScannerPhase {
}
impl BackgroundScanner {
#[allow(clippy::too_many_arguments)]
fn new(
snapshot: LocalSnapshot,
next_entry_id: Arc<AtomicUsize>,
fs: Arc<dyn Fs>,
fs_case_sensitive: bool,
status_updates_tx: UnboundedSender<ScanState>,
executor: BackgroundExecutor,
scan_requests_rx: channel::Receiver<ScanRequest>,
path_prefixes_to_scan_rx: channel::Receiver<Arc<Path>>,
) -> Self {
Self {
fs,
fs_case_sensitive,
status_updates_tx,
executor,
scan_requests_rx,
path_prefixes_to_scan_rx,
next_entry_id,
state: Mutex::new(BackgroundScannerState {
prev_snapshot: snapshot.snapshot.clone(),
snapshot,
scanned_dirs: Default::default(),
path_prefixes_to_scan: Default::default(),
paths_to_scan: Default::default(),
removed_entry_ids: Default::default(),
changed_paths: Default::default(),
}),
phase: BackgroundScannerPhase::InitialScan,
}
}
async fn run(&mut self, mut fs_events_rx: Pin<Box<dyn Send + Stream<Item = Vec<PathBuf>>>>) {
use futures::FutureExt as _;
@ -3396,7 +3375,7 @@ impl BackgroundScanner {
if let Some(ancestor_dot_git) =
self.fs.canonicalize(&ancestor_dot_git).await.log_err()
{
let ancestor_git_events =
let (ancestor_git_events, _) =
self.fs.watch(&ancestor_dot_git, FS_WATCH_LATENCY).await;
fs_events_rx = select(fs_events_rx, ancestor_git_events).boxed();
@ -3987,6 +3966,7 @@ impl BackgroundScanner {
}
state.populate_dir(&job.path, new_entries, new_ignore);
self.watcher.add(job.abs_path.as_ref()).log_err();
for new_job in new_jobs.into_iter().flatten() {
job.scan_queue

View File

@ -881,7 +881,7 @@ fn load_user_themes_in_background(fs: Arc<dyn fs::Fs>, cx: &mut AppContext) {
fn watch_themes(fs: Arc<dyn fs::Fs>, cx: &mut AppContext) {
use std::time::Duration;
cx.spawn(|cx| async move {
let mut events = fs
let (mut events, _) = fs
.watch(&paths::THEMES_DIR.clone(), Duration::from_millis(100))
.await;
@ -920,7 +920,7 @@ fn watch_languages(fs: Arc<dyn fs::Fs>, languages: Arc<LanguageRegistry>, cx: &m
};
cx.spawn(|_| async move {
let mut events = fs.watch(path.as_path(), Duration::from_millis(100)).await;
let (mut events, _) = fs.watch(path.as_path(), Duration::from_millis(100)).await;
while let Some(event) = events.next().await {
let has_language_file = event.iter().any(|path| {
path.extension()
@ -954,7 +954,7 @@ fn watch_file_types(fs: Arc<dyn fs::Fs>, cx: &mut AppContext) {
};
cx.spawn(|cx| async move {
let mut events = fs.watch(path.as_path(), Duration::from_millis(100)).await;
let (mut events, _) = fs.watch(path.as_path(), Duration::from_millis(100)).await;
while (events.next().await).is_some() {
cx.update(|cx| {
FileIcons::update_global(cx, |file_types, _cx| {