Add a new mechanism to flush on "idle"

Optionally, if we detect that no change notifications have appeared for
a certain number of ticks, we will automatically flush pending
notifications
This commit is contained in:
Ani Betts 2024-05-15 12:53:56 +02:00
parent 1c3d8d3997
commit 28de8135d2
No known key found for this signature in database
2 changed files with 44 additions and 4 deletions

View File

@ -206,6 +206,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
for (path, mut queue) in self.queues.drain() {
let mut kind_index = HashMap::new();
tracing::debug!("Checking path: {:?}", path);
while let Some(event) = queue.events.pop_front() {
if now.saturating_duration_since(event.time) >= self.timeout {
// remove previous event of the same kind
@ -224,6 +225,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
events_expired.push(event);
} else {
if flush_all {
tracing::debug!("Flushing event! {:?}", event.event);
events_expired.push(event);
} else {
queue.events.push_front(event);
@ -249,6 +251,10 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
});
for event in &events_expired {
tracing::debug!("Dispatching event: {:?}", event.event);
}
events_expired
}
@ -266,6 +272,8 @@ impl<T: FileIdCache> DebounceDataInner<T> {
/// Add new event to debouncer cache
pub fn add_event(&mut self, event: Event) {
tracing::debug!("Received event: {:?}", event);
if event.need_rescan() {
self.cache.rescan();
self.rescan_event = Some(event.into());
@ -550,6 +558,7 @@ impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
timeout: Duration,
tick_rate: Option<Duration>,
flush_after: Option<u32>,
mut event_handler: F,
file_id_cache: C,
config: notify::Config,
@ -580,6 +589,8 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
let data_c = data.clone();
let stop_c = stop.clone();
let flush_c = flush.clone();
let mut idle_count = 0;
let mut prev_queue_count = 0;
let thread = std::thread::Builder::new()
.name("notify-rs debouncer loop".to_string())
.spawn(move || loop {
@ -587,7 +598,7 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
break;
}
let should_flush = flush_c.load(Ordering::Acquire);
let mut should_flush = flush_c.load(Ordering::Acquire);
std::thread::sleep(tick);
@ -595,6 +606,22 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
let errors;
{
let mut lock = data_c.lock();
let queue_count = lock.queues.values().fold(0, |acc, x| acc + x.events.len());
if prev_queue_count == queue_count {
idle_count += 1;
} else {
prev_queue_count = queue_count
}
if let Some(threshold) = flush_after {
if idle_count >= threshold {
idle_count = 0;
prev_queue_count = 0;
should_flush = true;
}
}
send_data = lock.debounced_events(should_flush);
if should_flush {
flush_c.store(false, Ordering::Release);
@ -603,6 +630,10 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
errors = lock.errors();
}
if !send_data.is_empty() {
if should_flush {
tracing::debug!("Flushed {} events", send_data.len());
}
event_handler.handle_event(Ok(send_data));
}
if !errors.is_empty() {
@ -643,11 +674,13 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
pub fn new_debouncer<F: DebounceEventHandler>(
timeout: Duration,
tick_rate: Option<Duration>,
flush_after: Option<u32>,
event_handler: F,
) -> Result<Debouncer<RecommendedWatcher, FileIdMap>, Error> {
new_debouncer_opt::<F, RecommendedWatcher, FileIdMap>(
timeout,
tick_rate,
flush_after,
event_handler,
FileIdMap::new(),
notify::Config::default(),

View File

@ -11,7 +11,9 @@ use tracing::Level;
/// The timeout for debouncing file change events.
/// This is used to prevent multiple events from being sent for a single file change.
const DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100);
const DEBOUNCE_TIMEOUT: Duration = Duration::from_secs(60);
const TICK_RATE: Duration = Duration::from_millis(250);
const FLUSH_AFTER_EMPTY: u32 = 3;
/// This error is required only because `anyhow::Error` isn't implementing `std::error::Error`, and [`spawn()`]
/// needs to wrap it into a `backoff::Error` which also has to implement the `Error` trait.
@ -43,8 +45,13 @@ pub fn spawn(
out: tokio::sync::mpsc::UnboundedSender<InternalEvent>,
) -> Result<()> {
let (notify_tx, notify_rx) = std::sync::mpsc::channel();
let mut debouncer =
new_debouncer(DEBOUNCE_TIMEOUT, None, notify_tx).context("failed to create debouncer")?;
let mut debouncer = new_debouncer(
DEBOUNCE_TIMEOUT,
Some(TICK_RATE),
Some(FLUSH_AFTER_EMPTY),
notify_tx,
)
.context("failed to create debouncer")?;
let policy = backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(std::time::Duration::from_secs(30)))