mononoke/unbundle resolver module: replace 0.1 future_ext::BoxStream usage with 0.3

Reviewed By: farnz

Differential Revision: D24706576

fbshipit-source-id: 3f6438961f52810b4ef59aba3c4dabb753ffb50b
This commit is contained in:
Lukas Piatkowski 2020-11-03 13:36:51 -08:00 committed by Facebook GitHub Bot
parent 719b637925
commit e0c7333e94
3 changed files with 68 additions and 96 deletions

View File

@ -25,13 +25,13 @@ use context::{CoreContext, LoggingContainer, PerfCounterType, SessionContainer};
use filenodes::FilenodeResult;
use futures::{
channel::oneshot::{self, Sender},
compat::Future01CompatExt,
compat::{Future01CompatExt, Stream01CompatExt},
future::{self, select, Either, FutureExt, TryFutureExt},
pin_mut, TryStreamExt,
pin_mut, StreamExt, TryStreamExt,
};
use futures_ext::{
spawn_future, try_boxfuture, try_boxstream, BoxFuture, BoxStream, BufferedParams,
FutureExt as OldFutureExt, StreamExt, StreamTimeoutError,
FutureExt as OldFutureExt, StreamExt as OldStreamExt, StreamTimeoutError,
};
use futures_old::future::ok;
use futures_old::{
@ -1609,7 +1609,7 @@ impl HgCommands for RepoClient {
&ctx,
&blobrepo,
infinitepush_writes_allowed,
stream,
stream.compat().boxed(),
read_write,
maybe_full_content,
pure_push_allowed,

View File

@ -26,13 +26,10 @@ use core::fmt::Debug;
use failure_ext::{Compat as FailureCompat, FutureFailureErrorExt};
use futures::{
future::{self, try_join_all, Future},
stream,
stream::{self, BoxStream},
};
use futures_ext::{
BoxFuture as OldBoxFuture, BoxStream as OldBoxStream, FutureExt as OldFutureExt,
StreamExt as OldStreamExt,
};
use futures_old::{future::Shared, stream as old_stream, Future as OldFuture, Stream as OldStream};
use futures_ext::{BoxFuture as OldBoxFuture, FutureExt as OldFutureExt};
use futures_old::{future::Shared, Future as OldFuture, Stream as OldStream};
use futures_util::{compat::Future01CompatExt, try_join, StreamExt, TryStreamExt};
use hooks::HookRejectionInfo;
use lazy_static::lazy_static;
@ -260,7 +257,7 @@ pub async fn resolve<'a>(
ctx: &'a CoreContext,
repo: &'a BlobRepo,
infinitepush_writes_allowed: bool,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
readonly: RepoReadOnly,
maybe_full_content: Option<Arc<Mutex<BytesOld>>>,
pure_push_allowed: bool,
@ -399,7 +396,7 @@ fn report_unbundle_type(
async fn resolve_push<'r>(
ctx: &'r CoreContext,
resolver: Bundle2Resolver<'r>,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
maybe_pushvars: Option<HashMap<String, Bytes>>,
non_fast_forward_policy: NonFastForwardPolicy,
maybe_full_content: Option<Arc<Mutex<BytesOld>>>,
@ -584,7 +581,7 @@ async fn resolve_pushrebase<'r>(
ctx: &'r CoreContext,
commonheads: CommonHeads,
resolver: Bundle2Resolver<'r>,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
maybe_pushvars: Option<HashMap<String, Bytes>>,
maybe_full_content: Option<Arc<Mutex<BytesOld>>>,
changegroup_acceptable: impl FnOnce() -> bool + Send + Sync + 'static,
@ -692,7 +689,7 @@ async fn resolve_pushrebase<'r>(
async fn resolve_bookmark_only_pushrebase<'r>(
ctx: &'r CoreContext,
resolver: Bundle2Resolver<'r>,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
maybe_pushvars: Option<HashMap<String, Bytes>>,
non_fast_forward_policy: NonFastForwardPolicy,
maybe_full_content: Option<Arc<Mutex<BytesOld>>>,
@ -738,18 +735,6 @@ async fn resolve_bookmark_only_pushrebase<'r>(
))
}
async fn next_item(
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
) -> Result<
(
Option<Bundle2Item<'static>>,
OldBoxStream<Bundle2Item<'static>, Error>,
),
Error,
> {
bundle2.into_future().map_err(|(err, _)| err).compat().await
}
/// Represents all the bookmark pushes that are created
/// by a single unbundle wireproto command. This can
/// be either an exactly one infinitepush, or multiple
@ -835,17 +820,16 @@ impl<'r> Bundle2Resolver<'r> {
/// Return unchanged `bundle2`
async fn is_next_part_pushkey(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
) -> Result<(bool, OldBoxStream<Bundle2Item<'static>, Error>), Error> {
let (start, bundle2) = next_item(bundle2).await?;
match start {
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<(bool, BoxStream<'static, Result<Bundle2Item<'static>>>)> {
match bundle2.try_next().await? {
Some(part) => {
if let Bundle2Item::Pushkey(header, box_future) = part {
Ok((
true,
old_stream::once(Ok(Bundle2Item::Pushkey(header, box_future)))
stream::once(async { Ok(Bundle2Item::Pushkey(header, box_future)) })
.chain(bundle2)
.boxify(),
.boxed(),
))
} else {
return_with_rest_of_bundle(false, part, bundle2).await
@ -881,11 +865,13 @@ impl<'r> Bundle2Resolver<'r> {
/// Return the rest of the stream along with params
async fn resolve_stream_params(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
) -> Result<(StreamHeader, OldBoxStream<Bundle2Item<'static>, Error>), Error> {
let (maybe_start, rest_of_bundle2) = next_item(bundle2).await?;
match maybe_start {
Some(Bundle2Item::Start(stream_header)) => Ok((stream_header, rest_of_bundle2)),
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<(
StreamHeader,
BoxStream<'static, Result<Bundle2Item<'static>>>,
)> {
match bundle2.try_next().await? {
Some(Bundle2Item::Start(stream_header)) => Ok((stream_header, bundle2)),
_ => Err(format_err!("Expected Bundle2 Start")),
}
}
@ -894,13 +880,12 @@ impl<'r> Bundle2Resolver<'r> {
/// Return the rest of the bundle
async fn resolve_replycaps(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
) -> Result<OldBoxStream<Bundle2Item<'static>, Error>, Error> {
let (maybe_replycaps, rest_of_bundle2) = next_item(bundle2).await?;
match maybe_replycaps {
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<BoxStream<'static, Result<Bundle2Item<'static>>>> {
match bundle2.try_next().await? {
Some(Bundle2Item::Replycaps(_, part)) => {
part.await?;
Ok(rest_of_bundle2)
Ok(bundle2)
}
_ => Err(format_err!("Expected Bundle2 Replycaps")),
}
@ -911,17 +896,15 @@ impl<'r> Bundle2Resolver<'r> {
// client. This part is used as a marker that this push is pushrebase.
async fn maybe_resolve_commonheads(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<
(
Option<CommonHeads>,
OldBoxStream<Bundle2Item<'static>, Error>,
BoxStream<'static, Result<Bundle2Item<'static>>>,
),
Error,
> {
let (maybe_commonheads, bundle2) = next_item(bundle2).await?;
match maybe_commonheads {
match bundle2.try_next().await? {
Some(Bundle2Item::B2xCommonHeads(_header, heads)) => Ok((
Some(CommonHeads {
heads: heads.try_collect().await?,
@ -938,17 +921,15 @@ impl<'r> Bundle2Resolver<'r> {
/// It is used to store hook arguments.
async fn maybe_resolve_pushvars(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<
(
Option<HashMap<String, Bytes>>,
OldBoxStream<Bundle2Item<'static>, Error>,
BoxStream<'static, Result<Bundle2Item<'static>>>,
),
Error,
> {
let (newpart, bundle2) = next_item(bundle2).await?;
let maybe_pushvars = match newpart {
let maybe_pushvars = match bundle2.try_next().await? {
Some(Bundle2Item::Pushvars(header, emptypart)) => {
let pushvars = header.into_inner().aparams;
// ignored for now, will be used for hooks
@ -971,20 +952,18 @@ impl<'r> Bundle2Resolver<'r> {
/// pure (non-pushrebase and non-infinitepush) pushes
async fn maybe_resolve_changegroup(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
changegroup_acceptable: impl FnOnce() -> bool + Send + Sync + 'static,
) -> Result<
(
Option<ChangegroupPush>,
OldBoxStream<Bundle2Item<'static>, Error>,
BoxStream<'static, Result<Bundle2Item<'static>>>,
),
Error,
> {
let infinitepush_writes_allowed = self.infinitepush_writes_allowed;
let (changegroup, bundle2) = next_item(bundle2).await?;
let maybe_cg_push: Option<ChangegroupPush> = match changegroup {
let maybe_cg_push: Option<ChangegroupPush> = match bundle2.try_next().await? {
// XXX: we may be interested in checking that this is a correct changegroup part
// type
Some(Bundle2Item::Changegroup(header, parts))
@ -1055,11 +1034,12 @@ impl<'r> Bundle2Resolver<'r> {
/// Returns an error if the pushkey namespace is unknown
async fn maybe_resolve_pushkey(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
) -> Result<(Option<Pushkey>, OldBoxStream<Bundle2Item<'static>, Error>), Error> {
let (newpart, bundle2) = next_item(bundle2).await?;
match newpart {
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<(
Option<Pushkey>,
BoxStream<'static, Result<Bundle2Item<'static>>>,
)> {
match bundle2.try_next().await? {
Some(Bundle2Item::Pushkey(header, emptypart)) => {
let namespace = header
.mparams()
@ -1102,20 +1082,15 @@ impl<'r> Bundle2Resolver<'r> {
/// Parse b2xinfinitepushmutation.
async fn maybe_resolve_infinitepush_mutation(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<
(
Vec<HgMutationEntry>,
OldBoxStream<Bundle2Item<'static>, Error>,
BoxStream<'static, Result<Bundle2Item<'static>>>,
),
Error,
> {
let (infinitepushmutation, bundle2): (
Option<Bundle2Item>,
OldBoxStream<Bundle2Item, Error>,
) = next_item(bundle2).await?;
match infinitepushmutation {
match bundle2.try_next().await? {
Some(Bundle2Item::B2xInfinitepushMutation(_, entries)) => {
let mutations = entries.try_concat().await?;
Ok((mutations, bundle2))
@ -1130,11 +1105,9 @@ impl<'r> Bundle2Resolver<'r> {
/// their upload as well as their parsed content should be used for uploading changesets.
async fn resolve_b2xtreegroup2(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
) -> Result<(Manifests, OldBoxStream<Bundle2Item<'static>, Error>), Error> {
let (b2xtreegroup2, bundle2) = next_item(bundle2).await?;
match b2xtreegroup2 {
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<(Manifests, BoxStream<'static, Result<Bundle2Item<'static>>>)> {
match bundle2.try_next().await? {
Some(Bundle2Item::B2xTreegroup2(_, parts))
| Some(Bundle2Item::B2xRebasePack(_, parts)) => {
let manifests = upload_hg_blobs(
@ -1159,14 +1132,9 @@ impl<'r> Bundle2Resolver<'r> {
/// This part is ignored, so just parse it and forget it
async fn maybe_resolve_infinitepush_bookmarks(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
) -> Result<((), OldBoxStream<Bundle2Item<'static>, Error>), Error> {
let (infinitepushbookmarks, bundle2): (
Option<Bundle2Item>,
OldBoxStream<Bundle2Item, Error>,
) = next_item(bundle2).await?;
match infinitepushbookmarks {
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<((), BoxStream<'static, Result<Bundle2Item<'static>>>)> {
match bundle2.try_next().await? {
Some(Bundle2Item::B2xInfinitepushBookmarks(_, bookmarks)) => {
bookmarks.try_for_each(|_| future::ok(())).await?;
Ok(((), bundle2))
@ -1275,11 +1243,13 @@ impl<'r> Bundle2Resolver<'r> {
/// Ensures that the next item in stream is None
async fn ensure_stream_finished(
&self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
mut bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
maybe_full_content: Option<Arc<Mutex<BytesOld>>>,
) -> Result<Option<RawBundle2Id>, Error> {
let (none, _bundle2) = next_item(bundle2).await?;
ensure!(none.is_none(), "Expected end of Bundle2");
ensure!(
bundle2.try_next().await?.is_none(),
"Expected end of Bundle2"
);
self.maybe_save_full_content_bundle2(maybe_full_content)
.await
}
@ -1290,13 +1260,15 @@ impl<'r> Bundle2Resolver<'r> {
/// one pushkey part per bookmark.
async fn resolve_multiple_parts<'a, T, Func, Fut>(
&'a self,
bundle2: OldBoxStream<Bundle2Item<'static>, Error>,
bundle2: BoxStream<'static, Result<Bundle2Item<'static>>>,
mut maybe_resolve: Func,
) -> Result<(Vec<T>, OldBoxStream<Bundle2Item<'static>, Error>), Error>
) -> Result<(Vec<T>, BoxStream<'static, Result<Bundle2Item<'static>>>)>
where
Fut: Future<Output = Result<(Option<T>, OldBoxStream<Bundle2Item<'static>, Error>), Error>>
Fut: Future<Output = Result<(Option<T>, BoxStream<'static, Result<Bundle2Item<'static>>>)>>
+ Sized,
Func: FnMut(&'a Self, OldBoxStream<Bundle2Item<'static>, Error>) -> Fut + Send + 'static,
Func: FnMut(&'a Self, BoxStream<'static, Result<Bundle2Item<'static>>>) -> Fut
+ Send
+ 'static,
T: Send + 'static,
{
let mut result = Vec::new();
@ -1450,13 +1422,13 @@ fn try_collect_all_bookmark_pushes(
async fn return_with_rest_of_bundle<T: Send + 'static>(
value: T,
unused_part: Bundle2Item<'static>,
rest_of_bundle: OldBoxStream<Bundle2Item<'static>, Error>,
) -> Result<(T, OldBoxStream<Bundle2Item<'static>, Error>), Error> {
rest_of_bundle: BoxStream<'static, Result<Bundle2Item<'static>>>,
) -> Result<(T, BoxStream<'static, Result<Bundle2Item<'static>>>)> {
Ok((
value,
old_stream::once(Ok(unused_part))
stream::once(async { Ok(unused_part) })
.chain(rest_of_bundle)
.boxify(),
.boxed(),
))
}

View File

@ -24,7 +24,7 @@ use cmdlib::{args, monitoring::ReadyFlagService};
use context::CoreContext;
use fbinit::FacebookInit;
use futures::{
compat::Future01CompatExt,
compat::{Future01CompatExt, Stream01CompatExt},
future,
stream::{self, Stream, StreamExt, TryStreamExt},
};
@ -261,7 +261,7 @@ async fn maybe_unbundle(
&ctx,
&repo,
false, // infinitepush_writes_allowed
Box::new(bundle_stream),
bundle_stream.compat().boxed(),
RepoReadOnly::ReadWrite,
None, // maybe_full_content
false, // pure_push_allowed