mirror of
https://github.com/gitbutlerapp/gitbutler.git
synced 2024-12-29 12:33:49 +03:00
re-add original code to see what changed
This commit is contained in:
parent
6596f3956e
commit
547f2ffada
@ -49,7 +49,7 @@ pub trait FileIdCache {
|
||||
/// Add a new path to the cache or update its value.
|
||||
///
|
||||
/// This will be called if a new file or directory is created or if an existing file is overridden.
|
||||
fn add_path(&mut self, path: &Path);
|
||||
fn add_path(&mut self, path: &Path, recursive_mode: RecursiveMode);
|
||||
|
||||
/// Remove a path from the cache.
|
||||
///
|
||||
@ -59,7 +59,11 @@ pub trait FileIdCache {
|
||||
/// Re-scan all paths.
|
||||
///
|
||||
/// This will be called if the notification back-end has dropped events.
|
||||
fn rescan(&mut self);
|
||||
fn rescan(&mut self, roots: &[(PathBuf, RecursiveMode)]) {
|
||||
for (root, recursive_mode) in roots {
|
||||
self.add_path(root, *recursive_mode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A cache to hold the file system IDs of all watched files.
|
||||
@ -69,34 +73,9 @@ pub trait FileIdCache {
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct FileIdMap {
|
||||
paths: HashMap<PathBuf, FileId>,
|
||||
roots: Vec<(PathBuf, RecursiveMode)>,
|
||||
}
|
||||
|
||||
impl FileIdMap {
|
||||
/// Add a path to the cache.
|
||||
///
|
||||
/// If `recursive_mode` is `Recursive`, all children will be added to the cache as well
|
||||
/// and all paths will be kept up-to-date in case of changes like new files being added,
|
||||
/// files being removed or renamed.
|
||||
#[allow(dead_code)]
|
||||
pub fn add_root(&mut self, path: impl Into<PathBuf>, recursive_mode: RecursiveMode) {
|
||||
let path = path.into();
|
||||
|
||||
self.roots.push((path.clone(), recursive_mode));
|
||||
|
||||
self.add_path(&path);
|
||||
}
|
||||
|
||||
/// Remove a path form the cache.
|
||||
///
|
||||
/// If the path was added with `Recursive` mode, all children will also be removed from the cache.
|
||||
#[allow(dead_code)]
|
||||
pub fn remove_root(&mut self, path: impl AsRef<Path>) {
|
||||
self.roots.retain(|(root, _)| !root.starts_with(&path));
|
||||
|
||||
self.remove_path(path.as_ref());
|
||||
}
|
||||
|
||||
fn dir_scan_depth(is_recursive: bool) -> usize {
|
||||
if is_recursive {
|
||||
usize::max_value()
|
||||
@ -111,18 +90,8 @@ impl FileIdCache for FileIdMap {
|
||||
self.paths.get(path)
|
||||
}
|
||||
|
||||
fn add_path(&mut self, path: &Path) {
|
||||
let is_recursive = self
|
||||
.roots
|
||||
.iter()
|
||||
.find_map(|(root, recursive_mode)| {
|
||||
if path.starts_with(root) {
|
||||
Some(*recursive_mode == RecursiveMode::Recursive)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap_or_default();
|
||||
fn add_path(&mut self, path: &Path, recursive_mode: RecursiveMode) {
|
||||
let is_recursive = recursive_mode == RecursiveMode::Recursive;
|
||||
|
||||
for (path, file_id) in WalkDir::new(path)
|
||||
.follow_links(true)
|
||||
@ -141,10 +110,4 @@ impl FileIdCache for FileIdMap {
|
||||
fn remove_path(&mut self, path: &Path) {
|
||||
self.paths.retain(|p, _| !p.starts_with(path));
|
||||
}
|
||||
|
||||
fn rescan(&mut self) {
|
||||
for (root, _) in self.roots.clone() {
|
||||
self.add_path(&root);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,7 @@
|
||||
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
|
||||
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
use std::path::Path;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
path::PathBuf,
|
||||
@ -46,7 +47,7 @@ use std::time::Instant;
|
||||
use file_id::FileId;
|
||||
use notify::{
|
||||
event::{ModifyKind, RemoveKind, RenameMode},
|
||||
Error, ErrorKind, Event, EventKind, RecommendedWatcher, Watcher,
|
||||
Error, ErrorKind, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, WatcherKind,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
@ -123,6 +124,7 @@ impl Queue {
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct DebounceDataInner<T> {
|
||||
queues: HashMap<PathBuf, Queue>,
|
||||
roots: Vec<(PathBuf, RecursiveMode)>,
|
||||
cache: T,
|
||||
rename_event: Option<(DebouncedEvent, Option<FileId>)>,
|
||||
rescan_event: Option<DebouncedEvent>,
|
||||
@ -134,6 +136,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
pub(crate) fn new(cache: T, timeout: Duration) -> Self {
|
||||
Self {
|
||||
queues: HashMap::new(),
|
||||
roots: Vec::new(),
|
||||
cache,
|
||||
rename_event: None,
|
||||
rescan_event: None,
|
||||
@ -143,7 +146,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
}
|
||||
|
||||
/// Retrieve a vec of debounced events, removing them if not continuous
|
||||
pub fn debounced_events(&mut self, flush_all: bool) -> Vec<DebouncedEvent> {
|
||||
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
|
||||
let now = Instant::now();
|
||||
let mut events_expired = Vec::with_capacity(self.queues.len());
|
||||
let mut queues_remaining = HashMap::with_capacity(self.queues.len());
|
||||
@ -160,7 +163,6 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
for (path, mut queue) in self.queues.drain() {
|
||||
let mut kind_index = HashMap::new();
|
||||
|
||||
tracing::trace!("Checking path: {:?}", path);
|
||||
while let Some(event) = queue.events.pop_front() {
|
||||
if now.saturating_duration_since(event.time) >= self.timeout {
|
||||
// remove previous event of the same kind
|
||||
@ -176,9 +178,6 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
|
||||
kind_index.insert(event.kind, events_expired.len());
|
||||
|
||||
events_expired.push(event);
|
||||
} else if flush_all {
|
||||
tracing::trace!("Flushing event! {:?}", event.event);
|
||||
events_expired.push(event);
|
||||
} else {
|
||||
queue.events.push_front(event);
|
||||
@ -203,10 +202,6 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
}
|
||||
});
|
||||
|
||||
for event in &events_expired {
|
||||
tracing::trace!("Dispatching event: {:?}", event.event);
|
||||
}
|
||||
|
||||
events_expired
|
||||
}
|
||||
|
||||
@ -227,7 +222,7 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
tracing::trace!("Received event: {:?}", event);
|
||||
|
||||
if event.need_rescan() {
|
||||
self.cache.rescan();
|
||||
self.cache.rescan(&self.roots);
|
||||
self.rescan_event = Some(event.into());
|
||||
return;
|
||||
}
|
||||
@ -236,7 +231,9 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
|
||||
match &event.kind {
|
||||
EventKind::Create(_) => {
|
||||
self.cache.add_path(path);
|
||||
let recursive_mode = self.recursive_mode(path);
|
||||
|
||||
self.cache.add_path(path, recursive_mode);
|
||||
|
||||
self.push_event(event, Instant::now());
|
||||
}
|
||||
@ -271,7 +268,9 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
}
|
||||
_ => {
|
||||
if self.cache.cached_file_id(path).is_none() {
|
||||
self.cache.add_path(path);
|
||||
let recursive_mode = self.recursive_mode(path);
|
||||
|
||||
self.cache.add_path(path, recursive_mode);
|
||||
}
|
||||
|
||||
self.push_event(event, Instant::now());
|
||||
@ -279,6 +278,19 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn recursive_mode(&mut self, path: &Path) -> RecursiveMode {
|
||||
self.roots
|
||||
.iter()
|
||||
.find_map(|(root, recursive_mode)| {
|
||||
if path.starts_with(root) {
|
||||
Some(*recursive_mode)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap_or(RecursiveMode::NonRecursive)
|
||||
}
|
||||
|
||||
fn handle_rename_from(&mut self, event: Event) {
|
||||
let time = Instant::now();
|
||||
let path = &event.paths[0];
|
||||
@ -293,7 +305,9 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
}
|
||||
|
||||
fn handle_rename_to(&mut self, event: Event) {
|
||||
self.cache.add_path(&event.paths[0]);
|
||||
let recursive_mode = self.recursive_mode(&event.paths[0]);
|
||||
|
||||
self.cache.add_path(&event.paths[0], recursive_mode);
|
||||
|
||||
let trackers_match = self
|
||||
.rename_event
|
||||
@ -461,16 +475,13 @@ impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
pub struct Debouncer<T: Watcher, C: FileIdCache> {
|
||||
watcher: T,
|
||||
debouncer_thread: Option<std::thread::JoinHandle<()>>,
|
||||
#[allow(dead_code)]
|
||||
data: DebounceData<C>,
|
||||
stop: Arc<AtomicBool>,
|
||||
flush: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
|
||||
/// Stop the debouncer, waits for the event thread to finish.
|
||||
/// May block for the duration of one tick_rate.
|
||||
#[allow(dead_code)]
|
||||
pub fn stop(mut self) {
|
||||
self.set_stop();
|
||||
if let Some(t) = self.debouncer_thread.take() {
|
||||
@ -479,7 +490,6 @@ impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
|
||||
}
|
||||
|
||||
/// Stop the debouncer, does not wait for the event thread to finish.
|
||||
#[allow(dead_code)]
|
||||
pub fn stop_nonblocking(self) {
|
||||
self.set_stop();
|
||||
}
|
||||
@ -488,14 +498,60 @@ impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
|
||||
self.stop.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Indicates that on the next tick of the debouncer thread, all events should be emitted.
|
||||
pub fn flush_nonblocking(&self) {
|
||||
self.flush.store(true, Ordering::Relaxed);
|
||||
#[deprecated = "`Debouncer` provides all methods from `Watcher` itself now. Remove `.watcher()` and use those methods directly."]
|
||||
pub fn watcher(&mut self) {}
|
||||
|
||||
#[deprecated = "`Debouncer` now manages root paths automatically. Remove all calls to `add_root` and `remove_root`."]
|
||||
pub fn cache(&mut self) {}
|
||||
|
||||
fn add_root(&mut self, path: impl Into<PathBuf>, recursive_mode: RecursiveMode) {
|
||||
let path = path.into();
|
||||
|
||||
let mut data = self.data.lock();
|
||||
|
||||
// skip, if the root has already been added
|
||||
if data.roots.iter().any(|(p, _)| p == &path) {
|
||||
return;
|
||||
}
|
||||
|
||||
data.roots.push((path.clone(), recursive_mode));
|
||||
|
||||
data.cache.add_path(&path, recursive_mode);
|
||||
}
|
||||
|
||||
/// Access to the internally used notify Watcher backend
|
||||
pub fn watcher(&mut self) -> &mut T {
|
||||
&mut self.watcher
|
||||
fn remove_root(&mut self, path: impl AsRef<Path>) {
|
||||
let mut data = self.data.lock();
|
||||
|
||||
data.roots.retain(|(root, _)| !root.starts_with(&path));
|
||||
|
||||
data.cache.remove_path(path.as_ref());
|
||||
}
|
||||
|
||||
pub fn watch(
|
||||
&mut self,
|
||||
path: impl AsRef<Path>,
|
||||
recursive_mode: RecursiveMode,
|
||||
) -> notify::Result<()> {
|
||||
self.watcher.watch(path.as_ref(), recursive_mode)?;
|
||||
self.add_root(path.as_ref(), recursive_mode);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn unwatch(&mut self, path: impl AsRef<Path>) -> notify::Result<()> {
|
||||
self.watcher.unwatch(path.as_ref())?;
|
||||
self.remove_root(path);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn configure(&mut self, option: notify::Config) -> notify::Result<bool> {
|
||||
self.watcher.configure(option)
|
||||
}
|
||||
|
||||
pub fn kind() -> WatcherKind
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
T::kind()
|
||||
}
|
||||
}
|
||||
|
||||
@ -513,14 +569,12 @@ impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
|
||||
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
|
||||
timeout: Duration,
|
||||
tick_rate: Option<Duration>,
|
||||
flush_after: Option<u32>,
|
||||
mut event_handler: F,
|
||||
file_id_cache: C,
|
||||
config: notify::Config,
|
||||
) -> Result<Debouncer<T, C>, Error> {
|
||||
let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
|
||||
let stop = Arc::new(AtomicBool::new(false));
|
||||
let flush = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let tick_div = 4;
|
||||
let tick = match tick_rate {
|
||||
@ -543,52 +597,21 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
|
||||
|
||||
let data_c = data.clone();
|
||||
let stop_c = stop.clone();
|
||||
let flush_c = flush.clone();
|
||||
let mut idle_count = 0;
|
||||
let mut prev_queue_count = 0;
|
||||
let thread = std::thread::Builder::new()
|
||||
.name("notify-rs debouncer loop".to_string())
|
||||
.spawn(move || loop {
|
||||
if stop_c.load(Ordering::Acquire) {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut should_flush = flush_c.load(Ordering::Acquire);
|
||||
|
||||
std::thread::sleep(tick);
|
||||
|
||||
let send_data;
|
||||
let errors;
|
||||
{
|
||||
let mut lock = data_c.lock();
|
||||
|
||||
let queue_count = lock.queues.values().fold(0, |acc, x| acc + x.events.len());
|
||||
if prev_queue_count == queue_count {
|
||||
idle_count += 1;
|
||||
} else {
|
||||
prev_queue_count = queue_count
|
||||
}
|
||||
|
||||
if let Some(threshold) = flush_after {
|
||||
if idle_count >= threshold {
|
||||
idle_count = 0;
|
||||
prev_queue_count = 0;
|
||||
should_flush = true;
|
||||
}
|
||||
}
|
||||
|
||||
send_data = lock.debounced_events(should_flush);
|
||||
if should_flush {
|
||||
flush_c.store(false, Ordering::Release);
|
||||
}
|
||||
|
||||
send_data = lock.debounced_events();
|
||||
errors = lock.errors();
|
||||
}
|
||||
if !send_data.is_empty() {
|
||||
if should_flush {
|
||||
tracing::trace!("Flushed {} events", send_data.len());
|
||||
}
|
||||
|
||||
event_handler.handle_event(Ok(send_data));
|
||||
}
|
||||
if !errors.is_empty() {
|
||||
@ -615,7 +638,6 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
|
||||
debouncer_thread: Some(thread),
|
||||
data,
|
||||
stop,
|
||||
flush,
|
||||
};
|
||||
|
||||
Ok(guard)
|
||||
@ -629,13 +651,11 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + S
|
||||
pub fn new_debouncer<F: DebounceEventHandler>(
|
||||
timeout: Duration,
|
||||
tick_rate: Option<Duration>,
|
||||
flush_after: Option<u32>,
|
||||
event_handler: F,
|
||||
) -> Result<Debouncer<RecommendedWatcher, FileIdMap>, Error> {
|
||||
new_debouncer_opt::<F, RecommendedWatcher, FileIdMap>(
|
||||
timeout,
|
||||
tick_rate,
|
||||
flush_after,
|
||||
event_handler,
|
||||
FileIdMap::default(),
|
||||
notify::Config::default(),
|
||||
|
@ -159,7 +159,7 @@ fn state(
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(
|
||||
state.debounced_events(false),
|
||||
state.debounced_events(),
|
||||
events,
|
||||
"debounced events after a `{delay}` delay"
|
||||
);
|
||||
@ -169,6 +169,7 @@ fn state(
|
||||
mod schema;
|
||||
mod utils {
|
||||
use crate::debouncer::FileIdCache;
|
||||
use notify::RecursiveMode;
|
||||
|
||||
use file_id::FileId;
|
||||
use std::collections::HashMap;
|
||||
@ -191,9 +192,11 @@ mod utils {
|
||||
self.paths.get(path)
|
||||
}
|
||||
|
||||
fn add_path(&mut self, path: &Path) {
|
||||
fn add_path(&mut self, path: &Path, recursive_mode: RecursiveMode) {
|
||||
for (file_path, file_id) in &self.file_system {
|
||||
if file_path == path || file_path.starts_with(path) {
|
||||
if file_path == path
|
||||
|| (file_path.starts_with(path) && recursive_mode == RecursiveMode::Recursive)
|
||||
{
|
||||
self.paths.insert(file_path.clone(), *file_id);
|
||||
}
|
||||
}
|
||||
@ -202,7 +205,5 @@ mod utils {
|
||||
fn remove_path(&mut self, path: &Path) {
|
||||
self.paths.remove(path);
|
||||
}
|
||||
|
||||
fn rescan(&mut self) {}
|
||||
}
|
||||
}
|
||||
|
@ -286,6 +286,7 @@ impl State {
|
||||
|
||||
DebounceDataInner {
|
||||
queues,
|
||||
roots: Vec::new(),
|
||||
cache,
|
||||
rename_event,
|
||||
rescan_event,
|
||||
|
@ -60,7 +60,8 @@ pub fn spawn(
|
||||
let mut debouncer = new_debouncer(
|
||||
DEBOUNCE_TIMEOUT,
|
||||
Some(TICK_RATE),
|
||||
Some(FLUSH_AFTER_EMPTY),
|
||||
// TODO: re-enable this
|
||||
// Some(FLUSH_AFTER_EMPTY),
|
||||
notify_tx,
|
||||
)
|
||||
.context("failed to create debouncer")?;
|
||||
@ -88,12 +89,11 @@ pub fn spawn(
|
||||
|
||||
// Start the watcher, but retry if there are transient errors.
|
||||
backoff::retry(policy, || {
|
||||
let watcher = debouncer.watcher();
|
||||
watcher
|
||||
debouncer
|
||||
.watch(worktree_path, notify::RecursiveMode::Recursive)
|
||||
.and_then(|()| {
|
||||
if let Some(git_dir) = extra_git_dir_to_watch {
|
||||
watcher.watch(git_dir, notify::RecursiveMode::Recursive)
|
||||
debouncer.watch(git_dir, notify::RecursiveMode::Recursive)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
|
@ -110,7 +110,8 @@ pub fn watch_in_background(
|
||||
tokio::select! {
|
||||
Some(event) = events_in.recv() => handle_event(event)?,
|
||||
Some(_signal_flush) = flush_rx.recv() => {
|
||||
debounce.flush_nonblocking();
|
||||
// TODO: re-add this
|
||||
// debounce.flush_nonblocking();
|
||||
}
|
||||
() = cancellation_token.cancelled() => {
|
||||
break;
|
||||
|
@ -18,7 +18,6 @@
|
||||
}
|
||||
cache: {
|
||||
/watch/parent: 1
|
||||
/watch/parent/child: 2
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user