minor refactor

This commit is contained in:
Sebastian Thiel 2024-06-02 15:52:53 +02:00
parent 184f557897
commit 843841981c
No known key found for this signature in database
GPG Key ID: 9CB5EE7895E8268B
2 changed files with 43 additions and 46 deletions

View File

@ -569,58 +569,58 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
})?,
};
let data_c = data.clone();
let stop_c = stop.clone();
let flush_c = flush.clone();
let mut idle_count = 0;
let mut prev_queue_count = 0;
let thread = std::thread::Builder::new()
.name("notify-rs debouncer loop".to_string())
.spawn(move || loop {
if stop_c.load(Ordering::Acquire) {
break;
}
let mut should_flush = flush_c.load(Ordering::Acquire);
std::thread::sleep(tick);
let send_data;
let errors;
{
let mut lock = data_c.lock();
let queue_count = lock.queues.values().fold(0, |acc, x| acc + x.events.len());
if prev_queue_count == queue_count {
idle_count += 1;
} else {
prev_queue_count = queue_count
.spawn({
let data = data.clone();
let stop = stop.clone();
let flush = flush.clone();
let mut idle_count = 0;
let mut prev_queue_count = 0;
move || loop {
if stop.load(Ordering::Acquire) {
break;
}
if let Some(threshold) = flush_after {
if idle_count >= threshold {
let mut should_flush = flush.load(Ordering::Acquire);
std::thread::sleep(tick);
let send_data;
let errors;
{
let mut lock = data.lock();
let queue_count = lock.queues.values().fold(0, |acc, x| acc + x.events.len());
if prev_queue_count == queue_count {
idle_count += 1;
} else {
prev_queue_count = queue_count
}
if flush_after.map_or(false, |threshold| idle_count >= threshold) {
idle_count = 0;
prev_queue_count = 0;
should_flush = true;
}
}
send_data = lock.debounced_events(should_flush);
if should_flush {
flush_c.store(false, Ordering::Release);
}
send_data = lock.debounced_events(should_flush);
if should_flush {
flush.store(false, Ordering::Release);
}
errors = lock.errors();
}
if !send_data.is_empty() {
if should_flush {
tracing::debug!("Flushed {} events", send_data.len());
errors = lock.errors();
}
if !send_data.is_empty() {
if should_flush {
tracing::debug!("Flushed {} events", send_data.len());
}
event_handler.handle_event(Ok(send_data));
}
if !errors.is_empty() {
event_handler.handle_event(Err(errors));
event_handler.handle_event(Ok(send_data));
}
if !errors.is_empty() {
event_handler.handle_event(Err(errors));
}
}
})?;
@ -638,15 +638,13 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
config,
)?;
let guard = Debouncer {
Ok(Debouncer {
watcher,
debouncer_thread: Some(thread),
data,
stop,
flush,
};
Ok(guard)
})
}
/// Short function to create a new debounced watcher with the recommended debouncer and the built-in file ID cache.

View File

@ -95,8 +95,7 @@ fn state(
}
for error in test_case.errors {
let error = error.into_notify_error();
state.add_error(error);
state.add_error(error.into_notify_error());
}
let expected_errors = std::mem::take(&mut test_case.expected.errors);