From 17fa40b3b88e8605627773f8b7e7cd1fc8f12e45 Mon Sep 17 00:00:00 2001 From: Nikita Galaiko Date: Wed, 22 Mar 2023 14:11:21 +0100 Subject: [PATCH] fix deadlock issue --- src-tauri/src/main.rs | 32 +++------- src-tauri/src/search/deltas.rs | 90 +++++++++++------------------ src-tauri/src/search/deltas_test.rs | 85 ++++++++++++--------------- src-tauri/src/watchers/mod.rs | 25 +++----- src-tauri/src/watchers/session.rs | 67 ++++++++++----------- 5 files changed, 120 insertions(+), 179 deletions(-) diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index dd83678cd..aa4361f06 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -295,17 +295,9 @@ async fn add_project(handle: tauri::AppHandle, path: &str) -> Result, - tokio::sync::mpsc::Receiver, - ) = tokio::sync::mpsc::channel(1); + let (tx, rx) = tokio::sync::mpsc::channel::(1); - app_state.watchers.lock().unwrap().watch( - tx, - &project, - &repo.deltas_storage, - &repo.sessions_storage, - )?; + app_state.watchers.lock().unwrap().watch(tx, &repo)?; watch_events(handle, rx); Ok(project) @@ -669,21 +661,15 @@ fn init(app_handle: tauri::AppHandle) -> Result<()> { .watchers .lock() .unwrap() - .watch( - tx.clone(), - &project, - &repo.deltas_storage, - &repo.sessions_storage, - ) + .watch(tx.clone(), &repo) .with_context(|| format!("{}: failed to watch project", project.id))?; - let git_repository = repo.git_repository.lock().unwrap(); - if let Err(err) = app_state.deltas_searcher.lock().unwrap().reindex_project( - &git_repository, - &repo.project, - &repo.deltas_storage, - &repo.sessions_storage, - ) { + if let Err(err) = app_state + .deltas_searcher + .lock() + .unwrap() + .reindex_project(&repo) + { log::error!("{}: failed to reindex project: {:#}", project.id, err); } } diff --git a/src-tauri/src/search/deltas.rs b/src-tauri/src/search/deltas.rs index a036e4089..296294b68 100644 --- a/src-tauri/src/search/deltas.rs +++ b/src-tauri/src/search/deltas.rs @@ -1,4 +1,4 @@ -use crate::{deltas, projects, sessions, storage}; +use crate::{deltas, projects, repositories, sessions, storage}; use anyhow::{Context, Result}; use serde::Serialize; use similar::{ChangeTag, TextDiff}; @@ -7,7 +7,7 @@ use std::{ fs, path::{Path, PathBuf}, sync::{Arc, Mutex}, - time, vec, + vec, }; use tantivy::{collector, directory::MmapDirectory, schema, IndexWriter}; @@ -92,79 +92,55 @@ impl Deltas { search(&self.index, &self.reader, query) } - pub fn reindex_project( - &mut self, - repo: &git2::Repository, - project: &projects::Project, - deltas_storage: &deltas::Store, - session_storage: &sessions::Store, - ) -> Result<()> { - let start = time::SystemTime::now(); + pub fn reindex_project(&mut self, repository: &repositories::Repository) -> Result<()> { + let sessions = repository + .sessions(None) + .with_context(|| "Could not list sessions for project")?; - let reference = repo.find_reference(&project.refname())?; - let head = repo.find_commit(reference.target().unwrap())?; - - // list all commits from gitbutler head to the first commit - let mut walker = repo.revwalk()?; - walker.push(head.id())?; - walker.set_sorting(git2::Sort::TIME)?; - - for oid in walker { - let oid = oid?; - let commit = repo - .find_commit(oid) - .with_context(|| format!("Could not find commit {}", oid.to_string()))?; - let session_id = sessions::id_from_commit(repo, &commit)?; + for session in sessions { + if session.hash.is_none() { + continue; + } let version = self .meta_storage - .get(&project.id, &session_id)? + .get(&repository.project.id, &session.id)? .unwrap_or(0); if version == CURRENT_VERSION { continue; } - let session = sessions::Session::from_commit(repo, &commit).with_context(|| { - format!("Could not parse commit {} in project", oid.to_string()) - })?; - if let Err(e) = - self.index_session(project, &session, deltas_storage, session_storage) - { + if let Err(e) = self.index_session(repository, &session) { log::error!( - "Could not index commit {} in {}: {:#}", - oid, - project.path, + "Could not index session {} in {}: {:#}", + session.id, + repository.project.path, e ); } } - log::info!( - "Reindexing project {} done, took {}ms", - project.path, - time::SystemTime::now().duration_since(start)?.as_millis() - ); Ok(()) } pub fn index_session( &mut self, - project: &projects::Project, + repository: &repositories::Repository, session: &sessions::Session, - deltas_storage: &deltas::Store, - session_storage: &sessions::Store, ) -> Result<()> { - log::info!("Indexing session {} in {}", session.id, project.path); + log::info!( + "Indexing session {} in {}", + session.id, + repository.project.path + ); index_session( &self.index, &mut self.writer.lock().unwrap(), session, - project, - deltas_storage, - session_storage, + &repository, )?; self.meta_storage - .set(&project.id, &session.id, CURRENT_VERSION)?; + .set(&repository.project.id, &session.id, CURRENT_VERSION)?; Ok(()) } } @@ -199,18 +175,20 @@ fn index_session( index: &tantivy::Index, writer: &mut IndexWriter, session: &sessions::Session, - project: &projects::Project, - deltas_storage: &deltas::Store, - session_storage: &sessions::Store, + repository: &repositories::Repository, ) -> Result<()> { - let deltas = deltas_storage.list(&session.id, None)?; + let deltas = repository + .deltas(&session.id, None) + .with_context(|| "could not list deltas for session")?; if deltas.is_empty() { return Ok(()); } - let files = session_storage.list_files( - &session.id, - Some(deltas.keys().map(|k| k.as_str()).collect()), - )?; + let files = repository + .files( + &session.id, + Some(deltas.keys().map(|k| k.as_str()).collect()), + ) + .with_context(|| "could not list files for session")?; // index every file for (file_path, deltas) in deltas.into_iter() { // keep the state of the file after each delta operation @@ -227,7 +205,7 @@ fn index_session( index, writer, session, - project, + &repository.project, &mut file_text, &file_path, i, diff --git a/src-tauri/src/search/deltas_test.rs b/src-tauri/src/search/deltas_test.rs index 85b742f16..517b9289e 100644 --- a/src-tauri/src/search/deltas_test.rs +++ b/src-tauri/src/search/deltas_test.rs @@ -1,16 +1,13 @@ use crate::{ deltas::{self, Operation}, - projects, sessions, + projects, repositories, }; use anyhow::Result; use core::ops::Range; -use std::{ - path::Path, - sync::{Arc, Mutex}, -}; +use std::path::Path; use tempfile::tempdir; -fn test_project() -> Result<(git2::Repository, projects::Project)> { +fn test_project() -> Result { let path = tempdir()?.path().to_str().unwrap().to_string(); std::fs::create_dir_all(&path)?; let repo = git2::Repository::init(&path)?; @@ -26,19 +23,17 @@ fn test_project() -> Result<(git2::Repository, projects::Project)> { &[], )?; let project = projects::Project::from_path(path)?; - Ok((repo, project)) + repositories::Repository::new(project, None) } #[test] fn test_filter_by_timestamp() { - let (repo, project) = test_project().unwrap(); + let repository = test_project().unwrap(); let index_path = tempdir().unwrap().path().to_str().unwrap().to_string(); - let repo = Arc::new(Mutex::new(repo)); - let sessions_storage = sessions::Store::new(repo.clone(), project.clone()); - let deltas_storage = deltas::Store::new(repo, project.clone(), sessions_storage.clone()); - let mut session = sessions_storage.create_current().unwrap(); - deltas_storage + let mut session = repository.sessions_storage.create_current().unwrap(); + repository + .deltas_storage .write( Path::new("test.txt"), &vec![ @@ -57,16 +52,15 @@ fn test_filter_by_timestamp() { ], ) .unwrap(); - session = sessions_storage.flush(&session, None).unwrap(); + session = repository.sessions_storage.flush(&session, None).unwrap(); let mut searcher = super::Deltas::at(index_path.into()).unwrap(); - let write_result = - searcher.index_session(&project, &session, &deltas_storage, &sessions_storage); + let write_result = searcher.index_session(&repository, &session); assert!(write_result.is_ok()); let search_result_from = searcher.search(&super::SearchQuery { - project_id: project.id.clone(), + project_id: repository.project.id.clone(), q: "test.txt".to_string(), limit: 10, range: Range { start: 2, end: 10 }, @@ -78,7 +72,7 @@ fn test_filter_by_timestamp() { assert_eq!(search_result_from[0].index, 2); let search_result_to = searcher.search(&super::SearchQuery { - project_id: project.id.clone(), + project_id: repository.project.id.clone(), q: "test.txt".to_string(), limit: 10, range: Range { start: 0, end: 1 }, @@ -90,7 +84,7 @@ fn test_filter_by_timestamp() { assert_eq!(search_result_to[0].index, 0); let search_result_from_to = searcher.search(&super::SearchQuery { - project_id: project.id.clone(), + project_id: repository.project.id.clone(), q: "test.txt".to_string(), limit: 10, range: Range { start: 1, end: 2 }, @@ -104,14 +98,12 @@ fn test_filter_by_timestamp() { #[test] fn test_sorted_by_timestamp() { - let (repo, project) = test_project().unwrap(); + let repository = test_project().unwrap(); let index_path = tempdir().unwrap().path().to_str().unwrap().to_string(); - let repo = Arc::new(Mutex::new(repo)); - let sessions_storage = sessions::Store::new(repo.clone(), project.clone()); - let deltas_storage = deltas::Store::new(repo, project.clone(), sessions_storage.clone()); - let mut session = sessions_storage.create_current().unwrap(); - deltas_storage + let mut session = repository.sessions_storage.create_current().unwrap(); + repository + .deltas_storage .write( Path::new("test.txt"), &vec![ @@ -126,16 +118,15 @@ fn test_sorted_by_timestamp() { ], ) .unwrap(); - session = sessions_storage.flush(&session, None).unwrap(); + session = repository.sessions_storage.flush(&session, None).unwrap(); let mut searcher = super::Deltas::at(index_path.into()).unwrap(); - let write_result = - searcher.index_session(&project, &session, &deltas_storage, &sessions_storage); + let write_result = searcher.index_session(&repository, &session); assert!(write_result.is_ok()); let search_result = searcher.search(&super::SearchQuery { - project_id: project.id, + project_id: repository.project.id, q: "hello world".to_string(), limit: 10, range: Range { start: 0, end: 10 }, @@ -151,14 +142,12 @@ fn test_sorted_by_timestamp() { #[test] fn test_simple() { - let (repo, project) = test_project().unwrap(); + let repository = test_project().unwrap(); let index_path = tempdir().unwrap().path().to_str().unwrap().to_string(); - let repo = Arc::new(Mutex::new(repo)); - let sessions_storage = sessions::Store::new(repo.clone(), project.clone()); - let deltas_storage = deltas::Store::new(repo, project.clone(), sessions_storage.clone()); - let mut session = sessions_storage.create_current().unwrap(); - deltas_storage + let mut session = repository.sessions_storage.create_current().unwrap(); + repository + .deltas_storage .write( Path::new("test.txt"), &vec![ @@ -173,16 +162,15 @@ fn test_simple() { ], ) .unwrap(); - session = sessions_storage.flush(&session, None).unwrap(); + session = repository.sessions_storage.flush(&session, None).unwrap(); let mut searcher = super::Deltas::at(index_path.into()).unwrap(); - let write_result = - searcher.index_session(&project, &session, &deltas_storage, &sessions_storage); + let write_result = searcher.index_session(&repository, &session); assert!(write_result.is_ok()); let search_result1 = searcher.search(&super::SearchQuery { - project_id: project.id.clone(), + project_id: repository.project.id.clone(), q: "hello".to_string(), limit: 10, offset: None, @@ -193,12 +181,12 @@ fn test_simple() { let search_result1 = search_result1.unwrap(); assert_eq!(search_result1.len(), 1); assert_eq!(search_result1[0].session_id, session.id); - assert_eq!(search_result1[0].project_id, project.id); + assert_eq!(search_result1[0].project_id, repository.project.id); assert_eq!(search_result1[0].file_path, "test.txt"); assert_eq!(search_result1[0].index, 0); let search_result2 = searcher.search(&super::SearchQuery { - project_id: project.id.clone(), + project_id: repository.project.id.clone(), q: "world".to_string(), limit: 10, offset: None, @@ -208,12 +196,12 @@ fn test_simple() { let search_result2 = search_result2.unwrap(); assert_eq!(search_result2.len(), 1); assert_eq!(search_result2[0].session_id, session.id); - assert_eq!(search_result2[0].project_id, project.id); + assert_eq!(search_result2[0].project_id, repository.project.id); assert_eq!(search_result2[0].file_path, "test.txt"); assert_eq!(search_result2[0].index, 1); let search_result3 = searcher.search(&super::SearchQuery { - project_id: project.id.clone(), + project_id: repository.project.id.clone(), q: "hello world".to_string(), limit: 10, offset: None, @@ -222,15 +210,15 @@ fn test_simple() { assert!(search_result3.is_ok()); let search_result3 = search_result3.unwrap(); assert_eq!(search_result3.len(), 2); - assert_eq!(search_result3[0].project_id, project.id); + assert_eq!(search_result3[0].project_id, repository.project.id); assert_eq!(search_result3[0].session_id, session.id); assert_eq!(search_result3[0].file_path, "test.txt"); assert_eq!(search_result3[1].session_id, session.id); - assert_eq!(search_result3[1].project_id, project.id); + assert_eq!(search_result3[1].project_id, repository.project.id); assert_eq!(search_result3[1].file_path, "test.txt"); let search_by_filename_result = searcher.search(&super::SearchQuery { - project_id: project.id.clone(), + project_id: repository.project.id.clone(), q: "test.txt".to_string(), limit: 10, offset: None, @@ -240,7 +228,10 @@ fn test_simple() { let search_by_filename_result = search_by_filename_result.unwrap(); assert_eq!(search_by_filename_result.len(), 2); assert_eq!(search_by_filename_result[0].session_id, session.id); - assert_eq!(search_by_filename_result[0].project_id, project.id); + assert_eq!( + search_by_filename_result[0].project_id, + repository.project.id + ); assert_eq!(search_by_filename_result[0].file_path, "test.txt"); let not_found_result = searcher.search(&super::SearchQuery { diff --git a/src-tauri/src/watchers/mod.rs b/src-tauri/src/watchers/mod.rs index d019df78f..33be35619 100644 --- a/src-tauri/src/watchers/mod.rs +++ b/src-tauri/src/watchers/mod.rs @@ -8,7 +8,7 @@ mod delta_test; #[cfg(test)] mod test; -use crate::{deltas, events, projects, search, sessions, users}; +use crate::{events, projects, repositories, search, users}; use anyhow::Result; use std::{path::Path, sync::Arc}; @@ -35,33 +35,26 @@ impl Watcher { pub fn watch( &mut self, sender: tokio::sync::mpsc::Sender, - project: &projects::Project, - deltas_storage: &deltas::Store, - sessions_storage: &sessions::Store, + repository: &repositories::Repository, ) -> Result<()> { // shared mutex to prevent concurrent write to gitbutler interal state by multiple watchers // at the same time let lock_file = fslock::LockFile::open( - &Path::new(&project.path) + &Path::new(&repository.project.path) .join(".git") - .join(format!("gb-{}", project.id)) + .join(format!("gb-{}", repository.project.id)) .join(".lock"), )?; - let repo = git2::Repository::open(project.path.clone())?; + let repo = git2::Repository::open(repository.project.path.clone())?; repo.add_ignore_rule("*.lock")?; let shared_sender = Arc::new(sender.clone()); - let shared_deltas_store = Arc::new(deltas_storage.clone()); + let shared_deltas_store = Arc::new(repository.deltas_storage.clone()); let shared_lock_file = Arc::new(tokio::sync::Mutex::new(lock_file)); - self.session_watcher.watch( - sender, - project.clone(), - shared_lock_file.clone(), - deltas_storage, - sessions_storage, - )?; + self.session_watcher + .watch(sender, shared_lock_file.clone(), repository)?; let (fstx, mut fsevents) = tokio::sync::mpsc::channel::(32); @@ -94,7 +87,7 @@ impl Watcher { } } }); - self.files_watcher.watch(fstx, project.clone())?; + self.files_watcher.watch(fstx, repository.project.clone())?; Ok(()) } diff --git a/src-tauri/src/watchers/session.rs b/src-tauri/src/watchers/session.rs index 43d1e0e08..4409c8283 100644 --- a/src-tauri/src/watchers/session.rs +++ b/src-tauri/src/watchers/session.rs @@ -1,4 +1,4 @@ -use crate::{deltas, events, projects, search, sessions, users}; +use crate::{events, projects, repositories, search, sessions, users}; use anyhow::{Context, Result}; use std::{sync::Arc, time::SystemTime}; use tokio::time::{sleep, Duration}; @@ -28,16 +28,14 @@ impl SessionWatcher { async fn run( &mut self, - project_id: &str, sender: tokio::sync::mpsc::Sender, fslock: Arc>, - deltas_storage: &deltas::Store, - sessions_storage: &sessions::Store, + repository: &repositories::Repository, ) -> Result<()> { match self .projects_storage - .get_project(&project_id) - .with_context(|| format!("{}: failed to get project", project_id))? + .get_project(&repository.project.id) + .with_context(|| format!("{}: failed to get project", repository.project.id))? { Some(project) => { let user = self @@ -45,7 +43,7 @@ impl SessionWatcher { .get() .with_context(|| format!("{}: failed to get user", project.id))?; - match session_to_commit(&project, &sessions_storage) + match session_to_commit(&repository) .with_context(|| "failed to check for session to comit")? { Some(mut session) => { @@ -54,7 +52,8 @@ impl SessionWatcher { fslock.lock().unwrap(); log::debug!("{}: locked", project.id); - session = sessions_storage + session = repository + .sessions_storage .flush(&session, user) .with_context(|| format!("failed to flush session {}", session.id))?; @@ -63,7 +62,7 @@ impl SessionWatcher { log::debug!("{}: unlocked", project.id); self.deltas_searcher - .index_session(&project, &session, &deltas_storage, &sessions_storage) + .index_session(&repository, &session) .with_context(|| format!("failed to index session {}", session.id))?; sender @@ -85,36 +84,32 @@ impl SessionWatcher { pub fn watch( &self, sender: tokio::sync::mpsc::Sender, - project: projects::Project, fslock: Arc>, - deltas_storage: &deltas::Store, - sessions_storage: &sessions::Store, + repository: &repositories::Repository, ) -> Result<()> { - log::info!("{}: watching sessions in {}", project.id, project.path); + log::info!( + "{}: watching sessions in {}", + repository.project.id, + repository.project.path + ); let shared_self = self.clone(); let mut self_copy = shared_self.clone(); - let project_id = project.id; - - let shared_storage = deltas_storage.clone(); - let shared_sessions_storage = sessions_storage.clone(); + let shared_repository = repository.clone(); tauri::async_runtime::spawn(async move { let local_self = &mut self_copy; - let deltas_storage = shared_storage.clone(); - let sessions_storage = shared_sessions_storage.clone(); + let repository = &shared_repository; loop { if let Err(e) = local_self - .run( - &project_id, - sender.clone(), - fslock.clone(), - &deltas_storage, - &sessions_storage, - ) + .run(sender.clone(), fslock.clone(), &repository) .await { - log::error!("{}: error while running git watcher: {:#}", project_id, e); + log::error!( + "{}: error while running git watcher: {:#}", + repository.project.id, + e + ); } sleep(Duration::from_secs(10)).await; @@ -128,13 +123,11 @@ impl SessionWatcher { // make sure that the .git/gb/session directory exists (a session is in progress) // and that there has been no activity in the last 5 minutes (the session appears to be over) // and the start was at most an hour ago -fn session_to_commit( - project: &projects::Project, - sessions_store: &sessions::Store, -) -> Result> { - match sessions_store +fn session_to_commit(repository: &repositories::Repository) -> Result> { + match repository + .sessions_storage .get_current() - .with_context(|| format!("{}: failed to get current session", project.id))? + .with_context(|| format!("{}: failed to get current session", repository.project.id))? { None => Ok(None), Some(current_session) => { @@ -149,8 +142,8 @@ fn session_to_commit( if (elapsed_last > FIVE_MINUTES) || (elapsed_start > ONE_HOUR) { log::info!( "{}: ready to commit {} ({} seconds elapsed, {} seconds since start)", - project.id, - project.path, + repository.project.id, + repository.project.path, elapsed_last / 1000, elapsed_start / 1000 ); @@ -158,8 +151,8 @@ fn session_to_commit( } else { log::debug!( "{}: not ready to commit {} yet. ({} seconds elapsed, {} seconds since start)", - project.id, - project.path, + repository.project.id, + repository.project.path, elapsed_last / 1000, elapsed_start / 1000 );