mononoke: blobrepo_hg: Make generic over facets

Summary:
`BlobRepoHg` provides a set of extension methods on top of `BlobRepo`
for hg specific functionality. The code is at a low layer in Mononoke and so
upper layers need to provide a `BlobRepo` in order to use the hg specific
functionality. This makes refactoring code to remove `BlobRepo` difficult.

This diff refactors `BlobRepoHg` so that it can be implemented for a type that
has the required facets, rather than just `BlobRepo` (although `BlobRepo` does
implement all of the necessary facets). In a later diff, I use this to remove
`BlobRepo` completely from the hg_sync job.

This diff implements this by bounding the implementation of the `BlobRepoHg`
trait to types that have the required facets for a specific method. For
example, a type that implements `ChangesetsRef` and `BonsaiHgMappingRef` can
call the `changeset_exists` method.

I added per-method trait bounds so that a type doesn't need to implement all
facets in order to call a single method.

This diff also removes the `get_filenodes` and `hg_mutation_store` methods as
the functionality they provide can be accessed through facets instead.

NOTE: In this diff I've also tried to remove all usages of `BlobRepo` from the
`blobrepo_hg` crate. However, removing them from `create_changeset.rs` was
pretty tricky and requires some more thinking. I'll come back to that later.

NOTE: I've not renamed `BlobRepoHg` in this diff in order to keep the changes
smaller, but am happy to do so in a later diff if people have better name
suggestions.

Reviewed By: markbt

Differential Revision: D37789413

fbshipit-source-id: c192fb871e3f5784930c02d9c341294340e5eda3
This commit is contained in:
Harvey Hunt 2022-07-13 15:05:11 -07:00 committed by Facebook GitHub Bot
parent 65f7049a11
commit 1969e48c95
9 changed files with 238 additions and 130 deletions

View File

@ -26,10 +26,10 @@ futures_ext = { version = "0.1.0", git = "https://github.com/facebookexperimenta
futures_stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
manifest = { version = "0.1.0", path = "../../manifest" }
mercurial_derived_data = { version = "0.1.0", path = "../../derived_data/mercurial_derived_data" }
mercurial_mutation = { version = "0.1.0", path = "../../mercurial/mutation" }
mercurial_types = { version = "0.1.0", path = "../../mercurial/types" }
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
repo_blobstore = { version = "0.1.0", path = "../repo_blobstore" }
repo_derived_data = { version = "0.1.0", path = "../../repo_attributes/repo_derived_data" }
scuba_ext = { version = "0.1.0", path = "../../common/scuba_ext" }
sorted_vector_map = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }

View File

