mononoke: getbundle_response: Remove some usages of BlobRepo

Summary:
In the next diff I am going to completely remove `BlobRepo` from the
hg_sync job. However, that job relies on `getbundle_response`, which uses
`BlobRepo`.

This diff removes enough `BlobRepo` usage from `getbundle_response` for the
hg_sync job to compile without `BlobRepo`.
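
The change follows one pattern throughout: functions that took a concrete
`&BlobRepo` now take `impl` bounds over the facet-accessor traits they
actually use (`ChangesetsRef`, `RepoBlobstoreRef`, `BonsaiHgMappingRef`,
`RepoDerivedDataRef`), so any repo type exposing those facets can be passed.
A minimal, self-contained sketch of that pattern, using hypothetical stand-in
types rather than Mononoke's generated facet code:

    struct Changesets;
    struct RepoBlobstore;

    trait ChangesetsRef {
        fn changesets(&self) -> &Changesets;
    }

    trait RepoBlobstoreRef {
        fn repo_blobstore(&self) -> &RepoBlobstore;
    }

    // Before: `repo: &BlobRepo` forced callers to build a full BlobRepo.
    // After: any type exposing the required facets is accepted.
    fn diff_with_parents(repo: &(impl ChangesetsRef + RepoBlobstoreRef + Send + Sync)) {
        let _changesets = repo.changesets();
        let _blobstore = repo.repo_blobstore();
    }

    // A slim repo, such as the one the hg_sync job will use, implements
    // only the facet traits it needs.
    struct SlimRepo {
        changesets: Changesets,
        blobstore: RepoBlobstore,
    }

    impl ChangesetsRef for SlimRepo {
        fn changesets(&self) -> &Changesets {
            &self.changesets
        }
    }

    impl RepoBlobstoreRef for SlimRepo {
        fn repo_blobstore(&self) -> &RepoBlobstore {
            &self.blobstore
        }
    }

    fn main() {
        let repo = SlimRepo {
            changesets: Changesets,
            blobstore: RepoBlobstore,
        };
        diff_with_parents(&repo);
    }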

Reviewed By: markbt

Differential Revision: D37789412

fbshipit-source-id: 2f0063cc285e5796600ba9b3ac137766ccfcf1d7
Author: Harvey Hunt
Date: 2022-07-13 15:05:11 -07:00
Committed by: Facebook GitHub Bot
Commit: dadda621f9 (parent: 1969e48c95)

3 changed files with 59 additions and 27 deletions

File 1 of 3 (the `BlobRepo` implementation):

@@ -278,7 +278,7 @@ impl BlobRepo {
         changesetid: ChangesetId,
     ) -> Result<Vec<ChangesetId>, Error> {
         STATS::get_changeset_parents_by_bonsai.add_value(1);
-        let changeset = self.inner.changesets.get(ctx, changesetid).await?;
+        let changeset = self.changesets().get(ctx, changesetid).await?;
         let parents = changeset
             .ok_or_else(|| format_err!("Commit {} does not exist in the repo", changesetid))?
             .parents;
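
The hunk above swaps a private-field access (`self.inner.changesets`) for the
public `changesets()` facet accessor. A hedged, self-contained sketch of the
same lookup-and-error idiom, with stub types standing in for the real
changesets store, and anyhow plus futures 0.3 (for `block_on`) assumed:

    use anyhow::{format_err, Result};

    #[derive(Clone, Copy)]
    struct ChangesetId(u64);

    struct ChangesetEntry {
        parents: Vec<ChangesetId>,
    }

    // Stub for the changesets facet: a real implementation would query storage.
    struct Changesets;

    impl Changesets {
        async fn get(&self, _csid: ChangesetId) -> Result<Option<ChangesetEntry>> {
            Ok(Some(ChangesetEntry { parents: Vec::new() }))
        }
    }

    // The idiom from the hunk: a missing commit becomes a descriptive error,
    // then the parents are pulled out of the entry that was found.
    async fn get_changeset_parents_by_bonsai(
        changesets: &Changesets,
        changesetid: ChangesetId,
    ) -> Result<Vec<ChangesetId>> {
        let changeset = changesets.get(changesetid).await?;
        let parents = changeset
            .ok_or_else(|| format_err!("Commit {} does not exist in the repo", changesetid.0))?
            .parents;
        Ok(parents)
    }

    fn main() -> Result<()> {
        let parents = futures::executor::block_on(get_changeset_parents_by_bonsai(
            &Changesets,
            ChangesetId(1),
        ))?;
        assert!(parents.is_empty());
        Ok(())
    }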

File 2 of 3 (the getbundle_response crate's Cargo.toml):

@@ -12,9 +12,11 @@ anyhow = "1.0.56"
 blobrepo = { version = "0.1.0", path = "../../blobrepo" }
 blobrepo_hg = { version = "0.1.0", path = "../../blobrepo/blobrepo_hg" }
 blobstore = { version = "0.1.0", path = "../../blobstore" }
+bonsai_hg_mapping = { version = "0.1.0", path = "../../bonsai_hg_mapping" }
 bytes = { version = "1.1", features = ["serde"] }
 bytes-old = { package = "bytes", version = "0.4", features = ["serde"] }
 changeset_fetcher = { version = "0.1.0", path = "../../blobrepo/changeset_fetcher" }
+changesets = { version = "0.1.0", path = "../../changesets" }
 cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
 context = { version = "0.1.0", path = "../../server/context" }
 derived_data = { version = "0.1.0", path = "../../derived_data" }
@@ -36,6 +38,7 @@ phases = { version = "0.1.0", path = "../../phases" }
 rate_limiting = { version = "0.1.0", path = "../../rate_limiting" }
 reachabilityindex = { version = "0.1.0", path = "../../reachabilityindex" }
 repo_blobstore = { version = "0.1.0", path = "../../blobrepo/repo_blobstore" }
+repo_derived_data = { version = "0.1.0", path = "../../repo_attributes/repo_derived_data" }
 revset = { version = "0.1.0", path = "../../revset" }
 sha-1 = "0.8"
 slog = { version = "2.7", features = ["max_level_trace", "nested-values"] }

File 3 of 3 (the getbundle_response source):

@@ -12,9 +12,11 @@ use anyhow::Result;
 use blobrepo::BlobRepo;
 use blobrepo_hg::BlobRepoHg;
 use blobstore::Loadable;
+use bonsai_hg_mapping::BonsaiHgMappingRef;
 use bytes::Bytes;
 use bytes_old::Bytes as BytesOld;
 use changeset_fetcher::ArcChangesetFetcher;
+use changesets::ChangesetsRef;
 use cloned::cloned;
 use context::CoreContext;
 use context::PerfCounterType;
@@ -70,6 +72,8 @@ use phases::PhasesRef;
 use rate_limiting::Metric;
 use reachabilityindex::LeastCommonAncestorsHint;
 use repo_blobstore::RepoBlobstore;
+use repo_blobstore::RepoBlobstoreRef;
+use repo_derived_data::RepoDerivedDataRef;
 use revset::DifferenceOfUnionsOfAncestorsNodeStream;
 use sha1::Digest;
 use sha1::Sha1;
@@ -805,7 +809,7 @@ async fn find_phase_heads(
 /// `draft_callback` returns true.
 async fn traverse_draft_commits(
     ctx: &CoreContext,
-    repo: &BlobRepo,
+    repo: &(impl ChangesetsRef + RepoDerivedDataRef + BonsaiHgMappingRef + Send + Sync),
     phases: &dyn Phases,
     heads: &[HgChangesetId],
     mut public_callback: impl FnMut(ChangesetId, HgChangesetId),
@@ -850,8 +854,16 @@ async fn traverse_draft_commits(
     // TODO(mbthomas): After blobrepo refactoring, change to use a method that calls `Changesets::get_many`.
     let parents: Vec<_> = stream::iter(traverse)
         .map(move |csid| async move {
-            repo.get_changeset_parents_by_bonsai(ctx.clone(), csid)
-                .await
+            let parents = repo
+                .changesets()
+                .get(ctx.clone(), csid)
+                .await?
+                .ok_or_else(|| {
+                    anyhow::format_err!("Commit {} does not exist in the repo", csid)
+                })?
+                .parents;
+
+            Result::<_, Error>::Ok(parents)
         })
         .buffered(100)
         .try_collect::<Vec<_>>()
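
Each traversed commit id is mapped to an async parents lookup, and the
lookups run concurrently. A hedged, self-contained sketch of that
buffered-stream pattern from the hunk above, with plain `u64` ids standing
in for `ChangesetId` and futures 0.3 plus anyhow assumed:

    use anyhow::{format_err, Result};
    use futures::stream::{self, StreamExt, TryStreamExt};

    async fn parents_of(csid: u64) -> Result<Vec<u64>> {
        // Stubbed lookup: pretend id 0 has no entry, like a missing commit.
        if csid == 0 {
            return Err(format_err!("Commit {} does not exist in the repo", csid));
        }
        Ok(vec![csid - 1])
    }

    async fn all_parents(traverse: Vec<u64>) -> Result<Vec<Vec<u64>>> {
        stream::iter(traverse)
            .map(|csid| async move { parents_of(csid).await })
            .buffered(100) // at most 100 lookups in flight at a time
            .try_collect()
            .await
    }

    fn main() -> Result<()> {
        let parents = futures::executor::block_on(all_parents(vec![3, 2, 1]))?;
        assert_eq!(parents, vec![vec![2], vec![1], vec![0]]); // order preserved
        Ok(())
    }

`buffered` keeps the output in input order while capping concurrency, and
`try_collect` aborts on the first error, matching the hunk's behaviour.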
@@ -897,7 +909,7 @@ impl PreparedFilenodeEntry {
     async fn into_filenode<'a>(
         self,
         ctx: &'a CoreContext,
-        repo: &'a BlobRepo,
+        blobstore: &'a RepoBlobstore,
     ) -> Result<(HgFileNodeId, HgChangesetId, HgBlobNode, Option<RevFlags>), Error> {
         let Self {
             filenode,
@@ -910,21 +922,21 @@ impl PreparedFilenodeEntry {
         async fn fetch_and_wrap(
             ctx: &CoreContext,
-            repo: &BlobRepo,
+            blobstore: &RepoBlobstore,
             content_id: ContentId,
         ) -> Result<FileBytes, Error> {
-            let content = filestore::fetch_concat(repo.blobstore(), ctx, content_id).await?;
+            let content = filestore::fetch_concat(blobstore, ctx, content_id).await?;
             Ok(FileBytes(content))
         }
 
         let (blob, flags) = match content {
             FilenodeEntryContent::InlineV2(content_id) => {
-                let bytes = fetch_and_wrap(ctx, repo, content_id).await?;
+                let bytes = fetch_and_wrap(ctx, blobstore, content_id).await?;
                 (generate_inline_file(&bytes, parents, &metadata), None)
             }
             FilenodeEntryContent::InlineV3(content_id) => {
-                let bytes = fetch_and_wrap(ctx, repo, content_id).await?;
+                let bytes = fetch_and_wrap(ctx, blobstore, content_id).await?;
                 (
                     generate_inline_file(&bytes, parents, &metadata),
                     Some(RevFlags::REVIDX_DEFAULT_FLAGS),
                 )
             }
@@ -964,14 +976,14 @@ fn calculate_content_weight_hint(content_size: u64, content: &FilenodeEntryContent)
     }
 }
 
 fn prepare_filenode_entries_stream<'a>(
     ctx: &'a CoreContext,
-    repo: &'a BlobRepo,
+    blobstore: &'a RepoBlobstore,
     filenodes: Vec<(MPath, HgFileNodeId, HgChangesetId)>,
     lfs_session: &'a SessionLfsParams,
 ) -> impl Stream<Item = Result<(MPath, Vec<PreparedFilenodeEntry>), Error>> + 'a {
     stream::iter(filenodes.into_iter())
         .map({
             move |(path, filenode, linknode)| async move {
                 let envelope = filenode.load(ctx, repo.blobstore()).await?;
@@ -982,7 +994,7 @@ fn prepare_filenode_entries_stream<'a>(
                     }
                     _ => {
                         let key = FetchKey::from(envelope.content_id());
-                        let meta = filestore::get_metadata(repo.blobstore(), ctx, &key).await?;
+                        let meta = filestore::get_metadata(blobstore, ctx, &key).await?;
                         let meta =
                             meta.ok_or_else(|| Error::from(ErrorKind::MissingContent(key)))?;
                         let oid = meta.sha256;
@@ -1085,7 +1097,9 @@ pub fn create_manifest_entries_stream(
 
 async fn diff_with_parents(
     ctx: &CoreContext,
-    repo: &BlobRepo,
+    repo: &(
+        impl ChangesetsRef + RepoDerivedDataRef + BonsaiHgMappingRef + RepoBlobstoreRef + Send + Sync
+    ),
     hg_cs_id: HgChangesetId,
 ) -> Result<
     (
@@ -1094,13 +1108,21 @@ async fn diff_with_parents(
     ),
     Error,
 > {
-    let (mf_id, parent_mf_ids) = try_join!(fetch_manifest(ctx, repo, &hg_cs_id), async {
-        let parents = repo.get_changeset_parents(ctx.clone(), hg_cs_id).await?;
-
-        future::try_join_all(parents.iter().map(|p| fetch_manifest(ctx, repo, p))).await
-    })?;
+    let (mf_id, parent_mf_ids) = try_join!(
+        fetch_manifest(ctx, repo.repo_blobstore(), &hg_cs_id),
+        async {
+            let parents = repo.get_changeset_parents(ctx.clone(), hg_cs_id).await?;
+
+            future::try_join_all(
+                parents
+                    .iter()
+                    .map(|p| fetch_manifest(ctx, repo.repo_blobstore(), p)),
+            )
+            .await
+        }
+    )?;
 
-    let blobstore = Arc::new(repo.get_blobstore());
+    let blobstore = Arc::new(repo.repo_blobstore().clone());
     let new_entries: Vec<(Option<MPath>, Entry<_, _>, _)> =
         find_intersection_of_diffs_and_parents(ctx.clone(), blobstore, mf_id, parent_mf_ids)
             .try_collect()
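
The rewritten `try_join!` fetches the commit's own manifest concurrently
with resolving its parents and fetching each parent's manifest. A hedged,
self-contained sketch of that shape, with `u64` ids standing in for
changeset and manifest ids and futures 0.3 plus anyhow assumed:

    use anyhow::Result;
    use futures::future::try_join_all;
    use futures::try_join;

    async fn fetch_manifest(cs_id: u64) -> Result<u64> {
        Ok(cs_id + 1000) // stub: a real fetch would load the commit blob
    }

    async fn get_parents(cs_id: u64) -> Result<Vec<u64>> {
        Ok(vec![cs_id.saturating_sub(1)]) // stub parent resolution
    }

    async fn manifest_and_parent_manifests(cs_id: u64) -> Result<(u64, Vec<u64>)> {
        // Both branches run concurrently; the first error aborts the join.
        let (mf_id, parent_mf_ids) = try_join!(fetch_manifest(cs_id), async {
            let parents = get_parents(cs_id).await?;
            try_join_all(parents.iter().map(|p| fetch_manifest(*p))).await
        })?;
        Ok((mf_id, parent_mf_ids))
    }

    fn main() -> Result<()> {
        let out = futures::executor::block_on(manifest_and_parent_manifests(7))?;
        assert_eq!(out, (1007, vec![1006]));
        Ok(())
    }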
@@ -1137,7 +1159,7 @@ async fn diff_with_parents(
 
 fn create_filenodes_weighted(
     ctx: CoreContext,
-    repo: BlobRepo,
+    repo: impl RepoBlobstoreRef + Clone + Sync + Send + 'static,
     entries: HashMap<MPath, Vec<PreparedFilenodeEntry>>,
 ) -> impl OldStream<
     Item = (
@@ -1159,7 +1181,7 @@ fn create_filenodes_weighted(
         |entry| {
             {
                 cloned!(ctx, repo);
-                async move { entry.into_filenode(&ctx, &repo).await }
+                async move { entry.into_filenode(&ctx, repo.repo_blobstore()).await }
             }
             .boxed()
             .compat()
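
`cloned!(ctx, repo);` (from rust-shed) clones the handles into fresh
bindings so the following `async move` block owns them, letting the future
outlive the borrowed originals. A hedged sketch with hypothetical stand-in
types; `cloned!` expands to roughly the shadowing clones shown, and futures'
`block_on` is used only to drive the example:

    #[derive(Clone)]
    struct CoreContext;

    #[derive(Clone)]
    struct SlimRepo;

    fn entry_future(
        ctx: &CoreContext,
        repo: &SlimRepo,
    ) -> impl std::future::Future<Output = ()> + 'static {
        // What `cloned!(ctx, repo);` expands to, approximately.
        let (ctx, repo) = (ctx.clone(), repo.clone());
        async move {
            // The future owns the clones; the borrowed originals can go away.
            let (_ctx, _repo) = (ctx, repo);
        }
    }

    fn main() {
        let (ctx, repo) = (CoreContext, SlimRepo);
        let fut = entry_future(&ctx, &repo);
        drop((ctx, repo)); // safe: the future holds its own clones
        futures::executor::block_on(fut);
    }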
@@ -1177,7 +1199,7 @@ fn create_filenodes_weighted(
 
 pub fn create_filenodes(
     ctx: CoreContext,
-    repo: BlobRepo,
+    repo: impl RepoBlobstoreRef + Clone + Sync + Send + 'static,
     entries: HashMap<MPath, Vec<PreparedFilenodeEntry>>,
 ) -> impl OldStream<Item = (MPath, Vec<FilenodeEntry>), Error = Error> {
     let params = BufferedParams {
@@ -1191,7 +1213,9 @@ pub fn create_filenodes(
 // created in an earlier commit will be earlier in the output.
 pub async fn get_manifests_and_filenodes(
     ctx: &CoreContext,
-    repo: &BlobRepo,
+    repo: &(
+        impl ChangesetsRef + RepoDerivedDataRef + BonsaiHgMappingRef + RepoBlobstoreRef + Send + Sync
+    ),
     commits: impl IntoIterator<Item = HgChangesetId>,
     lfs_params: &SessionLfsParams,
 ) -> Result<
@@ -1207,9 +1231,14 @@ pub async fn get_manifests_and_filenodes(
 
             let (manifests, filenodes) = diff_with_parents(ctx, repo, hg_cs_id).await?;
             let filenodes: Vec<(MPath, Vec<PreparedFilenodeEntry>)> =
-                prepare_filenode_entries_stream(&ctx, &repo, filenodes, &lfs_params)
-                    .try_collect()
-                    .await?;
+                prepare_filenode_entries_stream(
+                    ctx,
+                    repo.repo_blobstore(),
+                    filenodes,
+                    lfs_params,
+                )
+                .try_collect()
+                .await?;
             Result::<_, Error>::Ok((manifests, filenodes))
         }
     })
@@ -1248,9 +1277,9 @@ pub async fn get_manifests_and_filenodes(
 
 async fn fetch_manifest(
     ctx: &CoreContext,
-    repo: &BlobRepo,
+    blobstore: &RepoBlobstore,
     hg_cs_id: &HgChangesetId,
 ) -> Result<HgManifestId, Error> {
-    let blob_cs = hg_cs_id.load(ctx, repo.blobstore()).await?;
+    let blob_cs = hg_cs_id.load(ctx, blobstore).await?;
     Ok(blob_cs.manifestid())
 }