From ca696fd5f62efc622c635e0c11e6799cae5a1c0f Mon Sep 17 00:00:00 2001 From: Mikayla Maki Date: Fri, 8 Mar 2024 22:18:44 -0800 Subject: [PATCH] Add rs-notify implementation of `fs::watch` (#9040) This PR simplifies the Zed file system abstraction and implements `Fs::watch` for linux and windows. TODO: - [x] Figure out why this fails to initialize the file watchers when we have to initialize the config directory paths, but succeeds on subsequent runs. - [x] Fix macOS dependencies on old fsevents::Event crate Release Notes: - N/A --- Cargo.lock | 1 - crates/extension/src/extension_store.rs | 8 +- crates/fs/Cargo.toml | 4 +- crates/fs/src/fs.rs | 130 +++--- crates/fsevent/src/fsevent.rs | 389 +++++++++++++++++- crates/fsevent/src/mac_impl.rs | 382 ----------------- crates/gpui/src/platform/linux/platform.rs | 7 +- .../gpui/src/platform/linux/wayland/client.rs | 2 +- crates/gpui/src/platform/linux/x11/client.rs | 2 +- crates/project_core/src/worktree.rs | 20 +- crates/zed/Cargo.toml | 1 - crates/zed/src/main.rs | 19 +- tooling/xtask/src/main.rs | 6 +- 13 files changed, 478 insertions(+), 493 deletions(-) delete mode 100644 crates/fsevent/src/mac_impl.rs diff --git a/Cargo.lock b/Cargo.lock index 81cae2ae82..9e15e37fe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12790,7 +12790,6 @@ dependencies = [ "feedback", "file_finder", "fs", - "fsevent", "futures 0.3.28", "go_to_line", "gpui", diff --git a/crates/extension/src/extension_store.rs b/crates/extension/src/extension_store.rs index 33cee64fae..5b53b509a7 100644 --- a/crates/extension/src/extension_store.rs +++ b/crates/extension/src/extension_store.rs @@ -296,10 +296,10 @@ impl ExtensionStore { let reload_tx = this.reload_tx.clone(); let installed_dir = this.installed_dir.clone(); async move { - let mut events = fs.watch(&installed_dir, FS_WATCH_LATENCY).await; - while let Some(events) = events.next().await { - for event in events { - let Ok(event_path) = event.path.strip_prefix(&installed_dir) else { + let mut paths = fs.watch(&installed_dir, FS_WATCH_LATENCY).await; + while let Some(paths) = paths.next().await { + for path in paths { + let Ok(event_path) = path.strip_prefix(&installed_dir) else { continue; }; diff --git a/crates/fs/Cargo.toml b/crates/fs/Cargo.toml index 4a26a13e5f..1e6fb62569 100644 --- a/crates/fs/Cargo.toml +++ b/crates/fs/Cargo.toml @@ -13,7 +13,6 @@ path = "src/fs.rs" [dependencies] collections.workspace = true -fsevent.workspace = true rope.workspace = true text.workspace = true util.workspace = true @@ -37,6 +36,9 @@ time.workspace = true gpui = { workspace = true, optional = true } +[target.'cfg(target_os = "macos")'.dependencies] +fsevent.workspace = true + [target.'cfg(not(target_os = "macos"))'.dependencies] notify = "6.1.1" diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index 5b07619746..88f77da608 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -1,15 +1,6 @@ pub mod repository; use anyhow::{anyhow, Result}; -pub use fsevent::Event; -#[cfg(target_os = "macos")] -use fsevent::EventStream; - -#[cfg(not(target_os = "macos"))] -use fsevent::StreamFlags; - -#[cfg(not(target_os = "macos"))] -use notify::{Config, EventKind, Watcher}; #[cfg(unix)] use std::os::unix::fs::MetadataExt; @@ -76,7 +67,7 @@ pub trait Fs: Send + Sync { &self, path: &Path, latency: Duration, - ) -> Pin>>>; + ) -> Pin>>>; fn open_repo(&self, abs_dot_git: &Path) -> Option>>; fn is_fake(&self) -> bool; @@ -327,12 +318,18 @@ impl Fs for RealFs { &self, path: &Path, latency: Duration, - ) -> Pin>>> { + ) -> Pin>>> { + use fsevent::EventStream; + let (tx, rx) = smol::channel::unbounded(); let (stream, handle) = EventStream::new(&[path], latency); std::thread::spawn(move || { - stream.run(move |events| smol::block_on(tx.send(events)).is_ok()); + stream.run(move |events| { + smol::block_on(tx.send(events.into_iter().map(|event| event.path).collect())) + .is_ok() + }); }); + Box::pin(rx.chain(futures::stream::once(async move { drop(handle); vec![] @@ -343,49 +340,66 @@ impl Fs for RealFs { async fn watch( &self, path: &Path, - latency: Duration, - ) -> Pin>>> { + _latency: Duration, + ) -> Pin>>> { + use notify::{event::EventKind, Watcher}; + // todo(linux): This spawns two threads, while the macOS impl + // only spawns one. Can we use a OnceLock or some such to make + // this better + let (tx, rx) = smol::channel::unbounded(); - if !path.exists() { - log::error!("watch path does not exist: {}", path.display()); - return Box::pin(rx); - } - - let mut watcher = - notify::recommended_watcher(move |res: Result| match res { - Ok(event) => { - let flags = match event.kind { - // ITEM_REMOVED is currently the only flag we care about - EventKind::Remove(_) => StreamFlags::ITEM_REMOVED, - _ => StreamFlags::NONE, - }; - let events = event - .paths - .into_iter() - .map(|path| Event { - event_id: 0, - flags, - path, - }) - .collect::>(); - let _ = tx.try_send(events); + let mut file_watcher = notify::recommended_watcher({ + let tx = tx.clone(); + move |event: Result| { + if let Some(event) = event.log_err() { + tx.try_send(event.paths).ok(); } - Err(err) => { - log::error!("watch error: {}", err); - } - }) - .unwrap(); + } + }) + .expect("Could not start file watcher"); - watcher - .configure(Config::default().with_poll_interval(latency)) - .unwrap(); - - watcher + file_watcher .watch(path, notify::RecursiveMode::Recursive) - .unwrap(); + .ok(); // It's ok if this fails, the parent watcher will add it. - Box::pin(rx) + let mut parent_watcher = notify::recommended_watcher({ + let watched_path = path.to_path_buf(); + let tx = tx.clone(); + move |event: Result| { + if let Some(event) = event.ok() { + if event.paths.into_iter().any(|path| *path == watched_path) { + match event.kind { + EventKind::Create(_) => { + file_watcher + .watch(watched_path.as_path(), notify::RecursiveMode::Recursive) + .log_err(); + let _ = tx.try_send(vec![watched_path.clone()]).ok(); + } + EventKind::Remove(_) => { + file_watcher.unwatch(&watched_path).log_err(); + let _ = tx.try_send(vec![watched_path.clone()]).ok(); + } + _ => {} + } + } + } + } + }) + .expect("Could not start file watcher"); + + parent_watcher + .watch( + path.parent() + .expect("Watching root is probably not what you want"), + notify::RecursiveMode::NonRecursive, + ) + .expect("Could not start watcher on parent directory"); + + Box::pin(rx.chain(futures::stream::once(async move { + drop(parent_watcher); + vec![] + }))) } fn open_repo(&self, dotgit_path: &Path) -> Option>> { @@ -443,10 +457,6 @@ impl Fs for RealFs { } } -pub fn fs_events_paths(events: Vec) -> Vec { - events.into_iter().map(|event| event.path).collect() -} - #[cfg(any(test, feature = "test-support"))] pub struct FakeFs { // Use an unfair lock to ensure tests are deterministic. @@ -459,9 +469,9 @@ struct FakeFsState { root: Arc>, next_inode: u64, next_mtime: SystemTime, - event_txs: Vec>>, + event_txs: Vec>>, events_paused: bool, - buffered_events: Vec, + buffered_events: Vec, metadata_call_count: usize, read_dir_call_count: usize, } @@ -569,11 +579,7 @@ impl FakeFsState { T: Into, { self.buffered_events - .extend(paths.into_iter().map(|path| fsevent::Event { - event_id: 0, - flags: fsevent::StreamFlags::empty(), - path: path.into(), - })); + .extend(paths.into_iter().map(Into::into)); if !self.events_paused { self.flush_events(self.buffered_events.len()); @@ -1328,14 +1334,14 @@ impl Fs for FakeFs { &self, path: &Path, _: Duration, - ) -> Pin>>> { + ) -> Pin>>> { self.simulate_random_delay().await; let (tx, rx) = smol::channel::unbounded(); self.state.lock().event_txs.push(tx); let path = path.to_path_buf(); let executor = self.executor.clone(); Box::pin(futures::StreamExt::filter(rx, move |events| { - let result = events.iter().any(|event| event.path.starts_with(&path)); + let result = events.iter().any(|evt_path| evt_path.starts_with(&path)); let executor = executor.clone(); async move { executor.simulate_random_delay().await; diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs index e730e701f3..ef113b6de2 100644 --- a/crates/fsevent/src/fsevent.rs +++ b/crates/fsevent/src/fsevent.rs @@ -1,11 +1,17 @@ -#[cfg(target_os = "macos")] -pub use mac_impl::*; +#![cfg(target_os = "macos")] use bitflags::bitflags; -use std::path::PathBuf; - -#[cfg(target_os = "macos")] -mod mac_impl; +use fsevent_sys::{self as fs, core_foundation as cf}; +use parking_lot::Mutex; +use std::{ + convert::AsRef, + ffi::{c_void, CStr, OsStr}, + os::unix::ffi::OsStrExt, + path::{Path, PathBuf}, + ptr, slice, + sync::Arc, + time::Duration, +}; #[derive(Clone, Debug)] pub struct Event { @@ -14,10 +20,244 @@ pub struct Event { pub path: PathBuf, } +pub struct EventStream { + lifecycle: Arc>, + state: Box, +} + +struct State { + latency: Duration, + paths: cf::CFMutableArrayRef, + callback: Option) -> bool>>, + last_valid_event_id: Option, + stream: fs::FSEventStreamRef, +} + +impl Drop for State { + fn drop(&mut self) { + unsafe { + cf::CFRelease(self.paths); + fs::FSEventStreamStop(self.stream); + fs::FSEventStreamInvalidate(self.stream); + fs::FSEventStreamRelease(self.stream); + } + } +} + +enum Lifecycle { + New, + Running(cf::CFRunLoopRef), + Stopped, +} + +pub struct Handle(Arc>); + +unsafe impl Send for EventStream {} +unsafe impl Send for Lifecycle {} + +impl EventStream { + pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { + unsafe { + let cf_paths = + cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); + assert!(!cf_paths.is_null()); + + for path in paths { + let path_bytes = path.as_os_str().as_bytes(); + let cf_url = cf::CFURLCreateFromFileSystemRepresentation( + cf::kCFAllocatorDefault, + path_bytes.as_ptr() as *const i8, + path_bytes.len() as cf::CFIndex, + false, + ); + let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle); + cf::CFArrayAppendValue(cf_paths, cf_path); + cf::CFRelease(cf_path); + cf::CFRelease(cf_url); + } + + let mut state = Box::new(State { + latency, + paths: cf_paths, + callback: None, + last_valid_event_id: None, + stream: ptr::null_mut(), + }); + let stream_context = fs::FSEventStreamContext { + version: 0, + info: state.as_ref() as *const _ as *mut c_void, + retain: None, + release: None, + copy_description: None, + }; + let stream = fs::FSEventStreamCreate( + cf::kCFAllocatorDefault, + Self::trampoline, + &stream_context, + cf_paths, + FSEventsGetCurrentEventId(), + latency.as_secs_f64(), + fs::kFSEventStreamCreateFlagFileEvents + | fs::kFSEventStreamCreateFlagNoDefer + | fs::kFSEventStreamCreateFlagWatchRoot, + ); + state.stream = stream; + + let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); + ( + EventStream { + lifecycle: lifecycle.clone(), + state, + }, + Handle(lifecycle), + ) + } + } + + pub fn run(mut self, f: F) + where + F: FnMut(Vec) -> bool + 'static, + { + self.state.callback = Some(Box::new(f)); + unsafe { + let run_loop = cf::CFRunLoopGetCurrent(); + { + let mut state = self.lifecycle.lock(); + match *state { + Lifecycle::New => *state = Lifecycle::Running(run_loop), + Lifecycle::Running(_) => unreachable!(), + Lifecycle::Stopped => return, + } + } + fs::FSEventStreamScheduleWithRunLoop( + self.state.stream, + run_loop, + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(self.state.stream); + cf::CFRunLoopRun(); + } + } + + extern "C" fn trampoline( + stream_ref: fs::FSEventStreamRef, + info: *mut ::std::os::raw::c_void, + num: usize, // size_t numEvents + event_paths: *mut ::std::os::raw::c_void, // void *eventPaths + event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] + event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] + ) { + unsafe { + let event_paths = event_paths as *const *const ::std::os::raw::c_char; + let e_ptr = event_flags as *mut u32; + let i_ptr = event_ids as *mut u64; + let state = (info as *mut State).as_mut().unwrap(); + let callback = if let Some(callback) = state.callback.as_mut() { + callback + } else { + return; + }; + + let paths = slice::from_raw_parts(event_paths, num); + let flags = slice::from_raw_parts_mut(e_ptr, num); + let ids = slice::from_raw_parts_mut(i_ptr, num); + let mut stream_restarted = false; + + // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel + // or our code couldn't keep up with the sheer volume of file-system events that were + // generated. If we observed a valid event before this happens, we'll try to read the + // file-system journal by stopping the current stream and creating a new one starting at + // such event. Otherwise, we'll let invoke the callback with the dropped event, which + // will likely perform a re-scan of one of the root directories. + if flags + .iter() + .copied() + .filter_map(StreamFlags::from_bits) + .any(|flags| { + flags.contains(StreamFlags::USER_DROPPED) + || flags.contains(StreamFlags::KERNEL_DROPPED) + }) + { + if let Some(last_valid_event_id) = state.last_valid_event_id.take() { + fs::FSEventStreamStop(state.stream); + fs::FSEventStreamInvalidate(state.stream); + fs::FSEventStreamRelease(state.stream); + + let stream_context = fs::FSEventStreamContext { + version: 0, + info, + retain: None, + release: None, + copy_description: None, + }; + let stream = fs::FSEventStreamCreate( + cf::kCFAllocatorDefault, + Self::trampoline, + &stream_context, + state.paths, + last_valid_event_id, + state.latency.as_secs_f64(), + fs::kFSEventStreamCreateFlagFileEvents + | fs::kFSEventStreamCreateFlagNoDefer + | fs::kFSEventStreamCreateFlagWatchRoot, + ); + + state.stream = stream; + fs::FSEventStreamScheduleWithRunLoop( + state.stream, + cf::CFRunLoopGetCurrent(), + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(state.stream); + stream_restarted = true; + } + } + + if !stream_restarted { + let mut events = Vec::with_capacity(num); + for p in 0..num { + if let Some(flag) = StreamFlags::from_bits(flags[p]) { + if !flag.contains(StreamFlags::HISTORY_DONE) { + let path_c_str = CStr::from_ptr(paths[p]); + let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); + let event = Event { + event_id: ids[p], + flags: flag, + path, + }; + state.last_valid_event_id = Some(event.event_id); + events.push(event); + } + } else { + debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); + } + } + + if !events.is_empty() && !callback(events) { + fs::FSEventStreamStop(stream_ref); + cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + } + } + } + } +} + +impl Drop for Handle { + fn drop(&mut self) { + let mut state = self.0.lock(); + if let Lifecycle::Running(run_loop) = *state { + unsafe { + cf::CFRunLoopStop(run_loop); + } + } + *state = Lifecycle::Stopped; + } +} + // Synchronize with // /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h bitflags! { - #[derive(Debug, PartialEq, Eq, Clone, Copy)] + #[derive(Debug, PartialEq, Eq, Clone, Copy)] #[repr(C)] pub struct StreamFlags: u32 { const NONE = 0x00000000; @@ -121,3 +361,138 @@ impl std::fmt::Display for StreamFlags { write!(f, "") } } + +#[link(name = "CoreServices", kind = "framework")] +extern "C" { + pub fn FSEventsGetCurrentEventId() -> u64; +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{fs, sync::mpsc, thread, time::Duration}; + + #[test] + fn test_event_stream_simple() { + for _ in 0..3 { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + for i in 0..10 { + fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); + } + flush_historical_events(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); + + fs::write(path.join("new-file"), "").unwrap(); + let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("new-file")); + assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); + + fs::remove_file(path.join("existing-file-5")).unwrap(); + let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("existing-file-5")); + assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); + drop(handle); + } + } + + #[test] + fn test_event_stream_delayed_start() { + for _ in 0..3 { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + for i in 0..10 { + fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); + } + flush_historical_events(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + + // Delay the call to `run` in order to make sure we don't miss any events that occur + // between creating the `EventStream` and calling `run`. + thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + stream.run(move |events| tx.send(events.to_vec()).is_ok()) + }); + + fs::write(path.join("new-file"), "").unwrap(); + let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("new-file")); + assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); + + fs::remove_file(path.join("existing-file-5")).unwrap(); + let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("existing-file-5")); + assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); + drop(handle); + } + } + + #[test] + fn test_event_stream_shutdown_by_dropping_handle() { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + flush_historical_events(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + thread::spawn(move || { + stream.run({ + let tx = tx.clone(); + move |_| { + tx.send("running").unwrap(); + true + } + }); + tx.send("stopped").unwrap(); + }); + + fs::write(path.join("new-file"), "").unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running"); + + // Dropping the handle causes `EventStream::run` to return. + drop(handle); + assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped"); + } + + #[test] + fn test_event_stream_shutdown_before_run() { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + drop(handle); + + // This returns immediately because the handle was already dropped. + stream.run(|_| true); + } + + fn flush_historical_events() { + let duration = if std::env::var("CI").is_ok() { + Duration::from_secs(2) + } else { + Duration::from_millis(500) + }; + thread::sleep(duration); + } +} diff --git a/crates/fsevent/src/mac_impl.rs b/crates/fsevent/src/mac_impl.rs deleted file mode 100644 index 9aec621580..0000000000 --- a/crates/fsevent/src/mac_impl.rs +++ /dev/null @@ -1,382 +0,0 @@ -use fsevent_sys::{self as fs, core_foundation as cf}; -use parking_lot::Mutex; -use std::{ - convert::AsRef, - ffi::{c_void, CStr, OsStr}, - os::unix::ffi::OsStrExt, - path::{Path, PathBuf}, - ptr, slice, - sync::Arc, - time::Duration, -}; - -use crate::{Event, StreamFlags}; - -pub struct EventStream { - lifecycle: Arc>, - state: Box, -} - -struct State { - latency: Duration, - paths: cf::CFMutableArrayRef, - callback: Option) -> bool>>, - last_valid_event_id: Option, - stream: fs::FSEventStreamRef, -} - -impl Drop for State { - fn drop(&mut self) { - unsafe { - cf::CFRelease(self.paths); - fs::FSEventStreamStop(self.stream); - fs::FSEventStreamInvalidate(self.stream); - fs::FSEventStreamRelease(self.stream); - } - } -} - -enum Lifecycle { - New, - Running(cf::CFRunLoopRef), - Stopped, -} - -pub struct Handle(Arc>); - -unsafe impl Send for EventStream {} -unsafe impl Send for Lifecycle {} - -impl EventStream { - pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { - unsafe { - let cf_paths = - cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); - assert!(!cf_paths.is_null()); - - for path in paths { - let path_bytes = path.as_os_str().as_bytes(); - let cf_url = cf::CFURLCreateFromFileSystemRepresentation( - cf::kCFAllocatorDefault, - path_bytes.as_ptr() as *const i8, - path_bytes.len() as cf::CFIndex, - false, - ); - let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle); - cf::CFArrayAppendValue(cf_paths, cf_path); - cf::CFRelease(cf_path); - cf::CFRelease(cf_url); - } - - let mut state = Box::new(State { - latency, - paths: cf_paths, - callback: None, - last_valid_event_id: None, - stream: ptr::null_mut(), - }); - let stream_context = fs::FSEventStreamContext { - version: 0, - info: state.as_ref() as *const _ as *mut c_void, - retain: None, - release: None, - copy_description: None, - }; - let stream = fs::FSEventStreamCreate( - cf::kCFAllocatorDefault, - Self::trampoline, - &stream_context, - cf_paths, - FSEventsGetCurrentEventId(), - latency.as_secs_f64(), - fs::kFSEventStreamCreateFlagFileEvents - | fs::kFSEventStreamCreateFlagNoDefer - | fs::kFSEventStreamCreateFlagWatchRoot, - ); - state.stream = stream; - - let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); - ( - EventStream { - lifecycle: lifecycle.clone(), - state, - }, - Handle(lifecycle), - ) - } - } - - pub fn run(mut self, f: F) - where - F: FnMut(Vec) -> bool + 'static, - { - self.state.callback = Some(Box::new(f)); - unsafe { - let run_loop = cf::CFRunLoopGetCurrent(); - { - let mut state = self.lifecycle.lock(); - match *state { - Lifecycle::New => *state = Lifecycle::Running(run_loop), - Lifecycle::Running(_) => unreachable!(), - Lifecycle::Stopped => return, - } - } - fs::FSEventStreamScheduleWithRunLoop( - self.state.stream, - run_loop, - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(self.state.stream); - cf::CFRunLoopRun(); - } - } - - extern "C" fn trampoline( - stream_ref: fs::FSEventStreamRef, - info: *mut ::std::os::raw::c_void, - num: usize, // size_t numEvents - event_paths: *mut ::std::os::raw::c_void, // void *eventPaths - event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] - event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] - ) { - unsafe { - let event_paths = event_paths as *const *const ::std::os::raw::c_char; - let e_ptr = event_flags as *mut u32; - let i_ptr = event_ids as *mut u64; - let state = (info as *mut State).as_mut().unwrap(); - let callback = if let Some(callback) = state.callback.as_mut() { - callback - } else { - return; - }; - - let paths = slice::from_raw_parts(event_paths, num); - let flags = slice::from_raw_parts_mut(e_ptr, num); - let ids = slice::from_raw_parts_mut(i_ptr, num); - let mut stream_restarted = false; - - // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel - // or our code couldn't keep up with the sheer volume of file-system events that were - // generated. If we observed a valid event before this happens, we'll try to read the - // file-system journal by stopping the current stream and creating a new one starting at - // such event. Otherwise, we'll let invoke the callback with the dropped event, which - // will likely perform a re-scan of one of the root directories. - if flags - .iter() - .copied() - .filter_map(StreamFlags::from_bits) - .any(|flags| { - flags.contains(StreamFlags::USER_DROPPED) - || flags.contains(StreamFlags::KERNEL_DROPPED) - }) - { - if let Some(last_valid_event_id) = state.last_valid_event_id.take() { - fs::FSEventStreamStop(state.stream); - fs::FSEventStreamInvalidate(state.stream); - fs::FSEventStreamRelease(state.stream); - - let stream_context = fs::FSEventStreamContext { - version: 0, - info, - retain: None, - release: None, - copy_description: None, - }; - let stream = fs::FSEventStreamCreate( - cf::kCFAllocatorDefault, - Self::trampoline, - &stream_context, - state.paths, - last_valid_event_id, - state.latency.as_secs_f64(), - fs::kFSEventStreamCreateFlagFileEvents - | fs::kFSEventStreamCreateFlagNoDefer - | fs::kFSEventStreamCreateFlagWatchRoot, - ); - - state.stream = stream; - fs::FSEventStreamScheduleWithRunLoop( - state.stream, - cf::CFRunLoopGetCurrent(), - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(state.stream); - stream_restarted = true; - } - } - - if !stream_restarted { - let mut events = Vec::with_capacity(num); - for p in 0..num { - if let Some(flag) = StreamFlags::from_bits(flags[p]) { - if !flag.contains(StreamFlags::HISTORY_DONE) { - let path_c_str = CStr::from_ptr(paths[p]); - let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); - let event = Event { - event_id: ids[p], - flags: flag, - path, - }; - state.last_valid_event_id = Some(event.event_id); - events.push(event); - } - } else { - debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); - } - } - - if !events.is_empty() && !callback(events) { - fs::FSEventStreamStop(stream_ref); - cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); - } - } - } - } -} - -impl Drop for Handle { - fn drop(&mut self) { - let mut state = self.0.lock(); - if let Lifecycle::Running(run_loop) = *state { - unsafe { - cf::CFRunLoopStop(run_loop); - } - } - *state = Lifecycle::Stopped; - } -} - -#[link(name = "CoreServices", kind = "framework")] -extern "C" { - pub fn FSEventsGetCurrentEventId() -> u64; -} - -#[cfg(test)] -mod tests { - use super::*; - use std::{fs, sync::mpsc, thread, time::Duration}; - - #[test] - fn test_event_stream_simple() { - for _ in 0..3 { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - for i in 0..10 { - fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); - } - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); - - fs::write(path.join("new-file"), "").unwrap(); - let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("new-file")); - assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); - - fs::remove_file(path.join("existing-file-5")).unwrap(); - let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("existing-file-5")); - assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - drop(handle); - } - } - - #[test] - fn test_event_stream_delayed_start() { - for _ in 0..3 { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - for i in 0..10 { - fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); - } - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - - // Delay the call to `run` in order to make sure we don't miss any events that occur - // between creating the `EventStream` and calling `run`. - thread::spawn(move || { - thread::sleep(Duration::from_millis(100)); - stream.run(move |events| tx.send(events.to_vec()).is_ok()) - }); - - fs::write(path.join("new-file"), "").unwrap(); - let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("new-file")); - assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); - - fs::remove_file(path.join("existing-file-5")).unwrap(); - let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("existing-file-5")); - assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - drop(handle); - } - } - - #[test] - fn test_event_stream_shutdown_by_dropping_handle() { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - thread::spawn(move || { - stream.run({ - let tx = tx.clone(); - move |_| { - tx.send("running").unwrap(); - true - } - }); - tx.send("stopped").unwrap(); - }); - - fs::write(path.join("new-file"), "").unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running"); - - // Dropping the handle causes `EventStream::run` to return. - drop(handle); - assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped"); - } - - #[test] - fn test_event_stream_shutdown_before_run() { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - drop(handle); - - // This returns immediately because the handle was already dropped. - stream.run(|_| true); - } - - fn flush_historical_events() { - let duration = if std::env::var("CI").is_ok() { - Duration::from_secs(2) - } else { - Duration::from_millis(500) - }; - thread::sleep(duration); - } -} diff --git a/crates/gpui/src/platform/linux/platform.rs b/crates/gpui/src/platform/linux/platform.rs index c19cc72451..aa82f5870c 100644 --- a/crates/gpui/src/platform/linux/platform.rs +++ b/crates/gpui/src/platform/linux/platform.rs @@ -324,7 +324,7 @@ impl Platform for LinuxPlatform { }) } - //TODO linux + //todo(linux) fn app_path(&self) -> Result { Err(anyhow::Error::msg( "Platform::app_path is not implemented yet", @@ -338,7 +338,7 @@ impl Platform for LinuxPlatform { UtcOffset::UTC } - //TODO linux + //todo(linux) fn path_for_auxiliary_executable(&self, name: &str) -> Result { Err(anyhow::Error::msg( "Platform::path_for_auxiliary_executable is not implemented yet", @@ -390,8 +390,7 @@ impl Platform for LinuxPlatform { }) } - //TODO linux: add trait methods for accessing the primary selection - + //todo(linux): add trait methods for accessing the primary selection fn read_credentials(&self, url: &str) -> Task)>>> { let url = url.to_string(); self.background_executor().spawn(async move { diff --git a/crates/gpui/src/platform/linux/wayland/client.rs b/crates/gpui/src/platform/linux/wayland/client.rs index f191b9d3a8..f97766af79 100644 --- a/crates/gpui/src/platform/linux/wayland/client.rs +++ b/crates/gpui/src/platform/linux/wayland/client.rs @@ -281,7 +281,7 @@ impl Client for WaylandClient { CursorStyle::ResizeUp => "n-resize".to_string(), CursorStyle::ResizeDown => "s-resize".to_string(), CursorStyle::ResizeUpDown => "ns-resize".to_string(), - CursorStyle::DisappearingItem => "grabbing".to_string(), // TODO linux - couldn't find equivalent icon in linux + CursorStyle::DisappearingItem => "grabbing".to_string(), // todo(linux) - couldn't find equivalent icon in linux CursorStyle::IBeamCursorForVerticalLayout => "vertical-text".to_string(), CursorStyle::OperationNotAllowed => "not-allowed".to_string(), CursorStyle::DragLink => "dnd-link".to_string(), diff --git a/crates/gpui/src/platform/linux/x11/client.rs b/crates/gpui/src/platform/linux/x11/client.rs index f50d3fd6ea..e3dd1628fe 100644 --- a/crates/gpui/src/platform/linux/x11/client.rs +++ b/crates/gpui/src/platform/linux/x11/client.rs @@ -361,7 +361,7 @@ impl Client for X11Client { Box::new(X11Window(window_ptr)) } - // TODO linux + //todo(linux) fn set_cursor_style(&self, _style: CursorStyle) {} fn get_clipboard(&self) -> Rc> { diff --git a/crates/project_core/src/worktree.rs b/crates/project_core/src/worktree.rs index 1f9e9a8e15..8c64ab2579 100644 --- a/crates/project_core/src/worktree.rs +++ b/crates/project_core/src/worktree.rs @@ -3336,7 +3336,7 @@ impl BackgroundScanner { } } - async fn run(&mut self, mut fs_events_rx: Pin>>>) { + async fn run(&mut self, mut fs_events_rx: Pin>>>) { use futures::FutureExt as _; // Populate ignores above the root. @@ -3389,11 +3389,9 @@ impl BackgroundScanner { // For these events, update events cannot be as precise, because we didn't // have the previous state loaded yet. self.phase = BackgroundScannerPhase::EventsReceivedDuringInitialScan; - if let Poll::Ready(Some(events)) = futures::poll!(fs_events_rx.next()) { - let mut paths = fs::fs_events_paths(events); - - while let Poll::Ready(Some(more_events)) = futures::poll!(fs_events_rx.next()) { - paths.extend(fs::fs_events_paths(more_events)); + if let Poll::Ready(Some(mut paths)) = futures::poll!(fs_events_rx.next()) { + while let Poll::Ready(Some(more_paths)) = futures::poll!(fs_events_rx.next()) { + paths.extend(more_paths); } self.process_events(paths).await; } @@ -3430,12 +3428,10 @@ impl BackgroundScanner { } } - events = fs_events_rx.next().fuse() => { - let Some(events) = events else { break }; - let mut paths = fs::fs_events_paths(events); - - while let Poll::Ready(Some(more_events)) = futures::poll!(fs_events_rx.next()) { - paths.extend(fs::fs_events_paths(more_events)); + paths = fs_events_rx.next().fuse() => { + let Some(mut paths) = paths else { break }; + while let Poll::Ready(Some(more_paths)) = futures::poll!(fs_events_rx.next()) { + paths.extend(more_paths); } self.process_events(paths.clone()).await; } diff --git a/crates/zed/Cargo.toml b/crates/zed/Cargo.toml index 90c969c0cb..d3e77ab676 100644 --- a/crates/zed/Cargo.toml +++ b/crates/zed/Cargo.toml @@ -48,7 +48,6 @@ extensions_ui.workspace = true feedback.workspace = true file_finder.workspace = true fs.workspace = true -fsevent.workspace = true futures.workspace = true go_to_line.workspace = true gpui.workspace = true diff --git a/crates/zed/src/main.rs b/crates/zed/src/main.rs index 8769af21a3..23f2aa4c53 100644 --- a/crates/zed/src/main.rs +++ b/crates/zed/src/main.rs @@ -11,8 +11,6 @@ use db::kvp::KEY_VALUE_STORE; use editor::Editor; use env_logger::Builder; use fs::RealFs; -#[cfg(target_os = "macos")] -use fsevent::StreamFlags; use futures::{future, StreamExt}; use gpui::{App, AppContext, AsyncAppContext, Context, SemanticVersion, Task}; use isahc::{prelude::Configurable, Request}; @@ -184,7 +182,6 @@ fn main() { ); load_user_themes_in_background(fs.clone(), cx); - #[cfg(target_os = "macos")] watch_themes(fs.clone(), cx); cx.spawn(|_| watch_languages(fs.clone(), languages.clone())) @@ -465,10 +462,11 @@ async fn restore_or_create_workspace(app_state: Arc, cx: AsyncAppConte fn init_paths() { std::fs::create_dir_all(&*util::paths::CONFIG_DIR).expect("could not create config path"); + std::fs::create_dir_all(&*util::paths::EXTENSIONS_DIR) + .expect("could not create extensions path"); std::fs::create_dir_all(&*util::paths::LANGUAGES_DIR).expect("could not create languages path"); std::fs::create_dir_all(&*util::paths::DB_DIR).expect("could not create database path"); std::fs::create_dir_all(&*util::paths::LOGS_DIR).expect("could not create logs path"); - #[cfg(target_os = "linux")] std::fs::create_dir_all(&*util::paths::TEMP_DIR).expect("could not create tmp path"); } @@ -974,9 +972,7 @@ fn load_user_themes_in_background(fs: Arc, cx: &mut AppContext) { .detach_and_log_err(cx); } -// todo(linux): Port fsevents to linux /// Spawns a background task to watch the themes directory for changes. -#[cfg(target_os = "macos")] fn watch_themes(fs: Arc, cx: &mut AppContext) { use std::time::Duration; cx.spawn(|cx| async move { @@ -984,17 +980,14 @@ fn watch_themes(fs: Arc, cx: &mut AppContext) { .watch(&paths::THEMES_DIR.clone(), Duration::from_millis(100)) .await; - while let Some(events) = events.next().await { - for event in events { - if event.flags.contains(StreamFlags::ITEM_REMOVED) { - // Theme was removed, don't need to reload. - // We may want to remove the theme from the registry, in this case. - } else { + while let Some(paths) = events.next().await { + for path in paths { + if fs.metadata(&path).await.ok().flatten().is_some() { if let Some(theme_registry) = cx.update(|cx| ThemeRegistry::global(cx).clone()).log_err() { if let Some(()) = theme_registry - .load_user_theme(&event.path, fs.clone()) + .load_user_theme(&path, fs.clone()) .await .log_err() { diff --git a/tooling/xtask/src/main.rs b/tooling/xtask/src/main.rs index e4feb2bdb1..b721c46f7e 100644 --- a/tooling/xtask/src/main.rs +++ b/tooling/xtask/src/main.rs @@ -47,10 +47,7 @@ fn run_clippy(args: ClippyArgs) -> Result<()> { clippy_command.arg("--workspace"); } - clippy_command - .arg("--release") - .arg("--all-targets") - .arg("--all-features"); + clippy_command.arg("--release").arg("--all-features"); if args.fix { clippy_command.arg("--fix"); @@ -60,6 +57,7 @@ fn run_clippy(args: ClippyArgs) -> Result<()> { // Deny all warnings. // We don't do this yet on Windows, as it still has some warnings present. + // todo(windows) #[cfg(not(target_os = "windows"))] clippy_command.args(["--deny", "warnings"]);