Show what changed to get the version we currently have

This reverts commit 547f2ffada.

That way it's more obvious when looking at the history what was added
and changed.
This commit is contained in:
Sebastian Thiel 2024-05-24 15:44:13 +02:00
parent 547f2ffada
commit cdf508e04c
No known key found for this signature in database
GPG Key ID: 9CB5EE7895E8268B
7 changed files with 117 additions and 102 deletions

View File

@ -49,7 +49,7 @@ pub trait FileIdCache {
/// Add a new path to the cache or update its value.
///
/// This will be called if a new file or directory is created or if an existing file is overridden.
fn add_path(&mut self, path: &Path, recursive_mode: RecursiveMode);
fn add_path(&mut self, path: &Path);
/// Remove a path from the cache.
///
@ -59,11 +59,7 @@ pub trait FileIdCache {
/// Re-scan all paths.
///
/// This will be called if the notification back-end has dropped events.
fn rescan(&mut self, roots: &[(PathBuf, RecursiveMode)]) {
for (root, recursive_mode) in roots {
self.add_path(root, *recursive_mode);
}
}
fn rescan(&mut self);
}
/// A cache to hold the file system IDs of all watched files.
@ -73,9 +69,34 @@ pub trait FileIdCache {
#[derive(Debug, Clone, Default)]
pub struct FileIdMap {
paths: HashMap<PathBuf, FileId>,
roots: Vec<(PathBuf, RecursiveMode)>,
}
impl FileIdMap {
/// Add a path to the cache.
///
/// If `recursive_mode` is `Recursive`, all children will be added to the cache as well
/// and all paths will be kept up-to-date in case of changes like new files being added,
/// files being removed or renamed.
#[allow(dead_code)]
pub fn add_root(&mut self, path: impl Into<PathBuf>, recursive_mode: RecursiveMode) {
let path = path.into();
self.roots.push((path.clone(), recursive_mode));
self.add_path(&path);
}
/// Remove a path form the cache.
///
/// If the path was added with `Recursive` mode, all children will also be removed from the cache.
#[allow(dead_code)]
pub fn remove_root(&mut self, path: impl AsRef<Path>) {
self.roots.retain(|(root, _)| !root.starts_with(&path));
self.remove_path(path.as_ref());
}
fn dir_scan_depth(is_recursive: bool) -> usize {
if is_recursive {
usize::max_value()
@ -90,8 +111,18 @@ impl FileIdCache for FileIdMap {
self.paths.get(path)
}
fn add_path(&mut self, path: &Path, recursive_mode: RecursiveMode) {
let is_recursive = recursive_mode == RecursiveMode::Recursive;
fn add_path(&mut self, path: &Path) {
let is_recursive = self
.roots
.iter()
.find_map(|(root, recursive_mode)| {
if path.starts_with(root) {
Some(*recursive_mode == RecursiveMode::Recursive)
} else {
None
}
})
.unwrap_or_default();
for (path, file_id) in WalkDir::new(path)
.follow_links(true)
@ -110,4 +141,10 @@ impl FileIdCache for FileIdMap {
fn remove_path(&mut self, path: &Path) {
self.paths.retain(|p, _| !p.starts_with(path));
}
fn rescan(&mut self) {
for (root, _) in self.roots.clone() {
self.add_path(&root);
}
}
}

View File

@ -27,7 +27,6 @@
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::path::Path;
use std::{
collections::{HashMap, VecDeque},
path::PathBuf,
@ -47,7 +46,7 @@ use std::time::Instant;
use file_id::FileId;
use notify::{
event::{ModifyKind, RemoveKind, RenameMode},
Error, ErrorKind, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, WatcherKind,
Error, ErrorKind, Event, EventKind, RecommendedWatcher, Watcher,
};
use parking_lot::Mutex;
@ -124,7 +123,6 @@ impl Queue {
#[derive(Debug)]
pub(crate) struct DebounceDataInner<T> {
queues: HashMap<PathBuf, Queue>,
roots: Vec<(PathBuf, RecursiveMode)>,
cache: T,
rename_event: Option<(DebouncedEvent, Option<FileId>)>,
rescan_event: Option<DebouncedEvent>,
@ -136,7 +134,6 @@ impl<T: FileIdCache> DebounceDataInner<T> {
pub(crate) fn new(cache: T, timeout: Duration) -> Self {
Self {
queues: HashMap::new(),
roots: Vec::new(),
cache,
rename_event: None,
rescan_event: None,
@ -146,7 +143,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
/// Retrieve a vec of debounced events, removing them if not continuous
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
pub fn debounced_events(&mut self, flush_all: bool) -> Vec<DebouncedEvent> {
let now = Instant::now();
let mut events_expired = Vec::with_capacity(self.queues.len());
let mut queues_remaining = HashMap::with_capacity(self.queues.len());
@ -163,6 +160,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
for (path, mut queue) in self.queues.drain() {
let mut kind_index = HashMap::new();
tracing::trace!("Checking path: {:?}", path);
while let Some(event) = queue.events.pop_front() {
if now.saturating_duration_since(event.time) >= self.timeout {
// remove previous event of the same kind
@ -178,6 +176,9 @@ impl<T: FileIdCache> DebounceDataInner<T> {
kind_index.insert(event.kind, events_expired.len());
events_expired.push(event);
} else if flush_all {
tracing::trace!("Flushing event! {:?}", event.event);
events_expired.push(event);
} else {
queue.events.push_front(event);
@ -202,6 +203,10 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
});
for event in &events_expired {
tracing::trace!("Dispatching event: {:?}", event.event);
}
events_expired
}
@ -222,7 +227,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
tracing::trace!("Received event: {:?}", event);
if event.need_rescan() {
self.cache.rescan(&self.roots);
self.cache.rescan();
self.rescan_event = Some(event.into());
return;
}
@ -231,9 +236,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
match &event.kind {
EventKind::Create(_) => {
let recursive_mode = self.recursive_mode(path);
self.cache.add_path(path, recursive_mode);
self.cache.add_path(path);
self.push_event(event, Instant::now());
}
@ -268,9 +271,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
_ => {
if self.cache.cached_file_id(path).is_none() {
let recursive_mode = self.recursive_mode(path);
self.cache.add_path(path, recursive_mode);
self.cache.add_path(path);
}
self.push_event(event, Instant::now());
@ -278,19 +279,6 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
}
fn recursive_mode(&mut self, path: &Path) -> RecursiveMode {
self.roots
.iter()
.find_map(|(root, recursive_mode)| {
if path.starts_with(root) {
Some(*recursive_mode)
} else {
None
}
})
.unwrap_or(RecursiveMode::NonRecursive)
}
fn handle_rename_from(&mut self, event: Event) {
let time = Instant::now();
let path = &event.paths[0];
@ -305,9 +293,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
}
fn handle_rename_to(&mut self, event: Event) {
let recursive_mode = self.recursive_mode(&event.paths[0]);
self.cache.add_path(&event.paths[0], recursive_mode);
self.cache.add_path(&event.paths[0]);
let trackers_match = self
.rename_event
@ -475,13 +461,16 @@ impl<T: FileIdCache> DebounceDataInner<T> {
pub struct Debouncer<T: Watcher, C: FileIdCache> {
watcher: T,
debouncer_thread: Option<std::thread::JoinHandle<()>>,
#[allow(dead_code)]
data: DebounceData<C>,
stop: Arc<AtomicBool>,
flush: Arc<AtomicBool>,
}
impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
/// Stop the debouncer, waits for the event thread to finish.
/// May block for the duration of one tick_rate.
#[allow(dead_code)]
pub fn stop(mut self) {
self.set_stop();
if let Some(t) = self.debouncer_thread.take() {
@ -490,6 +479,7 @@ impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
}
/// Stop the debouncer, does not wait for the event thread to finish.
#[allow(dead_code)]
pub fn stop_nonblocking(self) {
self.set_stop();
}
@ -498,60 +488,14 @@ impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
self.stop.store(true, Ordering::Relaxed);
}
#[deprecated = "`Debouncer` provides all methods from `Watcher` itself now. Remove `.watcher()` and use those methods directly."]
pub fn watcher(&mut self) {}
#[deprecated = "`Debouncer` now manages root paths automatically. Remove all calls to `add_root` and `remove_root`."]
pub fn cache(&mut self) {}
fn add_root(&mut self, path: impl Into<PathBuf>, recursive_mode: RecursiveMode) {
let path = path.into();
let mut data = self.data.lock();
// skip, if the root has already been added
if data.roots.iter().any(|(p, _)| p == &path) {
return;
}
data.roots.push((path.clone(), recursive_mode));
data.cache.add_path(&path, recursive_mode);
/// Indicates that on the next tick of the debouncer thread, all events should be emitted.
pub fn flush_nonblocking(&self) {
self.flush.store(true, Ordering::Relaxed);
}
fn remove_root(&mut self, path: impl AsRef<Path>) {
let mut data = self.data.lock();
data.roots.retain(|(root, _)| !root.starts_with(&path));
data.cache.remove_path(path.as_ref());
}
pub fn watch(
&mut self,
path: impl AsRef<Path>,
recursive_mode: RecursiveMode,
) -> notify::Result<()> {
self.watcher.watch(path.as_ref(), recursive_mode)?;
self.add_root(path.as_ref(), recursive_mode);
Ok(())
}
pub fn unwatch(&mut self, path: impl AsRef<Path>) -> notify::Result<()> {
self.watcher.unwatch(path.as_ref())?;
self.remove_root(path);
Ok(())
}
pub fn configure(&mut self, option: notify::Config) -> notify::Result<bool> {
self.watcher.configure(option)
}
pub fn kind() -> WatcherKind
where
Self: Sized,
{
T::kind()
/// Access to the internally used notify Watcher backend
pub fn watcher(&mut self) -> &mut T {
&mut self.watcher
}
}
@ -569,12 +513,14 @@ impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
timeout: Duration,
tick_rate: Option<Duration>,
flush_after: Option<u32>,
mut event_handler: F,
file_id_cache: C,
config: notify::Config,
) -> Result<Debouncer<T, C>, Error> {
let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
let stop = Arc::new(AtomicBool::new(false));
let flush = Arc::new(AtomicBool::new(false));
let tick_div = 4;
let tick = match tick_rate {
@ -597,21 +543,52 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
let data_c = data.clone();
let stop_c = stop.clone();
let flush_c = flush.clone();
let mut idle_count = 0;
let mut prev_queue_count = 0;
let thread = std::thread::Builder::new()
.name("notify-rs debouncer loop".to_string())
.spawn(move || loop {
if stop_c.load(Ordering::Acquire) {
break;
}
let mut should_flush = flush_c.load(Ordering::Acquire);
std::thread::sleep(tick);
let send_data;
let errors;
{
let mut lock = data_c.lock();
send_data = lock.debounced_events();
let queue_count = lock.queues.values().fold(0, |acc, x| acc + x.events.len());
if prev_queue_count == queue_count {
idle_count += 1;
} else {
prev_queue_count = queue_count
}
if let Some(threshold) = flush_after {
if idle_count >= threshold {
idle_count = 0;
prev_queue_count = 0;
should_flush = true;
}
}
send_data = lock.debounced_events(should_flush);
if should_flush {
flush_c.store(false, Ordering::Release);
}
errors = lock.errors();
}
if !send_data.is_empty() {
if should_flush {
tracing::trace!("Flushed {} events", send_data.len());
}
event_handler.handle_event(Ok(send_data));
}
if !errors.is_empty() {
@ -638,6 +615,7 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
debouncer_thread: Some(thread),
data,
stop,
flush,
};
Ok(guard)
@ -651,11 +629,13 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
pub fn new_debouncer<F: DebounceEventHandler>(
timeout: Duration,
tick_rate: Option<Duration>,
flush_after: Option<u32>,
event_handler: F,
) -> Result<Debouncer<RecommendedWatcher, FileIdMap>, Error> {
new_debouncer_opt::<F, RecommendedWatcher, FileIdMap>(
timeout,
tick_rate,
flush_after,
event_handler,
FileIdMap::default(),
notify::Config::default(),

View File

@ -159,7 +159,7 @@ fn state(
.collect::<Vec<_>>();
assert_eq!(
state.debounced_events(),
state.debounced_events(false),
events,
"debounced events after a `{delay}` delay"
);
@ -169,7 +169,6 @@ fn state(
mod schema;
mod utils {
use crate::debouncer::FileIdCache;
use notify::RecursiveMode;
use file_id::FileId;
use std::collections::HashMap;
@ -192,11 +191,9 @@ mod utils {
self.paths.get(path)
}
fn add_path(&mut self, path: &Path, recursive_mode: RecursiveMode) {
fn add_path(&mut self, path: &Path) {
for (file_path, file_id) in &self.file_system {
if file_path == path
|| (file_path.starts_with(path) && recursive_mode == RecursiveMode::Recursive)
{
if file_path == path || file_path.starts_with(path) {
self.paths.insert(file_path.clone(), *file_id);
}
}
@ -205,5 +202,7 @@ mod utils {
fn remove_path(&mut self, path: &Path) {
self.paths.remove(path);
}
fn rescan(&mut self) {}
}
}

View File

@ -286,7 +286,6 @@ impl State {
DebounceDataInner {
queues,
roots: Vec::new(),
cache,
rename_event,
rescan_event,

View File

@ -60,8 +60,7 @@ pub fn spawn(
let mut debouncer = new_debouncer(
DEBOUNCE_TIMEOUT,
Some(TICK_RATE),
// TODO: re-enable this
// Some(FLUSH_AFTER_EMPTY),
Some(FLUSH_AFTER_EMPTY),
notify_tx,
)
.context("failed to create debouncer")?;
@ -89,11 +88,12 @@ pub fn spawn(
// Start the watcher, but retry if there are transient errors.
backoff::retry(policy, || {
debouncer
let watcher = debouncer.watcher();
watcher
.watch(worktree_path, notify::RecursiveMode::Recursive)
.and_then(|()| {
if let Some(git_dir) = extra_git_dir_to_watch {
debouncer.watch(git_dir, notify::RecursiveMode::Recursive)
watcher.watch(git_dir, notify::RecursiveMode::Recursive)
} else {
Ok(())
}

View File

@ -110,8 +110,7 @@ pub fn watch_in_background(
tokio::select! {
Some(event) = events_in.recv() => handle_event(event)?,
Some(_signal_flush) = flush_rx.recv() => {
// TODO: re-add this
// debounce.flush_nonblocking();
debounce.flush_nonblocking();
}
() = cancellation_token.cancelled() => {
break;

View File

@ -18,6 +18,7 @@
}
cache: {
/watch/parent: 1
/watch/parent/child: 2
}
}
}