From 17c203fef96491c75c60121fcba64048ed2cd2ba Mon Sep 17 00:00:00 2001 From: gmorenz Date: Thu, 8 Feb 2024 11:35:37 -0500 Subject: [PATCH] Translate notify::Event to fsevent::Event on linux (#7545) This isn't exactly a great solution, but it's a step in the right direction, and it's simple allowing us to quickly unblock linux. Without this (or an equivalent) PR linux builds are broken. I spent a bunch of time investigating using notify on macos, and have a branch with that working and FakeFs updated to use notify events. unfortunately I think this would come with some drawbacks. Primarily that files that don't yet exist yet aren't handled as well as with using events directly leading to some less than ideal tradeoffs. This PR is very much a placeholder for a better cross platform solution. Most problematically, it only fills in the portion of fsevent::Event that is currently used, despite there being a lot more information in the ones collected from macos. At the very least a followup PR should hide those implementation details behind a cross platform Event type so that if people try and access data that hasn't been translated, they find out about it. Release Notes: - N/A --- crates/fs/Cargo.toml | 4 +- crates/fs/src/fs.rs | 49 +++-- crates/fsevent/src/fsevent.rs | 387 +-------------------------------- crates/fsevent/src/mac_impl.rs | 382 ++++++++++++++++++++++++++++++++ 4 files changed, 416 insertions(+), 406 deletions(-) create mode 100644 crates/fsevent/src/mac_impl.rs diff --git a/crates/fs/Cargo.toml b/crates/fs/Cargo.toml index ea18ec1fb3..d3f6d87d30 100644 --- a/crates/fs/Cargo.toml +++ b/crates/fs/Cargo.toml @@ -10,6 +10,7 @@ path = "src/fs.rs" [dependencies] collections.workspace = true +fsevent.workspace = true rope.workspace = true text.workspace = true util.workspace = true @@ -33,9 +34,6 @@ time.workspace = true gpui = { workspace = true, optional = true} -[target.'cfg(target_os = "macos")'.dependencies] -fsevent.workspace = true - [target.'cfg(not(target_os = "macos"))'.dependencies] notify = "6.1.1" diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index bdb32d7d76..571de91a17 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -1,15 +1,15 @@ pub mod repository; use anyhow::{anyhow, Result}; -#[cfg(target_os = "macos")] pub use fsevent::Event; #[cfg(target_os = "macos")] use fsevent::EventStream; #[cfg(not(target_os = "macos"))] -pub use notify::Event; +use fsevent::StreamFlags; + #[cfg(not(target_os = "macos"))] -use notify::{Config, Watcher}; +use notify::{Config, EventKind, Watcher}; use futures::{future::BoxFuture, Stream, StreamExt}; use git2::Repository as LibGitRepository; @@ -292,15 +292,30 @@ impl Fs for RealFs { return Box::pin(rx); } - let mut watcher = notify::recommended_watcher(move |res| match res { - Ok(event) => { - let _ = tx.try_send(vec![event]); - } - Err(err) => { - log::error!("watch error: {}", err); - } - }) - .unwrap(); + let mut watcher = + notify::recommended_watcher(move |res: Result| match res { + Ok(event) => { + let flags = match event.kind { + // ITEM_REMOVED is currently the only flag we care about + EventKind::Remove(_) => StreamFlags::ITEM_REMOVED, + _ => StreamFlags::NONE, + }; + let events = event + .paths + .into_iter() + .map(|path| Event { + event_id: 0, + flags, + path, + }) + .collect::>(); + let _ = tx.try_send(events); + } + Err(err) => { + log::error!("watch error: {}", err); + } + }) + .unwrap(); watcher .configure(Config::default().with_poll_interval(latency)) @@ -330,20 +345,10 @@ impl Fs for RealFs { } } -#[cfg(target_os = "macos")] pub fn fs_events_paths(events: Vec) -> Vec { events.into_iter().map(|event| event.path).collect() } -#[cfg(not(target_os = "macos"))] -pub fn fs_events_paths(events: Vec) -> Vec { - events - .into_iter() - .map(|event| event.paths.into_iter()) - .flatten() - .collect() -} - #[cfg(any(test, feature = "test-support"))] pub struct FakeFs { // Use an unfair lock to ensure tests are deterministic. diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs index 7a65c2a021..108b582bd0 100644 --- a/crates/fsevent/src/fsevent.rs +++ b/crates/fsevent/src/fsevent.rs @@ -1,17 +1,11 @@ -#![cfg(target_os = "macos")] +#[cfg(target_os = "macos")] +pub use mac_impl::*; use bitflags::bitflags; -use fsevent_sys::{self as fs, core_foundation as cf}; -use parking_lot::Mutex; -use std::{ - convert::AsRef, - ffi::{c_void, CStr, OsStr}, - os::unix::ffi::OsStrExt, - path::{Path, PathBuf}, - ptr, slice, - sync::Arc, - time::Duration, -}; +use std::path::PathBuf; + +#[cfg(target_os = "macos")] +mod mac_impl; #[derive(Clone, Debug)] pub struct Event { @@ -20,240 +14,6 @@ pub struct Event { pub path: PathBuf, } -pub struct EventStream { - lifecycle: Arc>, - state: Box, -} - -struct State { - latency: Duration, - paths: cf::CFMutableArrayRef, - callback: Option) -> bool>>, - last_valid_event_id: Option, - stream: fs::FSEventStreamRef, -} - -impl Drop for State { - fn drop(&mut self) { - unsafe { - cf::CFRelease(self.paths); - fs::FSEventStreamStop(self.stream); - fs::FSEventStreamInvalidate(self.stream); - fs::FSEventStreamRelease(self.stream); - } - } -} - -enum Lifecycle { - New, - Running(cf::CFRunLoopRef), - Stopped, -} - -pub struct Handle(Arc>); - -unsafe impl Send for EventStream {} -unsafe impl Send for Lifecycle {} - -impl EventStream { - pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { - unsafe { - let cf_paths = - cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); - assert!(!cf_paths.is_null()); - - for path in paths { - let path_bytes = path.as_os_str().as_bytes(); - let cf_url = cf::CFURLCreateFromFileSystemRepresentation( - cf::kCFAllocatorDefault, - path_bytes.as_ptr() as *const i8, - path_bytes.len() as cf::CFIndex, - false, - ); - let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle); - cf::CFArrayAppendValue(cf_paths, cf_path); - cf::CFRelease(cf_path); - cf::CFRelease(cf_url); - } - - let mut state = Box::new(State { - latency, - paths: cf_paths, - callback: None, - last_valid_event_id: None, - stream: ptr::null_mut(), - }); - let stream_context = fs::FSEventStreamContext { - version: 0, - info: state.as_ref() as *const _ as *mut c_void, - retain: None, - release: None, - copy_description: None, - }; - let stream = fs::FSEventStreamCreate( - cf::kCFAllocatorDefault, - Self::trampoline, - &stream_context, - cf_paths, - FSEventsGetCurrentEventId(), - latency.as_secs_f64(), - fs::kFSEventStreamCreateFlagFileEvents - | fs::kFSEventStreamCreateFlagNoDefer - | fs::kFSEventStreamCreateFlagWatchRoot, - ); - state.stream = stream; - - let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); - ( - EventStream { - lifecycle: lifecycle.clone(), - state, - }, - Handle(lifecycle), - ) - } - } - - pub fn run(mut self, f: F) - where - F: FnMut(Vec) -> bool + 'static, - { - self.state.callback = Some(Box::new(f)); - unsafe { - let run_loop = cf::CFRunLoopGetCurrent(); - { - let mut state = self.lifecycle.lock(); - match *state { - Lifecycle::New => *state = Lifecycle::Running(run_loop), - Lifecycle::Running(_) => unreachable!(), - Lifecycle::Stopped => return, - } - } - fs::FSEventStreamScheduleWithRunLoop( - self.state.stream, - run_loop, - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(self.state.stream); - cf::CFRunLoopRun(); - } - } - - extern "C" fn trampoline( - stream_ref: fs::FSEventStreamRef, - info: *mut ::std::os::raw::c_void, - num: usize, // size_t numEvents - event_paths: *mut ::std::os::raw::c_void, // void *eventPaths - event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] - event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] - ) { - unsafe { - let event_paths = event_paths as *const *const ::std::os::raw::c_char; - let e_ptr = event_flags as *mut u32; - let i_ptr = event_ids as *mut u64; - let state = (info as *mut State).as_mut().unwrap(); - let callback = if let Some(callback) = state.callback.as_mut() { - callback - } else { - return; - }; - - let paths = slice::from_raw_parts(event_paths, num); - let flags = slice::from_raw_parts_mut(e_ptr, num); - let ids = slice::from_raw_parts_mut(i_ptr, num); - let mut stream_restarted = false; - - // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel - // or our code couldn't keep up with the sheer volume of file-system events that were - // generated. If we observed a valid event before this happens, we'll try to read the - // file-system journal by stopping the current stream and creating a new one starting at - // such event. Otherwise, we'll let invoke the callback with the dropped event, which - // will likely perform a re-scan of one of the root directories. - if flags - .iter() - .copied() - .filter_map(StreamFlags::from_bits) - .any(|flags| { - flags.contains(StreamFlags::USER_DROPPED) - || flags.contains(StreamFlags::KERNEL_DROPPED) - }) - { - if let Some(last_valid_event_id) = state.last_valid_event_id.take() { - fs::FSEventStreamStop(state.stream); - fs::FSEventStreamInvalidate(state.stream); - fs::FSEventStreamRelease(state.stream); - - let stream_context = fs::FSEventStreamContext { - version: 0, - info, - retain: None, - release: None, - copy_description: None, - }; - let stream = fs::FSEventStreamCreate( - cf::kCFAllocatorDefault, - Self::trampoline, - &stream_context, - state.paths, - last_valid_event_id, - state.latency.as_secs_f64(), - fs::kFSEventStreamCreateFlagFileEvents - | fs::kFSEventStreamCreateFlagNoDefer - | fs::kFSEventStreamCreateFlagWatchRoot, - ); - - state.stream = stream; - fs::FSEventStreamScheduleWithRunLoop( - state.stream, - cf::CFRunLoopGetCurrent(), - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(state.stream); - stream_restarted = true; - } - } - - if !stream_restarted { - let mut events = Vec::with_capacity(num); - for p in 0..num { - if let Some(flag) = StreamFlags::from_bits(flags[p]) { - if !flag.contains(StreamFlags::HISTORY_DONE) { - let path_c_str = CStr::from_ptr(paths[p]); - let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); - let event = Event { - event_id: ids[p], - flags: flag, - path, - }; - state.last_valid_event_id = Some(event.event_id); - events.push(event); - } - } else { - debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); - } - } - - if !events.is_empty() && !callback(events) { - fs::FSEventStreamStop(stream_ref); - cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); - } - } - } - } -} - -impl Drop for Handle { - fn drop(&mut self) { - let mut state = self.0.lock(); - if let Lifecycle::Running(run_loop) = *state { - unsafe { - cf::CFRunLoopStop(run_loop); - } - } - *state = Lifecycle::Stopped; - } -} - // Synchronize with // /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h bitflags! { @@ -360,138 +120,3 @@ impl std::fmt::Display for StreamFlags { write!(f, "") } } - -#[link(name = "CoreServices", kind = "framework")] -extern "C" { - pub fn FSEventsGetCurrentEventId() -> u64; -} - -#[cfg(test)] -mod tests { - use super::*; - use std::{fs, sync::mpsc, thread, time::Duration}; - - #[test] - fn test_event_stream_simple() { - for _ in 0..3 { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - for i in 0..10 { - fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); - } - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); - - fs::write(path.join("new-file"), "").unwrap(); - let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("new-file")); - assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); - - fs::remove_file(path.join("existing-file-5")).unwrap(); - let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("existing-file-5")); - assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - drop(handle); - } - } - - #[test] - fn test_event_stream_delayed_start() { - for _ in 0..3 { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - for i in 0..10 { - fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); - } - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - - // Delay the call to `run` in order to make sure we don't miss any events that occur - // between creating the `EventStream` and calling `run`. - thread::spawn(move || { - thread::sleep(Duration::from_millis(100)); - stream.run(move |events| tx.send(events.to_vec()).is_ok()) - }); - - fs::write(path.join("new-file"), "").unwrap(); - let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("new-file")); - assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); - - fs::remove_file(path.join("existing-file-5")).unwrap(); - let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("existing-file-5")); - assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - drop(handle); - } - } - - #[test] - fn test_event_stream_shutdown_by_dropping_handle() { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - flush_historical_events(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - thread::spawn(move || { - stream.run({ - let tx = tx.clone(); - move |_| { - tx.send("running").unwrap(); - true - } - }); - tx.send("stopped").unwrap(); - }); - - fs::write(path.join("new-file"), "").unwrap(); - assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running"); - - // Dropping the handle causes `EventStream::run` to return. - drop(handle); - assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped"); - } - - #[test] - fn test_event_stream_shutdown_before_run() { - let dir = tempfile::Builder::new() - .prefix("test-event-stream") - .tempdir() - .unwrap(); - let path = dir.path().canonicalize().unwrap(); - - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - drop(handle); - - // This returns immediately because the handle was already dropped. - stream.run(|_| true); - } - - fn flush_historical_events() { - let duration = if std::env::var("CI").is_ok() { - Duration::from_secs(2) - } else { - Duration::from_millis(500) - }; - thread::sleep(duration); - } -} diff --git a/crates/fsevent/src/mac_impl.rs b/crates/fsevent/src/mac_impl.rs new file mode 100644 index 0000000000..9aec621580 --- /dev/null +++ b/crates/fsevent/src/mac_impl.rs @@ -0,0 +1,382 @@ +use fsevent_sys::{self as fs, core_foundation as cf}; +use parking_lot::Mutex; +use std::{ + convert::AsRef, + ffi::{c_void, CStr, OsStr}, + os::unix::ffi::OsStrExt, + path::{Path, PathBuf}, + ptr, slice, + sync::Arc, + time::Duration, +}; + +use crate::{Event, StreamFlags}; + +pub struct EventStream { + lifecycle: Arc>, + state: Box, +} + +struct State { + latency: Duration, + paths: cf::CFMutableArrayRef, + callback: Option) -> bool>>, + last_valid_event_id: Option, + stream: fs::FSEventStreamRef, +} + +impl Drop for State { + fn drop(&mut self) { + unsafe { + cf::CFRelease(self.paths); + fs::FSEventStreamStop(self.stream); + fs::FSEventStreamInvalidate(self.stream); + fs::FSEventStreamRelease(self.stream); + } + } +} + +enum Lifecycle { + New, + Running(cf::CFRunLoopRef), + Stopped, +} + +pub struct Handle(Arc>); + +unsafe impl Send for EventStream {} +unsafe impl Send for Lifecycle {} + +impl EventStream { + pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { + unsafe { + let cf_paths = + cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); + assert!(!cf_paths.is_null()); + + for path in paths { + let path_bytes = path.as_os_str().as_bytes(); + let cf_url = cf::CFURLCreateFromFileSystemRepresentation( + cf::kCFAllocatorDefault, + path_bytes.as_ptr() as *const i8, + path_bytes.len() as cf::CFIndex, + false, + ); + let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle); + cf::CFArrayAppendValue(cf_paths, cf_path); + cf::CFRelease(cf_path); + cf::CFRelease(cf_url); + } + + let mut state = Box::new(State { + latency, + paths: cf_paths, + callback: None, + last_valid_event_id: None, + stream: ptr::null_mut(), + }); + let stream_context = fs::FSEventStreamContext { + version: 0, + info: state.as_ref() as *const _ as *mut c_void, + retain: None, + release: None, + copy_description: None, + }; + let stream = fs::FSEventStreamCreate( + cf::kCFAllocatorDefault, + Self::trampoline, + &stream_context, + cf_paths, + FSEventsGetCurrentEventId(), + latency.as_secs_f64(), + fs::kFSEventStreamCreateFlagFileEvents + | fs::kFSEventStreamCreateFlagNoDefer + | fs::kFSEventStreamCreateFlagWatchRoot, + ); + state.stream = stream; + + let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); + ( + EventStream { + lifecycle: lifecycle.clone(), + state, + }, + Handle(lifecycle), + ) + } + } + + pub fn run(mut self, f: F) + where + F: FnMut(Vec) -> bool + 'static, + { + self.state.callback = Some(Box::new(f)); + unsafe { + let run_loop = cf::CFRunLoopGetCurrent(); + { + let mut state = self.lifecycle.lock(); + match *state { + Lifecycle::New => *state = Lifecycle::Running(run_loop), + Lifecycle::Running(_) => unreachable!(), + Lifecycle::Stopped => return, + } + } + fs::FSEventStreamScheduleWithRunLoop( + self.state.stream, + run_loop, + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(self.state.stream); + cf::CFRunLoopRun(); + } + } + + extern "C" fn trampoline( + stream_ref: fs::FSEventStreamRef, + info: *mut ::std::os::raw::c_void, + num: usize, // size_t numEvents + event_paths: *mut ::std::os::raw::c_void, // void *eventPaths + event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] + event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] + ) { + unsafe { + let event_paths = event_paths as *const *const ::std::os::raw::c_char; + let e_ptr = event_flags as *mut u32; + let i_ptr = event_ids as *mut u64; + let state = (info as *mut State).as_mut().unwrap(); + let callback = if let Some(callback) = state.callback.as_mut() { + callback + } else { + return; + }; + + let paths = slice::from_raw_parts(event_paths, num); + let flags = slice::from_raw_parts_mut(e_ptr, num); + let ids = slice::from_raw_parts_mut(i_ptr, num); + let mut stream_restarted = false; + + // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel + // or our code couldn't keep up with the sheer volume of file-system events that were + // generated. If we observed a valid event before this happens, we'll try to read the + // file-system journal by stopping the current stream and creating a new one starting at + // such event. Otherwise, we'll let invoke the callback with the dropped event, which + // will likely perform a re-scan of one of the root directories. + if flags + .iter() + .copied() + .filter_map(StreamFlags::from_bits) + .any(|flags| { + flags.contains(StreamFlags::USER_DROPPED) + || flags.contains(StreamFlags::KERNEL_DROPPED) + }) + { + if let Some(last_valid_event_id) = state.last_valid_event_id.take() { + fs::FSEventStreamStop(state.stream); + fs::FSEventStreamInvalidate(state.stream); + fs::FSEventStreamRelease(state.stream); + + let stream_context = fs::FSEventStreamContext { + version: 0, + info, + retain: None, + release: None, + copy_description: None, + }; + let stream = fs::FSEventStreamCreate( + cf::kCFAllocatorDefault, + Self::trampoline, + &stream_context, + state.paths, + last_valid_event_id, + state.latency.as_secs_f64(), + fs::kFSEventStreamCreateFlagFileEvents + | fs::kFSEventStreamCreateFlagNoDefer + | fs::kFSEventStreamCreateFlagWatchRoot, + ); + + state.stream = stream; + fs::FSEventStreamScheduleWithRunLoop( + state.stream, + cf::CFRunLoopGetCurrent(), + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(state.stream); + stream_restarted = true; + } + } + + if !stream_restarted { + let mut events = Vec::with_capacity(num); + for p in 0..num { + if let Some(flag) = StreamFlags::from_bits(flags[p]) { + if !flag.contains(StreamFlags::HISTORY_DONE) { + let path_c_str = CStr::from_ptr(paths[p]); + let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); + let event = Event { + event_id: ids[p], + flags: flag, + path, + }; + state.last_valid_event_id = Some(event.event_id); + events.push(event); + } + } else { + debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); + } + } + + if !events.is_empty() && !callback(events) { + fs::FSEventStreamStop(stream_ref); + cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + } + } + } + } +} + +impl Drop for Handle { + fn drop(&mut self) { + let mut state = self.0.lock(); + if let Lifecycle::Running(run_loop) = *state { + unsafe { + cf::CFRunLoopStop(run_loop); + } + } + *state = Lifecycle::Stopped; + } +} + +#[link(name = "CoreServices", kind = "framework")] +extern "C" { + pub fn FSEventsGetCurrentEventId() -> u64; +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{fs, sync::mpsc, thread, time::Duration}; + + #[test] + fn test_event_stream_simple() { + for _ in 0..3 { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + for i in 0..10 { + fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); + } + flush_historical_events(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); + + fs::write(path.join("new-file"), "").unwrap(); + let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("new-file")); + assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); + + fs::remove_file(path.join("existing-file-5")).unwrap(); + let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("existing-file-5")); + assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); + drop(handle); + } + } + + #[test] + fn test_event_stream_delayed_start() { + for _ in 0..3 { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + for i in 0..10 { + fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); + } + flush_historical_events(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + + // Delay the call to `run` in order to make sure we don't miss any events that occur + // between creating the `EventStream` and calling `run`. + thread::spawn(move || { + thread::sleep(Duration::from_millis(100)); + stream.run(move |events| tx.send(events.to_vec()).is_ok()) + }); + + fs::write(path.join("new-file"), "").unwrap(); + let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("new-file")); + assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); + + fs::remove_file(path.join("existing-file-5")).unwrap(); + let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("existing-file-5")); + assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); + drop(handle); + } + } + + #[test] + fn test_event_stream_shutdown_by_dropping_handle() { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + flush_historical_events(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + thread::spawn(move || { + stream.run({ + let tx = tx.clone(); + move |_| { + tx.send("running").unwrap(); + true + } + }); + tx.send("stopped").unwrap(); + }); + + fs::write(path.join("new-file"), "").unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running"); + + // Dropping the handle causes `EventStream::run` to return. + drop(handle); + assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped"); + } + + #[test] + fn test_event_stream_shutdown_before_run() { + let dir = tempfile::Builder::new() + .prefix("test-event-stream") + .tempdir() + .unwrap(); + let path = dir.path().canonicalize().unwrap(); + + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + drop(handle); + + // This returns immediately because the handle was already dropped. + stream.run(|_| true); + } + + fn flush_historical_events() { + let duration = if std::env::var("CI").is_ok() { + Duration::from_secs(2) + } else { + Duration::from_millis(500) + }; + thread::sleep(duration); + } +}