mononoke: make retry asynchronous

Reviewed By: ikostia

Differential Revision: D25122781

fbshipit-source-id: a7d69c2cdeff0c9c6abd92e486af191e8baed8d5
Stanislau Hlebik authored on 2020-11-26 04:40:25 -08:00; committed by Facebook GitHub Bot
parent 657d226360
commit ed51aac36c
3 changed files with 94 additions and 92 deletions

View File

@@ -12,14 +12,8 @@ use blobrepo::BlobRepo;
 use blobstore::{Blobstore, Loadable};
 use bookmarks::{BookmarkUpdateLog, Freshness};
 use bytes_old::{Bytes as BytesOld, BytesMut as BytesMutOld};
-use cloned::cloned;
 use context::CoreContext;
-use futures::{compat::Future01CompatExt, future::try_join_all, stream::TryStreamExt};
-use futures_ext::FutureExt;
-use futures_old::{
-    future::{loop_fn, IntoFuture, Loop},
-    Future,
-};
+use futures::{compat::Future01CompatExt, future::try_join_all, stream::TryStreamExt, Future};
 use mercurial_bundles::stream_start;
 use mononoke_types::RawBundle2Id;
 use mutable_counters::MutableCounters;
@@ -27,7 +21,7 @@ use slog::{info, Logger};
 use std::convert::TryInto;
 use std::io::{Read, Seek, SeekFrom};
 use std::path::{Path, PathBuf};
-use std::time::{Duration, Instant};
+use std::time::Duration;
 use tempfile::NamedTempFile;
 use tokio::{
     fs::{read as async_read_all, File as AsyncFile, OpenOptions},
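
Note: the rewritten retry loop below sleeps with delay_for. The tokio import hunk above is truncated before any timer item, so the exact path is not visible here; under tokio 0.2 this would typically be tokio::time::delay_for, roughly as in this sketch (backoff is a hypothetical helper, not part of the diff):

// Minimal sketch of the exponential backoff awaited by the new retry loop.
// Assumption: delay_for comes from tokio 0.2's tokio::time; the import is not
// shown in the truncated hunk above.
use std::time::Duration;
use tokio::time::delay_for;

async fn backoff(base_retry_delay_ms: u64, attempt: usize) {
    // Same formula as in retry(): base * 2^attempt milliseconds.
    let delay = Duration::from_millis(base_retry_delay_ms * 2u64.pow(attempt as u32));
    delay_for(delay).await;
}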
@@ -267,46 +261,41 @@ pub fn read_file_contents<F: Seek + Read>(f: &mut F) -> Result<String> {
 #[derive(Copy, Clone)]
 pub struct RetryAttemptsCount(pub usize);
 
-pub fn retry<V, Fut, Func>(
-    logger: Logger,
+pub async fn retry<V, Fut, Func>(
+    logger: &Logger,
     func: Func,
     base_retry_delay_ms: u64,
     retry_num: usize,
-) -> impl Future<Item = (V, RetryAttemptsCount), Error = Error>
+) -> Result<(V, RetryAttemptsCount), Error>
 where
     V: Send + 'static,
-    Fut: Future<Item = V, Error = Error>,
-    Func: Fn(usize) -> Fut + Send + 'static,
+    Fut: Future<Output = Result<V, Error>>,
+    Func: Fn(usize) -> Fut + Send,
 {
-    use tokio_timer::Delay;
-
-    loop_fn(1, move |attempt| {
-        cloned!(logger);
-        func(attempt)
-            .and_then(move |res| Ok(Loop::Break(Ok((res, RetryAttemptsCount(attempt))))))
-            .or_else({
-                move |err| {
-                    if attempt >= retry_num {
-                        Ok(Loop::Break(Err(err))).into_future().left_future()
-                    } else {
-                        info!(
-                            logger.clone(),
-                            "retrying attempt {} of {}...",
-                            attempt + 1,
-                            retry_num
-                        );
-
-                        let delay =
-                            Duration::from_millis(base_retry_delay_ms * 2u64.pow(attempt as u32));
-                        Delay::new(Instant::now() + delay)
-                            .and_then(move |_| Ok(Loop::Continue(attempt + 1)))
-                            .map_err(|e| -> Error { e.into() })
-                            .right_future()
-                    }
-                }
-            })
-    })
-    .flatten()
+    let mut attempt = 1;
+    loop {
+        let res = func(attempt).await;
+        match res {
+            Ok(res) => {
+                return Ok((res, RetryAttemptsCount(attempt)));
+            }
+            Err(err) => {
+                if attempt >= retry_num {
+                    return Err(err);
+                }
+                info!(
+                    logger,
+                    "retrying attempt {} of {}...",
+                    attempt + 1,
+                    retry_num
+                );
+
+                let delay = Duration::from_millis(base_retry_delay_ms * 2u64.pow(attempt as u32));
+                delay_for(delay).await;
+                attempt += 1;
+            }
+        }
+    }
 }
 
 /// Wait until all of the entries in the queue have been synced to hg
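
For reference, this is roughly how a caller drives the new async retry: the closure returns a futures 0.3 future and the whole call is simply awaited. Everything named do_work below is a hypothetical stand-in, not code from this diff.

// Hypothetical usage sketch of the async retry() above; do_work is a placeholder.
async fn example(logger: &Logger) -> Result<(), Error> {
    async fn do_work(attempt: usize) -> Result<u64, Error> {
        // Fail the first attempt so the retry path is exercised.
        if attempt < 2 {
            anyhow::bail!("transient failure");
        }
        Ok(42)
    }

    let (value, attempts) = retry(logger, |attempt| do_work(attempt), 1000, 5).await?;
    info!(logger, "got {} after {} attempt(s)", value, attempts.0);
    Ok(())
}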
@@ -372,6 +361,7 @@ mod tests {
     use super::*;
     use futures::{TryFutureExt, TryStreamExt};
     use futures_ext::FutureExt;
+    use futures_old::Future as FutureOld;
     use mercurial_bundles::bundle2::{Bundle2Stream, StreamEvent};
     use mercurial_bundles::Bundle2Item;

View File

@@ -117,32 +117,34 @@ impl BundlePreparer {
     ) -> BoxFuture<PreparedBookmarkUpdateLogEntry, Error> {
         cloned!(self.repo, self.ty);
 
-        let entry_id = log_entry.id;
-        retry(
-            ctx.logger().clone(),
-            {
-                cloned!(ctx);
-                move |_| {
-                    cloned!(ctx, repo, ty, log_entry);
-                    let book_values = overlay.get_bookmark_values();
-                    async move {
-                        Self::try_prepare_single_bundle(&ctx, &repo, log_entry, &ty, &book_values)
-                            .await
-                    }
-                    .boxed()
-                    .compat()
-                }
-            },
-            self.base_retry_delay_ms,
-            self.retry_num,
-        )
-        .map({
-            cloned!(ctx);
-            move |(p, _attempts)| {
-                info!(ctx.logger(), "successful prepare of entry #{}", entry_id);
-                p
-            }
-        })
+        let base_retry_delay_ms = self.base_retry_delay_ms;
+        let retry_num = self.retry_num;
+        async move {
+            let entry_id = log_entry.id;
+            let book_values = &overlay.get_bookmark_values();
+            let (p, _attempts) = retry(
+                &ctx.logger(),
+                {
+                    |_| {
+                        Self::try_prepare_single_bundle(
+                            &ctx,
+                            &repo,
+                            log_entry.clone(),
+                            &ty,
+                            book_values,
+                        )
+                    }
+                },
+                base_retry_delay_ms,
+                retry_num,
+            )
+            .await?;
+            info!(ctx.logger(), "successful prepare of entry #{}", entry_id);
+            Ok(p)
+        }
+        .boxed()
+        .compat()
         .boxify()
     }

View File

@@ -468,8 +468,9 @@ fn sync_single_combined_entry(
     };
 
     sync_globalrevs.and_then(move |()| {
+        async move {
         retry(
-            ctx.logger().clone(),
+            &ctx.logger(),
             {
                 cloned!(ctx, combined_entry);
                 move |attempt| {
@@ -479,11 +480,16 @@
                         combined_entry.clone(),
                         hg_repo.clone(),
                     )
+                    .compat()
                 }
             },
             base_retry_delay_ms,
             retry_num,
         )
+        .await
+        }
+        .boxed()
+        .compat()
         .map(|(_, attempts)| attempts)
     })
 }
@@ -1012,11 +1018,11 @@ async fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> Result<(), Error> {
                 .then(build_outcome_handler(ctx.clone(), lock_via))
                 .map(move |entry| {
                     let next_id = get_id_to_search_after(&entry);
-                    retry(
-                        ctx.logger().clone(),
-                        {
-                            cloned!(ctx, mutable_counters);
-                            move |_| {
+                    cloned!(ctx, mutable_counters);
+                    async move {
+                        retry(
+                            &ctx.logger(),
+                            |_| {
                                 mutable_counters
                                     .set_counter(
                                         ctx.clone(),
@@ -1033,11 +1039,15 @@ async fn run(ctx: CoreContext, matches: ArgMatches<'static>) -> Result<(), Error> {
                                             bail!("failed to update counter")
                                         }
                                     })
-                            }
-                        },
-                        base_retry_delay_ms,
-                        retry_num,
-                    )
+                                    .compat()
+                            },
+                            base_retry_delay_ms,
+                            retry_num,
+                        )
+                        .await
+                    }
+                    .boxed()
+                    .compat()
                 })
                 .for_each(|res| res.map(|_| ()))
                 .compat()
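
All three call sites keep their futures 0.1 interfaces by bridging in both directions: legacy 0.1 futures (set_counter, try_sync_single_combined_entry) are lifted into async code with Future01CompatExt::compat(), and the surrounding async block is handed back to 0.1 combinators with FutureExt::boxed() plus TryFutureExt::compat(). Below is a minimal self-contained sketch of that round trip; old_style_future is an invented stand-in for those legacy calls, and futures_old follows this codebase's convention of renaming the futures 0.1 crate.

// Sketch of the 0.1 <-> 0.3 bridge used throughout the hunks above.
use anyhow::Error;
use futures::compat::Future01CompatExt; // 0.1 future -> 0.3 future
use futures::future::{FutureExt, TryFutureExt}; // .boxed(), .compat() (0.3 -> 0.1)
use futures_old::future as future01;

// Stand-in for a call like set_counter() that still returns a futures 0.1 type.
fn old_style_future() -> impl futures_old::Future<Item = u64, Error = Error> {
    future01::ok(42)
}

// An async block awaits the legacy future, then is converted back so that
// futures 0.1 callers (streams, combinators) can keep driving it.
fn bridged() -> impl futures_old::Future<Item = u64, Error = Error> {
    async move {
        let v = old_style_future().compat().await?;
        Ok::<_, Error>(v)
    }
    .boxed() // Pin<Box<...>> so the compat wrapper below is Unpin
    .compat() // back to a futures 0.1 future
}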