Back out "treemanifest: moving diff fetching to a background thread"

Summary:
Original commit changeset: b4d12a0de8af

Back out "[hg] pymatcher: extract pure Rust matchers when possible"

This is needed to revert D29971824 (13614c3ed4), which makes lots of mononoke tests time out (T99068634). I'm not
sure where the bug is, so I'll leave the fix up to DurhamG.

Reviewed By: StanislavGlebik

Differential Revision: D30540881

fbshipit-source-id: 73ed2b7b63d33fbe3a76ee93d86241e37c4b2dd0
This commit is contained in:
Mateusz Kwapich 2021-08-25 09:42:55 -07:00 committed by Facebook GitHub Bot
parent 9fc04082d0
commit 7cbf978d0d
7 changed files with 74 additions and 166 deletions

View File

@ -65,7 +65,7 @@ py_class!(class checkoutplan |py| {
let mut actions = py.allow_threads(move || {
let target = target.read();
let current = current.read();
let mut diff = Diff::new(&current, &target, &matcher)?;
let mut diff = Diff::new(&current, &target, &matcher);
let bar = &ProgressBar::new("Calculating", 0, "depth");
Registry::main().register_progress_bar(bar);
diff.attach_progress_bar(bar);

View File

@ -288,7 +288,7 @@ py_class!(pub class treemanifest |py| {
let other_tree = other.underlying(py);
let results: Vec<_> = py.allow_threads(move || -> Result<_> {
manifest_tree::Diff::new(&this_tree.read(), &other_tree.read(), &matcher)?.collect()
manifest_tree::Diff::new(&this_tree.read(), &other_tree.read(), &matcher).collect()
}).map_pyerr(py)?;
for entry in results {
let path = PyPathBuf::from(entry.path);
@ -312,7 +312,7 @@ py_class!(pub class treemanifest |py| {
None => Box::new(AlwaysMatcher::new()),
Some(pyobj) => Box::new(PythonMatcher::new(py, pyobj)),
};
for entry in manifest_tree::Diff::new(&this_tree, &other_tree, &matcher).map_pyerr(py)? {
for entry in manifest_tree::Diff::new(&this_tree, &other_tree, &matcher) {
let entry = entry.map_pyerr(py)?;
match entry.diff_type {
DiffType::LeftOnly(_) => {

View File

@ -958,7 +958,7 @@ mod test {
let matcher = AlwaysMatcher::new();
let left_tree = make_tree_manifest_from_meta(from.iter().cloned());
let right_tree = make_tree_manifest_from_meta(to.iter().cloned());
let diff = Diff::new(&left_tree, &right_tree, &matcher).unwrap();
let diff = Diff::new(&left_tree, &right_tree, &matcher);
let vfs = VFS::new(working_path.clone())?;
let checkout = Checkout::default_config(vfs);
let plan = checkout

View File

@ -46,9 +46,9 @@ impl Merge {
base: &M,
) -> Result<MergeResult<M>> {
let matcher = AlwaysMatcher::new();
let diff = base.diff(dest, &matcher)?;
let diff = base.diff(dest, &matcher);
let dest_actions = ActionMap::from_diff(diff)?;
let diff = base.diff(src, &matcher)?;
let diff = base.diff(src, &matcher);
let src_actions = ActionMap::from_diff(diff)?;
let dest_files: HashSet<_> = dest_actions.keys().collect();
let src_files = src_actions.keys().collect();

View File

@ -5,14 +5,7 @@
* GNU General Public License version 2.
*/
use std::{
cmp::Ordering,
collections::VecDeque,
sync::mpsc::{channel, Receiver, Sender},
sync::Arc,
thread::JoinHandle,
time::Duration,
};
use std::{cmp::Ordering, collections::VecDeque, mem, sync::Arc};
use anyhow::Result;
@ -43,11 +36,10 @@ enum DiffItem {
impl DiffItem {
fn process(
self,
fetcher: &mut Sender<DiffItem>,
next: &mut VecDeque<DiffItem>,
lstore: &InnerStore,
rstore: &InnerStore,
matcher: &dyn Matcher,
pending: &mut u64,
) -> Result<Vec<DiffEntry>> {
match self {
DiffItem::Single(dir, side) => {
@ -55,11 +47,9 @@ impl DiffItem {
Side::Left => lstore,
Side::Right => rstore,
};
diff_single(dir, fetcher, side, store, matcher, pending)
}
DiffItem::Changed(left, right) => {
diff(left, right, fetcher, lstore, rstore, matcher, pending)
diff_single(dir, next, side, store, matcher)
}
DiffItem::Changed(left, right) => diff(left, right, next, lstore, rstore, matcher),
}
}
@ -90,130 +80,53 @@ impl DiffItem {
/// only fetching tree nodes that have actually changed.
pub struct Diff<'a> {
output: VecDeque<DiffEntry>,
current: VecDeque<DiffItem>,
next: VecDeque<DiffItem>,
lstore: &'a InnerStore,
rstore: &'a InnerStore,
matcher: &'a dyn Matcher,
progress_bar: Option<&'a Arc<ProgressBar>>,
#[allow(dead_code)]
fetch_thread: JoinHandle<()>,
sender: Sender<DiffItem>,
receiver: Receiver<DiffItem>,
pending: u64,
}
impl<'a> Diff<'a> {
pub fn new(
left: &'a TreeManifest,
right: &'a TreeManifest,
matcher: &'a dyn Matcher,
) -> Result<Self> {
pub fn new(left: &'a TreeManifest, right: &'a TreeManifest, matcher: &'a dyn Matcher) -> Self {
let lroot = DirLink::from_root(&left.root).expect("tree root is not a directory");
let rroot = DirLink::from_root(&right.root).expect("tree root is not a directory");
let (send_prefetch, receive_prefetch) = channel();
let (send_done, receive_done) = channel();
let mut pending = 0;
let lstore = left.store.clone();
let rstore = right.store.clone();
let fetch_thread = std::thread::spawn(move || {
prefetch_thread(receive_prefetch, send_done, lstore, rstore)
});
let mut current = VecDeque::new();
// Don't even attempt to perform a diff if these trees are the same.
if lroot.hgid() != rroot.hgid() || lroot.hgid().is_none() {
pending += 1;
send_prefetch.send(DiffItem::Changed(lroot, rroot))?;
current.push_back(DiffItem::Changed(lroot, rroot));
}
Ok(Diff {
Diff {
output: VecDeque::new(),
current,
next: VecDeque::new(),
lstore: &left.store,
rstore: &right.store,
matcher,
progress_bar: None,
fetch_thread,
sender: send_prefetch,
receiver: receive_done,
pending,
})
}
}
pub fn attach_progress_bar(&mut self, bar: &'a Arc<ProgressBar>) {
self.progress_bar = Some(bar);
}
/// Process the next `DiffItem` for this layer (either a pair of modified directories
/// or an added/removed directory), potentially generating new `DiffEntry`s for
/// any changed files contained therein.
/// Prefetch the contents of the directories in the next layer of the traversal.
///
/// If this method reaches the end of the current layer of the breadth-first traversal,
/// it will perform I/O to prefetch the next layer of directories before continuing. As
/// such, this function will occasionally block for an extended period of time.
///
/// Returns `true` if there are more items to process after the current one. Once this
/// method returns `false`, the traversal is complete.
fn process_next_item(&mut self) -> Result<bool> {
if self.pending == 0 {
return Ok(false);
}
/// Given that each tree owns its own store, we need to perform two prefetches
/// to ensure that the keys for each tree are correctly prefetched from the
/// corresponding store.
fn prefetch(&self) -> Result<()> {
let mut lkeys = Vec::new();
let mut rkeys = Vec::new();
let item = self.receiver.recv()?;
self.pending -= 1;
let entries = item.process(
&mut self.sender,
&self.lstore,
&self.rstore,
self.matcher,
&mut self.pending,
)?;
self.output.extend(entries);
if let Some(bar) = self.progress_bar {
// Increase "depth" by one as we descend to next BFS level.
bar.increase_position(1);
}
Ok(self.pending != 0)
}
}
fn prefetch_thread<'a>(
receiver: Receiver<DiffItem>,
sender: Sender<DiffItem>,
lstore: InnerStore,
rstore: InnerStore,
) {
let limit = 100000;
let timeout = Duration::from_millis(1);
let mut received = Vec::with_capacity(limit);
'outer: loop {
// Wait for a prefetch request
match receiver.recv() {
Ok(request) => received.push(request),
Err(_) => break,
};
// Grab a bunch of them at once.
loop {
use std::sync::mpsc::RecvTimeoutError::*;
match receiver.recv_timeout(timeout) {
Ok(request) => received.push(request),
Err(Timeout) => break,
Err(Disconnected) => {
break 'outer;
}
};
if received.len() >= limit {
break;
}
}
// Prefetch them
let mut lkeys = Vec::with_capacity(received.len());
let mut rkeys = Vec::with_capacity(received.len());
for item in received.iter() {
// Group the keys in the next layer by which tree
// they came from so that we can prefetch using
// the correct store for each tree.
for item in &self.next {
match item {
DiffItem::Single(dir, side) => {
match side {
@ -229,18 +142,43 @@ fn prefetch_thread<'a>(
}
if !lkeys.is_empty() {
let _ = lstore.prefetch(lkeys);
self.lstore.prefetch(lkeys)?;
}
if !rkeys.is_empty() {
let _ = rstore.prefetch(rkeys);
self.rstore.prefetch(rkeys)?;
}
// Notify that we finished
for item in received.drain(..) {
if sender.send(item).is_err() {
break 'outer;
Ok(())
}
/// Process the next `DiffItem` for this layer (either a pair of modified directories
/// or an added/removed directory), potentially generating new `DiffEntry`s for
/// any changed files contained therein.
///
/// If this method reaches the end of the current layer of the breadth-first traversal,
/// it will perform I/O to prefetch the next layer of directories before continuing. As
/// such, this function will occasionally block for an extended period of time.
///
/// Returns `true` if there are more items to process after the current one. Once this
/// method returns `false`, the traversal is complete.
fn process_next_item(&mut self) -> Result<bool> {
if self.current.is_empty() {
self.prefetch()?;
mem::swap(&mut self.current, &mut self.next);
if let Some(bar) = self.progress_bar {
// Increase "depth" by one as we descend to next BFS level.
bar.increase_position(1);
}
}
let entries = match self.current.pop_front() {
Some(item) => item.process(&mut self.next, &self.lstore, &self.rstore, self.matcher)?,
None => return Ok(false),
};
self.output.extend(entries);
Ok(true)
}
}
@ -273,18 +211,16 @@ impl<'a> Iterator for Diff<'a> {
/// adds any subdirectories to the next layer to be processed.
fn diff_single(
dir: DirLink,
fetcher: &mut Sender<DiffItem>,
next: &mut VecDeque<DiffItem>,
side: Side,
store: &InnerStore,
matcher: &dyn Matcher,
pending: &mut u64,
) -> Result<Vec<DiffEntry>> {
let (files, dirs) = dir.list(store)?;
for d in dirs.into_iter() {
if matcher.matches_directory(&d.path)? != DirectoryMatch::Nothing {
*pending += 1;
fetcher.send(DiffItem::Single(d, side))?;
next.push_back(DiffItem::Single(d, side));
}
}
let mut entries = Vec::new();
@ -308,18 +244,14 @@ fn diff_single(
fn diff(
left: DirLink,
right: DirLink,
fetcher: &mut Sender<DiffItem>,
next: &mut VecDeque<DiffItem>,
lstore: &InnerStore,
rstore: &InnerStore,
matcher: &dyn Matcher,
pending: &mut u64,
) -> Result<Vec<DiffEntry>> {
let (lfiles, ldirs) = left.list(lstore)?;
let (rfiles, rdirs) = right.list(rstore)?;
for item in diff_dirs(ldirs, rdirs, matcher)? {
*pending += 1;
fetcher.send(item)?;
}
next.extend(diff_dirs(ldirs, rdirs, matcher)?);
diff_files(lfiles, rfiles, matcher)
}
@ -506,19 +438,10 @@ mod tests {
fn test_diff_single() {
let tree = make_tree_manifest(&[("a", "1"), ("b/f", "2"), ("c", "3"), ("d/f", "4")]);
let dir = DirLink::from_root(&tree.root).unwrap();
let (mut sender, receiver) = channel();
let mut pending = 0;
let mut next = VecDeque::new();
let matcher = AlwaysMatcher::new();
let entries = diff_single(
dir,
&mut sender,
Side::Left,
&tree.store,
&matcher,
&mut pending,
)
.unwrap();
let entries = diff_single(dir, &mut next, Side::Left, &tree.store, &matcher).unwrap();
let expected_entries = vec![
DiffEntry::new(
@ -539,8 +462,7 @@ mod tests {
assert_eq!(entries, expected_entries);
let dummy = Link::ephemeral();
let next = vec![receiver.recv().unwrap(), receiver.recv().unwrap()];
let expected_next = vec![
let expected_next = VecDeque::from(vec![
DiffItem::Single(
DirLink::from_link(&dummy, repo_path_buf("b")).unwrap(),
Side::Left,
@ -549,7 +471,7 @@ mod tests {
DirLink::from_link(&dummy, repo_path_buf("d")).unwrap(),
Side::Left,
),
];
]);
assert_eq!(next, expected_next);
}
@ -630,7 +552,7 @@ mod tests {
]);
let matcher = AlwaysMatcher::new();
let diff = Diff::new(&ltree, &rtree, &matcher).unwrap();
let diff = Diff::new(&ltree, &rtree, &matcher);
let entries = diff
.collect::<Result<Vec<_>>>()
.unwrap()
@ -678,7 +600,7 @@ mod tests {
]);
let matcher = TreeMatcher::from_rules(["d1/**"].iter()).unwrap();
let diff = Diff::new(&ltree, &rtree, &matcher).unwrap();
let diff = Diff::new(&ltree, &rtree, &matcher);
let entries = diff
.collect::<Result<Vec<_>>>()
.unwrap()
@ -702,7 +624,6 @@ mod tests {
assert_eq!(
Diff::new(&left, &right, &AlwaysMatcher::new())
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec!(
@ -726,7 +647,6 @@ mod tests {
assert_eq!(
Diff::new(&left, &right, &AlwaysMatcher::new())
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec!(
@ -754,7 +674,6 @@ mod tests {
assert!(
Diff::new(&left, &right, &AlwaysMatcher::new())
.unwrap()
.next()
.is_none()
);
@ -767,7 +686,6 @@ mod tests {
let right = TreeManifest::durable(Arc::new(TestStore::new()), hgid("10"));
assert!(
Diff::new(&left, &right, &AlwaysMatcher::new())
.unwrap()
.next()
.is_none()
);
@ -775,7 +693,6 @@ mod tests {
let right = TreeManifest::durable(Arc::new(TestStore::new()), hgid("20"));
assert!(
Diff::new(&left, &right, &AlwaysMatcher::new())
.unwrap()
.next()
.unwrap()
.is_err()
@ -797,7 +714,6 @@ mod tests {
assert_eq!(
Diff::new(&left, &right, &AlwaysMatcher::new())
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec!(
@ -817,7 +733,6 @@ mod tests {
assert_eq!(
Diff::new(&left, &right, &AlwaysMatcher::new())
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec!(
@ -838,7 +753,6 @@ mod tests {
assert_eq!(
Diff::new(&left, &right, &AlwaysMatcher::new())
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec!(
@ -866,7 +780,6 @@ mod tests {
&right,
&TreeMatcher::from_rules(["a1/b1/**"].iter()).unwrap()
)
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec!(DiffEntry::new(
@ -880,7 +793,6 @@ mod tests {
&right,
&TreeMatcher::from_rules(["a1/b2"].iter()).unwrap()
)
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec!(DiffEntry::new(
@ -894,7 +806,6 @@ mod tests {
&right,
&TreeMatcher::from_rules(["a2/b2/**"].iter()).unwrap()
)
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec!(DiffEntry::new(
@ -908,7 +819,6 @@ mod tests {
&right,
&TreeMatcher::from_rules(["*/b2/**"].iter()).unwrap()
)
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec!(
@ -928,7 +838,6 @@ mod tests {
&right,
&TreeMatcher::from_rules(["a3/**"].iter()).unwrap()
)
.unwrap()
.next()
.is_none()
);
@ -944,7 +853,6 @@ mod tests {
assert_eq!(
Diff::new(&left, &right, &AlwaysMatcher::new())
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap(),
vec![DiffEntry::new(

View File

@ -315,8 +315,8 @@ impl Manifest for TreeManifest {
&'a self,
other: &'a Self,
matcher: &'a M,
) -> Result<Box<dyn Iterator<Item = Result<DiffEntry>> + 'a>> {
Ok(Box::new(Diff::new(self, other, matcher)?))
) -> Box<dyn Iterator<Item = Result<DiffEntry>> + 'a> {
Box::new(Diff::new(self, other, matcher))
}
}

View File

@ -95,7 +95,7 @@ pub trait Manifest {
&'a self,
other: &'a Self,
matcher: &'a M,
) -> Result<Box<dyn Iterator<Item = Result<DiffEntry>> + 'a>>;
) -> Box<dyn Iterator<Item = Result<DiffEntry>> + 'a>;
}
/// The result of a list operation. Given a path, the manifest will return: