Translate notify::Event to fsevent::Event on linux (#7545)

This isn't exactly a great solution, but it's a step in the right
direction, and it's simple allowing us to quickly unblock linux. Without
this (or an equivalent) PR linux builds are broken.

I spent a bunch of time investigating using notify on macos, and have a
branch with that working and FakeFs updated to use notify events.
unfortunately I think this would come with some drawbacks. Primarily
that files that don't yet exist yet aren't handled as well as with using
events directly leading to some less than ideal tradeoffs.

This PR is very much a placeholder for a better cross platform solution.
Most problematically, it only fills in the portion of fsevent::Event
that is currently used, despite there being a lot more information in
the ones collected from macos. At the very least a followup PR should
hide those implementation details behind a cross platform Event type so
that if people try and access data that hasn't been translated, they
find out about it.

Release Notes:

- N/A
This commit is contained in:
gmorenz 2024-02-08 11:35:37 -05:00 committed by GitHub
parent 006e7a77d5
commit 17c203fef9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 416 additions and 406 deletions

View File

@ -10,6 +10,7 @@ path = "src/fs.rs"
[dependencies]
collections.workspace = true
fsevent.workspace = true
rope.workspace = true
text.workspace = true
util.workspace = true
@ -33,9 +34,6 @@ time.workspace = true
gpui = { workspace = true, optional = true}
[target.'cfg(target_os = "macos")'.dependencies]
fsevent.workspace = true
[target.'cfg(not(target_os = "macos"))'.dependencies]
notify = "6.1.1"

View File

@ -1,15 +1,15 @@
pub mod repository;
use anyhow::{anyhow, Result};
#[cfg(target_os = "macos")]
pub use fsevent::Event;
#[cfg(target_os = "macos")]
use fsevent::EventStream;
#[cfg(not(target_os = "macos"))]
pub use notify::Event;
use fsevent::StreamFlags;
#[cfg(not(target_os = "macos"))]
use notify::{Config, Watcher};
use notify::{Config, EventKind, Watcher};
use futures::{future::BoxFuture, Stream, StreamExt};
use git2::Repository as LibGitRepository;
@ -292,15 +292,30 @@ impl Fs for RealFs {
return Box::pin(rx);
}
let mut watcher = notify::recommended_watcher(move |res| match res {
Ok(event) => {
let _ = tx.try_send(vec![event]);
}
Err(err) => {
log::error!("watch error: {}", err);
}
})
.unwrap();
let mut watcher =
notify::recommended_watcher(move |res: Result<notify::Event, _>| match res {
Ok(event) => {
let flags = match event.kind {
// ITEM_REMOVED is currently the only flag we care about
EventKind::Remove(_) => StreamFlags::ITEM_REMOVED,
_ => StreamFlags::NONE,
};
let events = event
.paths
.into_iter()
.map(|path| Event {
event_id: 0,
flags,
path,
})
.collect::<Vec<_>>();
let _ = tx.try_send(events);
}
Err(err) => {
log::error!("watch error: {}", err);
}
})
.unwrap();
watcher
.configure(Config::default().with_poll_interval(latency))
@ -330,20 +345,10 @@ impl Fs for RealFs {
}
}
#[cfg(target_os = "macos")]
pub fn fs_events_paths(events: Vec<Event>) -> Vec<PathBuf> {
events.into_iter().map(|event| event.path).collect()
}
#[cfg(not(target_os = "macos"))]
pub fn fs_events_paths(events: Vec<Event>) -> Vec<PathBuf> {
events
.into_iter()
.map(|event| event.paths.into_iter())
.flatten()
.collect()
}
#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
// Use an unfair lock to ensure tests are deterministic.

View File

