revert debouncer to original version from 0.3.1

That way it becomes obvious which changes exactly are needed to make
the flush-functionality working.

This commit will be followed up by a revert to get the flush-changes
back.
This commit is contained in:
Sebastian Thiel 2024-06-02 14:13:26 +02:00
parent 659fc99af0
commit 3eaaa7f62d
No known key found for this signature in database
GPG Key ID: 9CB5EE7895E8268B
3 changed files with 16 additions and 62 deletions

View File

@ -83,7 +83,6 @@ impl FileIdMap {
/// If `recursive_mode` is `Recursive`, all children will be added to the cache as well
/// and all paths will be kept up-to-date in case of changes like new files being added,
/// files being removed or renamed.
#[allow(dead_code)]
pub fn add_root(&mut self, path: impl Into<PathBuf>, recursive_mode: RecursiveMode) {
let path = path.into();
@ -95,7 +94,6 @@ impl FileIdMap {
/// Remove a path form the cache.
///
/// If the path was added with `Recursive` mode, all children will also be removed from the cache.
#[allow(dead_code)]
pub fn remove_root(&mut self, path: impl AsRef<Path>) {
self.roots.retain(|(root, _)| !root.starts_with(&path));

View File

@ -59,7 +59,7 @@ use notify::{
event::{ModifyKind, RemoveKind, RenameMode},
Error, ErrorKind, Event, EventKind, RecommendedWatcher, Watcher,
};
use parking_lot::Mutex;
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
#[cfg(test)]
use mock_instant::Instant;
@ -172,13 +172,14 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
/// Retrieve a vec of debounced events, removing them if not continuous
pub fn debounced_events(&mut self, flush_all: bool) -> Vec<DebouncedEvent> {
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
let now = Instant::now();
let mut events_expired = Vec::with_capacity(self.queues.len());
let mut queues_remaining = HashMap::with_capacity(self.queues.len());
if let Some(event) = self.rescan_event.take() {
if now.saturating_duration_since(event.time) >= self.timeout {
log::trace!("debounced event: {event:?}");
events_expired.push(event);
} else {
self.rescan_event = Some(event);
@ -204,9 +205,6 @@ impl<T: FileIdCache> DebounceDataInner<T> {
kind_index.insert(event.kind, events_expired.len());
events_expired.push(event);
} else if flush_all {
tracing::trace!("Flushing event! {:?}", event.event);
events_expired.push(event);
} else {
queue.events.push_front(event);
@ -231,10 +229,6 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
});
for event in &events_expired {
tracing::trace!("Dispatching event: {:?}", event.event);
}
events_expired
}
@ -252,6 +246,8 @@ impl<T: FileIdCache> DebounceDataInner<T> {
/// Add new event to debouncer cache
pub fn add_event(&mut self, event: Event) {
log::trace!("raw event: {event:?}");
if event.need_rescan() {
self.cache.rescan();
self.rescan_event = Some(event.into());
@ -487,16 +483,13 @@ impl<T: FileIdCache> DebounceDataInner<T> {
pub struct Debouncer<T: Watcher, C: FileIdCache> {
watcher: T,
debouncer_thread: Option<std::thread::JoinHandle<()>>,
#[allow(dead_code)]
data: DebounceData<C>,
stop: Arc<AtomicBool>,
flush: Arc<AtomicBool>,
}
impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
/// Stop the debouncer, waits for the event thread to finish.
/// May block for the duration of one tick_rate.
#[allow(dead_code)]
pub fn stop(mut self) {
self.set_stop();
if let Some(t) = self.debouncer_thread.take() {
@ -505,7 +498,6 @@ impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
}
/// Stop the debouncer, does not wait for the event thread to finish.
#[allow(dead_code)]
pub fn stop_nonblocking(self) {
self.set_stop();
}
@ -514,15 +506,15 @@ impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
self.stop.store(true, Ordering::Relaxed);
}
/// Indicates that on the next tick of the debouncer thread, all events should be emitted.
pub fn flush_nonblocking(&self) {
self.flush.store(true, Ordering::Relaxed);
}
/// Access to the internally used notify Watcher backend
pub fn watcher(&mut self) -> &mut T {
&mut self.watcher
}
/// Access to the internally used notify Watcher backend
pub fn cache(&mut self) -> MappedMutexGuard<'_, C> {
MutexGuard::map(self.data.lock(), |data| &mut data.cache)
}
}
impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
@ -539,14 +531,12 @@ impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
timeout: Duration,
tick_rate: Option<Duration>,
flush_after: Option<u32>,
mut event_handler: F,
file_id_cache: C,
config: notify::Config,
) -> Result<Debouncer<T, C>, Error> {
let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
let stop = Arc::new(AtomicBool::new(false));
let flush = Arc::new(AtomicBool::new(false));
let tick_div = 4;
let tick = match tick_rate {
@ -569,52 +559,21 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
let data_c = data.clone();
let stop_c = stop.clone();
let flush_c = flush.clone();
let mut idle_count = 0;
let mut prev_queue_count = 0;
let thread = std::thread::Builder::new()
.name("notify-rs debouncer loop".to_string())
.spawn(move || loop {
if stop_c.load(Ordering::Acquire) {
break;
}
let mut should_flush = flush_c.load(Ordering::Acquire);
std::thread::sleep(tick);
let send_data;
let errors;
{
let mut lock = data_c.lock();
let queue_count = lock.queues.values().fold(0, |acc, x| acc + x.events.len());
if prev_queue_count == queue_count {
idle_count += 1;
} else {
prev_queue_count = queue_count
}
if let Some(threshold) = flush_after {
if idle_count >= threshold {
idle_count = 0;
prev_queue_count = 0;
should_flush = true;
}
}
send_data = lock.debounced_events(should_flush);
if should_flush {
flush_c.store(false, Ordering::Release);
}
send_data = lock.debounced_events();
errors = lock.errors();
}
if !send_data.is_empty() {
if should_flush {
tracing::debug!("Flushed {} events", send_data.len());
}
event_handler.handle_event(Ok(send_data));
}
if !errors.is_empty() {
@ -641,7 +600,6 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
debouncer_thread: Some(thread),
data,
stop,
flush,
};
Ok(guard)
@ -655,13 +613,11 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
pub fn new_debouncer<F: DebounceEventHandler>(
timeout: Duration,
tick_rate: Option<Duration>,
flush_after: Option<u32>,
event_handler: F,
) -> Result<Debouncer<RecommendedWatcher, FileIdMap>, Error> {
new_debouncer_opt::<F, RecommendedWatcher, FileIdMap>(
timeout,
tick_rate,
flush_after,
event_handler,
FileIdMap::new(),
notify::Config::default(),

View File

@ -79,7 +79,7 @@ fn state(
file_name: &str,
) {
let file_content =
std::fs::read_to_string(Path::new(&format!("tests/fixtures/{file_name}.hjson"))).unwrap();
fs::read_to_string(Path::new(&format!("./test_cases/{file_name}.hjson"))).unwrap();
let mut test_case = deser_hjson::from_str::<TestCase>(&file_content).unwrap();
MockClock::set_time(Duration::default());
@ -95,8 +95,8 @@ fn state(
}
for error in test_case.errors {
let error = error.into_notify_error();
state.add_error(error);
let e = error.into_notify_error();
state.add_error(e);
}
let expected_errors = std::mem::take(&mut test_case.expected.errors);
@ -137,7 +137,7 @@ fn state(
for (delay, events) in expected_events {
MockClock::set_time(backup_time);
state.queues.clone_from(&backup_queues);
state.queues = backup_queues.clone();
match delay.as_str() {
"none" => {}
@ -157,7 +157,7 @@ fn state(
.collect::<Vec<_>>();
assert_eq!(
state.debounced_events(false),
state.debounced_events(),
events,
"debounced events after a `{delay}` delay"
);