mononoke: add synced_working_copy_equivalence table

Summary:
This table is needed for the backsyncer to track commits that are "equivalent"
between the large and small repos, i.e. the closest ancestor of a large-repo
commit that was rewritten in the small repo (sketched in code after the list below).

It's very similar to the normal `synced_commit_mapping`, but with a few important differences:

1) The small bcs id can be NULL, meaning that a commit from the large repo doesn't remap to any commit in the small repo
2) Multiple large bcs ids can point to the same small bcs id
3) Inserting a new entry in `synced_commit_mapping` automatically inserts an entry in `synced_working_copy_equivalence` - if a commit was remapped, then by definition the remapped commit is its equivalent working copy
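As a rough illustration (a hedged sketch, not code from this commit): the stored
value is what a walk like the one below would compute, where `first_parent` and
`remapping_of` are hypothetical helpers and real history can of course contain
merge commits with several parents.

fn find_equivalent_working_copy<Id: Copy>(
    mut large_commit: Id,
    first_parent: impl Fn(Id) -> Option<Id>,
    remapping_of: impl Fn(Id) -> Option<Id>,
) -> Option<Id> {
    loop {
        // A commit that was itself rewritten is its own equivalence (point 3 above).
        if let Some(small_commit) = remapping_of(large_commit) {
            return Some(small_commit);
        }
        // Otherwise keep walking towards the root of the large repo.
        match first_parent(large_commit) {
            Some(parent) => large_commit = parent,
            // No rewritten ancestor at all: the NULL case from point 1 above.
            None => return None,
        }
    }
}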

Reviewed By: ikostia

Differential Revision: D18172894

fbshipit-source-id: 14d5f692521282223778e8c9ad46191d0e24d618
Stanislau Hlebik, 2019-10-29 07:09:31 -07:00 (committed by Facebook Github Bot)
parent f2ecf19d90, commit 3bebf38753
3 changed files with 351 additions and 15 deletions


@@ -1,5 +1,5 @@
CREATE TABLE `synced_commit_mapping` (
-  `mapping_id` INTEGER PRIMARY KEY,
+  `mapping_id` INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
  `small_repo_id` int(11) NOT NULL,
  `small_bcs_id` binary(32) NOT NULL,
  `large_repo_id` int(11) NOT NULL,
@@ -7,3 +7,16 @@ CREATE TABLE `synced_commit_mapping` (
  UNIQUE (`large_repo_id`,`small_repo_id`,`small_bcs_id`),
  UNIQUE (`small_repo_id`,`large_repo_id`,`large_bcs_id`)
);

CREATE TABLE `synced_working_copy_equivalence` (
  `mapping_id` INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
  `small_repo_id` int(11) NOT NULL,
  `small_bcs_id` binary(32),
  `large_repo_id` int(11) NOT NULL,
  `large_bcs_id` binary(32) NOT NULL,
  UNIQUE (`large_repo_id`,`small_repo_id`,`large_bcs_id`)
);

-- Small bcs id can map to multiple large bcs ids
CREATE INDEX small_bcs_key ON synced_working_copy_equivalence
  (`large_repo_id`,`small_repo_id`,`small_bcs_id`);
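
-- A hypothetical illustration (not part of this commit; the 32-byte ids are
-- made-up repeated bytes) of what the schema permits: two large commits that
-- are equivalent to the same small commit, and one large commit with no
-- equivalent working copy at all.
INSERT INTO synced_working_copy_equivalence
  (large_repo_id, large_bcs_id, small_repo_id, small_bcs_id)
VALUES
  (0, X'1111111111111111111111111111111111111111111111111111111111111111', 1,
      X'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'),
  (0, X'2222222222222222222222222222222222222222222222222222222222222222', 1,
      X'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'),
  (0, X'3333333333333333333333333333333333333333333333333333333333333333', 1, NULL);

-- small_bcs_key then answers "which large commits correspond to this small commit?"
SELECT large_bcs_id
FROM synced_working_copy_equivalence
WHERE large_repo_id = 0
  AND small_repo_id = 1
  AND small_bcs_id = X'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';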


@@ -15,19 +15,33 @@ pub use sql_ext::SqlConstructors;

use cloned::cloned;
use context::CoreContext;
-use failure_ext::Error;
+use failure_ext::{Error, Fail};
use futures::{future, Future};
use futures_ext::{BoxFuture, FutureExt};
use mononoke_types::{ChangesetId, RepositoryId};
use sql::queries;
use stats::{define_stats, Timeseries};

#[derive(Debug, Eq, Fail, PartialEq)]
pub enum ErrorKind {
    #[fail(
        display = "tried to insert inconsistent small bcs id {:?}, while db has {:?}",
        actual, expected
    )]
    InconsistentWorkingCopyEntry {
        expected: Option<ChangesetId>,
        actual: Option<ChangesetId>,
    },
}

// TODO(simonfar): Once we've proven the concept, we want to cache these
define_stats! {
    prefix = "mononoke.synced_commit_mapping";
    gets: timeseries(RATE, SUM),
    gets_master: timeseries(RATE, SUM),
    adds: timeseries(RATE, SUM),
    insert_working_copy_eqivalence: timeseries(RATE, SUM),
    get_equivalent_working_copy: timeseries(RATE, SUM),
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
@@ -54,6 +68,23 @@ impl SyncedCommitMappingEntry {
    }
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct EquivalentWorkingCopyEntry {
    pub large_repo_id: RepositoryId,
    pub large_bcs_id: ChangesetId,
    pub small_repo_id: RepositoryId,
    pub small_bcs_id: Option<ChangesetId>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum WorkingCopyEquivalence {
    /// There's no matching working copy. It can happen if a pre-big-merge commit from one small
    /// repo is mapped into another small repo
    NoWorkingCopy,
    /// ChangesetId of matching working copy.
    WorkingCopy(ChangesetId),
}
pub trait SyncedCommitMapping: Send + Sync {
    /// Given the full large, small mapping, store it in the DB.
    /// Future resolves to true if the mapping was saved, false otherwise
@@ -67,6 +98,28 @@ pub trait SyncedCommitMapping: Send + Sync {
        bcs_id: ChangesetId,
        target_repo_id: RepositoryId,
    ) -> BoxFuture<Option<ChangesetId>, Error>;

    /// Inserts the equivalent working copy of a large bcs id. It's similar to a mapping entry,
    /// however there are a few differences:
    /// 1) For a (large repo, small repo) pair, many large commits can map to the same small commit
    /// 2) The small commit can be null
    ///
    /// If there's a mapping between the small and large commits, then the equivalent working copy
    /// is the same as the mapping.
    fn insert_equivalent_working_copy(
        &self,
        ctx: CoreContext,
        entry: EquivalentWorkingCopyEntry,
    ) -> BoxFuture<bool, Error>;

    /// Finds the equivalent working copy. Resolves to None if no equivalence was ever recorded,
    /// and to NoWorkingCopy if the commit is known to have no matching working copy.
    fn get_equivalent_working_copy(
        &self,
        ctx: CoreContext,
        source_repo_id: RepositoryId,
        source_bcs_id: ChangesetId,
        target_repo_id: RepositoryId,
    ) -> BoxFuture<Option<WorkingCopyEquivalence>, Error>;
}
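
A hedged usage sketch (not part of this diff) of how a caller such as the
backsyncer might consume this trait; it assumes the futures 0.1 combinators
already imported in this file. Note the distinction it spells out: None means
the equivalence was never recorded, while NoWorkingCopy is a stored fact.

fn log_equivalence<M: SyncedCommitMapping>(
    mapping: &M,
    ctx: CoreContext,
    large_repo_id: RepositoryId,
    large_bcs_id: ChangesetId,
    small_repo_id: RepositoryId,
) -> impl Future<Item = (), Error = Error> {
    mapping
        .get_equivalent_working_copy(ctx, large_repo_id, large_bcs_id, small_repo_id)
        .map(|maybe_equivalence| match maybe_equivalence {
            // The commit (or its closest rewritten ancestor) has a small-repo counterpart.
            Some(WorkingCopyEquivalence::WorkingCopy(bcs_id)) => {
                println!("equivalent working copy: {}", bcs_id)
            }
            // Stored with a NULL small bcs id: known to have nothing to sync to.
            Some(WorkingCopyEquivalence::NoWorkingCopy) => {
                println!("explicitly no equivalent working copy")
            }
            // No row at all: the equivalence has not been computed yet.
            None => println!("equivalence not recorded"),
        })
}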
impl SyncedCommitMapping for Arc<dyn SyncedCommitMapping> {
@@ -83,6 +136,24 @@ impl SyncedCommitMapping for Arc<dyn SyncedCommitMapping> {
    ) -> BoxFuture<Option<ChangesetId>, Error> {
        (**self).get(ctx, source_repo_id, bcs_id, target_repo_id)
    }

    fn insert_equivalent_working_copy(
        &self,
        ctx: CoreContext,
        entry: EquivalentWorkingCopyEntry,
    ) -> BoxFuture<bool, Error> {
        (**self).insert_equivalent_working_copy(ctx, entry)
    }

    fn get_equivalent_working_copy(
        &self,
        ctx: CoreContext,
        source_repo_id: RepositoryId,
        source_bcs_id: ChangesetId,
        target_repo_id: RepositoryId,
    ) -> BoxFuture<Option<WorkingCopyEquivalence>, Error> {
        (**self).get_equivalent_working_copy(ctx, source_repo_id, source_bcs_id, target_repo_id)
    }
}
#[derive(Clone)]
@@ -113,6 +184,30 @@ queries! {
          WHERE (large_repo_id = {source_repo_id} AND large_bcs_id = {bcs_id} AND small_repo_id = {target_repo_id}) OR
                (small_repo_id = {source_repo_id} AND small_bcs_id = {bcs_id} AND large_repo_id = {target_repo_id})"
    }

    write InsertWorkingCopyEquivalence(values: (
        large_repo_id: RepositoryId,
        large_bcs_id: ChangesetId,
        small_repo_id: RepositoryId,
        small_bcs_id: Option<ChangesetId>,
    )) {
        insert_or_ignore,
        "{insert_or_ignore} INTO synced_working_copy_equivalence (large_repo_id, large_bcs_id, small_repo_id, small_bcs_id) VALUES {values}"
    }

    read SelectWorkingCopyEquivalence(
        source_repo_id: RepositoryId,
        bcs_id: ChangesetId,
        target_repo_id: RepositoryId,
    ) -> (RepositoryId, ChangesetId, RepositoryId, Option<ChangesetId>) {
        "SELECT large_repo_id, large_bcs_id, small_repo_id, small_bcs_id
         FROM synced_working_copy_equivalence
         WHERE (large_repo_id = {source_repo_id} AND small_repo_id = {target_repo_id} AND large_bcs_id = {bcs_id})
            OR (large_repo_id = {target_repo_id} AND small_repo_id = {source_repo_id} AND small_bcs_id = {bcs_id})
         ORDER BY mapping_id ASC
         LIMIT 1
        "
    }
}
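
If I read the queries! macro correctly (hedged: this is the sql crate's usual
convention, not something stated in this diff), {insert_or_ignore} expands to
INSERT OR IGNORE on SQLite and INSERT IGNORE on MySQL, roughly:

-- SQLite flavour of the generated statement (illustrative, made-up id):
INSERT OR IGNORE INTO synced_working_copy_equivalence
  (large_repo_id, large_bcs_id, small_repo_id, small_bcs_id)
VALUES (0, X'1111111111111111111111111111111111111111111111111111111111111111', 1, NULL);

A duplicate insert therefore succeeds with zero affected rows, which is exactly
the signal insert_equivalent_working_copy below uses to run its consistency
check instead of reporting a new write.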
impl SqlConstructors for SqlSyncedCommitMapping {
@@ -146,18 +241,35 @@ impl SyncedCommitMapping for SqlSyncedCommitMapping {
            small_bcs_id,
        } = entry;

-        InsertMapping::query(
-            &self.write_connection,
-            &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)],
-        )
-        .and_then(move |result| {
-            if result.affected_rows() == 1 {
-                Ok(true)
-            } else {
-                Ok(false)
-            }
-        })
-        .boxify()
+        self.write_connection
+            .start_transaction()
+            .and_then(move |txn| {
+                InsertMapping::query_with_transaction(
+                    txn,
+                    &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)],
+                )
+                .and_then(move |(txn, _result)| {
+                    InsertWorkingCopyEquivalence::query_with_transaction(
+                        txn,
+                        &[(
+                            &large_repo_id,
+                            &large_bcs_id,
+                            &small_repo_id,
+                            &Some(small_bcs_id),
+                        )],
+                    )
+                    .and_then(|(txn, result)| {
+                        txn.commit().map(move |()| {
+                            if result.affected_rows() == 1 {
+                                true
+                            } else {
+                                false
+                            }
+                        })
+                    })
+                })
+            })
+            .boxify()
    }
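
The transaction above is what implements point 3 of the summary: a successful
add() also makes the pair visible as a working-copy equivalence. A hedged
sketch of that invariant, written in the blocking .wait() style the tests below
use (it assumes futures::Future in scope and the constructor-argument order
shown in those tests):

fn check_add_invariant<M: SyncedCommitMapping>(
    mapping: &M,
    ctx: CoreContext,
    large_repo_id: RepositoryId,
    large_bcs_id: ChangesetId,
    small_repo_id: RepositoryId,
    small_bcs_id: ChangesetId,
) -> Result<(), Error> {
    let entry =
        SyncedCommitMappingEntry::new(large_repo_id, large_bcs_id, small_repo_id, small_bcs_id);
    mapping.add(ctx.clone(), entry).wait()?;

    // The mapped small commit must now also be recorded as the equivalent working copy.
    let equivalence = mapping
        .get_equivalent_working_copy(ctx, large_repo_id, large_bcs_id, small_repo_id)
        .wait()?;
    assert_eq!(
        equivalence,
        Some(WorkingCopyEquivalence::WorkingCopy(small_bcs_id))
    );
    Ok(())
}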
    fn get(
@@ -206,4 +318,108 @@ impl SyncedCommitMapping for SqlSyncedCommitMapping {
        })
        .boxify()
    }

    fn insert_equivalent_working_copy(
        &self,
        ctx: CoreContext,
        entry: EquivalentWorkingCopyEntry,
    ) -> BoxFuture<bool, Error> {
        STATS::insert_working_copy_eqivalence.add_value(1);

        let EquivalentWorkingCopyEntry {
            large_repo_id,
            large_bcs_id,
            small_repo_id,
            small_bcs_id,
        } = entry;

        let this = self.clone();
        InsertWorkingCopyEquivalence::query(
            &self.write_connection,
            &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)],
        )
        .and_then(move |result| {
            if result.affected_rows() == 1 {
                future::ok(true).left_future()
            } else {
                // Check that the db stores a consistent entry
                this.get_equivalent_working_copy(
                    ctx.clone(),
                    large_repo_id,
                    large_bcs_id,
                    small_repo_id,
                )
                .and_then(move |maybe_equivalent_wc| {
                    if let Some(equivalent_wc) = maybe_equivalent_wc {
                        use WorkingCopyEquivalence::*;
                        let expected_small_bcs_id = match equivalent_wc {
                            WorkingCopy(wc) => Some(wc),
                            NoWorkingCopy => None,
                        };
                        if expected_small_bcs_id != small_bcs_id {
                            let err = ErrorKind::InconsistentWorkingCopyEntry {
                                actual: small_bcs_id,
                                expected: expected_small_bcs_id,
                            };
                            return Err(err.into());
                        }
                    }
                    Ok(false)
                })
                .right_future()
            }
        })
        .boxify()
    }

    fn get_equivalent_working_copy(
        &self,
        _ctx: CoreContext,
        source_repo_id: RepositoryId,
        source_bcs_id: ChangesetId,
        target_repo_id: RepositoryId,
    ) -> BoxFuture<Option<WorkingCopyEquivalence>, Error> {
        STATS::get_equivalent_working_copy.add_value(1);

        cloned!(self.read_master_connection);

        SelectWorkingCopyEquivalence::query(
            &self.read_connection,
            &source_repo_id,
            &source_bcs_id,
            &target_repo_id,
        )
        .and_then(move |rows| {
            if rows.len() >= 1 {
                future::ok(rows.get(0).cloned()).left_future()
            } else {
                // Nothing on the replica - retry the lookup on the master
                SelectWorkingCopyEquivalence::query(
                    &read_master_connection,
                    &source_repo_id,
                    &source_bcs_id,
                    &target_repo_id,
                )
                .map(|rows| rows.get(0).cloned())
                .right_future()
            }
        })
        .map(move |maybe_row| match maybe_row {
            Some(row) => {
                let (large_repo_id, large_bcs_id, _small_repo_id, maybe_small_bcs_id) = row;
                if target_repo_id == large_repo_id {
                    Some(WorkingCopyEquivalence::WorkingCopy(large_bcs_id))
                } else {
                    match maybe_small_bcs_id {
                        Some(small_bcs_id) => {
                            Some(WorkingCopyEquivalence::WorkingCopy(small_bcs_id))
                        }
                        None => Some(WorkingCopyEquivalence::NoWorkingCopy),
                    }
                }
            }
            None => None,
        })
        .boxify()
    }
}


@@ -17,7 +17,8 @@ use context::CoreContext;
use mononoke_types_mocks::changesetid as bonsai;
use mononoke_types_mocks::repo::{REPO_ONE, REPO_ZERO};
use synced_commit_mapping::{
-    SqlConstructors, SqlSyncedCommitMapping, SyncedCommitMapping, SyncedCommitMappingEntry,
+    EquivalentWorkingCopyEntry, SqlConstructors, SqlSyncedCommitMapping, SyncedCommitMapping,
+    SyncedCommitMappingEntry, WorkingCopyEquivalence,
};
fn add_and_get<M: SyncedCommitMapping>(fb: FacebookInit, mapping: M) {
@@ -39,6 +40,37 @@ fn add_and_get<M: SyncedCommitMapping>(fb: FacebookInit, mapping: M) {
            .expect("Adding same entry failed")
    );

    let res = mapping
        .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE)
        .wait()
        .expect("get equivalent wc failed, should succeed");
    assert_eq!(
        res,
        Some(WorkingCopyEquivalence::WorkingCopy(bonsai::TWOS_CSID))
    );

    // insert another entry
    let entry =
        SyncedCommitMappingEntry::new(REPO_ZERO, bonsai::THREES_CSID, REPO_ONE, bonsai::FOURS_CSID);
    assert_eq!(
        true,
        mapping
            .add(ctx.clone(), entry.clone())
            .wait()
            .expect("Adding new entry failed")
    );

    let res = mapping
        .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::THREES_CSID, REPO_ONE)
        .wait()
        .expect("get equivalent wc failed, should succeed");
    assert_eq!(
        res,
        Some(WorkingCopyEquivalence::WorkingCopy(bonsai::FOURS_CSID))
    );

    let result = mapping
        .get(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE)
        .wait()
@@ -60,6 +92,74 @@ fn missing<M: SyncedCommitMapping>(fb: FacebookInit, mapping: M) {
    assert_eq!(result, None);
}

fn equivalent_working_copy<M: SyncedCommitMapping>(fb: FacebookInit, mapping: M) {
    let ctx = CoreContext::test_mock(fb);
    let result = mapping
        .get_equivalent_working_copy(ctx.clone(), REPO_ONE, bonsai::TWOS_CSID, REPO_ZERO)
        .wait()
        .expect("Failed to fetch equivalent working copy (should succeed with None instead)");
    assert_eq!(result, None);

    let entry = EquivalentWorkingCopyEntry {
        large_repo_id: REPO_ZERO,
        large_bcs_id: bonsai::ONES_CSID,
        small_repo_id: REPO_ONE,
        small_bcs_id: Some(bonsai::TWOS_CSID),
    };
    let result = mapping
        .insert_equivalent_working_copy(ctx.clone(), entry.clone())
        .wait()
        .expect("Failed to insert working copy");
    assert_eq!(result, true);

    // Inserting the same entry again is a no-op that reports false
    let result = mapping
        .insert_equivalent_working_copy(ctx.clone(), entry)
        .wait()
        .expect("Failed to insert working copy");
    assert_eq!(result, false);

    let res = mapping
        .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE)
        .wait()
        .expect("get equivalent wc failed, should succeed");
    assert_eq!(
        res,
        Some(WorkingCopyEquivalence::WorkingCopy(bonsai::TWOS_CSID))
    );

    let null_entry = EquivalentWorkingCopyEntry {
        large_repo_id: REPO_ZERO,
        large_bcs_id: bonsai::THREES_CSID,
        small_repo_id: REPO_ONE,
        small_bcs_id: None,
    };
    let result = mapping
        .insert_equivalent_working_copy(ctx.clone(), null_entry)
        .wait()
        .expect("Failed to insert working copy");
    assert_eq!(result, true);

    let res = mapping
        .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::THREES_CSID, REPO_ONE)
        .wait()
        .expect("get equivalent wc failed, should succeed");
    assert_eq!(res, Some(WorkingCopyEquivalence::NoWorkingCopy));

    // Inserting a conflicting small bcs id for the same large commit must fail
    let should_fail = EquivalentWorkingCopyEntry {
        large_repo_id: REPO_ZERO,
        large_bcs_id: bonsai::THREES_CSID,
        small_repo_id: REPO_ONE,
        small_bcs_id: Some(bonsai::TWOS_CSID),
    };
    assert!(mapping
        .insert_equivalent_working_copy(ctx.clone(), should_fail)
        .wait()
        .is_err());
}
#[fbinit::test]
fn test_add_and_get(fb: FacebookInit) {
    async_unit::tokio_unit_test(move || {
@@ -73,3 +173,10 @@ fn test_missing(fb: FacebookInit) {
        missing(fb, SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap())
    });
}

#[fbinit::test]
fn test_equivalent_working_copy(fb: FacebookInit) {
    async_unit::tokio_unit_test(move || {
        equivalent_working_copy(fb, SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap())
    });
}