Add set intersection to revsets

Summary: Teach the revsets code to do set intersection.

Reviewed By: jsgf

Differential Revision: D5881788

fbshipit-source-id: 972fd85a47ee29a1037f5e4072c03e25189b8300
This commit is contained in:
Simon Farnsworth 2017-09-21 14:02:47 -07:00 committed by Facebook GitHub Bot
parent b263ff8307
commit c362649352
4 changed files with 464 additions and 43 deletions

View File

@ -0,0 +1,394 @@
// Copyright (c) 2017-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use futures::Async;
use futures::Poll;
use futures::stream::Stream;
use mercurial_types::{NodeHash, Repo};
use repoinfo::{Generation, RepoGenCache};
use std::boxed::Box;
use std::collections::HashMap;
use std::collections::hash_map::IntoIter;
use std::iter::IntoIterator;
use std::mem::replace;
use std::sync::Arc;
use NodeStream;
use errors::*;
use setcommon::*;
/// A node stream that yields exactly the nodes present in *every* input
/// stream, emitted generation by generation (highest generation first,
/// matching the order the inputs themselves produce).
pub struct IntersectNodeStream {
    // Each input (annotated with generation numbers via `add_generations`)
    // paired with the most recent cached poll result for that input.
    inputs: Vec<(InputStream, Poll<Option<(NodeHash, Generation)>, Error>)>,
    // The generation currently being accumulated; `None` means it must be
    // recomputed once all inputs are ready again.
    current_generation: Option<Generation>,
    // hash -> number of inputs that produced it at `current_generation`.
    // A hash is in the intersection when its count equals `inputs.len()`.
    accumulator: HashMap<NodeHash, usize>,
    // Pending output for a completed generation, drained one entry per poll.
    drain: Option<IntoIter<NodeHash, usize>>,
}
impl IntersectNodeStream {
    /// Builds an intersection over `inputs`. Each input is wrapped so that it
    /// yields `(hash, generation)` pairs; generations are fetched through
    /// `repo_generation`, and each input starts in the `NotReady` state so the
    /// first poll fetches real items.
    pub fn new<I, R>(repo: &Arc<R>, repo_generation: RepoGenCache<R>, inputs: I) -> Self
    where
        I: IntoIterator<Item = Box<NodeStream>>,
        R: Repo,
    {
        let hash_and_gen = inputs.into_iter().map({
            move |i| {
                (
                    add_generations(i, repo_generation.clone(), repo.clone()),
                    Ok(Async::NotReady),
                )
            }
        });
        IntersectNodeStream {
            inputs: hash_and_gen.collect(),
            current_generation: None,
            accumulator: HashMap::new(),
            drain: None,
        }
    }

    /// Picks the next generation to accumulate: the minimum generation among
    /// the items currently buffered across all inputs. Only runs once every
    /// input has a ready state; otherwise leaves `current_generation` alone.
    fn update_current_generation(&mut self) {
        if all_inputs_ready(&self.inputs) {
            self.current_generation = self.inputs
                .iter()
                .filter_map(|&(_, ref state)| match *state {
                    Ok(Async::Ready(Some((_, gen_id)))) => Some(gen_id),
                    // all_inputs_ready() returned true, so this is unreachable.
                    Ok(Async::NotReady) => panic!("All states ready, yet some not ready!"),
                    _ => None,
                })
                .min();
        }
    }

    /// Counts, per hash, how many inputs produced that hash at the current
    /// generation, and consumes those buffered items. If no input produced
    /// anything at this generation, resets `current_generation` so the next
    /// poll picks a new one.
    fn accumulate_nodes(&mut self) {
        let mut found_hashes = false;
        for &mut (_, ref mut state) in self.inputs.iter_mut() {
            if let Ok(Async::Ready(Some((hash, gen_id)))) = *state {
                if Some(gen_id) == self.current_generation {
                    found_hashes = true;
                    *self.accumulator.entry(hash).or_insert(0) += 1;
                }
                // Inputs of higher generation than the current one get consumed and dropped
                if Some(gen_id) >= self.current_generation {
                    *state = Ok(Async::NotReady);
                }
            }
        }
        if !found_hashes {
            self.current_generation = None;
        }
    }

