simplify watcher api

This commit is contained in:
Nikita Galaiko 2023-07-20 09:22:04 +02:00
parent 37f16207d3
commit 9787c596c6
10 changed files with 228 additions and 212 deletions

View File

@ -1,8 +1,7 @@
use std::{collections::HashMap, ops, path, sync, time};
use anyhow::{Context, Result};
use tokio::sync::{mpsc, Semaphore};
use tokio_util::sync::CancellationToken;
use tokio::sync::Semaphore;
use crate::{
bookmarks, database, deltas, events, files, gb_repository,
@ -20,8 +19,7 @@ pub struct App {
searcher: search::Searcher,
events_sender: events::Sender,
stop_watchers: sync::Arc<sync::Mutex<HashMap<String, CancellationToken>>>,
proxy_watchers: sync::Arc<sync::Mutex<HashMap<String, mpsc::UnboundedSender<watcher::Event>>>>,
watchers: sync::Arc<sync::Mutex<HashMap<String, watcher::Watcher>>>,
sessions_database: sessions::Database,
files_database: files::Database,
@ -55,8 +53,7 @@ impl App {
projects_storage: projects::Storage::new(storage.clone()),
users_storage: users::Storage::new(storage),
searcher: deltas_searcher,
stop_watchers: sync::Arc::new(sync::Mutex::new(HashMap::new())),
proxy_watchers: sync::Arc::new(sync::Mutex::new(HashMap::new())),
watchers: sync::Arc::new(sync::Mutex::new(HashMap::new())),
sessions_database: sessions::Database::new(database.clone()),
deltas_database: deltas::Database::new(database.clone()),
files_database: files::Database::new(database.clone()),
@ -93,84 +90,51 @@ impl App {
if let Err(e) = self.init_project(&project) {
log::error!("failed to init project {}: {:#}", project.id, e);
}
if let Err(e) = self
.proxy_watchers
.lock()
.unwrap()
.get(&project.id)
.unwrap()
.send(watcher::Event::IndexAll)
{
log::error!("failed to send session event: {:#}", e);
}
}
Ok(())
}
fn start_watcher(&self, project: &projects::Project) -> Result<()> {
let project = project.clone();
let users_storage = self.users_storage.clone();
let projects_storage = self.projects_storage.clone();
let local_data_dir = self.local_data_dir.clone();
let deltas_searcher = self.searcher.clone();
let events_sender = self.events_sender.clone();
let sessions_database = self.sessions_database.clone();
let files_database = self.files_database.clone();
let deltas_database = self.deltas_database.clone();
let bookmarks_database = self.bookmarks_database.clone();
let cancellation_token = CancellationToken::new();
self.stop_watchers
.lock()
.unwrap()
.insert(project.id.clone(), cancellation_token.clone());
let (proxy_tx, proxy_rx) = mpsc::unbounded_channel();
self.proxy_watchers
.lock()
.unwrap()
.insert(project.id.clone(), proxy_tx);
let watcher = watcher::Watcher::new(
&self.local_data_dir,
project,
&self.projects_storage,
&self.users_storage,
&self.searcher,
&self.events_sender,
&self.sessions_database,
&self.deltas_database,
&self.files_database,
&self.bookmarks_database,
);
let c_watcher = watcher.clone();
tauri::async_runtime::spawn(async move {
let project = project;
let watcher = watcher::Watcher::new(
local_data_dir,
&project,
projects_storage,
users_storage,
deltas_searcher,
cancellation_token,
events_sender,
sessions_database,
deltas_database,
files_database,
bookmarks_database,
)
.expect("failed to create watcher");
watcher
.start(proxy_rx)
.await
.expect("failed to init watcher");
if let Err(e) = c_watcher.start().await {
log::error!("watcher error: {:#}", e);
}
});
self.watchers
.lock()
.unwrap()
.insert(project.id.clone(), watcher.clone());
Ok(())
}
fn send_event(&self, project_id: &str, event: watcher::Event) -> Result<()> {
self.proxy_watchers
.lock()
.unwrap()
.get(project_id)
.unwrap()
.send(event)
.context("failed to send event to proxy")
let watchers = self.watchers.lock().unwrap();
if let Some(watcher) = watchers.get(project_id) {
watcher.post(event).context("failed to post event")
} else {
Err(anyhow::anyhow!("watcher for project {} not found", project_id))
}
}
fn stop_watcher(&self, project_id: &str) -> Result<()> {
if let Some((_, token)) = self.stop_watchers.lock().unwrap().remove_entry(project_id) {
token.cancel();
if let Some((_, watcher)) = self.watchers.lock().unwrap().remove_entry(project_id) {
watcher.stop()?;
};
Ok(())
}
@ -481,14 +445,7 @@ impl App {
let writer = bookmarks::Writer::new(&gb_repository).context("failed to open writer")?;
writer.write(bookmark).context("failed to write bookmark")?;
if let Err(e) = self
.proxy_watchers
.lock()
.unwrap()
.get(&bookmark.project_id)
.unwrap()
.send(watcher::Event::Bookmark(bookmark.clone()))
{
if let Err(e) = self.send_event(&bookmark.project_id, watcher::Event::Bookmark(bookmark.clone())) {
log::error!("failed to send session event: {:#}", e);
}
Ok(())
@ -548,7 +505,6 @@ impl App {
&project_repository.get_head()?.peel_to_commit()?.id(),
&diff::Options {
context_lines,
..Default::default()
},
)
.context("failed to diff")?;

View File

@ -16,16 +16,16 @@ pub struct Handler {
impl Handler {
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_store: projects::Storage,
user_store: users::Storage,
local_data_dir: &path::Path,
project_id: &str,
project_store: &projects::Storage,
user_store: &users::Storage,
) -> Self {
Self {
project_id,
project_store,
local_data_dir,
user_store,
project_id: project_id.to_string(),
project_store: project_store.clone(),
local_data_dir: local_data_dir.to_path_buf(),
user_store: user_store.clone(),
}
}

View File

@ -16,16 +16,16 @@ pub struct Handler {
impl Handler {
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_storage: projects::Storage,
user_storage: users::Storage,
local_data_dir: &path::Path,
project_id: &str,
project_storage: &projects::Storage,
user_storage: &users::Storage,
) -> Self {
Self {
project_id,
project_storage,
user_storage,
local_data_dir,
project_id: project_id.to_string(),
project_storage: project_storage.clone(),
user_storage: user_storage.clone(),
local_data_dir: local_data_dir.to_path_buf(),
}
}

View File

@ -13,10 +13,10 @@ pub struct Handler {
}
impl Handler {
pub fn new(project_id: String, project_storage: projects::Storage) -> Self {
pub fn new(project_id: &str, project_storage: &projects::Storage) -> Self {
Self {
project_id,
project_storage,
project_id: project_id.to_string(),
project_storage: project_storage.clone(),
}
}

View File

@ -16,16 +16,16 @@ pub struct Handler {
impl Handler {
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_store: projects::Storage,
user_store: users::Storage,
local_data_dir: &path::Path,
project_id: &str,
project_store: &projects::Storage,
user_store: &users::Storage,
) -> Self {
Self {
project_id,
project_store,
local_data_dir,
user_store,
project_id: project_id.to_string(),
project_store: project_store.clone(),
local_data_dir: local_data_dir.to_path_buf(),
user_store: user_store.clone(),
}
}

View File

@ -11,10 +11,10 @@ pub struct Handler {
}
impl Handler {
pub fn new(project_id: String, project_store: projects::Storage) -> Self {
pub fn new(project_id: &str, project_store: &projects::Storage) -> Self {
Self {
project_id,
project_store,
project_id: project_id.to_string(),
project_store: project_store.clone(),
}
}

View File

@ -25,26 +25,26 @@ pub struct Handler {
impl Handler {
#[allow(clippy::too_many_arguments)]
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_store: projects::Storage,
user_store: users::Storage,
deltas_searcher: search::Searcher,
files_database: files::Database,
sessions_database: sessions::Database,
deltas_database: deltas::Database,
bookmarks_database: bookmarks::Database,
local_data_dir: &path::Path,
project_id: &str,
project_store: &projects::Storage,
user_store: &users::Storage,
deltas_searcher: &search::Searcher,
files_database: &files::Database,
sessions_database: &sessions::Database,
deltas_database: &deltas::Database,
bookmarks_database: &bookmarks::Database,
) -> Self {
Self {
local_data_dir,
project_id,
project_store,
user_store,
deltas_searcher,
files_database,
sessions_database,
deltas_database,
bookmarks_database,
local_data_dir: local_data_dir.to_path_buf(),
project_id: project_id.to_string(),
project_store: project_store.clone(),
user_store: user_store.clone(),
deltas_searcher: deltas_searcher.clone(),
files_database: files_database.clone(),
sessions_database: sessions_database.clone(),
deltas_database: deltas_database.clone(),
bookmarks_database: bookmarks_database.clone(),
}
}

View File

@ -12,7 +12,7 @@ mod check_current_session_tests;
#[cfg(test)]
mod project_file_change_tests;
use std::path::PathBuf;
use std::path;
use anyhow::{Context, Result};
@ -39,53 +39,53 @@ pub struct Handler {
impl<'handler> Handler {
#[allow(clippy::too_many_arguments)]
pub fn new(
local_data_dir: PathBuf,
project_id: String,
project_store: projects::Storage,
user_store: users::Storage,
searcher: search::Searcher,
events_sender: app_events::Sender,
sessions_database: sessions::Database,
deltas_database: deltas::Database,
files_database: files::Database,
bookmarks_database: bookmarks::Database,
local_data_dir: &path::Path,
project_id: &str,
project_store: &projects::Storage,
user_store: &users::Storage,
searcher: &search::Searcher,
events_sender: &app_events::Sender,
sessions_database: &sessions::Database,
deltas_database: &deltas::Database,
files_database: &files::Database,
bookmarks_database: &bookmarks::Database,
) -> Self {
Self {
project_id: project_id.clone(),
events_sender,
project_id: project_id.to_string(),
events_sender: events_sender.clone(),
file_change_handler: file_change::Handler::new(),
project_file_handler: project_file_change::Handler::new(
local_data_dir.clone(),
project_id.clone(),
project_store.clone(),
user_store.clone(),
local_data_dir,
project_id,
project_store,
user_store,
),
check_current_session_handler: check_current_session::Handler::new(
local_data_dir.clone(),
project_id.clone(),
project_store.clone(),
user_store.clone(),
local_data_dir,
project_id,
project_store,
user_store,
),
git_file_change_handler: git_file_change::Handler::new(
project_id.clone(),
project_store.clone(),
project_id,
project_store,
),
flush_session_handler: flush_session::Handler::new(
local_data_dir.clone(),
project_id.clone(),
project_store.clone(),
user_store.clone(),
local_data_dir,
project_id,
project_store,
user_store,
),
fetch_project_handler: fetch_project_data::Handler::new(
project_id.clone(),
project_store.clone(),
project_id,
project_store,
),
fetch_gitbutler_handler: fetch_gitbutler_data::Handler::new(
local_data_dir.clone(),
project_id.clone(),
project_store.clone(),
user_store.clone(),
local_data_dir,
project_id,
project_store,
user_store,
),
index_handler: index_handler::Handler::new(
local_data_dir,

View File

@ -20,16 +20,16 @@ pub struct Handler {
impl Handler {
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_store: projects::Storage,
user_store: users::Storage,
local_data_dir: &path::Path,
project_id: &str,
project_store: &projects::Storage,
user_store: &users::Storage,
) -> Self {
Self {
project_id,
project_store,
local_data_dir,
user_store,
project_id: project_id.to_string(),
project_store: project_store.clone(),
local_data_dir: local_data_dir.to_path_buf(),
user_store: user_store.clone(),
}
}

View File

@ -2,44 +2,92 @@ mod dispatchers;
mod events;
mod handlers;
use std::path;
use std::{path, sync::{Arc, Mutex}};
pub use events::Event;
use anyhow::Result;
use anyhow::{Result, Context};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::{bookmarks, deltas, files, projects, search, sessions, users};
#[derive(Clone)]
pub struct Watcher {
inner: Arc<InnerWatcher>
}
impl Watcher {
#[allow(clippy::too_many_arguments)]
pub fn new(
local_data_dir: &path::Path,
project: &projects::Project,
project_store: &projects::Storage,
user_store: &users::Storage,
deltas_searcher: &search::Searcher,
events_sender: &crate::events::Sender,
sessions_database: &sessions::Database,
deltas_database: &deltas::Database,
files_database: &files::Database,
bookmarks_database: &bookmarks::Database,
) -> Self {
Self {inner: Arc::new(InnerWatcher::new(
local_data_dir,
project,
project_store,
user_store,
deltas_searcher,
events_sender,
sessions_database,
deltas_database,
files_database,
bookmarks_database,
))}
}
pub fn stop(&self) -> Result<()> {
self.inner.stop()
}
pub fn post(&self, event: Event) -> Result<()> {
self.inner.post(event)
}
pub async fn start(&self) -> Result<()> {
self.inner.start().await
}
}
struct InnerWatcher {
project_id: String,
dispatcher: dispatchers::Dispatcher,
handler: handlers::Handler,
cancellation_token: CancellationToken,
proxy_tx: Arc<Mutex<Option<mpsc::UnboundedSender<Event>>>>,
}
impl<'watcher> Watcher {
impl<'watcher> InnerWatcher {
#[allow(clippy::too_many_arguments)]
pub fn new(
local_data_dir: path::PathBuf,
local_data_dir: &path::Path,
project: &projects::Project,
project_store: projects::Storage,
user_store: users::Storage,
deltas_searcher: search::Searcher,
cancellation_token: CancellationToken,
events_sender: crate::events::Sender,
sessions_database: sessions::Database,
deltas_database: deltas::Database,
files_database: files::Database,
bookmarks_database: bookmarks::Database,
) -> Result<Self> {
Ok(Self {
project_store: &projects::Storage,
user_store: &users::Storage,
deltas_searcher: &search::Searcher,
events_sender: &crate::events::Sender,
sessions_database: &sessions::Database,
deltas_database: &deltas::Database,
files_database: &files::Database,
bookmarks_database: &bookmarks::Database,
) -> Self {
Self {
project_id: project.id.clone(),
dispatcher: dispatchers::Dispatcher::new(project.id.clone(), project.path.clone()),
handler: handlers::Handler::new(
local_data_dir,
project.id.clone(),
&project.id,
project_store,
user_store,
deltas_searcher,
@ -49,53 +97,65 @@ impl<'watcher> Watcher {
files_database,
bookmarks_database,
),
cancellation_token,
})
cancellation_token: CancellationToken::new(),
proxy_tx: Arc::new(Mutex::new(None)),
}
}
pub async fn start(&self, mut proxy: mpsc::UnboundedReceiver<events::Event>) -> Result<()> {
let (events_tx, mut events_rx) = mpsc::unbounded_channel();
pub fn stop(&self) -> Result<()> {
self.cancellation_token.cancel();
Ok(())
}
pub fn post(&self, event: Event) -> Result<()> {
let tx = self.proxy_tx.lock().unwrap();
if tx.is_some() {
tx.as_ref().unwrap().send(event).context("failed to send event")?;
Ok(())
} else {
Err(anyhow::anyhow!("watcher is not started"))
}
}
pub async fn start(&self) -> Result<()> {
let dispatcher = self.dispatcher.clone();
let project_id = self.project_id.clone();
let etx = events_tx.clone();
tauri::async_runtime::spawn(async move {
if let Err(e) = dispatcher.start(etx.clone()).await {
let (tx, mut rx) = mpsc::unbounded_channel();
self.proxy_tx.lock().unwrap().replace(tx.clone());
let c_tx = tx.clone();
let dispatcher_handle = tauri::async_runtime::spawn(async move {
if let Err(e) = dispatcher.start(c_tx).await {
log::error!("{}: failed to start dispatcher: {:#}", project_id, e);
}
});
tx.send(Event::IndexAll).context("failed to send event")?;
loop {
tokio::select! {
Some(event) = proxy.recv() => {
if let Err(e) = events_tx.send(event) {
log::error!("{}: failed to post event: {:#}", self.project_id, e);
}
},
Some(event) = events_rx.recv() => {
Some(event) = rx.recv() => {
let project_id = self.project_id.clone();
let handler = self.handler.clone();
let events_tx = events_tx.clone();
tauri::async_runtime::spawn(async move {
match handler.handle(event).await {
Ok(events) => {
for event in events {
if let Err(e) = events_tx.send(event) {
log::error!("{}: failed to post event: {:#}", project_id, e);
}
match handler.handle(event).await {
Ok(events) => {
for event in events {
if let Err(e) = tx.send(event) {
log::error!("{}: failed to post event: {:#}", project_id, e);
}
},
Err(err) => log::error!("{}: failed to handle event: {:#}", project_id, err),
}
});
}
},
Err(err) => log::error!("{}: failed to handle event: {:#}", project_id, err),
}
},
_ = self.cancellation_token.cancelled() => {
if let Err(e) = self.dispatcher.stop() {
log::error!("{}: failed to stop dispatcher: {:#}", self.project_id, e);
}
break;
self.dispatcher.stop()?;
dispatcher_handle.await?;
return Ok(())
}
}
}
Ok(())
}
}