@ -1,17 +1,11 @@
#![cfg(target_os = "macos")]
#[cfg(target_os = "macos")]
pub use mac_impl::*;
use bitflags::bitflags;
use fsevent_sys::{self as fs, core_foundation as cf};
use parking_lot::Mutex;
use std::{
convert::AsRef,
ffi::{c_void, CStr, OsStr},
os::unix::ffi::OsStrExt,
path::{Path, PathBuf},
ptr, slice,
sync::Arc,
time::Duration,
};
use std::path::PathBuf;
#[cfg(target_os = "macos")]
mod mac_impl;
#[derive(Clone, Debug)]
pub struct Event {
@ -20,240 +14,6 @@ pub struct Event {
pub path: PathBuf,
}
pub struct EventStream {
lifecycle: Arc<Mutex<Lifecycle>>,
state: Box<State>,
}
struct State {
latency: Duration,
paths: cf::CFMutableArrayRef,
callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
last_valid_event_id: Option<fs::FSEventStreamEventId>,
stream: fs::FSEventStreamRef,
}
impl Drop for State {
fn drop(&mut self) {
unsafe {
cf::CFRelease(self.paths);
fs::FSEventStreamStop(self.stream);
fs::FSEventStreamInvalidate(self.stream);
fs::FSEventStreamRelease(self.stream);
}
}
}
enum Lifecycle {
New,
Running(cf::CFRunLoopRef),
Stopped,
}
pub struct Handle(Arc<Mutex<Lifecycle>>);
unsafe impl Send for EventStream {}
unsafe impl Send for Lifecycle {}
impl EventStream {
pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
unsafe {
let cf_paths =
cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
assert!(!cf_paths.is_null());
for path in paths {
let path_bytes = path.as_os_str().as_bytes();
let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
cf::kCFAllocatorDefault,
path_bytes.as_ptr() as *const i8,
path_bytes.len() as cf::CFIndex,
false,
);
let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
cf::CFArrayAppendValue(cf_paths, cf_path);
cf::CFRelease(cf_path);
cf::CFRelease(cf_url);
}
let mut state = Box::new(State {
latency,
paths: cf_paths,
callback: None,
last_valid_event_id: None,
stream: ptr::null_mut(),
});
let stream_context = fs::FSEventStreamContext {
version: 0,
info: state.as_ref() as *const _ as *mut c_void,
retain: None,
release: None,
copy_description: None,
};
let stream = fs::FSEventStreamCreate(
cf::kCFAllocatorDefault,
Self::trampoline,
&stream_context,
cf_paths,
FSEventsGetCurrentEventId(),
latency.as_secs_f64(),
fs::kFSEventStreamCreateFlagFileEvents
| fs::kFSEventStreamCreateFlagNoDefer
| fs::kFSEventStreamCreateFlagWatchRoot,
);
state.stream = stream;
let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
(
EventStream {
lifecycle: lifecycle.clone(),
state,
},
Handle(lifecycle),
)
}
}
pub fn run<F>(mut self, f: F)
where
F: FnMut(Vec<Event>) -> bool + 'static,
{
self.state.callback = Some(Box::new(f));
unsafe {
let run_loop = cf::CFRunLoopGetCurrent();
{
let mut state = self.lifecycle.lock();
match *state {
Lifecycle::New => *state = Lifecycle::Running(run_loop),
Lifecycle::Running(_) => unreachable!(),
Lifecycle::Stopped => return,
}
}
fs::FSEventStreamScheduleWithRunLoop(
self.state.stream,
run_loop,
cf::kCFRunLoopDefaultMode,
);
fs::FSEventStreamStart(self.state.stream);
cf::CFRunLoopRun();
}
}
extern "C" fn trampoline(
stream_ref: fs::FSEventStreamRef,
info: *mut ::std::os::raw::c_void,
num: usize, // size_t numEvents
event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
) {
unsafe {
let event_paths = event_paths as *const *const ::std::os::raw::c_char;
let e_ptr = event_flags as *mut u32;
let i_ptr = event_ids as *mut u64;
let state = (info as *mut State).as_mut().unwrap();
let callback = if let Some(callback) = state.callback.as_mut() {
callback
} else {
return;
};
let paths = slice::from_raw_parts(event_paths, num);
let flags = slice::from_raw_parts_mut(e_ptr, num);
let ids = slice::from_raw_parts_mut(i_ptr, num);
let mut stream_restarted = false;
// Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
// or our code couldn't keep up with the sheer volume of file-system events that were
// generated. If we observed a valid event before this happens, we'll try to read the
// file-system journal by stopping the current stream and creating a new one starting at
// such event. Otherwise, we'll let invoke the callback with the dropped event, which
// will likely perform a re-scan of one of the root directories.
if flags
.iter()
.copied()
.filter_map(StreamFlags::from_bits)
.any(|flags| {
flags.contains(StreamFlags::USER_DROPPED)
|| flags.contains(StreamFlags::KERNEL_DROPPED)
})
{
if let Some(last_valid_event_id) = state.last_valid_event_id.take() {
fs::FSEventStreamStop(state.stream);
fs::FSEventStreamInvalidate(state.stream);
fs::FSEventStreamRelease(state.stream);
let stream_context = fs::FSEventStreamContext {
version: 0,
info,
retain: None,
release: None,
copy_description: None,
};
let stream = fs::FSEventStreamCreate(
cf::kCFAllocatorDefault,
Self::trampoline,
&stream_context,
state.paths,
last_valid_event_id,
state.latency.as_secs_f64(),
fs::kFSEventStreamCreateFlagFileEvents
| fs::kFSEventStreamCreateFlagNoDefer
| fs::kFSEventStreamCreateFlagWatchRoot,
);
state.stream = stream;
fs::FSEventStreamScheduleWithRunLoop(
state.stream,
cf::CFRunLoopGetCurrent(),
cf::kCFRunLoopDefaultMode,
);
fs::FSEventStreamStart(state.stream);
stream_restarted = true;
}
}
if !stream_restarted {
let mut events = Vec::with_capacity(num);
for p in 0..num {
if let Some(flag) = StreamFlags::from_bits(flags[p]) {
if !flag.contains(StreamFlags::HISTORY_DONE) {
let path_c_str = CStr::from_ptr(paths[p]);
let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
let event = Event {
event_id: ids[p],
flags: flag,
path,
};
state.last_valid_event_id = Some(event.event_id);
events.push(event);
}
} else {
debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
}
}
if !events.is_empty() && !callback(events) {
fs::FSEventStreamStop(stream_ref);
cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
}
}
}
}
}
impl Drop for Handle {
fn drop(&mut self) {
let mut state = self.0.lock();
if let Lifecycle::Running(run_loop) = *state {
unsafe {
cf::CFRunLoopStop(run_loop);
}
}
*state = Lifecycle::Stopped;
}
}
// Synchronize with
// /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h
bitflags! {
@ -360,138 +120,3 @@ impl std::fmt::Display for StreamFlags {
write!(f, "")
}
}
#[link(name = "CoreServices", kind = "framework")]
extern "C" {
pub fn FSEventsGetCurrentEventId() -> u64;
}
#[cfg(test)]
mod tests {
use super::*;
use std::{fs, sync::mpsc, thread, time::Duration};
#[test]
fn test_event_stream_simple() {
for _ in 0..3 {
let dir = tempfile::Builder::new()
.prefix("test-event-stream")
.tempdir()
.unwrap();
let path = dir.path().canonicalize().unwrap();
for i in 0..10 {
fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
}
flush_historical_events();
let (tx, rx) = mpsc::channel();
let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
fs::write(path.join("new-file"), "").unwrap();
let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let event = events.last().unwrap();
assert_eq!(event.path, path.join("new-file"));
assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
fs::remove_file(path.join("existing-file-5")).unwrap();
let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let event = events.last().unwrap();
assert_eq!(event.path, path.join("existing-file-5"));
assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
drop(handle);
}
}
#[test]
fn test_event_stream_delayed_start() {
for _ in 0..3 {
let dir = tempfile::Builder::new()
.prefix("test-event-stream")
.tempdir()
.unwrap();
let path = dir.path().canonicalize().unwrap();
for i in 0..10 {
fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
}
flush_historical_events();
let (tx, rx) = mpsc::channel();
let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
// Delay the call to `run` in order to make sure we don't miss any events that occur
// between creating the `EventStream` and calling `run`.
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
stream.run(move |events| tx.send(events.to_vec()).is_ok())
});
fs::write(path.join("new-file"), "").unwrap();
let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let event = events.last().unwrap();
assert_eq!(event.path, path.join("new-file"));
assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
fs::remove_file(path.join("existing-file-5")).unwrap();
let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let event = events.last().unwrap();
assert_eq!(event.path, path.join("existing-file-5"));
assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
drop(handle);
}
}
#[test]
fn test_event_stream_shutdown_by_dropping_handle() {
let dir = tempfile::Builder::new()
.prefix("test-event-stream")
.tempdir()
.unwrap();
let path = dir.path().canonicalize().unwrap();
flush_historical_events();
let (tx, rx) = mpsc::channel();
let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
thread::spawn(move || {
stream.run({
let tx = tx.clone();
move |_| {
tx.send("running").unwrap();
true
}
});
tx.send("stopped").unwrap();
});
fs::write(path.join("new-file"), "").unwrap();
assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");
// Dropping the handle causes `EventStream::run` to return.
drop(handle);
assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped");
}
#[test]
fn test_event_stream_shutdown_before_run() {
let dir = tempfile::Builder::new()
.prefix("test-event-stream")
.tempdir()
.unwrap();
let path = dir.path().canonicalize().unwrap();
let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
drop(handle);
// This returns immediately because the handle was already dropped.
stream.run(|_| true);
}
fn flush_historical_events() {
let duration = if std::env::var("CI").is_ok() {
Duration::from_secs(2)
} else {
Duration::from_millis(500)
};
thread::sleep(duration);
}
}

