make watchers async

This commit is contained in:
Nikita Galaiko 2023-03-22 07:51:58 +01:00
parent b202c264aa
commit 80e3e9fd28
No known key found for this signature in database
GPG Key ID: EBAB54E845BA519D
7 changed files with 170 additions and 134 deletions

69
src-tauri/Cargo.lock generated
View File

@ -1149,6 +1149,7 @@ dependencies = [
"tauri-plugin-window-state",
"tempfile",
"thiserror",
"tokio",
"urlencoding",
"uuid 1.3.0",
"walkdir",
@ -3977,9 +3978,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.25.0"
version = "1.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af"
checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64"
dependencies = [
"autocfg",
"bytes",
@ -3989,7 +3990,7 @@ dependencies = [
"num_cpus",
"pin-project-lite",
"socket2",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
]
[[package]]
@ -4582,12 +4583,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.42.1",
"windows_i686_gnu 0.42.1",
"windows_i686_msvc 0.42.1",
"windows_x86_64_gnu 0.42.1",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.42.1",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
@ -4598,9 +4623,9 @@ checksum = "f838de2fe15fe6bac988e74b798f26499a8b21a9d97edec321e79b28d1d7f597"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.1"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_msvc"
@ -4622,9 +4647,9 @@ checksum = "ec7711666096bd4096ffa835238905bb33fb87267910e154b18b44eaabb340f2"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.1"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_i686_gnu"
@ -4646,9 +4671,9 @@ checksum = "763fc57100a5f7042e3057e7e8d9bdd7860d330070251a73d003563a3bb49e1b"
[[package]]
name = "windows_i686_gnu"
version = "0.42.1"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_msvc"
@ -4670,9 +4695,9 @@ checksum = "7bc7cbfe58828921e10a9f446fcaaf649204dcfe6c1ddd712c5eebae6bda1106"
[[package]]
name = "windows_i686_msvc"
version = "0.42.1"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_x86_64_gnu"
@ -4694,15 +4719,15 @@ checksum = "6868c165637d653ae1e8dc4d82c25d4f97dd6605eaa8d784b5c6e0ab2a252b65"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.1"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.1"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_msvc"
@ -4724,9 +4749,9 @@ checksum = "5e4d40883ae9cae962787ca76ba76390ffa29214667a111db9e0a1ad8377e809"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.1"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "winreg"

View File

@ -37,6 +37,7 @@ thiserror = "1.0.38"
tantivy = "0.19.2"
similar = "2.2.1"
fslock = "0.2.1"
tokio = { version = "1.26.0", features = ["sync"] }
[features]
# by default Tauri runs in production mode

View File

@ -13,11 +13,7 @@ use anyhow::{Context, Result};
use deltas::Delta;
use log;
use serde::{ser::SerializeMap, Serialize};
use std::{
collections::HashMap,
ops::Range,
sync::{mpsc, Mutex},
};
use std::{collections::HashMap, ops::Range, sync::Mutex};
use storage::Storage;
use tauri::{generate_context, Manager};
use tauri_plugin_log::{
@ -143,7 +139,7 @@ async fn proxy_image(handle: tauri::AppHandle, src: &str) -> Result<String> {
}
#[tauri::command(async)]
fn search(
async fn search(
handle: tauri::AppHandle,
project_id: &str,
query: &str,
@ -177,7 +173,7 @@ fn search(
}
#[tauri::command(async)]
fn list_sessions(
async fn list_sessions(
handle: tauri::AppHandle,
project_id: &str,
earliest_timestamp_ms: Option<u128>,
@ -220,7 +216,7 @@ async fn get_user(handle: tauri::AppHandle) -> Result<Option<users::User>, Error
}
#[tauri::command(async)]
fn set_user(handle: tauri::AppHandle, user: users::User) -> Result<(), Error> {
async fn set_user(handle: tauri::AppHandle, user: users::User) -> Result<(), Error> {
let app_state = handle.state::<App>();
app_state
@ -234,7 +230,7 @@ fn set_user(handle: tauri::AppHandle, user: users::User) -> Result<(), Error> {
}
#[tauri::command(async)]
fn delete_user(handle: tauri::AppHandle) -> Result<(), Error> {
async fn delete_user(handle: tauri::AppHandle) -> Result<(), Error> {
let app_state = handle.state::<App>();
app_state
@ -248,7 +244,7 @@ fn delete_user(handle: tauri::AppHandle) -> Result<(), Error> {
}
#[tauri::command(async)]
fn update_project(
async fn update_project(
handle: tauri::AppHandle,
project: projects::UpdateRequest,
) -> Result<projects::Project, Error> {
@ -263,7 +259,7 @@ fn update_project(
}
#[tauri::command(async)]
fn add_project(handle: tauri::AppHandle, path: &str) -> Result<projects::Project, Error> {
async fn add_project(handle: tauri::AppHandle, path: &str) -> Result<projects::Project, Error> {
let app_state = handle.state::<App>();
for project in app_state
@ -299,7 +295,11 @@ fn add_project(handle: tauri::AppHandle, path: &str) -> Result<projects::Project
let repo = repo_for_project(handle.clone(), &project.id)?;
let (tx, rx): (mpsc::Sender<events::Event>, mpsc::Receiver<events::Event>) = mpsc::channel();
let (tx, rx): (
tokio::sync::mpsc::Sender<events::Event>,
tokio::sync::mpsc::Receiver<events::Event>,
) = tokio::sync::mpsc::channel(1);
app_state.watchers.lock().unwrap().watch(
tx,
&project,
@ -312,7 +312,7 @@ fn add_project(handle: tauri::AppHandle, path: &str) -> Result<projects::Project
}
#[tauri::command(async)]
fn list_projects(handle: tauri::AppHandle) -> Result<Vec<projects::Project>, Error> {
async fn list_projects(handle: tauri::AppHandle) -> Result<Vec<projects::Project>, Error> {
let app_state = handle.state::<App>();
let projects = app_state.projects_storage.list_projects()?;
@ -321,7 +321,7 @@ fn list_projects(handle: tauri::AppHandle) -> Result<Vec<projects::Project>, Err
}
#[tauri::command(async)]
fn delete_project(handle: tauri::AppHandle, id: &str) -> Result<(), Error> {
async fn delete_project(handle: tauri::AppHandle, id: &str) -> Result<(), Error> {
let app_state = handle.state::<App>();
match app_state.projects_storage.get_project(id)? {
@ -343,7 +343,7 @@ fn delete_project(handle: tauri::AppHandle, id: &str) -> Result<(), Error> {
}
#[tauri::command(async)]
fn list_session_files(
async fn list_session_files(
handle: tauri::AppHandle,
project_id: &str,
session_id: &str,
@ -355,7 +355,7 @@ fn list_session_files(
}
#[tauri::command(async)]
fn list_deltas(
async fn list_deltas(
handle: tauri::AppHandle,
project_id: &str,
session_id: &str,
@ -367,7 +367,7 @@ fn list_deltas(
}
#[tauri::command(async)]
fn git_status(
async fn git_status(
handle: tauri::AppHandle,
project_id: &str,
) -> Result<HashMap<String, String>, Error> {
@ -377,7 +377,7 @@ fn git_status(
}
#[tauri::command(async)]
fn git_wd_diff(
async fn git_wd_diff(
handle: tauri::AppHandle,
project_id: &str,
) -> Result<HashMap<String, String>, Error> {
@ -389,7 +389,7 @@ fn git_wd_diff(
}
#[tauri::command(async)]
fn git_file_paths(handle: tauri::AppHandle, project_id: &str) -> Result<Vec<String>, Error> {
async fn git_file_paths(handle: tauri::AppHandle, project_id: &str) -> Result<Vec<String>, Error> {
let repo = repo_for_project(handle, project_id)?;
let files = repo
.file_paths()
@ -399,7 +399,7 @@ fn git_file_paths(handle: tauri::AppHandle, project_id: &str) -> Result<Vec<Stri
}
#[tauri::command(async)]
fn git_match_paths(
async fn git_match_paths(
handle: tauri::AppHandle,
project_id: &str,
match_pattern: &str,
@ -429,7 +429,7 @@ fn repo_for_project(
}
#[tauri::command(async)]
fn git_branches(handle: tauri::AppHandle, project_id: &str) -> Result<Vec<String>, Error> {
async fn git_branches(handle: tauri::AppHandle, project_id: &str) -> Result<Vec<String>, Error> {
let repo = repo_for_project(handle, project_id)?;
let files = repo
.branches()
@ -438,7 +438,7 @@ fn git_branches(handle: tauri::AppHandle, project_id: &str) -> Result<Vec<String
}
#[tauri::command(async)]
fn git_branch(handle: tauri::AppHandle, project_id: &str) -> Result<String, Error> {
async fn git_branch(handle: tauri::AppHandle, project_id: &str) -> Result<String, Error> {
let repo = repo_for_project(handle, project_id)?;
let files = repo
.branch()
@ -447,7 +447,7 @@ fn git_branch(handle: tauri::AppHandle, project_id: &str) -> Result<String, Erro
}
#[tauri::command(async)]
fn git_switch_branch(
async fn git_switch_branch(
handle: tauri::AppHandle,
project_id: &str,
branch: &str,
@ -460,7 +460,7 @@ fn git_switch_branch(
}
#[tauri::command(async)]
fn git_commit(
async fn git_commit(
handle: tauri::AppHandle,
project_id: &str,
message: &str,
@ -650,7 +650,7 @@ fn init(app_handle: tauri::AppHandle) -> Result<()> {
}
// start watching projects
let (tx, rx): (mpsc::Sender<events::Event>, mpsc::Receiver<events::Event>) = mpsc::channel();
let (tx, rx) = tokio::sync::mpsc::channel::<events::Event>(32);
let projects = app_state
.projects_storage
@ -692,9 +692,9 @@ fn init(app_handle: tauri::AppHandle) -> Result<()> {
Ok(())
}
fn watch_events(handle: tauri::AppHandle, rx: mpsc::Receiver<events::Event>) {
fn watch_events(handle: tauri::AppHandle, mut rx: tokio::sync::mpsc::Receiver<events::Event>) {
tauri::async_runtime::spawn(async move {
while let Ok(event) = rx.recv() {
while let Some(event) = rx.recv().await {
if let Some(window) = handle.get_window("main") {
log::info!("Emitting event: {}", event.name);
match window.emit(&event.name, event.payload) {

View File

@ -3,11 +3,11 @@ use crate::projects;
use crate::{events, sessions};
use anyhow::{Context, Result};
use git2;
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::{mpsc, Arc, Mutex};
use std::sync::Arc;
pub struct DeltaWatchers {
watchers: HashMap<String, RecommendedWatcher>,
@ -40,25 +40,27 @@ impl DeltaWatchers {
pub fn watch(
&mut self,
sender: mpsc::Sender<events::Event>,
sender: tokio::sync::mpsc::Sender<events::Event>,
project: projects::Project,
mutex: Arc<Mutex<fslock::LockFile>>,
mutex: Arc<tokio::sync::Mutex<fslock::LockFile>>,
deltas_storage: &deltas::Store,
) -> Result<()> {
log::info!("Watching deltas for {}", project.path);
let project_path = Path::new(&project.path);
let (tx, rx) = mpsc::channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
let (tx, mut rx) = tokio::sync::mpsc::channel(32);
let mut watcher = notify::recommended_watcher(move |res| {
let _ = tx.try_send(res);
})?;
watcher.watch(project_path, RecursiveMode::Recursive)?;
self.watchers.insert(project.path.clone(), watcher);
let shared_deltas_storage = deltas_storage.clone();
tauri::async_runtime::spawn_blocking(move || {
let deltas_storage = shared_deltas_storage.clone();
while let Ok(event) = rx.recv() {
tauri::async_runtime::spawn(async move {
let deltas_storage = shared_deltas_storage;
while let Some(event) = rx.recv().await {
match event {
Ok(notify_event) => {
for file_path in notify_event.paths {
@ -88,52 +90,60 @@ impl DeltaWatchers {
continue;
}
let mut fslock = mutex.lock().unwrap();
let mut fslock = mutex.lock().await;
log::debug!("{}: locking", project.id);
fslock.lock().unwrap();
log::debug!("{}: locked", project.id);
match register_file_change(
let change = match register_file_change(
&project,
&repo,
&deltas_storage,
&relative_file_path,
) {
Ok(Some((session, deltas))) => {
if let Err(e) =
sender.send(events::Event::session(&project, &session))
{
log::error!(
"{}: failed to send session event: {:#}",
project.id,
e
)
}
if let Err(e) = sender.send(events::Event::detlas(
&project,
&session,
&deltas,
&relative_file_path,
)) {
log::error!(
"{}: failed to send deltas event: {:#}",
project.id,
e
)
}
Ok(change) => change,
Err(e) => {
log::error!(
"{}: failed to register file change: {:#}",
project.id,
e
);
None
}
Ok(None) => {}
Err(e) => log::error!(
"{}: failed to register file change: {:#}",
project.id,
e
),
}
};
log::debug!("{}: unlocking", project.id);
fslock.unlock().unwrap();
log::debug!("{}: unlocked", project.id);
if let Some((session, deltas)) = change {
if let Err(e) = sender
.send(events::Event::session(&project, &session))
.await
{
log::error!(
"{}: failed to send session event: {:#}",
project.id,
e
);
}
if let Err(e) = sender
.send(events::Event::detlas(
&project,
&session,
&deltas,
&relative_file_path,
))
.await
{
log::error!(
"{}: failed to send deltas event: {:#}",
project.id,
e
);
}
}
}
}
Err(e) => log::error!("{}: notify event error: {:#}", project.id, e),

View File

@ -1,10 +1,10 @@
use crate::{events, projects};
use anyhow::Result;
use notify::{Config, RecommendedWatcher, Watcher};
use notify::{RecommendedWatcher, Watcher};
use std::{
collections::HashMap,
path::Path,
sync::{mpsc, Arc, Mutex},
sync::{Arc, Mutex},
};
pub struct GitWatchers {
@ -27,11 +27,13 @@ impl GitWatchers {
pub fn watch(
&mut self,
sender: mpsc::Sender<events::Event>,
sender: tokio::sync::mpsc::Sender<events::Event>,
project: projects::Project,
) -> Result<()> {
let (tx, rx) = mpsc::channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
let (tx, mut rx) = tokio::sync::mpsc::channel(32);
let mut watcher = notify::recommended_watcher(move |res| {
let _ = tx.try_send(res);
})?;
watcher.watch(
Path::new(&Path::new(&project.path).join(".git")),
@ -41,13 +43,13 @@ impl GitWatchers {
let project = Arc::new(Mutex::new(project.clone()));
tauri::async_runtime::spawn_blocking(move || {
tauri::async_runtime::spawn(async move {
log::info!("{}: watching git", project.lock().unwrap().id);
let project = project.lock().unwrap().clone();
let project_path = Path::new(&project.path);
while let Ok(event) = rx.recv() {
while let Some(event) = rx.recv().await {
if let Err(e) = event {
log::error!("{}: notify event error: {:#}", project.id.clone(), e);
continue;
@ -70,12 +72,12 @@ impl GitWatchers {
kind_string
);
if let Err(e) = sender.send(event) {
if let Err(e) = sender.send(event).await {
log::error!(
"{}: notify event error: {:#}",
project.id.clone(),
e
);
)
}
}
Ok(None) => {}
@ -83,9 +85,7 @@ impl GitWatchers {
log::error!("{}: notify event error: {:#}", project.id.clone(), e)
}
},
None => {
// ignore
}
None => {}
}
}
}

View File

@ -9,10 +9,7 @@ mod test;
use crate::{deltas, events, projects, search, sessions, users};
use anyhow::Result;
use std::{
path::Path,
sync::{mpsc, Arc, Mutex},
};
use std::{path::Path, sync::Arc};
pub struct Watcher {
session_watcher: session::SessionWatcher,
@ -39,14 +36,14 @@ impl Watcher {
pub fn watch(
&mut self,
sender: mpsc::Sender<events::Event>,
sender: tokio::sync::mpsc::Sender<events::Event>,
project: &projects::Project,
deltas_storage: &deltas::Store,
sessions_storage: &sessions::Store,
) -> Result<()> {
// shared mutex to prevent concurrent write to gitbutler interal state by multiple watchers
// at the same time
let lock_file = Arc::new(Mutex::new(fslock::LockFile::open(
let lock_file = Arc::new(tokio::sync::Mutex::new(fslock::LockFile::open(
&Path::new(&project.path)
.join(".git")
.join(format!("gb-{}", project.id))

View File

@ -1,10 +1,7 @@
use crate::{deltas, events, projects, search, sessions, users};
use anyhow::{Context, Result};
use std::{
sync::{mpsc, Arc, Mutex},
thread,
time::{Duration, SystemTime},
};
use std::{sync::Arc, time::SystemTime};
use tokio::time::{sleep, Duration};
const FIVE_MINUTES: u128 = Duration::new(5 * 60, 0).as_millis();
const ONE_HOUR: u128 = Duration::new(60 * 60, 0).as_millis();
@ -29,11 +26,11 @@ impl<'a> SessionWatcher {
}
}
fn run(
async fn run(
&mut self,
project_id: &str,
sender: mpsc::Sender<events::Event>,
mutex: Arc<Mutex<fslock::LockFile>>,
sender: tokio::sync::mpsc::Sender<events::Event>,
mutex: Arc<tokio::sync::Mutex<fslock::LockFile>>,
deltas_storage: &deltas::Store,
sessions_storage: &sessions::Store,
) -> Result<()> {
@ -52,7 +49,7 @@ impl<'a> SessionWatcher {
.with_context(|| "failed to check for session to comit")?
{
Some(mut session) => {
let mut fslock = mutex.lock().unwrap();
let mut fslock = mutex.lock().await;
log::debug!("{}: locking", project.id);
fslock.lock().unwrap();
log::debug!("{}: locked", project.id);
@ -71,8 +68,9 @@ impl<'a> SessionWatcher {
sender
.send(events::Event::session(&project, &session))
.await
.with_context(|| {
format!("{}: failed to send session event", project.id)
format!("failed to send session {} event", session.id)
})?;
Ok(())
@ -86,9 +84,9 @@ impl<'a> SessionWatcher {
pub fn watch(
&self,
sender: mpsc::Sender<events::Event>,
sender: tokio::sync::mpsc::Sender<events::Event>,
project: projects::Project,
mutex: Arc<Mutex<fslock::LockFile>>,
mutex: Arc<tokio::sync::Mutex<fslock::LockFile>>,
deltas_storage: &deltas::Store,
sessions_storage: &sessions::Store,
) -> Result<()> {
@ -100,22 +98,27 @@ impl<'a> SessionWatcher {
let shared_storage = deltas_storage.clone();
let shared_sessions_storage = sessions_storage.clone();
tauri::async_runtime::spawn_blocking(move || loop {
let local_self = &mut self_copy;
let deltas_storage = shared_storage.clone();
let sessions_storage = shared_sessions_storage.clone();
tauri::async_runtime::spawn(async move {
loop {
let local_self = &mut self_copy;
let deltas_storage = shared_storage.clone();
let sessions_storage = shared_sessions_storage.clone();
if let Err(e) = local_self.run(
&project_id,
sender.clone(),
mutex.clone(),
&deltas_storage,
&sessions_storage,
) {
log::error!("{}: error while running git watcher: {:#}", project_id, e);
if let Err(e) = local_self
.run(
&project_id,
sender.clone(),
mutex.clone(),
&deltas_storage,
&sessions_storage,
)
.await
{
log::error!("{}: error while running git watcher: {:#}", project_id, e);
}
sleep(Duration::from_secs(10)).await;
}
thread::sleep(Duration::from_secs(10));
});
Ok(())