Replacing HgNodeHash with ChangesetId - Without formatting changes

Differential Revision: D13692998

Reviewed By: lukaspiatkowski

fbshipit-source-id: 0ba0d30a96b0a4d4d84f64b410036e9e58cf64b9
Fatih Aydin 2019-01-16 10:49:58 -08:00 committed by Facebook Github Bot
parent b43b908a4b
commit 0c8adaa4c4
6 changed files with 103 additions and 334 deletions

View File

@@ -4,7 +4,6 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::collections::hash_set::IntoIter;
/// Union and intersection can be made more efficient if the streams are uninterrupted streams of
/// ancestors. For example:
///
@@ -23,6 +22,7 @@ use std::collections::hash_set::IntoIter;
///
/// The stream below aims to solve the aforementioned problems. Its primary usage is in
/// Mercurial pull to find commits that need to be sent to a client.
use std::collections::hash_set::IntoIter;
use std::collections::{BTreeMap, HashSet};
use std::iter::{self, FromIterator};
use std::sync::Arc;
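The doc comment above is the core idea of this module: inputs arrive sorted by non-increasing generation number, so set operations reduce to linear merges. A minimal sketch of that idea with plain slices in place of streams and u64 stand-ins for hashes and generations (none of this is revset API):

    use std::collections::HashSet;

    /// Union of two (generation, id) sequences, each pre-sorted by descending
    /// generation: a linear merge that only buffers one generation of ids.
    fn union_sorted(a: &[(u64, u64)], b: &[(u64, u64)]) -> Vec<(u64, u64)> {
        let (mut i, mut j) = (0, 0);
        let mut out = Vec::new();
        let mut seen_at_gen = HashSet::new(); // ids already emitted at cur_gen
        let mut cur_gen = u64::max_value();
        loop {
            // Take whichever head has the higher generation number.
            let next = match (a.get(i).copied(), b.get(j).copied()) {
                (Some(x), Some(y)) if x.0 >= y.0 => { i += 1; x }
                (_, Some(y)) => { j += 1; y }
                (Some(x), None) => { i += 1; x }
                (None, None) => break,
            };
            if next.0 < cur_gen {
                cur_gen = next.0;
                seen_at_gen.clear(); // equal ids always share a generation
            }
            if seen_at_gen.insert(next.1) {
                out.push(next);
            }
        }
        out
    }

    fn main() {
        let a = [(3, 10), (2, 11), (1, 12)];
        let b = [(3, 10), (1, 13)];
        assert_eq!(union_sorted(&a, &b), vec![(3, 10), (2, 11), (1, 12), (1, 13)]);
    }

Intersection works the same way: buffer one generation from each input and emit only the ids present in both.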

View File

@@ -35,9 +35,6 @@ pub use intersectnodestream::IntersectNodeStream;
mod unionnodestream;
pub use unionnodestream::UnionNodeStream;
mod singlenodehash;
pub use singlenodehash::SingleNodeHash;
mod singlechangesetid;
pub use singlechangesetid::single_changeset_id;
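single_changeset_id replaces the SingleNodeHash stream that is deleted wholesale further down. A hedged sketch of what a bonsai single-node stream plausibly looks like, mirroring the deleted poll logic; the ChangesetId stand-in and the struct shape are assumptions, not the actual Mononoke implementation:

    use failure::Error;
    use futures::{Async, Future, Poll, Stream};

    // Stand-in for mononoke_types::ChangesetId.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
    pub struct ChangesetId(pub [u8; 32]);

    // Yields its ChangesetId once, iff the existence check resolves to true.
    pub struct SingleChangesetId {
        csid: Option<ChangesetId>,
        exists: Box<dyn Future<Item = bool, Error = Error> + Send>,
    }

    impl Stream for SingleChangesetId {
        type Item = ChangesetId;
        type Error = Error;

        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            if self.csid.is_none() {
                return Ok(Async::Ready(None)); // already yielded, or known missing
            }
            match self.exists.poll()? {
                Async::NotReady => Ok(Async::NotReady),
                Async::Ready(true) => Ok(Async::Ready(self.csid.take())),
                Async::Ready(false) => Ok(Async::Ready(None)),
            }
        }
    }

Compared with the deleted stream, only the item type changes, and existence would be checked by bonsai id rather than through HgChangesetId.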
@@ -65,6 +62,7 @@ pub use range::RangeNodeStream;
use uniqueheap::UniqueHeap;
pub use test::*;
#[cfg(test)]
mod test {
pub extern crate ascii;

View File

@@ -5,30 +5,12 @@
// GNU General Public License version 2 or any later version.
use super::tests::TestChangesetFetcher;
use ancestors::AncestorsNodeStream;
use ancestorscombinators::DifferenceOfUnionsOfAncestorsNodeStream;
use async_unit;
use failure::Error;
use futures::executor::spawn;
use futures::{
future::{join_all, ok},
stream, Future, Stream,
};
use futures_ext::{BoxFuture, BoxStream, StreamExt};
use quickcheck::rand::{
distributions::{range::Range, Sample},
thread_rng, Rng,
};
use quickcheck::{quickcheck, Arbitrary, Gen};
use std::collections::HashSet;
use std::iter::Iterator;
use std::sync::Arc;
use blobrepo::{BlobRepo, ChangesetFetcher};
use context::CoreContext;
use mercurial_types::{HgChangesetId, HgNodeHash};
use mononoke_types::ChangesetId;
use reachabilityindex::SkiplistIndex;
use failure::Error;
use fixtures::branch_even;
use fixtures::branch_uneven;
use fixtures::branch_wide;
@@ -37,12 +19,26 @@ use fixtures::merge_even;
use fixtures::merge_uneven;
use fixtures::unshared_merge_even;
use fixtures::unshared_merge_uneven;
use ancestors::AncestorsNodeStream;
use ancestorscombinators::DifferenceOfUnionsOfAncestorsNodeStream;
use futures::executor::spawn;
use futures::{
future::{join_all, ok},
stream, Future, Stream,
};
use futures_ext::{BoxFuture, BoxStream, StreamExt};
use intersectnodestream::IntersectNodeStream;
use mercurial_types::{HgChangesetId, HgNodeHash};
use mononoke_types::ChangesetId;
use quickcheck::rand::{
distributions::{range::Range, Sample},
thread_rng, Rng,
};
use quickcheck::{quickcheck, Arbitrary, Gen};
use reachabilityindex::SkiplistIndex;
use setdifferencenodestream::SetDifferenceNodeStream;
use singlechangesetid::single_changeset_id;
use std::collections::HashSet;
use std::iter::Iterator;
use std::sync::Arc;
use unionnodestream::UnionNodeStream;
use validation::ValidateNodeStream;
use BonsaiNodeStream;
@@ -50,7 +46,7 @@ use NodeStream;
#[derive(Clone, Copy, Debug)]
enum RevsetEntry {
SingleNode(Option<HgNodeHash>),
SingleNode(Option<ChangesetId>),
SetDifference,
Intersect(usize),
Union(usize),
@@ -61,7 +57,7 @@ pub struct RevsetSpec {
rp_entries: Vec<RevsetEntry>,
}
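RevsetSpec keeps its entries in reverse-polish order; as_hashes and as_revset below both evaluate them against an output stack. A minimal stack evaluator with u64 stand-ins for ChangesetId (Entry and eval are illustrative names, not revset API):

    use std::collections::HashSet;

    enum Entry {
        Single(u64),      // stand-in for RevsetEntry::SingleNode(Some(csid))
        SetDifference,    // pop keep, pop remove, push keep - remove
        Union(usize),     // pop `size` sets, push their union
        Intersect(usize), // pop `size` sets, push their intersection
    }

    fn eval(entries: &[Entry]) -> HashSet<u64> {
        let mut output: Vec<HashSet<u64>> = Vec::new();
        for entry in entries {
            match entry {
                Entry::Single(id) => output.push(std::iter::once(*id).collect()),
                Entry::SetDifference => {
                    let keep = output.pop().expect("No keep for setdifference");
                    let remove = output.pop().expect("No remove for setdifference");
                    output.push(&keep - &remove);
                }
                Entry::Union(size) => {
                    let idx = output.len() - size;
                    let merged = output.split_off(idx).into_iter().flatten().collect();
                    output.push(merged);
                }
                Entry::Intersect(size) => {
                    let idx = output.len() - size;
                    let mut sets = output.split_off(idx).into_iter();
                    let first = sets.next().expect("Empty intersect");
                    output.push(sets.fold(first, |acc, s| &acc & &s));
                }
            }
        }
        output.pop().expect("No revisions")
    }

    fn main() {
        use Entry::*;
        // ({1} ∪ {2}) minus {2}: the remove operand is pushed first.
        let spec = [Single(2), Single(1), Single(2), Union(2), SetDifference];
        assert_eq!(eval(&spec), std::iter::once(1).collect());
    }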
fn get_changesets_from_repo(ctx: CoreContext, repo: &BlobRepo) -> Vec<HgNodeHash> {
fn get_changesets_from_repo(ctx: CoreContext, repo: &BlobRepo) -> Vec<ChangesetId> {
let changeset_fetcher = repo.get_changeset_fetcher();
let mut all_changesets_executor = spawn(
repo.get_bonsai_heads_maybe_stale(ctx.clone())
@@ -69,15 +65,10 @@ fn get_changesets_from_repo(ctx: CoreContext, repo: &BlobRepo) -> Vec<HgNodeHash
cloned!(ctx);
move |head| AncestorsNodeStream::new(ctx.clone(), &changeset_fetcher, head)
})
.flatten()
.and_then({
cloned!(ctx, repo);
move |bonsai_cs| repo.get_hg_from_bonsai_changeset(ctx.clone(), bonsai_cs)
})
.map(|cs| cs.into_nodehash()),
.flatten(),
);
let mut all_changesets: Vec<HgNodeHash> = Vec::new();
let mut all_changesets: Vec<ChangesetId> = Vec::new();
loop {
all_changesets.push(match all_changesets_executor.wait_stream() {
None => break,
@@ -102,8 +93,8 @@ impl RevsetSpec {
}
}
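The new get_changesets_from_repo simply drains the flattened ancestor streams of every bonsai head; the old version had to round-trip each bonsai changeset through get_hg_from_bonsai_changeset and into_nodehash first. The shape of the change, reduced to plain iterators (u64 stands in for ChangesetId, and ancestors for AncestorsNodeStream):

    /// Before: heads -> ancestors -> flatten -> hg lookup -> nodehash.
    /// After:  heads -> ancestors -> flatten. ChangesetId is already the
    /// item type, so the conversion stage disappears.
    fn all_changesets(
        heads: impl IntoIterator<Item = u64>,
        ancestors: impl Fn(u64) -> Vec<u64>,
    ) -> Vec<u64> {
        heads.into_iter().flat_map(|head| ancestors(head)).collect()
    }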
pub fn as_hashes(&self) -> HashSet<HgNodeHash> {
let mut output: Vec<HashSet<HgNodeHash>> = Vec::new();
pub fn as_hashes(&self) -> HashSet<ChangesetId> {
let mut output: Vec<HashSet<ChangesetId>> = Vec::new();
for entry in self.rp_entries.iter() {
match entry {
&RevsetEntry::SingleNode(None) => panic!("You need to add_hashes first!"),
@@ -139,8 +130,8 @@ impl RevsetSpec {
output.pop().expect("No revisions").into_iter().collect()
}
pub fn as_revset(&self, ctx: CoreContext, repo: Arc<BlobRepo>) -> Box<NodeStream> {
let mut output: Vec<Box<NodeStream>> = Vec::with_capacity(self.rp_entries.len());
pub fn as_revset(&self, ctx: CoreContext, repo: Arc<BlobRepo>) -> Box<BonsaiNodeStream> {
let mut output: Vec<Box<BonsaiNodeStream>> = Vec::with_capacity(self.rp_entries.len());
let changeset_fetcher: Arc<ChangesetFetcher> =
Arc::new(TestChangesetFetcher::new(repo.clone()));
for entry in self.rp_entries.iter() {
@@ -148,52 +139,30 @@ impl RevsetSpec {
ctx.clone(),
match entry {
&RevsetEntry::SingleNode(None) => panic!("You need to add_hashes first!"),
&RevsetEntry::SingleNode(Some(hash)) => repo
.get_bonsai_from_hg(ctx.clone(), &HgChangesetId::new(hash))
.map({
cloned!(hash);
move |maybecsid| maybecsid.expect(&format!("unknown {}", hash))
})
.map({
cloned!(ctx, repo);
move |csid| {
bonsai_nodestream_to_nodestream(
ctx.clone(),
&repo,
single_changeset_id(ctx.clone(), csid, &*repo.clone()).boxify(),
)
}
})
.flatten_stream()
&RevsetEntry::SingleNode(Some(hash)) =>
single_changeset_id(
ctx.clone(),
Some(hash).expect(&format!("unknown {}", hash)),
&*repo.clone(),
)
.boxify(),
&RevsetEntry::SetDifference => {
let keep = output.pop().expect("No keep for setdifference");
let remove = output.pop().expect("No remove for setdifference");
let keep_input =
nodestreams_to_bonsai_nodestreams(ctx.clone(), &repo, vec![keep])
.remove(0);
let remove_input =
nodestreams_to_bonsai_nodestreams(ctx.clone(), &repo, vec![remove])
.remove(0);
let nodestream = SetDifferenceNodeStream::new(
SetDifferenceNodeStream::new(
ctx.clone(),
&changeset_fetcher,
keep_input,
remove_input,
keep,
remove,
)
.boxify();
bonsai_nodestream_to_nodestream(ctx.clone(), &repo, nodestream)
.boxify()
}
&RevsetEntry::Union(size) => {
let idx = output.len() - size;
let inputs = nodestreams_to_bonsai_nodestreams(
ctx.clone(),
&repo,
output.split_off(idx),
);
let inputs = output.split_off(idx);
let nodestream =
UnionNodeStream::new(ctx.clone(), &changeset_fetcher, inputs).boxify();
bonsai_nodestream_to_nodestream(ctx.clone(), &repo, nodestream)
nodestream
}
&RevsetEntry::Intersect(size) => {
let idx = output.len() - size;
@@ -201,13 +170,13 @@ impl RevsetSpec {
let nodestream = IntersectNodeStream::new(
ctx.clone(),
&repo.get_changeset_fetcher(),
nodestreams_to_bonsai_nodestreams(ctx.clone(), &repo, inputs),
inputs,
)
.boxify();
bonsai_nodestream_to_nodestream(ctx.clone(), &repo.clone(), nodestream)
nodestream
}
},
&repo.clone(),
&repo.get_changeset_fetcher().clone(),
)
.boxify();
output.push(next_node);
@@ -282,8 +251,8 @@ impl Arbitrary for RevsetSpec {
}
fn match_streams(
expected: BoxStream<HgNodeHash, Error>,
actual: BoxStream<HgNodeHash, Error>,
expected: BoxStream<ChangesetId, Error>,
actual: BoxStream<ChangesetId, Error>,
) -> bool {
let mut expected = {
let mut nodestream = spawn(expected);
@@ -373,16 +342,16 @@ quickcheck_setops!(setops_unshared_merge_uneven, unshared_merge_uneven);
// ([], [h1])
// ([], [])
struct IncludeExcludeDiscardCombinationsIterator {
hashes: Vec<HgNodeHash>,
hashes: Vec<ChangesetId>,
index: u64,
}
impl IncludeExcludeDiscardCombinationsIterator {
fn new(hashes: Vec<HgNodeHash>) -> Self {
fn new(hashes: Vec<ChangesetId>) -> Self {
Self { hashes, index: 0 }
}
fn generate_include_exclude(&self) -> (Vec<HgNodeHash>, Vec<HgNodeHash>) {
fn generate_include_exclude(&self) -> (Vec<ChangesetId>, Vec<ChangesetId>) {
let mut val = self.index;
let mut include = vec![];
let mut exclude = vec![];
@@ -407,7 +376,7 @@ impl IncludeExcludeDiscardCombinationsIterator {
}
impl Iterator for IncludeExcludeDiscardCombinationsIterator {
type Item = (Vec<HgNodeHash>, Vec<HgNodeHash>);
type Item = (Vec<ChangesetId>, Vec<ChangesetId>);
fn next(&mut self) -> Option<Self::Item> {
let res = if self.index >= 3_u64.pow(self.hashes.len() as u32) {
@@ -420,33 +389,6 @@ impl Iterator for IncludeExcludeDiscardCombinationsIterator {
}
}
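next walks self.index through 0..3^n, treating it as a base-3 number with one digit per hash: each digit independently sends its hash to include, to exclude, or drops it. A self-contained sketch of that enumeration (u64 stand-ins; the loop body of generate_include_exclude is elided above, so the exact digit-to-role mapping here is an assumption):

    fn generate_include_exclude(hashes: &[u64], index: u64) -> (Vec<u64>, Vec<u64>) {
        let mut val = index;
        let (mut include, mut exclude) = (Vec::new(), Vec::new());
        for &hash in hashes {
            match val % 3 {
                0 => exclude.push(hash),
                1 => include.push(hash),
                _ => {} // discard: the hash lands in neither set
            }
            val /= 3;
        }
        (include, exclude)
    }

    fn main() {
        // All 3^2 = 9 (include, exclude) splits of two hashes.
        let hashes = [1u64, 2];
        for index in 0..3u64.pow(hashes.len() as u32) {
            println!("{:?}", generate_include_exclude(&hashes, index));
        }
    }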
pub fn nodestreams_to_bonsai_nodestreams(
ctx: CoreContext,
repo: &Arc<BlobRepo>,
inputs: Vec<Box<NodeStream>>,
) -> Vec<Box<BonsaiNodeStream>> {
inputs
.into_iter()
.map({
cloned!(repo, ctx);
move |nodestream| {
nodestream
.and_then({
cloned!(repo, ctx);
move |hash| {
repo.get_bonsai_from_hg(ctx.clone(), &HgChangesetId::new(hash.clone()))
.map(move |maybe_bonsai| {
maybe_bonsai
.expect("Failed to get Bonsai Changeset from HgNodeHash")
})
}
})
.boxify()
}
})
.collect()
}
fn hg_to_bonsai_changesetid(
ctx: CoreContext,
repo: &Arc<BlobRepo>,
@@ -505,35 +447,32 @@ macro_rules! ancestors_check {
for (include, exclude) in iter {
let difference_stream = create_skiplist(ctx.clone(), &repo)
.map({
cloned!(ctx, changeset_fetcher, exclude, include, repo);
cloned!(ctx, changeset_fetcher, exclude, include);
move |skiplist| {
DifferenceOfUnionsOfAncestorsNodeStream::new_with_excludes(
ctx.clone(),
&changeset_fetcher,
skiplist,
hg_to_bonsai_changesetid(ctx.clone(), &repo, include.clone()),
hg_to_bonsai_changesetid(ctx, &repo, exclude.clone()),
include.clone(),
exclude.clone(),
)
}
})
.flatten_stream()
.boxify();
let actual = ValidateNodeStream::new(
ctx.clone(),
bonsai_nodestream_to_nodestream(ctx.clone(), &repo, difference_stream),
&repo.clone(),
);
let actual =
ValidateNodeStream::new(ctx.clone(), difference_stream, &changeset_fetcher);
let mut includes = vec![];
for i in hg_to_bonsai_changesetid(ctx.clone(), &repo, include.clone()) {
for i in include.clone() {
includes.push(
AncestorsNodeStream::new(ctx.clone(), &changeset_fetcher, i).boxify(),
);
}
let mut excludes = vec![];
for i in hg_to_bonsai_changesetid(ctx.clone(), &repo, exclude.clone()) {
for i in exclude.clone() {
excludes.push(
AncestorsNodeStream::new(ctx.clone(), &changeset_fetcher, i).boxify(),
);
@@ -550,7 +489,6 @@ macro_rules! ancestors_check {
)
.boxify();
let expected = bonsai_nodestream_to_nodestream(ctx.clone(), &repo, expected);
assert!(
match_streams(expected, actual.boxify()),
"streams do not match for {:?} {:?}",

View File

@@ -1,103 +0,0 @@
// Copyright (c) 2017-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::boxed::Box;
use blobrepo::BlobRepo;
use context::CoreContext;
use failure::Error;
use futures::future::Future;
use futures::stream::Stream;
use futures::{Async, Poll};
use mercurial_types::nodehash::HgChangesetId;
use mercurial_types::HgNodeHash;
use NodeStream;
pub struct SingleNodeHash {
nodehash: Option<HgNodeHash>,
exists: Box<Future<Item = bool, Error = Error> + Send>,
}
impl SingleNodeHash {
pub fn new(ctx: CoreContext, nodehash: HgNodeHash, repo: &BlobRepo) -> Self {
let changesetid = HgChangesetId::new(nodehash);
let exists = Box::new(repo.changeset_exists(ctx, &changesetid));
let nodehash = Some(nodehash);
SingleNodeHash { nodehash, exists }
}
pub fn boxed(self) -> Box<NodeStream> {
Box::new(self)
}
}
impl Stream for SingleNodeHash {
type Item = HgNodeHash;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.nodehash.is_none() {
Ok(Async::Ready(None))
} else {
match self.exists.poll()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(true) => {
let nodehash = self.nodehash;
self.nodehash = None;
Ok(Async::Ready(nodehash))
}
Async::Ready(false) => Ok(Async::Ready(None)),
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use async_unit;
use context::CoreContext;
use fixtures::linear;
use futures_ext::StreamExt;
use std::sync::Arc;
use tests::assert_node_sequence;
use tests::string_to_nodehash;
#[test]
fn valid_node() {
async_unit::tokio_unit_test(|| {
let ctx = CoreContext::test_mock();
let repo = Arc::new(linear::getrepo(None));
let nodestream = SingleNodeHash::new(
ctx.clone(),
string_to_nodehash("a5ffa77602a066db7d5cfb9fb5823a0895717c5a"),
&repo,
);
assert_node_sequence(
ctx,
&repo,
vec![string_to_nodehash(
"a5ffa77602a066db7d5cfb9fb5823a0895717c5a",
)]
.into_iter(),
nodestream.boxify(),
);
});
}
#[test]
fn invalid_node() {
async_unit::tokio_unit_test(|| {
let ctx = CoreContext::test_mock();
let repo = Arc::new(linear::getrepo(None));
let nodehash = string_to_nodehash("1000000000000000000000000000000000000000");
let nodestream = SingleNodeHash::new(ctx.clone(), nodehash, &repo).boxify();
assert_node_sequence(ctx, &repo, vec![].into_iter(), nodestream);
});
}
}

View File

@@ -20,7 +20,6 @@ use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use BonsaiNodeStream;
use NodeStream;
pub fn string_to_nodehash(hash: &str) -> HgNodeHash {
HgNodeHash::from_str(hash).expect("Can't turn string to HgNodeHash")
@@ -72,75 +71,6 @@ impl ChangesetFetcher for TestChangesetFetcher {
}
}
// TODO(stash): remove assert_node_sequence, use assert_changesets_sequence instead
/// Accounting for reordering within generations, ensure that a NodeStream gives the expected
/// NodeHashes for testing.
pub fn assert_node_sequence<I>(
ctx: CoreContext,
repo: &Arc<BlobRepo>,
hashes: I,
stream: Box<NodeStream>,
) where
I: IntoIterator<Item = HgNodeHash>,
{
let mut nodestream = spawn(stream);
let mut received_hashes = HashSet::new();
for expected in hashes {
// If we pulled it in earlier, we've found it.
if received_hashes.remove(&expected) {
continue;
}
let expected_generation = repo
.clone()
.get_generation_number(ctx.clone(), &HgChangesetId::new(expected))
.wait()
.expect("Unexpected error");
// Keep pulling in hashes until we either find this one, or move on to a new generation
loop {
let hash = nodestream
.wait_stream()
.expect("Unexpected end of stream")
.expect("Unexpected error");
if hash == expected {
break;
}
let node_generation = repo
.clone()
.get_generation_number(ctx.clone(), &HgChangesetId::new(expected))
.wait()
.expect("Unexpected error");
assert!(
node_generation == expected_generation,
"Did not receive expected node {:?} before change of generation from {:?} to {:?}",
expected,
node_generation,
expected_generation,
);
received_hashes.insert(hash);
}
}
assert!(
received_hashes.is_empty(),
"Too few nodes received: {:?}",
received_hashes
);
let next_node = nodestream.wait_stream();
assert!(
next_node.is_none(),
"Too many nodes received: {:?}",
next_node.unwrap()
);
}
pub fn assert_changesets_sequence<I>(
ctx: CoreContext,
repo: &Arc<BlobRepo>,
@@ -151,7 +81,6 @@ pub fn assert_changesets_sequence<I>(
{
let mut nodestream = spawn(stream);
let mut received_hashes = HashSet::new();
for expected in hashes {
// If we pulled it in earlier, we've found it.
if received_hashes.remove(&expected) {

View File

@@ -4,50 +4,45 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::collections::HashSet;
use std::sync::Arc;
use blobrepo::BlobRepo;
use blobrepo::ChangesetFetcher;
use context::CoreContext;
use failure::Error;
use futures::stream::Stream;
use futures::{Async, Poll};
use mercurial_types::HgNodeHash;
use futures_ext::StreamExt;
use mononoke_types::ChangesetId;
use mononoke_types::Generation;
use setcommon::{add_generations, InputStream};
use NodeStream;
use setcommon::{add_generations_by_bonsai, BonsaiInputStream};
use std::collections::HashSet;
use std::sync::Arc;
use BonsaiNodeStream;
/// A wrapper around a NodeStream that asserts that the two revset invariants hold:
/// 1. The generation number never increases
/// 2. No hash is seen twice
/// This uses memory proportional to the number of hashes in the revset.
pub struct ValidateNodeStream {
wrapped: InputStream,
wrapped: BonsaiInputStream,
last_generation: Option<Generation>,
seen_hashes: HashSet<HgNodeHash>,
seen_hashes: HashSet<ChangesetId>,
}
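The two invariants in the doc comment are cheap to state outside the stream machinery. A hedged sketch of the checks poll performs, run over an already-collected sequence (u64 stand-ins for ChangesetId and Generation; the real poll body is elided below):

    use std::collections::HashSet;

    /// (id, generation) pairs in the order a revset stream yielded them.
    fn validate(items: &[(u64, u64)]) {
        let mut last_generation: Option<u64> = None;
        let mut seen_hashes: HashSet<u64> = HashSet::new();
        for &(id, generation) in items {
            // Invariant 2: no hash is seen twice.
            assert!(seen_hashes.insert(id), "hash {} seen twice", id);
            // Invariant 1: the generation number never increases.
            assert!(
                last_generation.map_or(true, |last| last >= generation),
                "generation number increased"
            );
            last_generation = Some(generation);
        }
    }

    fn main() {
        validate(&[(10, 3), (11, 3), (12, 2)]); // ok: generations non-increasing
    }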
impl ValidateNodeStream {
pub fn new(
ctx: CoreContext,
wrapped: Box<NodeStream>,
repo: &Arc<BlobRepo>,
wrapped: Box<BonsaiNodeStream>,
changeset_fetcher: &Arc<ChangesetFetcher>,
) -> ValidateNodeStream {
ValidateNodeStream {
wrapped: add_generations(ctx, wrapped, repo.clone()),
wrapped: add_generations_by_bonsai(ctx, wrapped, changeset_fetcher.clone()).boxify(),
last_generation: None,
seen_hashes: HashSet::new(),
}
}
pub fn boxed(self) -> Box<NodeStream> {
Box::new(self)
}
}
impl Stream for ValidateNodeStream {
type Item = HgNodeHash;
type Item = ChangesetId;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@@ -82,24 +77,25 @@ mod test {
use fixtures::linear;
use futures_ext::StreamExt;
use setcommon::NotReadyEmptyStream;
use single_changeset_id;
use std::sync::Arc;
use tests::assert_node_sequence;
use tests::string_to_nodehash;
use SingleNodeHash;
use tests::{assert_changesets_sequence, string_to_bonsai, TestChangesetFetcher};
#[test]
fn validate_accepts_single_node() {
async_unit::tokio_unit_test(|| {
let ctx = CoreContext::test_mock();
let repo = Arc::new(linear::getrepo(None));
let changeset_fetcher: Arc<ChangesetFetcher> =
Arc::new(TestChangesetFetcher::new(repo.clone()));
let head_hash = string_to_nodehash("a5ffa77602a066db7d5cfb9fb5823a0895717c5a");
let head_csid = string_to_bonsai(&repo, "a5ffa77602a066db7d5cfb9fb5823a0895717c5a");
let nodestream = SingleNodeHash::new(ctx.clone(), head_hash, &repo);
let nodestream = single_changeset_id(ctx.clone(), head_csid.clone(), &repo).boxify();
let nodestream =
ValidateNodeStream::new(ctx.clone(), Box::new(nodestream), &repo).boxify();
assert_node_sequence(ctx, &repo, vec![head_hash], nodestream);
ValidateNodeStream::new(ctx.clone(), nodestream, &changeset_fetcher).boxify();
assert_changesets_sequence(ctx, &repo, vec![head_csid], nodestream);
});
}
@@ -110,9 +106,15 @@ mod test {
// Tests that we handle an input staying at NotReady for a while without panicking
let repeats = 10;
let repo = Arc::new(linear::getrepo(None));
let mut nodestream =
ValidateNodeStream::new(ctx, Box::new(NotReadyEmptyStream::new(repeats)), &repo)
.boxify();
let changeset_fetcher: Arc<ChangesetFetcher> =
Arc::new(TestChangesetFetcher::new(repo.clone()));
let mut nodestream = ValidateNodeStream::new(
ctx,
Box::new(NotReadyEmptyStream::new(repeats)),
&changeset_fetcher,
)
.boxify();
// Keep polling until we should be done.
for _ in 0..repeats + 1 {
@@ -136,11 +138,14 @@ mod test {
let ctx = CoreContext::test_mock();
let repo = Arc::new(linear::getrepo(None));
let head_hash = string_to_nodehash("a5ffa77602a066db7d5cfb9fb5823a0895717c5a");
let nodestream = SingleNodeHash::new(ctx.clone(), head_hash, &repo)
.chain(SingleNodeHash::new(ctx.clone(), head_hash, &repo));
let head_csid = string_to_bonsai(&repo, "a5ffa77602a066db7d5cfb9fb5823a0895717c5a");
let nodestream = single_changeset_id(ctx.clone(), head_csid.clone(), &repo)
.chain(single_changeset_id(ctx.clone(), head_csid.clone(), &repo));
let mut nodestream = ValidateNodeStream::new(ctx, Box::new(nodestream), &repo).boxify();
let changeset_fetcher: Arc<ChangesetFetcher> =
Arc::new(TestChangesetFetcher::new(repo.clone()));
let mut nodestream =
ValidateNodeStream::new(ctx, nodestream.boxify(), &changeset_fetcher).boxify();
loop {
match nodestream.poll() {
@@ -158,18 +163,20 @@ mod test {
let ctx = CoreContext::test_mock();
let repo = Arc::new(linear::getrepo(None));
let nodestream = SingleNodeHash::new(
let nodestream = single_changeset_id(
ctx.clone(),
string_to_nodehash("cb15ca4a43a59acff5388cea9648c162afde8372"),
string_to_bonsai(&repo, "cb15ca4a43a59acff5388cea9648c162afde8372").clone(),
&repo,
)
.chain(SingleNodeHash::new(
.chain(single_changeset_id(
ctx.clone(),
string_to_nodehash("3c15267ebf11807f3d772eb891272b911ec68759"),
string_to_bonsai(&repo, "3c15267ebf11807f3d772eb891272b911ec68759"),
&repo,
));
let mut nodestream = ValidateNodeStream::new(ctx, Box::new(nodestream), &repo).boxify();
let changeset_fetcher: Arc<ChangesetFetcher> =
Arc::new(TestChangesetFetcher::new(repo.clone()));
let mut nodestream =
ValidateNodeStream::new(ctx, nodestream.boxify(), &changeset_fetcher).boxify();
loop {
match nodestream.poll() {