mirror of
https://github.com/gitbutlerapp/gitbutler.git
synced 2024-11-28 22:03:30 +03:00
Merge pull request #3916 from gitbutlerapp/revert-3915-remove-idle-check-and-flush
Revert "fix: performance issue "
This commit is contained in:
commit
0da06a1551
@ -108,7 +108,6 @@ impl FileIdCache for FileIdMap {
|
||||
}
|
||||
|
||||
fn remove_path(&mut self, path: &Path) {
|
||||
println!("n_paths {:?}", self.paths.len());
|
||||
self.paths.retain(|p, _| !p.starts_with(path));
|
||||
}
|
||||
}
|
||||
|
@ -146,7 +146,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
}
|
||||
|
||||
/// Retrieve a vec of debounced events, removing them if not continuous
|
||||
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
|
||||
pub fn debounced_events(&mut self, flush_all: bool) -> Vec<DebouncedEvent> {
|
||||
let now = Instant::now();
|
||||
let mut events_expired = Vec::with_capacity(self.queues.len());
|
||||
let mut queues_remaining = HashMap::with_capacity(self.queues.len());
|
||||
@ -163,6 +163,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
for (path, mut queue) in self.queues.drain() {
|
||||
let mut kind_index = HashMap::new();
|
||||
|
||||
tracing::trace!("Checking path: {:?}", path);
|
||||
while let Some(event) = queue.events.pop_front() {
|
||||
if now.saturating_duration_since(event.time) >= self.timeout {
|
||||
// remove previous event of the same kind
|
||||
@ -178,6 +179,9 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
|
||||
kind_index.insert(event.kind, events_expired.len());
|
||||
|
||||
events_expired.push(event);
|
||||
} else if flush_all {
|
||||
tracing::trace!("Flushing event! {:?}", event.event);
|
||||
events_expired.push(event);
|
||||
} else {
|
||||
queue.events.push_front(event);
|
||||
@ -202,6 +206,10 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
}
|
||||
});
|
||||
|
||||
for event in &events_expired {
|
||||
tracing::trace!("Dispatching event: {:?}", event.event);
|
||||
}
|
||||
|
||||
events_expired
|
||||
}
|
||||
|
||||
@ -468,10 +476,11 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
#[derive(Debug)]
|
||||
pub struct Debouncer<T: Watcher, C: FileIdCache> {
|
||||
watcher: T,
|
||||
#[allow(dead_code)]
|
||||
debouncer_thread: Option<std::thread::JoinHandle<()>>,
|
||||
#[allow(dead_code)]
|
||||
data: DebounceData<C>,
|
||||
stop: Arc<AtomicBool>,
|
||||
flush: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
|
||||
@ -520,6 +529,11 @@ impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
|
||||
self.add_root(path.as_ref(), recursive_mode);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Indicates that on the next tick of the debouncer thread, all events should be emitted.
|
||||
pub fn flush_nonblocking(&self) {
|
||||
self.flush.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
|
||||
@ -536,12 +550,14 @@ impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
|
||||
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
|
||||
timeout: Duration,
|
||||
tick_rate: Option<Duration>,
|
||||
flush_after: Option<u32>,
|
||||
mut event_handler: F,
|
||||
file_id_cache: C,
|
||||
config: notify::Config,
|
||||
) -> Result<Debouncer<T, C>, Error> {
|
||||
let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
|
||||
let stop = Arc::new(AtomicBool::new(false));
|
||||
let flush = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let tick_div = 4;
|
||||
let tick = match tick_rate {
|
||||
@ -564,21 +580,52 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
|
||||
|
||||
let data_c = data.clone();
|
||||
let stop_c = stop.clone();
|
||||
let flush_c = flush.clone();
|
||||
let mut idle_count = 0;
|
||||
let mut prev_queue_count = 0;
|
||||
let thread = std::thread::Builder::new()
|
||||
.name("notify-rs debouncer loop".to_string())
|
||||
.spawn(move || loop {
|
||||
if stop_c.load(Ordering::Acquire) {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut should_flush = flush_c.load(Ordering::Acquire);
|
||||
|
||||
std::thread::sleep(tick);
|
||||
|
||||
let send_data;
|
||||
let errors;
|
||||
{
|
||||
let mut lock = data_c.lock();
|
||||
send_data = lock.debounced_events();
|
||||
|
||||
let queue_count = lock.queues.values().fold(0, |acc, x| acc + x.events.len());
|
||||
if prev_queue_count == queue_count {
|
||||
idle_count += 1;
|
||||
} else {
|
||||
prev_queue_count = queue_count
|
||||
}
|
||||
|
||||
if let Some(threshold) = flush_after {
|
||||
if idle_count >= threshold {
|
||||
idle_count = 0;
|
||||
prev_queue_count = 0;
|
||||
should_flush = true;
|
||||
}
|
||||
}
|
||||
|
||||
send_data = lock.debounced_events(should_flush);
|
||||
if should_flush {
|
||||
flush_c.store(false, Ordering::Release);
|
||||
}
|
||||
|
||||
errors = lock.errors();
|
||||
}
|
||||
if !send_data.is_empty() {
|
||||
if should_flush {
|
||||
tracing::trace!("Flushed {} events", send_data.len());
|
||||
}
|
||||
|
||||
event_handler.handle_event(Ok(send_data));
|
||||
}
|
||||
if !errors.is_empty() {
|
||||
@ -605,6 +652,7 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
|
||||
debouncer_thread: Some(thread),
|
||||
data,
|
||||
stop,
|
||||
flush,
|
||||
};
|
||||
|
||||
Ok(guard)
|
||||
@ -618,11 +666,13 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
|
||||
pub fn new_debouncer<F: DebounceEventHandler>(
|
||||
timeout: Duration,
|
||||
tick_rate: Option<Duration>,
|
||||
flush_after: Option<u32>,
|
||||
event_handler: F,
|
||||
) -> Result<Debouncer<RecommendedWatcher, FileIdMap>, Error> {
|
||||
new_debouncer_opt::<F, RecommendedWatcher, FileIdMap>(
|
||||
timeout,
|
||||
tick_rate,
|
||||
flush_after,
|
||||
event_handler,
|
||||
FileIdMap::default(),
|
||||
notify::Config::default(),
|
||||
|
@ -160,7 +160,7 @@ fn state(
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(
|
||||
state.debounced_events(),
|
||||
state.debounced_events(false),
|
||||
events,
|
||||
"debounced events after a `{delay}` delay"
|
||||
);
|
||||
@ -170,9 +170,9 @@ fn state(
|
||||
mod schema;
|
||||
mod utils {
|
||||
use crate::debouncer::FileIdCache;
|
||||
use notify::RecursiveMode;
|
||||
|
||||
use file_id::FileId;
|
||||
use notify::RecursiveMode;
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
|
@ -25,7 +25,7 @@ const TICK_RATE: Duration = Duration::from_millis(250);
|
||||
// arriving) before we will automatically flush pending events. This means that
|
||||
// after the disk is quiet for TICK_RATE * FLUSH_AFTER_EMPTY, we will process
|
||||
// the pending events, even if DEBOUNCE_TIMEOUT hasn't expired yet
|
||||
// const FLUSH_AFTER_EMPTY: u32 = 3;
|
||||
const FLUSH_AFTER_EMPTY: u32 = 3;
|
||||
|
||||
/// This error is required only because `anyhow::Error` isn't implementing `std::error::Error`, and [`spawn()`]
|
||||
/// needs to wrap it into a `backoff::Error` which also has to implement the `Error` trait.
|
||||
@ -60,8 +60,7 @@ pub fn spawn(
|
||||
let mut debouncer = new_debouncer(
|
||||
DEBOUNCE_TIMEOUT,
|
||||
Some(TICK_RATE),
|
||||
// TODO: re-enable this
|
||||
// Some(FLUSH_AFTER_EMPTY),
|
||||
Some(FLUSH_AFTER_EMPTY),
|
||||
notify_tx,
|
||||
)
|
||||
.context("failed to create debouncer")?;
|
||||
|
@ -82,7 +82,6 @@ pub fn watch_in_background(
|
||||
let (events_out, mut events_in) = unbounded_channel();
|
||||
let (flush_tx, mut flush_rx) = unbounded_channel();
|
||||
|
||||
#[allow(unused_variables)]
|
||||
let debounce = file_monitor::spawn(project_id, worktree_path.as_ref(), events_out.clone())?;
|
||||
|
||||
let cancellation_token = CancellationToken::new();
|
||||
@ -111,8 +110,7 @@ pub fn watch_in_background(
|
||||
tokio::select! {
|
||||
Some(event) = events_in.recv() => handle_event(event)?,
|
||||
Some(_signal_flush) = flush_rx.recv() => {
|
||||
// TODO: re-add this
|
||||
// debounce.flush_nonblocking();
|
||||
debounce.flush_nonblocking();
|
||||
}
|
||||
() = cancellation_token.cancelled() => {
|
||||
break;
|
||||
|
@ -18,6 +18,7 @@
|
||||
}
|
||||
cache: {
|
||||
/watch/parent: 1
|
||||
/watch/parent/child: 2
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user