diff --git a/hgproto/src/lib.rs b/hgproto/src/lib.rs index ab44ecf2fa..218fb3bc0f 100644 --- a/hgproto/src/lib.rs +++ b/hgproto/src/lib.rs @@ -172,9 +172,11 @@ pub struct GettreepackArgs { pub mfnodes: Vec, /// The manifest nodes of the rootdir that are already on the client. pub basemfnodes: Vec, - /// The fullpath (not relative path) of directories underneath + /// The fullpath (not relative path) of directories underneath /// the rootdir that should be sent. pub directories: Vec, + /// The depth from the root that should be sent. + pub depth: Option, } #[derive(Debug)] diff --git a/hgproto/src/sshproto/request.rs b/hgproto/src/sshproto/request.rs index 728e25f8e4..6c86584870 100644 --- a/hgproto/src/sshproto/request.rs +++ b/hgproto/src/sshproto/request.rs @@ -285,6 +285,32 @@ where } } +/// Given a hash of parameters, look up a parameter by name, and if it exists, +/// apply a parser to its value. If it doesn't, return None. +fn parseval_option<'a, F, T>( + params: &'a HashMap, Vec>, + key: &str, + parser: F, +) -> Result> +where + F: Fn(&'a [u8]) -> IResult<&'a [u8], T>, +{ + match params.get(key.as_bytes()) { + None => Ok(None), + Some(v) => match parser(v.as_ref()) { + IResult::Done(unparsed, v) => match match_eof(unparsed) { + IResult::Done(..) 
=> Ok(Some(v)), + _ => bail_msg!( + "Unconsumed characters remain after parsing param: {:?}", + unparsed + ), + }, + IResult::Incomplete(err) => bail_msg!("param parse incomplete: {:?}", err), + IResult::Error(err) => bail_msg!("param parse failed: {:?}", err), + }, + } +} + /// Parse a command, given some input, a command name (used as a tag), a param parser /// function (which generalizes over batched and non-batched parameter syntaxes), /// number of args (since each command has a fixed number of expected parameters, @@ -493,6 +519,12 @@ fn parse_with_params( mfnodes: parseval(&kv, "mfnodes", hashlist)?, basemfnodes: parseval(&kv, "basemfnodes", hashlist)?, directories: parseval(&kv, "directories", gettreepack_directories)?, + depth: parseval_option(&kv, "depth", closure!( + map_res!( + map_res!(take_while1!(is_digit), str::from_utf8), + usize::from_str + ) + ))?, }))) | command!("getfiles", Getfiles, parse_params, {}) ) @@ -1257,12 +1289,15 @@ mod test_parse { mfnodes: vec![hash_ones()], basemfnodes: vec![hash_ones()], directories: vec![], + depth: None, })), ); let inp = "gettreepack\n\ - * 4\n\ + * 5\n\ + depth 1\n\ + 1\ rootdir 5\n\ ololo\ mfnodes 81\n\ @@ -1279,6 +1314,7 @@ mod test_parse { mfnodes: vec![hash_ones(), hash_twos()], basemfnodes: vec![hash_twos(), hash_ones()], directories: vec![Bytes::from(",".as_bytes()), Bytes::from(";".as_bytes())], + depth: Some(1), })), ); } diff --git a/mercurial-types/src/manifest_utils.rs b/mercurial-types/src/manifest_utils.rs index 8f00b8b5aa..f54ebffaf2 100644 --- a/mercurial-types/src/manifest_utils.rs +++ b/mercurial-types/src/manifest_utils.rs @@ -310,7 +310,7 @@ where TM: Manifest, FM: Manifest, { - changed_entry_stream_with_pruner(to, from, path, |_| true).boxify() + changed_entry_stream_with_pruner(to, from, path, |_| true, None).boxify() } pub fn changed_entry_stream_with_pruner( @@ -318,23 +318,26 @@ pub fn changed_entry_stream_with_pruner( from: &FM, path: Option, pruner: impl FnMut(&ChangedEntry) -> bool 
+ Send + Clone + 'static, + max_depth: Option, ) -> impl Stream where TM: Manifest, FM: Manifest, { + if max_depth == Some(0) { + return empty().boxify(); + } + diff_manifests(path, to, from) .map(move |diff| { select_all( - diff.into_iter() - .filter({ - let pruner = pruner.clone(); - pruner - }) - .map(|entry| recursive_changed_entry_stream(entry, pruner.clone())), + diff.into_iter().filter(pruner.clone()).map(|entry| { + recursive_changed_entry_stream(entry, 1, pruner.clone(), max_depth) + }), ) }) .flatten_stream() + .boxify() } /// Given a ChangedEntry, return a stream that consists of this entry, and all subentries @@ -342,9 +345,11 @@ where /// subtrees are recursively compared. fn recursive_changed_entry_stream( changed_entry: ChangedEntry, + depth: usize, pruner: impl FnMut(&ChangedEntry) -> bool + Send + Clone + 'static, + max_depth: Option, ) -> BoxStream { - if !changed_entry.status.is_tree() { + if !changed_entry.status.is_tree() || (max_depth.is_some() && max_depth <= Some(depth)) { return once(Ok(changed_entry)).boxify(); } @@ -394,11 +399,9 @@ fn recursive_changed_entry_stream( .map(move |(to_mf, from_mf)| { diff_manifests(path, &to_mf, &from_mf) .map(move |diff| { - select_all( - diff.into_iter() - .filter(pruner.clone()) - .map(|entry| recursive_changed_entry_stream(entry, pruner.clone())), - ) + select_all(diff.into_iter().filter(pruner.clone()).map(|entry| { + recursive_changed_entry_stream(entry, depth + 1, pruner.clone(), max_depth) + })) }) .flatten_stream() }) diff --git a/mercurial-types/tests/src/lib.rs b/mercurial-types/tests/src/lib.rs index ecf24558ea..b81203633f 100644 --- a/mercurial-types/tests/src/lib.rs +++ b/mercurial-types/tests/src/lib.rs @@ -197,12 +197,14 @@ fn find_changed_entry_status_stream( manifest: Box, basemanifest: Box, pruner: impl FnMut(&ChangedEntry) -> bool + Send + Clone + 'static, + max_depth: Option, ) -> Vec { let mut stream = spawn(changed_entry_stream_with_pruner( &manifest, &basemanifest, None, pruner, + 
max_depth, )); let mut res = vec![]; loop { @@ -290,6 +292,7 @@ fn do_check_with_pruner<P>( expected_deleted: Vec<&str>, expected_modified: Vec<&str>, pruner: P, + max_depth: Option<usize>, ) where P: FnMut(&ChangedEntry) -> bool + Send + Clone + 'static, { @@ -297,7 +300,8 @@ fn do_check_with_pruner<P>( let manifest = get_root_manifest(repo.clone(), &HgChangesetId::new(main_hash)); let base_manifest = get_root_manifest(repo.clone(), &HgChangesetId::new(base_hash)); - let res = find_changed_entry_status_stream(manifest, base_manifest, pruner.clone()); + let res = + find_changed_entry_status_stream(manifest, base_manifest, pruner.clone(), max_depth); check_changed_paths( res, @@ -313,7 +317,7 @@ fn do_check_with_pruner

( let manifest = get_root_manifest(repo.clone(), &HgChangesetId::new(base_hash)); let base_manifest = get_root_manifest(repo.clone(), &HgChangesetId::new(main_hash)); - let res = find_changed_entry_status_stream(manifest, base_manifest, pruner); + let res = find_changed_entry_status_stream(manifest, base_manifest, pruner, max_depth); check_changed_paths( res, @@ -340,6 +344,7 @@ fn do_check( expected_deleted, expected_modified, |_| true, + None, ) } @@ -522,6 +527,91 @@ fn test_recursive_changed_entry_stream_dirs_replaced_with_file() { }).expect("test failed") } +#[test] +fn test_depth_parameter() { + async_unit::tokio_unit_test(|| -> Result<_, !> { + let repo = Arc::new(many_files_dirs::getrepo(None)); + let main_hash = HgNodeHash::from_str("d261bc7900818dea7c86935b3fb17a33b2e3a6b4").unwrap(); + let base_hash = HgNodeHash::from_str("2f866e7e549760934e31bf0420a873f65100ad63").unwrap(); + // main_hash is a child of base_hash + // hg st --change . + // A dir1/subdir1/subsubdir1/file_1 + // A dir1/subdir1/subsubdir2/file_1 + // A dir1/subdir1/subsubdir2/file_2 + let expected_added = vec![ + "dir1/subdir1/subsubdir1", + "dir1/subdir1/subsubdir1/file_1", + "dir1/subdir1/subsubdir2", + "dir1/subdir1/subsubdir2/file_1", + "dir1/subdir1/subsubdir2/file_2", + ]; + let expected_modified = vec!["dir1", "dir1/subdir1"]; + do_check_with_pruner( + repo.clone(), + main_hash, + base_hash, + expected_added, + vec![], + expected_modified, + |_| true, + Some(4), + ); + + let expected_added = vec!["dir1/subdir1/subsubdir1", "dir1/subdir1/subsubdir2"]; + let expected_modified = vec!["dir1", "dir1/subdir1"]; + do_check_with_pruner( + repo.clone(), + main_hash, + base_hash, + expected_added, + vec![], + expected_modified, + |_| true, + Some(3), + ); + + let expected_added = vec![]; + let expected_modified = vec!["dir1", "dir1/subdir1"]; + do_check_with_pruner( + repo.clone(), + main_hash, + base_hash, + expected_added, + vec![], + expected_modified, + |_| true, + Some(2), + ); + + let 
expected_added = vec![]; + let expected_modified = vec!["dir1"]; + do_check_with_pruner( + repo.clone(), + main_hash, + base_hash, + expected_added, + vec![], + expected_modified, + |_| true, + Some(1), + ); + + let expected_added = vec![]; + let expected_modified = vec![]; + do_check_with_pruner( + repo.clone(), + main_hash, + base_hash, + expected_added, + vec![], + expected_modified, + |_| true, + Some(0), + ); + Ok(()) + }).expect("test failed") +} + #[test] fn test_recursive_changed_entry_prune() { async_unit::tokio_unit_test(|| -> Result<_, !> { @@ -556,6 +646,7 @@ fn test_recursive_changed_entry_prune() { None => true, } }, + None, ); let expected_added = vec!["dir1"]; @@ -586,6 +677,7 @@ fn test_recursive_changed_entry_prune() { None => true, } }, + None, ); Ok(()) @@ -626,10 +718,15 @@ fn test_recursive_changed_entry_prune_visited() { let pruner = visited_pruner(); - let first_stream = - changed_entry_stream_with_pruner(&manifest_1, &basemanifest, None, pruner.clone()); + let first_stream = changed_entry_stream_with_pruner( + &manifest_1, + &basemanifest, + None, + pruner.clone(), + None, + ); let second_stream = - changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, pruner); + changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, pruner, None); let mut res = spawn(select_all(vec![first_stream, second_stream]).collect()); let res = res.wait_future().unwrap(); let unique_len = res.len(); @@ -680,19 +777,24 @@ fn test_recursive_changed_entry_prune_visited_no_files() { let basemanifest = get_root_manifest(repo.clone(), &HgChangesetId::new(base_hash)); let pruner = and_pruner_combinator(&file_pruner, visited_pruner()); - let first_stream = - changed_entry_stream_with_pruner(&manifest_1, &basemanifest, None, pruner.clone()); + let first_stream = changed_entry_stream_with_pruner( + &manifest_1, + &basemanifest, + None, + pruner.clone(), + None, + ); let second_stream = - changed_entry_stream_with_pruner(&manifest_2, &basemanifest, 
None, pruner); + changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, pruner, None); let mut res = spawn(select_all(vec![first_stream, second_stream]).collect()); let res = res.wait_future().unwrap(); let unique_len = res.len(); assert_eq!(unique_len, 7); let first_stream = - changed_entry_stream_with_pruner(&manifest_1, &basemanifest, None, &file_pruner); + changed_entry_stream_with_pruner(&manifest_1, &basemanifest, None, &file_pruner, None); let second_stream = - changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, &file_pruner); + changed_entry_stream_with_pruner(&manifest_2, &basemanifest, None, &file_pruner, None); let mut res = spawn(select_all(vec![first_stream, second_stream]).collect()); let res = res.wait_future().unwrap(); // Make sure that more entries are produced without VisitedPruner i.e. some entries are @@ -715,7 +817,7 @@ fn test_visited_pruner_different_files_same_hash() { let pruner = visited_pruner(); let stream = - changed_entry_stream_with_pruner(&root_manifest, &EmptyManifest {}, None, pruner); + changed_entry_stream_with_pruner(&root_manifest, &EmptyManifest {}, None, pruner, None); let mut res = spawn(stream.collect()); let res = res.wait_future().unwrap(); @@ -736,7 +838,7 @@ fn test_file_pruner() { let pruner = file_pruner; let stream = - changed_entry_stream_with_pruner(&root_manifest, &EmptyManifest {}, None, pruner); + changed_entry_stream_with_pruner(&root_manifest, &EmptyManifest {}, None, pruner, None); let mut res = spawn(stream.collect()); let res = res.wait_future().unwrap(); diff --git a/repo_client/src/client/mod.rs b/repo_client/src/client/mod.rs index 3676af916d..1dfbbea97f 100644 --- a/repo_client/src/client/mod.rs +++ b/repo_client/src/client/mod.rs @@ -20,7 +20,7 @@ use futures_ext::{select_all, BoxFuture, BoxStream, FutureExt, StreamExt}; use futures_stats::{Timed, TimedStreamTrait}; use itertools::Itertools; use slog::Logger; -use stats::Histogram; +use stats::{Histogram, Timeseries}; use 
time_ext::DurationExt; use uuid::Uuid; @@ -31,9 +31,8 @@ use mercurial::{self, RevlogChangeset}; use mercurial_bundles::{create_bundle_stream, parts, Bundle2EncodeBuilder, Bundle2Item}; use mercurial_types::{percent_encode, Changeset, Entry, HgBlobNode, HgChangesetId, HgManifestId, HgNodeHash, MPath, RepoPath, Type, NULL_HASH}; -use mercurial_types::manifest_utils::{and_pruner_combinator, changed_entry_stream, - changed_entry_stream_with_pruner, file_pruner, - visited_pruner, ChangedEntry, EntryStatus}; +use mercurial_types::manifest_utils::{and_pruner_combinator, changed_entry_stream_with_pruner, + file_pruner, visited_pruner, ChangedEntry, EntryStatus}; use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt}; use tracing::{TraceContext, Traced}; @@ -55,6 +54,8 @@ define_stats! { histogram(500, 0, 20_000, AVG, SUM, COUNT; P 5; P 25; P 50; P 75; P 95; P 97; P 99), getfiles_ms: histogram(500, 0, 20_000, AVG, SUM, COUNT; P 5; P 25; P 50; P 75; P 95; P 97; P 99), + gettreepack_passed: timeseries(RATE, SUM), + gettreepack_rejected: timeseries(RATE, SUM), } mod ops { @@ -278,6 +279,9 @@ impl RepoClient { fn gettreepack_untimed(&self, params: GettreepackArgs) -> BoxStream { debug!(self.logger(), "gettreepack"); + // 65536 matches the default TREE_DEPTH_MAX value from Mercurial + let fetchdepth = params.depth.unwrap_or(2 << 16); + if !params.directories.is_empty() { // This param is not used by core hg, don't worry about implementing it now return stream::once(Err(err_msg("directories param is not supported"))).boxify(); @@ -301,7 +305,8 @@ impl RepoClient { &manifest_id, &basemfnode, rootpath.clone(), - Some(and_pruner_combinator(&file_pruner, visited_pruner.clone())), + and_pruner_combinator(&file_pruner, visited_pruner.clone()), + fetchdepth, self.trace().clone(), ) })).boxify() @@ -312,7 +317,8 @@ impl RepoClient { &mfnode, &basemfnode, rootpath.clone(), - Some(&file_pruner), + &file_pruner, + fetchdepth, self.trace().clone(), ), None => empty().boxify(), @@ -686,7 
+692,8 @@ fn get_changed_entry_stream( mfid: &HgNodeHash, basemfid: &HgNodeHash, rootpath: Option, - pruner: Option bool + Send + Clone + 'static>, + pruner: impl FnMut(&ChangedEntry) -> bool + Send + Clone + 'static, + max_depth: usize, trace: TraceContext, ) -> BoxStream<(Box, Option), Error> { let manifest = repo.get_manifest_by_nodeid(mfid) @@ -699,11 +706,8 @@ fn get_changed_entry_stream( .join(basemanifest) .map({ let rootpath = rootpath.clone(); - move |(mf, basemf)| match pruner { - Some(pruner) => { - changed_entry_stream_with_pruner(&mf, &basemf, rootpath, pruner).boxify() - } - None => changed_entry_stream(&mf, &basemf, rootpath).boxify(), + move |(mf, basemf)| { + changed_entry_stream_with_pruner(&mf, &basemf, rootpath, pruner, Some(max_depth)) } }) .flatten_stream(); @@ -712,19 +716,26 @@ fn get_changed_entry_stream( changed_entries.filter_map(move |entry_status| match entry_status.status { EntryStatus::Added(entry) => { if entry.get_type() == Type::Tree { + STATS::gettreepack_passed.add_value(1); Some((entry, entry_status.dirname)) } else { + STATS::gettreepack_rejected.add_value(1); None } } EntryStatus::Modified { to_entry, .. } => { if to_entry.get_type() == Type::Tree { + STATS::gettreepack_passed.add_value(1); Some((to_entry, entry_status.dirname)) } else { + STATS::gettreepack_rejected.add_value(1); None } } - EntryStatus::Deleted(..) => None, + EntryStatus::Deleted(..) => { + STATS::gettreepack_rejected.add_value(1); + None + } }); // Append root manifest