mononoke: handle pushvars during pushrebase

Summary:
There were quite a lot of pushes that use pushvars.
This diff adds a parsing of it.

After I added the parsing of pushvars it started to fail because it seems to be
a flat manifest push. But having parsing of pushvars probably won't hurt.

Reviewed By: farnz

Differential Revision: D9751962

fbshipit-source-id: 49796e91edfad76fb022a2e0fc049a79859de1b7
This commit is contained in:
Stanislau Hlebik 2018-09-13 05:53:38 -07:00 committed by Facebook Github Bot
parent 831e52a98c
commit 99ecff8586
4 changed files with 39 additions and 5 deletions

View File

@ -190,7 +190,11 @@ fn resolve_pushrebase(
bundle2: BoxStream<Bundle2Item, Error>,
) -> BoxFuture<Bytes, Error> {
resolver
.resolve_b2xtreegroup2(bundle2)
.maybe_resolve_pushvars(bundle2)
.and_then({
cloned!(resolver);
move |bundle2| resolver.resolve_b2xtreegroup2(bundle2)
})
.and_then({
cloned!(resolver);
move |(manifests, bundle2)| {
@ -401,6 +405,26 @@ impl Bundle2Resolver {
.boxify()
}
/// Parse pushvars
/// It is used to store hook arguments.
fn maybe_resolve_pushvars(
&self,
bundle2: BoxStream<Bundle2Item, Error>,
) -> BoxFuture<BoxStream<Bundle2Item, Error>, Error> {
next_item(bundle2)
.and_then(move |(newpart, bundle2)| match newpart {
Some(Bundle2Item::Pushvars(_header, emptypart)) => {
// ignored for now, will be used for hooks
emptypart.map(move |_| bundle2).boxify()
}
Some(part) => ok(stream::once(Ok(part)).chain(bundle2).boxify()).boxify(),
None => ok(bundle2).boxify(),
})
.context("While resolving Pushvars")
.from_err()
.boxify()
}
/// Parse changegroup.
/// The ChangegroupId will be used in the last step for preparing response
/// The Changesets should be parsed as RevlogChangesets and used for uploading changesets

View File

@ -100,6 +100,7 @@ pub enum Bundle2Item {
B2xRebase(PartHeader, BoxStream<changegroup::Part, Error>),
Replycaps(PartHeader, BoxFuture<capabilities::Capabilities, Error>),
Pushkey(PartHeader, BoxFuture<(), Error>),
Pushvars(PartHeader, BoxFuture<(), Error>),
}
impl Bundle2Item {
@ -142,11 +143,10 @@ impl fmt::Debug for Bundle2Item {
&B2xRebasePack(ref header, _) => {
write!(f, "Bundle2Item::B2xRebasePack({:?}, ...)", header)
}
&B2xRebase(ref header, _) => {
write!(f, "Bundle2Item::B2xRebase({:?}, ...)", header)
}
&B2xRebase(ref header, _) => write!(f, "Bundle2Item::B2xRebase({:?}, ...)", header),
&Replycaps(ref header, _) => write!(f, "Bundle2Item::Replycaps({:?}, ...)", header),
&Pushkey(ref header, _) => write!(f, "Bundle2Item::Pushkey({:?}, ...)", header),
&Pushvars(ref header, _) => write!(f, "Bundle2Item::Pushvars({:?}, ...)", header),
}
}
}

View File

@ -51,6 +51,8 @@ pub enum PartHeaderType {
Pushkey,
/// Respond to a corresponding pushkey part
ReplyPushkey,
/// Contains parameters that can be used by hooks
Pushvars,
// RemoteChangegroup, // We don't wish to support this functionality
// CheckBookmarks, // TODO Do we want to support this?
// CheckHeads, // TODO Do we want to support this?
@ -68,7 +70,6 @@ pub enum PartHeaderType {
// Obsmarkers, // TODO Do we want to support this?
// ReplyObsmarkers, // TODO Do we want to support this?
// HgtagsFnodes, // TODO Do we want to support this?
// Pushvars, // TODO Do we want to support this?
}
impl PartHeaderType {
@ -88,6 +89,7 @@ impl PartHeaderType {
"check:heads" => Ok(CheckHeads),
"pushkey" => Ok(Pushkey),
"reply:pushkey" => Ok(ReplyPushkey),
"pushvars" => Ok(Pushvars),
bad => bail_msg!("unknown header type {}", bad),
}
}
@ -107,6 +109,7 @@ impl PartHeaderType {
B2xRebasePack => "b2x:rebasepackpart",
CheckHeads => "check:heads",
Pushkey => "pushkey",
Pushvars => "pushvars",
ReplyPushkey => "reply:pushkey",
}
}

View File

@ -49,6 +49,7 @@ lazy_static! {
m.insert(PartHeaderType::B2xTreegroup2, hashset!{"version", "cache", "category"});
m.insert(PartHeaderType::Replycaps, hashset!{});
m.insert(PartHeaderType::Pushkey, hashset!{ "namespace", "key", "old", "new" });
m.insert(PartHeaderType::Pushvars, hashset!{});
m
};
}
@ -153,6 +154,12 @@ pub fn inner_stream<R: AsyncRead + BufRead + 'static + Send>(
let empty = wrapped_stream.decode(EmptyUnpacker).for_each(|_| Ok(()));
Bundle2Item::Pushkey(header, Box::new(empty))
}
&PartHeaderType::Pushvars => {
// Pushvars part has an empty part payload, but we still need to "parse" it
// Otherwise polling remainder stream may fail.
let empty = wrapped_stream.decode(EmptyUnpacker).for_each(|_| Ok(()));
Bundle2Item::Pushvars(header, Box::new(empty))
}
_ => panic!("TODO: make this an error"),
};