mononoke/unbundle_replay: move unbundle & filenodes derivation to their own task

Summary:
Since we do those concurrently, it makes sense to do them on their own task.
Besides, since those are still old futures that need ownership, there is
effectively no tradeoff here.

Differential Revision: D20691373

fbshipit-source-id: 1a45e43ec857d91bed1614568b4354d56a2b0848
This commit is contained in:
Thomas Orozco 2020-03-30 06:12:14 -07:00 committed by Facebook GitHub Bot
parent 066cdcfb3d
commit dfcaca8077

View File

@ -41,6 +41,7 @@ use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use time_ext::DurationExt;
use tokio::{task, time};
use unbundle::{
self, get_pushrebase_hooks, PostResolveAction, PostResolvePushRebase, PushrebaseBookmarkSpec,
};
@ -152,7 +153,7 @@ async fn get_replay_stream<'a>(
onto,
onto_rev
);
tokio::time::delay_for(poll_interval).await;
time::delay_for(poll_interval).await;
continue;
}
}
@ -252,7 +253,7 @@ async fn maybe_unbundle(
StreamEvent::Done(..) => None,
});
let (unbundle_stats, resolution) = unbundle::resolve(
let (unbundle_stats, resolution) = task::spawn(unbundle::resolve(
ctx.clone(),
repo.clone(),
false, // infinitepush_writes_allowed
@ -261,13 +262,14 @@ async fn maybe_unbundle(
None, // maybe_full_content
false, // pure_push_allowed
repo_config.pushrebase.flags,
)
))
.timed()
.await;
let resolution = match resolution {
Ok(resolution) => resolution,
Err(e) => return Ok(UnbundleOutcome::Deferred(bundle, pushrebase_spec, e.into())),
Ok(Ok(resolution)) => resolution,
Ok(Err(e)) => return Ok(UnbundleOutcome::Deferred(bundle, pushrebase_spec, e.into())),
Err(e) => return Err(e.into()),
};
let PushrebaseSpec {
@ -538,9 +540,8 @@ async fn do_main(
Ok(head)
})
.try_for_each_concurrent(filenodes_concurrency, |head| async move {
FilenodesOnlyPublic::derive(ctx.clone(), repo.clone(), head)
.compat()
.await?;
task::spawn(FilenodesOnlyPublic::derive(ctx.clone(), repo.clone(), head).compat())
.await??;
info!(ctx.logger(), "Filenodes derived for {:?}", head);