Fix for delta cycles in partial pull / fetch through Mononoke GRit server

Summary: When pulling the latest changes for `whatsapp/server` through the `Mononoke GRit` server, we encountered an error that was essentially the result of a delta cycle in the packfile. Git allows cycles under a weird set of conditions and rejects them under certain others. To avoid bumping into this problem again in the future, let's build in cycle-detection logic that prevents us from sending object pairs like A and B where A is expressed as a delta of B and B is expressed as a delta of A.
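
The fix below boils down to tracking two sets while walking the candidate objects: the objects that will be written into the packfile, and the delta bases assumed to already be present at the client. A minimal standalone sketch of that rule (toy u32 ids stand in for git OIDs, and the function name is hypothetical, not the actual Mononoke API):

    use std::collections::HashSet;

    /// Sketch only: each candidate is (object_oid, optional delta_base_oid).
    /// Returns (objects to pack, bases assumed present at the client).
    fn split_objects_and_bases(
        candidates: Vec<(u32, Option<u32>)>,
    ) -> (HashSet<u32>, HashSet<u32>) {
        let mut object_set = HashSet::new();
        let mut base_set = HashSet::new();
        for (object, base) in candidates {
            // An object already recorded as someone's delta base is expected
            // at the client and is not packed again; this is what breaks
            // A-delta-of-B / B-delta-of-A cycles.
            if !base_set.contains(&object) {
                object_set.insert(object);
                if let Some(base) = base {
                    // A base that is already a full packfile entry needs no
                    // tracking as an external base.
                    if !object_set.contains(&base) {
                        base_set.insert(base);
                    }
                }
            }
        }
        (object_set, base_set)
    }

    fn main() {
        // A (1) is a delta of B (2) and B is a delta of A.
        let (objects, bases) = split_objects_and_bases(vec![(1, Some(2)), (2, Some(1))]);
        // Only A is packed; B is treated as a base the client already has.
        assert!(objects.contains(&1) && !objects.contains(&2));
        assert!(bases.contains(&2));
    }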

Reviewed By: mitrandir77

Differential Revision: D56981007

fbshipit-source-id: dad6e11460531176b805d94c06f857377a850dcd
Rajiv Sharma 2024-05-07 03:11:41 -07:00 committed by Facebook GitHub Bot
parent 3ab7e2f70b
commit b36ba1179b

@@ -185,14 +185,16 @@ pub async fn ref_oid_mapping(
     anyhow::Ok(wanted_refs_with_oid.into_iter())
 }
 
-/// Get the count of distinct blob and tree items to be included in the packfile
+/// Get the count of distinct blob and tree items to be included in the packfile along with the
+/// set of base objects that are expected to be present at the client
 async fn trees_and_blobs_count(
     ctx: &CoreContext,
     repo: &impl Repo,
     target_commits: BoxStream<'_, Result<ChangesetId>>,
     git_delta_manifest_version: GitDeltaManifestVersion,
+    delta_inclusion: DeltaInclusion,
     concurrency: usize,
-) -> Result<usize> {
+) -> Result<(usize, FxHashSet<ObjectId>)> {
     // Sum up the entries in the delta manifest for each commit included in packfile
     target_commits
         .map_ok(|changeset_id| {
@@ -210,8 +212,14 @@ async fn trees_and_blobs_count(
                 // in the packfile
                 let objects = delta_manifest
                     .into_subentries(ctx, &blobstore)
-                    .map_ok(|(_, entry)| entry.full_object_oid())
-                    .try_collect::<FxHashSet<_>>()
+                    .map_ok(|(_, entry)| {
+                        let delta = delta_base(&entry, delta_inclusion);
+                        (
+                            entry.full_object_oid(),
+                            delta.map(|delta| delta.base_object_oid()),
+                        )
+                    })
+                    .try_collect::<Vec<_>>()
                     .await
                     .with_context(|| {
                         format!(
@@ -222,10 +230,29 @@
                 anyhow::Ok(objects)
             }
         })
-        .try_buffer_unordered(concurrency)
-        .try_concat()
+        .try_buffered(concurrency)
+        .try_fold(
+            (FxHashSet::default(), FxHashSet::default()),
+            |(mut object_set, mut base_set), objects_with_bases| async move {
+                for (object, base) in objects_with_bases {
+                    // If the object is already used as a base, then it should NOT be
+                    // part of the packfile
+                    if !base_set.contains(&object) {
+                        object_set.insert(object);
+                        if let Some(base_oid) = base {
+                            // If the base of this delta was already counted as part of the packfile,
+                            // then do NOT add it to the set of base objects
+                            if !object_set.contains(&base_oid) {
+                                base_set.insert(base_oid);
+                            }
+                        }
+                    }
+                }
+                Ok((object_set, base_set))
+            },
+        )
         .await
-        .map(|objects| objects.len())
+        .map(|(object_set, base_set)| (object_set.len(), base_set))
 }
 
 fn delta_below_threshold(
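
Note the switch from try_buffer_unordered + try_concat to the order-preserving try_buffered: the fold's result depends on which commit's objects are seen first, so the per-commit batches need to arrive in the reversed (oldest-first) input order. A small sketch of that ordering guarantee, assuming the futures, tokio and anyhow crates:

    use std::time::Duration;
    use futures::stream::{self, TryStreamExt};

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // The middle future finishes first, but try_buffered still yields
        // results in input order, unlike try_buffer_unordered.
        let inputs = vec![(1u32, 30u64), (2, 10), (3, 20)];
        let ordered: Vec<u32> = stream::iter(inputs.into_iter().map(anyhow::Ok))
            .map_ok(|(id, delay_ms)| async move {
                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                anyhow::Ok(id)
            })
            .try_buffered(3)
            .try_collect()
            .await?;
        assert_eq!(ordered, vec![1, 2, 3]);
        Ok(())
    }
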
@@ -692,6 +719,7 @@ async fn blob_and_tree_packfile_stream<'a>(
     delta_inclusion: DeltaInclusion,
     packfile_item_inclusion: PackfileItemInclusion,
     git_delta_manifest_version: GitDeltaManifestVersion,
+    base_set: Arc<FxHashSet<ObjectId>>,
     concurrency: PackfileConcurrency,
 ) -> Result<BoxStream<'a, Result<PackfileItem>>> {
     // Get the packfile items corresponding to blob and tree objects in the repo. Where applicable, use delta to represent them
@@ -713,6 +741,18 @@
         })
         .try_buffered(concurrency.trees_and_blobs * 2)
         .try_flatten()
+        .try_filter_map(move |(cs_id, path, entry)| {
+            let base_set = base_set.clone();
+            async move {
+                let object_id = entry.full_object_oid();
+                if base_set.contains(&object_id) {
+                    // This object is already present at the client, so do not include it in the packfile
+                    anyhow::Ok(None)
+                } else {
+                    anyhow::Ok(Some((cs_id, path, entry)))
+                }
+            }
+        })
         // We use map + buffered instead of map_ok + try_buffered since weighted buffering for futures
         // currently exists only for Stream and not for TryStream
         .map(move |result| {
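
The try_filter_map stage above is where objects from the base set are actually dropped from the packfile stream. A condensed, self-contained version of the same pattern (toy u32 oids and a hypothetical function name; assumes the futures, tokio and anyhow crates):

    use std::collections::HashSet;
    use std::sync::Arc;
    use futures::stream::{self, TryStreamExt};

    /// Keep only the oids the client is not already assumed to have.
    async fn drop_known_bases(
        oids: Vec<u32>,
        base_set: Arc<HashSet<u32>>,
    ) -> anyhow::Result<Vec<u32>> {
        stream::iter(oids.into_iter().map(anyhow::Ok))
            .try_filter_map(move |oid| {
                let base_set = base_set.clone();
                async move {
                    // Ok(None) filters the item out of the stream.
                    anyhow::Ok((!base_set.contains(&oid)).then_some(oid))
                }
            })
            .try_collect()
            .await
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let bases = Arc::new(HashSet::from([2u32]));
        assert_eq!(drop_known_bases(vec![1, 2, 3], bases).await?, vec![1, 3]);
        Ok(())
    }
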
@@ -971,11 +1011,12 @@ pub async fn generate_pack_item_stream<'a>(
     target_commits.reverse();
     let commits_count = target_commits.len();
     // STEP 1: Get the count of distinct blob and tree objects to be included in the packfile/bundle.
-    let trees_and_blobs_count = trees_and_blobs_count(
+    let (trees_and_blobs_count, base_set) = trees_and_blobs_count(
         &ctx,
         repo,
         to_commit_stream(target_commits.clone()),
         git_delta_manifest_version,
+        request.delta_inclusion,
         request.concurrency.trees_and_blobs,
     )
     .await
@@ -1002,6 +1043,7 @@ pub async fn generate_pack_item_stream<'a>(
         request.delta_inclusion,
         request.packfile_item_inclusion,
         git_delta_manifest_version,
+        Arc::new(base_set),
         request.concurrency,
     )
     .await
@@ -1106,11 +1148,12 @@ pub async fn fetch_response<'a>(
     // Reverse the list of commits so that we can prevent delta cycles from appearing in the packfile
     target_commits.reverse();
     // Get the count of unique blob and tree objects to be included in the packfile
-    let trees_and_blobs_count = trees_and_blobs_count(
+    let (trees_and_blobs_count, base_set) = trees_and_blobs_count(
         &ctx,
         repo,
         to_commit_stream(target_commits.clone()),
         git_delta_manifest_version,
+        delta_inclusion,
         request.concurrency.trees_and_blobs,
     )
     .await
@@ -1125,6 +1168,7 @@ pub async fn fetch_response<'a>(
         delta_inclusion,
         packfile_item_inclusion,
         git_delta_manifest_version,
+        Arc::new(base_set),
         request.concurrency,
     )
     .await