derived_data: buffer fetching of mapping batches

Summary:
When fetching many derived data mappings, the use of `FuturesUnordered` means
we may fetch many blobs concurrently, which may overload the blobstore.

Switch to using `buffer_unordered` to limit the number of concurrent blob fetches to 64.

Reviewed By: ahornby

Differential Revision: D25371965

fbshipit-source-id: 30417e86bc33defbb821f214a5520ab1b8a8c18c
This commit is contained in:
Mark Juggurnauth-Thomas 2020-12-14 09:22:57 -08:00 committed by Facebook GitHub Bot
parent 615537ef10
commit 90dc6adb1c

View File

@ -13,7 +13,7 @@ use async_trait::async_trait;
use blobrepo::BlobRepo;
use blobstore::{Blobstore, BlobstoreBytes, BlobstoreGetData};
use context::CoreContext;
use futures::stream::{FuturesUnordered, TryStreamExt};
use futures::stream::{self, StreamExt, TryStreamExt};
use metaconfig_types::DerivedDataTypesConfig;
use mononoke_types::ChangesetId;
@ -61,18 +61,16 @@ pub trait BlobstoreRootIdMapping {
ctx: &CoreContext,
cs_ids: Vec<ChangesetId>,
) -> Result<HashMap<ChangesetId, Self::Value>> {
cs_ids
.into_iter()
.map(|cs_id| async move {
match self.fetch(ctx, cs_id).await? {
Some(value) => Ok(Some((cs_id, value))),
None => Ok(None),
}
})
.collect::<FuturesUnordered<_>>()
.try_filter_map(|maybe_value| async move { Ok(maybe_value) })
.try_collect()
.await
stream::iter(cs_ids.into_iter().map(|cs_id| async move {
match self.fetch(ctx, cs_id).await? {
Some(value) => Ok(Some((cs_id, value))),
None => Ok(None),
}
}))
.buffer_unordered(64)
.try_filter_map(|maybe_value| async move { Ok(maybe_value) })
.try_collect()
.await
}
/// Store a new mapping value.
@ -123,19 +121,17 @@ pub trait BlobstoreExistsMapping {
ctx: &CoreContext,
cs_ids: Vec<ChangesetId>,
) -> Result<HashMap<ChangesetId, Self::Value>> {
cs_ids
.into_iter()
.map(|cs_id| async move {
if self.exists(ctx, cs_id).await? {
Ok(Some((cs_id, cs_id.into())))
} else {
Ok(None)
}
})
.collect::<FuturesUnordered<_>>()
.try_filter_map(|maybe_value| async move { Ok(maybe_value) })
.try_collect()
.await
stream::iter(cs_ids.into_iter().map(|cs_id| async move {
if self.exists(ctx, cs_id).await? {
Ok(Some((cs_id, cs_id.into())))
} else {
Ok(None)
}
}))
.buffer_unordered(64)
.try_filter_map(|maybe_value| async move { Ok(maybe_value) })
.try_collect()
.await
}
/// Stores a new entry in the mapping.