mononke: initialize TargetRepoDbs in repo_handlers

Summary:
Push redirector needs to be able to call backsyncer, and it in turn needs `TargetRepoDbs` struct. So this diff adds an ability to instantiate this struct.

NB: this will be used in one of the following diffs, for now I just added the wiring.

Reviewed By: StanislavGlebik

Differential Revision: D18336859

fbshipit-source-id: 9cda4e6a13dd0eefbb5f61eecdf6e26a01d0732f
This commit is contained in:
Kostia Balytskyi 2019-11-06 12:32:58 -08:00 committed by Facebook Github Bot
parent c3c877527c
commit 6eba355c41

View File

@ -15,6 +15,7 @@ use futures::{future, Future, IntoFuture};
use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
use slog::{info, o, Logger};
use backsyncer::open_backsyncer_dbs_compat;
use blobrepo_factory::{open_blobrepo, Caching};
use bookmark_renaming::{get_large_to_small_renamer, get_small_to_large_renamer};
use cache_warmup::cache_warmup;
@ -89,6 +90,16 @@ impl IncompleteRepoHandler {
}
}
/// An auxillary struct to pass between closures before
/// we are capable of creating a full `RepoSyncTarget`
#[derive(Clone)]
struct RepoSyncTargetArgs {
commit_sync_config: CommitSyncConfig,
synced_commit_mapping: SqlSyncedCommitMapping,
db_config: MetadataDBConfig,
maybe_myrouter_port: Option<u16>,
}
#[derive(Clone)]
pub struct RepoHandler {
pub logger: Logger,
@ -121,16 +132,31 @@ fn open_db_from_config<S: SqlConstructors>(
/// build `RepoSyncTarget` for a push rediction from this
/// small repo into a large repo.
fn create_repo_sync_target(
ctx: CoreContext,
source_repo: &MononokeRepo,
target_incomplete_repo_handler: &IncompleteRepoHandler,
commit_sync_config: &CommitSyncConfig,
repo_sync_target_args: RepoSyncTargetArgs,
small_repo_id: RepositoryId,
synced_commit_mapping: SqlSyncedCommitMapping,
) -> Result<RepoSyncTarget> {
let small_to_large_mover = get_small_to_large_mover(commit_sync_config, small_repo_id)?;
let large_to_small_mover = get_large_to_small_mover(commit_sync_config, small_repo_id)?;
let small_to_large_renamer = get_small_to_large_renamer(commit_sync_config, small_repo_id)?;
let large_to_small_renamer = get_large_to_small_renamer(commit_sync_config, small_repo_id)?;
) -> BoxFuture<RepoSyncTarget, Error> {
let RepoSyncTargetArgs {
commit_sync_config,
synced_commit_mapping,
db_config,
maybe_myrouter_port,
} = repo_sync_target_args;
let small_to_large_mover =
try_boxfuture!(get_small_to_large_mover(&commit_sync_config, small_repo_id));
let large_to_small_mover =
try_boxfuture!(get_large_to_small_mover(&commit_sync_config, small_repo_id));
let small_to_large_renamer = try_boxfuture!(get_small_to_large_renamer(
&commit_sync_config,
small_repo_id
));
let large_to_small_renamer = try_boxfuture!(get_large_to_small_renamer(
&commit_sync_config,
small_repo_id
));
let small_repo = source_repo.blobrepo().clone();
let large_repo = target_incomplete_repo_handler.repo.blobrepo().clone();
@ -162,11 +188,54 @@ fn create_repo_sync_target(
let repo = target_incomplete_repo_handler.repo.clone();
Ok(RepoSyncTarget {
repo,
small_to_large_commit_syncer,
large_to_small_commit_syncer,
})
open_backsyncer_dbs_compat(ctx.clone(), db_config, maybe_myrouter_port)
.map(move |_target_repo_dbs| RepoSyncTarget {
repo,
small_to_large_commit_syncer,
large_to_small_commit_syncer,
})
.boxify()
}
fn get_maybe_create_repo_sync_target_fut(
ctx: CoreContext,
incomplete_repo_handler: &IncompleteRepoHandler,
repo_sync_target_args: RepoSyncTargetArgs,
lookup_table: &HashMap<RepositoryId, IncompleteRepoHandler>,
) -> BoxFuture<Option<RepoSyncTarget>, Error> {
let large_repo_id = repo_sync_target_args.commit_sync_config.large_repo_id;
let current_repo_id = incomplete_repo_handler.repo.repoid();
let current_repo = &incomplete_repo_handler.repo;
let target_incomplete_repo_handler = try_boxfuture!(lookup_table
.get(&large_repo_id)
.ok_or(ErrorKind::LargeRepoNotFound(large_repo_id)));
if large_repo_id == current_repo_id {
future::ok(None).boxify()
} else {
let current_repo_config = try_boxfuture!(repo_sync_target_args
.commit_sync_config
.small_repos
.get(&current_repo_id)
.ok_or(ErrorKind::SmallRepoNotFound(current_repo_id)));
let direction = current_repo_config.direction;
if direction != CommitSyncDirection::LargeToSmall {
// We can only do push redirection when sync happens in the
// `LargeToSmall` direction, as `SmallToLarge` is handled by
// tailers.
future::ok(None).boxify()
} else {
create_repo_sync_target(
ctx,
current_repo,
target_incomplete_repo_handler,
repo_sync_target_args,
current_repo_id,
)
.map(Some)
.boxify()
}
}
}
pub fn repo_handlers(
@ -183,10 +252,10 @@ pub fn repo_handlers(
let repo_futs: Vec<
BoxFuture<
(
CoreContext,
String,
IncompleteRepoHandler,
Option<CommitSyncConfig>,
SqlSyncedCommitMapping,
Option<RepoSyncTargetArgs>,
),
Error,
>,
@ -357,7 +426,8 @@ pub fn repo_handlers(
// initialize phases hint from the skip index
let phases_hint: Arc<dyn Phases> =
if let MetadataDBConfig::Mysql { .. } = dbconfig {
if let MetadataDBConfig::Mysql { .. } = dbconfig.clone()
{
Arc::new(CachingPhases::new(
fb,
Arc::new(phases_hint),
@ -386,7 +456,18 @@ pub fn repo_handlers(
mutable_counters,
);
let maybe_repo_sync_target_args =
commit_sync_config.map(move |commit_sync_config| {
RepoSyncTargetArgs {
commit_sync_config,
synced_commit_mapping: sql_commit_sync_mapping,
db_config: dbconfig,
maybe_myrouter_port: myrouter_port,
}
});
(
ctx,
reponame,
IncompleteRepoHandler {
logger: listen_log,
@ -398,8 +479,7 @@ pub fn repo_handlers(
pure_push_allowed,
support_bundle2_listkeys,
},
commit_sync_config,
sql_commit_sync_mapping,
maybe_repo_sync_target_args,
)
}
})
@ -419,68 +499,47 @@ pub fn repo_handlers(
fn build_repo_handlers(
tuples: Vec<(
CoreContext,
String,
IncompleteRepoHandler,
Option<CommitSyncConfig>,
SqlSyncedCommitMapping,
Option<RepoSyncTargetArgs>,
)>,
) -> Result<HashMap<String, RepoHandler>> {
) -> impl Future<Item = HashMap<String, RepoHandler>, Error = Error> {
let lookup_table: HashMap<RepositoryId, IncompleteRepoHandler> = tuples
.clone()
.into_iter()
.map(|(_, incomplete_repo_handler, _, _)| {
.iter()
.map(|(_, _, incomplete_repo_handler, _)| {
(
incomplete_repo_handler.repo.repoid(),
incomplete_repo_handler,
incomplete_repo_handler.clone(),
)
})
.collect();
tuples
.into_iter()
.map(
|(reponame, incomplete_repo_handler, maybe_commit_sync_config, commit_sync_mapping)| -> Result<(String, RepoHandler)> {
let maybe_repo_sync_target = match maybe_commit_sync_config {
None => None,
Some(commit_sync_config) => {
let large_repo_id = commit_sync_config.large_repo_id;
let current_repo_id = incomplete_repo_handler.repo.repoid();
let current_repo = &incomplete_repo_handler.repo;
if large_repo_id == current_repo_id {
None
} else {
let direction = commit_sync_config
.small_repos
.get(&current_repo_id)
.ok_or(ErrorKind::SmallRepoNotFound(current_repo_id))?
.direction;
if direction != CommitSyncDirection::LargeToSmall {
// We can only do push redirection when sync happens in the
// `LargeToSmall` direction, as `SmallToLarge` is handled by
// tailers.
None
} else {
let target_incomplete_repo_handler = lookup_table
.get(&large_repo_id)
.ok_or(ErrorKind::LargeRepoNotFound(large_repo_id))?;
Some(create_repo_sync_target(
current_repo,
target_incomplete_repo_handler,
&commit_sync_config,
current_repo_id,
commit_sync_mapping,
)?)
}
}
}
future::join_all({
cloned!(lookup_table);
tuples.into_iter().map(
move |(ctx, reponame, incomplete_repo_handler, maybe_repo_sync_target_args)| {
let maybe_repo_sync_target_fut = match maybe_repo_sync_target_args {
None => future::ok(None).boxify(),
Some(repo_sync_target_args) => get_maybe_create_repo_sync_target_fut(
ctx.clone(),
&incomplete_repo_handler,
repo_sync_target_args,
&lookup_table,
),
};
Ok((
reponame,
incomplete_repo_handler.into_repo_handler_with_sync_target(maybe_repo_sync_target),
))
maybe_repo_sync_target_fut
.map(move |maybe_repo_sync_target| {
(
reponame,
incomplete_repo_handler
.into_repo_handler_with_sync_target(maybe_repo_sync_target),
)
})
.boxify()
},
)
.collect::<Result<HashMap<_, _>>>()
})
.map(|v| v.into_iter().collect())
}