blobrepo: don't fetch Hg Changeset IDs sequentially

Summary:
Fetching things from MySQL sequentially in a buffered fashion is a bad
practice, since we might end up saturating the underlying MySQL pool and
starving other MySQL clients.

Instead, let's make fewer, bigger queries.

Reviewed By: ahornby

Differential Revision: D19766787

fbshipit-source-id: 1cf9102eaca8cc1ab55b7b85039ca99627a86b71
This commit is contained in:
Thomas Orozco 2020-02-06 12:08:36 -08:00 committed by Facebook Github Bot
parent ce8b9a0fbe
commit d39eea991b

View File

@ -1982,17 +1982,39 @@ fn to_hg_bookmark_stream<T>(
where
T: Stream<Item = (Bookmark, ChangesetId), Error = Error>,
{
// TODO: (torozco) T44876554 If this hits the database for all (or most of) the bookmarks,
// it'll be fairly inefficient.
stream
.chunks(100)
.map({
cloned!(repo, ctx);
move |(bookmark, cs_id)| {
repo.get_hg_from_bonsai_changeset(ctx.clone(), cs_id)
.map(move |cs_id| (bookmark, cs_id))
move |chunk| {
let cs_ids = chunk.iter().map(|(_, cs_id)| *cs_id).collect::<Vec<_>>();
repo.get_hg_bonsai_mapping(ctx.clone(), cs_ids)
.map(move |mapping| {
let mapping = mapping
.into_iter()
.map(|(hg_cs_id, cs_id)| (cs_id, hg_cs_id))
.collect::<HashMap<_, _>>();
let res = chunk
.into_iter()
.map(|(bookmark, cs_id)| {
let hg_cs_id = mapping.get(&cs_id).ok_or_else(|| {
anyhow::format_err!(
"cs_id was missing from mapping: {:?}",
cs_id
)
})?;
Ok((bookmark, *hg_cs_id))
})
.collect::<Vec<_>>();
stream::iter_result(res)
})
.flatten_stream()
}
})
.buffer_unordered(100)
.flatten()
}
impl DangerousOverride<Arc<dyn LeaseOps>> for BlobRepo {