From 944d2a272ef41ea88e30743651663b5492bd0b64 Mon Sep 17 00:00:00 2001 From: Nikita Galaiko Date: Tue, 22 Aug 2023 12:41:29 +0200 Subject: [PATCH 1/2] run handlers on blocking threads --- src-tauri/src/watcher/handlers/mod.rs | 22 +--------------------- src-tauri/src/watcher/mod.rs | 18 ++++++++---------- 2 files changed, 9 insertions(+), 31 deletions(-) diff --git a/src-tauri/src/watcher/handlers/mod.rs b/src-tauri/src/watcher/handlers/mod.rs index 84aee447c..e7e9e8c55 100644 --- a/src-tauri/src/watcher/handlers/mod.rs +++ b/src-tauri/src/watcher/handlers/mod.rs @@ -8,7 +8,6 @@ mod project_file_change; use anyhow::{Context, Result}; use tauri::AppHandle; -use tokio::time::{timeout, Duration}; use tracing::instrument; use crate::events as app_events; @@ -46,26 +45,7 @@ impl TryFrom<&AppHandle> for Handler { impl<'handler> Handler { #[instrument(name = "handle", skip(self), fields(event = %event))] - pub async fn handle(&self, event: &events::Event) -> Result> { - self.handle_with_timeout(Duration::from_secs(30), event) - .await - } - - async fn handle_with_timeout( - &self, - duration: Duration, - event: &events::Event, - ) -> Result> { - match timeout(duration, self.handle_event(event)).await { - Ok(events) => events, - Err(_) => Err(anyhow::anyhow!( - "handler timed out after {} sec", - duration.as_secs() - ))?, - } - } - - async fn handle_event(&self, event: &events::Event) -> Result> { + pub fn handle(&self, event: &events::Event) -> Result> { match event { events::Event::ProjectFileChange(project_id, path) => self .project_file_handler diff --git a/src-tauri/src/watcher/mod.rs b/src-tauri/src/watcher/mod.rs index ac79ff821..e41c4f1d2 100644 --- a/src-tauri/src/watcher/mod.rs +++ b/src-tauri/src/watcher/mod.rs @@ -51,7 +51,7 @@ struct WatcherInner { dispatcher: dispatchers::Dispatcher, cancellation_token: CancellationToken, - proxy_tx: Arc>>>, + proxy_tx: Arc>>>, } impl TryFrom<&AppHandle> for WatcherInner { @@ -79,7 +79,6 @@ impl WatcherInner { tx.as_ref() .unwrap() .send(event) - .await .context("failed to send event")?; Ok(()) } else { @@ -88,7 +87,7 @@ impl WatcherInner { } pub async fn run>(&self, path: P, project_id: &str) -> Result<()> { - let (tx, mut rx) = channel(1); + let (tx, mut rx) = unbounded_channel(); self.proxy_tx.lock().await.replace(tx.clone()); spawn({ @@ -104,7 +103,7 @@ impl WatcherInner { let span = tracing::info_span!("proxying event from dispatcher", source = %event); let _guard = span.enter(); - if let Err(e) = tx.send(event.clone()).await { + if let Err(e) = tx.send(event.clone()) { tracing::error!("{}: failed to post event: {:#}", project_id, e); } drop(_guard); @@ -113,23 +112,22 @@ impl WatcherInner { }); tx.send(Event::IndexAll(project_id.to_string())) - .await .context("failed to send event")?; loop { tokio::select! { Some(event) = rx.recv() => { - spawn({ + task::Builder::new().name(&event.to_string()).spawn_blocking({ let project_id = project_id.to_string(); let handler = self.handler.clone(); let tx = tx.clone(); let event = event.clone(); - async move { - match handler.handle(&event).await { + move || { + match handler.handle(&event) { Err(error) => tracing::error!("{}: failed to handle event {}: {:#}", project_id, event, error), Ok(events) => { for e in events { - if let Err(e) = tx.send(e.clone()).await { + if let Err(e) = tx.send(e.clone()) { tracing::error!("{}: failed to post event {}: {:#}", project_id, event, e); } else { tracing::info!("{}: sent response event: {}", project_id, event); @@ -138,7 +136,7 @@ impl WatcherInner { } } } - }); + })?; }, _ = self.cancellation_token.cancelled() => { if let Err(error) = self.dispatcher.stop() { From 22f231334c10e2477da201d39535999468acf543 Mon Sep 17 00:00:00 2001 From: Nikita Galaiko Date: Tue, 22 Aug 2023 12:55:07 +0200 Subject: [PATCH 2/2] fix build --- src-tauri/src/app.rs | 8 -------- src-tauri/src/watcher/handlers/mod.rs | 2 +- src-tauri/src/watcher/mod.rs | 3 ++- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src-tauri/src/app.rs b/src-tauri/src/app.rs index 9c0284039..c90852a31 100644 --- a/src-tauri/src/app.rs +++ b/src-tauri/src/app.rs @@ -106,20 +106,12 @@ impl App { let project_id = project.id.clone(); let project_path = project.path.clone(); - // let handle = thread::spawn(move || { - // let rt = tokio::runtime::Builder::new_multi_thread() - // .thread_name(format!("watcher-{}", project_id)) - // .enable_time() - // .build() - // .unwrap(); spawn(async move { - // rt.block_on(async move { if let Err(e) = c_watcher.run(&project_path, &project_id).await { tracing::error!("watcher error: {:#}", e); } tracing::info!("watcher stopped"); }); - // }); self.watchers .lock() diff --git a/src-tauri/src/watcher/handlers/mod.rs b/src-tauri/src/watcher/handlers/mod.rs index e7e9e8c55..4cb0aa76f 100644 --- a/src-tauri/src/watcher/handlers/mod.rs +++ b/src-tauri/src/watcher/handlers/mod.rs @@ -43,7 +43,7 @@ impl TryFrom<&AppHandle> for Handler { } } -impl<'handler> Handler { +impl Handler { #[instrument(name = "handle", skip(self), fields(event = %event))] pub fn handle(&self, event: &events::Event) -> Result> { match event { diff --git a/src-tauri/src/watcher/mod.rs b/src-tauri/src/watcher/mod.rs index e41c4f1d2..ed175126a 100644 --- a/src-tauri/src/watcher/mod.rs +++ b/src-tauri/src/watcher/mod.rs @@ -11,9 +11,10 @@ use tauri::AppHandle; use tokio::{ spawn, sync::{ - mpsc::{channel, Sender}, + mpsc::{unbounded_channel, UnboundedSender}, Mutex, }, + task, }; use tokio_util::sync::CancellationToken;