use fslock for syncronization

This commit is contained in:
Nikita Galaiko 2023-03-13 12:06:29 +01:00
parent 436737036b
commit 3a95bc792d
7 changed files with 51 additions and 9 deletions

11
src-tauri/Cargo.lock generated
View File

@ -871,6 +871,16 @@ dependencies = [
"libc",
]
[[package]]
name = "fslock"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04412b8935272e3a9bae6f48c7bfff74c2911f60525404edfdd28e49884c3bfb"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "futf"
version = "0.1.5"
@ -1120,6 +1130,7 @@ version = "0.0.0"
dependencies = [
"anyhow",
"filetime",
"fslock",
"git2",
"log",
"md5",

View File

@ -36,6 +36,7 @@ urlencoding = "2.1.2"
thiserror = "1.0.38"
tantivy = "0.19.2"
similar = "2.2.1"
fslock = "0.2.1"
[features]
# by default Tauri runs in production mode

View File

@ -60,6 +60,7 @@ pub fn write(
delta_path.to_str().unwrap()
);
let raw_deltas = serde_json::to_string(&deltas)?;
log::debug!("{}: raw deltas: {}", project.id, raw_deltas);
std::fs::write(delta_path.clone(), raw_deltas).with_context(|| {
format!(
"failed to write file deltas to {}",

View File

@ -734,6 +734,10 @@ fn debug_test_consistency(app_state: &App, project_id: &str) -> Result<()> {
acc
});
if sessions.is_empty() {
return Ok(());
}
let first_session = &sessions[sessions.len() - 1];
let files = repo.files(&first_session.id, None)?;

View File

@ -1,4 +1,4 @@
use crate::deltas::{read, write, Delta, TextDocument};
use crate::deltas::{self, read, write, Delta, TextDocument};
use crate::projects;
use crate::{events, sessions};
use anyhow::{Context, Result};
@ -42,7 +42,7 @@ impl DeltaWatchers {
&mut self,
sender: mpsc::Sender<events::Event>,
project: projects::Project,
mutex: Arc<Mutex<()>>,
mutex: Arc<Mutex<fslock::LockFile>>,
) -> Result<()> {
log::info!("Watching deltas for {}", project.path);
let project_path = Path::new(&project.path);
@ -85,8 +85,9 @@ impl DeltaWatchers {
continue;
}
let mut fslock = mutex.lock().unwrap();
log::debug!("{}: locking", project.id);
let _lock = mutex.lock().unwrap();
fslock.lock().unwrap();
log::debug!("{}: locked", project.id);
match register_file_change(&project, &repo, &relative_file_path) {
@ -121,6 +122,10 @@ impl DeltaWatchers {
e
),
}
log::debug!("{}: unlocking", project.id);
fslock.unlock().unwrap();
log::debug!("{}: unlocked", project.id);
}
}
Err(e) => log::error!("{}: notify event error: {:#}", project.id, e),
@ -192,7 +197,12 @@ pub(crate) fn register_file_change(
(None, None) => TextDocument::new(None, vec![])?,
};
if !text_doc.update(&file_contents)? {
if !text_doc.update(&current_file_contents)? {
log::debug!(
"{}: \"{}\" no new deltas, ignoring",
project.id,
relative_file_path.display()
);
return Ok(None);
}

View File

@ -8,7 +8,10 @@ mod test;
use crate::{events, projects, search, users};
use anyhow::Result;
use std::sync::{mpsc, Arc, Mutex};
use std::{
path::Path,
sync::{mpsc, Arc, Mutex},
};
pub struct Watcher {
session_watcher: session::SessionWatcher,
@ -37,7 +40,14 @@ impl Watcher {
) -> Result<()> {
// shared mutex to prevent concurrent write to gitbutler interal state by multiple watchers
// at the same time
let lock_file = Arc::new(Mutex::new(()));
let lock_file = Arc::new(Mutex::new(fslock::LockFile::open(
&Path::new(&project.path)
.join(".git")
.join(format!("gb-{}", project.id))
.join(".lock"),
)?));
let repo = git2::Repository::open(project.path.clone())?;
repo.add_ignore_rule("*.lock")?;
self.delta_watcher
.watch(sender.clone(), project.clone(), lock_file.clone())?;

View File

@ -34,7 +34,7 @@ impl<'a> SessionWatcher {
&mut self,
project_id: &str,
sender: mpsc::Sender<events::Event>,
mutex: Arc<Mutex<()>>,
mutex: Arc<Mutex<fslock::LockFile>>,
) -> Result<()> {
match self
.projects_storage
@ -54,14 +54,19 @@ impl<'a> SessionWatcher {
.with_context(|| "failed to check for session to comit")?
{
Some(mut session) => {
let mut fslock = mutex.lock().unwrap();
log::debug!("{}: locking", project.id);
let _lock = mutex.lock().unwrap();
fslock.lock().unwrap();
log::debug!("{}: locked", project.id);
session
.flush(&repo, &user, &project)
.with_context(|| format!("failed to flush session {}", session.id))?;
log::debug!("{}: unlocking", project.id);
fslock.unlock().unwrap();
log::debug!("{}: unlocked", project.id);
self.deltas_searcher
.index_session(&repo, &project, &session)
.with_context(|| format!("failed to index session {}", session.id))?;
@ -85,7 +90,7 @@ impl<'a> SessionWatcher {
&self,
sender: mpsc::Sender<events::Event>,
project: projects::Project,
mutex: Arc<Mutex<()>>,
mutex: Arc<Mutex<fslock::LockFile>>,
) -> Result<()> {
log::info!("{}: watching sessions in {}", project.id, project.path);