SQL mapping between {Bonsai|Hg} changesets

Summary:
We need to store relation between Hg changesets and Bonsai changesets.
- `BonsaiHgMapping` is exactly this mapping; it establishes an injective relation between `{Hg|Bonsai}Changeset`s

Reviewed By: StanislavGlebik

Differential Revision: D8801254

fbshipit-source-id: c7df14172e6c2d67c039a24e1bb821e6d92860af
This commit is contained in:
Pavel Aslanov 2018-07-11 08:52:38 -07:00 committed by Facebook Github Bot
parent 2d3f584e65
commit fcb58cf972
10 changed files with 685 additions and 2 deletions

View File

@ -0,0 +1,7 @@
-- Maps Mercurial changesets to Bonsai changesets, scoped per repository.
CREATE TABLE bonsai_hg_mapping (
  repo_id INTEGER NOT NULL,      -- repository this mapping belongs to
  hg_cs_id BINARY(20) NOT NULL,  -- Mercurial changeset id (20-byte hash)
  bcs_id BINARY(32) NOT NULL,    -- Bonsai changeset id (32-byte hash)
  -- Within a repo, each hg changeset maps to at most one bonsai changeset
  -- and vice versa (the primary key covers the bonsai side).
  UNIQUE (repo_id, hg_cs_id),
  PRIMARY KEY (repo_id, bcs_id)
);

View File

@ -0,0 +1,7 @@
-- Maps Mercurial changesets to Bonsai changesets, scoped per repository.
-- NOTE(review): kept textually identical to the SQLite schema; keep both in sync.
CREATE TABLE bonsai_hg_mapping (
  repo_id INTEGER NOT NULL,      -- repository this mapping belongs to
  hg_cs_id BINARY(20) NOT NULL,  -- Mercurial changeset id (20-byte hash)
  bcs_id BINARY(32) NOT NULL,    -- Bonsai changeset id (32-byte hash)
  -- Within a repo, each hg changeset maps to at most one bonsai changeset
  -- and vice versa (the primary key covers the bonsai side).
  UNIQUE (repo_id, hg_cs_id),
  PRIMARY KEY (repo_id, bcs_id)
);

View File

@ -0,0 +1,15 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use super::BonsaiHgMappingEntry;
pub use failure::{Error, Result};
/// Errors specific to the bonsai<->hg mapping.
#[derive(Debug, Eq, Fail, PartialEq)]
pub enum ErrorKind {
    /// Failed to obtain a database connection.
    #[fail(display = "Connection error")] ConnectionError,
    /// An insert clashed with an existing row that maps the same hg or bonsai
    /// changeset to a different counterpart: (stored entry, attempted entry).
    #[fail(display = "Conflicting entries: stored:{:?} current:{:?}", _0, _1)]
    ConflictingEntries(BonsaiHgMappingEntry, BonsaiHgMappingEntry),
}

View File

