getbundle_response: don't fetch Hg Changeset IDs sequentially

Summary:
Fetching things from MySQL sequentially in a buffered fashion is a bad
practice, since we might end up saturating the underlying MySQL pool with a lot
of requests. Doing so will result in other queries being delayed as they wait
behind our batch of queries, which results in higher dispatch latency.

Instead, let's make fewer, bigger queries. Also, while we're in here, let's
update blobrepo to have an up-to-date comment.

Reviewed By: StanislavGlebik

Differential Revision: D19766788

fbshipit-source-id: 318ec4778ca259b210d431fc2add8b327bfce99a
This commit is contained in:
Thomas Orozco 2020-02-06 12:08:36 -08:00 committed by Facebook Github Bot
parent 4874c1b7ab
commit ce8b9a0fbe
2 changed files with 67 additions and 20 deletions

View File

@ -613,7 +613,10 @@ impl BlobRepo {
}
// Returns only the mapping for valid changests that are known to the server.
// Result may not contain all the ids from the input.
// For Bonsai -> Hg conversion, missing Hg changesets will be derived (so all Bonsais will be
// in the output).
// For Hg -> Bonsai conversion, missing Bonsais will not be returned, since they cannot be
// derived from Hg Changesets.
pub fn get_hg_bonsai_mapping(
&self,
ctx: CoreContext,

View File

@ -19,7 +19,10 @@ use derived_data::BonsaiDerived;
use derived_data_filenodes::{FilenodesOnlyPublic, FilenodesOnlyPublicMapping};
use futures::{future, stream as old_stream, Future, Stream as OldStream};
use futures_ext::FutureExt as OldFutureExt;
use futures_preview::{compat::Future01CompatExt, stream, StreamExt, TryStreamExt};
use futures_preview::{
compat::Future01CompatExt,
stream::{self, StreamExt, TryStreamExt},
};
use futures_util::try_join;
use mercurial_bundles::{changegroup::CgVersion, part_encode::PartEncodeBuilder, parts};
use mercurial_revlog::{self, RevlogChangeset};
@ -182,26 +185,64 @@ async fn create_hg_changeset_part(
blobrepo: &BlobRepo,
nodes_to_send: Vec<ChangesetId>,
) -> Result<PartEncodeBuilder> {
let changesets_buffer_size = 1000;
let map_chunk_size = 100;
let load_buffer_size = 1000;
let changelogentries = old_stream::iter_ok(nodes_to_send)
.map({
cloned!(blobrepo, ctx);
move |bonsai| {
blobrepo
.get_hg_from_bonsai_changeset(ctx.clone(), bonsai)
.and_then({
cloned!(ctx, blobrepo);
move |node| {
node.load(ctx, blobrepo.blobstore())
.from_err()
.map(move |cs| (node.into_nodehash(), cs))
}
})
let changelogentries = stream::iter(nodes_to_send)
.chunks(map_chunk_size)
.then({
cloned!(ctx, blobrepo);
move |bonsais| {
cloned!(ctx, blobrepo);
async move {
let mapping = blobrepo
.get_hg_bonsai_mapping(ctx.clone(), bonsais.clone())
.compat()
.await?
.into_iter()
.map(|(hg_cs_id, bonsai_cs_id)| (bonsai_cs_id, hg_cs_id))
.collect::<HashMap<_, _>>();
// We need to preserve ordering of the Bonsais for Mercurial on the client-side.
let ordered_mapping = bonsais
.into_iter()
.map(|bcs_id| {
let hg_cs_id = mapping.get(&bcs_id).ok_or_else(|| {
anyhow::format_err!("cs_id was missing from mapping: {:?}", bcs_id)
})?;
Ok((*hg_cs_id, bcs_id))
})
.collect::<Vec<_>>();
Result::<_, Error>::Ok(ordered_mapping)
}
}
})
.buffered(changesets_buffer_size)
.and_then(|(node, cs)| {
.map_ok(|res| stream::iter(res))
.try_flatten()
.map({
cloned!(ctx, blobrepo);
move |res| {
cloned!(ctx, blobrepo);
async move {
match res {
Ok((hg_cs_id, _bcs_id)) => {
let cs = hg_cs_id
.load(ctx.clone(), blobrepo.blobstore())
.compat()
.await?;
Ok((hg_cs_id, cs))
}
Err(e) => Err(e),
}
}
}
})
.buffered(load_buffer_size)
.and_then(|(hg_cs_id, cs)| async move {
let node = hg_cs_id.into_nodehash();
let revlogcs = RevlogChangeset::new_from_parts(
cs.parents(),
cs.manifestid(),
@ -214,11 +255,14 @@ async fn create_hg_changeset_part(
let mut v = Vec::new();
mercurial_revlog::changeset::serialize_cs(&revlogcs, &mut v)?;
Ok((
node,
HgBlobNode::new(Bytes::from(v), revlogcs.p1(), revlogcs.p2()),
))
});
})
.boxed()
.compat();
parts::changegroup_part(changelogentries, None, CgVersion::Cg2Version)
}