gettreepack: handle the depth parameter being sent by the client

Reviewed By: StanislavGlebik

Differential Revision: D9378908

fbshipit-source-id: 980e625765803c7cac9a272f3e701a3b2fa8da28
This commit is contained in:
Lukas Piatkowski 2018-08-17 09:01:20 -07:00 committed by Facebook Github Bot
parent 60150b9488
commit 2d92742e38
5 changed files with 194 additions and 40 deletions

View File

@ -172,9 +172,11 @@ pub struct GettreepackArgs {
pub mfnodes: Vec<HgNodeHash>,
/// The manifest nodes of the rootdir that are already on the client.
pub basemfnodes: Vec<HgNodeHash>,
/// The fullpath (not relative path) of directories underneath
/// The fullpath (not relative path) of directories underneath
/// the rootdir that should be sent.
pub directories: Vec<Bytes>,
/// The depth from the root that should be sent.
pub depth: Option<usize>,
}
#[derive(Debug)]

View File

@ -285,6 +285,32 @@ where
}
}
/// Given a hash of parameters, look up a parameter by name, and if it exists,
/// apply a parser to its value. If it doesn't, return None.
/// Look up an optional parameter by name in a param map and, when present,
/// apply `parser` to its value. A missing key yields `Ok(None)`; a present
/// value must be consumed entirely by `parser`, otherwise an error is
/// returned.
fn parseval_option<'a, F, T>(
    params: &'a HashMap<Vec<u8>, Vec<u8>>,
    key: &str,
    parser: F,
) -> Result<Option<T>>
where
    F: Fn(&'a [u8]) -> IResult<&'a [u8], T>,
{
    // Absent key is not an error — the parameter is simply optional.
    let value = match params.get(key.as_bytes()) {
        Some(value) => value,
        None => return Ok(None),
    };
    match parser(value.as_ref()) {
        // The parser must consume the whole value; trailing bytes indicate
        // a malformed parameter.
        IResult::Done(rest, parsed) => match match_eof(rest) {
            IResult::Done(..) => Ok(Some(parsed)),
            _ => bail_msg!(
                "Unconsumed characters remain after parsing param: {:?}",
                rest
            ),
        },
        IResult::Incomplete(err) => bail_msg!("param parse incomplete: {:?}", err),
        IResult::Error(err) => bail_msg!("param parse failed: {:?}", err),
    }
}
/// Parse a command, given some input, a command name (used as a tag), a param parser
/// function (which generalizes over batched and non-batched parameter syntaxes),
/// number of args (since each command has a fixed number of expected parameters,
@ -493,6 +519,12 @@ fn parse_with_params(
mfnodes: parseval(&kv, "mfnodes", hashlist)?,
basemfnodes: parseval(&kv, "basemfnodes", hashlist)?,
directories: parseval(&kv, "directories", gettreepack_directories)?,
depth: parseval_option(&kv, "depth", closure!(
map_res!(
map_res!(take_while1!(is_digit), str::from_utf8),
usize::from_str
)
))?,
})))
| command!("getfiles", Getfiles, parse_params, {})
)
@ -1257,12 +1289,15 @@ mod test_parse {
mfnodes: vec![hash_ones()],
basemfnodes: vec![hash_ones()],
directories: vec![],
depth: None,
})),
);
let inp =
"gettreepack\n\
* 4\n\
* 5\n\
depth 1\n\
1\
rootdir 5\n\
ololo\
mfnodes 81\n\
@ -1279,6 +1314,7 @@ mod test_parse {
mfnodes: vec![hash_ones(), hash_twos()],
basemfnodes: vec![hash_twos(), hash_ones()],
directories: vec![Bytes::from(",".as_bytes()), Bytes::from(";".as_bytes())],
depth: Some(1),
})),
);
}

View File