@ -0,0 +1,396 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
#![deny(warnings)]
#![feature(try_from, never_type)]
extern crate asyncmemo;
extern crate db_conn;
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate failure_ext as failure;
extern crate futures;
extern crate heapsize;
#[macro_use]
extern crate heapsize_derive;
extern crate tokio;
extern crate db;
extern crate futures_ext;
#[macro_use]
extern crate lazy_static;
extern crate mercurial_types;
extern crate mononoke_types;
#[macro_use]
extern crate stats;
use std::result;
use std::sync::{Arc, MutexGuard};
use asyncmemo::{Asyncmemo, Filler, Weight};
use db_conn::{MysqlConnInner, SqliteConnInner};
use diesel::{insert_into, MysqlConnection, SqliteConnection};
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, PooledConnection};
use diesel::result::{DatabaseErrorKind, Error as DieselError};
use futures::Future;
use futures_ext::{asynchronize, BoxFuture, FutureExt};
use mercurial_types::{HgChangesetId, RepositoryId};
use mononoke_types::ChangesetId;
use stats::Timeseries;
mod errors;
mod models;
mod schema;
pub use errors::*;
use models::BonsaiHgMappingRow;
use schema::bonsai_hg_mapping;
// Counters exported under the "mononoke.bonsai-hg-mapping" prefix:
// total lookups, lookups that fell through to the master connection,
// and insert attempts.
define_stats! {
    prefix = "mononoke.bonsai-hg-mapping";
    gets: timeseries(RATE, SUM),
    gets_master: timeseries(RATE, SUM),
    adds: timeseries(RATE, SUM),
}
/// One row of the mapping: within `repo_id`, `hg_cs_id` and `bcs_id`
/// identify the same commit in its Mercurial and Bonsai forms.
#[derive(Clone, Debug, Eq, Hash, HeapSizeOf, PartialEq)]
pub struct BonsaiHgMappingEntry {
    pub repo_id: RepositoryId,
    pub hg_cs_id: HgChangesetId,
    pub bcs_id: ChangesetId,
}
/// Lookup key allowing a mapping entry to be fetched from either side of
/// the relation.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, HeapSizeOf)]
pub enum BonsaiOrHgChangesetId {
    Bonsai(ChangesetId),
    Hg(HgChangesetId),
}
// Lets callers pass a bonsai changeset id directly via `.into()` wherever a
// lookup key is expected.
impl From<ChangesetId> for BonsaiOrHgChangesetId {
    fn from(cs_id: ChangesetId) -> Self {
        BonsaiOrHgChangesetId::Bonsai(cs_id)
    }
}
// Lets callers pass an hg changeset id directly via `.into()` wherever a
// lookup key is expected.
impl From<HgChangesetId> for BonsaiOrHgChangesetId {
    fn from(cs_id: HgChangesetId) -> Self {
        BonsaiOrHgChangesetId::Hg(cs_id)
    }
}
/// Bidirectional lookup between hg and bonsai changeset ids within a repo.
pub trait BonsaiHgMapping: Send + Sync {
    /// Inserts `entry`. Resolves to `true` when a new mapping was stored and
    /// `false` when an identical mapping already existed; the SQL-backed
    /// implementations fail with `ErrorKind::ConflictingEntries` when either
    /// side is already mapped to a different counterpart.
    fn add(&self, entry: BonsaiHgMappingEntry) -> BoxFuture<bool, Error>;

    /// Fetches the full entry containing `cs_id` (either side) in `repo_id`,
    /// or `None` when no mapping exists.
    fn get(
        &self,
        repo_id: RepositoryId,
        cs_id: BonsaiOrHgChangesetId,
    ) -> BoxFuture<Option<BonsaiHgMappingEntry>, Error>;

    /// Convenience wrapper around `get` that projects out the hg side.
    fn get_hg_from_bonsai(
        &self,
        repo_id: RepositoryId,
        cs_id: ChangesetId,
    ) -> BoxFuture<Option<HgChangesetId>, Error> {
        self.get(repo_id, cs_id.into())
            .map(|result| result.map(|entry| entry.hg_cs_id))
            .boxify()
    }

    /// Convenience wrapper around `get` that projects out the bonsai side.
    fn get_bonsai_from_hg(
        &self,
        repo_id: RepositoryId,
        cs_id: HgChangesetId,
    ) -> BoxFuture<Option<ChangesetId>, Error> {
        self.get(repo_id, cs_id.into())
            .map(|result| result.map(|entry| entry.bcs_id))
            .boxify()
    }
}
// Delegation impl so an `Arc<BonsaiHgMapping>` trait object can itself be
// used wherever a `BonsaiHgMapping` is expected.
impl BonsaiHgMapping for Arc<BonsaiHgMapping> {
    fn add(&self, entry: BonsaiHgMappingEntry) -> BoxFuture<bool, Error> {
        (**self).add(entry)
    }

    fn get(
        &self,
        repo_id: RepositoryId,
        cs_id: BonsaiOrHgChangesetId,
    ) -> BoxFuture<Option<BonsaiHgMappingEntry>, Error> {
        (**self).get(repo_id, cs_id)
    }
}
/// Decorator that memoizes successful `get` results of an inner mapping.
/// Writes pass straight through to the wrapped mapping.
pub struct CachingBonsaiHgMapping {
    mapping: Arc<BonsaiHgMapping>,
    cache: asyncmemo::Asyncmemo<BonsaiHgMappingFiller>,
}
impl CachingBonsaiHgMapping {
pub fn new(mapping: Arc<BonsaiHgMapping>, sizelimit: usize) -> Self {
let cache = asyncmemo::Asyncmemo::with_limits(
"bonsai-hg-mapping",
BonsaiHgMappingFiller::new(mapping.clone()),
std::usize::MAX,
sizelimit,
);
Self { mapping, cache }
}
}
impl BonsaiHgMapping for CachingBonsaiHgMapping {
    /// Writes bypass the cache and go straight to the underlying store.
    fn add(&self, entry: BonsaiHgMappingEntry) -> BoxFuture<bool, Error> {
        self.mapping.add(entry)
    }

