parallelize log search

* will consume all cores now and got faster in all my benchmarks
* setting progress via asyncjob now makes sure to only set it if it has changed and return whether that is the case to simplify sending progress notifications only in case progress actually changed
This commit is contained in:
extrawurst 2023-08-31 12:09:13 +02:00
parent 5be397b335
commit ebe41e8a75
6 changed files with 86 additions and 34 deletions

View File

@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* fix commit log not updating after branch switch ([#1862](https://github.com/extrawurst/gitui/issues/1862))
* fix stashlist not updating after pop/drop ([#1864](https://github.com/extrawurst/gitui/issues/1864))
### Changed
* log search consumes all cores now and got even faster
## [0.24.1] - 2023-08-30
### Fixes

11
Cargo.lock generated
View File

@ -61,6 +61,7 @@ dependencies = [
"log",
"openssl-sys",
"pretty_assertions",
"rayon",
"rayon-core",
"scopetime",
"serde",
@ -1258,6 +1259,16 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "rayon"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.11.0"

View File

@ -2,6 +2,7 @@
.PHONY: debug build-release release-linux-musl test clippy clippy-pedantic install install-debug
ARGS=-l
# ARGS=-l -d ~/code/extern/kubernetes
# ARGS=-l -d ~/code/extern/linux
# ARGS=-l -d ~/code/git-bare-test.git -w ~/code/git-bare-test

View File

@ -22,6 +22,7 @@ log = "0.4"
# git2 = { git="https://github.com/extrawurst/git2-rs.git", rev="fc13dcc", features = ["vendored-openssl"]}
# pinning to vendored openssl, using the git2 feature this gets lost with new resolver
openssl-sys = { version = '0.9', features = ["vendored"], optional = true }
rayon = "1.7"
rayon-core = "1.11"
scopetime = { path = "../scopetime", version = "0.1" }
serde = { version = "1.0", features = ["derive"] }

View File

@ -7,12 +7,17 @@ use crossbeam_channel::Sender;
use std::sync::{Arc, Mutex, RwLock};
/// Passed to `AsyncJob::run` allowing sending intermediate progress notifications
pub struct RunParams<T: Copy + Send, P: Clone + Send + Sync> {
pub struct RunParams<
T: Copy + Send,
P: Clone + Send + Sync + PartialEq,
> {
sender: Sender<T>,
progress: Arc<RwLock<P>>,
}
impl<T: Copy + Send, P: Clone + Send + Sync> RunParams<T, P> {
impl<T: Copy + Send, P: Clone + Send + Sync + PartialEq>
RunParams<T, P>
{
/// send an intermediate update notification.
/// do not confuse this with the return value of `run`.
/// `send` should only be used about progress notifications
@ -24,9 +29,13 @@ impl<T: Copy + Send, P: Clone + Send + Sync> RunParams<T, P> {
}
/// set the current progress
pub fn set_progress(&self, p: P) -> Result<()> {
*(self.progress.write()?) = p;
Ok(())
pub fn set_progress(&self, p: P) -> Result<bool> {
Ok(if *self.progress.read()? == p {
false
} else {
*(self.progress.write()?) = p;
true
})
}
}
@ -35,7 +44,7 @@ pub trait AsyncJob: Send + Sync + Clone {
/// defines what notification type is used to communicate outside
type Notification: Copy + Send;
/// type of progress
type Progress: Clone + Default + Send + Sync;
type Progress: Clone + Default + Send + Sync + PartialEq;
/// can run a synchronous time intensive task.
/// the returned notification is used to tell interested parties

View File

@ -1,3 +1,8 @@
use rayon::{
prelude::ParallelIterator,
slice::{ParallelSlice, ParallelSliceMut},
};
use crate::{
asyncjob::{AsyncJob, RunParams},
error::Result,
@ -5,7 +10,7 @@ use crate::{
AsyncGitNotification, ProgressPercent,
};
use std::{
sync::{Arc, Mutex},
sync::{atomic::AtomicUsize, Arc, Mutex},
time::{Duration, Instant},
};
@ -69,45 +74,63 @@ impl AsyncCommitFilterJob {
commits: Vec<CommitId>,
params: &RunParams<AsyncGitNotification, ProgressPercent>,
) -> JobState {
let response = sync::repo(repo_path)
.map(|repo| self.filter_commits(&repo, commits, params))
.map(|(start, result)| CommitFilterResult {
result,
duration: start.elapsed(),
});
let (start, result) =
self.filter_commits(repo_path, commits, params);
JobState::Response(response)
//TODO: still need this to be a result?
JobState::Response(Ok(CommitFilterResult {
result,
duration: start.elapsed(),
}))
}
fn filter_commits(
&self,
repo: &git2::Repository,
repo_path: &RepoPath,
commits: Vec<CommitId>,
params: &RunParams<AsyncGitNotification, ProgressPercent>,
) -> (Instant, Vec<CommitId>) {
let total_amount = commits.len();
let start = Instant::now();
let mut progress = ProgressPercent::new(0, total_amount);
let result = commits
let idx = AtomicUsize::new(0);
let mut result = commits
.into_iter()
.enumerate()
.filter_map(|(idx, c)| {
let new_progress =
ProgressPercent::new(idx, total_amount);
.collect::<Vec<(usize, CommitId)>>()
.par_chunks(1000)
.filter_map(|c| {
//TODO: error log repo open errors
sync::repo(repo_path).ok().map(|repo| {
c.iter()
.filter_map(|(e, c)| {
let idx = idx.fetch_add(
1,
std::sync::atomic::Ordering::Relaxed,
);
if new_progress != progress {
Self::update_progress(params, new_progress);
progress = new_progress;
}
Self::update_progress(
params,
ProgressPercent::new(
idx,
total_amount,
),
);
(*self.filter)(repo, &c)
.ok()
.and_then(|res| res.then_some(c))
(*self.filter)(&repo, c).ok().and_then(
|res| res.then_some((*e, *c)),
)
})
.collect::<Vec<_>>()
})
})
.flatten()
.collect::<Vec<_>>();
result.par_sort_by(|a, b| a.0.cmp(&b.0));
let result = result.into_iter().map(|c| c.1).collect();
(start, result)
}
@ -115,12 +138,16 @@ impl AsyncCommitFilterJob {
params: &RunParams<AsyncGitNotification, ProgressPercent>,
new_progress: ProgressPercent,
) {
if let Err(e) = params.set_progress(new_progress) {
log::error!("progress error: {e}");
} else if let Err(e) =
params.send(AsyncGitNotification::CommitFilter)
{
log::error!("send error: {e}");
match params.set_progress(new_progress) {
Err(e) => log::error!("progress error: {e}"),
Ok(result) if result => {
if let Err(e) =
params.send(AsyncGitNotification::CommitFilter)
{
log::error!("send error: {e}");
}
}
_ => (),
}
}
}