fix deadlock issue

This commit is contained in:
Nikita Galaiko 2023-03-22 14:11:21 +01:00
parent 41d2893fd9
commit 17fa40b3b8
No known key found for this signature in database
GPG Key ID: EBAB54E845BA519D
5 changed files with 120 additions and 179 deletions

View File

@ -295,17 +295,9 @@ async fn add_project(handle: tauri::AppHandle, path: &str) -> Result<projects::P
let repo = repo_for_project(handle.clone(), &project.id)?;
let (tx, rx): (
tokio::sync::mpsc::Sender<events::Event>,
tokio::sync::mpsc::Receiver<events::Event>,
) = tokio::sync::mpsc::channel(1);
let (tx, rx) = tokio::sync::mpsc::channel::<events::Event>(1);
app_state.watchers.lock().unwrap().watch(
tx,
&project,
&repo.deltas_storage,
&repo.sessions_storage,
)?;
app_state.watchers.lock().unwrap().watch(tx, &repo)?;
watch_events(handle, rx);
Ok(project)
@ -669,21 +661,15 @@ fn init(app_handle: tauri::AppHandle) -> Result<()> {
.watchers
.lock()
.unwrap()
.watch(
tx.clone(),
&project,
&repo.deltas_storage,
&repo.sessions_storage,
)
.watch(tx.clone(), &repo)
.with_context(|| format!("{}: failed to watch project", project.id))?;
let git_repository = repo.git_repository.lock().unwrap();
if let Err(err) = app_state.deltas_searcher.lock().unwrap().reindex_project(
&git_repository,
&repo.project,
&repo.deltas_storage,
&repo.sessions_storage,
) {
if let Err(err) = app_state
.deltas_searcher
.lock()
.unwrap()
.reindex_project(&repo)
{
log::error!("{}: failed to reindex project: {:#}", project.id, err);
}
}

View File

@ -1,4 +1,4 @@
use crate::{deltas, projects, sessions, storage};
use crate::{deltas, projects, repositories, sessions, storage};
use anyhow::{Context, Result};
use serde::Serialize;
use similar::{ChangeTag, TextDiff};
@ -7,7 +7,7 @@ use std::{
fs,
path::{Path, PathBuf},
sync::{Arc, Mutex},
time, vec,
vec,
};
use tantivy::{collector, directory::MmapDirectory, schema, IndexWriter};
@ -92,79 +92,55 @@ impl Deltas {
search(&self.index, &self.reader, query)
}
pub fn reindex_project(
&mut self,
repo: &git2::Repository,
project: &projects::Project,
deltas_storage: &deltas::Store,
session_storage: &sessions::Store,
) -> Result<()> {
let start = time::SystemTime::now();
pub fn reindex_project(&mut self, repository: &repositories::Repository) -> Result<()> {
let sessions = repository
.sessions(None)
.with_context(|| "Could not list sessions for project")?;
let reference = repo.find_reference(&project.refname())?;
let head = repo.find_commit(reference.target().unwrap())?;
// list all commits from gitbutler head to the first commit
let mut walker = repo.revwalk()?;
walker.push(head.id())?;
walker.set_sorting(git2::Sort::TIME)?;
for oid in walker {
let oid = oid?;
let commit = repo
.find_commit(oid)
.with_context(|| format!("Could not find commit {}", oid.to_string()))?;
let session_id = sessions::id_from_commit(repo, &commit)?;
for session in sessions {
if session.hash.is_none() {
continue;
}
let version = self
.meta_storage
.get(&project.id, &session_id)?
.get(&repository.project.id, &session.id)?
.unwrap_or(0);
if version == CURRENT_VERSION {
continue;
}
let session = sessions::Session::from_commit(repo, &commit).with_context(|| {
format!("Could not parse commit {} in project", oid.to_string())
})?;
if let Err(e) =
self.index_session(project, &session, deltas_storage, session_storage)
{
if let Err(e) = self.index_session(repository, &session) {
log::error!(
"Could not index commit {} in {}: {:#}",
oid,
project.path,
"Could not index session {} in {}: {:#}",
session.id,
repository.project.path,
e
);
}
}
log::info!(
"Reindexing project {} done, took {}ms",
project.path,
time::SystemTime::now().duration_since(start)?.as_millis()
);
Ok(())
}
pub fn index_session(
&mut self,
project: &projects::Project,
repository: &repositories::Repository,
session: &sessions::Session,
deltas_storage: &deltas::Store,
session_storage: &sessions::Store,
) -> Result<()> {
log::info!("Indexing session {} in {}", session.id, project.path);
log::info!(
"Indexing session {} in {}",
session.id,
repository.project.path
);
index_session(
&self.index,
&mut self.writer.lock().unwrap(),
session,
project,
deltas_storage,
session_storage,
&repository,
)?;
self.meta_storage
.set(&project.id, &session.id, CURRENT_VERSION)?;
.set(&repository.project.id, &session.id, CURRENT_VERSION)?;
Ok(())
}
}
@ -199,18 +175,20 @@ fn index_session(
index: &tantivy::Index,
writer: &mut IndexWriter,
session: &sessions::Session,
project: &projects::Project,
deltas_storage: &deltas::Store,
session_storage: &sessions::Store,
repository: &repositories::Repository,
) -> Result<()> {
let deltas = deltas_storage.list(&session.id, None)?;
let deltas = repository
.deltas(&session.id, None)
.with_context(|| "could not list deltas for session")?;
if deltas.is_empty() {
return Ok(());
}
let files = session_storage.list_files(
&session.id,
Some(deltas.keys().map(|k| k.as_str()).collect()),
)?;
let files = repository
.files(
&session.id,
Some(deltas.keys().map(|k| k.as_str()).collect()),
)
.with_context(|| "could not list files for session")?;
// index every file
for (file_path, deltas) in deltas.into_iter() {
// keep the state of the file after each delta operation
@ -227,7 +205,7 @@ fn index_session(
index,
writer,
session,
project,
&repository.project,
&mut file_text,
&file_path,
i,

View File

@ -1,16 +1,13 @@
use crate::{
deltas::{self, Operation},
projects, sessions,
projects, repositories,
};
use anyhow::Result;
use core::ops::Range;
use std::{
path::Path,
sync::{Arc, Mutex},
};
use std::path::Path;
use tempfile::tempdir;
fn test_project() -> Result<(git2::Repository, projects::Project)> {
fn test_project() -> Result<repositories::Repository> {
let path = tempdir()?.path().to_str().unwrap().to_string();
std::fs::create_dir_all(&path)?;
let repo = git2::Repository::init(&path)?;
@ -26,19 +23,17 @@ fn test_project() -> Result<(git2::Repository, projects::Project)> {
&[],
)?;
let project = projects::Project::from_path(path)?;
Ok((repo, project))
repositories::Repository::new(project, None)
}
#[test]
fn test_filter_by_timestamp() {
let (repo, project) = test_project().unwrap();
let repository = test_project().unwrap();
let index_path = tempdir().unwrap().path().to_str().unwrap().to_string();
let repo = Arc::new(Mutex::new(repo));
let sessions_storage = sessions::Store::new(repo.clone(), project.clone());
let deltas_storage = deltas::Store::new(repo, project.clone(), sessions_storage.clone());
let mut session = sessions_storage.create_current().unwrap();
deltas_storage
let mut session = repository.sessions_storage.create_current().unwrap();
repository
.deltas_storage
.write(
Path::new("test.txt"),
&vec![
@ -57,16 +52,15 @@ fn test_filter_by_timestamp() {
],
)
.unwrap();
session = sessions_storage.flush(&session, None).unwrap();
session = repository.sessions_storage.flush(&session, None).unwrap();
let mut searcher = super::Deltas::at(index_path.into()).unwrap();
let write_result =
searcher.index_session(&project, &session, &deltas_storage, &sessions_storage);
let write_result = searcher.index_session(&repository, &session);
assert!(write_result.is_ok());
let search_result_from = searcher.search(&super::SearchQuery {
project_id: project.id.clone(),
project_id: repository.project.id.clone(),
q: "test.txt".to_string(),
limit: 10,
range: Range { start: 2, end: 10 },
@ -78,7 +72,7 @@ fn test_filter_by_timestamp() {
assert_eq!(search_result_from[0].index, 2);
let search_result_to = searcher.search(&super::SearchQuery {
project_id: project.id.clone(),
project_id: repository.project.id.clone(),
q: "test.txt".to_string(),
limit: 10,
range: Range { start: 0, end: 1 },
@ -90,7 +84,7 @@ fn test_filter_by_timestamp() {
assert_eq!(search_result_to[0].index, 0);
let search_result_from_to = searcher.search(&super::SearchQuery {
project_id: project.id.clone(),
project_id: repository.project.id.clone(),
q: "test.txt".to_string(),
limit: 10,
range: Range { start: 1, end: 2 },
@ -104,14 +98,12 @@ fn test_filter_by_timestamp() {
#[test]
fn test_sorted_by_timestamp() {
let (repo, project) = test_project().unwrap();
let repository = test_project().unwrap();
let index_path = tempdir().unwrap().path().to_str().unwrap().to_string();
let repo = Arc::new(Mutex::new(repo));
let sessions_storage = sessions::Store::new(repo.clone(), project.clone());
let deltas_storage = deltas::Store::new(repo, project.clone(), sessions_storage.clone());
let mut session = sessions_storage.create_current().unwrap();
deltas_storage
let mut session = repository.sessions_storage.create_current().unwrap();
repository
.deltas_storage
.write(
Path::new("test.txt"),
&vec![
@ -126,16 +118,15 @@ fn test_sorted_by_timestamp() {
],
)
.unwrap();
session = sessions_storage.flush(&session, None).unwrap();
session = repository.sessions_storage.flush(&session, None).unwrap();
let mut searcher = super::Deltas::at(index_path.into()).unwrap();
let write_result =
searcher.index_session(&project, &session, &deltas_storage, &sessions_storage);
let write_result = searcher.index_session(&repository, &session);
assert!(write_result.is_ok());
let search_result = searcher.search(&super::SearchQuery {
project_id: project.id,
project_id: repository.project.id,
q: "hello world".to_string(),
limit: 10,
range: Range { start: 0, end: 10 },
@ -151,14 +142,12 @@ fn test_sorted_by_timestamp() {
#[test]
fn test_simple() {
let (repo, project) = test_project().unwrap();
let repository = test_project().unwrap();
let index_path = tempdir().unwrap().path().to_str().unwrap().to_string();
let repo = Arc::new(Mutex::new(repo));
let sessions_storage = sessions::Store::new(repo.clone(), project.clone());
let deltas_storage = deltas::Store::new(repo, project.clone(), sessions_storage.clone());
let mut session = sessions_storage.create_current().unwrap();
deltas_storage
let mut session = repository.sessions_storage.create_current().unwrap();
repository
.deltas_storage
.write(
Path::new("test.txt"),
&vec![
@ -173,16 +162,15 @@ fn test_simple() {
],
)
.unwrap();
session = sessions_storage.flush(&session, None).unwrap();
session = repository.sessions_storage.flush(&session, None).unwrap();
let mut searcher = super::Deltas::at(index_path.into()).unwrap();
let write_result =
searcher.index_session(&project, &session, &deltas_storage, &sessions_storage);
let write_result = searcher.index_session(&repository, &session);
assert!(write_result.is_ok());
let search_result1 = searcher.search(&super::SearchQuery {
project_id: project.id.clone(),
project_id: repository.project.id.clone(),
q: "hello".to_string(),
limit: 10,
offset: None,
@ -193,12 +181,12 @@ fn test_simple() {
let search_result1 = search_result1.unwrap();
assert_eq!(search_result1.len(), 1);
assert_eq!(search_result1[0].session_id, session.id);
assert_eq!(search_result1[0].project_id, project.id);
assert_eq!(search_result1[0].project_id, repository.project.id);
assert_eq!(search_result1[0].file_path, "test.txt");
assert_eq!(search_result1[0].index, 0);
let search_result2 = searcher.search(&super::SearchQuery {
project_id: project.id.clone(),
project_id: repository.project.id.clone(),
q: "world".to_string(),
limit: 10,
offset: None,
@ -208,12 +196,12 @@ fn test_simple() {
let search_result2 = search_result2.unwrap();
assert_eq!(search_result2.len(), 1);
assert_eq!(search_result2[0].session_id, session.id);
assert_eq!(search_result2[0].project_id, project.id);
assert_eq!(search_result2[0].project_id, repository.project.id);
assert_eq!(search_result2[0].file_path, "test.txt");
assert_eq!(search_result2[0].index, 1);
let search_result3 = searcher.search(&super::SearchQuery {
project_id: project.id.clone(),
project_id: repository.project.id.clone(),
q: "hello world".to_string(),
limit: 10,
offset: None,
@ -222,15 +210,15 @@ fn test_simple() {
assert!(search_result3.is_ok());
let search_result3 = search_result3.unwrap();
assert_eq!(search_result3.len(), 2);
assert_eq!(search_result3[0].project_id, project.id);
assert_eq!(search_result3[0].project_id, repository.project.id);
assert_eq!(search_result3[0].session_id, session.id);
assert_eq!(search_result3[0].file_path, "test.txt");
assert_eq!(search_result3[1].session_id, session.id);
assert_eq!(search_result3[1].project_id, project.id);
assert_eq!(search_result3[1].project_id, repository.project.id);
assert_eq!(search_result3[1].file_path, "test.txt");
let search_by_filename_result = searcher.search(&super::SearchQuery {
project_id: project.id.clone(),
project_id: repository.project.id.clone(),
q: "test.txt".to_string(),
limit: 10,
offset: None,
@ -240,7 +228,10 @@ fn test_simple() {
let search_by_filename_result = search_by_filename_result.unwrap();
assert_eq!(search_by_filename_result.len(), 2);
assert_eq!(search_by_filename_result[0].session_id, session.id);
assert_eq!(search_by_filename_result[0].project_id, project.id);
assert_eq!(
search_by_filename_result[0].project_id,
repository.project.id
);
assert_eq!(search_by_filename_result[0].file_path, "test.txt");
let not_found_result = searcher.search(&super::SearchQuery {

View File

@ -8,7 +8,7 @@ mod delta_test;
#[cfg(test)]
mod test;
use crate::{deltas, events, projects, search, sessions, users};
use crate::{events, projects, repositories, search, users};
use anyhow::Result;
use std::{path::Path, sync::Arc};
@ -35,33 +35,26 @@ impl Watcher {
pub fn watch(
&mut self,
sender: tokio::sync::mpsc::Sender<events::Event>,
project: &projects::Project,
deltas_storage: &deltas::Store,
sessions_storage: &sessions::Store,
repository: &repositories::Repository,
) -> Result<()> {
// shared mutex to prevent concurrent write to gitbutler interal state by multiple watchers
// at the same time
let lock_file = fslock::LockFile::open(
&Path::new(&project.path)
&Path::new(&repository.project.path)
.join(".git")
.join(format!("gb-{}", project.id))
.join(format!("gb-{}", repository.project.id))
.join(".lock"),
)?;
let repo = git2::Repository::open(project.path.clone())?;
let repo = git2::Repository::open(repository.project.path.clone())?;
repo.add_ignore_rule("*.lock")?;
let shared_sender = Arc::new(sender.clone());
let shared_deltas_store = Arc::new(deltas_storage.clone());
let shared_deltas_store = Arc::new(repository.deltas_storage.clone());
let shared_lock_file = Arc::new(tokio::sync::Mutex::new(lock_file));
self.session_watcher.watch(
sender,
project.clone(),
shared_lock_file.clone(),
deltas_storage,
sessions_storage,
)?;
self.session_watcher
.watch(sender, shared_lock_file.clone(), repository)?;
let (fstx, mut fsevents) = tokio::sync::mpsc::channel::<files::Event>(32);
@ -94,7 +87,7 @@ impl Watcher {
}
}
});
self.files_watcher.watch(fstx, project.clone())?;
self.files_watcher.watch(fstx, repository.project.clone())?;
Ok(())
}

View File

@ -1,4 +1,4 @@
use crate::{deltas, events, projects, search, sessions, users};
use crate::{events, projects, repositories, search, sessions, users};
use anyhow::{Context, Result};
use std::{sync::Arc, time::SystemTime};
use tokio::time::{sleep, Duration};
@ -28,16 +28,14 @@ impl SessionWatcher {
async fn run(
&mut self,
project_id: &str,
sender: tokio::sync::mpsc::Sender<events::Event>,
fslock: Arc<tokio::sync::Mutex<fslock::LockFile>>,
deltas_storage: &deltas::Store,
sessions_storage: &sessions::Store,
repository: &repositories::Repository,
) -> Result<()> {
match self
.projects_storage
.get_project(&project_id)
.with_context(|| format!("{}: failed to get project", project_id))?
.get_project(&repository.project.id)
.with_context(|| format!("{}: failed to get project", repository.project.id))?
{
Some(project) => {
let user = self
@ -45,7 +43,7 @@ impl SessionWatcher {
.get()
.with_context(|| format!("{}: failed to get user", project.id))?;
match session_to_commit(&project, &sessions_storage)
match session_to_commit(&repository)
.with_context(|| "failed to check for session to comit")?
{
Some(mut session) => {
@ -54,7 +52,8 @@ impl SessionWatcher {
fslock.lock().unwrap();
log::debug!("{}: locked", project.id);
session = sessions_storage
session = repository
.sessions_storage
.flush(&session, user)
.with_context(|| format!("failed to flush session {}", session.id))?;
@ -63,7 +62,7 @@ impl SessionWatcher {
log::debug!("{}: unlocked", project.id);
self.deltas_searcher
.index_session(&project, &session, &deltas_storage, &sessions_storage)
.index_session(&repository, &session)
.with_context(|| format!("failed to index session {}", session.id))?;
sender
@ -85,36 +84,32 @@ impl SessionWatcher {
pub fn watch(
&self,
sender: tokio::sync::mpsc::Sender<events::Event>,
project: projects::Project,
fslock: Arc<tokio::sync::Mutex<fslock::LockFile>>,
deltas_storage: &deltas::Store,
sessions_storage: &sessions::Store,
repository: &repositories::Repository,
) -> Result<()> {
log::info!("{}: watching sessions in {}", project.id, project.path);
log::info!(
"{}: watching sessions in {}",
repository.project.id,
repository.project.path
);
let shared_self = self.clone();
let mut self_copy = shared_self.clone();
let project_id = project.id;
let shared_storage = deltas_storage.clone();
let shared_sessions_storage = sessions_storage.clone();
let shared_repository = repository.clone();
tauri::async_runtime::spawn(async move {
let local_self = &mut self_copy;
let deltas_storage = shared_storage.clone();
let sessions_storage = shared_sessions_storage.clone();
let repository = &shared_repository;
loop {
if let Err(e) = local_self
.run(
&project_id,
sender.clone(),
fslock.clone(),
&deltas_storage,
&sessions_storage,
)
.run(sender.clone(), fslock.clone(), &repository)
.await
{
log::error!("{}: error while running git watcher: {:#}", project_id, e);
log::error!(
"{}: error while running git watcher: {:#}",
repository.project.id,
e
);
}
sleep(Duration::from_secs(10)).await;
@ -128,13 +123,11 @@ impl SessionWatcher {
// make sure that the .git/gb/session directory exists (a session is in progress)
// and that there has been no activity in the last 5 minutes (the session appears to be over)
// and the start was at most an hour ago
fn session_to_commit(
project: &projects::Project,
sessions_store: &sessions::Store,
) -> Result<Option<sessions::Session>> {
match sessions_store
fn session_to_commit(repository: &repositories::Repository) -> Result<Option<sessions::Session>> {
match repository
.sessions_storage
.get_current()
.with_context(|| format!("{}: failed to get current session", project.id))?
.with_context(|| format!("{}: failed to get current session", repository.project.id))?
{
None => Ok(None),
Some(current_session) => {
@ -149,8 +142,8 @@ fn session_to_commit(
if (elapsed_last > FIVE_MINUTES) || (elapsed_start > ONE_HOUR) {
log::info!(
"{}: ready to commit {} ({} seconds elapsed, {} seconds since start)",
project.id,
project.path,
repository.project.id,
repository.project.path,
elapsed_last / 1000,
elapsed_start / 1000
);
@ -158,8 +151,8 @@ fn session_to_commit(
} else {
log::debug!(
"{}: not ready to commit {} yet. ({} seconds elapsed, {} seconds since start)",
project.id,
project.path,
repository.project.id,
repository.project.path,
elapsed_last / 1000,
elapsed_start / 1000
);