    /// Cached lookup. The filler encodes "entry not found" as `Err(None)` so
    /// that misses are not cached; translate that back to `Ok(None)` here.
    fn get(
        &self,
        repo_id: RepositoryId,
        cs: BonsaiOrHgChangesetId,
    ) -> BoxFuture<Option<BonsaiHgMappingEntry>, Error> {
        self.cache
            // `cs` is already the key type; the original's `cs.into()` was a
            // redundant identity conversion.
            .get((repo_id, cs))
            .then(|val| match val {
                Ok(val) => Ok(Some(val)),
                Err(Some(err)) => Err(err),
                Err(None) => Ok(None),
            })
            .boxify()
    }
}
/// `asyncmemo` filler that resolves cache misses against the wrapped mapping.
pub struct BonsaiHgMappingFiller {
    mapping: Arc<BonsaiHgMapping>,
}

impl BonsaiHgMappingFiller {
    fn new(mapping: Arc<BonsaiHgMapping>) -> Self {
        BonsaiHgMappingFiller { mapping }
    }
}
impl Filler for BonsaiHgMappingFiller {
    type Key = (RepositoryId, BonsaiOrHgChangesetId);
    // The error channel doubles as a "not found" signal: `Err(None)` means the
    // entry does not exist (and must not be cached), `Err(Some(e))` is a real
    // failure.
    type Value = Box<Future<Item = BonsaiHgMappingEntry, Error = Option<Error>> + Send>;

    /// Fetches the entry for `(repo_id, cs_id)` from the underlying mapping.
    fn fill(&self, _cache: &Asyncmemo<Self>, &(ref repo_id, ref cs_id): &Self::Key) -> Self::Value {
        self.mapping
            .get(*repo_id, *cs_id)
            // `Some` used as a function replaces the redundant closure
            // `|err| Some(err)`; `ok_or(None)` replaces the manual match
            // mapping `Some(v) => Ok(v), None => Err(None)`.
            .map_err(Some)
            .and_then(|res| res.ok_or(None))
            .boxify()
    }
}
impl Weight for BonsaiOrHgChangesetId {
    /// Cache weight is the weight of whichever id variant is stored.
    fn get_weight(&self) -> usize {
        match *self {
            BonsaiOrHgChangesetId::Bonsai(ref id) => id.get_weight(),
            BonsaiOrHgChangesetId::Hg(ref id) => id.get_weight(),
        }
    }
}
impl Weight for BonsaiHgMappingEntry {
    /// Cache weight is the sum of the weights of the three fields.
    fn get_weight(&self) -> usize {
        self.repo_id.get_weight() + self.hg_cs_id.get_weight() + self.bcs_id.get_weight()
    }
}
/// SQLite-backed implementation of `BonsaiHgMapping` (mainly for tests and
/// local use).
#[derive(Clone)]
pub struct SqliteBonsaiHgMapping {
    inner: SqliteConnInner,
}
impl SqliteBonsaiHgMapping {
    fn from(inner: SqliteConnInner) -> Self {
        Self { inner }
    }

    /// DDL used to create the mapping table in a fresh SQLite database.
    fn get_up_query() -> &'static str {
        include_str!("../schemas/sqlite-bonsai-hg-mapping.sql")
    }

    /// Create a new in-memory empty database. Great for tests.
    pub fn in_memory() -> Result<Self> {
        Ok(Self::from(SqliteConnInner::in_memory(
            Self::get_up_query(),
        )?))
    }

    /// Opens the database file at `path`, creating and initializing it if it
    /// does not exist yet.
    pub fn open_or_create<P: AsRef<str>>(path: P) -> Result<Self> {
        Ok(Self::from(SqliteConnInner::open_or_create(
            path,
            Self::get_up_query(),
        )?))
    }

    // Acquiring a connection cannot fail (error type is `!`); for SQLite the
    // "master" accessor presumably returns the same mutex-guarded connection
    // as `get_conn` — see `SqliteConnInner` to confirm.
    fn get_conn(&self) -> result::Result<MutexGuard<SqliteConnection>, !> {
        self.inner.get_conn()
    }

    fn get_master_conn(&self) -> result::Result<MutexGuard<SqliteConnection>, !> {
        self.inner.get_master_conn()
    }
}
/// MySQL-backed implementation of `BonsaiHgMapping` (production storage).
#[derive(Clone)]
pub struct MysqlBonsaiHgMapping {
    inner: MysqlConnInner,
}
impl MysqlBonsaiHgMapping {
    fn from(inner: MysqlConnInner) -> Self {
        Self { inner }
    }