@ -310,7 +310,7 @@ where
TM: Manifest,
FM: Manifest,
{
changed_entry_stream_with_pruner(to, from, path, |_| true).boxify()
changed_entry_stream_with_pruner(to, from, path, |_| true, None).boxify()
}
pub fn changed_entry_stream_with_pruner<TM, FM>(
@ -318,23 +318,26 @@ pub fn changed_entry_stream_with_pruner<TM, FM>(
from: &FM,
path: Option<MPath>,
pruner: impl FnMut(&ChangedEntry) -> bool + Send + Clone + 'static,
max_depth: Option<usize>,
) -> impl Stream<Item = ChangedEntry, Error = Error>
where
TM: Manifest,
FM: Manifest,
{
if max_depth == Some(0) {
return empty().boxify();
}
diff_manifests(path, to, from)
.map(move |diff| {
select_all(
diff.into_iter()
.filter({
let pruner = pruner.clone();
pruner
})
.map(|entry| recursive_changed_entry_stream(entry, pruner.clone())),
diff.into_iter().filter(pruner.clone()).map(|entry| {
recursive_changed_entry_stream(entry, 1, pruner.clone(), max_depth)
}),
)
})
.flatten_stream()
.boxify()
}
/// Given a ChangedEntry, return a stream that consists of this entry, and all subentries
@ -342,9 +345,11 @@ where
/// subtrees are recursively compared.
fn recursive_changed_entry_stream(
changed_entry: ChangedEntry,
depth: usize,
pruner: impl FnMut(&ChangedEntry) -> bool + Send + Clone + 'static,
max_depth: Option<usize>,
) -> BoxStream<ChangedEntry, Error> {
if !changed_entry.status.is_tree() {
if !changed_entry.status.is_tree() || (max_depth.is_some() && max_depth <= Some(depth)) {
return once(Ok(changed_entry)).boxify();
}
@ -394,11 +399,9 @@ fn recursive_changed_entry_stream(
.map(move |(to_mf, from_mf)| {
diff_manifests(path, &to_mf, &from_mf)
.map(move |diff| {
select_all(
diff.into_iter()
.filter(pruner.clone())
.map(|entry| recursive_changed_entry_stream(entry, pruner.clone())),
)
select_all(diff.into_iter().filter(pruner.clone()).map(|entry| {
recursive_changed_entry_stream(entry, depth + 1, pruner.clone(), max_depth)
}))
})
.flatten_stream()
})

View File

@ -197,12 +197,14 @@ fn find_changed_entry_status_stream(
manifest: Box<Manifest>,
basemanifest: Box<Manifest>,
pruner: impl FnMut(&ChangedEntry) -> bool + Send + Clone + 'static,
max_depth: Option<usize>,
) -> Vec<ChangedEntry> {
let mut stream = spawn(changed_entry_stream_with_pruner(
&manifest,
&basemanifest,
None,
pruner,
max_depth,
));
let mut res = vec![];
loop {
@ -290,6 +292,7 @@ fn do_check_with_pruner<P>(
expected_deleted: Vec<&str>,
expected_modified: Vec<&str>,
pruner: P,
max_depth: Option<usize>,
) where
P: FnMut(&ChangedEntry) -> bool + Send + Clone + 'static,
{
@ -297,7 +300,8 @@ fn do_check_with_pruner<P>(
let manifest = get_root_manifest(repo.clone(), &HgChangesetId::new(main_hash));
let base_manifest = get_root_manifest(repo.clone(), &HgChangesetId::new(base_hash));
let res = find_changed_entry_status_stream(manifest, base_manifest, pruner.clone());
let res =
find_changed_entry_status_stream(manifest, base_manifest, pruner.clone(), max_depth);
check_changed_paths(
res,
@ -313,7 +317,7 @@ fn do_check_with_pruner<P>(
let manifest = get_root_manifest(repo.clone(), &HgChangesetId::new(base_hash));
let base_manifest = get_root_manifest(repo.clone(), &HgChangesetId::new(main_hash));
let res = find_changed_entry_status_stream(manifest, base_manifest, pruner);
let res = find_changed_entry_status_stream(manifest, base_manifest, pruner, max_depth);
check_changed_paths(
res,
@ -340,6 +344,7 @@ fn do_check(
expected_deleted,
expected_modified,
|_| true,
None,
)
}
@ -522,6 +527,91 @@ fn test_recursive_changed_entry_stream_dirs_replaced_with_file() {
}).expect("test failed")
}
#[test]
fn test_depth_parameter() {
    async_unit::tokio_unit_test(|| -> Result<_, !> {
        let repo = Arc::new(many_files_dirs::getrepo(None));
        let main_hash = HgNodeHash::from_str("d261bc7900818dea7c86935b3fb17a33b2e3a6b4").unwrap();
        let base_hash = HgNodeHash::from_str("2f866e7e549760934e31bf0420a873f65100ad63").unwrap();
        // main_hash is a child of base_hash
        // hg st --change .
        // A dir1/subdir1/subsubdir1/file_1
        // A dir1/subdir1/subsubdir2/file_1
        // A dir1/subdir1/subsubdir2/file_2

        // Table of (max_depth, expected added, expected modified); nothing is
        // expected to be deleted at any depth. Shrinking the depth limit cuts
        // off progressively more of the changed subtree.
        let cases = vec![
            (
                4,
                vec![
                    "dir1/subdir1/subsubdir1",
                    "dir1/subdir1/subsubdir1/file_1",
                    "dir1/subdir1/subsubdir2",
                    "dir1/subdir1/subsubdir2/file_1",
                    "dir1/subdir1/subsubdir2/file_2",
                ],
                vec!["dir1", "dir1/subdir1"],
            ),
            (
                3,
                vec!["dir1/subdir1/subsubdir1", "dir1/subdir1/subsubdir2"],
                vec!["dir1", "dir1/subdir1"],
            ),
            (2, vec![], vec!["dir1", "dir1/subdir1"]),
            (1, vec![], vec!["dir1"]),
            // Depth 0 produces an empty stream.
            (0, vec![], vec![]),
        ];

        for (max_depth, expected_added, expected_modified) in cases {
            do_check_with_pruner(
                repo.clone(),
                main_hash,
                base_hash,
                expected_added,
                vec![],
                expected_modified,
                |_| true,
                Some(max_depth),
            );
        }
        Ok(())
    }).expect("test failed")
}
#[test]
fn test_recursive_changed_entry_prune() {
async_unit::tokio_unit_test(|| -> Result<_, !> {
@ -556,6 +646,7 @@ fn test_recursive_changed_entry_prune() {
None => true,
}
},
None,
);
let expected_added = vec!["dir1"];
@ -586,6 +677,7 @@ fn test_recursive_changed_entry_prune() {
None => true,
}
},
None,
);
Ok(())
@ -626,10 +718,15 @@ fn test_recursive_changed_entry_prune_visited() {
let pruner = visited_pruner();
let first_stream =
changed_entry_stream_with_pruner(&manifest_1, &basemanifest, None, pruner.clone());
let first_stream = changed_entry_stream_with_pruner(
&manifest_1,
&basemanifest,
None,
pruner.clone(),
None,
);
let second_stream =
changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, pruner);
changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, pruner, None);
let mut res = spawn(select_all(vec![first_stream, second_stream]).collect());
let res = res.wait_future().unwrap();
let unique_len = res.len();
@ -680,19 +777,24 @@ fn test_recursive_changed_entry_prune_visited_no_files() {
let basemanifest = get_root_manifest(repo.clone(), &HgChangesetId::new(base_hash));
let pruner = and_pruner_combinator(&file_pruner, visited_pruner());
let first_stream =
changed_entry_stream_with_pruner(&manifest_1, &basemanifest, None, pruner.clone());
let first_stream = changed_entry_stream_with_pruner(
&manifest_1,
&basemanifest,
None,
pruner.clone(),
None,
);
let second_stream =
changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, pruner);
changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, pruner, None);
let mut res = spawn(select_all(vec![first_stream, second_stream]).collect());
let res = res.wait_future().unwrap();
let unique_len = res.len();
assert_eq!(unique_len, 7);
let first_stream =
changed_entry_stream_with_pruner(&manifest_1, &basemanifest, None, &file_pruner);
changed_entry_stream_with_pruner(&manifest_1, &basemanifest, None, &file_pruner, None);
let second_stream =
changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, &file_pruner);
changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, &file_pruner, None);
let mut res = spawn(select_all(vec![first_stream, second_stream]).collect());
let res = res.wait_future().unwrap();
// Make sure that more entries are produced without VisitedPruner i.e. some entries are
@ -715,7 +817,7 @@ fn test_visited_pruner_different_files_same_hash() {
let pruner = visited_pruner();
let stream =
changed_entry_stream_with_pruner(&root_manifest, &EmptyManifest {}, None, pruner);
changed_entry_stream_with_pruner(&root_manifest, &EmptyManifest {}, None, pruner, None);
let mut res = spawn(stream.collect());
let res = res.wait_future().unwrap();
@ -736,7 +838,7 @@ fn test_file_pruner() {
let pruner = file_pruner;
let stream =
changed_entry_stream_with_pruner(&root_manifest, &EmptyManifest {}, None, pruner);
changed_entry_stream_with_pruner(&root_manifest, &EmptyManifest {}, None, pruner, None);
let mut res = spawn(stream.collect());
let res = res.wait_future().unwrap();

View File

@ -20,7 +20,7 @@ use futures_ext::{select_all, BoxFuture, BoxStream, FutureExt, StreamExt};
use futures_stats::{Timed, TimedStreamTrait};
use itertools::Itertools;
use slog::Logger;
use stats::Histogram;
use stats::{Histogram, Timeseries};
use time_ext::DurationExt;
use uuid::Uuid;
@ -31,9 +31,8 @@ use mercurial::{self, RevlogChangeset};
use mercurial_bundles::{create_bundle_stream, parts, Bundle2EncodeBuilder, Bundle2Item};
use mercurial_types::{percent_encode, Changeset, Entry, HgBlobNode, HgChangesetId, HgManifestId,
HgNodeHash, MPath, RepoPath, Type, NULL_HASH};
use mercurial_types::manifest_utils::{and_pruner_combinator, changed_entry_stream,
changed_entry_stream_with_pruner, file_pruner,
visited_pruner, ChangedEntry, EntryStatus};
use mercurial_types::manifest_utils::{and_pruner_combinator, changed_entry_stream_with_pruner,
file_pruner, visited_pruner, ChangedEntry, EntryStatus};
use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt};
use tracing::{TraceContext, Traced};
@ -55,6 +54,8 @@ define_stats! {
histogram(500, 0, 20_000, AVG, SUM, COUNT; P 5; P 25; P 50; P 75; P 95; P 97; P 99),
getfiles_ms:
histogram(500, 0, 20_000, AVG, SUM, COUNT; P 5; P 25; P 50; P 75; P 95; P 97; P 99),
gettreepack_passed: timeseries(RATE, SUM),
gettreepack_rejected: timeseries(RATE, SUM),
}
mod ops {
@ -278,6 +279,9 @@ impl RepoClient {
fn gettreepack_untimed(&self, params: GettreepackArgs) -> BoxStream<Bytes, Error> {
debug!(self.logger(), "gettreepack");
// 65536 matches the default TREE_DEPTH_MAX value from Mercurial
let fetchdepth = params.depth.unwrap_or(2 << 16);
if !params.directories.is_empty() {
// This param is not used by core hg, don't worry about implementing it now
return stream::once(Err(err_msg("directories param is not supported"))).boxify();
@ -301,7 +305,8 @@ impl RepoClient {
&manifest_id,
&basemfnode,
rootpath.clone(),
Some(and_pruner_combinator(&file_pruner, visited_pruner.clone())),
and_pruner_combinator(&file_pruner, visited_pruner.clone()),
fetchdepth,
self.trace().clone(),
)
})).boxify()
@ -312,7 +317,8 @@ impl RepoClient {
&mfnode,
&basemfnode,
rootpath.clone(),
Some(&file_pruner),
&file_pruner,
fetchdepth,
self.trace().clone(),
),
None => empty().boxify(),
@ -686,7 +692,8 @@ fn get_changed_entry_stream(
mfid: &HgNodeHash,
basemfid: &HgNodeHash,
rootpath: Option<MPath>,
pruner: Option<impl FnMut(&ChangedEntry) -> bool + Send + Clone + 'static>,
pruner: impl FnMut(&ChangedEntry) -> bool + Send + Clone + 'static,
max_depth: usize,
trace: TraceContext,
) -> BoxStream<(Box<Entry + Sync>, Option<MPath>), Error> {
let manifest = repo.get_manifest_by_nodeid(mfid)
@ -699,11 +706,8 @@ fn get_changed_entry_stream(
.join(basemanifest)
.map({
let rootpath = rootpath.clone();
move |(mf, basemf)| match pruner {
Some(pruner) => {
changed_entry_stream_with_pruner(&mf, &basemf, rootpath, pruner).boxify()
}
None => changed_entry_stream(&mf, &basemf, rootpath).boxify(),
move |(mf, basemf)| {
changed_entry_stream_with_pruner(&mf, &basemf, rootpath, pruner, Some(max_depth))
}
})
.flatten_stream();
@ -712,19 +716,26 @@ fn get_changed_entry_stream(
changed_entries.filter_map(move |entry_status| match entry_status.status {
EntryStatus::Added(entry) => {
if entry.get_type() == Type::Tree {
STATS::gettreepack_passed.add_value(1);
Some((entry, entry_status.dirname))
} else {
STATS::gettreepack_rejected.add_value(1);
None
}
}
EntryStatus::Modified { to_entry, .. } => {
if to_entry.get_type() == Type::Tree {
STATS::gettreepack_passed.add_value(1);
Some((to_entry, entry_status.dirname))
} else {
STATS::gettreepack_rejected.add_value(1);
None
}
}
EntryStatus::Deleted(..) => None,
EntryStatus::Deleted(..) => {
STATS::gettreepack_rejected.add_value(1);
None
}
});
// Append root manifest