diff --git a/crates/gitbutler-tauri/src/watcher.rs b/crates/gitbutler-tauri/src/watcher.rs index f7b326873..83d583072 100644 --- a/crates/gitbutler-tauri/src/watcher.rs +++ b/crates/gitbutler-tauri/src/watcher.rs @@ -1,5 +1,5 @@ -mod dispatcher; mod events; +mod file_monitor; pub mod handlers; use std::{collections::HashMap, path, sync::Arc, time}; @@ -41,7 +41,7 @@ impl Watchers { let watchers = Arc::clone(&self.watchers); async move { watchers.lock().await.insert(project_id, watcher.clone()); - match watcher.run(&project_path, &project_id).await { + match watcher.run(&project_path, project_id).await { Ok(()) => { tracing::debug!(%project_id, "watcher stopped"); } @@ -126,7 +126,7 @@ impl Watcher { pub async fn run>( &self, path: P, - project_id: &ProjectId, + project_id: ProjectId, ) -> Result<(), anyhow::Error> { self.inner.run(path, project_id).await } @@ -134,7 +134,6 @@ impl Watcher { struct WatcherInner { handler: handlers::Handler, - dispatcher: dispatcher::Dispatcher, cancellation_token: CancellationToken, proxy_tx: Arc>>>, @@ -144,7 +143,6 @@ impl WatcherInner { pub fn from_app(app: &AppHandle) -> std::result::Result { Ok(Self { handler: handlers::Handler::from_app(app)?, - dispatcher: dispatcher::Dispatcher::new(), cancellation_token: CancellationToken::new(), proxy_tx: Arc::new(tokio::sync::Mutex::new(None)), }) @@ -172,16 +170,14 @@ impl WatcherInner { pub async fn run>( &self, path: P, - project_id: &ProjectId, + project_id: ProjectId, ) -> Result<(), anyhow::Error> { let (proxy_tx, mut proxy_rx) = unbounded_channel(); self.proxy_tx.lock().await.replace(proxy_tx.clone()); - let dispatcher = self.dispatcher.clone(); - let mut dispatcher_rx = dispatcher.run(project_id, path.as_ref())?; - + let mut dispatcher_rx = file_monitor::spawn(project_id, path.as_ref())?; proxy_tx - .send(Event::IndexAll(*project_id)) + .send(Event::IndexAll(project_id)) .context("failed to send event")?; let handle_event = |event: &Event| -> Result<()> { @@ -229,7 +225,6 @@ impl 
WatcherInner { Some(event) = dispatcher_rx.recv() => handle_event(&event)?, Some(event) = proxy_rx.recv() => handle_event(&event)?, () = self.cancellation_token.cancelled() => { - self.dispatcher.stop(); break; } } diff --git a/crates/gitbutler-tauri/src/watcher/dispatcher.rs b/crates/gitbutler-tauri/src/watcher/dispatcher.rs deleted file mode 100644 index 2ff16f76d..000000000 --- a/crates/gitbutler-tauri/src/watcher/dispatcher.rs +++ /dev/null @@ -1,376 +0,0 @@ -mod file_monitor { - use std::{ - path, - sync::{Arc, Mutex}, - time::Duration, - }; - - use anyhow::{anyhow, Context, Result}; - use futures::executor::block_on; - use gitbutler_core::{git, projects::ProjectId}; - use notify::{RecommendedWatcher, Watcher}; - use notify_debouncer_full::{new_debouncer, Debouncer, FileIdMap}; - use tokio::{ - sync::mpsc::{channel, Receiver}, - task, - }; - - use crate::watcher::events::Event; - - #[derive(Debug, Clone)] - pub struct Dispatcher { - watcher: Arc>>>, - } - - /// The timeout for debouncing file change events. - /// This is used to prevent multiple events from being sent for a single file change. - static DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100); - - /// This error is required only because `anyhow::Error` isn't implementing `std::error::Error`, and [`spawn()`] - /// needs to wrap it into a `backoff::Error` which also has to implement the `Error` trait. 
- #[derive(Debug, thiserror::Error)] - #[error(transparent)] - struct RunError { - #[from] - source: anyhow::Error, - } - - impl Dispatcher { - pub fn new() -> Self { - Self { - watcher: Arc::new(Mutex::new(None)), - } - } - - pub fn stop(&self) { - self.watcher.lock().unwrap().take(); - } - - pub fn run( - &self, - project_id: &ProjectId, - path: &path::Path, - ) -> Result, anyhow::Error> { - let (notify_tx, notify_rx) = std::sync::mpsc::channel(); - let mut debouncer = new_debouncer(DEBOUNCE_TIMEOUT, None, notify_tx) - .context("failed to create debouncer")?; - - let policy = backoff::ExponentialBackoffBuilder::new() - .with_max_elapsed_time(Some(std::time::Duration::from_secs(30))) - .build(); - - backoff::retry(policy, || { - debouncer - .watcher() - .watch(path, notify::RecursiveMode::Recursive) - .map_err(|err| match err.kind { - notify::ErrorKind::PathNotFound => backoff::Error::permanent( - RunError::from(anyhow!("{} not found", path.display())), - ), - notify::ErrorKind::Io(_) | notify::ErrorKind::InvalidConfig(_) => { - backoff::Error::permanent(RunError::from(anyhow::Error::from(err))) - } - _ => backoff::Error::transient(RunError::from(anyhow::Error::from(err))), - }) - }) - .context("failed to start watcher")?; - - let repo = git::Repository::open(path).context(format!( - "failed to open project repository: {}", - path.display() - ))?; - - self.watcher.lock().unwrap().replace(debouncer); - - tracing::debug!(%project_id, "file watcher started"); - - let (tx, rx) = channel(1); - task::spawn_blocking({ - let path = path.to_path_buf(); - let project_id = *project_id; - move || { - for result in notify_rx { - match result { - Err(errors) => { - tracing::error!(?errors, "file watcher error"); - } - Ok(events) => { - let file_paths = events - .into_iter() - .filter(|event| is_interesting_kind(event.kind)) - .flat_map(|event| event.paths.clone()) - .filter(|file| is_interesting_file(&repo, file)); - for file_path in file_paths { - match 
file_path.strip_prefix(&path) { - Ok(relative_file_path) - if relative_file_path - .display() - .to_string() - .is_empty() => - { /* noop */ } - Ok(relative_file_path) => { - let event = if relative_file_path.starts_with(".git") { - tracing::info!( - %project_id, - file_path = %relative_file_path.display(), - "git file change", - ); - Event::GitFileChange( - project_id, - relative_file_path - .strip_prefix(".git") - .unwrap() - .to_path_buf(), - ) - } else { - tracing::info!( - %project_id, - file_path = %relative_file_path.display(), - "project file change", - ); - Event::ProjectFileChange( - project_id, - relative_file_path.to_path_buf(), - ) - }; - if let Err(error) = block_on(tx.send(event)) { - tracing::error!( - %project_id, - ?error, - "failed to send file change event", - ); - } - } - Err(error) => { - tracing::error!(%project_id, ?error, "failed to strip prefix"); - } - } - } - } - } - } - tracing::debug!(%project_id, "file watcher stopped"); - } - }); - - Ok(rx) - } - } - - /// Listen to interesting filesystem events of files in `path` that are not `.gitignore`d and pass them to - /// `tx` as [`Event`] which classifies it, and associates it with `project_id`. - /// - /// Configure the channel behind `tx` according to your needs, typically `tokio::sync::mpsc::channel(1)`. - /// - /// ### Why is this not an iterator? - /// - /// The internal `notify_rx` could be an iterator, which performs all transformations and returns them as item. - /// However, due to closures being continuously created each time events come in, nested closures need to own - /// their resources which means they are `Clone` or `Copy`. This isn't the case for `git::Repository`. - /// Even though `gix::Repository` is `Clone`, an efficient implementation of `is_path_ignored()` requires more state - /// that ideally is kept between invocations. For that reason, the current channel-based 'worker' architecture - /// is chosen to allow all this state to live on the stack. 
- pub fn spawn( - project_id: ProjectId, - path: &path::Path, - tx: tokio::sync::mpsc::Sender, - ) -> Result<(), anyhow::Error> { - let (notify_tx, notify_rx) = std::sync::mpsc::channel(); - let mut debouncer = new_debouncer(DEBOUNCE_TIMEOUT, None, notify_tx) - .context("failed to create debouncer")?; - - let policy = backoff::ExponentialBackoffBuilder::new() - .with_max_elapsed_time(Some(std::time::Duration::from_secs(30))) - .build(); - - // Start the watcher, but retry if there are transient errors. - backoff::retry(policy, || { - debouncer - .watcher() - .watch(path, notify::RecursiveMode::Recursive) - .map_err(|err| match err.kind { - notify::ErrorKind::PathNotFound => backoff::Error::permanent(RunError::from( - anyhow!("{} not found", path.display()), - )), - notify::ErrorKind::Io(_) | notify::ErrorKind::InvalidConfig(_) => { - backoff::Error::permanent(RunError::from(anyhow::Error::from(err))) - } - _ => backoff::Error::transient(RunError::from(anyhow::Error::from(err))), - }) - }) - .context("failed to start watcher")?; - - let repo = git::Repository::open(path).context(format!( - "failed to open project repository: {}", - path.display() - ))?; - - tracing::debug!(%project_id, "file watcher started"); - - let path = path.to_owned(); - task::spawn_blocking(move || { - 'outer: for result in notify_rx { - match result { - Err(err) => { - tracing::error!(?err, "file watcher error"); - } - Ok(events) => { - let file_paths = events - .into_iter() - .filter(|event| is_interesting_kind(event.kind)) - .flat_map(|event| event.paths.clone()) - .filter(|file| is_interesting_file(&repo, file)); - for file_path in file_paths { - match file_path.strip_prefix(&path) { - Ok(relative_file_path) => { - if relative_file_path.as_os_str().is_empty() { - continue; - } - let event = if let Ok(stripped) = - relative_file_path.strip_prefix(".git") - { - tracing::info!( - %project_id, - file_path = %relative_file_path.display(), - "git file change", - ); - 
Event::GitFileChange(project_id, stripped.to_owned()) - } else { - tracing::info!( - %project_id, - file_path = %relative_file_path.display(), - "project file change", - ); - Event::ProjectFileChange( - project_id, - relative_file_path.to_path_buf(), - ) - }; - if block_on(tx.send(event)).is_err() { - // channel closed - break 'outer; - } - } - Err(err) => { - tracing::error!(%project_id, ?err, "failed to strip prefix"); - } - } - } - } - } - } - tracing::debug!(%project_id, "file watcher stopped"); - }); - Ok(()) - } - - #[cfg(target_family = "unix")] - fn is_interesting_kind(kind: notify::EventKind) -> bool { - matches!( - kind, - notify::EventKind::Create(notify::event::CreateKind::File) - | notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) - | notify::EventKind::Modify(notify::event::ModifyKind::Name(_)) - | notify::EventKind::Remove(notify::event::RemoveKind::File) - ) - } - - #[cfg(target_os = "windows")] - fn is_interesting_kind(kind: notify::EventKind) -> bool { - matches!( - kind, - notify::EventKind::Create(_) - | notify::EventKind::Modify(_) - | notify::EventKind::Remove(_) - ) - } - - fn is_interesting_file(git_repo: &git::Repository, file_path: &path::Path) -> bool { - if file_path.starts_with(git_repo.path()) { - let check_file_path = file_path.strip_prefix(git_repo.path()).unwrap(); - check_file_path.ends_with("FETCH_HEAD") - || check_file_path.eq(path::Path::new("logs/HEAD")) - || check_file_path.eq(path::Path::new("HEAD")) - || check_file_path.eq(path::Path::new("GB_FLUSH")) - || check_file_path.eq(path::Path::new("index")) - } else { - !git_repo.is_path_ignored(file_path).unwrap_or(false) - } - } -} - -use std::path; -use std::path::Path; - -use anyhow::Result; -use gitbutler_core::projects::ProjectId; -use tokio::{ - select, - sync::mpsc::{channel, Receiver}, - task, -}; -use tokio_util::sync::CancellationToken; - -use super::events; - -#[derive(Clone)] -pub struct Dispatcher { - file_change_dispatcher: file_monitor::Dispatcher, - 
cancellation_token: CancellationToken, -} - -impl Dispatcher { - pub fn new() -> Self { - Self { - file_change_dispatcher: file_monitor::Dispatcher::new(), - cancellation_token: CancellationToken::new(), - } - } - - pub fn stop(&self) { - self.file_change_dispatcher.stop(); - } - - pub fn run>( - &self, - project_id: &ProjectId, - path: P, - ) -> Result, anyhow::Error> { - let path = path.as_ref(); - - let mut file_change_rx = self.file_change_dispatcher.run(project_id, path)?; - - let (tx, rx) = channel(1); - let project_id = *project_id; - let cancellation_token = self.cancellation_token.clone(); - task::spawn(async move { - loop { - select! { - () = cancellation_token.cancelled() => { - break; - } - Some(event) = file_change_rx.recv() => { - if let Err(error) = tx.send(event).await { - tracing::error!(%project_id, ?error,"failed to send file change"); - } - } - } - } - tracing::debug!(%project_id, "dispatcher stopped"); - }); - - Ok(rx) - } -} - -/// Return a channel which provides change-events of for `project_id` at `path`. -pub fn new( - project_id: ProjectId, - path: impl AsRef, -) -> Result, anyhow::Error> { - // TODO(ST): is the size of 1 really required? It's unbounded internally, and could be just as unbounded here. - // If so, people can call `spawn` directly. - let (tx, rx) = tokio::sync::mpsc::channel(1); - file_monitor::spawn(project_id, path.as_ref(), tx)?; - Ok(rx) -} diff --git a/crates/gitbutler-tauri/src/watcher/file_monitor.rs b/crates/gitbutler-tauri/src/watcher/file_monitor.rs new file mode 100644 index 000000000..588ee9090 --- /dev/null +++ b/crates/gitbutler-tauri/src/watcher/file_monitor.rs @@ -0,0 +1,162 @@ +use std::{path, time::Duration}; + +use anyhow::{anyhow, Context, Result}; +use futures::executor::block_on; +use gitbutler_core::{git, projects::ProjectId}; +use notify::Watcher; +use notify_debouncer_full::new_debouncer; +use tokio::task; + +use crate::watcher::events::Event; + +/// The timeout for debouncing file change events. 
+/// This is used to prevent multiple events from being sent for a single file change. +static DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100); + +/// This error is required only because `anyhow::Error` doesn't implement `std::error::Error`, and [`spawn()`] +/// needs to wrap it into a `backoff::Error` which also has to implement the `Error` trait. +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +struct RunError { + #[from] + source: anyhow::Error, +} + +/// Listen to interesting filesystem events of files in `path` that are not `.gitignore`d and turn them into [`Events`](Event), +/// each of which classifies the change and associates it with `project_id`. These are observable in the returned receiver. +/// +/// ### Why is this not an iterator? +/// +/// The internal `notify_rx` could be an iterator, which performs all transformations and returns them as items. +/// However, due to closures being continuously created each time events come in, nested closures need to own +/// their resources, which means these must be `Clone` or `Copy`. This isn't the case for `git::Repository`. +/// Even though `gix::Repository` is `Clone`, an efficient implementation of `is_path_ignored()` requires more state +/// that ideally is kept between invocations. For that reason, the current channel-based 'worker' architecture +/// is chosen to allow all this state to live on the stack. +pub fn spawn( + project_id: ProjectId, + path: &std::path::Path, +) -> Result, anyhow::Error> { + let (notify_tx, notify_rx) = std::sync::mpsc::channel(); + let mut debouncer = + new_debouncer(DEBOUNCE_TIMEOUT, None, notify_tx).context("failed to create debouncer")?; + + let policy = backoff::ExponentialBackoffBuilder::new() + .with_max_elapsed_time(Some(std::time::Duration::from_secs(30))) + .build(); + + // Start the watcher, but retry if there are transient errors. 
+ backoff::retry(policy, || { + debouncer + .watcher() + .watch(path, notify::RecursiveMode::Recursive) + .map_err(|err| match err.kind { + notify::ErrorKind::PathNotFound => backoff::Error::permanent(RunError::from( + anyhow!("{} not found", path.display()), + )), + notify::ErrorKind::Io(_) | notify::ErrorKind::InvalidConfig(_) => { + backoff::Error::permanent(RunError::from(anyhow::Error::from(err))) + } + _ => backoff::Error::transient(RunError::from(anyhow::Error::from(err))), + }) + }) + .context("failed to start watcher")?; + + let repo = git::Repository::open(path).context(format!( + "failed to open project repository: {}", + path.display() + ))?; + + tracing::debug!(%project_id, "file watcher started"); + + // TODO(ST): is the size of 1 really required? It's unbounded internally, and could be just as unbounded here. + // If so, people can call `spawn` directly. + let (tx, rx) = tokio::sync::mpsc::channel(1); + let path = path.to_owned(); + task::spawn_blocking(move || { + let _debouncer = debouncer; + 'outer: for result in notify_rx { + match result { + Err(err) => { + tracing::error!(?err, "file watcher error"); + } + Ok(events) => { + let file_paths = events + .into_iter() + .filter(|event| is_interesting_kind(event.kind)) + .flat_map(|event| event.paths.clone()) + .filter(|file| is_interesting_file(&repo, file)); + for file_path in file_paths { + match file_path.strip_prefix(&path) { + Ok(relative_file_path) => { + if relative_file_path.as_os_str().is_empty() { + continue; + } + let event = + if let Ok(stripped) = relative_file_path.strip_prefix(".git") { + tracing::info!( + %project_id, + file_path = %relative_file_path.display(), + "git file change", + ); + Event::GitFileChange(project_id, stripped.to_owned()) + } else { + tracing::info!( + %project_id, + file_path = %relative_file_path.display(), + "project file change", + ); + Event::ProjectFileChange( + project_id, + relative_file_path.to_path_buf(), + ) + }; + if block_on(tx.send(event)).is_err() { 
+ tracing::info!("channel closed - stopping file watcher"); + break 'outer; + } + } + Err(err) => { + tracing::error!(%project_id, ?err, "failed to strip prefix"); + } + } + } + } + } + } + tracing::debug!(%project_id, "file watcher stopped"); + }); + Ok(rx) +} + +#[cfg(target_family = "unix")] +fn is_interesting_kind(kind: notify::EventKind) -> bool { + matches!( + kind, + notify::EventKind::Create(notify::event::CreateKind::File) + | notify::EventKind::Modify(notify::event::ModifyKind::Data(_)) + | notify::EventKind::Modify(notify::event::ModifyKind::Name(_)) + | notify::EventKind::Remove(notify::event::RemoveKind::File) + ) +} + +#[cfg(target_os = "windows")] +fn is_interesting_kind(kind: notify::EventKind) -> bool { + matches!( + kind, + notify::EventKind::Create(_) | notify::EventKind::Modify(_) | notify::EventKind::Remove(_) + ) +} + +fn is_interesting_file(git_repo: &git::Repository, file_path: &path::Path) -> bool { + if file_path.starts_with(git_repo.path()) { + let check_file_path = file_path.strip_prefix(git_repo.path()).unwrap(); + check_file_path.ends_with("FETCH_HEAD") + || check_file_path.eq(path::Path::new("logs/HEAD")) + || check_file_path.eq(path::Path::new("HEAD")) + || check_file_path.eq(path::Path::new("GB_FLUSH")) + || check_file_path.eq(path::Path::new("index")) + } else { + !git_repo.is_path_ignored(file_path).unwrap_or(false) + } +}