    /// Connects to the database at `db_address`.
    pub fn open(db_address: &str) -> Result<Self> {
        Ok(Self::from(MysqlConnInner::open(db_address)?))
    }

    /// DDL used to create the mapping table in a fresh MySQL database.
    fn get_up_query() -> &'static str {
        include_str!("../schemas/mysql-bonsai-hg-mapping.sql")
    }

    /// Creates a throwaway test database whose name starts with `prefix`.
    pub fn create_test_db<P: AsRef<str>>(prefix: P) -> Result<Self> {
        Ok(Self::from(MysqlConnInner::create_test_db(
            prefix,
            Self::get_up_query(),
        )?))
    }

    // Pooled connections: `get_conn` for ordinary reads, `get_master_conn`
    // for writes and read-after-miss (distinction lives in `MysqlConnInner`).
    fn get_conn(&self) -> Result<PooledConnection<ConnectionManager<MysqlConnection>>> {
        self.inner.get_conn()
    }

    fn get_master_conn(&self) -> Result<PooledConnection<ConnectionManager<MysqlConnection>>> {
        self.inner.get_master_conn()
    }
}
/// Using a macro here is unfortunate, but it appears to be the only way to share this code
/// between SQLite and MySQL.
/// See https://github.com/diesel-rs/diesel/issues/882#issuecomment-300257476
macro_rules! impl_bonsai_hg_mapping {
    ($struct:ty, $connection:ty) => {
        impl BonsaiHgMapping for $struct {
            fn get(
                &self,
                repo_id: RepositoryId,
                cs_id: BonsaiOrHgChangesetId,
            ) -> BoxFuture<Option<BonsaiHgMappingEntry>, Error> {
                STATS::gets.add_value(1);
                let db = self.clone();
                // Run the blocking diesel query off the main event loop.
                asynchronize(move || {
                    // Try the ordinary connection first; on a miss, retry on
                    // the master connection (counted separately) before
                    // concluding the entry really does not exist.
                    let result = {
                        let connection = db.get_conn()?;
                        Self::actual_get(&connection, repo_id, cs_id)?
                    };
                    if result.is_none() {
                        STATS::gets_master.add_value(1);
                        let connection = db.get_master_conn()?;
                        Self::actual_get(&connection, repo_id, cs_id)
                    } else {
                        Ok(result)
                    }
                })
            }

            fn add(&self, entry: BonsaiHgMappingEntry) -> BoxFuture<bool, Error> {
                STATS::adds.add_value(1);
                let db = self.clone();
                asynchronize(move || {
                    // Inserts always go through the master connection.
                    let connection = db.get_master_conn()?;
                    let BonsaiHgMappingEntry {
                        repo_id,
                        hg_cs_id,
                        bcs_id,
                    } = entry.clone();
                    let result = insert_into(bonsai_hg_mapping::table)
                        .values(BonsaiHgMappingRow {
                            repo_id,
                            hg_cs_id,
                            bcs_id,
                        })
                        .execute(&*connection);
                    match result {
                        Ok(_) => Ok(true),
                        // A unique-key violation means a row with the same hg
                        // or bonsai id already exists: re-read both sides to
                        // decide whether this is an idempotent re-insert
                        // (Ok(false)) or a genuine conflict.
                        Err(
                            err @ DieselError::DatabaseError(DatabaseErrorKind::UniqueViolation, _),
                        ) => {
                            let entry_by_bcs =
                                Self::actual_get(&connection, repo_id, bcs_id.into())?;
                            let entry_by_hgcs =
                                Self::actual_get(&connection, repo_id, hg_cs_id.into())?;
                            match entry_by_bcs.or(entry_by_hgcs) {
                                Some(ref stored_entry) if stored_entry == &entry => Ok(false),
                                Some(stored_entry) => {
                                    Err(ErrorKind::ConflictingEntries(stored_entry.clone(), entry)
                                        .into())
                                }
                                // Violation reported but no row visible on
                                // re-read: surface the raw database error.
                                _ => Err(err.into()),
                            }
                        }
                        Err(err) => Err(err.into()),
                    }
                })
            }
        }

        impl $struct {
            // Synchronous single-row lookup by either id on the given
            // connection; `limit(1)` plus `.optional()` turns "no row" into
            // `Ok(None)`.
            fn actual_get(
                connection: &$connection,
                repo_id: RepositoryId,
                cs_id: BonsaiOrHgChangesetId,
            ) -> Result<Option<BonsaiHgMappingEntry>> {
                let query = match cs_id {
                    BonsaiOrHgChangesetId::Bonsai(id) => bonsai_hg_mapping::table
                        .filter(bonsai_hg_mapping::repo_id.eq(repo_id))
                        .filter(bonsai_hg_mapping::bcs_id.eq(id))
                        .limit(1)
                        .into_boxed(),
                    BonsaiOrHgChangesetId::Hg(id) => bonsai_hg_mapping::table
                        .filter(bonsai_hg_mapping::repo_id.eq(repo_id))
                        .filter(bonsai_hg_mapping::hg_cs_id.eq(id))
                        .limit(1)
                        .into_boxed(),
                };
                query
                    .first::<BonsaiHgMappingRow>(connection)
                    .optional()
                    .map_err(failure::Error::from)
                    .and_then(|row| match row {
                        None => Ok(None),
                        Some(row) => {
                            // Convert the diesel row type into the public
                            // entry type field by field.
                            let BonsaiHgMappingRow {
                                repo_id,
                                hg_cs_id,
                                bcs_id,
                            } = row;
                            Ok(Some(BonsaiHgMappingEntry {
                                repo_id,
                                hg_cs_id,
                                bcs_id,
                            }))
                        }
                    })
            }
        }
    };
}

