working_copy: traverse filesystem in parallel

This improves `jj status` time by a factor of ~2x on my machine (M1 MacBook Pro 2021 16-inch, with an SSD):

```sh
$ hyperfine --parameter-list hash before,after --parameter-list repo nixpkgs,gecko-dev --setup 'git checkout {hash} && cargo build --profile release-with-debug' --warmup 3 './target/release-with-debug/jj -R ../{repo} st'
Benchmark 1: ./target/release-with-debug/jj -R ../nixpkgs st (hash = before)
  Time (mean ± σ):      1.640 s ±  0.019 s    [User: 0.580 s, System: 1.044 s]
  Range (min … max):    1.621 s …  1.673 s    10 runs

Benchmark 2: ./target/release-with-debug/jj -R ../nixpkgs st (hash = after)
  Time (mean ± σ):     760.0 ms ±   5.4 ms    [User: 812.9 ms, System: 2214.6 ms]
  Range (min … max):   751.4 ms … 768.7 ms    10 runs

Benchmark 3: ./target/release-with-debug/jj -R ../gecko-dev st (hash = before)
  Time (mean ± σ):     11.403 s ±  0.648 s    [User: 4.546 s, System: 5.932 s]
  Range (min … max):   10.553 s … 12.718 s    10 runs

Benchmark 4: ./target/release-with-debug/jj -R ../gecko-dev st (hash = after)
  Time (mean ± σ):      5.974 s ±  0.028 s    [User: 5.387 s, System: 11.959 s]
  Range (min … max):    5.937 s …  6.024 s    10 runs

$ hyperfine --parameter-list repo nixpkgs,gecko-dev --warmup 3 'git -C ../{repo} status'
Benchmark 1: git -C ../nixpkgs status
  Time (mean ± σ):     865.4 ms ±   8.4 ms    [User: 119.4 ms, System: 1401.2 ms]
  Range (min … max):   852.8 ms … 879.1 ms    10 runs

Benchmark 2: git -C ../gecko-dev status
  Time (mean ± σ):      2.892 s ±  0.029 s    [User: 0.458 s, System: 14.244 s]
  Range (min … max):    2.837 s …  2.934 s    10 runs
```

Conclusions:

- ~2x improvement over the previous `jj status` time.
- Slightly faster than Git on nixpkgs.
- Still ~2x slower than Git on gecko-dev; not sure why.

For reference, Git's default number of threads is computed by the `online_cpus` function (`thread-utils.c` L21-L66 at commit ee48e70a82). We are using whatever the Rayon default is.
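Rayon's default global pool spawns roughly one worker per logical CPU. If the thread count ever needs to mirror Git's `online_cpus` behavior or become user-configurable, it can be pinned before the first parallel call. A minimal sketch, not something this commit sets up (the `num_threads(8)` value is purely illustrative):

```rust
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn main() {
    // Pin the global pool size instead of relying on Rayon's default of one
    // worker per logical CPU. This must run before the first parallel call;
    // Rayon also honors the RAYON_NUM_THREADS environment variable.
    ThreadPoolBuilder::new()
        .num_threads(8) // illustrative value only
        .build_global()
        .expect("the global thread pool was already initialized");

    // Any parallel work after this point uses the configured pool.
    let total: u64 = (0..1_000u64).into_par_iter().sum();
    println!("{total}");
}
```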
Author: Waleed Khan, 2023-08-03 08:27:56 -07:00
Committed by: Martin von Zweigbergk
parent 326be7c91e
commit 84f807d222
4 changed files with 140 additions and 116 deletions

Cargo.lock (generated)

```diff
@@ -1063,6 +1063,7 @@ dependencies = [
  "prost",
  "rand",
  "rand_chacha",
+ "rayon",
  "regex",
  "rustix 0.38.6",
  "serde_json",
@@ -1663,21 +1664,19 @@ dependencies = [
 
 [[package]]
 name = "rayon"
-version = "1.5.3"
+version = "1.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d"
+checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b"
 dependencies = [
- "autocfg",
- "crossbeam-deque",
  "either",
  "rayon-core",
 ]
 
 [[package]]
 name = "rayon-core"
-version = "1.9.3"
+version = "1.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f"
+checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
 dependencies = [
  "crossbeam-channel",
  "crossbeam-deque",
```

```diff
@@ -41,6 +41,7 @@ pest_derive = "2.7.2"
 prost = "0.11.9"
 rand = "0.8.5"
 rand_chacha = "0.3.1"
+rayon = "1.7.0"
 regex = "1.9.1"
 serde_json = "1.0.104"
 smallvec = { version = "1.11.0", features = [
```

```diff
@@ -67,7 +67,7 @@ pub enum VisitFiles {
     Set(HashSet<RepoPathComponent>),
 }
 
-pub trait Matcher {
+pub trait Matcher: Sync {
     fn matches(&self, file: &RepoPath) -> bool;
     fn visit(&self, dir: &RepoPath) -> Visit;
 }
```
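The new `Sync` supertrait is what lets a single matcher be shared by all of Rayon's worker threads (a `&T` is only `Send` when `T` is `Sync`). A minimal sketch of that pattern, using a made-up `NamePrefixMatcher` type rather than anything from the jj codebase:

```rust
use rayon::prelude::*;

// Stand-in for jj's Matcher trait: the Sync supertrait lets a &dyn Matcher
// be shared across the threads behind Rayon's parallel iterators.
trait Matcher: Sync {
    fn matches(&self, path: &str) -> bool;
}

// Hypothetical matcher used only for this example.
struct NamePrefixMatcher {
    prefix: String,
}

impl Matcher for NamePrefixMatcher {
    fn matches(&self, path: &str) -> bool {
        path.starts_with(&self.prefix)
    }
}

fn filter_paths<'a>(paths: &[&'a str], matcher: &dyn Matcher) -> Vec<&'a str> {
    // Without `Matcher: Sync`, sharing `matcher` across worker threads here
    // would not compile.
    paths
        .par_iter()
        .copied()
        .filter(|path| matcher.matches(path))
        .collect()
}

fn main() {
    let matcher = NamePrefixMatcher { prefix: "src/".to_string() };
    let hits = filter_paths(&["src/lib.rs", "docs/readme.md", "src/main.rs"], &matcher);
    println!("{hits:?}");
}
```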

```diff
@@ -33,6 +33,8 @@ use std::time::UNIX_EPOCH;
 use itertools::Itertools;
 use once_cell::unsync::OnceCell;
 use prost::Message;
+use rayon::iter::IntoParallelIterator;
+use rayon::prelude::ParallelIterator;
 use tempfile::NamedTempFile;
 use thiserror::Error;
 use tracing::{instrument, trace_span};
@@ -649,30 +651,26 @@ impl TreeState {
         });
         let matcher = IntersectionMatcher::new(sparse_matcher.as_ref(), fsmonitor_matcher);
 
-        let mut work = vec![WorkItem {
+        let work_item = WorkItem {
             dir: RepoPath::root(),
             disk_dir: self.working_copy_path.clone(),
             git_ignore: base_ignores,
-        }];
+        };
         trace_span!("traverse filesystem").in_scope(|| -> Result<(), SnapshotError> {
             let (tree_entries_tx, tree_entries_rx) = channel();
             let (file_states_tx, file_states_rx) = channel();
             let (deleted_files_tx, deleted_files_rx) = channel();
 
-            while let Some(work_item) = work.pop() {
-                work.extend(self.visit_directory(
-                    &matcher,
-                    &current_tree,
-                    tree_entries_tx.clone(),
-                    file_states_tx.clone(),
-                    deleted_files_tx.clone(),
-                    work_item,
-                    progress,
-                )?);
-            }
-            drop(tree_entries_tx);
-            drop(file_states_tx);
-            drop(deleted_files_tx);
+            self.visit_directory(
+                &matcher,
+                &current_tree,
+                tree_entries_tx,
+                file_states_tx,
+                deleted_files_tx,
+                work_item,
+                progress,
+            )?;
+
             while let Ok((path, tree_value)) = tree_entries_rx.recv() {
                 tree_builder.set(path, tree_value);
             }
@@ -706,7 +704,7 @@ impl TreeState {
         deleted_files_tx: Sender<RepoPath>,
         work_item: WorkItem,
         progress: Option<&SnapshotProgress>,
-    ) -> Result<Vec<WorkItem>, SnapshotError> {
+    ) -> Result<(), SnapshotError> {
         let WorkItem {
             dir,
             disk_dir,
@@ -714,115 +712,141 @@ impl TreeState {
         } = work_item;
 
         if matcher.visit(&dir).is_nothing() {
-            return Ok(Default::default());
+            return Ok(());
         }
 
         let git_ignore =
             git_ignore.chain_with_file(&dir.to_internal_dir_string(), disk_dir.join(".gitignore"));
-        let mut work = Vec::new();
-        for maybe_entry in disk_dir.read_dir().unwrap() {
-            let entry = maybe_entry.unwrap();
-            let file_type = entry.file_type().unwrap();
-            let file_name = entry.file_name();
-            let name = file_name
-                .to_str()
-                .ok_or_else(|| SnapshotError::InvalidUtf8Path {
-                    path: file_name.clone(),
-                })?;
-            if name == ".jj" || name == ".git" {
-                continue;
-            }
-            let path = dir.join(&RepoPathComponent::from(name));
-            if let Some(file_state) = self.file_states.get(&path) {
-                if file_state.file_type == FileType::GitSubmodule {
-                    continue;
-                }
-            }
-            if file_type.is_dir() {
-                if git_ignore.matches_all_files_in(&path.to_internal_dir_string()) {
-                    // If the whole directory is ignored, visit only paths we're already
-                    // tracking.
-                    let tracked_paths = self
-                        .file_states
-                        .range((Bound::Excluded(&path), Bound::Unbounded))
-                        .take_while(|(sub_path, _)| path.contains(sub_path))
-                        .map(|(sub_path, file_state)| (sub_path.clone(), file_state.clone()))
-                        .collect_vec();
-                    for (tracked_path, current_file_state) in tracked_paths {
-                        if !matcher.matches(&tracked_path) {
-                            continue;
-                        }
-                        let disk_path = tracked_path.to_fs_path(&self.working_copy_path);
-                        let metadata = match disk_path.metadata() {
-                            Ok(metadata) => metadata,
-                            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
-                                continue;
-                            }
-                            Err(err) => {
-                                return Err(SnapshotError::IoError {
-                                    message: format!("Failed to stat file {}", disk_path.display()),
-                                    err,
-                                });
-                            }
-                        };
-                        if let Some(new_file_state) = file_state(&metadata) {
-                            deleted_files_tx.send(tracked_path.clone()).ok();
-                            let update = self.get_updated_tree_value(
-                                &tracked_path,
-                                disk_path,
-                                Some(&current_file_state),
-                                current_tree,
-                                &new_file_state,
-                            )?;
-                            if let Some(tree_value) = update {
-                                tree_entries_tx
-                                    .send((tracked_path.clone(), tree_value))
-                                    .ok();
-                            }
-                            file_states_tx.send((tracked_path, new_file_state)).ok();
-                        }
-                    }
-                } else {
-                    work.push(WorkItem {
-                        dir: path,
-                        disk_dir: entry.path(),
-                        git_ignore: git_ignore.clone(),
-                    });
-                }
-            } else if matcher.matches(&path) {
-                if let Some(progress) = progress {
-                    progress(&path);
-                }
-                let maybe_current_file_state = self.file_states.get(&path);
-                if maybe_current_file_state.is_none()
-                    && git_ignore.matches_file(&path.to_internal_file_string())
-                {
-                    // If it wasn't already tracked and it matches
-                    // the ignored paths, then
-                    // ignore it.
-                } else {
-                    let metadata = entry.metadata().map_err(|err| SnapshotError::IoError {
-                        message: format!("Failed to stat file {}", entry.path().display()),
-                        err,
-                    })?;
-                    if let Some(new_file_state) = file_state(&metadata) {
-                        deleted_files_tx.send(path.clone()).ok();
-                        let update = self.get_updated_tree_value(
-                            &path,
-                            entry.path(),
-                            maybe_current_file_state,
-                            current_tree,
-                            &new_file_state,
-                        )?;
-                        if let Some(tree_value) = update {
-                            tree_entries_tx.send((path.clone(), tree_value)).ok();
-                        }
-                        file_states_tx.send((path, new_file_state)).ok();
-                    }
-                }
-            }
-        }
-        Ok(work)
+        let dir_entries = disk_dir
+            .read_dir()
+            .unwrap()
+            .map(|maybe_entry| maybe_entry.unwrap())
+            .collect_vec();
+        dir_entries.into_par_iter().try_for_each_with(
+            (
+                tree_entries_tx.clone(),
+                file_states_tx.clone(),
+                deleted_files_tx.clone(),
+            ),
+            |(tree_entries_tx, file_states_tx, deleted_files_tx),
+             entry|
+             -> Result<(), SnapshotError> {
+                let file_type = entry.file_type().unwrap();
+                let file_name = entry.file_name();
+                let name = file_name
+                    .to_str()
+                    .ok_or_else(|| SnapshotError::InvalidUtf8Path {
+                        path: file_name.clone(),
+                    })?;
+                if name == ".jj" || name == ".git" {
+                    return Ok(());
+                }
+                let path = dir.join(&RepoPathComponent::from(name));
+                if let Some(file_state) = self.file_states.get(&path) {
+                    if file_state.file_type == FileType::GitSubmodule {
+                        return Ok(());
+                    }
+                }
+
+                if file_type.is_dir() {
+                    if git_ignore.matches_all_files_in(&path.to_internal_dir_string()) {
+                        // If the whole directory is ignored, visit only paths we're already
+                        // tracking.
+                        let tracked_paths = self
+                            .file_states
+                            .range((Bound::Excluded(&path), Bound::Unbounded))
+                            .take_while(|(sub_path, _)| path.contains(sub_path))
+                            .map(|(sub_path, file_state)| (sub_path.clone(), file_state.clone()))
+                            .collect_vec();
+                        for (tracked_path, current_file_state) in tracked_paths {
+                            if !matcher.matches(&tracked_path) {
+                                continue;
+                            }
+                            let disk_path = tracked_path.to_fs_path(&self.working_copy_path);
+                            let metadata = match disk_path.metadata() {
+                                Ok(metadata) => metadata,
+                                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
+                                    continue;
+                                }
+                                Err(err) => {
+                                    return Err(SnapshotError::IoError {
+                                        message: format!(
+                                            "Failed to stat file {}",
+                                            disk_path.display()
+                                        ),
+                                        err,
+                                    });
+                                }
+                            };
+                            if let Some(new_file_state) = file_state(&metadata) {
+                                deleted_files_tx.send(tracked_path.clone()).ok();
+                                let update = self.get_updated_tree_value(
+                                    &tracked_path,
+                                    disk_path,
+                                    Some(&current_file_state),
+                                    current_tree,
+                                    &new_file_state,
+                                )?;
+                                if let Some(tree_value) = update {
+                                    tree_entries_tx
+                                        .send((tracked_path.clone(), tree_value))
+                                        .ok();
+                                }
+                                file_states_tx.send((tracked_path, new_file_state)).ok();
+                            }
+                        }
+                    } else {
+                        let work_item = WorkItem {
+                            dir: path,
+                            disk_dir: entry.path(),
+                            git_ignore: git_ignore.clone(),
+                        };
+                        self.visit_directory(
+                            matcher,
+                            current_tree,
+                            tree_entries_tx.clone(),
+                            file_states_tx.clone(),
+                            deleted_files_tx.clone(),
+                            work_item,
+                            progress,
+                        )?;
+                    }
+                } else if matcher.matches(&path) {
+                    if let Some(progress) = progress {
+                        progress(&path);
+                    }
+                    let maybe_current_file_state = self.file_states.get(&path);
+                    if maybe_current_file_state.is_none()
+                        && git_ignore.matches_file(&path.to_internal_file_string())
+                    {
+                        // If it wasn't already tracked and it matches
+                        // the ignored paths, then
+                        // ignore it.
+                    } else {
+                        let metadata = entry.metadata().map_err(|err| SnapshotError::IoError {
+                            message: format!("Failed to stat file {}", entry.path().display()),
+                            err,
+                        })?;
+                        if let Some(new_file_state) = file_state(&metadata) {
+                            deleted_files_tx.send(path.clone()).ok();
+                            let update = self.get_updated_tree_value(
+                                &path,
+                                entry.path(),
+                                maybe_current_file_state,
+                                current_tree,
+                                &new_file_state,
+                            )?;
+                            if let Some(tree_value) = update {
+                                tree_entries_tx.send((path.clone(), tree_value)).ok();
+                            }
+                            file_states_tx.send((path, new_file_state)).ok();
+                        }
+                    }
+                }
+                Ok(())
+            },
+        )?;
+        Ok(())
     }
 
     #[instrument(skip_all)]
@@ -1587,4 +1611,4 @@ impl Drop for LockedWorkingCopy<'_> {
     }
 }
 
-pub type SnapshotProgress<'a> = dyn Fn(&RepoPath) + 'a;
+pub type SnapshotProgress<'a> = dyn Fn(&RepoPath) + 'a + Sync;
```