mononoke: asyncify sync_single_combined_entry in hg_sync job

Reviewed By: ahornby

Differential Revision: D25184245

fbshipit-source-id: 59985cdc3cadc7ff945db60fabbcd6c241ff3ba1
This commit is contained in:
Stanislau Hlebik 2020-11-26 07:40:39 -08:00 committed by Facebook GitHub Bot
parent 569733215c
commit 196dec1904

View File

@ -30,9 +30,9 @@ use futures::{
future::{try_join, try_join3, FutureExt as _, TryFutureExt},
stream::{StreamExt, TryStreamExt},
};
use futures_ext::{spawn_future, try_boxfuture, BoxFuture, FutureExt, StreamExt as OldStreamExt};
use futures_ext::{spawn_future, BoxFuture, FutureExt, StreamExt as OldStreamExt};
use futures_old::{
future::{err, join_all, ok, IntoFuture},
future::{err, join_all, ok},
stream,
stream::Stream,
Future,
@ -414,84 +414,59 @@ fn combine_entries(
}
/// Sends a downloaded bundle to hg
fn try_sync_single_combined_entry(
ctx: CoreContext,
async fn try_sync_single_combined_entry(
ctx: &CoreContext,
attempt: usize,
combined_entry: CombinedBookmarkUpdateLogEntry,
hg_repo: HgRepo,
) -> impl Future<Item = (), Error = Error> {
let CombinedBookmarkUpdateLogEntry {
components,
bundle_file,
timestamps_file,
cs_id,
bookmark,
} = combined_entry;
let ids: Vec<_> = components.iter().map(|entry| entry.id).collect();
combined_entry: &CombinedBookmarkUpdateLogEntry,
hg_repo: &HgRepo,
) -> Result<(), Error> {
let ids: Vec<_> = combined_entry
.components
.iter()
.map(|entry| entry.id)
.collect();
info!(ctx.logger(), "syncing log entries {:?} ...", ids);
let bundle_path = try_boxfuture!(get_path(&bundle_file));
let timestamps_path = try_boxfuture!(get_path(&timestamps_file));
let bundle_path = get_path(&combined_entry.bundle_file)?;
let timestamps_path = get_path(&combined_entry.timestamps_file)?;
hg_repo
.apply_bundle(
bundle_path,
timestamps_path,
bookmark,
cs_id.map(|(_bcs_id, hg_cs_id)| hg_cs_id),
combined_entry.bookmark.clone(),
combined_entry.cs_id.map(|(_bcs_id, hg_cs_id)| hg_cs_id),
attempt,
ctx.logger().clone(),
)
.map(move |()| {
// Make sure temp file is dropped only after bundle was applied is done
let _ = bundle_file;
let _ = timestamps_file;
})
.boxify()
.compat()
.await?;
Ok(())
}
fn sync_single_combined_entry(
ctx: CoreContext,
combined_entry: CombinedBookmarkUpdateLogEntry,
hg_repo: HgRepo,
async fn sync_single_combined_entry(
ctx: &CoreContext,
combined_entry: &CombinedBookmarkUpdateLogEntry,
hg_repo: &HgRepo,
base_retry_delay_ms: u64,
retry_num: usize,
globalrev_syncer: GlobalrevSyncer,
) -> impl Future<Item = RetryAttemptsCount, Error = Error> {
let sync_globalrevs = if let Some((cs_id, _hg_cs_id)) = combined_entry.cs_id {
async move { globalrev_syncer.sync(cs_id).await }
.boxed()
.compat()
.left_future()
} else {
Ok(()).into_future().right_future()
};
globalrev_syncer: &GlobalrevSyncer,
) -> Result<RetryAttemptsCount, Error> {
if let Some((cs_id, _hg_cs_id)) = combined_entry.cs_id {
globalrev_syncer.sync(cs_id).await?
}
sync_globalrevs.and_then(move |()| {
async move {
retry(
&ctx.logger(),
{
cloned!(ctx, combined_entry);
move |attempt| {
try_sync_single_combined_entry(
ctx.clone(),
attempt,
combined_entry.clone(),
hg_repo.clone(),
)
.compat()
}
},
base_retry_delay_ms,
retry_num,
)
.await
}
.boxed()
.compat()
.map(|(_, attempts)| attempts)
})
let (_, attempts) = retry(
&ctx.logger(),
|attempt| try_sync_single_combined_entry(&ctx, attempt, &combined_entry, &hg_repo),
base_retry_delay_ms,
retry_num,
)
.await?;
Ok(attempts)
}
/// Logs to Scuba information about a single bundle sync event
@ -854,14 +829,13 @@ async fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> Result<(), Error
.await?;
sync_single_combined_entry(
ctx.clone(),
combined_entry,
hg_repo,
&ctx,
&combined_entry,
&hg_repo,
base_retry_delay_ms,
retry_num,
globalrev_syncer,
&globalrev_syncer,
)
.compat()
.await
}
.timed()
@ -1007,14 +981,13 @@ async fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> Result<(), Error
let res = match res {
Ok(combined_entry) => {
let (stats, res) = sync_single_combined_entry(
ctx.clone(),
combined_entry.clone(),
hg_repo.clone(),
&ctx,
&combined_entry,
&hg_repo,
base_retry_delay_ms,
retry_num,
globalrev_syncer.clone(),
&globalrev_syncer,
)
.compat()
.timed()
.await;
let res = bind_sync_result(&combined_entry.components, res);