impl_bonsai_hg_mapping!(MysqlBonsaiHgMapping, MysqlConnection);
impl_bonsai_hg_mapping!(SqliteBonsaiHgMapping, SqliteConnection);

View File

@ -0,0 +1,19 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use mercurial_types::{HgChangesetId, RepositoryId};
use mononoke_types::ChangesetId;
use schema::bonsai_hg_mapping;
/// Diesel row type for the `bonsai_hg_mapping` table; mirrors the crate's
/// public `BonsaiHgMappingEntry` field for field.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[derive(Queryable, Insertable)]
#[table_name = "bonsai_hg_mapping"]
pub(crate) struct BonsaiHgMappingRow {
    pub repo_id: RepositoryId,
    pub hg_cs_id: HgChangesetId,
    pub bcs_id: ChangesetId,
}

View File

@ -0,0 +1,21 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
//! The `table!` macros in this module describe the schemas for these tables in SQL storage
//! (MySQL or SQLite). These descriptions are *not* the source of truth, so if the schema ever
//! changes it will need to be updated here as well.
// Diesel schema for `bonsai_hg_mapping`; the primary key is
// (repo_id, bcs_id), matching the .sql schema files.
table! {
    use diesel::sql_types::Integer;
    use mercurial_types::sql_types::HgChangesetIdSql;
    use mononoke_types::sql_types::ChangesetIdSql;

    bonsai_hg_mapping (repo_id, bcs_id) {
        repo_id -> Integer,
        hg_cs_id -> HgChangesetIdSql,
        bcs_id -> ChangesetIdSql,
    }
}

View File