    /// True once any input has terminated — an empty intersection has no
    /// inputs and is trivially finished. Once one input ends, no further node
    /// can appear in all inputs, so the whole stream can end.
    fn any_input_finished(&self) -> bool {
        if self.inputs.is_empty() {
            true
        } else {
            // Collapsed from `.map(...).any(|done| done)` — same semantics,
            // single adaptor.
            self.inputs.iter().any(|&(_, ref state)| match *state {
                Ok(Async::Ready(None)) => true,
                _ => false,
            })
        }
    }
}
impl Stream for IntersectNodeStream {
    type Item = NodeHash;
    type Error = Error;

    /// Drives the intersection one step. The order of operations inside the
    /// loop is load-bearing: poll inputs, flush the drain, surface errors,
    /// gate on readiness, then either pick a generation, flush the
    /// accumulator into the drain, or accumulate the current generation.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // This feels wrong, but in practice it's fine - it should be quick to hit a return, and
        // the standard futures::executor expects you to only return NotReady if blocked on I/O.
        loop {
            // Start by trying to turn as many NotReady as possible into real items
            poll_all_inputs(&mut self.inputs);
            // Empty the drain if any - return all items for this generation
            while self.drain.is_some() {
                let next_in_drain = self.drain.as_mut().and_then(|drain| drain.next());
                if next_in_drain.is_some() {
                    let (hash, count) = next_in_drain.expect("is_some() said this was safe");
                    // A hash seen by every input is in the intersection.
                    // NOTE(review): assumes each input yields a hash at most
                    // once per generation — confirm upstream streams dedupe.
                    if count == self.inputs.len() {
                        return Ok(Async::Ready(Some(hash)));
                    }
                } else {
                    self.drain = None;
                }
            }
            // Return any errors
            {
                if self.inputs.iter().any(|&(_, ref state)| state.is_err()) {
                    // Swap the inputs out so we can take ownership of the
                    // first error; the stream is dead after returning Err.
                    let inputs = replace(&mut self.inputs, Vec::new());
                    let (_, err) = inputs
                        .into_iter()
                        .find(|&(_, ref state)| state.is_err())
                        .unwrap();
                    return Err(err.unwrap_err());
                }
            }
            // If any input is not ready (we polled above), wait for them all to be ready
            if !all_inputs_ready(&self.inputs) {
                return Ok(Async::NotReady);
            }
            match self.current_generation {
                // No generation selected: either pick the next one, or flush
                // the finished accumulator into the drain for output.
                None => if self.accumulator.is_empty() {
                    self.update_current_generation();
                } else {
                    let full_accumulator = replace(&mut self.accumulator, HashMap::new());
                    self.drain = Some(full_accumulator.into_iter());
                },
                // A generation is in progress: count matching hashes.
                Some(_) => self.accumulate_nodes(),
            }
            // If we cannot ever output another node, we're done.
            if self.drain.is_none() && self.accumulator.is_empty() && self.any_input_finished() {
                return Ok(Async::Ready(None));
            }
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use {NodeStream, SingleNodeHash, UnionNodeStream};
    use assert_node_sequence;
    use futures::executor::spawn;
    use linear;
    use repoinfo::RepoGenCache;
    use std::sync::Arc;
    use string_to_nodehash;

    // Intersecting two copies of the same node yields that node exactly once.
    #[test]
    fn intersect_identical_node() {
        let repo = Arc::new(linear::getrepo());
        let repo_generation = RepoGenCache::new(10);
        let head_hash = string_to_nodehash("a5ffa77602a066db7d5cfb9fb5823a0895717c5a");
        let inputs: Vec<Box<NodeStream>> = vec![
            Box::new(SingleNodeHash::new(head_hash.clone(), &repo)),
            Box::new(SingleNodeHash::new(head_hash.clone(), &repo)),
        ];
        let nodestream = Box::new(IntersectNodeStream::new(
            &repo,
            repo_generation,
            inputs.into_iter(),
        ));
        assert_node_sequence(vec![head_hash.clone()], nodestream);
    }

    // Three disjoint single-node inputs have an empty intersection.
    #[test]
    fn intersect_three_different_nodes() {
        let repo = Arc::new(linear::getrepo());
        let repo_generation = RepoGenCache::new(10);
        // Note that these are *not* in generation order deliberately.
        let inputs: Vec<Box<NodeStream>> = vec![
            Box::new(SingleNodeHash::new(
                string_to_nodehash("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
                &repo,
            )),
            Box::new(SingleNodeHash::new(
                string_to_nodehash("3c15267ebf11807f3d772eb891272b911ec68759"),
                &repo,
            )),
            Box::new(SingleNodeHash::new(
                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
                &repo,
            )),
        ];
        let nodestream = Box::new(IntersectNodeStream::new(
            &repo,
            repo_generation,
            inputs.into_iter(),
        ));
        assert_node_sequence(vec![], nodestream);
    }

    // Three copies of one node still yield it exactly once (count must reach
    // inputs.len(), not merely be nonzero).
    #[test]
    fn intersect_three_identical_nodes() {
        let repo = Arc::new(linear::getrepo());
        let repo_generation = RepoGenCache::new(10);
        let inputs: Vec<Box<NodeStream>> = vec![
            Box::new(SingleNodeHash::new(
                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
                &repo,
            )),
            Box::new(SingleNodeHash::new(
                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
                &repo,
            )),
            Box::new(SingleNodeHash::new(
                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
                &repo,
            )),
        ];
        let nodestream = Box::new(IntersectNodeStream::new(
            &repo,
            repo_generation,
            inputs.into_iter(),
        ));
        assert_node_sequence(
            vec![
                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
            ],
            nodestream,
        );
    }

    // An intersect stream can itself be an input to another intersect stream.
    #[test]
    fn intersect_nesting() {
        let repo = Arc::new(linear::getrepo());
        let repo_generation = RepoGenCache::new(10);
        let inputs: Vec<Box<NodeStream>> = vec![
            Box::new(SingleNodeHash::new(
                string_to_nodehash("3c15267ebf11807f3d772eb891272b911ec68759"),
                &repo,
            )),
            Box::new(SingleNodeHash::new(
                string_to_nodehash("3c15267ebf11807f3d772eb891272b911ec68759"),
                &repo,
            )),
        ];
        let nodestream = Box::new(IntersectNodeStream::new(
            &repo,
            repo_generation.clone(),
            inputs.into_iter(),
        ));
        let inputs: Vec<Box<NodeStream>> = vec![
            nodestream,
            Box::new(SingleNodeHash::new(
                string_to_nodehash("3c15267ebf11807f3d772eb891272b911ec68759"),
                &repo,
            )),
        ];
        let nodestream = Box::new(IntersectNodeStream::new(
            &repo,
            repo_generation,
            inputs.into_iter(),
        ));
        assert_node_sequence(
            vec![
                string_to_nodehash("3c15267ebf11807f3d772eb891272b911ec68759"),
            ],
            nodestream,
        );
    }

    // Intersecting two unions keeps only the nodes common to both unions.
    #[test]
    fn intersection_of_unions() {
        let repo = Arc::new(linear::getrepo());
        let repo_generation = RepoGenCache::new(10);
        let inputs: Vec<Box<NodeStream>> = vec![
            Box::new(SingleNodeHash::new(
                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
                &repo,
            )),
            Box::new(SingleNodeHash::new(
                string_to_nodehash("3c15267ebf11807f3d772eb891272b911ec68759"),
                &repo,
            )),
        ];
        let nodestream = Box::new(UnionNodeStream::new(
            &repo,
            repo_generation.clone(),
            inputs.into_iter(),
        ));
        // This set has a different node sequence, so that we can demonstrate that we skip nodes
        // when they're not going to contribute.
        let inputs: Vec<Box<NodeStream>> = vec![
            Box::new(SingleNodeHash::new(
                string_to_nodehash("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
                &repo,
            )),
            Box::new(SingleNodeHash::new(
                string_to_nodehash("3c15267ebf11807f3d772eb891272b911ec68759"),
                &repo,
            )),
            Box::new(SingleNodeHash::new(
                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
                &repo,
            )),
        ];
        let nodestream2 = Box::new(UnionNodeStream::new(
            &repo,
            repo_generation.clone(),
            inputs.into_iter(),
        ));
        let inputs: Vec<Box<NodeStream>> = vec![nodestream, nodestream2];
        let nodestream = Box::new(IntersectNodeStream::new(
            &repo,
            repo_generation,
            inputs.into_iter(),
        ));
        assert_node_sequence(
            vec![
                string_to_nodehash("3c15267ebf11807f3d772eb891272b911ec68759"),
                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
            ],
            nodestream,
        );
    }

    // An unknown node hash must surface as a NoSuchNode error, not hang or
    // silently produce an empty result.
    #[test]
    fn intersect_error_node() {
        let repo = Arc::new(linear::getrepo());
        let repo_generation = RepoGenCache::new(10);
        let nodehash = string_to_nodehash("0000000000000000000000000000000000000000");
        let inputs: Vec<Box<NodeStream>> = vec![
            Box::new(SingleNodeHash::new(nodehash.clone(), &repo)),
            Box::new(SingleNodeHash::new(nodehash.clone(), &repo)),
        ];
        let mut nodestream = spawn(Box::new(IntersectNodeStream::new(
            &repo,
            repo_generation,
            inputs.into_iter(),
        )));
        assert!(
            if let Some(Err(Error(ErrorKind::NoSuchNode(hash), _))) = nodestream.wait_stream() {
                hash == nodehash
            } else {
                false
            },
            "No error for bad node"
        );
    }

    // Zero inputs: the intersection terminates immediately with no nodes.
    #[test]
    fn intersect_nothing() {
        let repo = Arc::new(linear::getrepo());
        let repo_generation = RepoGenCache::new(10);
        let inputs: Vec<Box<NodeStream>> = vec![];
        let nodestream = Box::new(IntersectNodeStream::new(
            &repo,
            repo_generation,
            inputs.into_iter(),
        ));
        assert_node_sequence(vec![], nodestream);
    }
}

View File

@ -14,6 +14,11 @@ extern crate repoinfo;
use futures::stream::Stream;
use mercurial_types::NodeHash;
mod setcommon;
mod intersectnodestream;
pub use intersectnodestream::IntersectNodeStream;
mod unionnodestream;
pub use unionnodestream::UnionNodeStream;

61
revset/src/setcommon.rs Normal file
View File

@ -0,0 +1,61 @@
// Copyright (c) 2017-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use error_chain::ChainedError;
use futures::future::Future;
use futures::stream::Stream;
use mercurial_types::{NodeHash, Repo};
use repoinfo::{Generation, RepoGenCache};
use std::boxed::Box;
use std::sync::Arc;
use NodeStream;
use errors::*;
use futures::{Async, Poll};
pub type InputStream = Box<Stream<Item = (NodeHash, Generation), Error = Error> + 'static>;
/// Annotates every hash from `stream` with its generation number, looked up
/// via `repo_generation`. Lookup failures are chained into a
/// `GenerationFetchFailed` error.
pub fn add_generations<R>(
    stream: Box<NodeStream>,
    repo_generation: RepoGenCache<R>,
    repo: Arc<R>,
) -> InputStream
where
    R: Repo,
{
    // For each hash, fetch its generation and pair the two up; the closure
    // captures the cache and repo handle by move.
    let annotated = stream.and_then(move |hash| {
        let lookup = repo_generation.get(&repo, hash);
        lookup
            .map(move |generation| (hash, generation))
            .map_err(|e| ChainedError::with_chain(e, ErrorKind::GenerationFetchFailed))
    });
    Box::new(annotated)
}
pub fn all_inputs_ready(
inputs: &Vec<(InputStream, Poll<Option<(NodeHash, Generation)>, Error>)>,
) -> bool {
inputs
.iter()
.map(|&(_, ref state)| match state {
&Err(_) => false,
&Ok(ref p) => p.is_ready(),
})
.all(|ready| ready)
}
pub fn poll_all_inputs(
inputs: &mut Vec<(InputStream, Poll<Option<(NodeHash, Generation)>, Error>)>,
) {
for &mut (ref mut input, ref mut state) in inputs.iter_mut() {
if let Ok(Async::NotReady) = *state {
*state = input.poll();
}
}
}

View File

@ -4,10 +4,8 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use error_chain::ChainedError;
use futures::Async;
use futures::Poll;
use futures::future::Future;
use futures::stream::Stream;
use mercurial_types::{NodeHash, Repo};
use repoinfo::{Generation, RepoGenCache};
@ -20,8 +18,7 @@ use std::sync::Arc;
use NodeStream;
use errors::*;
type InputStream = Box<Stream<Item = (NodeHash, Generation), Error = Error> + 'static>;
use setcommon::*;
pub struct UnionNodeStream {
inputs: Vec<(InputStream, Poll<Option<(NodeHash, Generation)>, Error>)>,
@ -30,24 +27,6 @@ pub struct UnionNodeStream {
drain: Option<IntoIter<NodeHash>>,
}
fn add_generations<R>(
stream: Box<NodeStream>,
repo_generation: RepoGenCache<R>,
repo: Arc<R>,
) -> InputStream
where
R: Repo,
{
let stream = stream.and_then(move |node_hash| {
repo_generation
.get(&repo, node_hash)
.map(move |gen_id| (node_hash, gen_id))
.map_err(|err|
ChainedError::with_chain(err, ErrorKind::GenerationFetchFailed))
});
Box::new(stream)
}
impl UnionNodeStream {
pub fn new<I, R>(repo: &Arc<R>, repo_generation: RepoGenCache<R>, inputs: I) -> Self
where
@ -70,14 +49,6 @@ impl UnionNodeStream {
}
}
fn poll_all_inputs(&mut self) {
for &mut (ref mut input, ref mut state) in self.inputs.iter_mut() {
if let Ok(Async::NotReady) = *state {
*state = input.poll();
}
}
}
fn gc_finished_inputs(&mut self) {
self.inputs
.retain(|&(_, ref state)| if let Ok(Async::Ready(None)) = *state {
@ -87,18 +58,8 @@ impl UnionNodeStream {
});
}
fn all_inputs_ready(&mut self) -> bool {
self.inputs
.iter()
.map(|&(_, ref state)| match state {
&Err(_) => false,
&Ok(ref p) => p.is_ready(),
})
.all(|ready| ready)
}
fn update_current_generation(&mut self) {
if self.all_inputs_ready() {
if all_inputs_ready(&self.inputs) {
self.current_generation = self.inputs
.iter()
.filter_map(|&(_, ref state)| match state {
@ -135,7 +96,7 @@ impl Stream for UnionNodeStream {
// the standard futures::executor expects you to only return NotReady if blocked on I/O.
loop {
// Start by trying to turn as many NotReady as possible into real items
self.poll_all_inputs();
poll_all_inputs(&mut self.inputs);
// Empty the drain if any - return all items for this generation
let next_in_drain = self.drain.as_mut().and_then(|drain| drain.next());
@ -160,7 +121,7 @@ impl Stream for UnionNodeStream {
self.gc_finished_inputs();
// If any input is not ready (we polled above), wait for them all to be ready
if !self.all_inputs_ready() {
if !all_inputs_ready(&self.inputs) {
return Ok(Async::NotReady);
}