diff --git a/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/Cargo.toml b/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/Cargo.toml index 54d8484d8b..c152097e22 100644 --- a/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/Cargo.toml +++ b/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/Cargo.toml @@ -18,6 +18,7 @@ cross_repo_sync = { path = "../cross_repo_sync" } derived_data_utils = { path = "../../derived_data/utils" } live_commit_sync_config = { path = "../live_commit_sync_config" } metaconfig_types = { path = "../../metaconfig/types" } +mononoke_hg_sync_job_helper_lib = { path = "../../mononoke_hg_sync_job" } mononoke_types = { path = "../../mononoke_types" } mutable_counters = { path = "../../mutable_counters" } reachabilityindex = { path = "../../reachabilityindex" } diff --git a/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/src/cli.rs b/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/src/cli.rs index fdc22f3bb6..9ce77a3473 100644 --- a/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/src/cli.rs +++ b/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/src/cli.rs @@ -8,15 +8,16 @@ use clap::{App, Arg, SubCommand}; use cmdlib::args; -pub const ARG_ONCE: &'static str = "once"; -pub const ARG_COMMIT: &'static str = "commit"; -pub const ARG_TAIL: &'static str = "tail"; -pub const ARG_TARGET_BOOKMARK: &'static str = "target-bookmark"; -pub const ARG_CATCH_UP_ONCE: &'static str = "catch-up-once"; -pub const ARG_LOG_TO_SCUBA: &'static str = "log-to-scuba"; -pub const ARG_BACKPRESSURE_REPOS_IDS: &'static str = "backpressure-repo-ids"; -pub const ARG_DERIVED_DATA_TYPES: &'static str = "derived-data-types"; -pub const ARG_SLEEP_SECS: &'static str = "sleep-secs"; +pub const ARG_ONCE: &str = "once"; +pub const ARG_COMMIT: &str = "commit"; +pub const ARG_TAIL: &str = "tail"; +pub const ARG_TARGET_BOOKMARK: &str = "target-bookmark"; +pub const ARG_CATCH_UP_ONCE: &str = "catch-up-once"; +pub const ARG_LOG_TO_SCUBA: &str = "log-to-scuba"; +pub const ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS: &str = "backsync-backpressure-repo-ids"; +pub const ARG_HG_SYNC_BACKPRESSURE: &str = "hg-sync-backpressure"; +pub const ARG_DERIVED_DATA_TYPES: &str = "derived-data-types"; +pub const ARG_SLEEP_SECS: &str = "sleep-secs"; pub const ARG_BOOKMARK_REGEX: &str = "bookmark-regex"; pub fn create_app<'a, 'b>() -> App<'a, 'b> { @@ -68,8 +69,8 @@ pub fn create_app<'a, 'b>() -> App<'a, 'b> { ), ) .arg( - Arg::with_name(ARG_BACKPRESSURE_REPOS_IDS) - .long(ARG_BACKPRESSURE_REPOS_IDS) + Arg::with_name(ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS) + .long(ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS) .takes_value(true) .multiple(true) .required(false) @@ -77,6 +78,12 @@ pub fn create_app<'a, 'b>() -> App<'a, 'b> { "Monitors how many entries to backsync in the queue for other repos and pauses syncing if queue is too large", ), ) + .arg( + Arg::with_name(ARG_HG_SYNC_BACKPRESSURE) + .long(ARG_HG_SYNC_BACKPRESSURE) + .required(false) + .help("Waits until new commits created in the target repo are synced to hg"), + ) .arg( Arg::with_name(ARG_DERIVED_DATA_TYPES) .long(ARG_DERIVED_DATA_TYPES) diff --git a/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/src/main.rs b/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/src/main.rs index 9fe350e44e..202bce054a 100644 --- a/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/src/main.rs +++ b/eden/mononoke/commit_rewriting/mononoke_x_repo_sync_job/src/main.rs @@ -42,6 +42,7 @@ use futures::{ }; use futures_stats::TimedFutureExt; use live_commit_sync_config::{CfgrLiveCommitSyncConfig, LiveCommitSyncConfig}; +use mononoke_hg_sync_job_helper_lib::wait_for_latest_log_id_to_be_synced; use mononoke_types::{ChangesetId, RepositoryId}; use mutable_counters::{MutableCounters, SqlMutableCounters}; use regex::Regex; @@ -57,8 +58,8 @@ mod setup; mod sync; use crate::cli::{ - create_app, ARG_BACKPRESSURE_REPOS_IDS, ARG_BOOKMARK_REGEX, ARG_CATCH_UP_ONCE, - ARG_DERIVED_DATA_TYPES, ARG_ONCE, ARG_TAIL, + create_app, ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS, ARG_BOOKMARK_REGEX, ARG_CATCH_UP_ONCE, + ARG_DERIVED_DATA_TYPES, ARG_HG_SYNC_BACKPRESSURE, ARG_ONCE, ARG_TAIL, }; use crate::reporting::{add_common_fields, log_bookmark_update_result, log_noop_iteration}; use crate::setup::{ @@ -140,7 +141,7 @@ async fn run_in_tailing_mode< target_skiplist_index: Target>, common_pushrebase_bookmarks: HashSet, base_scuba_sample: ScubaSampleBuilder, - backpressure_repos: Vec, + backpressure_params: BackpressureParams, derived_data_types: Vec, tailing_args: TailingArgs, sleep_secs: u64, @@ -157,7 +158,7 @@ async fn run_in_tailing_mode< &common_pushrebase_bookmarks, &source_skiplist_index, &target_skiplist_index, - &backpressure_repos, + &backpressure_params, &derived_data_types, sleep_secs, &maybe_bookmark_regex, @@ -198,7 +199,7 @@ async fn run_in_tailing_mode< &common_pushrebase_bookmarks, &source_skiplist_index, &target_skiplist_index, - &backpressure_repos, + &backpressure_params, &derived_data_types, sleep_secs, &maybe_bookmark_regex, @@ -227,7 +228,7 @@ async fn tail< common_pushrebase_bookmarks: &HashSet, source_skiplist_index: &Source>, target_skiplist_index: &Target>, - backpressure_repos: &[BlobRepo], + backpressure_params: &BackpressureParams, derived_data_types: &[String], sleep_secs: u64, maybe_bookmark_regex: &Option, @@ -305,8 +306,8 @@ async fn tail< maybe_apply_backpressure( ctx, mutable_counters, - backpressure_repos, - commit_syncer.get_target_repo().get_repoid(), + backpressure_params, + commit_syncer.get_target_repo(), scuba_sample.clone(), sleep_secs, ) @@ -327,17 +328,18 @@ async fn tail< async fn maybe_apply_backpressure( ctx: &CoreContext, mutable_counters: &C, - backpressure_repos: &[BlobRepo], - target_repo_id: RepositoryId, + backpressure_params: &BackpressureParams, + target_repo: &BlobRepo, scuba_sample: ScubaSampleBuilder, sleep_secs: u64, ) -> Result<(), Error> where C: MutableCounters + Clone + Sync + 'static, { + let target_repo_id = target_repo.get_repoid(); let limit = 10; loop { - let max_further_entries = stream::iter(backpressure_repos) + let max_further_entries = stream::iter(&backpressure_params.backsync_repos) .map(Ok) .map_ok(|repo| { async move { @@ -382,6 +384,10 @@ where break; } } + + if backpressure_params.wait_for_target_repo_hg_sync { + wait_for_latest_log_id_to_be_synced(ctx, target_repo, mutable_counters, sleep_secs).await?; + } Ok(()) } @@ -468,29 +474,7 @@ async fn run( TailingArgs::LoopForever(commit_syncer_args, config_store) }; - let backpressure_repos_ids = sub_m.values_of(ARG_BACKPRESSURE_REPOS_IDS); - let backpressure_repos = match backpressure_repos_ids { - Some(backpressure_repos_ids) => { - let backpressure_repos = - stream::iter(backpressure_repos_ids.into_iter().map(|repo_id| { - let repo_id = repo_id.parse::()?; - Ok(repo_id) - })) - .map_ok(|repo_id| { - args::open_repo_with_repo_id( - fb, - ctx.logger(), - RepositoryId::new(repo_id), - &matches, - ) - .compat() - }) - .try_buffer_unordered(100) - .try_collect::>(); - backpressure_repos.await? - } - None => vec![], - }; + let backpressure_params = BackpressureParams::new(&ctx, &matches, sub_m).await?; let derived_data_types: Vec = match sub_m.values_of(ARG_DERIVED_DATA_TYPES) { Some(derived_data_types) => derived_data_types @@ -512,7 +496,7 @@ async fn run( target_skiplist_index, common_bookmarks, scuba_sample, - backpressure_repos, + backpressure_params, derived_data_types, tailing_args, sleep_secs, @@ -535,6 +519,48 @@ fn context_and_matches<'a>(fb: FacebookInit, app: App<'a, '_>) -> (CoreContext, (ctx, matches) } +struct BackpressureParams { + backsync_repos: Vec, + wait_for_target_repo_hg_sync: bool, +} + +impl BackpressureParams { + async fn new( + ctx: &CoreContext, + matches: &ArgMatches<'static>, + sub_m: &ArgMatches<'static>, + ) -> Result { + let backsync_repos_ids = sub_m.values_of(ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS); + let backsync_repos = match backsync_repos_ids { + Some(backsync_repos_ids) => { + let backsync_repos = stream::iter(backsync_repos_ids.into_iter().map(|repo_id| { + let repo_id = repo_id.parse::()?; + Ok(repo_id) + })) + .map_ok(|repo_id| { + args::open_repo_with_repo_id( + ctx.fb, + ctx.logger(), + RepositoryId::new(repo_id), + &matches, + ) + .compat() + }) + .try_buffer_unordered(100) + .try_collect::>(); + backsync_repos.await? + } + None => vec![], + }; + + let wait_for_target_repo_hg_sync = sub_m.is_present(ARG_HG_SYNC_BACKPRESSURE); + Ok(Self { + backsync_repos, + wait_for_target_repo_hg_sync, + }) + } +} + #[fbinit::main] fn main(fb: FacebookInit) -> Result<()> { let (ctx, matches) = context_and_matches(fb, create_app()); diff --git a/eden/mononoke/mononoke_hg_sync_job/helper_lib/lib.rs b/eden/mononoke/mononoke_hg_sync_job/helper_lib/lib.rs index 6dcea2891a..80a5dafe3d 100644 --- a/eden/mononoke/mononoke_hg_sync_job/helper_lib/lib.rs +++ b/eden/mononoke/mononoke_hg_sync_job/helper_lib/lib.rs @@ -22,7 +22,7 @@ use futures_old::{ }; use mercurial_bundles::stream_start; use mononoke_types::RawBundle2Id; -use mutable_counters::{MutableCounters, SqlMutableCounters}; +use mutable_counters::MutableCounters; use slog::{info, Logger}; use std::convert::TryInto; use std::io::{Read, Seek, SeekFrom}; @@ -310,12 +310,15 @@ where } /// Wait until all of the entries in the queue have been synced to hg -pub async fn wait_for_latest_log_id_to_be_synced( +pub async fn wait_for_latest_log_id_to_be_synced( ctx: &CoreContext, repo: &BlobRepo, - mutable_counters: &SqlMutableCounters, + mutable_counters: &C, sleep_secs: u64, -) -> Result<(), Error> { +) -> Result<(), Error> +where + C: MutableCounters + Clone + Sync + 'static, +{ let repo_id = repo.get_repoid(); let largest_id = match repo .attribute_expected::()