diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 80e591c13..95111c05c 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -1505,6 +1505,7 @@ dependencies = [ "timed", "tokio", "tokio-tungstenite 0.18.0", + "tokio-util", "urlencoding", "uuid", "walkdir", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 5c2127967..a8675ca23 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -54,6 +54,7 @@ zip = "0.6.5" rusqlite = { version = "0.28.0", features = [ "bundled", "blob" ] } refinery = { version = "0.8", features = [ "rusqlite" ] } sha1 = "0.10.5" +tokio-util = "0.7.8" [features] # by default Tauri runs in production mode diff --git a/src-tauri/src/watcher/dispatchers/file_change.rs b/src-tauri/src/watcher/dispatchers/file_change.rs index 444ab23a9..6d4629088 100644 --- a/src-tauri/src/watcher/dispatchers/file_change.rs +++ b/src-tauri/src/watcher/dispatchers/file_change.rs @@ -4,7 +4,11 @@ use std::{ }; use anyhow::{Context, Result}; -use notify::{Config, RecommendedWatcher, Watcher}; +use futures::{ + channel::mpsc::{channel, Receiver}, + SinkExt, StreamExt, +}; +use notify::{Config, Event, RecommendedWatcher, Watcher}; #[derive(Debug, Clone)] pub struct Dispatcher { @@ -34,9 +38,8 @@ impl Dispatcher { Ok(()) } - pub fn start(&self, rtx: crossbeam_channel::Sender) -> Result<()> { - let (tx, rx) = std::sync::mpsc::channel(); - let mut watcher = RecommendedWatcher::new(tx, Config::default())?; + pub async fn start(&self, rtx: crossbeam_channel::Sender) -> Result<()> { + let (mut watcher, mut rx) = async_watcher()?; watcher .watch( std::path::Path::new(&self.project_path), @@ -52,7 +55,7 @@ impl Dispatcher { log::info!("{}: file watcher started", self.project_id); - for res in rx { + while let Some(res) = rx.next().await { match res { Ok(event) => { if !is_interesting_event(&event.kind) { @@ -96,3 +99,20 @@ fn is_interesting_event(kind: ¬ify::EventKind) -> bool { | notify::EventKind::Remove(notify::event::RemoveKind::File) ) } + +fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver>)> { + let (mut tx, rx) = channel(1); + + // Automatically select the best implementation for your platform. + // You can also access each implementation directly e.g. INotifyWatcher. + let watcher = RecommendedWatcher::new( + move |res| { + futures::executor::block_on(async { + tx.send(res).await.unwrap(); + }) + }, + Config::default(), + )?; + + Ok((watcher, rx)) +} diff --git a/src-tauri/src/watcher/dispatchers/mod.rs b/src-tauri/src/watcher/dispatchers/mod.rs index b03769aba..63dccf702 100644 --- a/src-tauri/src/watcher/dispatchers/mod.rs +++ b/src-tauri/src/watcher/dispatchers/mod.rs @@ -44,8 +44,11 @@ impl Dispatcher { let (t_tx, t_rx) = unbounded(); let tick_dispatcher = self.tick_dispatcher.clone(); let project_id = self.project_id.clone(); - tauri::async_runtime::spawn_blocking(move || { - if let Err(e) = tick_dispatcher.start(time::Duration::from_secs(10), t_tx) { + tauri::async_runtime::spawn(async move { + if let Err(e) = tick_dispatcher + .start(time::Duration::from_secs(10), t_tx) + .await + { log::error!("{}: failed to start ticker: {:#}", project_id, e); } }); @@ -53,8 +56,8 @@ impl Dispatcher { let (fw_tx, fw_rx) = unbounded(); let file_change_dispatcher = self.file_change_dispatcher.clone(); let project_id = self.project_id.clone(); - tauri::async_runtime::spawn_blocking(move || { - if let Err(e) = file_change_dispatcher.start(fw_tx) { + tauri::async_runtime::spawn(async move { + if let Err(e) = file_change_dispatcher.start(fw_tx).await { log::error!("{}: failed to start file watcher: {:#}", project_id, e); } }); diff --git a/src-tauri/src/watcher/dispatchers/tick.rs b/src-tauri/src/watcher/dispatchers/tick.rs index e162cd769..b03e12f74 100644 --- a/src-tauri/src/watcher/dispatchers/tick.rs +++ b/src-tauri/src/watcher/dispatchers/tick.rs @@ -1,46 +1,41 @@ use std::time; use anyhow::Result; -use crossbeam_channel::{bounded, select, tick, Receiver, Sender}; +use tokio_util::sync::CancellationToken; #[derive(Debug, Clone)] pub struct Dispatcher { project_id: String, - stop: (Sender<()>, Receiver<()>), + cancellation_token: CancellationToken, } impl Dispatcher { pub fn new(project_id: String) -> Self { Self { project_id, - stop: bounded(1), + cancellation_token: CancellationToken::new(), } } pub fn stop(&self) -> Result<()> { - self.stop.0.send(())?; + self.cancellation_token.cancel(); Ok(()) } - pub fn start(&self, interval: time::Duration, rtx: Sender) -> Result<()> { - let update = tick(interval); - - log::info!("{}: ticker started", self.project_id); + pub async fn start( + &self, + interval: time::Duration, + rtx: crossbeam_channel::Sender, + ) -> Result<()> { + let mut ticker = tokio::time::interval(interval); loop { - select! { - recv(update) -> ts => match ts { - Ok(_) => { - if let Err(e) = rtx.send(time::SystemTime::now()) { - log::error!("{}: failed to send tick event: {:#}", self.project_id, e); - } - }, - Err(e) => log::error!("{}: failed to receive tick event: {:#}", self.project_id, e) - }, - recv(self.stop.1) -> _ => { - break; - } + ticker.tick().await; + if self.cancellation_token.is_cancelled() { + break; } + println!("{}: tick", self.project_id); + rtx.send(time::SystemTime::now())?; } log::info!("{}: ticker stopped", self.project_id);