diff --git a/commit_rewriting/synced_commit_mapping/schemas/sqlite-synced-commit-mapping.sql b/commit_rewriting/synced_commit_mapping/schemas/sqlite-synced-commit-mapping.sql index 2fd6cf615f..15e42ccfe8 100644 --- a/commit_rewriting/synced_commit_mapping/schemas/sqlite-synced-commit-mapping.sql +++ b/commit_rewriting/synced_commit_mapping/schemas/sqlite-synced-commit-mapping.sql @@ -1,5 +1,5 @@ CREATE TABLE `synced_commit_mapping` ( - `mapping_id` INTEGER PRIMARY KEY, + `mapping_id` INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, `small_repo_id` int(11) NOT NULL, `small_bcs_id` binary(32) NOT NULL, `large_repo_id` int(11) NOT NULL, @@ -7,3 +7,16 @@ CREATE TABLE `synced_commit_mapping` ( UNIQUE (`large_repo_id`,`small_repo_id`,`small_bcs_id`), UNIQUE (`small_repo_id`,`large_repo_id`,`large_bcs_id`) ); + +CREATE TABLE `synced_working_copy_equivalence` ( + `mapping_id` INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + `small_repo_id` int(11) NOT NULL, + `small_bcs_id` binary(32), + `large_repo_id` int(11) NOT NULL, + `large_bcs_id` binary(32) NOT NULL, + UNIQUE (`large_repo_id`,`small_repo_id`,`large_bcs_id`) +); + + -- Small bcs id can map to multiple large bcs ids + CREATE INDEX small_bcs_key ON synced_working_copy_equivalence + (`large_repo_id`,`small_repo_id`,`small_bcs_id`); diff --git a/commit_rewriting/synced_commit_mapping/src/lib.rs b/commit_rewriting/synced_commit_mapping/src/lib.rs index 299b7149ab..b616c6f66a 100644 --- a/commit_rewriting/synced_commit_mapping/src/lib.rs +++ b/commit_rewriting/synced_commit_mapping/src/lib.rs @@ -15,19 +15,33 @@ pub use sql_ext::SqlConstructors; use cloned::cloned; use context::CoreContext; -use failure_ext::Error; +use failure_ext::{Error, Fail}; use futures::{future, Future}; use futures_ext::{BoxFuture, FutureExt}; use mononoke_types::{ChangesetId, RepositoryId}; use sql::queries; use stats::{define_stats, Timeseries}; +#[derive(Debug, Eq, Fail, PartialEq)] +pub enum ErrorKind { + #[fail( + display = "tried to insert inconsistent small bcs id {:?}, while db has {:?}", + _0, _1 + )] + InconsistentWorkingCopyEntry { + expected: Option, + actual: Option, + }, +} + // TODO(simonfar): Once we've proven the concept, we want to cache these define_stats! { prefix = "mononoke.synced_commit_mapping"; gets: timeseries(RATE, SUM), gets_master: timeseries(RATE, SUM), adds: timeseries(RATE, SUM), + insert_working_copy_eqivalence: timeseries(RATE, SUM), + get_equivalent_working_copy: timeseries(RATE, SUM), } #[derive(Clone, Debug, Eq, Hash, PartialEq)] @@ -54,6 +68,23 @@ impl SyncedCommitMappingEntry { } } +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct EquivalentWorkingCopyEntry { + pub large_repo_id: RepositoryId, + pub large_bcs_id: ChangesetId, + pub small_repo_id: RepositoryId, + pub small_bcs_id: Option, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum WorkingCopyEquivalence { + /// There's no matching working copy. It can happen if a pre-big-merge commit from one small + /// repo is mapped into another small repo + NoWorkingCopy, + /// ChangesetId of matching working copy. + WorkingCopy(ChangesetId), +} + pub trait SyncedCommitMapping: Send + Sync { /// Given the full large, small mapping, store it in the DB. /// Future resolves to true if the mapping was saved, false otherwise @@ -67,6 +98,28 @@ pub trait SyncedCommitMapping: Send + Sync { bcs_id: ChangesetId, target_repo_id: RepositoryId, ) -> BoxFuture, Error>; + + /// Inserts equivalent working copy of a large bcs id. It's similar to mapping entry, + /// however there are a few differences: + /// 1) For (large repo, small repo) pair, many large commits can map to the same small commit + /// 2) Small commit can be null + /// + /// If there's a mapping between small and large commits, then equivalent working copy is + /// the same as the same as the mapping. + fn insert_equivalent_working_copy( + &self, + ctx: CoreContext, + entry: EquivalentWorkingCopyEntry, + ) -> BoxFuture; + + /// Finds equivalent working copy + fn get_equivalent_working_copy( + &self, + ctx: CoreContext, + source_repo_id: RepositoryId, + source_bcs_id: ChangesetId, + target_repo_id: RepositoryId, + ) -> BoxFuture, Error>; } impl SyncedCommitMapping for Arc { @@ -83,6 +136,24 @@ impl SyncedCommitMapping for Arc { ) -> BoxFuture, Error> { (**self).get(ctx, source_repo_id, bcs_id, target_repo_id) } + + fn insert_equivalent_working_copy( + &self, + ctx: CoreContext, + entry: EquivalentWorkingCopyEntry, + ) -> BoxFuture { + (**self).insert_equivalent_working_copy(ctx, entry) + } + + fn get_equivalent_working_copy( + &self, + ctx: CoreContext, + source_repo_id: RepositoryId, + source_bcs_id: ChangesetId, + target_repo_id: RepositoryId, + ) -> BoxFuture, Error> { + (**self).get_equivalent_working_copy(ctx, source_repo_id, source_bcs_id, target_repo_id) + } } #[derive(Clone)] @@ -113,6 +184,30 @@ queries! { WHERE (large_repo_id = {source_repo_id} AND large_bcs_id = {bcs_id} AND small_repo_id = {target_repo_id}) OR (small_repo_id = {source_repo_id} AND small_bcs_id = {bcs_id} AND large_repo_id = {target_repo_id})" } + + write InsertWorkingCopyEquivalence(values: ( + large_repo_id: RepositoryId, + large_bcs_id: ChangesetId, + small_repo_id: RepositoryId, + small_bcs_id: Option, + )) { + insert_or_ignore, + "{insert_or_ignore} INTO synced_working_copy_equivalence (large_repo_id, large_bcs_id, small_repo_id, small_bcs_id) VALUES {values}" + } + + read SelectWorkingCopyEquivalence( + source_repo_id: RepositoryId, + bcs_id: ChangesetId, + target_repo_id: RepositoryId, + ) -> (RepositoryId, ChangesetId, RepositoryId, Option) { + "SELECT large_repo_id, large_bcs_id, small_repo_id, small_bcs_id + FROM synced_working_copy_equivalence + WHERE (large_repo_id = {source_repo_id} AND small_repo_id = {target_repo_id} AND large_bcs_id = {bcs_id}) + OR (large_repo_id = {target_repo_id} AND small_repo_id = {source_repo_id} AND small_bcs_id = {bcs_id}) + ORDER BY mapping_id ASC + LIMIT 1 + " + } } impl SqlConstructors for SqlSyncedCommitMapping { @@ -146,18 +241,35 @@ impl SyncedCommitMapping for SqlSyncedCommitMapping { small_bcs_id, } = entry; - InsertMapping::query( - &self.write_connection, - &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)], - ) - .and_then(move |result| { - if result.affected_rows() == 1 { - Ok(true) - } else { - Ok(false) - } - }) - .boxify() + self.write_connection + .start_transaction() + .and_then(move |txn| { + InsertMapping::query_with_transaction( + txn, + &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)], + ) + .and_then(move |(txn, _result)| { + InsertWorkingCopyEquivalence::query_with_transaction( + txn, + &[( + &large_repo_id, + &large_bcs_id, + &small_repo_id, + &Some(small_bcs_id), + )], + ) + .and_then(|(txn, result)| { + txn.commit().map(move |()| { + if result.affected_rows() == 1 { + true + } else { + false + } + }) + }) + }) + }) + .boxify() } fn get( @@ -206,4 +318,108 @@ impl SyncedCommitMapping for SqlSyncedCommitMapping { }) .boxify() } + + fn insert_equivalent_working_copy( + &self, + ctx: CoreContext, + entry: EquivalentWorkingCopyEntry, + ) -> BoxFuture { + STATS::insert_working_copy_eqivalence.add_value(1); + + let EquivalentWorkingCopyEntry { + large_repo_id, + large_bcs_id, + small_repo_id, + small_bcs_id, + } = entry; + + let this = self.clone(); + InsertWorkingCopyEquivalence::query( + &self.write_connection, + &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)], + ) + .and_then(move |result| { + if result.affected_rows() == 1 { + future::ok(true).left_future() + } else { + // Check that db stores consistent entry + this.get_equivalent_working_copy( + ctx.clone(), + large_repo_id, + large_bcs_id, + small_repo_id, + ) + .and_then(move |maybe_equivalent_wc| { + if let Some(equivalent_wc) = maybe_equivalent_wc { + use WorkingCopyEquivalence::*; + let expected_small_bcs_id = match equivalent_wc { + WorkingCopy(wc) => Some(wc), + NoWorkingCopy => None, + }; + + if expected_small_bcs_id != small_bcs_id { + let err = ErrorKind::InconsistentWorkingCopyEntry { + actual: small_bcs_id, + expected: expected_small_bcs_id, + }; + return Err(err.into()); + } + } + Ok(false) + }) + .right_future() + } + }) + .boxify() + } + + fn get_equivalent_working_copy( + &self, + _ctx: CoreContext, + source_repo_id: RepositoryId, + source_bcs_id: ChangesetId, + target_repo_id: RepositoryId, + ) -> BoxFuture, Error> { + STATS::get_equivalent_working_copy.add_value(1); + + cloned!(self.read_master_connection); + SelectWorkingCopyEquivalence::query( + &self.read_connection, + &source_repo_id, + &source_bcs_id, + &target_repo_id, + ) + .and_then(move |rows| { + if rows.len() >= 1 { + future::ok(rows.get(0).cloned()).left_future() + } else { + SelectWorkingCopyEquivalence::query( + &read_master_connection, + &source_repo_id, + &source_bcs_id, + &target_repo_id, + ) + .map(|rows| rows.get(0).cloned()) + .right_future() + } + }) + .map(move |maybe_row| match maybe_row { + Some(row) => { + let (large_repo_id, large_bcs_id, _small_repo_id, maybe_small_bcs_id) = row; + + if target_repo_id == large_repo_id { + Some(WorkingCopyEquivalence::WorkingCopy(large_bcs_id)) + } else { + match maybe_small_bcs_id { + Some(small_bcs_id) => { + Some(WorkingCopyEquivalence::WorkingCopy(small_bcs_id)) + } + None => Some(WorkingCopyEquivalence::NoWorkingCopy), + } + } + } + None => None, + }) + .boxify() + } } diff --git a/commit_rewriting/synced_commit_mapping/test/main.rs b/commit_rewriting/synced_commit_mapping/test/main.rs index f8e62e0c64..e9d34d8d16 100644 --- a/commit_rewriting/synced_commit_mapping/test/main.rs +++ b/commit_rewriting/synced_commit_mapping/test/main.rs @@ -17,7 +17,8 @@ use context::CoreContext; use mononoke_types_mocks::changesetid as bonsai; use mononoke_types_mocks::repo::{REPO_ONE, REPO_ZERO}; use synced_commit_mapping::{ - SqlConstructors, SqlSyncedCommitMapping, SyncedCommitMapping, SyncedCommitMappingEntry, + EquivalentWorkingCopyEntry, SqlConstructors, SqlSyncedCommitMapping, SyncedCommitMapping, + SyncedCommitMappingEntry, WorkingCopyEquivalence, }; fn add_and_get(fb: FacebookInit, mapping: M) { @@ -39,6 +40,37 @@ fn add_and_get(fb: FacebookInit, mapping: M) { .expect("Adding same entry failed") ); + let res = mapping + .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE) + .wait() + .expect("get equivalent wc failed, should succeed"); + + assert_eq!( + res, + Some(WorkingCopyEquivalence::WorkingCopy(bonsai::TWOS_CSID)) + ); + + // insert again + let entry = + SyncedCommitMappingEntry::new(REPO_ZERO, bonsai::THREES_CSID, REPO_ONE, bonsai::FOURS_CSID); + assert_eq!( + true, + mapping + .add(ctx.clone(), entry.clone()) + .wait() + .expect("Adding new entry failed") + ); + + let res = mapping + .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::THREES_CSID, REPO_ONE) + .wait() + .expect("get equivalent wc failed, should succeed"); + + assert_eq!( + res, + Some(WorkingCopyEquivalence::WorkingCopy(bonsai::FOURS_CSID)) + ); + let result = mapping .get(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE) .wait() @@ -60,6 +92,74 @@ fn missing(fb: FacebookInit, mapping: M) { assert_eq!(result, None); } +fn equivalent_working_copy(fb: FacebookInit, mapping: M) { + let ctx = CoreContext::test_mock(fb); + let result = mapping + .get_equivalent_working_copy(ctx.clone(), REPO_ONE, bonsai::TWOS_CSID, REPO_ZERO) + .wait() + .expect("Failed to fetch equivalent working copy (should succeed with None instead)"); + assert_eq!(result, None); + + let entry = EquivalentWorkingCopyEntry { + large_repo_id: REPO_ZERO, + large_bcs_id: bonsai::ONES_CSID, + small_repo_id: REPO_ONE, + small_bcs_id: Some(bonsai::TWOS_CSID), + }; + let result = mapping + .insert_equivalent_working_copy(ctx.clone(), entry.clone()) + .wait() + .expect("Failed to insert working copy"); + assert_eq!(result, true); + + let result = mapping + .insert_equivalent_working_copy(ctx.clone(), entry) + .wait() + .expect("Failed to insert working copy"); + assert_eq!(result, false); + + let res = mapping + .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE) + .wait() + .expect("get equivalent wc failed, should succeed"); + + assert_eq!( + res, + Some(WorkingCopyEquivalence::WorkingCopy(bonsai::TWOS_CSID)) + ); + + let null_entry = EquivalentWorkingCopyEntry { + large_repo_id: REPO_ZERO, + large_bcs_id: bonsai::THREES_CSID, + small_repo_id: REPO_ONE, + small_bcs_id: None, + }; + + let result = mapping + .insert_equivalent_working_copy(ctx.clone(), null_entry) + .wait() + .expect("Failed to insert working copy"); + assert_eq!(result, true); + + let res = mapping + .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::THREES_CSID, REPO_ONE) + .wait() + .expect("get equivalent wc failed, should succeed"); + + assert_eq!(res, Some(WorkingCopyEquivalence::NoWorkingCopy)); + + let should_fail = EquivalentWorkingCopyEntry { + large_repo_id: REPO_ZERO, + large_bcs_id: bonsai::THREES_CSID, + small_repo_id: REPO_ONE, + small_bcs_id: Some(bonsai::TWOS_CSID), + }; + assert!(mapping + .insert_equivalent_working_copy(ctx.clone(), should_fail) + .wait() + .is_err()); +} + #[fbinit::test] fn test_add_and_get(fb: FacebookInit) { async_unit::tokio_unit_test(move || { @@ -73,3 +173,10 @@ fn test_missing(fb: FacebookInit) { missing(fb, SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap()) }); } + +#[fbinit::test] +fn test_equivalent_working_copy(fb: FacebookInit) { + async_unit::tokio_unit_test(move || { + equivalent_working_copy(fb, SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap()) + }); +}