simplify dispatcher

This commit is contained in:
Nikita Galaiko 2023-06-01 14:41:52 +02:00
parent 705e0b4ffb
commit 6efd814c6e
5 changed files with 82 additions and 84 deletions

View File

@ -10,6 +10,8 @@ use futures::{
};
use notify::{Config, Event, RecommendedWatcher, Watcher};
use crate::watcher::events;
#[derive(Debug, Clone)]
pub struct Dispatcher {
watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
@ -38,7 +40,7 @@ impl Dispatcher {
Ok(())
}
pub async fn start(&self, rtx: crossbeam_channel::Sender<PathBuf>) -> Result<()> {
pub async fn start(&self, rtx: crossbeam_channel::Sender<events::Event>) -> Result<()> {
let (mut watcher, mut rx) = async_watcher()?;
watcher
.watch(
@ -62,34 +64,23 @@ impl Dispatcher {
continue;
}
for file_path in event.paths {
if let Err(e) = file_path
.strip_prefix(&self.project_path)
.with_context(|| {
format!(
"failed to striprefix from file path: {}",
file_path.display()
)
})
.map(|relative_file_path| {
log::info!(
"{}: file changed: {}",
self.project_id,
relative_file_path.display()
);
if let Err(e) = rtx.send(relative_file_path.to_path_buf()) {
match file_path.strip_prefix(&self.project_path) {
Ok(relative_file_path) => {
if let Err(e) = rtx.send(events::Event::FileChange(
relative_file_path.to_path_buf(),
)) {
log::error!(
"{}: failed to send file change event: {:#}",
self.project_id,
e
);
}
})
{
log::error!(
"{}: failed to send file change event: {:#}",
}
Err(err) => log::error!(
"{}: failed to strip prefix: {:#}",
self.project_id,
e
);
err
),
}
}
}
@ -116,12 +107,13 @@ fn is_interesting_event(kind: &notify::EventKind) -> bool {
fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
let (mut tx, rx) = channel(1);
// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
tx.send(res).await.unwrap();
if let Err(err) = tx.send(res).await {
log::error!("failed to send file change event: {:#}", err);
}
println!("sent");
})
},
Config::default(),

View File

@ -4,7 +4,7 @@ mod tick;
use std::{path, time};
use anyhow::Result;
use crossbeam_channel::{bounded, select, unbounded, Sender};
use tokio_util::sync::CancellationToken;
use super::events;
@ -14,10 +14,7 @@ pub struct Dispatcher {
tick_dispatcher: tick::Dispatcher,
file_change_dispatcher: file_change::Dispatcher,
proxy: crossbeam_channel::Receiver<events::Event>,
stop: (
crossbeam_channel::Sender<()>,
crossbeam_channel::Receiver<()>,
),
cancellation_token: CancellationToken,
}
impl Dispatcher {
@ -30,82 +27,65 @@ impl Dispatcher {
project_id: project_id.clone(),
tick_dispatcher: tick::Dispatcher::new(project_id.clone()),
file_change_dispatcher: file_change::Dispatcher::new(project_id, path),
stop: bounded(1),
proxy: proxy_chan,
cancellation_token: CancellationToken::new(),
}
}
pub fn stop(&self) -> Result<()> {
self.stop.0.send(())?;
self.cancellation_token.cancel();
Ok(())
}
pub fn start(&self, sender: Sender<events::Event>) -> Result<()> {
let (t_tx, t_rx) = unbounded();
pub async fn start(&self, sender: crossbeam_channel::Sender<events::Event>) -> Result<()> {
let tick_dispatcher = self.tick_dispatcher.clone();
let s1 = sender.clone();
let project_id = self.project_id.clone();
tauri::async_runtime::spawn(async move {
if let Err(e) = tick_dispatcher
.start(time::Duration::from_secs(10), t_tx)
.start(time::Duration::from_secs(10), s1)
.await
{
log::error!("{}: failed to start ticker: {:#}", project_id, e);
}
});
let (fw_tx, fw_rx) = unbounded();
let file_change_dispatcher = self.file_change_dispatcher.clone();
let project_id = self.project_id.clone();
let s2 = sender.clone();
tauri::async_runtime::spawn(async move {
if let Err(e) = file_change_dispatcher.start(fw_tx).await {
if let Err(e) = file_change_dispatcher.start(s2).await {
log::error!("{}: failed to start file watcher: {:#}", project_id, e);
}
});
loop {
select! {
recv(t_rx) -> ts => match ts{
Ok(ts) => {
if let Err(e) = sender.send(events::Event::Tick(ts)) {
log::error!("{}: failed to proxy tick event: {:#}", self.project_id, e);
let project_id = self.project_id.clone();
let s3 = sender;
let proxy = self.proxy.clone();
tauri::async_runtime::spawn(async move {
for event in proxy {
if let Err(e) = s3.send(event) {
log::error!("{}: failed to proxy event: {:#}", project_id, e);
}
}
Err(e) => {
log::error!("{}: failed to receive tick event: {:#}", self.project_id, e);
}
},
recv(fw_rx) -> path => match path {
Ok(path) => {
if let Err(e) = sender.send(events::Event::FileChange(path)) {
log::error!("{}: failed to proxy path event: {:#}", self.project_id, e);
}
},
Err(e) => {
log::error!("{}: failed to receive file change event: {:#}", self.project_id, e);
}
},
recv(self.proxy) -> event => match event {
Ok(event) => {
if let Err(e) = sender.send(event) {
log::error!("{}: failed to proxy event: {:#}", self.project_id, e);
}
},
Err(e) => {
log::error!("{}: failed to receive event: {:#}", self.project_id, e);
}
},
recv(self.stop.1) -> _ => {
if let Err(e) = self.tick_dispatcher.stop() {
log::error!("{}: failed to stop ticker: {:#}", self.project_id, e);
}
if let Err(e) = self.file_change_dispatcher.stop() {
log::error!("{}: failed to stop file watcher: {:#}", self.project_id, e);
}
break;
}
});
self.cancellation_token.cancelled().await;
if let Err(err) = self.tick_dispatcher.stop() {
log::error!("{}: failed to stop ticker: {:#}", self.project_id, err);
}
if let Err(err) = self.file_change_dispatcher.stop() {
log::error!(
"{}: failed to stop file change dispatcher: {:#}",
self.project_id,
err
);
}
log::info!("{}: dispatcher stopped", self.project_id);
Ok(())
}
}

View File

@ -3,6 +3,8 @@ use std::time;
use anyhow::Result;
use tokio_util::sync::CancellationToken;
use crate::watcher::events;
#[derive(Debug, Clone)]
pub struct Dispatcher {
project_id: String,
@ -25,16 +27,18 @@ impl Dispatcher {
pub async fn start(
&self,
interval: time::Duration,
rtx: crossbeam_channel::Sender<time::SystemTime>,
rtx: crossbeam_channel::Sender<events::Event>,
) -> Result<()> {
let mut ticker = tokio::time::interval(interval);
log::info!("{}: ticker started", self.project_id);
loop {
ticker.tick().await;
if self.cancellation_token.is_cancelled() {
break;
}
if let Err(e) = rtx.send(time::SystemTime::now()) {
if let Err(e) = rtx.send(events::Event::Tick(time::SystemTime::now())) {
log::error!("{}: failed to send tick: {}", self.project_id, e);
}
}

View File

@ -1,7 +1,8 @@
use std::{path, time};
use std::{fmt::Display, path, time};
use crate::{bookmarks, deltas, sessions};
#[derive(Debug)]
pub enum Event {
Tick(time::SystemTime),
Flush(sessions::Session),
@ -21,3 +22,24 @@ pub enum Event {
File((String, path::PathBuf, String)),
Deltas((String, path::PathBuf, Vec<deltas::Delta>)),
}
impl Display for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Event::Tick(_) => write!(f, "Tick"),
Event::Flush(_) => write!(f, "Flush"),
Event::SessionFlushed(_) => write!(f, "SessionFlushed"),
Event::Fetch => write!(f, "Fetch"),
Event::FileChange(_) => write!(f, "FileChange"),
Event::GitFileChange(_) => write!(f, "GitFileChange"),
Event::GitIndexChange => write!(f, "GitIndexChange"),
Event::GitActivity => write!(f, "GitActivity"),
Event::GitHeadChange(_) => write!(f, "GitHeadChange"),
Event::ProjectFileChange(_) => write!(f, "ProjectFileChange"),
Event::Session(_) => write!(f, "Session"),
Event::Bookmark(_) => write!(f, "Bookmark"),
Event::File(_) => write!(f, "File"),
Event::Deltas(_) => write!(f, "Deltas"),
}
}
}

View File

@ -58,8 +58,8 @@ impl<'watcher> Watcher<'watcher> {
let dispatcher = self.dispatcher.clone();
let project_id = self.project_id.clone();
let etx = events_tx.clone();
tauri::async_runtime::spawn_blocking(move || {
if let Err(e) = dispatcher.start(etx.clone()) {
tauri::async_runtime::spawn(async move {
if let Err(e) = dispatcher.start(etx.clone()).await {
log::error!("{}: failed to start dispatcher: {:#}", project_id, e);
}
});
@ -67,8 +67,8 @@ impl<'watcher> Watcher<'watcher> {
loop {
select! {
recv(events_rx) -> event => match event {
Ok(events) => {
match self.handler.handle(events) {
Ok(event) => {
match self.handler.handle(event) {
Ok(events) => {
for event in events {
if let Err(e) = events_tx.send(event) {