diff --git a/Cargo.lock b/Cargo.lock index f32c4d576..c93adad27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2097,6 +2097,7 @@ dependencies = [ "backtrace", "chrono", "console-subscriber", + "crossbeam-channel", "futures", "git2", "gitbutler-core", diff --git a/crates/gitbutler-tauri/Cargo.toml b/crates/gitbutler-tauri/Cargo.toml index 3b588f9c8..7bf4039b3 100644 --- a/crates/gitbutler-tauri/Cargo.toml +++ b/crates/gitbutler-tauri/Cargo.toml @@ -29,6 +29,7 @@ backoff = "0.4.0" backtrace = { version = "0.3.71", optional = true } chrono = { version = "0.4.37", features = ["serde"] } console-subscriber = "0.2.0" +crossbeam-channel = "0.5.12" futures = "0.3" git2.workspace = true governor = "0.6.3" diff --git a/crates/gitbutler-tauri/src/watcher/handler/calculate_deltas.rs b/crates/gitbutler-tauri/src/watcher/handler/calculate_deltas.rs index 2293b08df..0018677ae 100644 --- a/crates/gitbutler-tauri/src/watcher/handler/calculate_deltas.rs +++ b/crates/gitbutler-tauri/src/watcher/handler/calculate_deltas.rs @@ -2,54 +2,55 @@ use anyhow::{Context, Result}; use gitbutler_core::{ deltas, gb_repository, project_repository, projects::ProjectId, reader, sessions, }; +use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use tracing::instrument; impl super::Handler { #[instrument(skip(self, paths, project_id))] pub fn calculate_deltas(&self, paths: Vec, project_id: ProjectId) -> Result<()> { - let project = self - .projects - .get(&project_id) - .context("failed to get project")?; - let project_repository = project_repository::Repository::open(&project) - .with_context(|| "failed to open project repository for project")?; - let user = self.users.get_user().context("failed to get user")?; - let gb_repository = gb_repository::Repository::open( - &self.local_data_dir, - &project_repository, - user.as_ref(), - ) - .context("failed to open gb repository")?; + let make_processor = || -> Result<_> { + let project = self + .projects + .get(&project_id) + .context("failed to get project")?; + let project_repository = project_repository::Repository::open(&project) + .with_context(|| "failed to open project repository for project")?; + let user = self.users.get_user().context("failed to get user")?; + let gb_repository = gb_repository::Repository::open( + &self.local_data_dir, + &project_repository, + user.as_ref(), + ) + .context("failed to open gb repository")?; - // If current session's branch is not the same as the project's head, flush it first. - if let Some(session) = gb_repository - .get_current_session() - .context("failed to get current session")? - { - let project_head = project_repository - .get_head() - .context("failed to get head")?; - if session.meta.branch != project_head.name().map(|n| n.to_string()) { - gb_repository - .flush_session(&project_repository, &session, user.as_ref()) - .context(format!("failed to flush session {}", session.id))?; + // If current session's branch is not the same as the project's head, flush it first. + if let Some(session) = gb_repository + .get_current_session() + .context("failed to get current session")? + { + let project_head = project_repository + .get_head() + .context("failed to get head")?; + if session.meta.branch != project_head.name().map(|n| n.to_string()) { + gb_repository + .flush_session(&project_repository, &session, user.as_ref()) + .context(format!("failed to flush session {}", session.id))?; + } } - } + let current_session = gb_repository + .get_or_create_current_session() + .context("failed to get or create current session")?; + let session = current_session.clone(); - let current_session = gb_repository - .get_or_create_current_session() - .context("failed to get or create current session")?; - let current_session_reader = sessions::Reader::open(&gb_repository, ¤t_session) - .context("failed to get session reader")?; - let deltas_reader = deltas::Reader::new(¤t_session_reader); - let writer = deltas::Writer::new(&gb_repository).context("failed to open deltas writer")?; - - let num_paths = paths.len(); - let mut num_no_delta = 0; - std::thread::scope(|_scope| -> Result<()> { - for path in paths { - let path = path.as_path(); + let process = move |path: &Path| -> Result { + let _span = tracing::span!(tracing::Level::TRACE, "processing", ?path).entered(); + let current_session_reader = + sessions::Reader::open(&gb_repository, ¤t_session) + .context("failed to get session reader")?; + let deltas_reader = deltas::Reader::new(¤t_session_reader); + let writer = + deltas::Writer::new(&gb_repository).context("failed to open deltas writer")?; let current_wd_file_content = match Self::file_content(&project_repository, path) { Ok(content) => Some(content), Err(reader::Error::NotFound) => None, @@ -72,8 +73,7 @@ impl super::Handler { .context("failed to calculate new deltas")?; let Some(new_delta) = new_delta else { - num_no_delta += 1; - continue; + return Ok(false); }; let deltas = text_doc.get_deltas(); @@ -102,10 +102,65 @@ impl super::Handler { std::slice::from_ref(&new_delta), path, ))?; - } - Ok(()) + Ok(true) + }; + Ok((process, session)) + }; + let num_paths = paths.len(); + let num_no_delta = std::thread::scope(|scope| -> Result { + let num_threads = std::thread::available_parallelism() + .unwrap_or(NonZeroUsize::new(1).unwrap()) + .get() + .min(paths.len()); + let mut num_no_delta = 0; + let current_session = if num_threads < 2 { + let (process, session) = make_processor()?; + for path in paths { + if !process(path.as_path())? { + num_no_delta += 1; + } + } + session + } else { + let (threads, tx) = { + let (tx, rx) = crossbeam_channel::bounded::(num_threads); + let threads: Vec<_> = (0..num_threads) + .map(|id| { + std::thread::Builder::new() + .name(format!("gitbutler_delta_thread_{id}")) + .stack_size(512 * 1024) + .spawn_scoped(scope, { + let rx = rx.clone(); + || -> Result { + let mut num_no_delta = 0; + let (process, _) = make_processor()?; + for path in rx { + if !process(path.as_path())? { + num_no_delta += 1; + } + } + Ok(num_no_delta) + } + }) + .expect("worker thread can be created") + }) + .collect(); + (threads, tx) + }; + for path in paths { + tx.send(path).expect("many receivers"); + } + drop(tx); + + for thread in threads { + num_no_delta += thread.join().unwrap()?; + } + let (_, session) = make_processor()?; + session + }; + self.index_session(project_id, ¤t_session)?; + Ok(num_no_delta) })?; - self.index_session(project_id, ¤t_session)?; tracing::debug!(%project_id, paths_without_deltas = num_no_delta, paths_with_delta = num_paths - num_no_delta); Ok(()) }