bonsai_hg_mapping: store repo id and make interface consistent

Summary:
The various mappings are inconsistent in how they work.  Some, like the
bonsai-hg mapping, require the client to pass the repo_id on every request (and
behaviour is undefined if the client gets this wrong).  Start to make them
consistent and clean up this gap, starting with the bonsai-hg mapping.

Reviewed By: mitrandir77

Differential Revision: D34059165

fbshipit-source-id: 12c62e0e5f08685929218088bad11a55305795df
This commit is contained in:
Mark Juggurnauth-Thomas 2022-02-14 00:47:22 -08:00 committed by Facebook GitHub Bot
parent f5cb5cc8c6
commit c70975fb6b
26 changed files with 288 additions and 307 deletions

View File

@ -189,10 +189,10 @@ impl BenchmarkRepoFactory {
Ok(sql_phases_builder.build(repo_identity.id(), changeset_fetcher.clone(), heads_fetcher))
}
pub fn bonsai_hg_mapping(&self) -> Result<ArcBonsaiHgMapping> {
pub fn bonsai_hg_mapping(&self, repo_identity: &ArcRepoIdentity) -> Result<ArcBonsaiHgMapping> {
let mapping: Arc<dyn BonsaiHgMapping> = Arc::new(DelayedBonsaiHgMapping::new(
SqlBonsaiHgMappingBuilder::with_sqlite_in_memory()?
.build(RendezVousOptions::for_test()),
.build(repo_identity.id(), RendezVousOptions::for_test()),
self.delay_settings.db_get_dist,
self.delay_settings.db_put_dist,
));
@ -468,6 +468,10 @@ impl<M> DelayedBonsaiHgMapping<M> {
#[async_trait]
impl<M: BonsaiHgMapping> BonsaiHgMapping for DelayedBonsaiHgMapping<M> {
fn repo_id(&self) -> RepositoryId {
self.inner.repo_id()
}
async fn add(&self, ctx: &CoreContext, entry: BonsaiHgMappingEntry) -> Result<bool, Error> {
delay(self.put_dist).await;
self.inner.add(ctx, entry).await
@ -476,24 +480,20 @@ impl<M: BonsaiHgMapping> BonsaiHgMapping for DelayedBonsaiHgMapping<M> {
async fn get(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
cs_id: BonsaiOrHgChangesetIds,
) -> Result<Vec<BonsaiHgMappingEntry>, Error> {
delay(self.get_dist).await;
self.inner.get(ctx, repo_id, cs_id).await
self.inner.get(ctx, cs_id).await
}
async fn get_hg_in_range(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
low: HgChangesetId,
high: HgChangesetId,
limit: usize,
) -> Result<Vec<HgChangesetId>, Error> {
delay(self.get_dist).await;
self.inner
.get_hg_in_range(ctx, repo_id, low, high, limit)
.await
self.inner.get_hg_in_range(ctx, low, high, limit).await
}
}

View File

@ -299,7 +299,6 @@ impl CreateChangeset {
}
});
let repoid = repo.get_repoid();
let complete_changesets = repo.get_changesets_object();
let bonsai_hg_mapping = repo.get_bonsai_hg_mapping().clone();
let _repo = repo.clone();
@ -319,7 +318,6 @@ impl CreateChangeset {
// update bonsai mapping
let bcs_id = bonsai_cs.get_changeset_id();
let bonsai_hg_entry = BonsaiHgMappingEntry {
repo_id: repoid.clone(),
hg_cs_id: hg_cs.get_changeset_id(),
bcs_id,
};

View File

@ -170,7 +170,7 @@ impl BlobRepoHg for BlobRepo {
) -> Result<Option<ChangesetId>, Error> {
STATS::get_bonsai_from_hg.add_value(1);
self.get_bonsai_hg_mapping()
.get_bonsai_from_hg(&ctx, self.get_repoid(), hg_cs_id)
.get_bonsai_from_hg(&ctx, hg_cs_id)
.await
}
@ -188,7 +188,7 @@ impl BlobRepoHg for BlobRepo {
let bonsai_or_hg_cs_ids = bonsai_or_hg_cs_ids.into();
let hg_bonsai_list = self
.get_bonsai_hg_mapping()
.get(&ctx, self.get_repoid(), bonsai_or_hg_cs_ids.clone())
.get(&ctx, bonsai_or_hg_cs_ids.clone())
.await?
.into_iter()
.map(|entry| (entry.hg_cs_id, entry.bcs_id))

View File

@ -68,7 +68,7 @@ async fn get_filenode_generation(
let linknode = filenode_info.linknode;
let bcs_id = match repo
.bonsai_hg_mapping()
.get_bonsai_from_hg(ctx, repo.get_repoid(), linknode)
.get_bonsai_from_hg(ctx, linknode)
.await?
{
Some(a) => a,

View File

@ -15,7 +15,7 @@ const i32 MC_SITEVER = 3;
typedef i32 RepoId (rust.newtype)
struct BonsaiHgMappingEntry {
struct BonsaiHgMappingCacheEntry {
1: required RepoId repo_id;
2: required mononoke_types_thrift.ChangesetId bcs_id;
3: required mercurial_thrift.HgNodeHash hg_cs_id;

View File

@ -6,7 +6,8 @@
*/
use super::{BonsaiHgMapping, BonsaiHgMappingEntry, BonsaiOrHgChangesetIds};
use anyhow::Error;
use abomonation_derive::Abomonation;
use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use bonsai_hg_mapping_entry_thrift as thrift;
use bytes::Bytes;
@ -19,7 +20,7 @@ use context::CoreContext;
use fbinit::FacebookInit;
use fbthrift::compact_protocol;
use memcache::{KeyGen, MemcacheClient};
use mercurial_types::HgChangesetId;
use mercurial_types::{HgChangesetId, HgNodeHash};
use mononoke_types::{ChangesetId, RepositoryId};
use stats::prelude::*;
use std::collections::{HashMap, HashSet};
@ -34,6 +35,67 @@ define_stats! {
memcache_deserialize_err: timeseries("memcache.deserialize_err"; Rate, Sum),
}
#[derive(Abomonation, Clone, Debug, Eq, Hash, PartialEq)]
pub struct BonsaiHgMappingCacheEntry {
pub repo_id: RepositoryId,
pub hg_cs_id: HgChangesetId,
pub bcs_id: ChangesetId,
}
impl BonsaiHgMappingCacheEntry {
fn from_thrift(
entry: bonsai_hg_mapping_entry_thrift::BonsaiHgMappingCacheEntry,
) -> Result<Self> {
Ok(Self {
repo_id: RepositoryId::new(entry.repo_id.0),
hg_cs_id: HgChangesetId::new(HgNodeHash::from_thrift(entry.hg_cs_id)?),
bcs_id: ChangesetId::from_thrift(entry.bcs_id)?,
})
}
fn into_thrift(self) -> bonsai_hg_mapping_entry_thrift::BonsaiHgMappingCacheEntry {
bonsai_hg_mapping_entry_thrift::BonsaiHgMappingCacheEntry {
repo_id: bonsai_hg_mapping_entry_thrift::RepoId(self.repo_id.id()),
hg_cs_id: self.hg_cs_id.into_nodehash().into_thrift(),
bcs_id: self.bcs_id.into_thrift(),
}
}
pub fn new(repo_id: RepositoryId, hg_cs_id: HgChangesetId, bcs_id: ChangesetId) -> Self {
BonsaiHgMappingCacheEntry {
repo_id,
hg_cs_id,
bcs_id,
}
}
pub fn into_entry(self, repo_id: RepositoryId) -> Result<BonsaiHgMappingEntry> {
if self.repo_id == repo_id {
Ok(BonsaiHgMappingEntry {
hg_cs_id: self.hg_cs_id,
bcs_id: self.bcs_id,
})
} else {
Err(anyhow!(
"Cache returned invalid entry: repo {} returned for query to repo {}",
self.repo_id,
repo_id
))
}
}
pub fn from_entry(
entry: BonsaiHgMappingEntry,
repo_id: RepositoryId,
) -> BonsaiHgMappingCacheEntry {
BonsaiHgMappingCacheEntry {
repo_id,
hg_cs_id: entry.hg_cs_id,
bcs_id: entry.bcs_id,
}
}
}
/// Used for cache key generation
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
enum BonsaiOrHgChangesetId {
@ -56,7 +118,7 @@ impl From<HgChangesetId> for BonsaiOrHgChangesetId {
#[derive(Clone)]
pub struct CachingBonsaiHgMapping {
mapping: Arc<dyn BonsaiHgMapping>,
cache_pool: CachelibHandler<BonsaiHgMappingEntry>,
cache_pool: CachelibHandler<BonsaiHgMappingCacheEntry>,
memcache: MemcacheHandler,
keygen: KeyGen,
}
@ -99,12 +161,12 @@ impl CachingBonsaiHgMapping {
}
}
fn memcache_deserialize(bytes: Bytes) -> Result<BonsaiHgMappingEntry, ()> {
fn memcache_deserialize(bytes: Bytes) -> Result<BonsaiHgMappingCacheEntry, ()> {
let thrift_entry = compact_protocol::deserialize(bytes).map_err(|_| ());
thrift_entry.and_then(|entry| BonsaiHgMappingEntry::from_thrift(entry).map_err(|_| ()))
thrift_entry.and_then(|entry| BonsaiHgMappingCacheEntry::from_thrift(entry).map_err(|_| ()))
}
fn memcache_serialize(entry: &BonsaiHgMappingEntry) -> Bytes {
fn memcache_serialize(entry: &BonsaiHgMappingCacheEntry) -> Bytes {
compact_protocol::serialize(&entry.clone().into_thrift())
}
@ -113,6 +175,10 @@ const PARALLEL_CHUNKS: usize = 1;
#[async_trait]
impl BonsaiHgMapping for CachingBonsaiHgMapping {
fn repo_id(&self) -> RepositoryId {
self.mapping.repo_id()
}
async fn add(&self, ctx: &CoreContext, entry: BonsaiHgMappingEntry) -> Result<bool, Error> {
self.mapping.add(ctx, entry).await
}
@ -120,48 +186,47 @@ impl BonsaiHgMapping for CachingBonsaiHgMapping {
async fn get(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
cs: BonsaiOrHgChangesetIds,
) -> Result<Vec<BonsaiHgMappingEntry>, Error> {
let ctx = (ctx, repo_id, self);
let cache_request = (ctx, self);
let repo_id = self.repo_id();
let res = match cs {
let cache_entry = match cs {
BonsaiOrHgChangesetIds::Bonsai(cs_ids) => get_or_fill_chunked(
ctx,
cache_request,
cs_ids.into_iter().collect(),
CHUNK_SIZE,
PARALLEL_CHUNKS,
)
.await?
.into_iter()
.map(|(_, val)| val)
.collect(),
.map(|(_, val)| val.into_entry(repo_id))
.collect::<Result<_>>()?,
BonsaiOrHgChangesetIds::Hg(hg_ids) => get_or_fill_chunked(
ctx,
cache_request,
hg_ids.into_iter().collect(),
CHUNK_SIZE,
PARALLEL_CHUNKS,
)
.await?
.into_iter()
.map(|(_, val)| val)
.collect(),
.map(|(_, val)| val.into_entry(repo_id))
.collect::<Result<_>>()?,
};
Ok(res)
Ok(cache_entry)
}
/// Use caching for the ranges of one element, use slower path otherwise.
async fn get_hg_in_range(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
low: HgChangesetId,
high: HgChangesetId,
limit: usize,
) -> Result<Vec<HgChangesetId>, Error> {
if low == high {
let res = self.get(ctx, repo_id, low.into()).await?;
let res = self.get(ctx, low.into()).await?;
if res.is_empty() {
return Ok(vec![]);
} else {
@ -169,9 +234,7 @@ impl BonsaiHgMapping for CachingBonsaiHgMapping {
}
}
self.mapping
.get_hg_in_range(ctx, repo_id, low, high, limit)
.await
self.mapping.get_hg_in_range(ctx, low, high, limit).await
}
}
@ -179,7 +242,7 @@ fn get_cache_key(repo_id: RepositoryId, cs: &BonsaiOrHgChangesetId) -> String {
format!("{}.{:?}", repo_id.prefix(), cs).to_string()
}
impl MemcacheEntity for BonsaiHgMappingEntry {
impl MemcacheEntity for BonsaiHgMappingCacheEntry {
fn serialize(&self) -> Bytes {
memcache_serialize(self)
}
@ -189,25 +252,25 @@ impl MemcacheEntity for BonsaiHgMappingEntry {
}
}
type CacheRequest<'a> = (&'a CoreContext, RepositoryId, &'a CachingBonsaiHgMapping);
type CacheRequest<'a> = (&'a CoreContext, &'a CachingBonsaiHgMapping);
impl EntityStore<BonsaiHgMappingEntry> for CacheRequest<'_> {
fn cachelib(&self) -> &CachelibHandler<BonsaiHgMappingEntry> {
let (_, _, mapping) = self;
impl EntityStore<BonsaiHgMappingCacheEntry> for CacheRequest<'_> {
fn cachelib(&self) -> &CachelibHandler<BonsaiHgMappingCacheEntry> {
let (_, mapping) = self;
&mapping.cache_pool
}
fn keygen(&self) -> &KeyGen {
let (_, _, mapping) = self;
let (_, mapping) = self;
&mapping.keygen
}
fn memcache(&self) -> &MemcacheHandler {
let (_, _, mapping) = self;
let (_, mapping) = self;
&mapping.memcache
}
fn cache_determinator(&self, _: &BonsaiHgMappingEntry) -> CacheDisposition {
fn cache_determinator(&self, _: &BonsaiHgMappingCacheEntry) -> CacheDisposition {
CacheDisposition::Cache(CacheTtl::NoTtl)
}
@ -215,53 +278,63 @@ impl EntityStore<BonsaiHgMappingEntry> for CacheRequest<'_> {
}
#[async_trait]
impl KeyedEntityStore<ChangesetId, BonsaiHgMappingEntry> for CacheRequest<'_> {
impl KeyedEntityStore<ChangesetId, BonsaiHgMappingCacheEntry> for CacheRequest<'_> {
fn get_cache_key(&self, key: &ChangesetId) -> String {
let (_, repo_id, _) = self;
get_cache_key(*repo_id, &BonsaiOrHgChangesetId::Bonsai(*key))
let (_, mapping) = self;
get_cache_key(mapping.repo_id(), &BonsaiOrHgChangesetId::Bonsai(*key))
}
async fn get_from_db(
&self,
keys: HashSet<ChangesetId>,
) -> Result<HashMap<ChangesetId, BonsaiHgMappingEntry>, Error> {
let (ctx, repo_id, mapping) = self;
) -> Result<HashMap<ChangesetId, BonsaiHgMappingCacheEntry>, Error> {
let (ctx, mapping) = self;
let repo_id = mapping.repo_id();
let res = mapping
.mapping
.get(
ctx,
*repo_id,
BonsaiOrHgChangesetIds::Bonsai(keys.into_iter().collect()),
)
.await?;
Result::<_, Error>::Ok(res.into_iter().map(|e| (e.bcs_id, e)).collect())
Result::<_, Error>::Ok(
res.into_iter()
.map(|e| (e.bcs_id, BonsaiHgMappingCacheEntry::from_entry(e, repo_id)))
.collect(),
)
}
}
#[async_trait]
impl KeyedEntityStore<HgChangesetId, BonsaiHgMappingEntry> for CacheRequest<'_> {
impl KeyedEntityStore<HgChangesetId, BonsaiHgMappingCacheEntry> for CacheRequest<'_> {
fn get_cache_key(&self, key: &HgChangesetId) -> String {
let (_, repo_id, _) = self;
get_cache_key(*repo_id, &BonsaiOrHgChangesetId::Hg(*key))
let (_, mapping) = self;
get_cache_key(mapping.repo_id(), &BonsaiOrHgChangesetId::Hg(*key))
}
async fn get_from_db(
&self,
keys: HashSet<HgChangesetId>,
) -> Result<HashMap<HgChangesetId, BonsaiHgMappingEntry>, Error> {
let (ctx, repo_id, mapping) = self;
) -> Result<HashMap<HgChangesetId, BonsaiHgMappingCacheEntry>, Error> {
let (ctx, mapping) = self;
let repo_id = mapping.repo_id();
let res = mapping
.mapping
.get(
ctx,
*repo_id,
BonsaiOrHgChangesetIds::Hg(keys.into_iter().collect()),
)
.get(ctx, BonsaiOrHgChangesetIds::Hg(keys.into_iter().collect()))
.await?;
Result::<_, Error>::Ok(res.into_iter().map(|e| (e.hg_cs_id, e)).collect())
Result::<_, Error>::Ok(
res.into_iter()
.map(|e| {
(
e.hg_cs_id,
BonsaiHgMappingCacheEntry::from_entry(e, repo_id),
)
})
.collect(),
)
}
}

View File

@ -7,27 +7,22 @@
#![deny(warnings)]
use std::sync::Arc;
use anyhow::{Error, Result};
use async_trait::async_trait;
use auto_impl::auto_impl;
use sql::Connection;
use sql_construct::{SqlConstruct, SqlConstructFromMetadataDatabaseConfig};
use sql_ext::SqlConnections;
use abomonation_derive::Abomonation;
use anyhow::{Error, Result};
use context::{CoreContext, PerfCounterType};
use fbinit::FacebookInit;
use futures::future;
use mercurial_types::{
HgChangesetId, HgChangesetIdPrefix, HgChangesetIdsResolvedFromPrefix, HgNodeHash,
};
use mercurial_types::{HgChangesetId, HgChangesetIdPrefix, HgChangesetIdsResolvedFromPrefix};
use mononoke_types::{ChangesetId, RepositoryId};
use rand::Rng;
use rendezvous::{
MultiRendezVous, RendezVousOptions, RendezVousStats, TunablesMultiRendezVousController,
};
use rendezvous::{RendezVous, RendezVousOptions, RendezVousStats, TunablesRendezVousController};
use sql::queries;
use sql::Connection;
use sql_construct::{SqlConstruct, SqlConstructFromMetadataDatabaseConfig};
use sql_ext::SqlConnections;
use stats::prelude::*;
mod caching;
@ -46,39 +41,12 @@ define_stats! {
get_many_hg_by_prefix: timeseries(Rate, Sum),
}
#[derive(Abomonation, Clone, Debug, Eq, Hash, PartialEq)]
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct BonsaiHgMappingEntry {
pub repo_id: RepositoryId,
pub hg_cs_id: HgChangesetId,
pub bcs_id: ChangesetId,
}
impl BonsaiHgMappingEntry {
fn from_thrift(entry: bonsai_hg_mapping_entry_thrift::BonsaiHgMappingEntry) -> Result<Self> {
Ok(Self {
repo_id: RepositoryId::new(entry.repo_id.0),
hg_cs_id: HgChangesetId::new(HgNodeHash::from_thrift(entry.hg_cs_id)?),
bcs_id: ChangesetId::from_thrift(entry.bcs_id)?,
})
}
fn into_thrift(self) -> bonsai_hg_mapping_entry_thrift::BonsaiHgMappingEntry {
bonsai_hg_mapping_entry_thrift::BonsaiHgMappingEntry {
repo_id: bonsai_hg_mapping_entry_thrift::RepoId(self.repo_id.id()),
hg_cs_id: self.hg_cs_id.into_nodehash().into_thrift(),
bcs_id: self.bcs_id.into_thrift(),
}
}
pub fn new(repo_id: RepositoryId, hg_cs_id: HgChangesetId, bcs_id: ChangesetId) -> Self {
BonsaiHgMappingEntry {
repo_id,
hg_cs_id,
bcs_id,
}
}
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum BonsaiOrHgChangesetIds {
Bonsai(Vec<ChangesetId>),
@ -122,22 +90,22 @@ impl From<Vec<HgChangesetId>> for BonsaiOrHgChangesetIds {
#[async_trait]
#[auto_impl(&, Arc, Box)]
pub trait BonsaiHgMapping: Send + Sync {
fn repo_id(&self) -> RepositoryId;
async fn add(&self, ctx: &CoreContext, entry: BonsaiHgMappingEntry) -> Result<bool, Error>;
async fn get(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
cs_id: BonsaiOrHgChangesetIds,
) -> Result<Vec<BonsaiHgMappingEntry>, Error>;
async fn get_hg_from_bonsai(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
cs_id: ChangesetId,
) -> Result<Option<HgChangesetId>, Error> {
let result = self.get(ctx, repo_id, cs_id.into()).await?;
let result = self.get(ctx, cs_id.into()).await?;
let hg_cs_id = result.into_iter().next().map(|entry| entry.hg_cs_id);
Ok(hg_cs_id)
}
@ -145,10 +113,9 @@ pub trait BonsaiHgMapping: Send + Sync {
async fn get_bonsai_from_hg(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
cs_id: HgChangesetId,
) -> Result<Option<ChangesetId>, Error> {
let result = self.get(ctx, repo_id, cs_id.into()).await?;
let result = self.get(ctx, cs_id.into()).await?;
let bcs_id = result.into_iter().next().map(|entry| entry.bcs_id);
Ok(bcs_id)
}
@ -156,18 +123,11 @@ pub trait BonsaiHgMapping: Send + Sync {
async fn get_many_hg_by_prefix(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
cs_prefix: HgChangesetIdPrefix,
limit: usize,
) -> Result<HgChangesetIdsResolvedFromPrefix, Error> {
let mut fetched_cs = self
.get_hg_in_range(
ctx,
repo_id,
cs_prefix.min_cs(),
cs_prefix.max_cs(),
limit + 1,
)
.get_hg_in_range(ctx, cs_prefix.min_cs(), cs_prefix.max_cs(), limit + 1)
.await?;
let res = match fetched_cs.len() {
0 => HgChangesetIdsResolvedFromPrefix::NoMatch,
@ -184,7 +144,6 @@ pub trait BonsaiHgMapping: Send + Sync {
async fn get_hg_in_range(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
low: HgChangesetId,
high: HgChangesetId,
limit: usize,
@ -193,8 +152,8 @@ pub trait BonsaiHgMapping: Send + Sync {
#[derive(Clone)]
struct RendezVousConnection {
bonsai: MultiRendezVous<RepositoryId, ChangesetId, HgChangesetId>,
hg: MultiRendezVous<RepositoryId, HgChangesetId, ChangesetId>,
bonsai: RendezVous<ChangesetId, HgChangesetId>,
hg: RendezVous<HgChangesetId, ChangesetId>,
conn: Connection,
}
@ -202,13 +161,19 @@ impl RendezVousConnection {
fn new(conn: Connection, name: &str, opts: RendezVousOptions) -> Self {
Self {
conn,
bonsai: MultiRendezVous::new(
TunablesMultiRendezVousController::new(opts),
RendezVousStats::new(format!("bonsai_hg_mapping.bonsai.{}", name,)),
bonsai: RendezVous::new(
TunablesRendezVousController::new(opts),
Arc::new(RendezVousStats::new(format!(
"bonsai_hg_mapping.bonsai.{}",
name,
))),
),
hg: MultiRendezVous::new(
TunablesMultiRendezVousController::new(opts),
RendezVousStats::new(format!("bonsai_hg_mapping.hg.{}", name,)),
hg: RendezVous::new(
TunablesRendezVousController::new(opts),
Arc::new(RendezVousStats::new(format!(
"bonsai_hg_mapping.hg.{}",
name,
))),
),
}
}
@ -219,6 +184,7 @@ pub struct SqlBonsaiHgMapping {
write_connection: Connection,
read_connection: RendezVousConnection,
read_master_connection: RendezVousConnection,
repo_id: RepositoryId,
// Option that forces all `add()` method calls to overwrite values
// that set in the database. This should be used only when we try to
// fix broken entries in the db.
@ -281,6 +247,7 @@ queries! {
#[derive(Clone)]
pub struct SqlBonsaiHgMappingBuilder {
connections: SqlConnections,
overwrite: bool,
}
impl SqlConstruct for SqlBonsaiHgMappingBuilder {
@ -289,21 +256,24 @@ impl SqlConstruct for SqlBonsaiHgMappingBuilder {
const CREATION_QUERY: &'static str = include_str!("../schemas/sqlite-bonsai-hg-mapping.sql");
fn from_sql_connections(connections: SqlConnections) -> Self {
Self { connections }
Self {
connections,
overwrite: false,
}
}
}
impl SqlBonsaiHgMappingBuilder {
pub fn build(self, opts: RendezVousOptions) -> SqlBonsaiHgMapping {
self.build_impl(opts, false)
pub fn with_overwrite(mut self) -> Self {
self.overwrite = true;
self
}
pub fn build_with_overwrite(self, opts: RendezVousOptions) -> SqlBonsaiHgMapping {
self.build_impl(opts, true)
}
fn build_impl(self, opts: RendezVousOptions, overwrite: bool) -> SqlBonsaiHgMapping {
let connections = self.connections;
pub fn build(self, repo_id: RepositoryId, opts: RendezVousOptions) -> SqlBonsaiHgMapping {
let SqlBonsaiHgMappingBuilder {
connections,
overwrite,
} = self;
SqlBonsaiHgMapping {
write_connection: connections.write_connection,
@ -313,6 +283,7 @@ impl SqlBonsaiHgMappingBuilder {
"read_master",
opts,
),
repo_id,
overwrite,
}
}
@ -322,20 +293,20 @@ impl SqlConstructFromMetadataDatabaseConfig for SqlBonsaiHgMappingBuilder {}
impl SqlBonsaiHgMapping {
async fn verify_consistency(&self, entry: BonsaiHgMappingEntry) -> Result<(), Error> {
let BonsaiHgMappingEntry {
repo_id,
hg_cs_id,
bcs_id,
} = entry.clone();
let BonsaiHgMappingEntry { hg_cs_id, bcs_id } = entry.clone();
let tok: i32 = rand::thread_rng().gen();
let hg_ids = &[hg_cs_id];
let by_hg =
SelectMappingByHg::query(&self.read_master_connection.conn, &repo_id, &tok, hg_ids);
let by_hg = SelectMappingByHg::query(
&self.read_master_connection.conn,
&self.repo_id,
&tok,
hg_ids,
);
let bcs_ids = &[bcs_id];
let by_bcs = SelectMappingByBonsai::query(
&self.read_master_connection.conn,
&repo_id,
&self.repo_id,
&tok,
bcs_ids,
);
@ -350,11 +321,7 @@ impl SqlBonsaiHgMapping {
{
Some(entry) if entry == (hg_cs_id, bcs_id) => Ok(()),
Some((hg_cs_id, bcs_id)) => Err(ErrorKind::ConflictingEntries(
BonsaiHgMappingEntry {
repo_id,
hg_cs_id,
bcs_id,
},
BonsaiHgMappingEntry { hg_cs_id, bcs_id },
entry,
)
.into()),
@ -365,27 +332,31 @@ impl SqlBonsaiHgMapping {
#[async_trait]
impl BonsaiHgMapping for SqlBonsaiHgMapping {
fn repo_id(&self) -> RepositoryId {
self.repo_id
}
async fn add(&self, ctx: &CoreContext, entry: BonsaiHgMappingEntry) -> Result<bool, Error> {
STATS::adds.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlWrites);
let BonsaiHgMappingEntry {
repo_id,
hg_cs_id,
bcs_id,
} = entry.clone();
let BonsaiHgMappingEntry { hg_cs_id, bcs_id } = entry.clone();
if self.overwrite {
let result =
ReplaceMapping::query(&self.write_connection, &[(&repo_id, &hg_cs_id, &bcs_id)])
.await?;
let result = ReplaceMapping::query(
&self.write_connection,
&[(&self.repo_id, &hg_cs_id, &bcs_id)],
)
.await?;
Ok(result.affected_rows() >= 1)
} else {
let result =
InsertMapping::query(&self.write_connection, &[(&repo_id, &hg_cs_id, &bcs_id)])
.await?;
let result = InsertMapping::query(
&self.write_connection,
&[(&self.repo_id, &hg_cs_id, &bcs_id)],
)
.await?;
if result.affected_rows() == 1 {
Ok(true)
} else {
@ -398,14 +369,13 @@ impl BonsaiHgMapping for SqlBonsaiHgMapping {
async fn get(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
ids: BonsaiOrHgChangesetIds,
) -> Result<Vec<BonsaiHgMappingEntry>, Error> {
STATS::gets.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsReplica);
let (mut mappings, left_to_fetch) =
select_mapping(ctx.fb, &self.read_connection, repo_id, ids).await?;
select_mapping(ctx.fb, &self.read_connection, self.repo_id, ids).await?;
if left_to_fetch.is_empty() {
return Ok(mappings);
@ -414,8 +384,13 @@ impl BonsaiHgMapping for SqlBonsaiHgMapping {
STATS::gets_master.add_value(1);
ctx.perf_counters()
.increment_counter(PerfCounterType::SqlReadsMaster);
let (mut master_mappings, _) =
select_mapping(ctx.fb, &self.read_master_connection, repo_id, left_to_fetch).await?;
let (mut master_mappings, _) = select_mapping(
ctx.fb,
&self.read_master_connection,
self.repo_id,
left_to_fetch,
)
.await?;
mappings.append(&mut master_mappings);
Ok(mappings)
@ -426,7 +401,6 @@ impl BonsaiHgMapping for SqlBonsaiHgMapping {
async fn get_hg_in_range(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
low: HgChangesetId,
high: HgChangesetId,
limit: usize,
@ -438,7 +412,7 @@ impl BonsaiHgMapping for SqlBonsaiHgMapping {
.increment_counter(PerfCounterType::SqlReadsReplica);
let rows = SelectHgChangesetsByRange::query(
&self.read_connection.conn,
&repo_id,
&self.repo_id,
&low.as_bytes(),
&high.as_bytes(),
&limit,
@ -450,7 +424,7 @@ impl BonsaiHgMapping for SqlBonsaiHgMapping {
.increment_counter(PerfCounterType::SqlReadsMaster);
let rows = SelectHgChangesetsByRange::query(
&self.read_master_connection.conn,
&repo_id,
&self.repo_id,
&low.as_bytes(),
&high.as_bytes(),
&limit,
@ -478,7 +452,6 @@ async fn select_mapping(
BonsaiOrHgChangesetIds::Bonsai(bcs_ids) => {
let ret = connection
.bonsai
.get(repo_id)
.dispatch(fb, bcs_ids.into_iter().collect(), || {
let conn = connection.conn.clone();
move |bcs_ids| async move {
@ -512,7 +485,6 @@ async fn select_mapping(
BonsaiOrHgChangesetIds::Hg(hg_cs_ids) => {
let ret = connection
.hg
.get(repo_id)
.dispatch(fb, hg_cs_ids.into_iter().collect(), || {
let conn = connection.conn.clone();
move |hg_cs_ids| async move {
@ -547,11 +519,7 @@ async fn select_mapping(
Ok((
found
.into_iter()
.map(move |(hg_cs_id, bcs_id)| BonsaiHgMappingEntry {
repo_id,
hg_cs_id,
bcs_id,
})
.map(move |(hg_cs_id, bcs_id)| BonsaiHgMappingEntry { hg_cs_id, bcs_id })
.collect(),
missing,
))

View File

@ -21,8 +21,8 @@ use std::sync::{
};
type Cache = (
HashMap<(RepositoryId, ChangesetId), HgChangesetId>,
HashMap<(RepositoryId, HgChangesetId), ChangesetId>,
HashMap<ChangesetId, HgChangesetId>,
HashMap<HgChangesetId, ChangesetId>,
);
#[derive(Clone)]
@ -64,10 +64,9 @@ impl<T: BonsaiHgMapping + Clone + 'static> MemWritesBonsaiHgMapping<T> {
async fn get_from_cache_and_inner<I, O>(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
cs_ids: Vec<I>,
get_cache: impl Fn(&Cache) -> &HashMap<(RepositoryId, I), O>,
make_entry: impl Fn(RepositoryId, I, O) -> BonsaiHgMappingEntry,
get_cache: impl Fn(&Cache) -> &HashMap<I, O>,
make_entry: impl Fn(I, O) -> BonsaiHgMappingEntry,
) -> Result<Vec<BonsaiHgMappingEntry>, Error>
where
Vec<I>: Into<BonsaiOrHgChangesetIds>,
@ -81,15 +80,15 @@ impl<T: BonsaiHgMapping + Clone + 'static> MemWritesBonsaiHgMapping<T> {
self.cache.with(|cache| {
let cache = get_cache(cache);
match cache.get(&(repo_id, i)).copied() {
Some(o) => from_cache.push(make_entry(repo_id, i, o)),
match cache.get(&i).copied() {
Some(o) => from_cache.push(make_entry(i, o)),
None => from_inner.push(i),
};
});
}
if !self.no_access_to_inner.load(Ordering::Relaxed) {
let from_inner = self.inner.get(ctx, repo_id, from_inner.into()).await?;
let from_inner = self.inner.get(ctx, from_inner.into()).await?;
from_cache.extend(from_inner);
}
Ok(from_cache)
@ -98,6 +97,10 @@ impl<T: BonsaiHgMapping + Clone + 'static> MemWritesBonsaiHgMapping<T> {
#[async_trait]
impl<T: BonsaiHgMapping + Clone + 'static> BonsaiHgMapping for MemWritesBonsaiHgMapping<T> {
fn repo_id(&self) -> RepositoryId {
self.inner.repo_id()
}
async fn add(&self, ctx: &CoreContext, entry: BonsaiHgMappingEntry) -> Result<bool, Error> {
if self.readonly.load(Ordering::Relaxed) {
return Err(anyhow!(
@ -107,19 +110,15 @@ impl<T: BonsaiHgMapping + Clone + 'static> BonsaiHgMapping for MemWritesBonsaiHg
let this = self.clone();
let BonsaiHgMappingEntry {
repo_id,
hg_cs_id,
bcs_id,
} = entry;
let BonsaiHgMappingEntry { hg_cs_id, bcs_id } = entry;
let entry = this.get_hg_from_bonsai(ctx, repo_id, bcs_id).await?;
let entry = this.get_hg_from_bonsai(ctx, bcs_id).await?;
if entry.is_some() && !self.save_noop_writes.load(Ordering::Relaxed) {
Ok(false)
} else {
this.cache.with(|cache| {
cache.0.insert((repo_id, bcs_id), hg_cs_id);
cache.1.insert((repo_id, hg_cs_id), bcs_id);
cache.0.insert(bcs_id, hg_cs_id);
cache.1.insert(hg_cs_id, bcs_id);
});
Ok(true)
}
@ -128,35 +127,24 @@ impl<T: BonsaiHgMapping + Clone + 'static> BonsaiHgMapping for MemWritesBonsaiHg
async fn get(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
cs_ids: BonsaiOrHgChangesetIds,
) -> Result<Vec<BonsaiHgMappingEntry>, Error> {
match cs_ids {
BonsaiOrHgChangesetIds::Bonsai(bcs_ids) => {
self.get_from_cache_and_inner(
ctx,
repo_id,
bcs_ids,
|cache| &cache.0,
|repo_id, bcs_id, hg_cs_id| BonsaiHgMappingEntry {
repo_id,
bcs_id,
hg_cs_id,
},
|bcs_id, hg_cs_id| BonsaiHgMappingEntry { bcs_id, hg_cs_id },
)
.await
}
BonsaiOrHgChangesetIds::Hg(hg_cs_ids) => {
self.get_from_cache_and_inner(
ctx,
repo_id,
hg_cs_ids,
|cache| &cache.1,
|repo_id, hg_cs_id, bcs_id| BonsaiHgMappingEntry {
repo_id,
bcs_id,
hg_cs_id,
},
|hg_cs_id, bcs_id| BonsaiHgMappingEntry { bcs_id, hg_cs_id },
)
.await
}
@ -166,7 +154,6 @@ impl<T: BonsaiHgMapping + Clone + 'static> BonsaiHgMapping for MemWritesBonsaiHg
async fn get_hg_in_range(
&self,
_ctx: &CoreContext,
_repo_id: RepositoryId,
_low: HgChangesetId,
_high: HgChangesetId,
_limit: usize,

View File

@ -36,7 +36,6 @@ use std::sync::{
async fn add_and_get<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
let ctx = CoreContext::test_mock(fb);
let entry = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::ONES_CSID,
bcs_id: bonsai::ONES_CSID,
};
@ -56,23 +55,22 @@ async fn add_and_get<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
);
let result = mapping
.get(&ctx, REPO_ZERO, hg::ONES_CSID.into())
.get(&ctx, hg::ONES_CSID.into())
.await
.expect("Get failed");
assert_eq!(result, vec![entry.clone()]);
let result = mapping
.get_hg_from_bonsai(&ctx, REPO_ZERO, bonsai::ONES_CSID)
.get_hg_from_bonsai(&ctx, bonsai::ONES_CSID)
.await
.expect("Failed to get hg changeset by its bonsai counterpart");
assert_eq!(result, Some(hg::ONES_CSID));
let result = mapping
.get_bonsai_from_hg(&ctx, REPO_ZERO, hg::ONES_CSID)
.get_bonsai_from_hg(&ctx, hg::ONES_CSID)
.await
.expect("Failed to get bonsai changeset by its hg counterpart");
assert_eq!(result, Some(bonsai::ONES_CSID));
let same_bc_entry = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::TWOS_CSID, // differ from entry.hg_cs_id
bcs_id: bonsai::ONES_CSID,
};
@ -86,7 +84,6 @@ async fn add_and_get<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
);
let same_hg_entry = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::ONES_CSID,
bcs_id: bonsai::TWOS_CSID, // differ from entry.bcs_id
};
@ -103,7 +100,7 @@ async fn add_and_get<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
async fn missing<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
let ctx = CoreContext::test_mock(fb);
let result = mapping
.get(&ctx, REPO_ZERO, bonsai::ONES_CSID.into())
.get(&ctx, bonsai::ONES_CSID.into())
.await
.expect("Failed to fetch missing changeset (should succeed with None instead)");
assert_eq!(result, vec![]);
@ -113,22 +110,18 @@ async fn get_many_hg_by_prefix<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M)
let ctx = CoreContext::test_mock(fb);
let entry1 = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::ONES_CSID,
bcs_id: bonsai::ONES_CSID,
};
let entry2 = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::TWOS_CSID,
bcs_id: bonsai::TWOS_CSID,
};
let entry3 = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::FS_ES_CSID,
bcs_id: bonsai::FS_ES_CSID,
};
let entry4 = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::FS_CSID,
bcs_id: bonsai::FS_CSID,
};
@ -166,7 +159,6 @@ async fn get_many_hg_by_prefix<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M)
let result = mapping
.get_many_hg_by_prefix(
&ctx,
REPO_ZERO,
HgChangesetIdPrefix::from_bytes(&hg::ONES_CSID.as_ref()[0..8]).unwrap(),
10,
)
@ -182,7 +174,6 @@ async fn get_many_hg_by_prefix<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M)
let result = mapping
.get_many_hg_by_prefix(
&ctx,
REPO_ZERO,
HgChangesetIdPrefix::from_bytes(&hg::TWOS_CSID.as_ref()[0..10]).unwrap(),
1,
)
@ -198,7 +189,6 @@ async fn get_many_hg_by_prefix<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M)
let result = mapping
.get_many_hg_by_prefix(
&ctx,
REPO_ZERO,
HgChangesetIdPrefix::from_bytes(&hg::FS_CSID.as_ref()[0..8]).unwrap(),
10,
)
@ -212,12 +202,7 @@ async fn get_many_hg_by_prefix<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M)
// found several changesets within the limit (try odd hex string prefix this time)
let result = mapping
.get_many_hg_by_prefix(
&ctx,
REPO_ZERO,
HgChangesetIdPrefix::from_str(&"fff").unwrap(),
10,
)
.get_many_hg_by_prefix(&ctx, HgChangesetIdPrefix::from_str("fff").unwrap(), 10)
.await
.expect("Failed to get hg changeset by its prefix");
@ -230,7 +215,6 @@ async fn get_many_hg_by_prefix<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M)
let result = mapping
.get_many_hg_by_prefix(
&ctx,
REPO_ZERO,
HgChangesetIdPrefix::from_bytes(&hg::FS_CSID.as_ref()[0..8]).unwrap(),
1,
)
@ -246,7 +230,6 @@ async fn get_many_hg_by_prefix<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M)
let result = mapping
.get_many_hg_by_prefix(
&ctx,
REPO_ZERO,
HgChangesetIdPrefix::from_bytes(&hg::THREES_CSID.as_ref()[0..16]).unwrap(),
10,
)
@ -260,17 +243,14 @@ async fn get_hg_in_range<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
let ctx = CoreContext::test_mock(fb);
let entry1 = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::ONES_CSID,
bcs_id: bonsai::ONES_CSID,
};
let entry2 = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::TWOS_CSID,
bcs_id: bonsai::TWOS_CSID,
};
let entry3 = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::THREES_CSID,
bcs_id: bonsai::THREES_CSID,
};
@ -281,7 +261,7 @@ async fn get_hg_in_range<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
assert!(
mapping
.get_hg_in_range(&ctx, REPO_ZERO, hg::AS_CSID, hg::BS_CSID, 10)
.get_hg_in_range(&ctx, hg::AS_CSID, hg::BS_CSID, 10)
.await
.unwrap()
.is_empty(),
@ -290,7 +270,7 @@ async fn get_hg_in_range<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
assert_eq!(
vec![hg::ONES_CSID],
mapping
.get_hg_in_range(&ctx, REPO_ZERO, hg::ONES_CSID, hg::ONES_CSID, 10)
.get_hg_in_range(&ctx, hg::ONES_CSID, hg::ONES_CSID, 10)
.await
.unwrap()
);
@ -298,7 +278,7 @@ async fn get_hg_in_range<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
assert_eq!(
vec![hg::ONES_CSID, hg::TWOS_CSID],
mapping
.get_hg_in_range(&ctx, REPO_ZERO, hg::ONES_CSID, hg::TWOS_CSID, 10)
.get_hg_in_range(&ctx, hg::ONES_CSID, hg::TWOS_CSID, 10)
.await
.unwrap()
);
@ -306,7 +286,7 @@ async fn get_hg_in_range<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
assert_eq!(
vec![hg::ONES_CSID],
mapping
.get_hg_in_range(&ctx, REPO_ZERO, hg::ONES_CSID, hg::TWOS_CSID, 1)
.get_hg_in_range(&ctx, hg::ONES_CSID, hg::TWOS_CSID, 1)
.await
.unwrap()
);
@ -314,7 +294,7 @@ async fn get_hg_in_range<M: BonsaiHgMapping>(fb: FacebookInit, mapping: M) {
assert_eq!(
vec![hg::ONES_CSID, hg::TWOS_CSID, hg::THREES_CSID],
mapping
.get_hg_in_range(&ctx, REPO_ZERO, hg::NULL_CSID, hg::FS_CSID, 10)
.get_hg_in_range(&ctx, hg::NULL_CSID, hg::FS_CSID, 10)
.await
.unwrap()
);
@ -345,6 +325,10 @@ impl CountedBonsaiHgMapping {
#[async_trait]
impl BonsaiHgMapping for CountedBonsaiHgMapping {
fn repo_id(&self) -> RepositoryId {
self.mapping.repo_id()
}
async fn add(&self, ctx: &CoreContext, entry: BonsaiHgMappingEntry) -> Result<bool, Error> {
self.adds.fetch_add(1, Ordering::Relaxed);
self.mapping.add(ctx, entry).await
@ -353,25 +337,21 @@ impl BonsaiHgMapping for CountedBonsaiHgMapping {
async fn get(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
cs_id: BonsaiOrHgChangesetIds,
) -> Result<Vec<BonsaiHgMappingEntry>, Error> {
self.gets.fetch_add(1, Ordering::Relaxed);
self.mapping.get(ctx, repo_id, cs_id).await
self.mapping.get(ctx, cs_id).await
}
async fn get_hg_in_range(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
low: HgChangesetId,
high: HgChangesetId,
limit: usize,
) -> Result<Vec<HgChangesetId>, Error> {
self.gets_many_hg_by_prefix.fetch_add(1, Ordering::Relaxed);
self.mapping
.get_hg_in_range(ctx, repo_id, low, high, limit)
.await
self.mapping.get_hg_in_range(ctx, low, high, limit).await
}
}
@ -389,7 +369,6 @@ async fn caching<M: BonsaiHgMapping + 'static>(fb: FacebookInit, mapping: M) {
let mapping = CachingBonsaiHgMapping::new_test(Arc::new(mapping));
let entry = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::ONES_CSID,
bcs_id: bonsai::ONES_CSID,
};
@ -402,21 +381,21 @@ async fn caching<M: BonsaiHgMapping + 'static>(fb: FacebookInit, mapping: M) {
);
let result = mapping
.get_bonsai_from_hg(&ctx, REPO_ZERO, hg::ONES_CSID)
.get_bonsai_from_hg(&ctx, hg::ONES_CSID)
.await
.expect("Failed to get bonsai changeset by its hg counterpart");
assert_eq!(result, Some(bonsai::ONES_CSID));
assert_eq!(gets.load(Ordering::Relaxed), 1);
let result = mapping
.get_bonsai_from_hg(&ctx, REPO_ZERO, hg::ONES_CSID)
.get_bonsai_from_hg(&ctx, hg::ONES_CSID)
.await
.expect("Failed to get bonsai changeset by its hg counterpart");
assert_eq!(result, Some(bonsai::ONES_CSID));
assert_eq!(gets.load(Ordering::Relaxed), 1);
let result = mapping
.get_bonsai_from_hg(&ctx, REPO_ZERO, hg::TWOS_CSID)
.get_bonsai_from_hg(&ctx, hg::TWOS_CSID)
.await
.expect("Failed to get bonsai changeset by its hg counterpart");
assert_eq!(result, None);
@ -429,7 +408,7 @@ async fn test_add_and_get(fb: FacebookInit) {
fb,
SqlBonsaiHgMappingBuilder::with_sqlite_in_memory()
.unwrap()
.build(RendezVousOptions::for_test()),
.build(REPO_ZERO, RendezVousOptions::for_test()),
)
.await;
}
@ -440,7 +419,7 @@ async fn test_missing(fb: FacebookInit) {
fb,
SqlBonsaiHgMappingBuilder::with_sqlite_in_memory()
.unwrap()
.build(RendezVousOptions::for_test()),
.build(REPO_ZERO, RendezVousOptions::for_test()),
)
.await;
}
@ -451,7 +430,7 @@ async fn test_caching(fb: FacebookInit) {
fb,
SqlBonsaiHgMappingBuilder::with_sqlite_in_memory()
.unwrap()
.build(RendezVousOptions::for_test()),
.build(REPO_ZERO, RendezVousOptions::for_test()),
)
.await;
}
@ -462,7 +441,7 @@ async fn test_get_many_hg_by_prefix(fb: FacebookInit) {
fb,
SqlBonsaiHgMappingBuilder::with_sqlite_in_memory()
.unwrap()
.build(RendezVousOptions::for_test()),
.build(REPO_ZERO, RendezVousOptions::for_test()),
)
.await;
}
@ -473,7 +452,7 @@ async fn test_get_hg_in_range(fb: FacebookInit) {
fb,
SqlBonsaiHgMappingBuilder::with_sqlite_in_memory()
.unwrap()
.build(RendezVousOptions::for_test()),
.build(REPO_ZERO, RendezVousOptions::for_test()),
)
.await;
}
@ -482,11 +461,11 @@ async fn test_get_hg_in_range(fb: FacebookInit) {
async fn test_overwrite(fb: FacebookInit) -> Result<(), Error> {
let mapping = SqlBonsaiHgMappingBuilder::with_sqlite_in_memory()
.unwrap()
.build_with_overwrite(RendezVousOptions::for_test());
.with_overwrite()
.build(REPO_ZERO, RendezVousOptions::for_test());
let ctx = CoreContext::test_mock(fb);
let entry = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::ONES_CSID,
bcs_id: bonsai::ONES_CSID,
};
@ -500,7 +479,6 @@ async fn test_overwrite(fb: FacebookInit) -> Result<(), Error> {
);
let entry = BonsaiHgMappingEntry {
repo_id: REPO_ZERO,
hg_cs_id: hg::ONES_CSID,
bcs_id: bonsai::TWOS_CSID,
};
@ -512,7 +490,7 @@ async fn test_overwrite(fb: FacebookInit) -> Result<(), Error> {
.expect("Adding new entry failed")
);
let result = mapping.get(&ctx, REPO_ZERO, hg::ONES_CSID.into()).await?;
let result = mapping.get(&ctx, hg::ONES_CSID.into()).await?;
assert_eq!(result, vec![entry.clone()]);
Ok(())

View File

@ -12,7 +12,6 @@ use clap::{ArgGroup, Args};
use context::CoreContext;
use mercurial_types::HgChangesetId;
use mononoke_types::ChangesetId;
use repo_identity::RepoIdentityRef;
/// Command line arguments for specifying a changeset.
#[derive(Args, Debug)]
@ -39,7 +38,7 @@ impl ChangesetArgs {
pub async fn resolve_changeset(
&self,
ctx: &CoreContext,
repo: &(impl BookmarksRef + BonsaiHgMappingRef + RepoIdentityRef),
repo: &(impl BookmarksRef + BonsaiHgMappingRef),
) -> Result<Option<ChangesetId>> {
if let Some(bookmark) = &self.bookmark {
repo.bookmarks()
@ -48,7 +47,7 @@ impl ChangesetArgs {
.with_context(|| format!("Failed to resolve bookmark '{}'", bookmark))
} else if let Some(hg_id) = self.hg_id {
repo.bonsai_hg_mapping()
.get_bonsai_from_hg(ctx, repo.repo_identity().id(), hg_id)
.get_bonsai_from_hg(ctx, hg_id)
.await
.with_context(|| format!("Failed to resolve hg changeset id {}", hg_id))
} else {

View File

@ -122,7 +122,7 @@ where
if let Ok(hg_id) = HgChangesetId::from_str(&hash_or_bookmark) {
if let Some(cs_id) = container
.bonsai_hg_mapping()
.get_bonsai_from_hg(&ctx, container.repo_identity().id(), hg_id)
.get_bonsai_from_hg(ctx, hg_id)
.await?
{
return Ok(cs_id);

View File

@ -252,7 +252,7 @@ async fn fetch_root_filenode(
// Check it and return None if hg changeset is not generated
let maybe_hg_cs_id = derivation_ctx
.bonsai_hg_mapping()?
.get_hg_from_bonsai(ctx, derivation_ctx.repo_id(), cs_id)
.get_hg_from_bonsai(ctx, cs_id)
.await?;
let hg_cs_id = if let Some(hg_cs_id) = maybe_hg_cs_id {
hg_cs_id

View File

@ -158,7 +158,6 @@ impl BonsaiDerivable for MappedHgChangesetId {
.add(
ctx,
BonsaiHgMappingEntry {
repo_id: derivation_ctx.repo_id(),
hg_cs_id: self.0,
bcs_id: changeset_id,
},
@ -186,7 +185,7 @@ impl BonsaiDerivable for MappedHgChangesetId {
) -> Result<HashMap<ChangesetId, Self>> {
Ok(derivation_ctx
.bonsai_hg_mapping()?
.get(ctx, derivation_ctx.repo_id(), changeset_ids.to_vec().into())
.get(ctx, changeset_ids.to_vec().into())
.await?
.into_iter()
.map(|entry| (entry.bcs_id, MappedHgChangesetId(entry.hg_cs_id)))

View File

@ -983,12 +983,7 @@ impl RepoContext {
ChangesetPrefixSpecifier::Hg(prefix) => ChangesetSpecifierPrefixResolution::from(
self.blob_repo()
.get_bonsai_hg_mapping()
.get_many_hg_by_prefix(
&self.ctx,
self.blob_repo().get_repoid(),
prefix,
MAX_LIMIT_AMBIGUOUS_IDS,
)
.get_many_hg_by_prefix(&self.ctx, prefix, MAX_LIMIT_AMBIGUOUS_IDS)
.await?,
),
ChangesetPrefixSpecifier::Bonsai(prefix) => ChangesetSpecifierPrefixResolution::from(

View File

@ -754,10 +754,9 @@ impl HgRepoContext {
high: HgChangesetId,
) -> Result<Vec<HgChangesetId>, MononokeError> {
const LIMIT: usize = 10;
let repo_id = self.repo().repoid();
let bonsai_hg_mapping = self.blob_repo().bonsai_hg_mapping();
bonsai_hg_mapping
.get_hg_in_range(self.ctx(), repo_id, low, high, LIMIT)
.get_hg_in_range(self.ctx(), low, high, LIMIT)
.await
.map_err(|e| e.into())
}

View File

@ -1330,7 +1330,6 @@ impl HgCommands for RepoClient {
repo.get_bonsai_hg_mapping()
.get_many_hg_by_prefix(
&ctx,
repo.get_repoid(),
cs_prefix,
MAX_NUMBER_OF_SUGGESTIONS_TO_FETCH,
)
@ -2652,7 +2651,7 @@ async fn maybe_validate_pushed_bonsais(
let entries = repo
.get_bonsai_hg_mapping()
.get(ctx, repo.get_repoid(), hg_cs_ids.into())
.get(ctx, hg_cs_ids.into())
.await?;
let actual_entries = entries

View File

@ -576,18 +576,19 @@ impl RepoFactory {
pub async fn bonsai_hg_mapping(
&self,
repo_identity: &ArcRepoIdentity,
repo_config: &ArcRepoConfig,
) -> Result<ArcBonsaiHgMapping> {
let builder = self
let mut builder = self
.open::<SqlBonsaiHgMappingBuilder>(&repo_config.storage_config.metadata)
.await
.context(RepoFactoryError::BonsaiHgMapping)?;
let bonsai_hg_mapping = if self.bonsai_hg_mapping_overwrite {
builder.build_with_overwrite(self.env.rendezvous_options)
} else {
builder.build(self.env.rendezvous_options)
};
if self.bonsai_hg_mapping_overwrite {
builder = builder.with_overwrite();
}
let bonsai_hg_mapping = builder.build(repo_identity.id(), self.env.rendezvous_options);
if let Some(pool) = self.maybe_volatile_pool("bonsai_hg_mapping")? {
Ok(Arc::new(CachingBonsaiHgMapping::new(

View File

@ -300,10 +300,10 @@ impl TestRepoFactory {
}
/// Construct Bonsai Hg Mapping using the in-memory metadata database.
pub fn bonsai_hg_mapping(&self) -> Result<ArcBonsaiHgMapping> {
pub fn bonsai_hg_mapping(&self, repo_identity: &ArcRepoIdentity) -> Result<ArcBonsaiHgMapping> {
Ok(Arc::new(
SqlBonsaiHgMappingBuilder::from_sql_connections(self.metadata_db.clone().into())
.build(RendezVousOptions::for_test()),
.build(repo_identity.id(), RendezVousOptions::for_test()),
))
}

View File

@ -167,7 +167,7 @@ impl CloneHints {
let csids: Vec<_> = idmap_entries.values().map(|&id| id).collect();
let hg_mapping: HashMap<_, _> = bonsai_hg_mapping
.get(ctx, self.inner.repo_id, csids.into())
.get(ctx, csids.into())
.await
.context("error converting from bonsai to hg")?
.into_iter()

View File

@ -75,7 +75,7 @@ pub async fn run(app: MononokeApp, args: CommandArgs) -> Result<()> {
.parse::<HgChangesetId>()
.context("Invalid hg changeset id")?;
repo.bonsai_hg_mapping()
.get_bonsai_from_hg(&ctx, repo.repo_identity().id(), hg_cs_id)
.get_bonsai_from_hg(&ctx, hg_cs_id)
.await?
.ok_or_else(|| anyhow!("hg-bonsai mapping not found for {}", hg_cs_id))?
}
@ -112,7 +112,7 @@ pub async fn run(app: MononokeApp, args: CommandArgs) -> Result<()> {
IdentityScheme::Hg => {
let hg_cs_id = repo
.bonsai_hg_mapping()
.get_hg_from_bonsai(&ctx, repo.repo_identity().id(), cs_id)
.get_hg_from_bonsai(&ctx, cs_id)
.await?
.ok_or_else(|| anyhow!("bonsai-hg mapping not found for {}", cs_id))?;
println!("{}", hg_cs_id);

View File

@ -20,7 +20,6 @@ use mononoke_app::args::{ChangesetArgs, RepoArgs};
use mononoke_app::MononokeApp;
use mononoke_types::{ChangesetId, MPath};
use repo_blobstore::{RepoBlobstore, RepoBlobstoreRef};
use repo_identity::RepoIdentityRef;
use crate::repo::AdminRepo;
@ -120,7 +119,7 @@ async fn display_hg_entry(
) -> Result<()> {
let hg_cs_id = repo
.bonsai_hg_mapping()
.get_hg_from_bonsai(ctx, repo.repo_identity().id(), changeset_id)
.get_hg_from_bonsai(ctx, changeset_id)
.await
.context("Failed to get corresponding Hg changeset")?
.ok_or_else(|| anyhow!("No Hg changeset for {}", changeset_id))?;

View File

@ -16,7 +16,7 @@ use bulkops::Direction;
use context::{CoreContext, SamplingKey};
use dashmap::DashMap;
use mercurial_types::HgChangesetId;
use mononoke_types::{datetime::DateTime, ChangesetId, RepositoryId};
use mononoke_types::{datetime::DateTime, ChangesetId};
use phases::Phases;
use regex::Regex;
use slog::Logger;
@ -105,23 +105,21 @@ impl<T: Send + Sync> VisitOne for SamplingWalkVisitor<T> {
async fn get_bonsai_from_hg(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
bonsai_hg_mapping: &dyn BonsaiHgMapping,
hg_cs_id: &HgChangesetId,
) -> Result<ChangesetId, Error> {
self.inner
.get_bonsai_from_hg(ctx, repo_id, bonsai_hg_mapping, hg_cs_id)
.get_bonsai_from_hg(ctx, bonsai_hg_mapping, hg_cs_id)
.await
}
async fn defer_from_hg(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
bonsai_hg_mapping: &dyn BonsaiHgMapping,
hg_cs_id: &HgChangesetId,
) -> Result<Option<ChangesetId>, Error> {
self.inner
.defer_from_hg(ctx, repo_id, bonsai_hg_mapping, hg_cs_id)
.defer_from_hg(ctx, bonsai_hg_mapping, hg_cs_id)
.await
}
}

View File

@ -25,7 +25,7 @@ use itertools::Itertools;
use mercurial_types::{HgChangesetId, HgFileNodeId, HgManifestId};
use mononoke_types::{
ChangesetId, ContentId, DeletedManifestId, FastlogBatchId, FileUnodeId, FsnodeId,
ManifestUnodeId, RepositoryId, SkeletonManifestId,
ManifestUnodeId, SkeletonManifestId,
};
use phases::{Phase, Phases};
use slog::{info, Logger};
@ -697,7 +697,6 @@ impl VisitOne for WalkState {
async fn get_bonsai_from_hg(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
bonsai_hg_mapping: &dyn BonsaiHgMapping,
hg_cs_id: &HgChangesetId,
) -> Result<ChangesetId, Error> {
@ -706,7 +705,7 @@ impl VisitOne for WalkState {
*bcs_id
} else {
let bcs_id = bonsai_hg_mapping
.get_bonsai_from_hg(ctx, repo_id, hg_cs_id.clone())
.get_bonsai_from_hg(ctx, hg_cs_id.clone())
.await?;
if let Some(bcs_id) = bcs_id {
let bcs_int = self.bcs_ids.interned(&bcs_id);
@ -723,7 +722,6 @@ impl VisitOne for WalkState {
async fn defer_from_hg(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
bonsai_hg_mapping: &dyn BonsaiHgMapping,
hg_cs_id: &HgChangesetId,
) -> Result<Option<ChangesetId>, Error> {
@ -731,7 +729,7 @@ impl VisitOne for WalkState {
return Ok(None);
}
let bcs_id = self
.get_bonsai_from_hg(ctx, repo_id, bonsai_hg_mapping, hg_cs_id)
.get_bonsai_from_hg(ctx, bonsai_hg_mapping, hg_cs_id)
.await?;
let id = self.bcs_ids.interned(&bcs_id);
if self.chunk_bcs.contains_key(&id) {

View File

@ -426,11 +426,7 @@ where
// bulk prepopulate the hg/bonsai mappings
let ids =
BonsaiOrHgChangesetIds::Bonsai(chunk_members.clone().into_iter().collect());
repo_params
.repo
.bonsai_hg_mapping()
.get(&ctx, repo_id, ids)
.await?
repo_params.repo.bonsai_hg_mapping().get(&ctx, ids).await?
} else {
vec![]
};

View File

@ -43,7 +43,7 @@ use futures::{future::try_join_all, stream::TryStreamExt};
use itertools::Itertools;
use maplit::hashset;
use mercurial_types::HgChangesetId;
use mononoke_types::{ChangesetId, MPath, RepositoryId};
use mononoke_types::{ChangesetId, MPath};
use phases::{Phase, Phases};
use scuba_ext::MononokeScubaSampleBuilder;
use slog::{info, warn, Logger};
@ -235,24 +235,22 @@ impl VisitOne for ValidatingVisitor {
async fn get_bonsai_from_hg(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
bonsai_hg_mapping: &dyn BonsaiHgMapping,
hg_cs_id: &HgChangesetId,
) -> Result<ChangesetId, Error> {
self.inner
.get_bonsai_from_hg(ctx, repo_id, bonsai_hg_mapping, hg_cs_id)
.get_bonsai_from_hg(ctx, bonsai_hg_mapping, hg_cs_id)
.await
}
async fn defer_from_hg(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
bonsai_hg_mapping: &dyn BonsaiHgMapping,
hg_cs_id: &HgChangesetId,
) -> Result<Option<ChangesetId>, Error> {
self.inner
.defer_from_hg(ctx, repo_id, bonsai_hg_mapping, hg_cs_id)
.defer_from_hg(ctx, bonsai_hg_mapping, hg_cs_id)
.await
}
}

View File

@ -47,7 +47,7 @@ use mercurial_types::{FileBytes, HgChangesetId, HgFileNodeId, HgManifestId, Repo
use mononoke_types::{
blame::BlameMaybeRejected, fsnode::FsnodeEntry, skeleton_manifest::SkeletonManifestEntry,
unode::UnodeEntry, BlameId, ChangesetId, ContentId, DeletedManifestId, FastlogBatchId,
FileUnodeId, FsnodeId, MPath, ManifestUnodeId, RepositoryId, SkeletonManifestId,
FileUnodeId, FsnodeId, MPath, ManifestUnodeId, SkeletonManifestId,
};
use phases::{Phase, Phases, PhasesRef};
use scuba_ext::MononokeScubaSampleBuilder;
@ -141,7 +141,6 @@ pub trait VisitOne {
async fn get_bonsai_from_hg(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
bonsai_hg_mapping: &dyn BonsaiHgMapping,
hg_cs_id: &HgChangesetId,
) -> Result<ChangesetId, Error>;
@ -150,7 +149,6 @@ pub trait VisitOne {
async fn defer_from_hg(
&self,
ctx: &CoreContext,
repo_id: RepositoryId,
bonsai_hg_mapping: &dyn BonsaiHgMapping,
hg_cs_id: &HgChangesetId,
) -> Result<Option<ChangesetId>, Error>;
@ -1638,7 +1636,6 @@ struct Checker<V: VisitOne> {
visitor: V,
phases_store: Arc<dyn Phases>,
bonsai_hg_mapping: Arc<dyn BonsaiHgMapping>,
repo_id: RepositoryId,
with_blame: bool,
with_fastlog: bool,
with_filenodes: bool,
@ -1669,7 +1666,7 @@ impl<V: VisitOne> Checker<V> {
hg_cs_id: &HgChangesetId,
) -> Result<ChangesetId, Error> {
self.visitor
.get_bonsai_from_hg(ctx, self.repo_id, self.bonsai_hg_mapping.as_ref(), hg_cs_id)
.get_bonsai_from_hg(ctx, self.bonsai_hg_mapping.as_ref(), hg_cs_id)
.await
}
@ -1679,7 +1676,7 @@ impl<V: VisitOne> Checker<V> {
hg_cs_id: &HgChangesetId,
) -> Result<Option<ChangesetId>, Error> {
self.visitor
.defer_from_hg(ctx, self.repo_id, self.bonsai_hg_mapping.as_ref(), hg_cs_id)
.defer_from_hg(ctx, self.bonsai_hg_mapping.as_ref(), hg_cs_id)
.await
}
@ -1853,7 +1850,6 @@ where
required_node_data_types,
phases_store: repo.phases().with_frozen_public_heads(heads),
bonsai_hg_mapping: repo.get_bonsai_hg_mapping().clone(),
repo_id: repo.get_repoid(),
});
Ok(limited_by_key_shardable(