make wathcher handlers async

This commit is contained in:
Nikita Galaiko 2023-11-21 09:30:49 +01:00 committed by GitButler
parent aa5011264d
commit 47a7c93dad
14 changed files with 180 additions and 156 deletions

View File

@ -222,10 +222,11 @@ impl App {
}
}
pub fn delete_all_data(&self) -> Result<(), Error> {
pub async fn delete_all_data(&self) -> Result<(), Error> {
for project in self.projects.list().context("failed to list projects")? {
self.projects
.delete(&project.id)
.await
.context("failed to delete project")?;
}
Ok(())

View File

@ -1,6 +1,5 @@
use std::{collections::HashMap, path};
use crate::watcher;
use anyhow::Context;
use tauri::Manager;
use tracing::instrument;
@ -12,7 +11,7 @@ use crate::{
paths::DataDir,
project_repository, projects, reader,
sessions::SessionId,
users, virtual_branches,
users, virtual_branches, watcher,
};
impl From<app::Error> for Error {
@ -97,7 +96,7 @@ pub async fn git_head(handle: tauri::AppHandle, project_id: &str) -> Result<Stri
#[instrument(skip(handle))]
pub async fn delete_all_data(handle: tauri::AppHandle) -> Result<(), Error> {
let app = handle.state::<app::App>();
app.delete_all_data()?;
app.delete_all_data().await?;
Ok(())
}
@ -155,7 +154,7 @@ pub async fn git_get_global_config(
#[tauri::command(async)]
#[instrument(skip(handle))]
pub async fn project_flush_and_push(handle: tauri::AppHandle, id: &str) -> Result<(), Error> {
let id = id.parse().map_err(|_| Error::UserError {
let project_id = id.parse().map_err(|_| Error::UserError {
code: Code::Validation,
message: "Malformed project id".into(),
})?;
@ -168,18 +167,14 @@ pub async fn project_flush_and_push(handle: tauri::AppHandle, id: &str) -> Resul
.clone();
let local_data_dir = DataDir::try_from(&handle)?;
let project = projects.get(&id).context("failed to get project")?;
let project = projects.get(&project_id).context("failed to get project")?;
let user = users.get_user()?;
let project_repository = project_repository::Repository::open(&project)?;
let gb_repo =
gb_repository::Repository::open(&local_data_dir, &project_repository, user.as_ref())
.context("failed to open repository")?;
futures::executor::block_on(async {
vbranches
.flush_vbranches(project_repository.project().id)
.await
})?;
vbranches.flush_vbranches(project_id).await?;
let session = gb_repo
.flush(&project_repository, user.as_ref())
@ -190,14 +185,17 @@ pub async fn project_flush_and_push(handle: tauri::AppHandle, id: &str) -> Resul
let watcher = handle.state::<watcher::Watchers>();
if session.is_some() {
if let Err(error) = watcher
.post(watcher::Event::Session(id, session.clone().unwrap()))
.post(watcher::Event::Session(
project_id,
session.clone().unwrap(),
))
.await
{
tracing::error!(?error);
}
}
if let Err(error) = watcher
.post(watcher::Event::PushProjectToGitbutler(id))
.post(watcher::Event::PushProjectToGitbutler(project_id))
.await
{
tracing::error!(?error);

View File

@ -202,8 +202,8 @@ fn test_list_files_from_flushed_session() -> Result<()> {
Ok(())
}
#[test]
fn test_remote_syncronization() -> Result<()> {
#[tokio::test]
async fn test_remote_syncronization() -> Result<()> {
// first, crate a remote, pretending it's a cloud
let cloud = test_remote_repository()?;
let api_project = projects::ApiProject {
@ -225,11 +225,14 @@ fn test_remote_syncronization() -> Result<()> {
path::PathBuf::from("test.txt"),
"Hello World",
)]));
suite.projects.update(&projects::UpdateRequest {
id: case_one.project.id,
api: Some(api_project.clone()),
..Default::default()
})?;
suite
.projects
.update(&projects::UpdateRequest {
id: case_one.project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
let case_one = case_one.refresh();
let writer = deltas::Writer::new(&case_one.gb_repository);
@ -248,11 +251,14 @@ fn test_remote_syncronization() -> Result<()> {
// create second local project, fetch it and make sure session is there
let case_two = suite.new_case();
suite.projects.update(&projects::UpdateRequest {
id: case_two.project.id,
api: Some(api_project.clone()),
..Default::default()
})?;
suite
.projects
.update(&projects::UpdateRequest {
id: case_two.project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
let case_two = case_two.refresh();
case_two.gb_repository.fetch(Some(&user))?;
@ -287,8 +293,8 @@ fn test_remote_syncronization() -> Result<()> {
Ok(())
}
#[test]
fn test_remote_sync_order() -> Result<()> {
#[tokio::test]
async fn test_remote_sync_order() -> Result<()> {
// first, crate a remote, pretending it's a cloud
let cloud = test_remote_repository()?;
let api_project = projects::ApiProject {
@ -305,19 +311,25 @@ fn test_remote_sync_order() -> Result<()> {
let suite = Suite::default();
let case_one = suite.new_case();
suite.projects.update(&projects::UpdateRequest {
id: case_one.project.id,
api: Some(api_project.clone()),
..Default::default()
})?;
suite
.projects
.update(&projects::UpdateRequest {
id: case_one.project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
let case_one = case_one.refresh();
let case_two = suite.new_case();
suite.projects.update(&projects::UpdateRequest {
id: case_two.project.id,
api: Some(api_project.clone()),
..Default::default()
})?;
suite
.projects
.update(&projects::UpdateRequest {
id: case_two.project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
let case_two = case_two.refresh();
let user = suite.sign_in();

View File

@ -34,6 +34,7 @@ pub async fn update_project(
handle
.state::<Controller>()
.update(&project)
.await
.map_err(Into::into)
}
@ -135,5 +136,9 @@ pub async fn delete_project(handle: tauri::AppHandle, id: &str) -> Result<(), Er
code: Code::Validation,
message: "Malformed project id".into(),
})?;
handle.state::<Controller>().delete(&id).map_err(Into::into)
handle
.state::<Controller>()
.delete(&id)
.await
.map_err(Into::into)
}

View File

@ -1,7 +1,6 @@
use std::path;
use anyhow::Context;
use futures::executor::block_on;
use tauri::{AppHandle, Manager};
use crate::{gb_repository, paths::DataDir, project_repository, users, watcher};
@ -92,7 +91,7 @@ impl Controller {
Ok(project)
}
pub fn update(&self, project: &UpdateRequest) -> Result<Project, UpdateError> {
pub async fn update(&self, project: &UpdateRequest) -> Result<Project, UpdateError> {
let updated = self
.projects_storage
.update(project)
@ -104,8 +103,9 @@ impl Controller {
if let Some(watchers) = &self.watchers {
if let Some(api) = &project.api {
if api.sync {
if let Err(error) =
block_on(watchers.post(watcher::Event::FetchGitbutlerData(project.id)))
if let Err(error) = watchers
.post(watcher::Event::FetchGitbutlerData(project.id))
.await
{
tracing::error!(
project_id = %project.id,
@ -115,8 +115,9 @@ impl Controller {
}
}
if let Err(error) =
block_on(watchers.post(watcher::Event::PushGitbutlerData(project.id)))
if let Err(error) = watchers
.post(watcher::Event::PushGitbutlerData(project.id))
.await
{
tracing::error!(
project_id = %project.id,
@ -143,7 +144,7 @@ impl Controller {
.map_err(|error| ListError::Other(error.into()))
}
pub fn delete(&self, id: &ProjectId) -> Result<(), DeleteError> {
pub async fn delete(&self, id: &ProjectId) -> Result<(), DeleteError> {
let project = match self.projects_storage.get(id) {
Ok(project) => Ok(project),
Err(super::storage::Error::NotFound) => return Ok(()),
@ -151,7 +152,7 @@ impl Controller {
}?;
if let Some(watchers) = &self.watchers {
if let Err(error) = block_on(watchers.stop(id)) {
if let Err(error) = watchers.stop(id).await {
tracing::error!(
project_id = %id,
?error,

View File

@ -21,9 +21,11 @@ impl From<&AppHandle> for Handler {
}
impl Handler {
pub fn handle(&self, event: &analytics::Event) -> Result<Vec<events::Event>> {
pub async fn handle(&self, event: &analytics::Event) -> Result<Vec<events::Event>> {
if let Some(user) = self.users.get_user().context("failed to get user")? {
futures::executor::block_on(self.client.send(&user, event))
self.client
.send(&user, event)
.await
.context("failed to send event")?;
}
Ok(vec![])

View File

@ -1,13 +1,12 @@
use std::{
sync::{Arc, Mutex, TryLockError},
time,
};
use std::{sync::Arc, time};
use anyhow::{Context, Result};
use tauri::AppHandle;
use tokio::sync::Mutex;
use crate::{gb_repository, project_repository, projects, users};
use crate::{paths::DataDir, projects::ProjectId};
use crate::{
gb_repository, paths::DataDir, project_repository, projects, projects::ProjectId, users,
};
use super::events;
@ -28,15 +27,15 @@ impl TryFrom<&AppHandle> for Handler {
}
impl Handler {
pub fn handle(
pub async fn handle(
&self,
project_id: &ProjectId,
now: &time::SystemTime,
) -> Result<Vec<events::Event>> {
match self.inner.try_lock() {
Ok(inner) => inner.handle(project_id, now),
Err(TryLockError::Poisoned(_)) => Err(anyhow::anyhow!("mutex poisoned")),
Err(TryLockError::WouldBlock) => Ok(vec![]),
if let Ok(inner) = self.inner.try_lock() {
inner.handle(project_id, now).await
} else {
Ok(vec![])
}
}
}
@ -61,7 +60,7 @@ impl TryFrom<&AppHandle> for HandlerInner {
}
impl HandlerInner {
pub fn handle(
pub async fn handle(
&self,
project_id: &ProjectId,
now: &time::SystemTime,
@ -117,6 +116,7 @@ impl HandlerInner {
gitbutler_data_last_fetched: Some(fetch_result),
..Default::default()
})
.await
.context("failed to update fetched result")?;
let sessions_after_fetch = gb_repo
@ -150,8 +150,8 @@ mod test {
use super::super::test_remote_repository;
use super::*;
#[test]
fn test_fetch_success() -> Result<()> {
#[tokio::test]
async fn test_fetch_success() -> Result<()> {
let suite = Suite::default();
let Case { project, .. } = suite.new_case();
@ -168,11 +168,14 @@ mod test {
sync: true,
};
suite.projects.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})?;
suite
.projects
.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
let listener = HandlerInner {
local_data_dir: suite.local_app_data,
@ -180,13 +183,16 @@ mod test {
users: suite.users,
};
listener.handle(&project.id, &SystemTime::now()).unwrap();
listener
.handle(&project.id, &SystemTime::now())
.await
.unwrap();
Ok(())
}
#[test]
fn test_fetch_fail_no_sync() {
#[tokio::test]
async fn test_fetch_fail_no_sync() {
let suite = Suite::default();
let Case { project, .. } = suite.new_case();
@ -196,7 +202,7 @@ mod test {
users: suite.users,
};
let res = listener.handle(&project.id, &SystemTime::now());
let res = listener.handle(&project.id, &SystemTime::now()).await;
assert_eq!(&res.unwrap_err().to_string(), "sync disabled");
}

View File

@ -1,10 +1,8 @@
use std::{
sync::{Arc, Mutex, TryLockError},
time,
};
use std::{sync::Arc, time};
use anyhow::{Context, Result};
use tauri::AppHandle;
use tokio::sync::Mutex;
use crate::{
gb_repository, git, keys,
@ -33,15 +31,15 @@ impl TryFrom<&AppHandle> for Handler {
}
impl Handler {
pub fn handle(
pub async fn handle(
&self,
project_id: &ProjectId,
now: &time::SystemTime,
) -> Result<Vec<events::Event>> {
match self.inner.try_lock() {
Ok(inner) => inner.handle(project_id, now),
Err(TryLockError::Poisoned(_)) => Err(anyhow::anyhow!("mutex poisoned")),
Err(TryLockError::WouldBlock) => Ok(vec![]),
if let Ok(inner) = self.inner.try_lock() {
inner.handle(project_id, now).await
} else {
Ok(vec![])
}
}
}
@ -67,7 +65,7 @@ impl TryFrom<&AppHandle> for HandlerInner {
}
impl HandlerInner {
pub fn handle(
pub async fn handle(
&self,
project_id: &ProjectId,
now: &time::SystemTime,
@ -133,6 +131,7 @@ impl HandlerInner {
project_data_last_fetched: Some(fetch_result),
..Default::default()
})
.await
.context("failed to update fetch result")?;
Ok(vec![])

View File

@ -1,7 +1,8 @@
use std::sync::{Arc, Mutex, TryLockError};
use std::sync::Arc;
use anyhow::{Context, Result};
use tauri::{AppHandle, Manager};
use tokio::sync::Mutex;
use crate::{
gb_repository, paths::DataDir, project_repository, projects, projects::ProjectId, sessions,
@ -27,15 +28,15 @@ impl TryFrom<&AppHandle> for Handler {
}
impl Handler {
pub fn handle(
pub async fn handle(
&self,
project_id: &ProjectId,
session: &sessions::Session,
) -> Result<Vec<events::Event>> {
match self.inner.try_lock() {
Ok(inner) => inner.handle(project_id, session),
Err(TryLockError::Poisoned(_)) => Err(anyhow::anyhow!("mutex poisoned")),
Err(TryLockError::WouldBlock) => Ok(vec![]),
if let Ok(inner) = self.inner.try_lock() {
inner.handle(project_id, session).await
} else {
Ok(vec![])
}
}
}
@ -64,7 +65,7 @@ impl TryFrom<&AppHandle> for HandlerInner {
}
impl HandlerInner {
pub fn handle(
pub async fn handle(
&self,
project_id: &ProjectId,
session: &sessions::Session,
@ -84,9 +85,7 @@ impl HandlerInner {
)
.context("failed to open repository")?;
match futures::executor::block_on(async {
self.vbrach_controller.flush_vbranches(*project_id).await
}) {
match self.vbrach_controller.flush_vbranches(*project_id).await {
Ok(()) => Ok(()),
Err(virtual_branches::controller::ControllerError::VerifyError(error)) => {
tracing::warn!(?error, "failed to flush virtual branches");

View File

@ -63,7 +63,7 @@ impl TryFrom<&AppHandle> for Handler {
impl Handler {
#[instrument(skip(self), fields(event = %event), level = "debug")]
pub fn handle(
pub async fn handle(
&self,
event: &events::Event,
now: time::SystemTime,
@ -90,16 +90,19 @@ impl Handler {
events::Event::PushProjectToGitbutler(project_id) => self
.push_project_to_gitbutler
.handle(project_id, &now)
.await
.context("failed to push project to gitbutler"),
events::Event::FetchGitbutlerData(project_id) => self
.fetch_gitbutler_handler
.handle(project_id, &now)
.await
.context("failed to fetch gitbutler data"),
events::Event::FetchProjectData(project_id) => self
.fetch_project_handler
.handle(project_id, &now)
.await
.context("failed to fetch project data"),
events::Event::Tick(project_id) => self
@ -110,6 +113,7 @@ impl Handler {
events::Event::Flush(project_id, session) => self
.flush_session_handler
.handle(project_id, session)
.await
.context("failed to handle flush session event"),
events::Event::SessionFile((project_id, session_id, file_path, contents)) => {
@ -137,6 +141,7 @@ impl Handler {
events::Event::CalculateVirtualBranches(project_id) => self
.virtual_branch_handler
.handle(project_id)
.await
.context("failed to handle virtual branch event"),
events::Event::CalculateDeltas(project_id, path) => self
@ -157,6 +162,7 @@ impl Handler {
events::Event::Analytics(event) => self
.analytics_handler
.handle(event)
.await
.context("failed to handle analytics event"),
events::Event::Session(project_id, session) => self

View File

@ -1,10 +1,8 @@
use std::{
sync::{Arc, Mutex, TryLockError},
time,
};
use std::{sync::Arc, time};
use anyhow::{Context, Result};
use tauri::AppHandle;
use tokio::sync::Mutex;
use crate::{
project_repository,
@ -31,15 +29,15 @@ impl TryFrom<&AppHandle> for Handler {
}
impl Handler {
pub fn handle(
pub async fn handle(
&self,
project_id: &ProjectId,
now: &time::SystemTime,
) -> Result<Vec<events::Event>> {
match self.inner.try_lock() {
Ok(inner) => inner.handle(project_id, now),
Err(TryLockError::Poisoned(_)) => Err(anyhow::anyhow!("mutex poisoned")),
Err(TryLockError::WouldBlock) => Ok(vec![]),
if let Ok(inner) = self.inner.try_lock() {
inner.handle(project_id, now).await
} else {
Ok(vec![])
}
}
}
@ -61,7 +59,7 @@ impl TryFrom<&AppHandle> for HandlerInner {
}
impl HandlerInner {
pub fn handle(
pub async fn handle(
&self,
project_id: &ProjectId,
now: &time::SystemTime,
@ -122,6 +120,7 @@ impl HandlerInner {
}),
..Default::default()
})
.await
.context("failed to update last push")?;
tracing::debug!(

View File

@ -185,8 +185,8 @@ mod test_handler {
use super::super::test_remote_repository;
use super::*;
#[test]
fn test_fetch_triggered() -> Result<()> {
#[tokio::test]
async fn test_fetch_triggered() -> Result<()> {
let suite = Suite::default();
let Case { project, .. } = suite.new_case();
@ -203,11 +203,14 @@ mod test_handler {
sync: true,
};
suite.projects.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})?;
suite
.projects
.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
let listener = Handler {
local_data_dir: suite.local_app_data,

View File

@ -1,9 +1,9 @@
use anyhow::{anyhow, Result};
use anyhow::{Context, Result};
use tauri::{AppHandle, Manager};
use crate::{assets, events as app_events, projects::ProjectId, virtual_branches};
use super::events;
use crate::events as app_events;
use crate::{assets, projects::ProjectId, virtual_branches};
#[derive(Clone)]
pub struct Handler {
@ -25,28 +25,17 @@ impl TryFrom<&AppHandle> for Handler {
}
impl Handler {
pub fn handle(&self, project_id: &ProjectId) -> Result<Vec<events::Event>> {
let branches = futures::executor::block_on(async {
self.vbranch_controller
.list_virtual_branches(project_id)
.await
});
pub async fn handle(&self, project_id: &ProjectId) -> Result<Vec<events::Event>> {
let branches = self
.vbranch_controller
.list_virtual_branches(project_id)
.await
.context("failed to list virtual branches")?;
let branches = match branches {
Ok(branches) => {
let branches = futures::executor::block_on(async {
self.assets_proxy.proxy_virtual_branches(branches).await
});
Ok(branches)
}
Err(error) => Err(anyhow!(error)),
};
let branches = self.assets_proxy.proxy_virtual_branches(branches).await;
match branches {
Ok(branches) => Ok(vec![events::Event::Emit(
app_events::Event::virtual_branches(project_id, &branches.clone()),
)]),
Err(error) => Err(error),
}
Ok(vec![events::Event::Emit(
app_events::Event::virtual_branches(project_id, &branches.clone()),
)])
}
}

View File

@ -196,31 +196,35 @@ impl WatcherInner {
let handler = self.handler.clone();
let tx = proxy_tx.clone();
let event = event.clone();
move || match handler.handle(&event, time::SystemTime::now()) {
Err(error) => tracing::error!(
project_id,
%event,
?error,
"failed to handle event",
),
Ok(events) => {
for e in events {
if let Err(error) = tx.send(e.clone()) {
tracing::error!(
project_id,
%event,
?error,
"failed to post event",
);
} else {
tracing::debug!(
project_id,
%event,
"sent response event",
);
move || {
futures::executor::block_on(async move {
match handler.handle(&event, time::SystemTime::now()).await {
Err(error) => tracing::error!(
project_id,
%event,
?error,
"failed to handle event",
),
Ok(events) => {
for e in events {
if let Err(error) = tx.send(e.clone()) {
tracing::error!(
project_id,
%event,
?error,
"failed to post event",
);
} else {
tracing::debug!(
project_id,
%event,
"sent response event",
);
}
}
}
}
}
});
}
})?;
Ok(())