@ -0,0 +1,168 @@
// Copyright (c) 2017-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
//! Tests for the Changesets store.
#![deny(warnings)]
#[macro_use]
extern crate assert_matches;
extern crate async_unit;
extern crate failure_ext as failure;
extern crate futures;
extern crate bonsai_hg_mapping;
extern crate mercurial_types_mocks;
extern crate mononoke_types_mocks;
use std::sync::Arc;
use futures::Future;
use bonsai_hg_mapping::{BonsaiHgMapping, BonsaiHgMappingEntry, ErrorKind, MysqlBonsaiHgMapping,
SqliteBonsaiHgMapping};
use mercurial_types_mocks::nodehash as hg;
use mercurial_types_mocks::repo::REPO_ZERO;
use mononoke_types_mocks::changesetid as bonsai;
/// Exercises insert, idempotent re-insert, both lookup directions, and
/// conflict detection for any `BonsaiHgMapping` implementation.
fn add_and_get<M: BonsaiHgMapping>(mapping: M) {
    let entry = BonsaiHgMappingEntry {
        repo_id: REPO_ZERO,
        hg_cs_id: hg::ONES_CSID,
        bcs_id: bonsai::ONES_CSID,
    };
    // First insert reports a new row.
    assert_eq!(
        true,
        mapping
            .add(entry.clone())
            .wait()
            .expect("Adding new entry failed")
    );
    // Re-inserting the identical entry is a successful no-op.
    assert_eq!(
        false,
        mapping
            .add(entry.clone())
            .wait()
            .expect("Adding same entry failed")
    );

    let result = mapping
        .get(REPO_ZERO, hg::ONES_CSID.into())
        .wait()
        .expect("Get failed");
    assert_eq!(result, Some(entry.clone()));
    let result = mapping
        .get_hg_from_bonsai(REPO_ZERO, bonsai::ONES_CSID)
        .wait()
        .expect("Failed to get hg changeset by its bonsai counterpart");
    assert_eq!(result, Some(hg::ONES_CSID));
    let result = mapping
        .get_bonsai_from_hg(REPO_ZERO, hg::ONES_CSID)
        .wait()
        .expect("Failed to get bonsai changeset by its hg counterpart");
    assert_eq!(result, Some(bonsai::ONES_CSID));

    // Same bonsai id mapped to a different hg id must conflict.
    let same_bc_entry = BonsaiHgMappingEntry {
        repo_id: REPO_ZERO,
        hg_cs_id: hg::TWOS_CSID, // differ from entry.hg_cs_id
        bcs_id: bonsai::ONES_CSID,
    };
    let result = mapping
        .add(same_bc_entry.clone())
        // Typo fix: "should haved" -> "should have".
        .wait()
        .expect_err("Conflicting entries should have produced an error");
    assert_matches!(
        result.downcast::<ErrorKind>(),
        Ok(ErrorKind::ConflictingEntries(ref e0, ref e1)) if e0 == &entry && e1 == &same_bc_entry
    );

    // Same hg id mapped to a different bonsai id must conflict too.
    let same_hg_entry = BonsaiHgMappingEntry {
        repo_id: REPO_ZERO,
        hg_cs_id: hg::ONES_CSID,
        bcs_id: bonsai::TWOS_CSID, // differ from entry.bcs_id
    };
    let result = mapping
        .add(same_hg_entry.clone())
        .wait()
        .expect_err("Conflicting entries should have produced an error");
    assert_matches!(
        result.downcast::<ErrorKind>(),
        Ok(ErrorKind::ConflictingEntries(ref e0, ref e1)) if e0 == &entry && e1 == &same_hg_entry
    );
}
/// Looking up an id that was never added yields `Ok(None)`, not an error.
fn missing<M: BonsaiHgMapping>(mapping: M) {
    let result = mapping
        .get(REPO_ZERO, bonsai::ONES_CSID.into())
        .wait()
        .expect("Failed to fetch missing changeset (should succeed with None instead)");
    assert_eq!(result, None);
}
// Generates a test module `$mod_name` whose tests build a mapping via the
// `$new_cb` callback and run the shared scenarios defined above.
macro_rules! bonsai_hg_mapping_test_impl {
    ($mod_name:ident => { new: $new_cb:expr, }) => {
        mod $mod_name {
            use super::*;

            #[test]
            fn test_add_and_get() {
                async_unit::tokio_unit_test(|| {
                    add_and_get($new_cb());
                });
            }

            #[test]
            fn test_missing() {
                async_unit::tokio_unit_test(|| {
                    missing($new_cb());
                });
            }
        }
    };
}
// Instantiate the shared test suite for each backend, both used directly
// and through an `Arc<BonsaiHgMapping>` trait object.
bonsai_hg_mapping_test_impl! {
    sqlite_test => {
        new: new_sqlite,
    }
}
bonsai_hg_mapping_test_impl! {
    sqlite_arced_test => {
        new: new_sqlite_arced,
    }
}
bonsai_hg_mapping_test_impl! {
    mysql_test => {
        new: new_mysql,
    }
}
bonsai_hg_mapping_test_impl! {
    mysql_arced_test => {
        new: new_mysql_arced,
    }
}
/// Creates a fresh in-memory SQLite-backed mapping for tests.
fn new_sqlite() -> SqliteBonsaiHgMapping {
    // Return the expression directly; the original's `let db = ...; db` was a
    // needless intermediate binding.
    SqliteBonsaiHgMapping::in_memory().expect("Creating an in-memory SQLite database failed")
}
/// Same as `new_sqlite`, but behind an `Arc` trait object.
fn new_sqlite_arced() -> Arc<BonsaiHgMapping> {
    Arc::new(new_sqlite())
}
/// Creates a throwaway MySQL-backed mapping for tests.
fn new_mysql() -> MysqlBonsaiHgMapping {
    MysqlBonsaiHgMapping::create_test_db("bonsai_hg_mapping_test")
        .expect("Failed to create test database")
}
/// Same as `new_mysql`, but behind an `Arc` trait object.
fn new_mysql_arced() -> Arc<BonsaiHgMapping> {
    Arc::new(new_mysql())
}

