diff --git a/src-tauri/src/watcher/dispatchers/file_change.rs b/src-tauri/src/watcher/dispatchers/file_change.rs index 309b02a66..be0070c28 100644 --- a/src-tauri/src/watcher/dispatchers/file_change.rs +++ b/src-tauri/src/watcher/dispatchers/file_change.rs @@ -10,6 +10,8 @@ use futures::{ }; use notify::{Config, Event, RecommendedWatcher, Watcher}; +use crate::watcher::events; + #[derive(Debug, Clone)] pub struct Dispatcher { watcher: Arc>>, @@ -38,7 +40,7 @@ impl Dispatcher { Ok(()) } - pub async fn start(&self, rtx: crossbeam_channel::Sender) -> Result<()> { + pub async fn start(&self, rtx: crossbeam_channel::Sender) -> Result<()> { let (mut watcher, mut rx) = async_watcher()?; watcher .watch( @@ -62,34 +64,23 @@ impl Dispatcher { continue; } for file_path in event.paths { - if let Err(e) = file_path - .strip_prefix(&self.project_path) - .with_context(|| { - format!( - "failed to striprefix from file path: {}", - file_path.display() - ) - }) - .map(|relative_file_path| { - log::info!( - "{}: file changed: {}", - self.project_id, - relative_file_path.display() - ); - if let Err(e) = rtx.send(relative_file_path.to_path_buf()) { + match file_path.strip_prefix(&self.project_path) { + Ok(relative_file_path) => { + if let Err(e) = rtx.send(events::Event::FileChange( + relative_file_path.to_path_buf(), + )) { log::error!( "{}: failed to send file change event: {:#}", self.project_id, e ); } - }) - { - log::error!( - "{}: failed to send file change event: {:#}", + } + Err(err) => log::error!( + "{}: failed to strip prefix: {:#}", self.project_id, - e - ); + err + ), } } } @@ -116,12 +107,13 @@ fn is_interesting_event(kind: ¬ify::EventKind) -> bool { fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver>)> { let (mut tx, rx) = channel(1); - // Automatically select the best implementation for your platform. - // You can also access each implementation directly e.g. INotifyWatcher. let watcher = RecommendedWatcher::new( move |res| { futures::executor::block_on(async { - tx.send(res).await.unwrap(); + if let Err(err) = tx.send(res).await { + log::error!("failed to send file change event: {:#}", err); + } + println!("sent"); }) }, Config::default(), diff --git a/src-tauri/src/watcher/dispatchers/mod.rs b/src-tauri/src/watcher/dispatchers/mod.rs index 63dccf702..661e0c512 100644 --- a/src-tauri/src/watcher/dispatchers/mod.rs +++ b/src-tauri/src/watcher/dispatchers/mod.rs @@ -4,7 +4,7 @@ mod tick; use std::{path, time}; use anyhow::Result; -use crossbeam_channel::{bounded, select, unbounded, Sender}; +use tokio_util::sync::CancellationToken; use super::events; @@ -14,10 +14,7 @@ pub struct Dispatcher { tick_dispatcher: tick::Dispatcher, file_change_dispatcher: file_change::Dispatcher, proxy: crossbeam_channel::Receiver, - stop: ( - crossbeam_channel::Sender<()>, - crossbeam_channel::Receiver<()>, - ), + cancellation_token: CancellationToken, } impl Dispatcher { @@ -30,82 +27,65 @@ impl Dispatcher { project_id: project_id.clone(), tick_dispatcher: tick::Dispatcher::new(project_id.clone()), file_change_dispatcher: file_change::Dispatcher::new(project_id, path), - stop: bounded(1), proxy: proxy_chan, + cancellation_token: CancellationToken::new(), } } pub fn stop(&self) -> Result<()> { - self.stop.0.send(())?; + self.cancellation_token.cancel(); Ok(()) } - pub fn start(&self, sender: Sender) -> Result<()> { - let (t_tx, t_rx) = unbounded(); + pub async fn start(&self, sender: crossbeam_channel::Sender) -> Result<()> { let tick_dispatcher = self.tick_dispatcher.clone(); + let s1 = sender.clone(); let project_id = self.project_id.clone(); tauri::async_runtime::spawn(async move { if let Err(e) = tick_dispatcher - .start(time::Duration::from_secs(10), t_tx) + .start(time::Duration::from_secs(10), s1) .await { log::error!("{}: failed to start ticker: {:#}", project_id, e); } }); - let (fw_tx, fw_rx) = unbounded(); let file_change_dispatcher = self.file_change_dispatcher.clone(); let project_id = self.project_id.clone(); + let s2 = sender.clone(); tauri::async_runtime::spawn(async move { - if let Err(e) = file_change_dispatcher.start(fw_tx).await { + if let Err(e) = file_change_dispatcher.start(s2).await { log::error!("{}: failed to start file watcher: {:#}", project_id, e); } }); - loop { - select! { - recv(t_rx) -> ts => match ts{ - Ok(ts) => { - if let Err(e) = sender.send(events::Event::Tick(ts)) { - log::error!("{}: failed to proxy tick event: {:#}", self.project_id, e); - } - } - Err(e) => { - log::error!("{}: failed to receive tick event: {:#}", self.project_id, e); - } - }, - recv(fw_rx) -> path => match path { - Ok(path) => { - if let Err(e) = sender.send(events::Event::FileChange(path)) { - log::error!("{}: failed to proxy path event: {:#}", self.project_id, e); - } - }, - Err(e) => { - log::error!("{}: failed to receive file change event: {:#}", self.project_id, e); - } - }, - recv(self.proxy) -> event => match event { - Ok(event) => { - if let Err(e) = sender.send(event) { - log::error!("{}: failed to proxy event: {:#}", self.project_id, e); - } - }, - Err(e) => { - log::error!("{}: failed to receive event: {:#}", self.project_id, e); - } - }, - recv(self.stop.1) -> _ => { - if let Err(e) = self.tick_dispatcher.stop() { - log::error!("{}: failed to stop ticker: {:#}", self.project_id, e); - } - if let Err(e) = self.file_change_dispatcher.stop() { - log::error!("{}: failed to stop file watcher: {:#}", self.project_id, e); - } - break; + let project_id = self.project_id.clone(); + let s3 = sender; + let proxy = self.proxy.clone(); + tauri::async_runtime::spawn(async move { + for event in proxy { + if let Err(e) = s3.send(event) { + log::error!("{}: failed to proxy event: {:#}", project_id, e); } } + }); + + self.cancellation_token.cancelled().await; + + if let Err(err) = self.tick_dispatcher.stop() { + log::error!("{}: failed to stop ticker: {:#}", self.project_id, err); } + if let Err(err) = self.file_change_dispatcher.stop() { + log::error!( + "{}: failed to stop file change dispatcher: {:#}", + self.project_id, + err + ); + } + + log::info!("{}: dispatcher stopped", self.project_id); + Ok(()) } } diff --git a/src-tauri/src/watcher/dispatchers/tick.rs b/src-tauri/src/watcher/dispatchers/tick.rs index 0d266e388..0b17b48f7 100644 --- a/src-tauri/src/watcher/dispatchers/tick.rs +++ b/src-tauri/src/watcher/dispatchers/tick.rs @@ -3,6 +3,8 @@ use std::time; use anyhow::Result; use tokio_util::sync::CancellationToken; +use crate::watcher::events; + #[derive(Debug, Clone)] pub struct Dispatcher { project_id: String, @@ -25,16 +27,18 @@ impl Dispatcher { pub async fn start( &self, interval: time::Duration, - rtx: crossbeam_channel::Sender, + rtx: crossbeam_channel::Sender, ) -> Result<()> { let mut ticker = tokio::time::interval(interval); + log::info!("{}: ticker started", self.project_id); + loop { ticker.tick().await; if self.cancellation_token.is_cancelled() { break; } - if let Err(e) = rtx.send(time::SystemTime::now()) { + if let Err(e) = rtx.send(events::Event::Tick(time::SystemTime::now())) { log::error!("{}: failed to send tick: {}", self.project_id, e); } } diff --git a/src-tauri/src/watcher/events.rs b/src-tauri/src/watcher/events.rs index b1a5b5a49..2b2f83417 100644 --- a/src-tauri/src/watcher/events.rs +++ b/src-tauri/src/watcher/events.rs @@ -1,7 +1,8 @@ -use std::{path, time}; +use std::{fmt::Display, path, time}; use crate::{bookmarks, deltas, sessions}; +#[derive(Debug)] pub enum Event { Tick(time::SystemTime), Flush(sessions::Session), @@ -21,3 +22,24 @@ pub enum Event { File((String, path::PathBuf, String)), Deltas((String, path::PathBuf, Vec)), } + +impl Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Event::Tick(_) => write!(f, "Tick"), + Event::Flush(_) => write!(f, "Flush"), + Event::SessionFlushed(_) => write!(f, "SessionFlushed"), + Event::Fetch => write!(f, "Fetch"), + Event::FileChange(_) => write!(f, "FileChange"), + Event::GitFileChange(_) => write!(f, "GitFileChange"), + Event::GitIndexChange => write!(f, "GitIndexChange"), + Event::GitActivity => write!(f, "GitActivity"), + Event::GitHeadChange(_) => write!(f, "GitHeadChange"), + Event::ProjectFileChange(_) => write!(f, "ProjectFileChange"), + Event::Session(_) => write!(f, "Session"), + Event::Bookmark(_) => write!(f, "Bookmark"), + Event::File(_) => write!(f, "File"), + Event::Deltas(_) => write!(f, "Deltas"), + } + } +} diff --git a/src-tauri/src/watcher/mod.rs b/src-tauri/src/watcher/mod.rs index 1fa4556aa..335dde936 100644 --- a/src-tauri/src/watcher/mod.rs +++ b/src-tauri/src/watcher/mod.rs @@ -58,8 +58,8 @@ impl<'watcher> Watcher<'watcher> { let dispatcher = self.dispatcher.clone(); let project_id = self.project_id.clone(); let etx = events_tx.clone(); - tauri::async_runtime::spawn_blocking(move || { - if let Err(e) = dispatcher.start(etx.clone()) { + tauri::async_runtime::spawn(async move { + if let Err(e) = dispatcher.start(etx.clone()).await { log::error!("{}: failed to start dispatcher: {:#}", project_id, e); } }); @@ -67,8 +67,8 @@ impl<'watcher> Watcher<'watcher> { loop { select! { recv(events_rx) -> event => match event { - Ok(events) => { - match self.handler.handle(events) { + Ok(event) => { + match self.handler.handle(event) { Ok(events) => { for event in events { if let Err(e) = events_tx.send(event) {