Mirror of https://github.com/facebook/sapling.git (synced 2024-12-25 22:11:52 +03:00)
mononoke: use wait_for_latest_log_id_to_be_synced in x_repo_sync_job
Summary: Previously we could apply backpressure to the x-repo sync job so that it waits until the backsync queue is empty. However, it is also useful to wait until the hg sync queue for the large repo drains. This diff makes that possible.

Reviewed By: aslpavel

Differential Revision: D23728201

fbshipit-source-id: 6b198c8d9c35179169a46f2b804f83838edeff1e
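Mechanically, waiting for the hg sync queue to drain means polling until the id that hg sync has replayed catches up with the largest id in the repo's bookmark update log. A minimal sketch of that strategy, with stubbed readers standing in for the BookmarkUpdateLog and mutable-counter lookups that the real wait_for_latest_log_id_to_be_synced (shown in the diff below) performs:

    use std::time::Duration;

    // Stub: the real code reads the largest id from the repo's BookmarkUpdateLog.
    async fn largest_log_id() -> Result<u64, String> {
        Ok(42)
    }

    // Stub: the real code reads the "latest replayed" mutable counter for the repo.
    async fn latest_replayed_id() -> Result<u64, String> {
        Ok(42)
    }

    // Poll until hg sync has replayed everything, backing off between checks.
    async fn wait_until_hg_sync_drains(sleep_secs: u64) -> Result<(), String> {
        loop {
            if latest_replayed_id().await? >= largest_log_id().await? {
                return Ok(());
            }
            tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
        }
    }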
This commit is contained in:
parent 166e54e0c8
commit f2e0da3af5
@@ -18,6 +18,7 @@ cross_repo_sync = { path = "../cross_repo_sync" }
 derived_data_utils = { path = "../../derived_data/utils" }
 live_commit_sync_config = { path = "../live_commit_sync_config" }
 metaconfig_types = { path = "../../metaconfig/types" }
+mononoke_hg_sync_job_helper_lib = { path = "../../mononoke_hg_sync_job" }
 mononoke_types = { path = "../../mononoke_types" }
 mutable_counters = { path = "../../mutable_counters" }
 reachabilityindex = { path = "../../reachabilityindex" }
@@ -8,15 +8,16 @@
 use clap::{App, Arg, SubCommand};
 use cmdlib::args;
 
-pub const ARG_ONCE: &'static str = "once";
-pub const ARG_COMMIT: &'static str = "commit";
-pub const ARG_TAIL: &'static str = "tail";
-pub const ARG_TARGET_BOOKMARK: &'static str = "target-bookmark";
-pub const ARG_CATCH_UP_ONCE: &'static str = "catch-up-once";
-pub const ARG_LOG_TO_SCUBA: &'static str = "log-to-scuba";
-pub const ARG_BACKPRESSURE_REPOS_IDS: &'static str = "backpressure-repo-ids";
-pub const ARG_DERIVED_DATA_TYPES: &'static str = "derived-data-types";
-pub const ARG_SLEEP_SECS: &'static str = "sleep-secs";
+pub const ARG_ONCE: &str = "once";
+pub const ARG_COMMIT: &str = "commit";
+pub const ARG_TAIL: &str = "tail";
+pub const ARG_TARGET_BOOKMARK: &str = "target-bookmark";
+pub const ARG_CATCH_UP_ONCE: &str = "catch-up-once";
+pub const ARG_LOG_TO_SCUBA: &str = "log-to-scuba";
+pub const ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS: &str = "backsync-backpressure-repo-ids";
+pub const ARG_HG_SYNC_BACKPRESSURE: &str = "hg-sync-backpressure";
+pub const ARG_DERIVED_DATA_TYPES: &str = "derived-data-types";
+pub const ARG_SLEEP_SECS: &str = "sleep-secs";
 pub const ARG_BOOKMARK_REGEX: &str = "bookmark-regex";
 
 pub fn create_app<'a, 'b>() -> App<'a, 'b> {
@@ -68,8 +69,8 @@ pub fn create_app<'a, 'b>() -> App<'a, 'b> {
                 ),
         )
         .arg(
-            Arg::with_name(ARG_BACKPRESSURE_REPOS_IDS)
-                .long(ARG_BACKPRESSURE_REPOS_IDS)
+            Arg::with_name(ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS)
+                .long(ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS)
                 .takes_value(true)
                 .multiple(true)
                 .required(false)
@@ -77,6 +78,12 @@ pub fn create_app<'a, 'b>() -> App<'a, 'b> {
                     "Monitors how many entries to backsync in the queue for other repos and pauses syncing if queue is too large",
                 ),
         )
+        .arg(
+            Arg::with_name(ARG_HG_SYNC_BACKPRESSURE)
+                .long(ARG_HG_SYNC_BACKPRESSURE)
+                .required(false)
+                .help("Waits until new commits created in the target repo are synced to hg"),
+        )
         .arg(
             Arg::with_name(ARG_DERIVED_DATA_TYPES)
                 .long(ARG_DERIVED_DATA_TYPES)
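With the flag registered, a hypothetical tail-mode invocation that enables both kinds of backpressure might look like the line below; the binary name, argument shape, and repo ids are illustrative, not taken from this diff:

    mononoke_x_repo_sync_job <common args> tail --backsync-backpressure-repo-ids 1 2 --hg-sync-backpressure --sleep-secs 30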
@@ -42,6 +42,7 @@ use futures::{
 };
 use futures_stats::TimedFutureExt;
 use live_commit_sync_config::{CfgrLiveCommitSyncConfig, LiveCommitSyncConfig};
+use mononoke_hg_sync_job_helper_lib::wait_for_latest_log_id_to_be_synced;
 use mononoke_types::{ChangesetId, RepositoryId};
 use mutable_counters::{MutableCounters, SqlMutableCounters};
 use regex::Regex;
@@ -57,8 +58,8 @@ mod setup;
 mod sync;
 
 use crate::cli::{
-    create_app, ARG_BACKPRESSURE_REPOS_IDS, ARG_BOOKMARK_REGEX, ARG_CATCH_UP_ONCE,
-    ARG_DERIVED_DATA_TYPES, ARG_ONCE, ARG_TAIL,
+    create_app, ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS, ARG_BOOKMARK_REGEX, ARG_CATCH_UP_ONCE,
+    ARG_DERIVED_DATA_TYPES, ARG_HG_SYNC_BACKPRESSURE, ARG_ONCE, ARG_TAIL,
 };
 use crate::reporting::{add_common_fields, log_bookmark_update_result, log_noop_iteration};
 use crate::setup::{
@@ -140,7 +141,7 @@ async fn run_in_tailing_mode<
     target_skiplist_index: Target<Arc<SkiplistIndex>>,
     common_pushrebase_bookmarks: HashSet<BookmarkName>,
     base_scuba_sample: ScubaSampleBuilder,
-    backpressure_repos: Vec<BlobRepo>,
+    backpressure_params: BackpressureParams,
     derived_data_types: Vec<String>,
     tailing_args: TailingArgs<M>,
     sleep_secs: u64,
@@ -157,7 +158,7 @@ async fn run_in_tailing_mode<
                 &common_pushrebase_bookmarks,
                 &source_skiplist_index,
                 &target_skiplist_index,
-                &backpressure_repos,
+                &backpressure_params,
                 &derived_data_types,
                 sleep_secs,
                 &maybe_bookmark_regex,
@@ -198,7 +199,7 @@ async fn run_in_tailing_mode<
                 &common_pushrebase_bookmarks,
                 &source_skiplist_index,
                 &target_skiplist_index,
-                &backpressure_repos,
+                &backpressure_params,
                 &derived_data_types,
                 sleep_secs,
                 &maybe_bookmark_regex,
@@ -227,7 +228,7 @@ async fn tail<
     common_pushrebase_bookmarks: &HashSet<BookmarkName>,
     source_skiplist_index: &Source<Arc<SkiplistIndex>>,
     target_skiplist_index: &Target<Arc<SkiplistIndex>>,
-    backpressure_repos: &[BlobRepo],
+    backpressure_params: &BackpressureParams,
     derived_data_types: &[String],
     sleep_secs: u64,
     maybe_bookmark_regex: &Option<Regex>,
@@ -305,8 +306,8 @@ async fn tail<
         maybe_apply_backpressure(
             ctx,
             mutable_counters,
-            backpressure_repos,
-            commit_syncer.get_target_repo().get_repoid(),
+            backpressure_params,
+            commit_syncer.get_target_repo(),
             scuba_sample.clone(),
             sleep_secs,
         )
@@ -327,17 +328,18 @@ async fn tail<
 async fn maybe_apply_backpressure<C>(
     ctx: &CoreContext,
     mutable_counters: &C,
-    backpressure_repos: &[BlobRepo],
-    target_repo_id: RepositoryId,
+    backpressure_params: &BackpressureParams,
+    target_repo: &BlobRepo,
     scuba_sample: ScubaSampleBuilder,
     sleep_secs: u64,
 ) -> Result<(), Error>
 where
     C: MutableCounters + Clone + Sync + 'static,
 {
+    let target_repo_id = target_repo.get_repoid();
     let limit = 10;
     loop {
-        let max_further_entries = stream::iter(backpressure_repos)
+        let max_further_entries = stream::iter(&backpressure_params.backsync_repos)
             .map(Ok)
             .map_ok(|repo| {
                 async move {
@@ -382,6 +384,10 @@ where
             break;
         }
     }
 
+    if backpressure_params.wait_for_target_repo_hg_sync {
+        wait_for_latest_log_id_to_be_synced(ctx, target_repo, mutable_counters, sleep_secs)
+            .await?;
+    }
     Ok(())
 }
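Taken together with the hunk above, each iteration of the tailing loop now applies backpressure in two stages: first it pauses while any backsync queue is over the limit, then it optionally waits for hg sync on the target repo. A condensed, self-contained sketch of that control flow (queue reads stubbed out; not the verbatim function):

    use std::time::Duration;

    struct Params {
        backsync_repos: Vec<&'static str>, // stands in for Vec<BlobRepo>
        wait_for_target_repo_hg_sync: bool,
    }

    // Stub: the real code counts remaining bookmark-update-log entries to backsync.
    async fn backsync_queue_len(_repo: &str) -> u64 {
        0
    }

    async fn apply_backpressure(params: &Params, limit: u64, sleep_secs: u64) {
        // Stage 1: pause while any backsync queue is above the limit.
        loop {
            let mut max = 0;
            for repo in &params.backsync_repos {
                max = max.max(backsync_queue_len(repo).await);
            }
            if max <= limit {
                break;
            }
            tokio::time::sleep(Duration::from_secs(sleep_secs)).await;
        }
        // Stage 2: optionally wait until hg sync has replayed the latest log
        // entry, which is what the new wait_for_latest_log_id_to_be_synced call does.
        if params.wait_for_target_repo_hg_sync {
            // wait_for_latest_log_id_to_be_synced(...) in the real code
        }
    }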
@@ -468,29 +474,7 @@ async fn run(
         TailingArgs::LoopForever(commit_syncer_args, config_store)
     };
 
-    let backpressure_repos_ids = sub_m.values_of(ARG_BACKPRESSURE_REPOS_IDS);
-    let backpressure_repos = match backpressure_repos_ids {
-        Some(backpressure_repos_ids) => {
-            let backpressure_repos =
-                stream::iter(backpressure_repos_ids.into_iter().map(|repo_id| {
-                    let repo_id = repo_id.parse::<i32>()?;
-                    Ok(repo_id)
-                }))
-                .map_ok(|repo_id| {
-                    args::open_repo_with_repo_id(
-                        fb,
-                        ctx.logger(),
-                        RepositoryId::new(repo_id),
-                        &matches,
-                    )
-                    .compat()
-                })
-                .try_buffer_unordered(100)
-                .try_collect::<Vec<_>>();
-            backpressure_repos.await?
-        }
-        None => vec![],
-    };
+    let backpressure_params = BackpressureParams::new(&ctx, &matches, sub_m).await?;
 
     let derived_data_types: Vec<String> = match sub_m.values_of(ARG_DERIVED_DATA_TYPES) {
         Some(derived_data_types) => derived_data_types
@@ -512,7 +496,7 @@ async fn run(
         target_skiplist_index,
         common_bookmarks,
         scuba_sample,
-        backpressure_repos,
+        backpressure_params,
         derived_data_types,
         tailing_args,
         sleep_secs,
@@ -535,6 +519,48 @@ fn context_and_matches<'a>(fb: FacebookInit, app: App<'a, '_>) -> (CoreContext,
     (ctx, matches)
 }
 
+struct BackpressureParams {
+    backsync_repos: Vec<BlobRepo>,
+    wait_for_target_repo_hg_sync: bool,
+}
+
+impl BackpressureParams {
+    async fn new(
+        ctx: &CoreContext,
+        matches: &ArgMatches<'static>,
+        sub_m: &ArgMatches<'static>,
+    ) -> Result<Self, Error> {
+        let backsync_repos_ids = sub_m.values_of(ARG_BACKSYNC_BACKPRESSURE_REPOS_IDS);
+        let backsync_repos = match backsync_repos_ids {
+            Some(backsync_repos_ids) => {
+                let backsync_repos = stream::iter(backsync_repos_ids.into_iter().map(|repo_id| {
+                    let repo_id = repo_id.parse::<i32>()?;
+                    Ok(repo_id)
+                }))
+                .map_ok(|repo_id| {
+                    args::open_repo_with_repo_id(
+                        ctx.fb,
+                        ctx.logger(),
+                        RepositoryId::new(repo_id),
+                        &matches,
+                    )
+                    .compat()
+                })
+                .try_buffer_unordered(100)
+                .try_collect::<Vec<_>>();
+                backsync_repos.await?
+            }
+            None => vec![],
+        };
+
+        let wait_for_target_repo_hg_sync = sub_m.is_present(ARG_HG_SYNC_BACKPRESSURE);
+        Ok(Self {
+            backsync_repos,
+            wait_for_target_repo_hg_sync,
+        })
+    }
+}
+
 #[fbinit::main]
 fn main(fb: FacebookInit) -> Result<()> {
     let (ctx, matches) = context_and_matches(fb, create_app());
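BackpressureParams::new keeps the repo-opening idiom from the code it replaces: turn the parsed ids into a stream of Results, map each Ok id to a repo-opening future, and run up to 100 of those futures concurrently. A distilled version of that idiom, with a stubbed opener in place of args::open_repo_with_repo_id:

    use futures::stream::{self, TryStreamExt};

    // Stub standing in for args::open_repo_with_repo_id in the diff above.
    async fn open_repo(id: i32) -> Result<String, String> {
        Ok(format!("repo-{}", id))
    }

    async fn open_all(ids: &[&str]) -> Result<Vec<String>, String> {
        stream::iter(ids.iter().map(|s| s.parse::<i32>().map_err(|e| e.to_string())))
            .map_ok(open_repo)          // each parsed id becomes a repo-opening future
            .try_buffer_unordered(100)  // open up to 100 repos concurrently
            .try_collect()              // the first parse or open error aborts the batch
            .await
    }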
@@ -22,7 +22,7 @@ use futures_old::{
 };
 use mercurial_bundles::stream_start;
 use mononoke_types::RawBundle2Id;
-use mutable_counters::{MutableCounters, SqlMutableCounters};
+use mutable_counters::MutableCounters;
 use slog::{info, Logger};
 use std::convert::TryInto;
 use std::io::{Read, Seek, SeekFrom};
@@ -310,12 +310,15 @@ where
 }
 
 /// Wait until all of the entries in the queue have been synced to hg
-pub async fn wait_for_latest_log_id_to_be_synced(
+pub async fn wait_for_latest_log_id_to_be_synced<C>(
     ctx: &CoreContext,
     repo: &BlobRepo,
-    mutable_counters: &SqlMutableCounters,
+    mutable_counters: &C,
     sleep_secs: u64,
-) -> Result<(), Error> {
+) -> Result<(), Error>
+where
+    C: MutableCounters + Clone + Sync + 'static,
+{
     let repo_id = repo.get_repoid();
     let largest_id = match repo
         .attribute_expected::<dyn BookmarkUpdateLog>()
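The helper-library change generalizes the waiter: it previously demanded the concrete SqlMutableCounters, and now accepts any implementation of the MutableCounters trait, so callers such as the x-repo sync job can pass whichever counter store they already hold. A minimal illustration of the shape of that change, using a simplified stand-in trait (the real trait's methods are async and take more context):

    // Simplified stand-in for the real mutable_counters::MutableCounters trait.
    trait MutableCounters {
        fn get_counter(&self, name: &str) -> Option<i64>;
    }

    // Any counter store satisfying the bound now works, not just SqlMutableCounters.
    async fn wait_for_sync<C>(counters: &C) -> Result<(), String>
    where
        C: MutableCounters + Clone + Sync + 'static,
    {
        // "latest-replayed-request" is illustrative of the counter consulted here.
        let _replayed = counters.get_counter("latest-replayed-request");
        Ok(())
    }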