make file_change and ticket async

This commit is contained in:
Nikita Galaiko 2023-05-31 11:28:11 +02:00
parent 61bdfef94d
commit 63a576d5a6
5 changed files with 49 additions and 29 deletions

1
src-tauri/Cargo.lock generated
View File

@ -1505,6 +1505,7 @@ dependencies = [
"timed",
"tokio",
"tokio-tungstenite 0.18.0",
"tokio-util",
"urlencoding",
"uuid",
"walkdir",

View File

@ -54,6 +54,7 @@ zip = "0.6.5"
rusqlite = { version = "0.28.0", features = [ "bundled", "blob" ] }
refinery = { version = "0.8", features = [ "rusqlite" ] }
sha1 = "0.10.5"
tokio-util = "0.7.8"
[features]
# by default Tauri runs in production mode

View File

@ -4,7 +4,11 @@ use std::{
};
use anyhow::{Context, Result};
use notify::{Config, RecommendedWatcher, Watcher};
use futures::{
channel::mpsc::{channel, Receiver},
SinkExt, StreamExt,
};
use notify::{Config, Event, RecommendedWatcher, Watcher};
#[derive(Debug, Clone)]
pub struct Dispatcher {
@ -34,9 +38,8 @@ impl Dispatcher {
Ok(())
}
pub fn start(&self, rtx: crossbeam_channel::Sender<PathBuf>) -> Result<()> {
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
pub async fn start(&self, rtx: crossbeam_channel::Sender<PathBuf>) -> Result<()> {
let (mut watcher, mut rx) = async_watcher()?;
watcher
.watch(
std::path::Path::new(&self.project_path),
@ -52,7 +55,7 @@ impl Dispatcher {
log::info!("{}: file watcher started", self.project_id);
for res in rx {
while let Some(res) = rx.next().await {
match res {
Ok(event) => {
if !is_interesting_event(&event.kind) {
@ -96,3 +99,20 @@ fn is_interesting_event(kind: &notify::EventKind) -> bool {
| notify::EventKind::Remove(notify::event::RemoveKind::File)
)
}
fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
let (mut tx, rx) = channel(1);
// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
tx.send(res).await.unwrap();
})
},
Config::default(),
)?;
Ok((watcher, rx))
}

View File

@ -44,8 +44,11 @@ impl Dispatcher {
let (t_tx, t_rx) = unbounded();
let tick_dispatcher = self.tick_dispatcher.clone();
let project_id = self.project_id.clone();
tauri::async_runtime::spawn_blocking(move || {
if let Err(e) = tick_dispatcher.start(time::Duration::from_secs(10), t_tx) {
tauri::async_runtime::spawn(async move {
if let Err(e) = tick_dispatcher
.start(time::Duration::from_secs(10), t_tx)
.await
{
log::error!("{}: failed to start ticker: {:#}", project_id, e);
}
});
@ -53,8 +56,8 @@ impl Dispatcher {
let (fw_tx, fw_rx) = unbounded();
let file_change_dispatcher = self.file_change_dispatcher.clone();
let project_id = self.project_id.clone();
tauri::async_runtime::spawn_blocking(move || {
if let Err(e) = file_change_dispatcher.start(fw_tx) {
tauri::async_runtime::spawn(async move {
if let Err(e) = file_change_dispatcher.start(fw_tx).await {
log::error!("{}: failed to start file watcher: {:#}", project_id, e);
}
});

View File

@ -1,46 +1,41 @@
use std::time;
use anyhow::Result;
use crossbeam_channel::{bounded, select, tick, Receiver, Sender};
use tokio_util::sync::CancellationToken;
#[derive(Debug, Clone)]
pub struct Dispatcher {
project_id: String,
stop: (Sender<()>, Receiver<()>),
cancellation_token: CancellationToken,
}
impl Dispatcher {
pub fn new(project_id: String) -> Self {
Self {
project_id,
stop: bounded(1),
cancellation_token: CancellationToken::new(),
}
}
pub fn stop(&self) -> Result<()> {
self.stop.0.send(())?;
self.cancellation_token.cancel();
Ok(())
}
pub fn start(&self, interval: time::Duration, rtx: Sender<time::SystemTime>) -> Result<()> {
let update = tick(interval);
log::info!("{}: ticker started", self.project_id);
pub async fn start(
&self,
interval: time::Duration,
rtx: crossbeam_channel::Sender<time::SystemTime>,
) -> Result<()> {
let mut ticker = tokio::time::interval(interval);
loop {
select! {
recv(update) -> ts => match ts {
Ok(_) => {
if let Err(e) = rtx.send(time::SystemTime::now()) {
log::error!("{}: failed to send tick event: {:#}", self.project_id, e);
}
},
Err(e) => log::error!("{}: failed to receive tick event: {:#}", self.project_id, e)
},
recv(self.stop.1) -> _ => {
break;
}
ticker.tick().await;
if self.cancellation_token.is_cancelled() {
break;
}
println!("{}: tick", self.project_id);
rtx.send(time::SystemTime::now())?;
}
log::info!("{}: ticker stopped", self.project_id);