From 954d100d95773b749d0318c9053e60ba405512ca Mon Sep 17 00:00:00 2001 From: Sebastian Thiel Date: Wed, 10 Apr 2024 08:20:25 +0200 Subject: [PATCH] simplify dispatcher around the idea of a single channel That way, all objects go away and it will be nothing more than a task around a channel. --- crates/gitbutler-tauri/src/watcher.rs | 20 +- .../gitbutler-tauri/src/watcher/dispatcher.rs | 376 ++++++++++++++++++ .../src/watcher/dispatchers.rs | 76 ---- .../src/watcher/dispatchers/file_change.rs | 185 --------- 4 files changed, 382 insertions(+), 275 deletions(-) create mode 100644 crates/gitbutler-tauri/src/watcher/dispatcher.rs delete mode 100644 crates/gitbutler-tauri/src/watcher/dispatchers.rs delete mode 100644 crates/gitbutler-tauri/src/watcher/dispatchers/file_change.rs diff --git a/crates/gitbutler-tauri/src/watcher.rs b/crates/gitbutler-tauri/src/watcher.rs index 27ff10471..f7b326873 100644 --- a/crates/gitbutler-tauri/src/watcher.rs +++ b/crates/gitbutler-tauri/src/watcher.rs @@ -1,4 +1,4 @@ -mod dispatchers; +mod dispatcher; mod events; pub mod handlers; @@ -45,10 +45,6 @@ impl Watchers { Ok(()) => { tracing::debug!(%project_id, "watcher stopped"); } - Err(RunError::PathNotFound(path)) => { - tracing::warn!(%project_id, path = %path.display(), "watcher stopped: project path not found"); - watchers.lock().await.remove(&project_id); - } Err(error) => { tracing::error!(?error, %project_id, "watcher error"); watchers.lock().await.remove(&project_id); @@ -131,14 +127,14 @@ impl Watcher { &self, path: P, project_id: &ProjectId, - ) -> Result<(), RunError> { + ) -> Result<(), anyhow::Error> { self.inner.run(path, project_id).await } } struct WatcherInner { handler: handlers::Handler, - dispatcher: dispatchers::Dispatcher, + dispatcher: dispatcher::Dispatcher, cancellation_token: CancellationToken, proxy_tx: Arc>>>, @@ -148,7 +144,7 @@ impl WatcherInner { pub fn from_app(app: &AppHandle) -> std::result::Result { Ok(Self { handler: handlers::Handler::from_app(app)?, - dispatcher: dispatchers::Dispatcher::new(), + dispatcher: dispatcher::Dispatcher::new(), cancellation_token: CancellationToken::new(), proxy_tx: Arc::new(tokio::sync::Mutex::new(None)), }) @@ -177,16 +173,12 @@ impl WatcherInner { &self, path: P, project_id: &ProjectId, - ) -> Result<(), RunError> { + ) -> Result<(), anyhow::Error> { let (proxy_tx, mut proxy_rx) = unbounded_channel(); self.proxy_tx.lock().await.replace(proxy_tx.clone()); let dispatcher = self.dispatcher.clone(); - let mut dispatcher_rx = match dispatcher.run(project_id, path.as_ref()) { - Ok(dispatcher_rx) => Ok(dispatcher_rx), - Err(dispatchers::RunError::PathNotFound(path)) => Err(RunError::PathNotFound(path)), - Err(error) => Err(error).context("failed to run dispatcher")?, - }?; + let mut dispatcher_rx = dispatcher.run(project_id, path.as_ref())?; proxy_tx .send(Event::IndexAll(*project_id)) diff --git a/crates/gitbutler-tauri/src/watcher/dispatcher.rs b/crates/gitbutler-tauri/src/watcher/dispatcher.rs new file mode 100644 index 000000000..2ff16f76d --- /dev/null +++ b/crates/gitbutler-tauri/src/watcher/dispatcher.rs @@ -0,0 +1,376 @@ +mod file_monitor { + use std::{ + path, + sync::{Arc, Mutex}, + time::Duration, + }; + + use anyhow::{anyhow, Context, Result}; + use futures::executor::block_on; + use gitbutler_core::{git, projects::ProjectId}; + use notify::{RecommendedWatcher, Watcher}; + use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap}; + use tokio::{ + sync::mpsc::{channel, Receiver}, + task, + }; + + use crate::watcher::events::Event; + + #[derive(Debug, Clone)] + pub struct Dispatcher { + watcher: Arc>>>, + } + + /// The timeout for debouncing file change events. + /// This is used to prevent multiple events from being sent for a single file change. + static DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100); + + /// This error is required only because `anyhow::Error` isn't implementing `std::error::Error`, and [`spawn()`] + /// needs to wrap it into a `backoff::Error` which also has to implement the `Error` trait. + #[derive(Debug, thiserror::Error)] + #[error(transparent)] + struct RunError { + #[from] + source: anyhow::Error, + } + + impl Dispatcher { + pub fn new() -> Self { + Self { + watcher: Arc::new(Mutex::new(None)), + } + } + + pub fn stop(&self) { + self.watcher.lock().unwrap().take(); + } + + pub fn run( + &self, + project_id: &ProjectId, + path: &path::Path, + ) -> Result, anyhow::Error> { + let (notify_tx, notify_rx) = std::sync::mpsc::channel(); + let mut debouncer = new_debouncer(DEBOUNCE_TIMEOUT, None, notify_tx) + .context("failed to create debouncer")?; + + let policy = backoff::ExponentialBackoffBuilder::new() + .with_max_elapsed_time(Some(std::time::Duration::from_secs(30))) + .build(); + + backoff::retry(policy, || { + debouncer + .watcher() + .watch(path, notify::RecursiveMode::Recursive) + .map_err(|err| match err.kind { + notify::ErrorKind::PathNotFound => backoff::Error::permanent( + RunError::from(anyhow!("{} not found", path.display())), + ), + notify::ErrorKind::Io(_) | notify::ErrorKind::InvalidConfig(_) => { + backoff::Error::permanent(RunError::from(anyhow::Error::from(err))) + } + _ => backoff::Error::transient(RunError::from(anyhow::Error::from(err))), + }) + }) + .context("failed to start watcher")?; + + let repo = git::Repository::open(path).context(format!( + "failed to open project repository: {}", + path.display() + ))?; + + self.watcher.lock().unwrap().replace(debouncer); + + tracing::debug!(%project_id, "file watcher started"); + + let (tx, rx) = channel(1); + task::spawn_blocking({ + let path = path.to_path_buf(); + let project_id = *project_id; + move || { + for result in notify_rx { + match result { + Err(errors) => { + tracing::error!(?errors, "file watcher error"); + } + Ok(events) => { + let file_paths = events + .into_iter() + .filter(|event| is_interesting_kind(event.kind)) + .flat_map(|event| event.paths.clone()) + .filter(|file| is_interesting_file(&repo, file)); + for file_path in file_paths { + match file_path.strip_prefix(&path) { + Ok(relative_file_path) + if relative_file_path + .display() + .to_string() + .is_empty() => + { /* noop */ } + Ok(relative_file_path) => { + let event = if relative_file_path.starts_with(".git") { + tracing::info!( + %project_id, + file_path = %relative_file_path.display(), + "git file change", + ); + Event::GitFileChange( + project_id, + relative_file_path + .strip_prefix(".git") + .unwrap() + .to_path_buf(), + ) + } else { + tracing::info!( + %project_id, + file_path = %relative_file_path.display(), + "project file change", + ); + Event::ProjectFileChange( + project_id, + relative_file_path.to_path_buf(), + ) + }; + if let Err(error) = block_on(tx.send(event)) { + tracing::error!( + %project_id, + ?error, + "failed to send file change event", + ); + } + } + Err(error) => { + tracing::error!(%project_id, ?error, "failed to strip prefix"); + } + } + } + } + } + } + tracing::debug!(%project_id, "file watcher stopped"); + } + }); + + Ok(rx) + } + } + + /// Listen to interesting filesystem events of files in `path` that are not `.gitignore`d and pass them to + /// `tx` as [`Event`] which classifies it, and associates it with `project_id`. + /// + /// Configure the channel behind `tx` according to your needs, typically `tokio::sync::mpsc::channel(1)`. + /// + /// ### Why is this not an iterator? + /// + /// The internal `notify_rx` could be an iterator, which performs all transformations and returns them as item. + /// However, due to closures being continuously created each time events come in, nested closures need to own + /// their resources which means they are `Clone` or `Copy`. This isn't the case for `git::Repository`. + /// Even though `gix::Repository` is `Clone`, an efficient implementation of `is_path_ignored()` requires more state + /// that ideally is kept between invocations. For that reason, the current channel-based 'worker' architecture + /// is chosen to allow all this state to live on the stack. + pub fn spawn( + project_id: ProjectId, + path: &path::Path, + tx: tokio::sync::mpsc::Sender, + ) -> Result<(), anyhow::Error> { + let (notify_tx, notify_rx) = std::sync::mpsc::channel(); + let mut debouncer = new_debouncer(DEBOUNCE_TIMEOUT, None, notify_tx) + .context("failed to create debouncer")?; + + let policy = backoff::ExponentialBackoffBuilder::new() + .with_max_elapsed_time(Some(std::time::Duration::from_secs(30))) + .build(); + + // Start the watcher, but retry if there are transient errors. + backoff::retry(policy, || { + debouncer + .watcher() + .watch(path, notify::RecursiveMode::Recursive) + .map_err(|err| match err.kind { + notify::ErrorKind::PathNotFound => backoff::Error::permanent(RunError::from( + anyhow!("{} not found", path.display()), + )), + notify::ErrorKind::Io(_) | notify::ErrorKind::InvalidConfig(_) => { + backoff::Error::permanent(RunError::from(anyhow::Error::from(err))) + } + _ => backoff::Error::transient(RunError::from(anyhow::Error::from(err))), + }) + }) + .context("failed to start watcher")?; + + let repo = git::Repository::open(path).context(format!( + "failed to open project repository: {}", + path.display() + ))?; + + tracing::debug!(%project_id, "file watcher started"); + + let path = path.to_owned(); + task::spawn_blocking(move || { + 'outer: for result in notify_rx { + match result { + Err(err) => { + tracing::error!(?err, "file watcher error"); + } + Ok(events) => { + let file_paths = events + .into_iter() + .filter(|event| is_interesting_kind(event.kind)) + .flat_map(|event| event.paths.clone()) + .filter(|file| is_interesting_file(&repo, file)); + for file_path in file_paths { + match file_path.strip_prefix(&path) { + Ok(relative_file_path) => { + if relative_file_path.as_os_str().is_empty() { + continue; + } + let event = if let Ok(stripped) = + relative_file_path.strip_prefix(".git") + { + tracing::info!( + %project_id, + file_path = %relative_file_path.display(), + "git file change", + ); + Event::GitFileChange(project_id, stripped.to_owned()) + } else { + tracing::info!( + %project_id, + file_path = %relative_file_path.display(), + "project file change", + ); + Event::ProjectFileChange( + project_id, + relative_file_path.to_path_buf(), + ) + }; + if block_on(tx.send(event)).is_err() { + // channel closed + break 'outer; + } + } + Err(err) => { + tracing::error!(%project_id, ?err, "failed to strip prefix"); + } + } + } + } + } + } + tracing::debug!(%project_id, "file watcher stopped"); + }); + Ok(()) + } + + #[cfg(target_family = "unix")] + fn is_interesting_kind(kind: notify::EventKind) -> bool { + matches!( + kind, + notify::EventKind::Create(notify::event::CreateKind::File) + | notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) + | notify::EventKind::Modify(notify::event::ModifyKind::Name(_)) + | notify::EventKind::Remove(notify::event::RemoveKind::File) + ) + } + + #[cfg(target_os = "windows")] + fn is_interesting_kind(kind: notify::EventKind) -> bool { + matches!( + kind, + notify::EventKind::Create(_) + | notify::EventKind::Modify(_) + | notify::EventKind::Remove(_) + ) + } + + fn is_interesting_file(git_repo: &git::Repository, file_path: &path::Path) -> bool { + if file_path.starts_with(git_repo.path()) { + let check_file_path = file_path.strip_prefix(git_repo.path()).unwrap(); + check_file_path.ends_with("FETCH_HEAD") + || check_file_path.eq(path::Path::new("logs/HEAD")) + || check_file_path.eq(path::Path::new("HEAD")) + || check_file_path.eq(path::Path::new("GB_FLUSH")) + || check_file_path.eq(path::Path::new("index")) + } else { + !git_repo.is_path_ignored(file_path).unwrap_or(false) + } + } +} + +use std::path; +use std::path::Path; + +use anyhow::Result; +use gitbutler_core::projects::ProjectId; +use tokio::{ + select, + sync::mpsc::{channel, Receiver}, + task, +}; +use tokio_util::sync::CancellationToken; + +use super::events; + +#[derive(Clone)] +pub struct Dispatcher { + file_change_dispatcher: file_monitor::Dispatcher, + cancellation_token: CancellationToken, +} + +impl Dispatcher { + pub fn new() -> Self { + Self { + file_change_dispatcher: file_monitor::Dispatcher::new(), + cancellation_token: CancellationToken::new(), + } + } + + pub fn stop(&self) { + self.file_change_dispatcher.stop(); + } + + pub fn run>( + &self, + project_id: &ProjectId, + path: P, + ) -> Result, anyhow::Error> { + let path = path.as_ref(); + + let mut file_change_rx = self.file_change_dispatcher.run(project_id, path)?; + + let (tx, rx) = channel(1); + let project_id = *project_id; + let cancellation_token = self.cancellation_token.clone(); + task::spawn(async move { + loop { + select! { + () = cancellation_token.cancelled() => { + break; + } + Some(event) = file_change_rx.recv() => { + if let Err(error) = tx.send(event).await { + tracing::error!(%project_id, ?error,"failed to send file change"); + } + } + } + } + tracing::debug!(%project_id, "dispatcher stopped"); + }); + + Ok(rx) + } +} + +/// Return a channel which provides change-events of for `project_id` at `path`. +pub fn new( + project_id: ProjectId, + path: impl AsRef, +) -> Result, anyhow::Error> { + // TODO(ST): is the size of 1 really required? It's unbounded internally, and could be just as unbounded here. + // If so, people can call `spawn` directly. + let (tx, rx) = tokio::sync::mpsc::channel(1); + file_monitor::spawn(project_id, path.as_ref(), tx)?; + Ok(rx) +} diff --git a/crates/gitbutler-tauri/src/watcher/dispatchers.rs b/crates/gitbutler-tauri/src/watcher/dispatchers.rs deleted file mode 100644 index 30a829990..000000000 --- a/crates/gitbutler-tauri/src/watcher/dispatchers.rs +++ /dev/null @@ -1,76 +0,0 @@ -mod file_change; - -use std::path; - -use anyhow::{Context, Result}; -use gitbutler_core::projects::ProjectId; -use tokio::{ - select, - sync::mpsc::{channel, Receiver}, - task, -}; -use tokio_util::sync::CancellationToken; - -use super::events; - -#[derive(Clone)] -pub struct Dispatcher { - file_change_dispatcher: file_change::Dispatcher, - cancellation_token: CancellationToken, -} - -#[derive(Debug, thiserror::Error)] -pub enum RunError { - #[error("{0} not found")] - PathNotFound(path::PathBuf), - #[error(transparent)] - Other(#[from] anyhow::Error), -} - -impl Dispatcher { - pub fn new() -> Self { - Self { - file_change_dispatcher: file_change::Dispatcher::new(), - cancellation_token: CancellationToken::new(), - } - } - - pub fn stop(&self) { - self.file_change_dispatcher.stop(); - } - - pub fn run>( - &self, - project_id: &ProjectId, - path: P, - ) -> Result, RunError> { - let path = path.as_ref(); - - let mut file_change_rx = match self.file_change_dispatcher.run(project_id, path) { - Ok(file_change_rx) => Ok(file_change_rx), - Err(file_change::RunError::PathNotFound(path)) => Err(RunError::PathNotFound(path)), - Err(error) => Err(error).context("failed to run file change dispatcher")?, - }?; - - let (tx, rx) = channel(1); - let project_id = *project_id; - let cancellation_token = self.cancellation_token.clone(); - task::spawn(async move { - loop { - select! { - () = cancellation_token.cancelled() => { - break; - } - Some(event) = file_change_rx.recv() => { - if let Err(error) = tx.send(event).await { - tracing::error!(%project_id, ?error,"failed to send file change"); - } - } - } - } - tracing::debug!(%project_id, "dispatcher stopped"); - }); - - Ok(rx) - } -} diff --git a/crates/gitbutler-tauri/src/watcher/dispatchers/file_change.rs b/crates/gitbutler-tauri/src/watcher/dispatchers/file_change.rs deleted file mode 100644 index 174bd5df6..000000000 --- a/crates/gitbutler-tauri/src/watcher/dispatchers/file_change.rs +++ /dev/null @@ -1,185 +0,0 @@ -use std::{ - path, - sync::{Arc, Mutex}, - time::Duration, -}; - -use anyhow::{Context, Result}; -use futures::executor::block_on; -use gitbutler_core::{git, projects::ProjectId}; -use notify::{RecommendedWatcher, Watcher}; -use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap}; -use tokio::{ - sync::mpsc::{channel, Receiver}, - task, -}; - -use crate::watcher::events; - -#[derive(Debug, Clone)] -pub struct Dispatcher { - watcher: Arc>>>, -} - -/// The timeout for debouncing file change events. -/// This is used to prevent multiple events from being sent for a single file change. -static DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100); - -#[derive(Debug, thiserror::Error)] -pub enum RunError { - #[error("{0} not found")] - PathNotFound(path::PathBuf), - #[error(transparent)] - Other(#[from] anyhow::Error), -} - -impl Dispatcher { - pub fn new() -> Self { - Self { - watcher: Arc::new(Mutex::new(None)), - } - } - - pub fn stop(&self) { - self.watcher.lock().unwrap().take(); - } - - pub fn run( - &self, - project_id: &ProjectId, - path: &path::Path, - ) -> Result, RunError> { - let (notify_tx, notify_rx) = std::sync::mpsc::channel(); - let mut debouncer = new_debouncer(DEBOUNCE_TIMEOUT, None, notify_tx) - .context("failed to create debouncer")?; - - let policy = backoff::ExponentialBackoffBuilder::new() - .with_max_elapsed_time(Some(std::time::Duration::from_secs(30))) - .build(); - - backoff::retry(policy, || { - debouncer - .watcher() - .watch(path, notify::RecursiveMode::Recursive) - .map_err(|error| match error.kind { - notify::ErrorKind::PathNotFound => { - backoff::Error::permanent(RunError::PathNotFound(path.to_path_buf())) - } - notify::ErrorKind::Io(_) | notify::ErrorKind::InvalidConfig(_) => { - backoff::Error::permanent(RunError::Other(error.into())) - } - _ => backoff::Error::transient(RunError::Other(error.into())), - }) - }) - .context("failed to start watcher")?; - - let repo = git::Repository::open(path).context(format!( - "failed to open project repository: {}", - path.display() - ))?; - - self.watcher.lock().unwrap().replace(debouncer); - - tracing::debug!(%project_id, "file watcher started"); - - let (tx, rx) = channel(1); - task::spawn_blocking({ - let path = path.to_path_buf(); - let project_id = *project_id; - move || { - for result in notify_rx { - match result { - Err(errors) => { - tracing::error!(?errors, "file watcher error"); - } - Ok(events) => { - let file_paths = events - .into_iter() - .filter(|event| is_interesting_kind(event.kind)) - .flat_map(|event| event.paths.clone()) - .filter(|file| is_interesting_file(&repo, file)); - for file_path in file_paths { - match file_path.strip_prefix(&path) { - Ok(relative_file_path) - if relative_file_path.display().to_string().is_empty() => - { /* noop */ } - Ok(relative_file_path) => { - let event = if relative_file_path.starts_with(".git") { - tracing::info!( - %project_id, - file_path = %relative_file_path.display(), - "git file change", - ); - events::Event::GitFileChange( - project_id, - relative_file_path - .strip_prefix(".git") - .unwrap() - .to_path_buf(), - ) - } else { - tracing::info!( - %project_id, - file_path = %relative_file_path.display(), - "project file change", - ); - events::Event::ProjectFileChange( - project_id, - relative_file_path.to_path_buf(), - ) - }; - if let Err(error) = block_on(tx.send(event)) { - tracing::error!( - %project_id, - ?error, - "failed to send file change event", - ); - } - } - Err(error) => { - tracing::error!(%project_id, ?error, "failed to strip prefix"); - } - } - } - } - } - } - tracing::debug!(%project_id, "file watcher stopped"); - } - }); - - Ok(rx) - } -} - -#[cfg(target_family = "unix")] -fn is_interesting_kind(kind: notify::EventKind) -> bool { - matches!( - kind, - notify::EventKind::Create(notify::event::CreateKind::File) - | notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) - | notify::EventKind::Modify(notify::event::ModifyKind::Name(_)) - | notify::EventKind::Remove(notify::event::RemoveKind::File) - ) -} - -#[cfg(target_os = "windows")] -fn is_interesting_kind(kind: notify::EventKind) -> bool { - matches!( - kind, - notify::EventKind::Create(_) | notify::EventKind::Modify(_) | notify::EventKind::Remove(_) - ) -} - -fn is_interesting_file(git_repo: &git::Repository, file_path: &path::Path) -> bool { - if file_path.starts_with(git_repo.path()) { - let check_file_path = file_path.strip_prefix(git_repo.path()).unwrap(); - check_file_path.ends_with("FETCH_HEAD") - || check_file_path.eq(path::Path::new("logs/HEAD")) - || check_file_path.eq(path::Path::new("HEAD")) - || check_file_path.eq(path::Path::new("GB_FLUSH")) - || check_file_path.eq(path::Path::new("index")) - } else { - !git_repo.is_path_ignored(file_path).unwrap_or(false) - } -}