Revert "fix: performance issue "

This commit is contained in:
Kiril Videlov 2024-05-30 20:10:23 +02:00 committed by GitHub
parent e908828b3a
commit 2e788de86f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 59 additions and 12 deletions

View File

@ -108,7 +108,6 @@ impl FileIdCache for FileIdMap {
}
fn remove_path(&mut self, path: &Path) {
println!("n_paths {:?}", self.paths.len());
self.paths.retain(|p, _| !p.starts_with(path));
}
}

View File

@ -146,7 +146,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
/// Retrieve a vec of debounced events, removing them if not continuous
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
pub fn debounced_events(&mut self, flush_all: bool) -> Vec<DebouncedEvent> {
let now = Instant::now();
let mut events_expired = Vec::with_capacity(self.queues.len());
let mut queues_remaining = HashMap::with_capacity(self.queues.len());
@ -163,6 +163,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
for (path, mut queue) in self.queues.drain() {
let mut kind_index = HashMap::new();
tracing::trace!("Checking path: {:?}", path);
while let Some(event) = queue.events.pop_front() {
if now.saturating_duration_since(event.time) >= self.timeout {
// remove previous event of the same kind
@ -178,6 +179,9 @@ impl<T: FileIdCache> DebounceDataInner<T> {
kind_index.insert(event.kind, events_expired.len());
events_expired.push(event);
} else if flush_all {
tracing::trace!("Flushing event! {:?}", event.event);
events_expired.push(event);
} else {
queue.events.push_front(event);
@ -202,6 +206,10 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
});
for event in &events_expired {
tracing::trace!("Dispatching event: {:?}", event.event);
}
events_expired
}
@ -468,10 +476,11 @@ impl<T: FileIdCache> DebounceDataInner<T> {
#[derive(Debug)]
pub struct Debouncer<T: Watcher, C: FileIdCache> {
watcher: T,
#[allow(dead_code)]
debouncer_thread: Option<std::thread::JoinHandle<()>>,
#[allow(dead_code)]
data: DebounceData<C>,
stop: Arc<AtomicBool>,
flush: Arc<AtomicBool>,
}
impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
@ -520,6 +529,11 @@ impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
self.add_root(path.as_ref(), recursive_mode);
Ok(())
}
/// Indicates that on the next tick of the debouncer thread, all events should be emitted.
pub fn flush_nonblocking(&self) {
self.flush.store(true, Ordering::Relaxed);
}
}
impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
@ -536,12 +550,14 @@ impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
timeout: Duration,
tick_rate: Option<Duration>,
flush_after: Option<u32>,
mut event_handler: F,
file_id_cache: C,
config: notify::Config,
) -> Result<Debouncer<T, C>, Error> {
let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
let stop = Arc::new(AtomicBool::new(false));
let flush = Arc::new(AtomicBool::new(false));
let tick_div = 4;
let tick = match tick_rate {
@ -564,21 +580,52 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
let data_c = data.clone();
let stop_c = stop.clone();
let flush_c = flush.clone();
let mut idle_count = 0;
let mut prev_queue_count = 0;
let thread = std::thread::Builder::new()
.name("notify-rs debouncer loop".to_string())
.spawn(move || loop {
if stop_c.load(Ordering::Acquire) {
break;
}
let mut should_flush = flush_c.load(Ordering::Acquire);
std::thread::sleep(tick);
let send_data;
let errors;
{
let mut lock = data_c.lock();
send_data = lock.debounced_events();
let queue_count = lock.queues.values().fold(0, |acc, x| acc + x.events.len());
if prev_queue_count == queue_count {
idle_count += 1;
} else {
prev_queue_count = queue_count
}
if let Some(threshold) = flush_after {
if idle_count >= threshold {
idle_count = 0;
prev_queue_count = 0;
should_flush = true;
}
}
send_data = lock.debounced_events(should_flush);
if should_flush {
flush_c.store(false, Ordering::Release);
}
errors = lock.errors();
}
if !send_data.is_empty() {
if should_flush {
tracing::trace!("Flushed {} events", send_data.len());
}
event_handler.handle_event(Ok(send_data));
}
if !errors.is_empty() {
@ -605,6 +652,7 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
debouncer_thread: Some(thread),
data,
stop,
flush,
};
Ok(guard)
@ -618,11 +666,13 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
pub fn new_debouncer<F: DebounceEventHandler>(
timeout: Duration,
tick_rate: Option<Duration>,
flush_after: Option<u32>,
event_handler: F,
) -> Result<Debouncer<RecommendedWatcher, FileIdMap>, Error> {
new_debouncer_opt::<F, RecommendedWatcher, FileIdMap>(
timeout,
tick_rate,
flush_after,
event_handler,
FileIdMap::default(),
notify::Config::default(),

View File

@ -160,7 +160,7 @@ fn state(
.collect::<Vec<_>>();
assert_eq!(
state.debounced_events(),
state.debounced_events(false),
events,
"debounced events after a `{delay}` delay"
);
@ -170,9 +170,9 @@ fn state(
mod schema;
mod utils {
use crate::debouncer::FileIdCache;
use notify::RecursiveMode;
use file_id::FileId;
use notify::RecursiveMode;
use std::collections::HashMap;
use std::path::{Path, PathBuf};

View File

@ -25,7 +25,7 @@ const TICK_RATE: Duration = Duration::from_millis(250);
// arriving) before we will automatically flush pending events. This means that
// after the disk is quiet for TICK_RATE * FLUSH_AFTER_EMPTY, we will process
// the pending events, even if DEBOUNCE_TIMEOUT hasn't expired yet
// const FLUSH_AFTER_EMPTY: u32 = 3;
const FLUSH_AFTER_EMPTY: u32 = 3;
/// This error is required only because `anyhow::Error` isn't implementing `std::error::Error`, and [`spawn()`]
/// needs to wrap it into a `backoff::Error` which also has to implement the `Error` trait.
@ -60,8 +60,7 @@ pub fn spawn(
let mut debouncer = new_debouncer(
DEBOUNCE_TIMEOUT,
Some(TICK_RATE),
// TODO: re-enable this
// Some(FLUSH_AFTER_EMPTY),
Some(FLUSH_AFTER_EMPTY),
notify_tx,
)
.context("failed to create debouncer")?;

View File

@ -82,7 +82,6 @@ pub fn watch_in_background(
let (events_out, mut events_in) = unbounded_channel();
let (flush_tx, mut flush_rx) = unbounded_channel();
#[allow(unused_variables)]
let debounce = file_monitor::spawn(project_id, worktree_path.as_ref(), events_out.clone())?;
let cancellation_token = CancellationToken::new();
@ -111,8 +110,7 @@ pub fn watch_in_background(
tokio::select! {
Some(event) = events_in.recv() => handle_event(event)?,
Some(_signal_flush) = flush_rx.recv() => {
// TODO: re-add this
// debounce.flush_nonblocking();
debounce.flush_nonblocking();
}
() = cancellation_token.cancelled() => {
break;

View File

@ -18,6 +18,7 @@
}
cache: {
/watch/parent: 1
/watch/parent/child: 2
}
}
}