mononoke: change RangeNodeStream to use ChangesetId

Summary:
Revsets must use ChangesetId, not HgNodeHash. I'm going to use
`RangeNodeStream` in pushrebase, so I thought it was a good time to change it.

Reviewed By: farnz

Differential Revision: D9338827

fbshipit-source-id: 50bbe8f73dba3526d70d3f816ddd93507db99be5
Stanislau Hlebik 2018-08-17 06:51:04 -07:00 committed by Facebook Github Bot
parent 9e24fff2fd
commit 8bba54d313
3 changed files with 155 additions and 68 deletions
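
For orientation before the diff: a minimal sketch of the new calling convention. This is not part of the commit; it assumes `RangeNodeStream` is exported from the revset crate and uses the `get_bonsai_from_hg` lookup that appears in the test helper below. Callers holding Mercurial hashes now resolve them to bonsai ChangesetIds first, and the stream yields ChangesetIds back:

use std::sync::Arc;

use blobrepo::BlobRepo;
use failure::Error;
use futures::{Future, Stream};
use mercurial_types::HgChangesetId;
use mononoke_types::ChangesetId;
use revset::RangeNodeStream;

fn range_from_hg(
    repo: Arc<BlobRepo>,
    start_hg: HgChangesetId,
    end_hg: HgChangesetId,
) -> impl Future<Item = Vec<ChangesetId>, Error = Error> {
    repo.get_bonsai_from_hg(&start_hg)
        .join(repo.get_bonsai_from_hg(&end_hg))
        .and_then(move |(start, end)| {
            // The mapping lookups return Option; a real caller would turn a
            // missing mapping into a proper error rather than panicking.
            let start = start.expect("start commit has no bonsai mapping");
            let end = end.expect("end commit has no bonsai mapping");
            RangeNodeStream::new(&repo, start, end).collect()
        })
}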


@@ -83,6 +83,7 @@ define_stats! {
     get_linknode: timeseries(RATE, SUM),
     get_all_filenodes: timeseries(RATE, SUM),
     get_generation_number: timeseries(RATE, SUM),
+    get_generation_number_by_bonsai: timeseries(RATE, SUM),
     upload_blob: timeseries(RATE, SUM),
     upload_hg_file_entry: timeseries(RATE, SUM),
     upload_hg_tree_entry: timeseries(RATE, SUM),
@@ -632,6 +633,19 @@ impl BlobRepo {
         })
     }
 
+    // TODO(stash): rename to get_generation_number
+    pub fn get_generation_number_by_bonsai(
+        &self,
+        cs: &ChangesetId,
+    ) -> impl Future<Item = Option<Generation>, Error = Error> {
+        STATS::get_generation_number_by_bonsai.add_value(1);
+        let repo = self.clone();
+        let repoid = self.repoid.clone();
+        repo.changesets
+            .get(repoid, *cs)
+            .map(|res| res.map(|res| Generation::new(res.gen)))
+    }
+
     pub fn upload_blob<Id>(&self, blob: Blob<Id>) -> impl Future<Item = Id, Error = Error> + Send
     where
         Id: MononokeId,
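
The new method simply reads the generation number out of the changesets table. As a usage illustration, here is a hypothetical helper (not in this diff, assuming the futures 0.1 API used throughout): since a commit's generation number is one more than the maximum of its parents', `a` can only be a proper ancestor of `b` when gen(a) < gen(b), making this a cheap pre-filter before any graph walk. `Generation` is ordered; it keys the `BTreeMap` in `RangeNodeStream` below:

use blobrepo::BlobRepo;
use failure::Error;
use futures::Future;
use mononoke_types::ChangesetId;

fn may_be_ancestor(
    repo: &BlobRepo,
    a: ChangesetId,
    b: ChangesetId,
) -> impl Future<Item = bool, Error = Error> {
    repo.get_generation_number_by_bonsai(&a)
        .join(repo.get_generation_number_by_bonsai(&b))
        .map(|(gen_a, gen_b)| match (gen_a, gen_b) {
            // Both commits known: compare generations.
            (Some(ga), Some(gb)) => ga < gb,
            // A changeset missing from the changesets table has no generation.
            _ => false,
        })
}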


@@ -17,16 +17,14 @@ use futures::future::Future;
 use futures::stream::{self, iter_ok, Stream};
 
 use blobrepo::BlobRepo;
-use mercurial_types::{Changeset, HgNodeHash};
-use mercurial_types::nodehash::HgChangesetId;
+use mononoke_types::ChangesetId;
 use mononoke_types::Generation;
 
 use NodeStream;
 use errors::*;
 
 #[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
 struct HashGen {
-    hash: HgNodeHash,
+    hash: ChangesetId,
     generation: Generation,
 }
@@ -38,13 +36,13 @@ struct ParentChild {
 pub struct RangeNodeStream {
     repo: Arc<BlobRepo>,
-    start_node: HgNodeHash,
+    start_node: ChangesetId,
     start_generation: Box<Stream<Item = Generation, Error = Error> + Send>,
     children: HashMap<HashGen, HashSet<HashGen>>,
     // Child, parent
     pending_nodes: Box<Stream<Item = ParentChild, Error = Error> + Send>,
-    output_nodes: Option<BTreeMap<Generation, HashSet<HgNodeHash>>>,
-    drain: Option<IntoIter<HgNodeHash>>,
+    output_nodes: Option<BTreeMap<Generation, HashSet<ChangesetId>>>,
+    drain: Option<IntoIter<ChangesetId>>,
 }
 
 fn make_pending(
@@ -54,13 +52,16 @@ fn make_pending(
     Box::new(
         {
             let repo = repo.clone();
-            repo.get_changeset_by_changesetid(&HgChangesetId::new(child.hash))
-                .map(move |cs| (child, cs.parents().clone()))
+            repo.get_bonsai_changeset(child.hash)
+                .map(move |cs| {
+                    let parents: Vec<_> = cs.parents().cloned().collect();
+                    (child, parents)
+                })
                 .map_err(|err| err.context(ErrorKind::ParentsFetchFailed).into())
         }.map(|(child, parents)| iter_ok::<_, Error>(iter::repeat(child).zip(parents.into_iter())))
             .flatten_stream()
             .and_then(move |(child, parent_hash)| {
-                repo.get_generation_number(&HgChangesetId::new(parent_hash))
+                repo.get_generation_number_by_bonsai(&parent_hash)
                     .and_then(move |genopt| {
                         genopt.ok_or_else(|| err_msg(format!("{} not found", parent_hash)))
                     })
@@ -77,10 +78,10 @@
 }
 
 impl RangeNodeStream {
-    pub fn new(repo: &Arc<BlobRepo>, start_node: HgNodeHash, end_node: HgNodeHash) -> Self {
+    pub fn new(repo: &Arc<BlobRepo>, start_node: ChangesetId, end_node: ChangesetId) -> Self {
         let start_generation = Box::new(
             repo.clone()
-                .get_generation_number(&HgChangesetId::new(start_node))
+                .get_generation_number_by_bonsai(&start_node)
                 .and_then(move |genopt| {
                     genopt.ok_or_else(|| err_msg(format!("{} not found", start_node)))
                 })
@@ -93,7 +94,7 @@ impl RangeNodeStream {
         let repo = repo.clone();
         Box::new(
             repo.clone()
-                .get_generation_number(&HgChangesetId::new(end_node))
+                .get_generation_number_by_bonsai(&end_node)
                 .and_then(move |genopt| {
                     genopt.ok_or_else(|| err_msg(format!("{} not found", end_node)))
                 })
@@ -122,10 +123,6 @@
         }
     }
 
-    pub fn boxed(self) -> Box<NodeStream> {
-        Box::new(self)
-    }
-
     fn build_output_nodes(&mut self, start_generation: Generation) {
         // We've been walking backwards from the end point, storing the nodes we see.
         // Now walk forward from the start point, looking at children only. These are
@@ -157,7 +154,7 @@
 }
 
 impl Stream for RangeNodeStream {
-    type Item = HgNodeHash;
+    type Item = ChangesetId;
     type Error = Error;
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
         // Empty the drain; this can only happen once we're in Stage 2
@@ -231,9 +228,19 @@ mod test {
     use async_unit;
     use fixtures::linear;
     use fixtures::merge_uneven;
-    use tests::assert_node_sequence;
+    use futures_ext::StreamExt;
+    use mercurial_types::HgChangesetId;
+    use tests::assert_changesets_sequence;
     use tests::string_to_nodehash;
 
+    fn string_to_bonsai(repo: &Arc<BlobRepo>, s: &'static str) -> ChangesetId {
+        let node = string_to_nodehash(s);
+        repo.get_bonsai_from_hg(&HgChangesetId::new(node))
+            .wait()
+            .unwrap()
+            .unwrap()
+    }
+
     #[test]
     fn linear_range() {
         async_unit::tokio_unit_test(|| {
@@ -241,18 +248,18 @@ mod test {
             let nodestream = RangeNodeStream::new(
                 &repo,
-                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
-                string_to_nodehash("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
-            ).boxed();
+                string_to_bonsai(&repo, "d0a361e9022d226ae52f689667bd7d212a19cfe0"),
+                string_to_bonsai(&repo, "a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
+            ).boxify();
 
-            assert_node_sequence(
+            assert_changesets_sequence(
                 &repo,
                 vec![
-                    string_to_nodehash("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
-                    string_to_nodehash("0ed509bf086fadcb8a8a5384dc3b550729b0fc17"),
-                    string_to_nodehash("eed3a8c0ec67b6a6fe2eb3543334df3f0b4f202b"),
-                    string_to_nodehash("cb15ca4a43a59acff5388cea9648c162afde8372"),
-                    string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
+                    string_to_bonsai(&repo, "a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
+                    string_to_bonsai(&repo, "0ed509bf086fadcb8a8a5384dc3b550729b0fc17"),
+                    string_to_bonsai(&repo, "eed3a8c0ec67b6a6fe2eb3543334df3f0b4f202b"),
+                    string_to_bonsai(&repo, "cb15ca4a43a59acff5388cea9648c162afde8372"),
+                    string_to_bonsai(&repo, "d0a361e9022d226ae52f689667bd7d212a19cfe0"),
                 ],
                 nodestream,
             );
@@ -266,15 +273,15 @@ mod test {
             let nodestream = RangeNodeStream::new(
                 &repo,
-                string_to_nodehash("0ed509bf086fadcb8a8a5384dc3b550729b0fc17"),
-                string_to_nodehash("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
-            ).boxed();
+                string_to_bonsai(&repo, "0ed509bf086fadcb8a8a5384dc3b550729b0fc17"),
+                string_to_bonsai(&repo, "a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
+            ).boxify();
 
-            assert_node_sequence(
+            assert_changesets_sequence(
                 &repo,
                 vec![
-                    string_to_nodehash("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
-                    string_to_nodehash("0ed509bf086fadcb8a8a5384dc3b550729b0fc17"),
+                    string_to_bonsai(&repo, "a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
+                    string_to_bonsai(&repo, "0ed509bf086fadcb8a8a5384dc3b550729b0fc17"),
                 ],
                 nodestream,
            );
@@ -288,14 +295,14 @@ mod test {
             let nodestream = RangeNodeStream::new(
                 &repo,
-                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
-                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
-            ).boxed();
+                string_to_bonsai(&repo, "d0a361e9022d226ae52f689667bd7d212a19cfe0"),
+                string_to_bonsai(&repo, "d0a361e9022d226ae52f689667bd7d212a19cfe0"),
+            ).boxify();
 
-            assert_node_sequence(
+            assert_changesets_sequence(
                 &repo,
                 vec![
-                    string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
+                    string_to_bonsai(&repo, "d0a361e9022d226ae52f689667bd7d212a19cfe0"),
                 ],
                 nodestream,
             );
@@ -310,11 +317,11 @@ mod test {
             // These are swapped, so won't find anything
             let nodestream = RangeNodeStream::new(
                 &repo,
-                string_to_nodehash("a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
-                string_to_nodehash("d0a361e9022d226ae52f689667bd7d212a19cfe0"),
-            ).boxed();
+                string_to_bonsai(&repo, "a9473beb2eb03ddb1cccc3fbaeb8a4820f9cd157"),
+                string_to_bonsai(&repo, "d0a361e9022d226ae52f689667bd7d212a19cfe0"),
+            ).boxify();
 
-            assert_node_sequence(&repo, vec![], nodestream);
+            assert_changesets_sequence(&repo, vec![], nodestream);
         })
     }
@@ -325,16 +332,16 @@ mod test {
             let nodestream = RangeNodeStream::new(
                 &repo,
-                string_to_nodehash("1d8a907f7b4bf50c6a09c16361e2205047ecc5e5"),
-                string_to_nodehash("b47ca72355a0af2c749d45a5689fd5bcce9898c7"),
-            ).boxed();
+                string_to_bonsai(&repo, "1d8a907f7b4bf50c6a09c16361e2205047ecc5e5"),
+                string_to_bonsai(&repo, "b47ca72355a0af2c749d45a5689fd5bcce9898c7"),
+            ).boxify();
 
-            assert_node_sequence(
+            assert_changesets_sequence(
                 &repo,
                 vec![
-                    string_to_nodehash("b47ca72355a0af2c749d45a5689fd5bcce9898c7"),
-                    string_to_nodehash("16839021e338500b3cf7c9b871c8a07351697d68"),
-                    string_to_nodehash("1d8a907f7b4bf50c6a09c16361e2205047ecc5e5"),
+                    string_to_bonsai(&repo, "b47ca72355a0af2c749d45a5689fd5bcce9898c7"),
+                    string_to_bonsai(&repo, "16839021e338500b3cf7c9b871c8a07351697d68"),
+                    string_to_bonsai(&repo, "1d8a907f7b4bf50c6a09c16361e2205047ecc5e5"),
                 ],
                 nodestream,
             );
@@ -348,26 +355,26 @@ mod test {
             let nodestream = RangeNodeStream::new(
                 &repo,
-                string_to_nodehash("15c40d0abc36d47fb51c8eaec51ac7aad31f669c"),
-                string_to_nodehash("b47ca72355a0af2c749d45a5689fd5bcce9898c7"),
-            ).boxed();
+                string_to_bonsai(&repo, "15c40d0abc36d47fb51c8eaec51ac7aad31f669c"),
+                string_to_bonsai(&repo, "b47ca72355a0af2c749d45a5689fd5bcce9898c7"),
+            ).boxify();
 
-            assert_node_sequence(
+            assert_changesets_sequence(
                 &repo,
                 vec![
-                    string_to_nodehash("b47ca72355a0af2c749d45a5689fd5bcce9898c7"),
-                    string_to_nodehash("264f01429683b3dd8042cb3979e8bf37007118bc"),
-                    string_to_nodehash("5d43888a3c972fe68c224f93d41b30e9f888df7c"),
-                    string_to_nodehash("fc2cef43395ff3a7b28159007f63d6529d2f41ca"),
-                    string_to_nodehash("bc7b4d0f858c19e2474b03e442b8495fd7aeef33"),
-                    string_to_nodehash("795b8133cf375f6d68d27c6c23db24cd5d0cd00f"),
-                    string_to_nodehash("4f7f3fd428bec1a48f9314414b063c706d9c1aed"),
-                    string_to_nodehash("16839021e338500b3cf7c9b871c8a07351697d68"),
-                    string_to_nodehash("1d8a907f7b4bf50c6a09c16361e2205047ecc5e5"),
-                    string_to_nodehash("b65231269f651cfe784fd1d97ef02a049a37b8a0"),
-                    string_to_nodehash("d7542c9db7f4c77dab4b315edd328edf1514952f"),
-                    string_to_nodehash("3cda5c78aa35f0f5b09780d971197b51cad4613a"),
-                    string_to_nodehash("15c40d0abc36d47fb51c8eaec51ac7aad31f669c"),
+                    string_to_bonsai(&repo, "b47ca72355a0af2c749d45a5689fd5bcce9898c7"),
+                    string_to_bonsai(&repo, "264f01429683b3dd8042cb3979e8bf37007118bc"),
+                    string_to_bonsai(&repo, "5d43888a3c972fe68c224f93d41b30e9f888df7c"),
+                    string_to_bonsai(&repo, "fc2cef43395ff3a7b28159007f63d6529d2f41ca"),
+                    string_to_bonsai(&repo, "bc7b4d0f858c19e2474b03e442b8495fd7aeef33"),
+                    string_to_bonsai(&repo, "795b8133cf375f6d68d27c6c23db24cd5d0cd00f"),
+                    string_to_bonsai(&repo, "4f7f3fd428bec1a48f9314414b063c706d9c1aed"),
+                    string_to_bonsai(&repo, "16839021e338500b3cf7c9b871c8a07351697d68"),
+                    string_to_bonsai(&repo, "1d8a907f7b4bf50c6a09c16361e2205047ecc5e5"),
+                    string_to_bonsai(&repo, "b65231269f651cfe784fd1d97ef02a049a37b8a0"),
+                    string_to_bonsai(&repo, "d7542c9db7f4c77dab4b315edd328edf1514952f"),
+                    string_to_bonsai(&repo, "3cda5c78aa35f0f5b09780d971197b51cad4613a"),
+                    string_to_bonsai(&repo, "15c40d0abc36d47fb51c8eaec51ac7aad31f669c"),
                 ],
                 nodestream,
             );
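
Stepping back from the diff: the two stages referenced in the `build_output_nodes` comments above can be modelled synchronously. This sketch is hypothetical, with plain `u64` ids standing in for ChangesetId and an in-memory parent map standing in for BlobRepo; the real stream does the same walk lazily and emits its output bucketed by generation number, which is why the tests list expected nodes from the end of the range down to the start. It also shows why the swapped-endpoints test above expects an empty result:

use std::collections::{HashMap, HashSet, VecDeque};

fn range(parents: &HashMap<u64, Vec<u64>>, start: u64, end: u64) -> HashSet<u64> {
    // Stage 1: walk backwards from `end`, recording reversed (child) edges
    // for every ancestor we see.
    let mut children: HashMap<u64, Vec<u64>> = HashMap::new();
    let mut seen = HashSet::new();
    let mut queue = VecDeque::new();
    queue.push_back(end);
    while let Some(node) = queue.pop_front() {
        if !seen.insert(node) {
            continue;
        }
        if let Some(ps) = parents.get(&node) {
            for &p in ps {
                children.entry(p).or_insert_with(Vec::new).push(node);
                queue.push_back(p);
            }
        }
    }

    // Stage 2: walk forward from `start`, following only the recorded child
    // edges. Everything reached is both a descendant of `start` and an
    // ancestor of `end`, i.e. exactly the range.
    let mut output = HashSet::new();
    if !seen.contains(&start) {
        // `start` is not an ancestor of `end` (e.g. swapped endpoints).
        return output;
    }
    queue.push_back(start);
    while let Some(node) = queue.pop_front() {
        if !output.insert(node) {
            continue;
        }
        if let Some(cs) = children.get(&node) {
            for &c in cs {
                queue.push_back(c);
            }
        }
    }
    output
}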


@@ -6,10 +6,12 @@
 use NodeStream;
 
 use blobrepo::BlobRepo;
-use futures::Future;
+use failure::Error;
+use futures::{Future, Stream};
 use futures::executor::spawn;
 use mercurial_types::HgNodeHash;
 use mercurial_types::nodehash::HgChangesetId;
+use mononoke_types::ChangesetId;
 use std::collections::HashSet;
 use std::sync::Arc;
@@ -17,6 +19,7 @@ pub fn string_to_nodehash(hash: &'static str) -> HgNodeHash {
     HgNodeHash::from_static_str(hash).expect("Can't turn string to HgNodeHash")
 }
 
+// TODO(stash): remove assert_node_sequence, use assert_changesets_sequence instead
 /// Accounting for reordering within generations, ensure that a NodeStream gives the expected
 /// NodeHashes for testing.
 pub fn assert_node_sequence<I>(repo: &Arc<BlobRepo>, hashes: I, stream: Box<NodeStream>)
@@ -78,3 +81,66 @@ where
         next_node.unwrap()
     );
 }
+
+pub fn assert_changesets_sequence<I>(
+    repo: &Arc<BlobRepo>,
+    hashes: I,
+    stream: Box<Stream<Item = ChangesetId, Error = Error>>,
+) where
+    I: IntoIterator<Item = ChangesetId>,
+{
+    let mut nodestream = spawn(stream);
+    let mut received_hashes = HashSet::new();
+
+    for expected in hashes {
+        // If we pulled it in earlier, we've found it.
+        if received_hashes.remove(&expected) {
+            continue;
+        }
+
+        let expected_generation = repo.clone()
+            .get_generation_number_by_bonsai(&expected)
+            .wait()
+            .expect("Unexpected error");
+
+        // Keep pulling in hashes until we either find this one, or move on to a new generation
+        loop {
+            let hash = nodestream
+                .wait_stream()
+                .expect("Unexpected end of stream")
+                .expect("Unexpected error");
+
+            if hash == expected {
+                break;
+            }
+
+            // Compare the generation of the hash we actually received against
+            // the expected one.
+            let node_generation = repo.clone()
+                .get_generation_number_by_bonsai(&hash)
+                .wait()
+                .expect("Unexpected error");
+
+            assert!(
+                node_generation == expected_generation,
+                "Did not receive expected node {:?} before change of generation from {:?} to {:?}",
+                expected,
+                node_generation,
+                expected_generation,
+            );
+
+            received_hashes.insert(hash);
+        }
+    }
+
+    assert!(
+        received_hashes.is_empty(),
+        "Too few nodes received: {:?}",
+        received_hashes
+    );
+
+    let next_node = nodestream.wait_stream();
+    assert!(
+        next_node.is_none(),
+        "Too many nodes received: {:?}",
+        next_node.unwrap()
+    );
+}