View File

@ -19,6 +19,8 @@ extern crate blake2;
extern crate bytes;
extern crate chrono;
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate failure_ext as failure;
extern crate heapsize;
#[macro_use]
@ -44,10 +46,11 @@ pub mod datetime;
pub mod errors;
pub mod file_change;
pub mod file_contents;
pub mod generation;
pub mod hash;
pub mod path;
pub mod sql_types;
pub mod typed_hash;
pub mod generation;
pub use blob::{Blob, BlobstoreBytes, BlobstoreValue, ChangesetBlob, ContentBlob};
pub use bonsai_changeset::BonsaiChangeset;

View File

@ -0,0 +1,38 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::io::Write;
use diesel::backend::Backend;
use diesel::deserialize::{self, FromSql};
use diesel::serialize::{self, IsNull, Output, ToSql};
use diesel::sql_types::Binary;
use failure::ResultExt;
use typed_hash::ChangesetId;
/// Diesel SQL-type marker for `ChangesetId` columns: stored as a BLOB in
/// MySQL and as BINARY in SQLite.
#[derive(QueryId, SqlType)]
#[mysql_type = "Blob"]
#[sqlite_type = "Binary"]
pub struct ChangesetIdSql;
impl<DB: Backend> ToSql<ChangesetIdSql, DB> for ChangesetId {
    /// Serializes the id as its raw hash bytes.
    fn to_sql<W: Write>(&self, out: &mut Output<W, DB>) -> serialize::Result {
        out.write_all(self.as_ref())?;
        Ok(IsNull::No)
    }
}
impl<DB: Backend> FromSql<ChangesetIdSql, DB> for ChangesetId
where
    *const [u8]: FromSql<Binary, DB>,
{
    /// Deserializes a raw byte column back into a `ChangesetId`, validating
    /// the length via `ChangesetId::from_bytes`.
    fn from_sql(bytes: Option<&DB::RawValue>) -> deserialize::Result<Self> {
        let raw_bytes: *const [u8] = FromSql::<Binary, DB>::from_sql(bytes)?;
        // SAFETY: the raw pointer comes from diesel's FromSql<Binary> impl and
        // is assumed to point at the row buffer, which outlives this call; we
        // only take a temporary borrow here and copy out of it immediately.
        let raw_bytes: &[u8] = unsafe { &*raw_bytes };
        Ok(ChangesetId::from_bytes(raw_bytes).compat()?)
    }
}

View File

@ -8,6 +8,7 @@ use std::fmt::{self, Display};
use std::str::FromStr;
use ascii::{AsciiStr, AsciiString};
use asyncmemo;
use quickcheck::{empty_shrinker, Arbitrary, Gen};
use blob::BlobstoreValue;
@ -15,6 +16,7 @@ use bonsai_changeset::BonsaiChangeset;
use errors::*;
use file_contents::FileContents;
use hash::{Blake2, Context};
use sql_types::ChangesetIdSql;
use thrift;
// There is no NULL_HASH for typed hashes. Any places that need a null hash should use an
@ -34,7 +36,8 @@ pub trait MononokeId: Copy + Send + 'static {
/// An identifier for a changeset in Mononoke.
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug, Hash)]
#[derive(HeapSizeOf)]
#[derive(HeapSizeOf, FromSqlRow, AsExpression)]
#[sql_type = "ChangesetIdSql"]
pub struct ChangesetId(Blake2);
/// An identifier for file contents in Mononoke.
@ -140,6 +143,12 @@ macro_rules! impl_typed_hash {
}
}
impl asyncmemo::Weight for $typed {
fn get_weight(&self) -> usize {
::std::mem::size_of::<Blake2>()
}
}
impl MononokeId for $typed {
type Value = $value_type;