From ebe41e8a754b8c293974b0d5472f896d15a15be4 Mon Sep 17 00:00:00 2001 From: extrawurst Date: Thu, 31 Aug 2023 12:09:13 +0200 Subject: [PATCH] parallelize log search * will consume all cores now and got faster in all my benchmarks * setting progress via asyncjob now makes sure to only set it if it has changed and return whether that is the case to simplify sending progress notifications only in case progress actually changed --- CHANGELOG.md | 3 ++ Cargo.lock | 11 +++++ Makefile | 1 + asyncgit/Cargo.toml | 1 + asyncgit/src/asyncjob/mod.rs | 21 ++++++--- asyncgit/src/filter_commits.rs | 83 ++++++++++++++++++++++------------ 6 files changed, 86 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2949ab0e..31d29240 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * fix commit log not updating after branch switch ([#1862](https://github.com/extrawurst/gitui/issues/1862)) * fix stashlist not updating after pop/drop ([#1864](https://github.com/extrawurst/gitui/issues/1864)) +### Changed +* log search consumes all cores now and got even faster + ## [0.24.1] - 2023-08-30 ### Fixes diff --git a/Cargo.lock b/Cargo.lock index 1b53fa0d..4f7cff19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,7 @@ dependencies = [ "log", "openssl-sys", "pretty_assertions", + "rayon", "rayon-core", "scopetime", "serde", @@ -1258,6 +1259,16 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "rayon" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +dependencies = [ + "either", + "rayon-core", +] + [[package]] name = "rayon-core" version = "1.11.0" diff --git a/Makefile b/Makefile index f93c5df1..c25c5e6e 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ .PHONY: debug build-release release-linux-musl test clippy clippy-pedantic install install-debug ARGS=-l +# ARGS=-l -d ~/code/extern/kubernetes # ARGS=-l -d ~/code/extern/linux # ARGS=-l -d ~/code/git-bare-test.git -w ~/code/git-bare-test diff --git a/asyncgit/Cargo.toml b/asyncgit/Cargo.toml index 99165be7..1bbf8206 100644 --- a/asyncgit/Cargo.toml +++ b/asyncgit/Cargo.toml @@ -22,6 +22,7 @@ log = "0.4" # git2 = { git="https://github.com/extrawurst/git2-rs.git", rev="fc13dcc", features = ["vendored-openssl"]} # pinning to vendored openssl, using the git2 feature this gets lost with new resolver openssl-sys = { version = '0.9', features = ["vendored"], optional = true } +rayon = "1.7" rayon-core = "1.11" scopetime = { path = "../scopetime", version = "0.1" } serde = { version = "1.0", features = ["derive"] } diff --git a/asyncgit/src/asyncjob/mod.rs b/asyncgit/src/asyncjob/mod.rs index d17d2932..d92be0de 100644 --- a/asyncgit/src/asyncjob/mod.rs +++ b/asyncgit/src/asyncjob/mod.rs @@ -7,12 +7,17 @@ use crossbeam_channel::Sender; use std::sync::{Arc, Mutex, RwLock}; /// Passed to `AsyncJob::run` allowing sending intermediate progress notifications -pub struct RunParams { +pub struct RunParams< + T: Copy + Send, + P: Clone + Send + Sync + PartialEq, +> { sender: Sender, progress: Arc>, } -impl RunParams { +impl + RunParams +{ /// send an intermediate update notification. /// do not confuse this with the return value of `run`. /// `send` should only be used about progress notifications @@ -24,9 +29,13 @@ impl RunParams { } /// set the current progress - pub fn set_progress(&self, p: P) -> Result<()> { - *(self.progress.write()?) = p; - Ok(()) + pub fn set_progress(&self, p: P) -> Result { + Ok(if *self.progress.read()? == p { + false + } else { + *(self.progress.write()?) = p; + true + }) } } @@ -35,7 +44,7 @@ pub trait AsyncJob: Send + Sync + Clone { /// defines what notification type is used to communicate outside type Notification: Copy + Send; /// type of progress - type Progress: Clone + Default + Send + Sync; + type Progress: Clone + Default + Send + Sync + PartialEq; /// can run a synchronous time intensive task. /// the returned notification is used to tell interested parties diff --git a/asyncgit/src/filter_commits.rs b/asyncgit/src/filter_commits.rs index 0c28635d..ca88c73d 100644 --- a/asyncgit/src/filter_commits.rs +++ b/asyncgit/src/filter_commits.rs @@ -1,3 +1,8 @@ +use rayon::{ + prelude::ParallelIterator, + slice::{ParallelSlice, ParallelSliceMut}, +}; + use crate::{ asyncjob::{AsyncJob, RunParams}, error::Result, @@ -5,7 +10,7 @@ use crate::{ AsyncGitNotification, ProgressPercent, }; use std::{ - sync::{Arc, Mutex}, + sync::{atomic::AtomicUsize, Arc, Mutex}, time::{Duration, Instant}, }; @@ -69,45 +74,63 @@ impl AsyncCommitFilterJob { commits: Vec, params: &RunParams, ) -> JobState { - let response = sync::repo(repo_path) - .map(|repo| self.filter_commits(&repo, commits, params)) - .map(|(start, result)| CommitFilterResult { - result, - duration: start.elapsed(), - }); + let (start, result) = + self.filter_commits(repo_path, commits, params); - JobState::Response(response) + //TODO: still need this to be a result? + JobState::Response(Ok(CommitFilterResult { + result, + duration: start.elapsed(), + })) } fn filter_commits( &self, - repo: &git2::Repository, + repo_path: &RepoPath, commits: Vec, params: &RunParams, ) -> (Instant, Vec) { let total_amount = commits.len(); let start = Instant::now(); - let mut progress = ProgressPercent::new(0, total_amount); - - let result = commits + let idx = AtomicUsize::new(0); + let mut result = commits .into_iter() .enumerate() - .filter_map(|(idx, c)| { - let new_progress = - ProgressPercent::new(idx, total_amount); + .collect::>() + .par_chunks(1000) + .filter_map(|c| { + //TODO: error log repo open errors + sync::repo(repo_path).ok().map(|repo| { + c.iter() + .filter_map(|(e, c)| { + let idx = idx.fetch_add( + 1, + std::sync::atomic::Ordering::Relaxed, + ); - if new_progress != progress { - Self::update_progress(params, new_progress); - progress = new_progress; - } + Self::update_progress( + params, + ProgressPercent::new( + idx, + total_amount, + ), + ); - (*self.filter)(repo, &c) - .ok() - .and_then(|res| res.then_some(c)) + (*self.filter)(&repo, c).ok().and_then( + |res| res.then_some((*e, *c)), + ) + }) + .collect::>() + }) }) + .flatten() .collect::>(); + result.par_sort_by(|a, b| a.0.cmp(&b.0)); + + let result = result.into_iter().map(|c| c.1).collect(); + (start, result) } @@ -115,12 +138,16 @@ impl AsyncCommitFilterJob { params: &RunParams, new_progress: ProgressPercent, ) { - if let Err(e) = params.set_progress(new_progress) { - log::error!("progress error: {e}"); - } else if let Err(e) = - params.send(AsyncGitNotification::CommitFilter) - { - log::error!("send error: {e}"); + match params.set_progress(new_progress) { + Err(e) => log::error!("progress error: {e}"), + Ok(result) if result => { + if let Err(e) = + params.send(AsyncGitNotification::CommitFilter) + { + log::error!("send error: {e}"); + } + } + _ => (), } } }