Merge pull request #1072 from gitbutlerapp/blocking-handlers

run handlers on blocking threads
This commit is contained in:
Nikita Galaiko 2023-08-22 13:34:05 +02:00 committed by GitHub
commit 4d19ee1969
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 41 deletions

View File

@ -106,20 +106,12 @@ impl App {
let project_id = project.id.clone();
let project_path = project.path.clone();
// let handle = thread::spawn(move || {
// let rt = tokio::runtime::Builder::new_multi_thread()
// .thread_name(format!("watcher-{}", project_id))
// .enable_time()
// .build()
// .unwrap();
spawn(async move {
// rt.block_on(async move {
if let Err(e) = c_watcher.run(&project_path, &project_id).await {
tracing::error!("watcher error: {:#}", e);
}
tracing::info!("watcher stopped");
});
// });
self.watchers
.lock()

View File

@ -8,7 +8,6 @@ mod project_file_change;
use anyhow::{Context, Result};
use tauri::AppHandle;
use tokio::time::{timeout, Duration};
use tracing::instrument;
use crate::events as app_events;
@ -44,28 +43,9 @@ impl TryFrom<&AppHandle> for Handler {
}
}
impl<'handler> Handler {
impl Handler {
#[instrument(name = "handle", skip(self), fields(event = %event))]
pub async fn handle(&self, event: &events::Event) -> Result<Vec<events::Event>> {
self.handle_with_timeout(Duration::from_secs(30), event)
.await
}
async fn handle_with_timeout(
&self,
duration: Duration,
event: &events::Event,
) -> Result<Vec<events::Event>> {
match timeout(duration, self.handle_event(event)).await {
Ok(events) => events,
Err(_) => Err(anyhow::anyhow!(
"handler timed out after {} sec",
duration.as_secs()
))?,
}
}
async fn handle_event(&self, event: &events::Event) -> Result<Vec<events::Event>> {
pub fn handle(&self, event: &events::Event) -> Result<Vec<events::Event>> {
match event {
events::Event::ProjectFileChange(project_id, path) => self
.project_file_handler

View File

@ -11,9 +11,10 @@ use tauri::AppHandle;
use tokio::{
spawn,
sync::{
mpsc::{channel, Sender},
mpsc::{unbounded_channel, UnboundedSender},
Mutex,
},
task,
};
use tokio_util::sync::CancellationToken;
@ -51,7 +52,7 @@ struct WatcherInner {
dispatcher: dispatchers::Dispatcher,
cancellation_token: CancellationToken,
proxy_tx: Arc<Mutex<Option<Sender<Event>>>>,
proxy_tx: Arc<Mutex<Option<UnboundedSender<Event>>>>,
}
impl TryFrom<&AppHandle> for WatcherInner {
@ -79,7 +80,6 @@ impl WatcherInner {
tx.as_ref()
.unwrap()
.send(event)
.await
.context("failed to send event")?;
Ok(())
} else {
@ -88,7 +88,7 @@ impl WatcherInner {
}
pub async fn run<P: AsRef<path::Path>>(&self, path: P, project_id: &str) -> Result<()> {
let (tx, mut rx) = channel(1);
let (tx, mut rx) = unbounded_channel();
self.proxy_tx.lock().await.replace(tx.clone());
spawn({
@ -104,7 +104,7 @@ impl WatcherInner {
let span =
tracing::info_span!("proxying event from dispatcher", source = %event);
let _guard = span.enter();
if let Err(e) = tx.send(event.clone()).await {
if let Err(e) = tx.send(event.clone()) {
tracing::error!("{}: failed to post event: {:#}", project_id, e);
}
drop(_guard);
@ -113,23 +113,22 @@ impl WatcherInner {
});
tx.send(Event::IndexAll(project_id.to_string()))
.await
.context("failed to send event")?;
loop {
tokio::select! {
Some(event) = rx.recv() => {
spawn({
task::Builder::new().name(&event.to_string()).spawn_blocking({
let project_id = project_id.to_string();
let handler = self.handler.clone();
let tx = tx.clone();
let event = event.clone();
async move {
match handler.handle(&event).await {
move || {
match handler.handle(&event) {
Err(error) => tracing::error!("{}: failed to handle event {}: {:#}", project_id, event, error),
Ok(events) => {
for e in events {
if let Err(e) = tx.send(e.clone()).await {
if let Err(e) = tx.send(e.clone()) {
tracing::error!("{}: failed to post event {}: {:#}", project_id, event, e);
} else {
tracing::info!("{}: sent response event: {}", project_id, event);
@ -138,7 +137,7 @@ impl WatcherInner {
}
}
}
});
})?;
},
_ = self.cancellation_token.cancelled() => {
if let Err(error) = self.dispatcher.stop() {