@ -39,7 +39,6 @@ use repo_blobstore::RepoBlobstore;
use sorted_vector_map::SortedVectorMap;
use crate::errors::*;
use crate::BlobRepo;
/// Creates bonsai changeset from already created HgBlobChangeset.
pub async fn create_bonsai_changeset_object(
@ -47,13 +46,13 @@ pub async fn create_bonsai_changeset_object(
cs: HgBlobChangeset,
parent_manifests: Vec<HgManifestId>,
bonsai_parents: Vec<ChangesetId>,
repo: &BlobRepo,
blobstore: &RepoBlobstore,
) -> Result<BonsaiChangeset, Error> {
let file_changes = find_file_changes(
ctx,
cs.clone(),
parent_manifests,
repo,
blobstore,
bonsai_parents.clone(),
)
.await?;
@ -102,13 +101,13 @@ pub async fn save_bonsai_changeset_object(
fn find_bonsai_diff(
ctx: &CoreContext,
repo: &BlobRepo,
blobstore: RepoBlobstore,
cs: HgBlobChangeset,
parent_manifests: HashSet<HgManifestId>,
) -> Result<impl TryStream<Ok = BonsaiDiffFileChange<HgFileNodeId>, Error = Error>> {
Ok(bonsai_diff(
ctx.clone(),
repo.get_blobstore(),
blobstore,
cs.manifestid(),
parent_manifests,
))
@ -119,59 +118,63 @@ async fn find_file_changes(
ctx: &CoreContext,
cs: HgBlobChangeset,
parent_manifests: Vec<HgManifestId>,
repo: &BlobRepo,
blobstore: &RepoBlobstore,
bonsai_parents: Vec<ChangesetId>,
) -> Result<SortedVectorMap<MPath, FileChange>, Error> {
let diff: Result<_, Error> =
find_bonsai_diff(ctx, repo, cs, parent_manifests.iter().cloned().collect())
.context("While finding bonsai diff")?
.map_ok(|diff| {
cloned!(parent_manifests, bonsai_parents);
async move {
match diff {
BonsaiDiffFileChange::Changed(path, ty, entry_id) => {
let file_node_id = HgFileNodeId::new(entry_id.into_nodehash());
let envelope = file_node_id
.load(ctx, repo.blobstore())
.await
.context("While fetching bonsai file changes")?;
let size = envelope.content_size();
let content_id = envelope.content_id();
let diff: Result<_, Error> = find_bonsai_diff(
ctx,
blobstore.clone(),
cs,
parent_manifests.iter().cloned().collect(),
)
.context("While finding bonsai diff")?
.map_ok(|diff| {
cloned!(parent_manifests, bonsai_parents);
async move {
match diff {
BonsaiDiffFileChange::Changed(path, ty, entry_id) => {
let file_node_id = HgFileNodeId::new(entry_id.into_nodehash());
let envelope = file_node_id
.load(ctx, blobstore)
.await
.context("While fetching bonsai file changes")?;
let size = envelope.content_size();
let content_id = envelope.content_id();
let copyinfo = get_copy_info(
ctx.clone(),
repo.clone(),
bonsai_parents,
path.clone(),
envelope,
parent_manifests,
)
.await
.context("While fetching copy information")?;
Ok((
path,
FileChange::tracked(content_id, ty, size as u64, copyinfo),
))
}
BonsaiDiffFileChange::ChangedReusedId(path, ty, entry_id) => {
let file_node_id = HgFileNodeId::new(entry_id.into_nodehash());
let envelope = file_node_id
.load(ctx, repo.blobstore())
.await
.context("While fetching bonsai file changes")?;
let size = envelope.content_size();
let content_id = envelope.content_id();
// Reused ID means copy info is *not* stored.
Ok((path, FileChange::tracked(content_id, ty, size as u64, None)))
}
BonsaiDiffFileChange::Deleted(path) => Ok((path, FileChange::Deletion)),
}
let copyinfo = get_copy_info(
ctx.clone(),
blobstore.clone(),
bonsai_parents,
path.clone(),
envelope,
parent_manifests,
)
.await
.context("While fetching copy information")?;
Ok((
path,
FileChange::tracked(content_id, ty, size as u64, copyinfo),
))
}
})
.try_buffer_unordered(100) // TODO(stash): magic number?
.try_collect::<std::collections::BTreeMap<_, _>>()
.await;
BonsaiDiffFileChange::ChangedReusedId(path, ty, entry_id) => {
let file_node_id = HgFileNodeId::new(entry_id.into_nodehash());
let envelope = file_node_id
.load(ctx, blobstore)
.await
.context("While fetching bonsai file changes")?;
let size = envelope.content_size();
let content_id = envelope.content_id();
// Reused ID means copy info is *not* stored.
Ok((path, FileChange::tracked(content_id, ty, size as u64, None)))
}
BonsaiDiffFileChange::Deleted(path) => Ok((path, FileChange::Deletion)),
}
}
})
.try_buffer_unordered(100) // TODO(stash): magic number?
.try_collect::<std::collections::BTreeMap<_, _>>()
.await;
Ok(SortedVectorMap::from_iter(diff?))
}
@ -182,7 +185,7 @@ async fn find_file_changes(
// we need to find a parent from which this filenode was copied.
async fn get_copy_info(
ctx: CoreContext,
repo: BlobRepo,
blobstore: RepoBlobstore,
bonsai_parents: Vec<ChangesetId>,
copy_from_path: MPath,
envelope: HgFileEnvelope,
@ -203,10 +206,10 @@ async fn get_copy_info(
let get_bonsai_cs_copied_from =
|(bonsai_parent, parent_mf): (ChangesetId, HgManifestId)| {
cloned!(ctx, repo);
cloned!(ctx, blobstore);
async move {
let entry = parent_mf
.find_entry(ctx, repo.get_blobstore(), Some(repopath.clone()))
.find_entry(ctx, blobstore, Some(repopath.clone()))
.await
.ok()?;
if entry?.into_leaf()?.1 == copyfromnode {

View File

@ -43,6 +43,7 @@ use mononoke_types::BlobstoreValue;
use mononoke_types::BonsaiChangeset;
use mononoke_types::ChangesetId;
use mononoke_types::MPath;
use repo_blobstore::RepoBlobstoreRef;
use scuba_ext::MononokeScubaSampleBuilder;
use stats::prelude::*;
use std::sync::Arc;
@ -109,7 +110,7 @@ pub fn create_bonsai_changeset_hook(origin_repo: Option<BlobRepo>) -> Arc<Bonsai
hg_cs.clone(),
parent_manifest_hashes,
bonsai_parents,
&repo,
repo.repo_blobstore(),
)
.await?;
verify_bonsai_changeset_with_origin(ctx, bonsai_cs, hg_cs, origin_repo).await
@ -194,7 +195,7 @@ impl CreateChangeset {
hg_cs,
parent_manifest_hashes,
bonsai_parents,
&repo,
repo.repo_blobstore(),
)
.await
}

View File

@ -26,22 +26,23 @@ pub mod file_history {
use anyhow::Error;
use async_trait::async_trait;
use blobrepo::BlobRepo;
use blobrepo_errors::ErrorKind;
use bonsai_hg_mapping::BonsaiHgMapping;
use bonsai_hg_mapping::BonsaiHgMappingRef;
use bonsai_hg_mapping::BonsaiOrHgChangesetIds;
use bookmarks::Bookmark;
use bookmarks::BookmarkKind;
use bookmarks::BookmarkName;
use bookmarks::BookmarkPagination;
use bookmarks::BookmarkPrefix;
use bookmarks::BookmarksRef;
use bookmarks::Freshness;
use changesets::ChangesetsRef;
use cloned::cloned;
use context::CoreContext;
use filenodes::ArcFilenodes;
use filenodes::FilenodeInfo;
use filenodes::FilenodeRangeResult;
use filenodes::FilenodeResult;
use filenodes::FilenodesRef;
use futures::future;
use futures::stream;
use futures::stream::BoxStream;
@ -50,89 +51,127 @@ use futures::StreamExt;
use futures::TryFutureExt;
use futures::TryStreamExt;
use mercurial_derived_data::DeriveHgChangeset;
use mercurial_mutation::ArcHgMutationStore;
use mercurial_types::HgChangesetId;
use mercurial_types::HgFileNodeId;
use mononoke_types::ChangesetId;
use mononoke_types::RepoPath;
use repo_derived_data::RepoDerivedDataRef;
use stats::prelude::*;
use std::collections::HashMap;
use std::collections::HashSet;
/// `BlobRepoHg` is an extension trait for `BlobRepo` which contains
/// `BlobRepoHg` is an extension trait for repo facet containers which contains
/// mercurial specific methods.
#[async_trait]
pub trait BlobRepoHg {
fn get_filenodes(&self) -> &ArcFilenodes;
fn hg_mutation_store(&self) -> &ArcHgMutationStore;
pub trait BlobRepoHg: Send + Sync {
async fn get_hg_bonsai_mapping<'a>(
&'a self,
ctx: CoreContext,
bonsai_or_hg_cs_ids: impl Into<BonsaiOrHgChangesetIds> + 'a + Send,
) -> Result<Vec<(HgChangesetId, ChangesetId)>, Error>;
) -> Result<Vec<(HgChangesetId, ChangesetId)>, Error>
where
Self: ChangesetsRef + RepoDerivedDataRef + BonsaiHgMappingRef;
fn get_heads_maybe_stale(
&self,
ctx: CoreContext,
) -> BoxStream<'static, Result<HgChangesetId, Error>>;
) -> BoxStream<'_, Result<HgChangesetId, Error>>
where
Self: BookmarksRef + RepoDerivedDataRef + Send + Sync;
async fn changeset_exists(
&self,
ctx: CoreContext,
changesetid: HgChangesetId,
) -> Result<bool, Error>;
) -> Result<bool, Error>
where
Self: BonsaiHgMappingRef + ChangesetsRef;
async fn get_changeset_parents(
&self,
ctx: CoreContext,
changesetid: HgChangesetId,
) -> Result<Vec<HgChangesetId>, Error>;
) -> Result<Vec<HgChangesetId>, Error>
where
Self: BonsaiHgMappingRef + ChangesetsRef + RepoDerivedDataRef;
async fn get_bookmark(
&self,
ctx: CoreContext,
name: &BookmarkName,
) -> Result<Option<HgChangesetId>, Error>;
) -> Result<Option<HgChangesetId>, Error>
where
Self: BookmarksRef + RepoDerivedDataRef;
fn get_pull_default_bookmarks_maybe_stale(
&self,
ctx: CoreContext,
) -> BoxStream<'static, Result<(Bookmark, HgChangesetId), Error>>;
) -> BoxStream<'_, Result<(Bookmark, HgChangesetId), Error>>
where
Self: ChangesetsRef
+ BonsaiHgMappingRef
+ RepoDerivedDataRef
+ BlobRepoHg
+ BookmarksRef
+ Clone
+ Send
+ Sync;
fn get_publishing_bookmarks_maybe_stale(
&self,
ctx: CoreContext,
) -> BoxStream<'static, Result<(Bookmark, HgChangesetId), Error>>;
) -> BoxStream<'_, Result<(Bookmark, HgChangesetId), Error>>
where
Self: ChangesetsRef
+ BonsaiHgMappingRef
+ RepoDerivedDataRef
+ BookmarksRef
+ Clone
+ Send
+ Sync;
fn get_bookmarks_by_prefix_maybe_stale(
&self,
ctx: CoreContext,
prefix: &BookmarkPrefix,
max: u64,
) -> BoxStream<'static, Result<(Bookmark, HgChangesetId), Error>>;
) -> BoxStream<'_, Result<(Bookmark, HgChangesetId), Error>>
where
Self: ChangesetsRef
+ BonsaiHgMappingRef
+ RepoDerivedDataRef
+ BlobRepoHg
+ BookmarksRef
+ Clone
+ Send
+ Sync;
async fn get_filenode_opt(
&self,
ctx: CoreContext,
path: &RepoPath,
node: HgFileNodeId,
) -> Result<FilenodeResult<Option<FilenodeInfo>>, Error>;
) -> Result<FilenodeResult<Option<FilenodeInfo>>, Error>
where
Self: FilenodesRef;
async fn get_filenode(
&self,
ctx: CoreContext,
path: &RepoPath,
node: HgFileNodeId,
) -> Result<FilenodeResult<FilenodeInfo>, Error>;
) -> Result<FilenodeResult<FilenodeInfo>, Error>
where
Self: FilenodesRef;
async fn get_all_filenodes_maybe_stale(
&self,
ctx: CoreContext,
path: RepoPath,
limit: Option<u64>,
) -> Result<FilenodeRangeResult<Vec<FilenodeInfo>>, Error>;
) -> Result<FilenodeRangeResult<Vec<FilenodeInfo>>, Error>
where
Self: FilenodesRef;
}
define_stats! {
@ -149,15 +188,7 @@ define_stats! {
}
#[async_trait]
impl BlobRepoHg for BlobRepo {
fn get_filenodes(&self) -> &ArcFilenodes {
self.filenodes()
}
fn hg_mutation_store(&self) -> &ArcHgMutationStore {
self.hg_mutation_store()
}
impl<T: ChangesetsRef + BonsaiHgMappingRef + Send + Sync> BlobRepoHg for T {
// Returns only the mapping for valid changests that are known to the server.
// For Bonsai -> Hg conversion, missing Hg changesets will be derived (so all Bonsais will be
// in the output).
@ -167,8 +198,12 @@ impl BlobRepoHg for BlobRepo {
&'a self,
ctx: CoreContext,
bonsai_or_hg_cs_ids: impl Into<BonsaiOrHgChangesetIds> + 'a + Send,
) -> Result<Vec<(HgChangesetId, ChangesetId)>, Error> {
) -> Result<Vec<(HgChangesetId, ChangesetId)>, Error>
where
Self: ChangesetsRef + RepoDerivedDataRef + BonsaiHgMappingRef,
{
STATS::get_hg_bonsai_mapping.add_value(1);
let bonsai_or_hg_cs_ids = bonsai_or_hg_cs_ids.into();
let hg_bonsai_list = self
.bonsai_hg_mapping()
@ -201,7 +236,7 @@ impl BlobRepoHg for BlobRepo {
}
let existing: HashSet<_> = self
.get_changesets_object()
.changesets()
.get_many(ctx.clone(), notfound.clone())
.await?
.into_iter()
@ -231,16 +266,25 @@ impl BlobRepoHg for BlobRepo {
}
/// Get Mercurial heads, which we approximate as publishing Bonsai Bookmarks.
fn get_heads_maybe_stale(
&self,
ctx: CoreContext,
) -> BoxStream<'static, Result<HgChangesetId, Error>> {
fn get_heads_maybe_stale(&self, ctx: CoreContext) -> BoxStream<'_, Result<HgChangesetId, Error>>
where
Self: BookmarksRef + RepoDerivedDataRef + Send + Sync,
{
STATS::get_heads_maybe_stale.add_value(1);
self.get_bonsai_heads_maybe_stale(ctx.clone())
self.bookmarks()
.list(
ctx.clone(),
Freshness::MaybeStale,
&BookmarkPrefix::empty(),
BookmarkKind::ALL_PUBLISHING,
&BookmarkPagination::FromStart,
std::u64::MAX,
)
.map_ok({
let repo = self.clone();
move |cs| {
cloned!(ctx, repo);
move |(_, cs)| {
cloned!(ctx);
async move { repo.derive_hg_changeset(&ctx, cs).await }
}
})
@ -252,7 +296,10 @@ impl BlobRepoHg for BlobRepo {
&self,
ctx: CoreContext,
changesetid: HgChangesetId,
) -> Result<bool, Error> {
) -> Result<bool, Error>
where
Self: BonsaiHgMappingRef + ChangesetsRef,
{
STATS::changeset_exists.add_value(1);
let csid = self
.bonsai_hg_mapping()
@ -260,7 +307,7 @@ impl BlobRepoHg for BlobRepo {
.await?;
match csid {
Some(bonsai) => {
let res = self.get_changesets_object().get(ctx, bonsai).await?;
let res = self.changesets().get(ctx, bonsai).await?;
Ok(res.is_some())
}
None => Ok(false),
@ -271,7 +318,10 @@ impl BlobRepoHg for BlobRepo {
&self,
ctx: CoreContext,
changesetid: HgChangesetId,
) -> Result<Vec<HgChangesetId>, Error> {
) -> Result<Vec<HgChangesetId>, Error>
where
Self: BonsaiHgMappingRef + ChangesetsRef + RepoDerivedDataRef,
{
STATS::get_changeset_parents.add_value(1);
let csid = self
@ -281,7 +331,7 @@ impl BlobRepoHg for BlobRepo {
.ok_or(ErrorKind::BonsaiMappingNotFound(changesetid))?;
let parents = self
.get_changesets_object()
.changesets()
.get(ctx.clone(), csid)
.await?
.ok_or(ErrorKind::BonsaiNotFound(csid))?
@ -296,7 +346,10 @@ impl BlobRepoHg for BlobRepo {
&self,
ctx: CoreContext,
name: &BookmarkName,
) -> Result<Option<HgChangesetId>, Error> {
) -> Result<Option<HgChangesetId>, Error>
where
Self: BookmarksRef + RepoDerivedDataRef,
{
STATS::get_bookmark.add_value(1);
let cs_opt = self.bookmarks().get(ctx.clone(), name).await?;
match cs_opt {
@ -313,7 +366,17 @@ impl BlobRepoHg for BlobRepo {
fn get_pull_default_bookmarks_maybe_stale(
&self,
ctx: CoreContext,
) -> BoxStream<'static, Result<(Bookmark, HgChangesetId), Error>> {
) -> BoxStream<'_, Result<(Bookmark, HgChangesetId), Error>>
where
Self: ChangesetsRef
+ BonsaiHgMappingRef
+ RepoDerivedDataRef
+ BlobRepoHg
+ BookmarksRef
+ Clone
+ Send
+ Sync,
{
STATS::get_pull_default_bookmarks_maybe_stale.add_value(1);
let stream = self.bookmarks().list(
ctx.clone(),
@ -323,7 +386,7 @@ impl BlobRepoHg for BlobRepo {
&BookmarkPagination::FromStart,
std::u64::MAX,
);
to_hg_bookmark_stream(&self, &ctx, stream)
to_hg_bookmark_stream(self, &ctx, stream)
}
/// Get Publishing (Publishing is a Mercurial concept) bookmarks by prefix, they will be read
@ -331,7 +394,17 @@ impl BlobRepoHg for BlobRepo {
fn get_publishing_bookmarks_maybe_stale(
&self,
ctx: CoreContext,
) -> BoxStream<'static, Result<(Bookmark, HgChangesetId), Error>> {
) -> BoxStream<'_, Result<(Bookmark, HgChangesetId), Error>>
where
Self: ChangesetsRef
+ BonsaiHgMappingRef
+ RepoDerivedDataRef
+ BlobRepoHg
+ BookmarksRef
+ Clone
+ Send
+ Sync,
{
STATS::get_publishing_bookmarks_maybe_stale.add_value(1);
let stream = self.bookmarks().list(
ctx.clone(),
@ -341,7 +414,7 @@ impl BlobRepoHg for BlobRepo {
&BookmarkPagination::FromStart,
std::u64::MAX,
);
to_hg_bookmark_stream(&self, &ctx, stream)
to_hg_bookmark_stream(self, &ctx, stream)
}
/// Get bookmarks by prefix, they will be read from replica, so they might be stale.
@ -350,7 +423,17 @@ impl BlobRepoHg for BlobRepo {
ctx: CoreContext,
prefix: &BookmarkPrefix,
max: u64,
) -> BoxStream<'static, Result<(Bookmark, HgChangesetId), Error>> {
) -> BoxStream<'_, Result<(Bookmark, HgChangesetId), Error>>
where
Self: ChangesetsRef
+ BonsaiHgMappingRef
+ RepoDerivedDataRef
+ BlobRepoHg
+ BookmarksRef
+ Clone
+ Send
+ Sync,
{
STATS::get_bookmarks_by_prefix_maybe_stale.add_value(1);
let stream = self.bookmarks().list(
ctx.clone(),
@ -360,7 +443,7 @@ impl BlobRepoHg for BlobRepo {
&BookmarkPagination::FromStart,
max,
);
to_hg_bookmark_stream(&self, &ctx, stream)
to_hg_bookmark_stream(self, &ctx, stream)
}
async fn get_filenode_opt(
@ -368,8 +451,11 @@ impl BlobRepoHg for BlobRepo {
ctx: CoreContext,
path: &RepoPath,
node: HgFileNodeId,
) -> Result<FilenodeResult<Option<FilenodeInfo>>, Error> {
self.get_filenodes().get_filenode(&ctx, path, node).await
) -> Result<FilenodeResult<Option<FilenodeInfo>>, Error>
where
Self: FilenodesRef,
{
self.filenodes().get_filenode(&ctx, path, node).await
}
async fn get_filenode(
@ -377,7 +463,10 @@ impl BlobRepoHg for BlobRepo {
ctx: CoreContext,
path: &RepoPath,
node: HgFileNodeId,
) -> Result<FilenodeResult<FilenodeInfo>, Error> {
) -> Result<FilenodeResult<FilenodeInfo>, Error>
where
Self: FilenodesRef,
{
match self.get_filenode_opt(ctx, path, node).await? {
FilenodeResult::Present(maybe_filenode) => {
let filenode = maybe_filenode
@ -393,19 +482,31 @@ impl BlobRepoHg for BlobRepo {
ctx: CoreContext,
path: RepoPath,
limit: Option<u64>,
) -> Result<FilenodeRangeResult<Vec<FilenodeInfo>>, Error> {
) -> Result<FilenodeRangeResult<Vec<FilenodeInfo>>, Error>
where
Self: FilenodesRef,
{
STATS::get_all_filenodes.add_value(1);
self.get_filenodes()
self.filenodes()
.get_all_filenodes_maybe_stale(&ctx, &path, limit)
.await
}
}
pub fn to_hg_bookmark_stream<BookmarkType>(
repo: &BlobRepo,
pub fn to_hg_bookmark_stream<'repo, BookmarkType>(
repo: &(
impl ChangesetsRef
+ BonsaiHgMappingRef
+ RepoDerivedDataRef
+ BlobRepoHg
+ Clone
+ Send
+ Sync
+ 'repo
),
ctx: &CoreContext,
stream: impl Stream<Item = Result<(BookmarkType, ChangesetId), Error>> + Send + 'static,
) -> BoxStream<'static, Result<(BookmarkType, HgChangesetId), Error>>
stream: impl Stream<Item = Result<(BookmarkType, ChangesetId), Error>> + Send + 'repo,
) -> BoxStream<'repo, Result<(BookmarkType, HgChangesetId), Error>>
where
BookmarkType: Send,
{

View File

@ -36,6 +36,7 @@ pub use blobrepo_common::changed_files::compute_changed_files;
use blobstore::Blobstore;
use blobstore::ErrorKind as BlobstoreError;
use blobstore::Loadable;
use bonsai_hg_mapping::BonsaiHgMappingRef;
use context::CoreContext;
use mercurial_types::blobs::fetch_manifest_envelope;
use mercurial_types::blobs::ChangesetMetadata;
@ -54,9 +55,9 @@ use mononoke_types;
use mononoke_types::BlobstoreKey;
use mononoke_types::BonsaiChangeset;
use mononoke_types::ChangesetId;
use repo_blobstore::RepoBlobstoreRef;
use crate::errors::*;
use crate::BlobRepo;
use repo_blobstore::RepoBlobstore;
define_stats! {
@ -104,7 +105,11 @@ impl ChangesetHandle {
}
}
pub fn ready_cs_handle(ctx: CoreContext, repo: BlobRepo, hg_cs: HgChangesetId) -> Self {
pub fn ready_cs_handle(
ctx: CoreContext,
repo: impl RepoBlobstoreRef + BonsaiHgMappingRef + Clone + Send + Sync + 'static,
hg_cs: HgChangesetId,
) -> Self {
let (trigger, can_be_parent) = oneshot::channel();
let can_be_parent = can_be_parent
.map_err(|e| format_err!("can_be_parent: {:?}", e))
@ -119,7 +124,7 @@ impl ChangesetHandle {
.get_bonsai_from_hg(&ctx, hg_cs)
.await?
.ok_or(ErrorKind::BonsaiMappingNotFound(hg_cs))?;
let bonsai_cs = csid.load(&ctx, repo.blobstore()).await?;
let bonsai_cs = csid.load(&ctx, repo.repo_blobstore()).await?;
Ok::<_, Error>(bonsai_cs)
}
};
@ -127,7 +132,7 @@ impl ChangesetHandle {
let completion_future = async move {
let (bonsai_cs, hg_cs) = future::try_join(
bonsai_cs,
hg_cs.load(&ctx, repo.blobstore()).map_err(Error::from),
hg_cs.load(&ctx, repo.repo_blobstore()).map_err(Error::from),
)
.await?;
let _ = trigger.send((

View File

@ -38,7 +38,7 @@ mod test {
let verify = BonsaiMFVerify {
ctx: ctx.clone(),
logger: ctx.logger().clone(),
repo,
repo: repo.clone(),
follow_limit: 1024,
ignores: HashSet::new(),
broken_merges_before: None,

View File

@ -10,7 +10,6 @@ license = "GPLv2+"
[dependencies]
anyhow = "1.0.56"
blobrepo = { version = "0.1.0", path = "../blobrepo" }
blobrepo_hg = { version = "0.1.0", path = "../blobrepo/blobrepo_hg" }
blobstore = { version = "0.1.0", path = "../blobstore" }
changesets = { version = "0.1.0", path = "../changesets" }
context = { version = "0.1.0", path = "../server/context" }

View File

@ -7,7 +7,6 @@
use anyhow::Error;
use blobrepo::BlobRepo;
use blobrepo_hg::BlobRepoHg;
use blobstore::Blobstore;
use changesets::ChangesetEntry;
use context::CoreContext;
@ -173,7 +172,7 @@ pub async fn prime_cache(
let filenodes = snapshot.filenodes.ok_or(Error::msg("filenodes missing"))?;
let filenodes = reheat_filenodes(filenodes)?;
repo.get_filenodes().prime_cache(ctx, filenodes.as_ref());
repo.filenodes().prime_cache(ctx, filenodes.as_ref());
info!(
ctx.logger(),
"primed filenodes cache with {} entries",

View File

@ -119,7 +119,7 @@ where
pub fn update_publishing_bookmarks_after_push(
&self,
ctx: CoreContext,
) -> impl Future<Item = (), Error = Error> {
) -> impl Future<Item = (), Error = Error> + '_ {
let cache = self.cached_publishing_bookmarks_maybe_stale.clone();
// We just updated the bookmark, so go and fetch them from db to return
// the newer version
@ -134,7 +134,7 @@ where
ctx: &CoreContext,
prefix: &BookmarkPrefix,
return_max: u64,
) -> Result<impl Stream<Item = Result<(BookmarkName, HgChangesetId), Error>>, Error> {
) -> Result<impl Stream<Item = Result<(BookmarkName, HgChangesetId), Error>> + '_, Error> {
let mut kinds = vec![BookmarkKind::Scratch];
let mut result = HashMap::new();
@ -257,7 +257,7 @@ where
fn get_publishing_maybe_stale_from_db(
&self,
ctx: CoreContext,
) -> impl Future<Item = HashMap<Bookmark, HgChangesetId>, Error = Error> {
) -> impl Future<Item = HashMap<Bookmark, HgChangesetId>, Error = Error> + '_ {
self.repo
.blobrepo()
.get_publishing_bookmarks_maybe_stale(ctx)