View File

@ -0,0 +1,382 @@
use fsevent_sys::{self as fs, core_foundation as cf};
use parking_lot::Mutex;
use std::{
convert::AsRef,
ffi::{c_void, CStr, OsStr},
os::unix::ffi::OsStrExt,
path::{Path, PathBuf},
ptr, slice,
sync::Arc,
time::Duration,
};
use crate::{Event, StreamFlags};
pub struct EventStream {
lifecycle: Arc<Mutex<Lifecycle>>,
state: Box<State>,
}
struct State {
latency: Duration,
paths: cf::CFMutableArrayRef,
callback: Option<Box<dyn FnMut(Vec<Event>) -> bool>>,
last_valid_event_id: Option<fs::FSEventStreamEventId>,
stream: fs::FSEventStreamRef,
}
impl Drop for State {
fn drop(&mut self) {
unsafe {
cf::CFRelease(self.paths);
fs::FSEventStreamStop(self.stream);
fs::FSEventStreamInvalidate(self.stream);
fs::FSEventStreamRelease(self.stream);
}
}
}
enum Lifecycle {
New,
Running(cf::CFRunLoopRef),
Stopped,
}
pub struct Handle(Arc<Mutex<Lifecycle>>);
unsafe impl Send for EventStream {}
unsafe impl Send for Lifecycle {}
impl EventStream {
pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) {
unsafe {
let cf_paths =
cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks);
assert!(!cf_paths.is_null());
for path in paths {
let path_bytes = path.as_os_str().as_bytes();
let cf_url = cf::CFURLCreateFromFileSystemRepresentation(
cf::kCFAllocatorDefault,
path_bytes.as_ptr() as *const i8,
path_bytes.len() as cf::CFIndex,
false,
);
let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle);
cf::CFArrayAppendValue(cf_paths, cf_path);
cf::CFRelease(cf_path);
cf::CFRelease(cf_url);
}
let mut state = Box::new(State {
latency,
paths: cf_paths,
callback: None,
last_valid_event_id: None,
stream: ptr::null_mut(),
});
let stream_context = fs::FSEventStreamContext {
version: 0,
info: state.as_ref() as *const _ as *mut c_void,
retain: None,
release: None,
copy_description: None,
};
let stream = fs::FSEventStreamCreate(
cf::kCFAllocatorDefault,
Self::trampoline,
&stream_context,
cf_paths,
FSEventsGetCurrentEventId(),
latency.as_secs_f64(),
fs::kFSEventStreamCreateFlagFileEvents
| fs::kFSEventStreamCreateFlagNoDefer
| fs::kFSEventStreamCreateFlagWatchRoot,
);
state.stream = stream;
let lifecycle = Arc::new(Mutex::new(Lifecycle::New));
(
EventStream {
lifecycle: lifecycle.clone(),
state,
},
Handle(lifecycle),
)
}
}
pub fn run<F>(mut self, f: F)
where
F: FnMut(Vec<Event>) -> bool + 'static,
{
self.state.callback = Some(Box::new(f));
unsafe {
let run_loop = cf::CFRunLoopGetCurrent();
{
let mut state = self.lifecycle.lock();
match *state {
Lifecycle::New => *state = Lifecycle::Running(run_loop),
Lifecycle::Running(_) => unreachable!(),
Lifecycle::Stopped => return,
}
}
fs::FSEventStreamScheduleWithRunLoop(
self.state.stream,
run_loop,
cf::kCFRunLoopDefaultMode,
);
fs::FSEventStreamStart(self.state.stream);
cf::CFRunLoopRun();
}
}
extern "C" fn trampoline(
stream_ref: fs::FSEventStreamRef,
info: *mut ::std::os::raw::c_void,
num: usize, // size_t numEvents
event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
) {
unsafe {
let event_paths = event_paths as *const *const ::std::os::raw::c_char;
let e_ptr = event_flags as *mut u32;
let i_ptr = event_ids as *mut u64;
let state = (info as *mut State).as_mut().unwrap();
let callback = if let Some(callback) = state.callback.as_mut() {
callback
} else {
return;
};
let paths = slice::from_raw_parts(event_paths, num);
let flags = slice::from_raw_parts_mut(e_ptr, num);
let ids = slice::from_raw_parts_mut(i_ptr, num);
let mut stream_restarted = false;
// Sometimes FSEvents reports a "dropped" event, an indication that either the kernel
// or our code couldn't keep up with the sheer volume of file-system events that were
// generated. If we observed a valid event before this happens, we'll try to read the
// file-system journal by stopping the current stream and creating a new one starting at
// such event. Otherwise, we'll let invoke the callback with the dropped event, which
// will likely perform a re-scan of one of the root directories.
if flags
.iter()
.copied()
.filter_map(StreamFlags::from_bits)
.any(|flags| {
flags.contains(StreamFlags::USER_DROPPED)
|| flags.contains(StreamFlags::KERNEL_DROPPED)
})
{
if let Some(last_valid_event_id) = state.last_valid_event_id.take() {
fs::FSEventStreamStop(state.stream);
fs::FSEventStreamInvalidate(state.stream);
fs::FSEventStreamRelease(state.stream);
let stream_context = fs::FSEventStreamContext {
version: 0,
info,
retain: None,
release: None,
copy_description: None,
};
let stream = fs::FSEventStreamCreate(
cf::kCFAllocatorDefault,
Self::trampoline,
&stream_context,
state.paths,
last_valid_event_id,
state.latency.as_secs_f64(),
fs::kFSEventStreamCreateFlagFileEvents
| fs::kFSEventStreamCreateFlagNoDefer
| fs::kFSEventStreamCreateFlagWatchRoot,
);
state.stream = stream;
fs::FSEventStreamScheduleWithRunLoop(
state.stream,
cf::CFRunLoopGetCurrent(),
cf::kCFRunLoopDefaultMode,
);
fs::FSEventStreamStart(state.stream);
stream_restarted = true;
}
}
if !stream_restarted {
let mut events = Vec::with_capacity(num);
for p in 0..num {
if let Some(flag) = StreamFlags::from_bits(flags[p]) {
if !flag.contains(StreamFlags::HISTORY_DONE) {
let path_c_str = CStr::from_ptr(paths[p]);
let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes()));
let event = Event {
event_id: ids[p],
flags: flag,
path,
};
state.last_valid_event_id = Some(event.event_id);
events.push(event);
}
} else {
debug_assert!(false, "unknown flag set for fs event: {}", flags[p]);
}
}
if !events.is_empty() && !callback(events) {
fs::FSEventStreamStop(stream_ref);
cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
}
}
}
}
}
impl Drop for Handle {
fn drop(&mut self) {
let mut state = self.0.lock();
if let Lifecycle::Running(run_loop) = *state {
unsafe {
cf::CFRunLoopStop(run_loop);
}
}
*state = Lifecycle::Stopped;
}
}
#[link(name = "CoreServices", kind = "framework")]
extern "C" {
pub fn FSEventsGetCurrentEventId() -> u64;
}
#[cfg(test)]
mod tests {
use super::*;
use std::{fs, sync::mpsc, thread, time::Duration};
#[test]
fn test_event_stream_simple() {
for _ in 0..3 {
let dir = tempfile::Builder::new()
.prefix("test-event-stream")
.tempdir()
.unwrap();
let path = dir.path().canonicalize().unwrap();
for i in 0..10 {
fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
}
flush_historical_events();
let (tx, rx) = mpsc::channel();
let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok()));
fs::write(path.join("new-file"), "").unwrap();
let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let event = events.last().unwrap();
assert_eq!(event.path, path.join("new-file"));
assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
fs::remove_file(path.join("existing-file-5")).unwrap();
let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let event = events.last().unwrap();
assert_eq!(event.path, path.join("existing-file-5"));
assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
drop(handle);
}
}
#[test]
fn test_event_stream_delayed_start() {
for _ in 0..3 {
let dir = tempfile::Builder::new()
.prefix("test-event-stream")
.tempdir()
.unwrap();
let path = dir.path().canonicalize().unwrap();
for i in 0..10 {
fs::write(path.join(format!("existing-file-{}", i)), "").unwrap();
}
flush_historical_events();
let (tx, rx) = mpsc::channel();
let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
// Delay the call to `run` in order to make sure we don't miss any events that occur
// between creating the `EventStream` and calling `run`.
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
stream.run(move |events| tx.send(events.to_vec()).is_ok())
});
fs::write(path.join("new-file"), "").unwrap();
let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let event = events.last().unwrap();
assert_eq!(event.path, path.join("new-file"));
assert!(event.flags.contains(StreamFlags::ITEM_CREATED));
fs::remove_file(path.join("existing-file-5")).unwrap();
let events = rx.recv_timeout(Duration::from_secs(2)).unwrap();
let event = events.last().unwrap();
assert_eq!(event.path, path.join("existing-file-5"));
assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
drop(handle);
}
}
#[test]
fn test_event_stream_shutdown_by_dropping_handle() {
let dir = tempfile::Builder::new()
.prefix("test-event-stream")
.tempdir()
.unwrap();
let path = dir.path().canonicalize().unwrap();
flush_historical_events();
let (tx, rx) = mpsc::channel();
let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
thread::spawn(move || {
stream.run({
let tx = tx.clone();
move |_| {
tx.send("running").unwrap();
true
}
});
tx.send("stopped").unwrap();
});
fs::write(path.join("new-file"), "").unwrap();
assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running");
// Dropping the handle causes `EventStream::run` to return.
drop(handle);
assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped");
}
#[test]
fn test_event_stream_shutdown_before_run() {
let dir = tempfile::Builder::new()
.prefix("test-event-stream")
.tempdir()
.unwrap();
let path = dir.path().canonicalize().unwrap();
let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50));
drop(handle);
// This returns immediately because the handle was already dropped.
stream.run(|_| true);
}
fn flush_historical_events() {
let duration = if std::env::var("CI").is_ok() {
Duration::from_secs(2)
} else {
Duration::from_millis(500)
};
thread::sleep(duration);
}
}