watcher handler run async

This commit is contained in:
Nikita Galaiko 2023-06-07 13:43:09 +02:00
parent a53c0ebd33
commit c186dea4af
12 changed files with 278 additions and 127 deletions

View File

@ -131,21 +131,13 @@ impl App {
.unwrap()
.insert(project.id.clone(), proxy_tx);
tauri::async_runtime::spawn_blocking(|| {
tauri::async_runtime::spawn(async move {
let project = project;
let gb_repository = gb_repository::Repository::open(
local_data_dir,
project.id.clone(),
projects_storage.clone(),
users_storage,
)
.expect("failed to open git repository");
let watcher = watcher::Watcher::new(
local_data_dir,
&project,
projects_storage,
&gb_repository,
users_storage,
deltas_searcher,
cancellation_token,
events_sender,
@ -156,7 +148,10 @@ impl App {
)
.expect("failed to create watcher");
futures::executor::block_on(watcher.start(proxy_rx)).expect("failed to init watcher");
watcher
.start(proxy_rx)
.await
.expect("failed to init watcher");
});
Ok(())

View File

@ -1,27 +1,43 @@
use std::{sync, time};
use std::{path, time};
use anyhow::{Context, Result};
use crate::{gb_repository, sessions};
use crate::{gb_repository, projects, sessions, users};
use super::events;
pub struct Handler<'handler> {
gb_repository: sync::Arc<sync::Mutex<&'handler gb_repository::Repository>>,
#[derive(Clone)]
pub struct Handler {
project_id: String,
project_store: projects::Storage,
local_data_dir: path::PathBuf,
user_store: users::Storage,
}
impl<'handler> Handler<'handler> {
pub fn new(gb_repository: &'handler gb_repository::Repository) -> Self {
impl<'handler> Handler {
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_store: projects::Storage,
user_store: users::Storage,
) -> Self {
Self {
gb_repository: sync::Arc::new(sync::Mutex::new(gb_repository)),
project_id,
project_store,
local_data_dir,
user_store,
}
}
pub fn handle(&self, now: time::SystemTime) -> Result<Vec<events::Event>> {
match self
.gb_repository
.lock()
.unwrap()
let gb_repo = gb_repository::Repository::open(
&self.local_data_dir,
self.project_id.clone(),
self.project_store.clone(),
self.user_store.clone(),
)
.context("failed to open repository")?;
match gb_repo
.get_current_session()
.context("failed to get current session")?
{

View File

@ -6,6 +6,7 @@ use crate::projects;
use super::events;
#[derive(Clone)]
pub struct Handler {
project_id: String,
project_storage: projects::Storage,

View File

@ -1,37 +1,48 @@
use std::time;
use std::{path, time};
use anyhow::{Context, Result};
use crate::{gb_repository, projects};
use crate::{gb_repository, projects, users};
use super::events;
pub struct Handler<'handler> {
#[derive(Clone)]
pub struct Handler {
project_id: String,
project_storage: projects::Storage,
gb_repository: &'handler gb_repository::Repository,
local_data_dir: path::PathBuf,
user_storage: users::Storage,
}
impl<'handler> Handler<'handler> {
impl Handler {
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_storage: projects::Storage,
gb_repository: &'handler gb_repository::Repository,
user_storage: users::Storage,
) -> Self {
Self {
project_id,
project_storage,
gb_repository,
user_storage,
local_data_dir,
}
}
pub fn handle(&self) -> Result<Vec<events::Event>> {
let sessions_before_fetch = self
.gb_repository
let gb_rep = gb_repository::Repository::open(
self.local_data_dir.clone(),
self.project_id.clone(),
self.project_storage.clone(),
self.user_storage.clone(),
)
.context("failed to open repository")?;
let sessions_before_fetch = gb_rep
.get_sessions_iterator()?
.filter_map(|s| s.ok())
.collect::<Vec<_>>();
if !self.gb_repository.fetch().context("failed to fetch")? {
if !gb_rep.fetch().context("failed to fetch")? {
return Ok(vec![]);
}
@ -48,8 +59,7 @@ impl<'handler> Handler<'handler> {
})
.context("failed to update project")?;
let sessions_after_fetch = self
.gb_repository
let sessions_after_fetch = gb_rep
.get_sessions_iterator()?
.filter_map(|s| s.ok())
.collect::<Vec<_>>();

View File

@ -2,6 +2,7 @@ use anyhow::Result;
use crate::watcher::events;
#[derive(Clone)]
pub struct Handler {}
impl Handler {

View File

@ -1,25 +1,31 @@
use std::path;
use anyhow::{anyhow, Context, Result};
use crate::{gb_repository, project_repository, projects, sessions};
use crate::{gb_repository, project_repository, projects, sessions, users};
use super::events;
pub struct Handler<'handler> {
#[derive(Clone)]
pub struct Handler {
project_id: String,
project_store: projects::Storage,
gb_repository: &'handler gb_repository::Repository,
local_data_dir: path::PathBuf,
user_store: users::Storage,
}
impl<'listener> Handler<'listener> {
impl<'listener> Handler {
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_store: projects::Storage,
gb_repository: &'listener gb_repository::Repository,
user_store: users::Storage,
) -> Self {
Self {
project_id,
gb_repository,
project_store,
local_data_dir,
user_store,
}
}
@ -30,8 +36,15 @@ impl<'listener> Handler<'listener> {
.context("failed to get project")?
.ok_or_else(|| anyhow!("project not found"))?;
let session = self
.gb_repository
let gb_repo = gb_repository::Repository::open(
&self.local_data_dir,
self.project_id.clone(),
self.project_store.clone(),
self.user_store.clone(),
)
.context("failed to open repository")?;
let session = gb_repo
.flush_session(&project_repository::Repository::open(&project)?, session)
.context("failed to flush session")?;

View File

@ -4,6 +4,7 @@ use crate::{project_repository, projects};
use super::events;
#[derive(Clone)]
pub struct Handler {
project_id: String,
project_store: projects::Storage,

View File

@ -1,13 +1,21 @@
use std::path;
use anyhow::{Context, Result};
use crate::{bookmarks, deltas, events as app_events, files, gb_repository, search, sessions};
use crate::{
bookmarks, deltas, events as app_events, files, gb_repository, projects, search, sessions,
users,
};
use super::events;
pub struct Handler<'handler> {
#[derive(Clone)]
pub struct Handler {
local_data_dir: path::PathBuf,
project_id: String,
project_store: projects::Storage,
user_store: users::Storage,
deltas_searcher: search::Searcher,
gb_repository: &'handler gb_repository::Repository,
files_database: files::Database,
sessions_database: sessions::Database,
deltas_database: deltas::Database,
@ -15,12 +23,14 @@ pub struct Handler<'handler> {
events_sender: app_events::Sender,
}
impl<'handler> Handler<'handler> {
impl Handler {
#[allow(clippy::too_many_arguments)]
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_store: projects::Storage,
user_store: users::Storage,
deltas_searcher: search::Searcher,
gb_repository: &'handler gb_repository::Repository,
files_database: files::Database,
sessions_database: sessions::Database,
deltas_database: deltas::Database,
@ -28,9 +38,11 @@ impl<'handler> Handler<'handler> {
events_sender: app_events::Sender,
) -> Self {
Self {
local_data_dir,
project_id,
project_store,
user_store,
deltas_searcher,
gb_repository,
files_database,
sessions_database,
deltas_database,
@ -74,7 +86,15 @@ impl<'handler> Handler<'handler> {
}
pub fn reindex(&self) -> Result<Vec<events::Event>> {
let sessions_iter = self.gb_repository.get_sessions_iterator()?;
let gb_repository = gb_repository::Repository::open(
self.local_data_dir.clone(),
self.project_id.clone(),
self.project_store.clone(),
self.user_store.clone(),
)
.context("failed to open repository")?;
let sessions_iter = gb_repository.get_sessions_iterator()?;
let mut events = vec![];
for session in sessions_iter {
events.extend(self.index_session(&session?)?);
@ -83,16 +103,24 @@ impl<'handler> Handler<'handler> {
}
pub fn index_session(&self, session: &sessions::Session) -> Result<Vec<events::Event>> {
let gb_repository = gb_repository::Repository::open(
self.local_data_dir.clone(),
self.project_id.clone(),
self.project_store.clone(),
self.user_store.clone(),
)
.context("failed to open repository")?;
// first of all, index session for searching. searhcer keeps it's own state to
// decide if the actual indexing needed
self.deltas_searcher
.index_session(self.gb_repository, session)
.index_session(&gb_repository, session)
.context("failed to index session")?;
// index bookmarks right away. bookmarks are stored in the session during which it was
// created, not in the session that is actually bookmarked. so we want to make sure all of
// them are indexed at all times
let session_reader = sessions::Reader::open(self.gb_repository, session)?;
let session_reader = sessions::Reader::open(&gb_repository, session)?;
let bookmarks_reader = bookmarks::Reader::new(&session_reader);
for bookmark in bookmarks_reader.read()? {
self.index_bookmark(&bookmark)?;

View File

@ -12,35 +12,37 @@ mod check_current_session_tests;
#[cfg(test)]
mod project_file_change_tests;
use std::path::PathBuf;
use anyhow::{Context, Result};
use crate::{
bookmarks, deltas, events as app_events, files, gb_repository, projects, search, sessions,
};
use crate::{bookmarks, deltas, events as app_events, files, projects, search, sessions, users};
use super::events;
pub struct Handler<'handler> {
#[derive(Clone)]
pub struct Handler {
project_id: String,
file_change_handler: file_change::Handler,
project_file_handler: project_file_change::Handler<'handler>,
project_file_handler: project_file_change::Handler,
git_file_change_handler: git_file_change::Handler,
check_current_session_handler: check_current_session::Handler<'handler>,
flush_session_handler: flush_session::Handler<'handler>,
fetch_project_handler: fetch_project::Handler<'handler>,
check_current_session_handler: check_current_session::Handler,
flush_session_handler: flush_session::Handler,
fetch_project_handler: fetch_project::Handler,
chech_fetch_project_handler: check_fetch_project::Handler,
index_handler: index_handler::Handler<'handler>,
index_handler: index_handler::Handler,
events_sender: app_events::Sender,
}
impl<'handler> Handler<'handler> {
impl<'handler> Handler {
#[allow(clippy::too_many_arguments)]
pub fn new(
local_data_dir: PathBuf,
project_id: String,
project_store: projects::Storage,
gb_repository: &'handler gb_repository::Repository,
user_store: users::Storage,
searcher: search::Searcher,
events_sender: app_events::Sender,
sessions_database: sessions::Database,
@ -54,33 +56,43 @@ impl<'handler> Handler<'handler> {
file_change_handler: file_change::Handler::new(),
project_file_handler: project_file_change::Handler::new(
local_data_dir.clone(),
project_id.clone(),
project_store.clone(),
gb_repository,
user_store.clone(),
),
check_current_session_handler: check_current_session::Handler::new(
local_data_dir.clone(),
project_id.clone(),
project_store.clone(),
user_store.clone(),
),
check_current_session_handler: check_current_session::Handler::new(gb_repository),
git_file_change_handler: git_file_change::Handler::new(
project_id.clone(),
project_store.clone(),
),
flush_session_handler: flush_session::Handler::new(
local_data_dir.clone(),
project_id.clone(),
project_store.clone(),
gb_repository,
user_store.clone(),
),
fetch_project_handler: fetch_project::Handler::new(
local_data_dir.clone(),
project_id.clone(),
project_store.clone(),
gb_repository,
user_store.clone(),
),
chech_fetch_project_handler: check_fetch_project::Handler::new(
project_id.clone(),
project_store,
project_store.clone(),
),
index_handler: index_handler::Handler::new(
local_data_dir,
project_id,
project_store,
user_store,
searcher,
gb_repository,
files_database,
sessions_database,
deltas_database,
@ -90,7 +102,7 @@ impl<'handler> Handler<'handler> {
}
}
pub fn handle(&self, event: events::Event) -> Result<Vec<events::Event>> {
pub async fn handle(&self, event: events::Event) -> Result<Vec<events::Event>> {
log::info!("{}: handling event: {}", self.project_id, event);
match event {
events::Event::FileChange(path) => self

View File

@ -1,31 +1,35 @@
use std::vec;
use std::{path, vec};
use anyhow::{Context, Result};
use crate::{
deltas, gb_repository, project_repository, projects,
reader::{self, Reader},
sessions,
sessions, users,
};
use super::events;
pub struct Handler<'listener> {
#[derive(Clone)]
pub struct Handler {
project_id: String,
project_store: projects::Storage,
gb_repository: &'listener gb_repository::Repository,
local_data_dir: path::PathBuf,
user_store: users::Storage,
}
impl<'listener> Handler<'listener> {
impl Handler {
pub fn new(
local_data_dir: path::PathBuf,
project_id: String,
project_store: projects::Storage,
gb_repository: &'listener gb_repository::Repository,
user_store: users::Storage,
) -> Self {
Self {
project_id,
project_store,
gb_repository,
local_data_dir,
user_store,
}
}
@ -62,12 +66,20 @@ impl<'listener> Handler<'listener> {
// returns deltas for the file that are already part of the current session (if any)
fn get_current_deltas(&self, path: &std::path::Path) -> Result<Option<Vec<deltas::Delta>>> {
let current_session = self.gb_repository.get_current_session()?;
let gb_repo = gb_repository::Repository::open(
self.local_data_dir.clone(),
self.project_id.clone(),
self.project_store.clone(),
self.user_store.clone(),
)
.context("failed to open gb repository")?;
let current_session = gb_repo.get_current_session()?;
if current_session.is_none() {
return Ok(None);
}
let current_session = current_session.unwrap();
let session_reader = sessions::Reader::open(self.gb_repository, &current_session)
let session_reader = sessions::Reader::open(&gb_repo, &current_session)
.context("failed to get session reader")?;
let deltas_reader = deltas::Reader::new(&session_reader);
let deltas = deltas_reader
@ -86,9 +98,16 @@ impl<'listener> Handler<'listener> {
let project_repository = project_repository::Repository::open(&project)
.with_context(|| "failed to open project repository for project")?;
let gb_repository = gb_repository::Repository::open(
&self.local_data_dir,
self.project_id.clone(),
self.project_store.clone(),
self.user_store.clone(),
)
.context("failed to open gb repository")?;
// If current session's branch is not the same as the project's head, flush it first.
if let Some(session) = self
.gb_repository
if let Some(session) = gb_repository
.get_current_session()
.context("failed to get current session")?
{
@ -96,7 +115,7 @@ impl<'listener> Handler<'listener> {
.get_head()
.context("failed to get head")?;
if session.meta.branch != project_head.name().map(|s| s.to_string()) {
self.gb_repository
gb_repository
.flush_session(&project_repository, &session)
.context("failed to flush session")?;
}
@ -112,11 +131,10 @@ impl<'listener> Handler<'listener> {
None => return Ok(vec![]),
};
let current_session = self
.gb_repository
let current_session = gb_repository
.get_or_create_current_session()
.context("failed to get or create current session")?;
let reader = sessions::Reader::open(self.gb_repository, &current_session)
let reader = sessions::Reader::open(&gb_repository, &current_session)
.context("failed to get session reader")?;
let latest_file_content = match reader.file(path) {
@ -155,7 +173,7 @@ impl<'listener> Handler<'listener> {
}
let deltas = text_doc.get_deltas();
let writer = deltas::Writer::new(self.gb_repository)?;
let writer = deltas::Writer::new(&gb_repository)?;
writer
.write(path, &deltas)
.with_context(|| "failed to write deltas")?;

View File

@ -68,12 +68,17 @@ fn test_register_existing_commited_file() -> Result<()> {
commit_all(&repository)?;
let gb_repo = gb_repository::Repository::open(
gb_repo_path,
gb_repo_path.clone(),
project.id.clone(),
project_store.clone(),
user_store,
user_store.clone(),
)?;
let listener = Handler::new(project.id.clone(), project_store, &gb_repo);
let listener = Handler::new(
gb_repo_path.into(),
project.id.clone(),
project_store,
user_store,
);
std::fs::write(project_repo.root().join(file_path), "test2")?;
listener.handle(file_path)?;
@ -107,12 +112,17 @@ fn test_register_must_init_current_session() -> Result<()> {
let project_store = projects::Storage::new(storage);
project_store.add_project(&project)?;
let gb_repo = gb_repository::Repository::open(
gb_repo_path,
gb_repo_path.clone(),
project.id.clone(),
project_store.clone(),
user_store,
user_store.clone(),
)?;
let listener = Handler::new(project.id.clone(), project_store, &gb_repo);
let listener = Handler::new(
gb_repo_path.into(),
project.id.clone(),
project_store,
user_store,
);
let file_path = std::path::Path::new("test.txt");
std::fs::write(project_repo.root().join(file_path), "test")?;
@ -135,12 +145,17 @@ fn test_register_must_not_override_current_session() -> Result<()> {
let project_store = projects::Storage::new(storage);
project_store.add_project(&project)?;
let gb_repo = gb_repository::Repository::open(
gb_repo_path,
gb_repo_path.clone(),
project.id.clone(),
project_store.clone(),
user_store,
user_store.clone(),
)?;
let listener = Handler::new(project.id.clone(), project_store, &gb_repo);
let listener = Handler::new(
gb_repo_path.into(),
project.id.clone(),
project_store,
user_store,
);
let file_path = std::path::Path::new("test.txt");
std::fs::write(project_repo.root().join(file_path), "test")?;
@ -168,12 +183,17 @@ fn test_register_new_file() -> Result<()> {
let project_store = projects::Storage::new(storage);
project_store.add_project(&project)?;
let gb_repo = gb_repository::Repository::open(
gb_repo_path,
gb_repo_path.clone(),
project.id.clone(),
project_store.clone(),
user_store,
user_store.clone(),
)?;
let listener = Handler::new(project.id.clone(), project_store, &gb_repo);
let listener = Handler::new(
gb_repo_path.into(),
project.id.clone(),
project_store,
user_store,
);
let file_path = std::path::Path::new("test.txt");
std::fs::write(project_repo.root().join(file_path), "test")?;
@ -209,12 +229,17 @@ fn test_register_new_file_twice() -> Result<()> {
let project_store = projects::Storage::new(storage);
project_store.add_project(&project)?;
let gb_repo = gb_repository::Repository::open(
gb_repo_path,
gb_repo_path.clone(),
project.id.clone(),
project_store.clone(),
user_store,
user_store.clone(),
)?;
let listener = Handler::new(project.id.clone(), project_store, &gb_repo);
let listener = Handler::new(
gb_repo_path.into(),
project.id.clone(),
project_store,
user_store,
);
let file_path = std::path::Path::new("test.txt");
std::fs::write(project_repo.root().join(file_path), "test")?;
@ -269,12 +294,17 @@ fn test_register_file_delted() -> Result<()> {
let project_store = projects::Storage::new(storage);
project_store.add_project(&project)?;
let gb_repo = gb_repository::Repository::open(
gb_repo_path,
gb_repo_path.clone(),
project.id.clone(),
project_store.clone(),
user_store,
user_store.clone(),
)?;
let listener = Handler::new(project.id.clone(), project_store, &gb_repo);
let listener = Handler::new(
gb_repo_path.into(),
project.id.clone(),
project_store,
user_store,
);
let file_path = std::path::Path::new("test.txt");
std::fs::write(project_repo.root().join(file_path), "test")?;
@ -321,12 +351,17 @@ fn test_flow_with_commits() -> Result<()> {
let project_store = projects::Storage::new(storage);
project_store.add_project(&project)?;
let gb_repo = gb_repository::Repository::open(
gb_repo_path,
gb_repo_path.clone(),
project.id.clone(),
project_store.clone(),
user_store,
user_store.clone(),
)?;
let listener = Handler::new(project.id.clone(), project_store, &gb_repo);
let listener = Handler::new(
gb_repo_path.into(),
project.id.clone(),
project_store,
user_store,
);
let size = 10;
let relative_file_path = std::path::Path::new("one/two/test.txt");
@ -413,12 +448,17 @@ fn test_flow_no_commits() -> Result<()> {
let project_store = projects::Storage::new(storage);
project_store.add_project(&project)?;
let gb_repo = gb_repository::Repository::open(
gb_repo_path,
gb_repo_path.clone(),
project.id.clone(),
project_store.clone(),
user_store,
user_store.clone(),
)?;
let listener = Handler::new(project.id.clone(), project_store, &gb_repo);
let listener = Handler::new(
gb_repo_path.into(),
project.id.clone(),
project_store,
user_store,
);
let size = 10;
let relative_file_path = std::path::Path::new("one/two/test.txt");
@ -504,12 +544,17 @@ fn test_flow_signle_session() -> Result<()> {
let project_store = projects::Storage::new(storage);
project_store.add_project(&project)?;
let gb_repo = gb_repository::Repository::open(
gb_repo_path,
gb_repo_path.clone(),
project.id.clone(),
project_store.clone(),
user_store,
user_store.clone(),
)?;
let listener = Handler::new(project.id.clone(), project_store, &gb_repo);
let listener = Handler::new(
gb_repo_path.into(),
project.id.clone(),
project_store,
user_store,
);
let size = 10;
let relative_file_path = std::path::Path::new("one/two/test.txt");

View File

@ -2,27 +2,30 @@ mod dispatchers;
mod events;
mod handlers;
use std::path;
pub use events::Event;
use anyhow::Result;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::{bookmarks, deltas, files, gb_repository, projects, search, sessions};
use crate::{bookmarks, deltas, files, projects, search, sessions, users};
pub struct Watcher<'watcher> {
pub struct Watcher {
project_id: String,
dispatcher: dispatchers::Dispatcher,
handler: handlers::Handler<'watcher>,
handler: handlers::Handler,
cancellation_token: CancellationToken,
}
impl<'watcher> Watcher<'watcher> {
impl<'watcher> Watcher {
#[allow(clippy::too_many_arguments)]
pub fn new(
local_data_dir: path::PathBuf,
project: &projects::Project,
project_store: projects::Storage,
gb_repository: &'watcher gb_repository::Repository,
user_store: users::Storage,
deltas_searcher: search::Searcher,
cancellation_token: CancellationToken,
events_sender: crate::events::Sender,
@ -35,9 +38,10 @@ impl<'watcher> Watcher<'watcher> {
project_id: project.id.clone(),
dispatcher: dispatchers::Dispatcher::new(project.id.clone(), project.path.clone()),
handler: handlers::Handler::new(
local_data_dir,
project.id.clone(),
project_store,
gb_repository,
user_store,
deltas_searcher,
events_sender,
sessions_database,
@ -67,15 +71,22 @@ impl<'watcher> Watcher<'watcher> {
log::error!("{}: failed to post event: {:#}", self.project_id, e);
}
},
Some(event) = events_rx.recv() => match self.handler.handle(event) {
Err(err) => log::error!("{}: failed to handle event: {:#}", self.project_id, err),
Ok(events) => {
for event in events {
if let Err(e) = events_tx.send(event) {
log::error!("{}: failed to post event: {:#}", self.project_id, e);
}
Some(event) = events_rx.recv() => {
let project_id = self.project_id.clone();
let handler = self.handler.clone();
let events_tx = events_tx.clone();
tauri::async_runtime::spawn(async move {
match handler.handle(event).await {
Ok(events) => {
for event in events {
if let Err(e) = events_tx.send(event) {
log::error!("{}: failed to post event: {:#}", project_id, e);
}
}
},
Err(err) => log::error!("{}: failed to handle event: {:#}", project_id, err),
}
}
});
},
_ = self.cancellation_token.cancelled() => {
if let Err(e) = self.dispatcher.stop() {