cross_repo_sync: get rid of CommitSyncerArgs

Summary:
`CommitSyncerArgs` was useful when `CommitSyncer` did not have a way to query
existing configs from the configerator. Now that `SyncDataProvider` is part of
`CommitSyncer`, we can get rid of `CommitSyncerArgs`, which will also make
further improvements more convenient.

Reviewed By: StanislavGlebik

Differential Revision: D24565773

fbshipit-source-id: 4dd507b06d946c6018a4a4e8d5e77c6b27abe195
Author: Kostia Balytskyi (committed by Facebook GitHub Bot)
Date: 2020-10-27 16:58:44 -07:00
Parent: 0163f9dcf1
Commit: e606572eb3

12 changed files with 187 additions and 295 deletions
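
To make the new API shape concrete, here is a minimal before/after sketch of a call site. It uses only identifiers that appear in the hunks below; the exact signatures and crate paths are assumptions based on this commit, not a definitive implementation.

use anyhow::Error;
use clap::ArgMatches;
use cmdlib_x_repo::create_commit_syncer_from_matches;
use context::CoreContext;
use cross_repo_sync::CommitSyncer;
use synced_commit_mapping::SqlSyncedCommitMapping;

async fn build_syncer(
    ctx: &CoreContext,
    matches: &ArgMatches<'_>,
) -> Result<CommitSyncer<SqlSyncedCommitMapping>, Error> {
    // Old shape, removed by this commit: build `CommitSyncerArgs`, fetch the
    // current `CommitSyncConfig` by hand, then convert:
    //
    //   let args = create_commit_syncer_args_from_matches(ctx.fb, ctx.logger(), matches).await?;
    //   let config = live_commit_sync_config.get_current_commit_sync_config(ctx, repo_id)?;
    //   args.try_into_commit_syncer(&config, live_commit_sync_config)?
    //
    // New shape: a single call returns a ready-to-use `CommitSyncer`, since
    // the live config source is now part of `CommitSyncer` itself.
    create_commit_syncer_from_matches(ctx, matches).await
}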


@@ -10,10 +10,13 @@ include = ["src/**/*.rs"]
blobrepo = { path = "../../blobrepo" }
blobrepo_factory = { path = "../../blobrepo/factory" }
cmdlib = { path = ".." }
context = { path = "../../server/context" }
cross_repo_sync = { path = "../../commit_rewriting/cross_repo_sync" }
live_commit_sync_config = { path = "../../commit_rewriting/live_commit_sync_config" }
metaconfig_types = { path = "../../metaconfig/types" }
sql_ext = { path = "../../common/rust/sql_ext" }
synced_commit_mapping = { path = "../../commit_rewriting/synced_commit_mapping" }
cached_config = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
clap = "2.33"


@@ -9,52 +9,85 @@
#![deny(warnings)]
use anyhow::{format_err, Error};
use anyhow::{bail, Error};
use blobrepo::BlobRepo;
use blobrepo_factory::ReadOnlyStorage;
use clap::ArgMatches;
use cmdlib::{args, helpers::open_sql_with_config_and_mysql_options};
use cross_repo_sync::CommitSyncerArgs;
use fbinit::FacebookInit;
use context::CoreContext;
use cross_repo_sync::{
create_commit_syncers,
types::{Source, Target},
CommitSyncRepos, CommitSyncer, Syncers,
};
use futures::compat::Future01CompatExt;
use futures_util::try_join;
use metaconfig_types::{CommitSyncConfig, RepoConfig};
use slog::Logger;
use sql_ext::facebook::MysqlOptions;
use live_commit_sync_config::{CfgrLiveCommitSyncConfig, LiveCommitSyncConfig};
use std::sync::Arc;
use synced_commit_mapping::SqlSyncedCommitMapping;
pub async fn create_commit_syncer_args_from_matches(
fb: FacebookInit,
logger: &Logger,
/// Instantiate the `Syncers` struct by parsing `matches`
pub async fn create_commit_syncers_from_matches(
ctx: &CoreContext,
matches: &ArgMatches<'_>,
) -> Result<CommitSyncerArgs<SqlSyncedCommitMapping>, Error> {
let (args, _): (CommitSyncerArgs<SqlSyncedCommitMapping>, CommitSyncConfig) =
create_commit_syncer_args_and_config_from_matches_impl(
fb, logger, matches, false, /*reverse*/
)
.await?;
Ok(args)
) -> Result<Syncers<SqlSyncedCommitMapping>, Error> {
let (source_repo, target_repo, mapping, live_commit_sync_config) =
get_things_from_matches(ctx, matches).await?;
let current_config =
live_commit_sync_config.get_current_commit_sync_config(ctx, source_repo.0.get_repoid())?;
let large_repo_id = current_config.large_repo_id;
let source_repo_id = source_repo.0.get_repoid();
let target_repo_id = target_repo.0.get_repoid();
let (small_repo, large_repo) = if large_repo_id == source_repo_id {
(target_repo.0, source_repo.0)
} else if large_repo_id == target_repo_id {
(source_repo.0, target_repo.0)
} else {
bail!(
"Unexpectedly CommitSyncConfig {:?} has neither of {}, {} as a large repo",
current_config,
source_repo_id,
target_repo_id
);
};
create_commit_syncers(small_repo, large_repo, mapping, live_commit_sync_config)
}
pub async fn create_reverse_commit_syncer_args_from_matches(
fb: FacebookInit,
logger: &Logger,
/// Instantiate the source-target `CommitSyncer` struct by parsing `matches`
pub async fn create_commit_syncer_from_matches(
ctx: &CoreContext,
matches: &ArgMatches<'_>,
) -> Result<CommitSyncerArgs<SqlSyncedCommitMapping>, Error> {
let (args, _): (CommitSyncerArgs<SqlSyncedCommitMapping>, CommitSyncConfig) =
create_commit_syncer_args_and_config_from_matches_impl(
fb, logger, matches, true, /*reverse*/
)
.await?;
Ok(args)
) -> Result<CommitSyncer<SqlSyncedCommitMapping>, Error> {
create_commit_syncer_from_matches_impl(ctx, matches, false /* reverse */).await
}
async fn create_commit_syncer_args_and_config_from_matches_impl(
fb: FacebookInit,
logger: &Logger,
/// Instantiate the target-source `CommitSyncer` struct by parsing `matches`
pub async fn create_reverse_commit_syncer_from_matches(
ctx: &CoreContext,
matches: &ArgMatches<'_>,
reverse: bool,
) -> Result<(CommitSyncerArgs<SqlSyncedCommitMapping>, CommitSyncConfig), Error> {
) -> Result<CommitSyncer<SqlSyncedCommitMapping>, Error> {
create_commit_syncer_from_matches_impl(ctx, matches, true /* reverse */).await
}
/// Instantiate some auxiliary things from `matches`
/// Naming is hard.
async fn get_things_from_matches(
ctx: &CoreContext,
matches: &ArgMatches<'_>,
) -> Result<
(
Source<BlobRepo>,
Target<BlobRepo>,
SqlSyncedCommitMapping,
Arc<dyn LiveCommitSyncConfig>,
),
Error,
> {
let fb = ctx.fb;
let logger = ctx.logger();
let config_store = args::init_config_store(fb, logger, matches)?;
let source_repo_id = args::get_source_repo_id(config_store, &matches)?;
let target_repo_id = args::get_target_repo_id(config_store, &matches)?;
@@ -63,61 +96,80 @@ async fn create_commit_syncer_args_and_config_from_matches_impl(
args::get_config_by_repoid(config_store, &matches, source_repo_id)?;
let (_, target_repo_config) =
args::get_config_by_repoid(config_store, &matches, target_repo_id)?;
let source_repo_fut = args::open_repo_with_repo_id(fb, logger, source_repo_id, &matches);
let target_repo_fut = args::open_repo_with_repo_id(fb, logger, target_repo_id, &matches);
let (source_repo, target_repo) = try_join!(source_repo_fut.compat(), target_repo_fut.compat())?;
let mysql_options = args::parse_mysql_options(&matches);
let readonly_storage = args::parse_readonly_storage(&matches);
if reverse {
create_commit_syncer_and_config(
fb,
(target_repo, target_repo_config),
(source_repo, source_repo_config),
mysql_options,
readonly_storage,
)
.await
} else {
create_commit_syncer_and_config(
fb,
(source_repo, source_repo_config),
(target_repo, target_repo_config),
mysql_options,
readonly_storage,
)
.await
}
}
async fn create_commit_syncer_and_config<'a>(
fb: FacebookInit,
(source_repo, source_config): (BlobRepo, RepoConfig),
(target_repo, target_config): (BlobRepo, RepoConfig),
mysql_options: MysqlOptions,
readonly_storage: ReadOnlyStorage,
) -> Result<(CommitSyncerArgs<SqlSyncedCommitMapping>, CommitSyncConfig), Error> {
if source_config.storage_config.metadata != target_config.storage_config.metadata {
if source_repo_config.storage_config.metadata != target_repo_config.storage_config.metadata {
return Err(Error::msg(
"source repo and target repo have different metadata database configs!",
));
}
let mysql_options = args::parse_mysql_options(&matches);
let readonly_storage = args::parse_readonly_storage(&matches);
let mapping = open_sql_with_config_and_mysql_options::<SqlSyncedCommitMapping>(
fb,
source_config.storage_config.metadata.clone(),
ctx.fb,
source_repo_config.storage_config.metadata.clone(),
mysql_options,
readonly_storage,
)
.compat()
.await?;
let commit_sync_config = source_config
.commit_sync_config
.ok_or_else(|| format_err!("missing CommitSyncMapping config"))?;
let source_repo_fut = args::open_repo_with_repo_id(fb, logger, source_repo_id, &matches);
let target_repo_fut = args::open_repo_with_repo_id(fb, logger, target_repo_id, &matches);
let commit_syncer_args = CommitSyncerArgs::new(source_repo, target_repo, mapping);
Ok((commit_syncer_args, commit_sync_config))
let (source_repo, target_repo) = try_join!(source_repo_fut.compat(), target_repo_fut.compat())?;
let live_commit_sync_config: Arc<dyn LiveCommitSyncConfig> =
Arc::new(CfgrLiveCommitSyncConfig::new(&ctx.logger(), config_store)?);
Ok((
Source(source_repo),
Target(target_repo),
mapping,
live_commit_sync_config,
))
}
fn flip_direction<T>(source_item: Source<T>, target_item: Target<T>) -> (Source<T>, Target<T>) {
(Source(target_item.0), Target(source_item.0))
}
async fn create_commit_syncer_from_matches_impl(
ctx: &CoreContext,
matches: &ArgMatches<'_>,
reverse: bool,
) -> Result<CommitSyncer<SqlSyncedCommitMapping>, Error> {
let (source_repo, target_repo, mapping, live_commit_sync_config) =
get_things_from_matches(ctx, matches).await?;
let (source_repo, target_repo) = if reverse {
flip_direction(source_repo, target_repo)
} else {
(source_repo, target_repo)
};
create_commit_syncer(
ctx,
source_repo,
target_repo,
mapping,
live_commit_sync_config,
)
.await
}
async fn create_commit_syncer<'a>(
ctx: &'a CoreContext,
source_repo: Source<BlobRepo>,
target_repo: Target<BlobRepo>,
mapping: SqlSyncedCommitMapping,
live_commit_sync_config: Arc<dyn LiveCommitSyncConfig>,
) -> Result<CommitSyncer<SqlSyncedCommitMapping>, Error> {
let current_config =
live_commit_sync_config.get_current_commit_sync_config(ctx, source_repo.0.get_repoid())?;
let repos = CommitSyncRepos::new(source_repo.0, target_repo.0, &current_config)?;
let commit_syncer = CommitSyncer::new(mapping, repos, live_commit_sync_config);
Ok(commit_syncer)
}
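
For orientation, a hedged usage sketch of the helpers defined above: the plural helper returns both sync directions as a `Syncers` struct (its `large_to_small`/`small_to_large` fields are used by the bookmarks validator later in this commit), while the singular helpers return one `CommitSyncer`. Signatures are taken from the hunk above; anything else is an assumption.

use anyhow::Error;
use clap::ArgMatches;
use cmdlib_x_repo::{create_commit_syncer_from_matches, create_commit_syncers_from_matches};
use context::CoreContext;

async fn demo(ctx: &CoreContext, matches: &ArgMatches<'_>) -> Result<(), Error> {
    // Both directions at once; the large repo is the source of the
    // large-to-small syncer:
    let syncers = create_commit_syncers_from_matches(ctx, matches).await?;
    let _large_repo_id = syncers.large_to_small.get_source_repo().get_repoid();

    // One direction, source -> target as parsed from `matches`:
    let forward = create_commit_syncer_from_matches(ctx, matches).await?;
    let _target_repo_id = forward.get_target_repo_id();
    Ok(())
}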


@@ -80,6 +80,7 @@ pub async fn backsync_latest<M>(
where
M: SyncedCommitMapping + Clone + 'static,
{
// TODO(ikostia): start borrowing `CommitSyncer`, no reason to consume it
let TargetRepoDbs { ref counters, .. } = target_repo_dbs;
let target_repo_id = commit_syncer.get_target_repo().get_repoid();
let source_repo_id = commit_syncer.get_source_repo().get_repoid();


@@ -16,9 +16,9 @@ use bookmarks::Freshness;
use clap::{Arg, SubCommand};
use cloned::cloned;
use cmdlib::{args, monitoring};
use cmdlib_x_repo::create_commit_syncer_args_from_matches;
use cmdlib_x_repo::create_commit_syncer_from_matches;
use context::{CoreContext, SessionContainer};
use cross_repo_sync::{CandidateSelectionHint, CommitSyncOutcome, CommitSyncer, CommitSyncerArgs};
use cross_repo_sync::{CandidateSelectionHint, CommitSyncOutcome, CommitSyncer};
use fbinit::FacebookInit;
use futures::{
compat::Future01CompatExt,
@@ -102,7 +102,7 @@ async fn derive_target_hg_changesets(
pub async fn backsync_forever<M>(
ctx: CoreContext,
commit_syncer_args: CommitSyncerArgs<M>,
commit_syncer: CommitSyncer<M>,
target_repo_dbs: TargetRepoDbs,
source_repo_name: String,
target_repo_name: String,
@@ -111,32 +111,26 @@ pub async fn backsync_forever<M>(
where
M: SyncedCommitMapping + Clone + 'static,
{
let target_repo_id = commit_syncer_args.get_target_repo_id();
let target_repo_id = commit_syncer.get_target_repo_id();
let live_commit_sync_config = Arc::new(live_commit_sync_config);
loop {
// We only care about public pushes because draft pushes are not in the bookmark
// update log at all.
let enabled = live_commit_sync_config.push_redirector_enabled_for_public(target_repo_id);
if enabled {
let delay = calculate_delay(&ctx, &commit_syncer_args, &target_repo_dbs).await?;
let delay = calculate_delay(&ctx, &commit_syncer, &target_repo_dbs).await?;
log_delay(&ctx, &delay, &source_repo_name, &target_repo_name);
if delay.remaining_entries == 0 {
debug!(ctx.logger(), "no entries remained");
tokio::time::delay_for(Duration::new(1, 0)).await;
} else {
debug!(ctx.logger(), "backsyncing...");
let commit_sync_config =
live_commit_sync_config.get_current_commit_sync_config(&ctx, target_repo_id)?;
let commit_syncer = commit_syncer_args
.clone()
.try_into_commit_syncer(&commit_sync_config, live_commit_sync_config.clone())?;
backsync_latest(
ctx.clone(),
commit_syncer,
commit_syncer.clone(),
target_repo_dbs.clone(),
BacksyncLimit::NoLimit,
)
@@ -168,15 +162,15 @@ impl Delay {
// Logs the delay and returns the number of remaining bookmark update log entries
async fn calculate_delay<M>(
ctx: &CoreContext,
commit_syncer_args: &CommitSyncerArgs<M>,
commit_syncer: &CommitSyncer<M>,
target_repo_dbs: &TargetRepoDbs,
) -> Result<Delay, Error>
where
M: SyncedCommitMapping + Clone + 'static,
{
let TargetRepoDbs { ref counters, .. } = target_repo_dbs;
let target_repo_id = commit_syncer_args.get_target_repo().get_repoid();
let source_repo_id = commit_syncer_args.get_source_repo().get_repoid();
let target_repo_id = commit_syncer.get_target_repo().get_repoid();
let source_repo_id = commit_syncer.get_source_repo().get_repoid();
let counter_name = format_counter(&source_repo_id);
let maybe_counter = counters
@@ -184,7 +178,7 @@ where
.compat()
.await?;
let counter = maybe_counter.ok_or(format_err!("{} counter not found", counter_name))?;
let source_repo = commit_syncer_args.get_source_repo();
let source_repo = commit_syncer.get_source_repo();
let next_entry = source_repo
.read_next_bookmark_log_entries(ctx.clone(), counter as u64, 1, Freshness::MostRecent)
.collect()
@@ -267,14 +261,16 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
let (target_repo_name, target_repo_config) =
args::get_config_by_repoid(config_store, &matches, target_repo_id)?;
let commit_syncer_args = runtime.block_on_std(create_commit_syncer_args_from_matches(
fb, &logger, &matches,
))?;
let session_container = SessionContainer::new_with_defaults(fb);
let commit_syncer = {
let scuba_sample = ScubaSampleBuilder::with_discard();
let ctx = session_container.new_context(logger.clone(), scuba_sample);
runtime.block_on_std(create_commit_syncer_from_matches(&ctx, &matches))?
};
let mysql_options = args::parse_mysql_options(&matches);
let readonly_storage = args::parse_readonly_storage(&matches);
let session_container = SessionContainer::new_with_defaults(fb);
info!(
logger,
"syncing from repoid {:?} into repoid {:?}", source_repo_id, target_repo_id,
@@ -287,10 +283,6 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
(ARG_MODE_BACKSYNC_ALL, _) => {
let scuba_sample = ScubaSampleBuilder::with_discard();
let ctx = session_container.new_context(logger.clone(), scuba_sample);
let commit_sync_config =
live_commit_sync_config.get_current_commit_sync_config(&ctx, target_repo_id)?;
let commit_syncer = commit_syncer_args
.try_into_commit_syncer(&commit_sync_config, Arc::new(live_commit_sync_config))?;
let db_config = target_repo_config.storage_config.metadata;
let target_repo_dbs = runtime.block_on_std(
@@ -304,6 +296,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
.boxed(),
)?;
// TODO(ikostia): why do we use discarding ScubaSample for BACKSYNC_ALL?
runtime.block_on_std(
backsync_latest(ctx, commit_syncer, target_repo_dbs, BacksyncLimit::NoLimit)
.boxed(),
@@ -316,7 +309,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
let target_repo_dbs = runtime.block_on_std(
open_backsyncer_dbs(
ctx,
commit_syncer_args.get_target_repo().clone(),
commit_syncer.get_target_repo().clone(),
db_config,
mysql_options,
readonly_storage,
@@ -334,7 +327,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
let ctx = session_container.new_context(logger.clone(), scuba_sample);
let f = backsync_forever(
ctx,
commit_syncer_args,
commit_syncer,
target_repo_dbs,
source_repo_name,
target_repo_name,
@@ -354,10 +347,6 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
}
(ARG_MODE_BACKSYNC_COMMITS, Some(sub_m)) => {
let ctx = session_container.new_context(logger, ScubaSampleBuilder::with_discard());
let commit_sync_config =
live_commit_sync_config.get_current_commit_sync_config(&ctx, target_repo_id)?;
let commit_syncer = commit_syncer_args
.try_into_commit_syncer(&commit_sync_config, Arc::new(live_commit_sync_config))?;
let inputfile = sub_m
.value_of(ARG_INPUT_FILE)


@@ -10,28 +10,24 @@
use anyhow::{format_err, Error};
use bookmarks::{BookmarkName, Freshness};
use cached_config::ConfigStore;
use clap::ArgMatches;
use cmdlib::{args, helpers, monitoring};
use cmdlib_x_repo::{
create_commit_syncer_args_from_matches, create_reverse_commit_syncer_args_from_matches,
};
use cmdlib_x_repo::create_commit_syncers_from_matches;
use context::{CoreContext, SessionContainer};
use cross_repo_sync::{
validation::{self, BookmarkDiff},
CommitSyncOutcome, CommitSyncer, CommitSyncerArgs, Syncers,
CommitSyncOutcome, CommitSyncer, Syncers,
};
use fbinit::FacebookInit;
use futures::{compat::Future01CompatExt, future};
use futures_old::Stream;
use live_commit_sync_config::CONFIGERATOR_PUSHREDIRECT_ENABLE;
use live_commit_sync_config::{CfgrLiveCommitSyncConfig, LiveCommitSyncConfig};
use mononoke_types::ChangesetId;
use pushredirect_enable::types::MononokePushRedirectEnable;
use scuba_ext::ScubaSampleBuilder;
use slog::{error, info, Logger};
use stats::prelude::*;
use std::{sync::Arc, time::Duration};
use synced_commit_mapping::{SqlSyncedCommitMapping, SyncedCommitMapping};
use synced_commit_mapping::SyncedCommitMapping;
define_stats! {
prefix = "mononoke.bookmark_validator";
@@ -51,34 +47,16 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
let matches = app.get_matches();
let (_, logger, mut runtime) = args::init_mononoke(fb, &matches, None)?;
let ctx = create_core_context(fb, logger.clone());
let large_to_small_commit_syncer_args = runtime.block_on_std(
create_commit_syncer_args_from_matches(ctx.fb, &logger, &matches),
)?;
let large_to_small_commit_syncer =
get_commit_syncer(&ctx, &logger, &matches, large_to_small_commit_syncer_args)?;
if large_to_small_commit_syncer.get_source_repo().get_repoid()
!= large_to_small_commit_syncer.get_large_repo().get_repoid()
{
return Err(format_err!("Source repo must be a large repo!"));
}
let config_store = args::init_config_store(fb, &logger, &matches)?;
let source_repo_id = args::get_source_repo_id(config_store, &matches)?;
// Backsyncer works in large -> small direction, however
// for the bookmarks validator it's simpler to have the commit syncer in small -> large direction
// Hence here we are creating a reverse syncer
let small_to_large_commit_syncer_args = runtime.block_on_std(
create_reverse_commit_syncer_args_from_matches(ctx.fb, &logger, &matches),
)?;
let small_to_large_commit_syncer =
get_commit_syncer(&ctx, &logger, &matches, small_to_large_commit_syncer_args)?;
let syncers = Syncers {
large_to_small: large_to_small_commit_syncer,
small_to_large: small_to_large_commit_syncer,
};
let config_store = args::init_config_store(fb, &logger, &matches)?;
let syncers = runtime.block_on_std(create_commit_syncers_from_matches(&ctx, &matches))?;
if syncers.large_to_small.get_large_repo().get_repoid() != source_repo_id {
return Err(format_err!("Source repo must be a large repo!"));
}
helpers::block_execute(
loop_forever(ctx, syncers, config_store),
@@ -96,20 +74,6 @@ fn create_core_context(fb: FacebookInit, logger: Logger) -> CoreContext {
session_container.new_context(logger, scuba_sample)
}
fn get_commit_syncer<'a>(
ctx: &CoreContext,
logger: &Logger,
matches: &ArgMatches<'a>,
commit_syncer_args: CommitSyncerArgs<SqlSyncedCommitMapping>,
) -> Result<CommitSyncer<SqlSyncedCommitMapping>, Error> {
let config_store = args::init_config_store(ctx.fb, logger, &matches)?;
let target_repo_id = args::get_target_repo_id(config_store, &matches)?;
let live_commit_sync_config = Arc::new(CfgrLiveCommitSyncConfig::new(&logger, config_store)?);
let commit_sync_config =
live_commit_sync_config.get_current_commit_sync_config(&ctx, target_repo_id)?;
commit_syncer_args.try_into_commit_syncer(&commit_sync_config, live_commit_sync_config)
}
async fn loop_forever<M: SyncedCommitMapping + Clone + 'static>(
ctx: CoreContext,
syncers: Syncers<M>,


@@ -1,86 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::Error;
use blobrepo::BlobRepo;
use live_commit_sync_config::LiveCommitSyncConfig;
use metaconfig_types::CommitSyncConfig;
use mononoke_types::RepositoryId;
use std::fmt;
use std::sync::Arc;
use synced_commit_mapping::SyncedCommitMapping;
use crate::{CommitSyncRepos, CommitSyncer};
/// An auxiliary struct to hold data necessary for `CommitSyncer` instantiation
#[derive(Clone)]
pub struct CommitSyncerArgs<T> {
source_repo: BlobRepo,
target_repo: BlobRepo,
mapping: T,
}
impl<T: SyncedCommitMapping + Clone + 'static> CommitSyncerArgs<T> {
pub fn new(source_repo: BlobRepo, target_repo: BlobRepo, mapping: T) -> Self {
Self {
source_repo,
target_repo,
mapping,
}
}
pub fn get_source_repo(&self) -> &BlobRepo {
&self.source_repo
}
pub fn get_target_repo(&self) -> &BlobRepo {
&self.target_repo
}
pub fn get_target_repo_id(&self) -> RepositoryId {
self.target_repo.get_repoid()
}
pub fn get_source_repo_id(&self) -> RepositoryId {
self.source_repo.get_repoid()
}
pub fn try_into_commit_syncer(
self,
commit_sync_config: &CommitSyncConfig,
// TODO(stash): remove commit_sync_config and use just live_commit_sync_config
live_commit_sync_config: Arc<dyn LiveCommitSyncConfig>,
) -> Result<CommitSyncer<T>, Error> {
let Self {
source_repo,
target_repo,
mapping,
} = self;
let commit_sync_repos = CommitSyncRepos::new(source_repo, target_repo, commit_sync_config)?;
Ok(CommitSyncer::new(
mapping,
commit_sync_repos,
live_commit_sync_config,
))
}
}
impl<M> fmt::Debug for CommitSyncerArgs<M>
where
M: SyncedCommitMapping + Clone + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let source_repo_id = self.source_repo.get_repoid();
let target_repo_id = self.target_repo.get_repoid();
write!(
f,
"CommitSyncerArgs{{{}->{}}}",
source_repo_id, target_repo_id
)
}
}


@@ -45,14 +45,12 @@ use synced_commit_mapping::{
use thiserror::Error;
use topo_sort::sort_topological;
pub use commit_syncer_args::CommitSyncerArgs;
use merge_utils::get_version_for_merge;
use pushrebase_hook::CrossRepoSyncPushrebaseHook;
use types::{Source, Target};
mod commit_sync_data_provider;
pub mod commit_sync_outcome;
mod commit_syncer_args;
mod merge_utils;
mod pushrebase_hook;
pub mod types;


@@ -14,7 +14,7 @@ use borrowed::borrowed;
use cached_config::ConfigStore;
use clap::ArgMatches;
use cmdlib::{args, helpers};
use cmdlib_x_repo::create_commit_syncer_args_from_matches;
use cmdlib_x_repo::create_commit_syncer_from_matches;
use context::CoreContext;
use cross_repo_sync::CommitSyncer;
use cross_repo_sync::{
@@ -27,7 +27,7 @@ use futures::{
future::{try_join, try_join3, try_join_all},
Stream, StreamExt, TryStreamExt,
};
use live_commit_sync_config::{CfgrLiveCommitSyncConfig, LiveCommitSyncConfig};
use live_commit_sync_config::CfgrLiveCommitSyncConfig;
use metaconfig_types::RepoConfig;
use metaconfig_types::{CommitSyncConfigVersion, MetadataDatabaseConfig};
use mononoke_types::{MPath, RepositoryId};
@@ -432,7 +432,7 @@ async fn run_manual_commit_sync<'a>(
matches: &ArgMatches<'a>,
sub_m: &ArgMatches<'a>,
) -> Result<(), Error> {
let commit_syncer = get_commit_syncer(&ctx, matches).await?;
let commit_syncer = create_commit_syncer_from_matches(&ctx, matches).await?;
let target_repo = commit_syncer.get_target_repo();
let target_repo_parents = sub_m.values_of(PARENTS);
@@ -477,15 +477,7 @@ async fn run_check_push_redirection_prereqs<'a>(
matches: &ArgMatches<'a>,
sub_m: &ArgMatches<'a>,
) -> Result<(), Error> {
let config_store = args::init_config_store(ctx.fb, ctx.logger(), &matches)?;
let target_repo_id = args::get_target_repo_id(config_store, &matches)?;
let live_commit_sync_config = CfgrLiveCommitSyncConfig::new(ctx.logger(), config_store)?;
let commit_syncer_args =
create_commit_syncer_args_from_matches(ctx.fb, ctx.logger(), &matches).await?;
let commit_sync_config =
live_commit_sync_config.get_current_commit_sync_config(&ctx, target_repo_id)?;
let commit_syncer = commit_syncer_args
.try_into_commit_syncer(&commit_sync_config, Arc::new(live_commit_sync_config))?;
let commit_syncer = create_commit_syncer_from_matches(&ctx, &matches).await?;
let target_repo = commit_syncer.get_target_repo();
let source_repo = commit_syncer.get_source_repo();
@@ -607,7 +599,7 @@ async fn run_mover<'a>(
matches: &ArgMatches<'a>,
sub_m: &ArgMatches<'a>,
) -> Result<(), Error> {
let commit_syncer = get_commit_syncer(&ctx, matches).await?;
let commit_syncer = create_commit_syncer_from_matches(&ctx, matches).await?;
let version = get_version(sub_m)?;
let mover = commit_syncer.get_mover_by_version(&version)?;
let path = sub_m
@@ -654,7 +646,7 @@ async fn run_mark_not_synced<'a>(
matches: &ArgMatches<'a>,
sub_m: &ArgMatches<'a>,
) -> Result<(), Error> {
let commit_syncer = get_commit_syncer(&ctx, matches).await?;
let commit_syncer = create_commit_syncer_from_matches(&ctx, matches).await?;
let small_repo = commit_syncer.get_small_repo();
let large_repo = commit_syncer.get_large_repo();
@@ -722,7 +714,7 @@ async fn run_backfill_noop_mapping<'a>(
matches: &ArgMatches<'a>,
sub_m: &ArgMatches<'a>,
) -> Result<(), Error> {
let commit_syncer = get_commit_syncer(&ctx, matches).await?;
let commit_syncer = create_commit_syncer_from_matches(&ctx, matches).await?;
let small_repo = commit_syncer.get_small_repo();
let large_repo = commit_syncer.get_large_repo();
@@ -862,21 +854,6 @@ async fn process_stream_and_wait_for_replication<'a>(
Ok(())
}
async fn get_commit_syncer(
ctx: &CoreContext,
matches: &ArgMatches<'_>,
) -> Result<CommitSyncer<SqlSyncedCommitMapping>> {
let config_store = args::init_config_store(ctx.fb, ctx.logger(), matches)?;
let target_repo_id = args::get_target_repo_id(config_store, &matches)?;
let live_commit_sync_config = CfgrLiveCommitSyncConfig::new(ctx.logger(), config_store)?;
let commit_syncer_args =
create_commit_syncer_args_from_matches(ctx.fb, ctx.logger(), &matches).await?;
let commit_sync_config =
live_commit_sync_config.get_current_commit_sync_config(&ctx, target_repo_id)?;
commit_syncer_args
.try_into_commit_syncer(&commit_sync_config, Arc::new(live_commit_sync_config))
}
fn get_version(matches: &ArgMatches<'_>) -> Result<CommitSyncConfigVersion> {
Ok(CommitSyncConfigVersion(
matches


@@ -25,11 +25,11 @@ use bookmarks::{BookmarkName, BookmarkUpdateLog, Freshness};
use cached_config::ConfigStore;
use clap::{App, ArgMatches};
use cmdlib::{args, monitoring};
use cmdlib_x_repo::create_commit_syncer_args_from_matches;
use cmdlib_x_repo::create_commit_syncer_from_matches;
use context::CoreContext;
use cross_repo_sync::{
types::{Source, Target},
CommitSyncer, CommitSyncerArgs,
CommitSyncer,
};
use derived_data_utils::derive_data_for_csids;
use fbinit::FacebookInit;
@@ -118,7 +118,7 @@ async fn run_in_single_commit_mode<M: SyncedCommitMapping + Clone + 'static>(
enum TailingArgs<M> {
CatchUpOnce(CommitSyncer<M>),
LoopForever(CommitSyncerArgs<M>, &'static ConfigStore),
LoopForever(CommitSyncer<M>, &'static ConfigStore),
}
async fn run_in_tailing_mode<
@@ -155,10 +155,10 @@ async fn run_in_tailing_mode<
)
.await?;
}
TailingArgs::LoopForever(commit_syncer_args, config_store) => {
TailingArgs::LoopForever(commit_syncer, config_store) => {
let live_commit_sync_config =
Arc::new(CfgrLiveCommitSyncConfig::new(ctx.logger(), &config_store)?);
let source_repo_id = commit_syncer_args.get_source_repo().get_repoid();
let source_repo_id = commit_syncer.get_source_repo().get_repoid();
loop {
let scuba_sample = base_scuba_sample.clone();
@@ -174,13 +174,6 @@ async fn run_in_tailing_mode<
continue;
}
let commit_sync_config =
live_commit_sync_config.get_current_commit_sync_config(&ctx, source_repo_id)?;
let commit_syncer = commit_syncer_args
.clone()
.try_into_commit_syncer(&commit_sync_config, live_commit_sync_config.clone())?;
let synced_something = tail(
&ctx,
&commit_syncer,
@@ -416,7 +409,7 @@ async fn run(
let (source_repo, target_repo, counters) =
try_join3(source_repo, target_repo, mutable_counters).await?;
let commit_syncer_args = create_commit_syncer_args_from_matches(fb, &logger, &matches).await?;
let commit_syncer = create_commit_syncer_from_matches(&ctx, &matches).await?;
let live_commit_sync_config = Arc::new(CfgrLiveCommitSyncConfig::new(&logger, &config_store)?);
let commit_sync_config =
@@ -428,17 +421,13 @@ async fn run(
.into_iter()
.collect();
let commit_syncer = commit_syncer_args
.clone()
.try_into_commit_syncer(&commit_sync_config, live_commit_sync_config)?;
let source_skiplist =
get_skiplist_index(&ctx, &source_repo_config, &source_repo).map_ok(Source);
let target_skiplist =
get_skiplist_index(&ctx, &target_repo_config, &target_repo).map_ok(Target);
match matches.subcommand() {
(ARG_ONCE, Some(sub_m)) => {
add_common_fields(&mut scuba_sample, &commit_syncer_args);
add_common_fields(&mut scuba_sample, &commit_syncer);
let maybe_target_bookmark = sub_m
.value_of(ARG_TARGET_BOOKMARK)
.map(BookmarkName::new)
@@ -465,7 +454,7 @@ async fn run(
(ARG_TAIL, Some(sub_m)) => {
let (source_skiplist_index, target_skiplist_index) =
try_join(source_skiplist, target_skiplist).await?;
add_common_fields(&mut scuba_sample, &commit_syncer_args);
add_common_fields(&mut scuba_sample, &commit_syncer);
let sleep_secs = get_sleep_secs(&sub_m)?;
let tailing_args = if sub_m.is_present(ARG_CATCH_UP_ONCE) {
@@ -473,7 +462,7 @@ async fn run(
} else {
let config_store = args::init_config_store(fb, ctx.logger(), &matches)?;
TailingArgs::LoopForever(commit_syncer_args, config_store)
TailingArgs::LoopForever(commit_syncer, config_store)
};
let backpressure_params = BackpressureParams::new(&ctx, &matches, sub_m).await?;


@@ -7,7 +7,7 @@
use anyhow::Error;
use context::CoreContext;
use cross_repo_sync::CommitSyncerArgs;
use cross_repo_sync::CommitSyncer;
use futures_stats::FutureStats;
use mononoke_types::ChangesetId;
use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt};
@@ -29,11 +29,11 @@ const SUCCESS: &'static str = "success";
/// this tailer run
pub fn add_common_fields<M: SyncedCommitMapping + Clone + 'static>(
scuba_sample: &mut ScubaSampleBuilder,
commit_syncer_args: &CommitSyncerArgs<M>,
commit_syncer: &CommitSyncer<M>,
) {
scuba_sample
.add(SOURCE_REPO, commit_syncer_args.get_source_repo_id().id())
.add(TARGET_REPO, commit_syncer_args.get_target_repo_id().id());
.add(SOURCE_REPO, commit_syncer.get_source_repo_id().id())
.add(TARGET_REPO, commit_syncer.get_target_repo_id().id());
}
/// Log the fact of successful syncing of the single changeset to Scuba


@@ -76,6 +76,8 @@ run the sync again
$ mononoke_x_repo_sync 1 0 once --commit "$TOMERGE" |& grep -v "using repo"
* Initializing CfgrLiveCommitSyncConfig (glob)
* Done initializing CfgrLiveCommitSyncConfig (glob)
* Initializing CfgrLiveCommitSyncConfig (glob)
* Done initializing CfgrLiveCommitSyncConfig (glob)
* changeset resolved as: ChangesetId(Blake2(*)) (glob)
* Checking if 6d7f84d613e4cccb4ec27259b7b59335573cdd65ee5dc78887056a5eeb6e6a47 is already synced 1->0 (glob)
* 1 unsynced ancestors of 6d7f84d613e4cccb4ec27259b7b59335573cdd65ee5dc78887056a5eeb6e6a47 (glob)
@@ -85,6 +87,8 @@ run the sync again
$ mononoke_x_repo_sync 1 0 once --target-bookmark master_bookmark --commit fbsource_master |& grep -v "using repo"
* Initializing CfgrLiveCommitSyncConfig (glob)
* Done initializing CfgrLiveCommitSyncConfig (glob)
* Initializing CfgrLiveCommitSyncConfig (glob)
* Done initializing CfgrLiveCommitSyncConfig (glob)
* changeset resolved as: ChangesetId(Blake2(*)) (glob)
* Checking if * is already synced 1->0 (glob)
* 1 unsynced ancestors of 85b7d7910b3858629737adff1f3e2c4aa9f16b6239f115507cce6e91c8665df8 (glob)
@@ -116,4 +120,3 @@ check that the changes are synced
date: Thu Jan 01 00:00:00 1970 +0000
summary: megarepo commit 1


@@ -289,6 +289,8 @@ Merge with preserved ancestors
$ mononoke_x_repo_sync 1 0 once --target-bookmark master_bookmark --commit $(hg log -T "{node}" -r pre_merge_p1 --cwd "$TESTTMP/with_merge_hg") |& grep -v "using repo"
* Initializing CfgrLiveCommitSyncConfig (glob)
* Done initializing CfgrLiveCommitSyncConfig (glob)
* Initializing CfgrLiveCommitSyncConfig (glob)
* Done initializing CfgrLiveCommitSyncConfig (glob)
* changeset resolved as: ChangesetId(Blake2(87924512f63d088d5b6bb5368bfef8016246e59927fe9d06d8ea657bc94e993d)) (glob)
* Checking if 87924512f63d088d5b6bb5368bfef8016246e59927fe9d06d8ea657bc94e993d is already synced 1->0 (glob)
* 1 unsynced ancestors of 87924512f63d088d5b6bb5368bfef8016246e59927fe9d06d8ea657bc94e993d (glob)