mononoke/bonsai_globalrev_mapping: remove some methods we're not going to use

Summary:
My current plan for globalrevs is to assign them as part of the pushrebase
step, which ensures we put them in the Bonsais we generate (latency-wise, this
should be OK since we're already doing a roundtrip to the DB for bookmarks
anyway).

However, the current methods in our Bonsai Globalrev mapping aren't very
amenable to this:

- `add_many` ignores duplicates, so if we used that, we would end up not
  noticing duplicate globalrevs, which is problematic
- `add` inserts globalrevs one at a time, which isn't desirable. Also, it has a
  lot of ad-hoc logic to compensate for the fact that it does an `INSERT IGNORE`
  when what we really need is a regular "`INSERT` and bail on duplicates" (see
  the sketch after this list).
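
To make that distinction concrete, here is a minimal standalone sketch of the
two insert behaviours. It uses SQLite via the `rusqlite` crate purely for
illustration; the table and column names are made up, and Mononoke itself goes
through its own `sql` crate against MySQL:

    use rusqlite::Connection;

    fn main() -> rusqlite::Result<()> {
        let conn = Connection::open_in_memory()?;
        conn.execute(
            "CREATE TABLE mapping (bcs_id TEXT PRIMARY KEY, globalrev INTEGER NOT NULL)",
            [],
        )?;
        conn.execute("INSERT INTO mapping VALUES ('aaaa', 1000)", [])?;

        // INSERT OR IGNORE (SQLite's spelling of MySQL's INSERT IGNORE) swallows
        // the duplicate key: no error, 0 rows affected, conflict goes unnoticed.
        let rows = conn.execute("INSERT OR IGNORE INTO mapping VALUES ('aaaa', 1001)", [])?;
        assert_eq!(rows, 0);

        // A plain INSERT bails out on the duplicate key instead, which is the
        // behaviour that assigning Globalrevs at pushrebase time needs.
        assert!(conn
            .execute("INSERT INTO mapping VALUES ('aaaa', 1001)", [])
            .is_err());

        Ok(())
    }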

So, let's:

- Remove `add()` altogether. It's not being used right now anyway.
- Clarify the purpose of `add_many` by renaming it to `bulk_import`, which is
  for contexts where we expect Globalrevs to be correct (i.e. backfill &
  blobimport).

Also, while in there, remove some unnecessary `Vec` allocations.
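
As an aside on that last point, here is a hedged sketch (toy types, not the
real Mononoke ones) of why taking `impl IntoIterator` instead of an owned
`Vec` removes an allocation at call sites like blobimport's:

    struct Changeset;

    // Before: callers had to materialize a Vec just to pass it in.
    fn upload_owned(changesets: Vec<Changeset>) -> usize {
        changesets.len()
    }

    // After: any iterator is consumed directly, with no intermediate Vec.
    fn bulk_import_iter(changesets: impl IntoIterator<Item = Changeset>) -> usize {
        changesets.into_iter().count()
    }

    fn main() {
        // Old style: .collect() allocates a second Vec just for the call.
        let chunk = vec![(1u32, Changeset), (2, Changeset)];
        assert_eq!(upload_owned(chunk.into_iter().map(|(_, cs)| cs).collect()), 2);

        // New style: the map adapter is passed straight through.
        let chunk = vec![(1u32, Changeset), (2, Changeset)];
        assert_eq!(bulk_import_iter(chunk.into_iter().map(|(_, cs)| cs)), 2);
    }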

Reviewed By: StanislavGlebik

Differential Revision: D19496573

fbshipit-source-id: 7d3ba25be04c71f3d54e36dd54b565dfad0d6ebc
Thomas Orozco, 2020-01-21 13:24:44 -08:00 (committed by Facebook GitHub Bot)
parent 267113e02b
commit 563f267f8c
6 changed files with 28 additions and 125 deletions


@@ -27,7 +27,7 @@ use futures_ext::{BoxFuture, FutureExt, StreamExt};
 use slog::{debug, error, info, Logger};
 
 use blobrepo::BlobRepo;
-use bonsai_globalrev_mapping::{upload_globalrevs, BonsaiGlobalrevMapping};
+use bonsai_globalrev_mapping::{bulk_import_globalrevs, BonsaiGlobalrevMapping};
 use context::CoreContext;
 use mercurial_revlog::{revlog::RevIdx, RevlogRepo};
 use mercurial_types::{HgChangesetId, HgNodeHash};
@@ -185,11 +185,11 @@ impl Blobimport {
            };
 
            let globalrevs_work = if has_globalrev {
-                upload_globalrevs(
+                bulk_import_globalrevs(
                    ctx.clone(),
                    repo_id,
                    globalrevs_store.clone(),
-                    chunk.into_iter().map(|(_, cs)| cs).collect(),
+                    chunk.into_iter().map(|(_, cs)| cs),
                )
                .left_future()
            } else {


@ -1,18 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License found in the LICENSE file in the root
* directory of this source tree.
*/
use super::BonsaiGlobalrevMappingEntry;
use thiserror::Error;
#[derive(Debug, Eq, Error, PartialEq)]
pub enum ErrorKind {
#[error("Conflicting entries: stored:{0:?} current:{1:?}")]
ConflictingEntries(BonsaiGlobalrevMappingEntry, BonsaiGlobalrevMappingEntry),
#[error("Conflict detected during insert, but no value was there for: {0:?}")]
RaceConditionWithDelete(BonsaiGlobalrevMappingEntry),
}


@@ -24,10 +24,6 @@ use slog::warn;
 use sql::queries;
 use std::sync::Arc;
 
-mod errors;
-
-pub use crate::errors::ErrorKind;
-
 #[derive(Clone, Debug, Eq, Hash, PartialEq)]
 pub struct BonsaiGlobalrevMappingEntry {
     pub repo_id: RepositoryId,
@@ -84,9 +80,7 @@ impl From<Vec<Globalrev>> for BonsaisOrGlobalrevs {
 }
 
 pub trait BonsaiGlobalrevMapping: Send + Sync {
-    fn add(&self, entry: BonsaiGlobalrevMappingEntry) -> BoxFuture<bool, Error>;
-
-    fn add_many(&self, entries: Vec<BonsaiGlobalrevMappingEntry>) -> BoxFuture<(), Error>;
+    fn bulk_import(&self, entries: &[BonsaiGlobalrevMappingEntry]) -> BoxFuture<(), Error>;
 
     fn get(
         &self,
@@ -108,12 +102,8 @@ pub trait BonsaiGlobalrevMapping: Send + Sync {
 }
 
 impl BonsaiGlobalrevMapping for Arc<dyn BonsaiGlobalrevMapping> {
-    fn add(&self, entry: BonsaiGlobalrevMappingEntry) -> BoxFuture<bool, Error> {
-        (**self).add(entry)
-    }
-
-    fn add_many(&self, entries: Vec<BonsaiGlobalrevMappingEntry>) -> BoxFuture<(), Error> {
-        (**self).add_many(entries)
+    fn bulk_import(&self, entries: &[BonsaiGlobalrevMappingEntry]) -> BoxFuture<(), Error> {
+        (**self).bulk_import(entries)
     }
 
     fn get(
@@ -142,7 +132,7 @@ impl BonsaiGlobalrevMapping for Arc<dyn BonsaiGlobalrevMapping> {
 }
 
 queries! {
-    write InsertMapping(values: (
+    write BulkImportGlobalrevs(values: (
        repo_id: RepositoryId,
        bcs_id: ChangesetId,
        globalrev: Globalrev,
@@ -198,54 +188,7 @@ impl SqlConstructors for SqlBonsaiGlobalrevMapping {
 }
 
 impl BonsaiGlobalrevMapping for SqlBonsaiGlobalrevMapping {
-    fn add(&self, entry: BonsaiGlobalrevMappingEntry) -> BoxFuture<bool, Error> {
-        let BonsaiGlobalrevMappingEntry {
-            repo_id,
-            bcs_id,
-            globalrev,
-        } = entry;
-        cloned!(self.read_master_connection);
-
-        InsertMapping::query(&self.write_connection, &[(&repo_id, &bcs_id, &globalrev)])
-            .and_then(move |result| {
-                if result.affected_rows() == 1 {
-                    Ok(true).into_future().boxify()
-                } else {
-                    select_mapping(
-                        &read_master_connection,
-                        repo_id,
-                        &BonsaisOrGlobalrevs::Bonsai(vec![bcs_id]),
-                    )
-                    .and_then(move |mappings| match mappings.into_iter().next() {
-                        Some(BonsaiGlobalrevMappingEntry {
-                            repo_id,
-                            bcs_id,
-                            globalrev,
-                        }) => {
-                            if globalrev == entry.globalrev {
-                                Ok(false)
-                            } else {
-                                Err(ErrorKind::ConflictingEntries(
-                                    BonsaiGlobalrevMappingEntry {
-                                        repo_id,
-                                        bcs_id,
-                                        globalrev,
-                                    },
-                                    entry,
-                                )
-                                .into())
-                            }
-                        }
-                        None => Err(ErrorKind::RaceConditionWithDelete(entry).into()),
-                    })
-                    .map(move |_| false)
-                    .boxify()
-                }
-            })
-            .boxify()
-    }
-
-    fn add_many(&self, entries: Vec<BonsaiGlobalrevMappingEntry>) -> BoxFuture<(), Error> {
+    fn bulk_import(&self, entries: &[BonsaiGlobalrevMappingEntry]) -> BoxFuture<(), Error> {
         let entries: Vec<_> = entries
             .iter()
             .map(
@@ -257,7 +200,7 @@ impl BonsaiGlobalrevMapping for SqlBonsaiGlobalrevMapping {
             )
             .collect();
 
-        InsertMapping::query(&self.write_connection, &entries[..])
+        BulkImportGlobalrevs::query(&self.write_connection, &entries[..])
             .from_err()
             .map(|_| ())
             .boxify()
@@ -381,15 +324,17 @@ fn select_mapping(
     .boxify()
 }
 
-pub fn upload_globalrevs(
+/// This method is for importing Globalrevs in bulk from a set of BonsaiChangesets where you know
+/// they are correct. Don't use this to assign new Globalrevs.
+pub fn bulk_import_globalrevs(
     ctx: CoreContext,
     repo_id: RepositoryId,
     globalrevs_store: Arc<dyn BonsaiGlobalrevMapping>,
-    cs_ids: Vec<BonsaiChangeset>,
+    changesets: impl IntoIterator<Item = BonsaiChangeset>,
 ) -> BoxFuture<(), Error> {
     let mut entries = vec![];
-    for bcs in cs_ids {
-        match Globalrev::from_bcs(bcs.clone()) {
+    for bcs in changesets.into_iter() {
+        match Globalrev::from_bcs(&bcs) {
             Ok(globalrev) => {
                 let entry =
                     BonsaiGlobalrevMappingEntry::new(repo_id, bcs.get_changeset_id(), globalrev);
@@ -403,5 +348,5 @@ pub fn upload_globalrevs(
             }
         }
     }
-    globalrevs_store.add_many(entries).boxify()
+    globalrevs_store.bulk_import(&entries)
 }
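
The `Arc<dyn BonsaiGlobalrevMapping>` impl updated above follows the standard
delegation pattern for trait objects behind a smart pointer. A minimal
self-contained sketch with a toy trait (names are illustrative only, not the
real API):

    use std::sync::Arc;

    trait Mapping: Send + Sync {
        fn bulk_import(&self, entries: &[u64]) -> usize;
    }

    // Forward through the pointer so an Arc<dyn Mapping> is itself a Mapping
    // and can be handed to code that takes a generic M: Mapping.
    impl Mapping for Arc<dyn Mapping> {
        fn bulk_import(&self, entries: &[u64]) -> usize {
            (**self).bulk_import(entries)
        }
    }

    struct InMemory;

    impl Mapping for InMemory {
        fn bulk_import(&self, entries: &[u64]) -> usize {
            entries.len()
        }
    }

    fn main() {
        let mapping: Arc<dyn Mapping> = Arc::new(InMemory);
        assert_eq!(mapping.bulk_import(&[1, 2, 3]), 3);
    }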


@@ -8,9 +8,8 @@
 #![deny(warnings)]
 
-use assert_matches::assert_matches;
 use bonsai_globalrev_mapping::{
-    BonsaiGlobalrevMapping, BonsaiGlobalrevMappingEntry, BonsaisOrGlobalrevs, ErrorKind,
+    BonsaiGlobalrevMapping, BonsaiGlobalrevMappingEntry, BonsaisOrGlobalrevs,
     SqlBonsaiGlobalrevMapping, SqlConstructors,
 };
 
 use futures::Future;
@@ -24,20 +23,11 @@ fn add_and_get<M: BonsaiGlobalrevMapping>(mapping: M) {
         bcs_id: bonsai::ONES_CSID,
         globalrev: GLOBALREV_ZERO,
     };
-    assert_eq!(
-        true,
-        mapping
-            .add(entry.clone())
-            .wait()
-            .expect("Adding new entry failed")
-    );
-    assert_eq!(
-        false,
-        mapping
-            .add(entry.clone())
-            .wait()
-            .expect("Adding same entry failed")
-    );
+
+    mapping
+        .bulk_import(&vec![entry.clone()])
+        .wait()
+        .expect("Adding new entry failed");
 
     let result = mapping
         .get(
@@ -57,23 +47,9 @@ fn add_and_get<M: BonsaiGlobalrevMapping>(mapping: M) {
         .wait()
         .expect("Failed to get bonsai changeset by its globalrev counterpart");
     assert_eq!(result, Some(bonsai::ONES_CSID));
-
-    let same_bc_entry = BonsaiGlobalrevMappingEntry {
-        repo_id: REPO_ZERO,
-        bcs_id: bonsai::ONES_CSID,
-        globalrev: GLOBALREV_ONE, // different than entry.globalrev
-    };
-    let result = mapping
-        .add(same_bc_entry.clone())
-        .wait()
-        .expect_err("Conflicting entries should haved produced an error");
-    assert_matches!(
-        result.downcast::<ErrorKind>(),
-        Ok(ErrorKind::ConflictingEntries(ref e0, ref e1)) if e0 == &entry && e1 == &same_bc_entry
-    );
 }
 
-fn add_many<M: BonsaiGlobalrevMapping>(mapping: M) {
+fn bulk_import<M: BonsaiGlobalrevMapping>(mapping: M) {
     let entry1 = BonsaiGlobalrevMappingEntry {
         repo_id: REPO_ZERO,
         bcs_id: bonsai::ONES_CSID,
@@ -93,7 +69,7 @@ fn add_many<M: BonsaiGlobalrevMapping>(mapping: M) {
     assert_eq!(
         (),
         mapping
-            .add_many(vec![entry1.clone(), entry2.clone(), entry3.clone()])
+            .bulk_import(&vec![entry1.clone(), entry2.clone(), entry3.clone()])
            .wait()
            .expect("Adding new entries vector failed")
    );
@@ -120,7 +96,7 @@ fn test_add_and_get() {
 
 #[fbinit::test]
 fn test_add_many() {
     async_unit::tokio_unit_test(move || {
-        add_many(SqlBonsaiGlobalrevMapping::with_sqlite_in_memory().unwrap());
+        bulk_import(SqlBonsaiGlobalrevMapping::with_sqlite_in_memory().unwrap());
     });
 }


@@ -9,7 +9,7 @@
 use anyhow::Error;
 use blobrepo::BlobRepo;
 use bonsai_globalrev_mapping::{
-    upload_globalrevs, BonsaiGlobalrevMapping, SqlBonsaiGlobalrevMapping,
+    bulk_import_globalrevs, BonsaiGlobalrevMapping, SqlBonsaiGlobalrevMapping,
 };
 use bytes::Bytes;
 use changesets::{deserialize_cs_entries, ChangesetEntry};
@@ -58,7 +58,7 @@ pub fn upload<P: AsRef<Path>>(
         .buffered(chunk_size)
         .chunks(chunk_size)
         .and_then(move |chunk| {
-            upload_globalrevs(
+            bulk_import_globalrevs(
                 ctx.clone(),
                 repo.get_repoid(),
                 globalrevs_store.clone(),


@@ -40,7 +40,7 @@ impl Globalrev {
         Ok(result)
     }
 
-    pub fn from_bcs(bcs: BonsaiChangeset) -> Result<Self> {
+    pub fn from_bcs(bcs: &BonsaiChangeset) -> Result<Self> {
         match (
             bcs.extra().find(|(key, _)| key == &"global_rev"),
             bcs.extra().find(|(key, _)| key == &"convert_revision"),