From 3bebf387533e27ff10b80135f4e7de85576a0ef0 Mon Sep 17 00:00:00 2001
From: Stanislau Hlebik
Date: Tue, 29 Oct 2019 07:09:31 -0700
Subject: [PATCH] mononoke: add synced_working_copy_equivalence table

Summary:
This table is needed for backsyncer to track commits that are "equivalent"
between the large and small repos, i.e. the closest ancestor of the large
commit that was rewritten in the small repo.

It's very similar to the normal `synced_commit_mapping`, but with a few
important differences:
1) Small bcs id can be NULL, meaning that a commit from the large repo
   doesn't remap to any commit from the small repo [1]
2) Multiple large bcs ids can point to the same small bcs id
3) Inserting a new entry in synced_commit_mapping automatically inserts an
   entry in synced_working_copy_equivalence - if a commit was remapped, then
   by definition it is an equivalent working copy

An illustrative usage sketch follows the diff below.

Reviewed By: ikostia

Differential Revision: D18172894

fbshipit-source-id: 14d5f692521282223778e8c9ad46191d0e24d618
---
 .../schemas/sqlite-synced-commit-mapping.sql  |  15 +-
 .../synced_commit_mapping/src/lib.rs          | 242 +++++++++++++++++-
 .../synced_commit_mapping/test/main.rs        | 109 +++++++-
 3 files changed, 351 insertions(+), 15 deletions(-)

diff --git a/commit_rewriting/synced_commit_mapping/schemas/sqlite-synced-commit-mapping.sql b/commit_rewriting/synced_commit_mapping/schemas/sqlite-synced-commit-mapping.sql
index 2fd6cf615f..15e42ccfe8 100644
--- a/commit_rewriting/synced_commit_mapping/schemas/sqlite-synced-commit-mapping.sql
+++ b/commit_rewriting/synced_commit_mapping/schemas/sqlite-synced-commit-mapping.sql
@@ -1,5 +1,5 @@
 CREATE TABLE `synced_commit_mapping` (
-  `mapping_id` INTEGER PRIMARY KEY,
+  `mapping_id` INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
   `small_repo_id` int(11) NOT NULL,
   `small_bcs_id` binary(32) NOT NULL,
   `large_repo_id` int(11) NOT NULL,
@@ -7,3 +7,16 @@
   UNIQUE (`large_repo_id`,`small_repo_id`,`small_bcs_id`),
   UNIQUE (`small_repo_id`,`large_repo_id`,`large_bcs_id`)
 );
+
+CREATE TABLE `synced_working_copy_equivalence` (
+  `mapping_id` INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
+  `small_repo_id` int(11) NOT NULL,
+  `small_bcs_id` binary(32),
+  `large_repo_id` int(11) NOT NULL,
+  `large_bcs_id` binary(32) NOT NULL,
+  UNIQUE (`large_repo_id`,`small_repo_id`,`large_bcs_id`)
+);
+
+-- Small bcs id can map to multiple large bcs ids
+CREATE INDEX small_bcs_key ON synced_working_copy_equivalence
+  (`large_repo_id`,`small_repo_id`,`small_bcs_id`);
diff --git a/commit_rewriting/synced_commit_mapping/src/lib.rs b/commit_rewriting/synced_commit_mapping/src/lib.rs
index 299b7149ab..b616c6f66a 100644
--- a/commit_rewriting/synced_commit_mapping/src/lib.rs
+++ b/commit_rewriting/synced_commit_mapping/src/lib.rs
@@ -15,19 +15,33 @@ pub use sql_ext::SqlConstructors;
 
 use cloned::cloned;
 use context::CoreContext;
-use failure_ext::Error;
+use failure_ext::{Error, Fail};
 use futures::{future, Future};
 use futures_ext::{BoxFuture, FutureExt};
 use mononoke_types::{ChangesetId, RepositoryId};
 use sql::queries;
 use stats::{define_stats, Timeseries};
 
+#[derive(Debug, Eq, Fail, PartialEq)]
+pub enum ErrorKind {
+    #[fail(
+        display = "tried to insert inconsistent small bcs id {:?}, while db has {:?}",
+        actual, expected
+    )]
+    InconsistentWorkingCopyEntry {
+        expected: Option<ChangesetId>,
+        actual: Option<ChangesetId>,
+    },
+}
+
 // TODO(simonfar): Once we've proven the concept, we want to cache these
 define_stats! {
     prefix = "mononoke.synced_commit_mapping";
     gets: timeseries(RATE, SUM),
     gets_master: timeseries(RATE, SUM),
     adds: timeseries(RATE, SUM),
+    insert_working_copy_equivalence: timeseries(RATE, SUM),
+    get_equivalent_working_copy: timeseries(RATE, SUM),
 }
 
 #[derive(Clone, Debug, Eq, Hash, PartialEq)]
@@ -54,6 +68,23 @@ impl SyncedCommitMappingEntry {
     }
 }
 
+#[derive(Clone, Debug, Eq, Hash, PartialEq)]
+pub struct EquivalentWorkingCopyEntry {
+    pub large_repo_id: RepositoryId,
+    pub large_bcs_id: ChangesetId,
+    pub small_repo_id: RepositoryId,
+    pub small_bcs_id: Option<ChangesetId>,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum WorkingCopyEquivalence {
+    /// There's no matching working copy. It can happen if a pre-big-merge commit from one small
+    /// repo is mapped into another small repo
+    NoWorkingCopy,
+    /// ChangesetId of the matching working copy.
+    WorkingCopy(ChangesetId),
+}
+
 pub trait SyncedCommitMapping: Send + Sync {
     /// Given the full large, small mapping, store it in the DB.
     /// Future resolves to true if the mapping was saved, false otherwise
@@ -67,6 +98,28 @@ pub trait SyncedCommitMapping: Send + Sync {
         bcs_id: ChangesetId,
         target_repo_id: RepositoryId,
     ) -> BoxFuture<Option<ChangesetId>, Error>;
+
+    /// Inserts the equivalent working copy of a large bcs id. It's similar to a mapping entry,
+    /// however there are a few differences:
+    /// 1) For a (large repo, small repo) pair, many large commits can map to the same small commit
+    /// 2) The small commit can be null
+    ///
+    /// If there's a mapping between the small and large commits, then the equivalent working copy
+    /// is the same as the mapping.
+    fn insert_equivalent_working_copy(
+        &self,
+        ctx: CoreContext,
+        entry: EquivalentWorkingCopyEntry,
+    ) -> BoxFuture<bool, Error>;
+
+    /// Finds the equivalent working copy
+    fn get_equivalent_working_copy(
+        &self,
+        ctx: CoreContext,
+        source_repo_id: RepositoryId,
+        source_bcs_id: ChangesetId,
+        target_repo_id: RepositoryId,
+    ) -> BoxFuture<Option<WorkingCopyEquivalence>, Error>;
 }
 
 impl SyncedCommitMapping for Arc<dyn SyncedCommitMapping> {
@@ -83,6 +136,24 @@ impl SyncedCommitMapping for Arc<dyn SyncedCommitMapping> {
     ) -> BoxFuture<Option<ChangesetId>, Error> {
         (**self).get(ctx, source_repo_id, bcs_id, target_repo_id)
     }
+
+    fn insert_equivalent_working_copy(
+        &self,
+        ctx: CoreContext,
+        entry: EquivalentWorkingCopyEntry,
+    ) -> BoxFuture<bool, Error> {
+        (**self).insert_equivalent_working_copy(ctx, entry)
+    }
+
+    fn get_equivalent_working_copy(
+        &self,
+        ctx: CoreContext,
+        source_repo_id: RepositoryId,
+        source_bcs_id: ChangesetId,
+        target_repo_id: RepositoryId,
+    ) -> BoxFuture<Option<WorkingCopyEquivalence>, Error> {
+        (**self).get_equivalent_working_copy(ctx, source_repo_id, source_bcs_id, target_repo_id)
+    }
 }
 
 #[derive(Clone)]
@@ -113,6 +184,30 @@ queries! {
     WHERE (large_repo_id = {source_repo_id} AND large_bcs_id = {bcs_id} AND small_repo_id = {target_repo_id})
     OR (small_repo_id = {source_repo_id} AND small_bcs_id = {bcs_id} AND large_repo_id = {target_repo_id})"
     }
+
+    write InsertWorkingCopyEquivalence(values: (
+        large_repo_id: RepositoryId,
+        large_bcs_id: ChangesetId,
+        small_repo_id: RepositoryId,
+        small_bcs_id: Option<ChangesetId>,
+    )) {
+        insert_or_ignore,
+        "{insert_or_ignore} INTO synced_working_copy_equivalence (large_repo_id, large_bcs_id, small_repo_id, small_bcs_id) VALUES {values}"
+    }
+
+    read SelectWorkingCopyEquivalence(
+        source_repo_id: RepositoryId,
+        bcs_id: ChangesetId,
+        target_repo_id: RepositoryId,
+    ) -> (RepositoryId, ChangesetId, RepositoryId, Option<ChangesetId>) {
+        "SELECT large_repo_id, large_bcs_id, small_repo_id, small_bcs_id
+        FROM synced_working_copy_equivalence
+        WHERE (large_repo_id = {source_repo_id} AND small_repo_id = {target_repo_id} AND large_bcs_id = {bcs_id})
+        OR (large_repo_id = {target_repo_id} AND small_repo_id = {source_repo_id} AND small_bcs_id = {bcs_id})
+        ORDER BY mapping_id ASC
+        LIMIT 1
+        "
+    }
 }
 
 impl SqlConstructors for SqlSyncedCommitMapping {
@@ -146,18 +241,35 @@ impl SyncedCommitMapping for SqlSyncedCommitMapping {
             small_bcs_id,
         } = entry;
 
-        InsertMapping::query(
-            &self.write_connection,
-            &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)],
-        )
-        .and_then(move |result| {
-            if result.affected_rows() == 1 {
-                Ok(true)
-            } else {
-                Ok(false)
-            }
-        })
-        .boxify()
+        self.write_connection
+            .start_transaction()
+            .and_then(move |txn| {
+                InsertMapping::query_with_transaction(
+                    txn,
+                    &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)],
+                )
+                .and_then(move |(txn, _result)| {
+                    // A remapped commit is by definition an equivalent working copy,
+                    // so the equivalence entry is inserted in the same transaction.
+                    InsertWorkingCopyEquivalence::query_with_transaction(
+                        txn,
+                        &[(
+                            &large_repo_id,
+                            &large_bcs_id,
+                            &small_repo_id,
+                            &Some(small_bcs_id),
+                        )],
+                    )
+                    .and_then(|(txn, result)| {
+                        txn.commit()
+                            .map(move |()| result.affected_rows() == 1)
+                    })
+                })
+            })
+            .boxify()
     }
 
     fn get(
@@ -206,4 +318,108 @@ impl SyncedCommitMapping for SqlSyncedCommitMapping {
         })
         .boxify()
     }
+
+    fn insert_equivalent_working_copy(
+        &self,
+        ctx: CoreContext,
+        entry: EquivalentWorkingCopyEntry,
+    ) -> BoxFuture<bool, Error> {
+        STATS::insert_working_copy_equivalence.add_value(1);
+
+        let EquivalentWorkingCopyEntry {
+            large_repo_id,
+            large_bcs_id,
+            small_repo_id,
+            small_bcs_id,
+        } = entry;
+
+        let this = self.clone();
+        InsertWorkingCopyEquivalence::query(
+            &self.write_connection,
+            &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)],
+        )
+        .and_then(move |result| {
+            if result.affected_rows() == 1 {
+                future::ok(true).left_future()
+            } else {
+                // Insert was ignored; check that the db stores a consistent entry
+                this.get_equivalent_working_copy(
+                    ctx.clone(),
+                    large_repo_id,
+                    large_bcs_id,
+                    small_repo_id,
+                )
+                .and_then(move |maybe_equivalent_wc| {
+                    if let Some(equivalent_wc) = maybe_equivalent_wc {
+                        use WorkingCopyEquivalence::*;
+                        let expected_small_bcs_id = match equivalent_wc {
+                            WorkingCopy(wc) => Some(wc),
+                            NoWorkingCopy => None,
+                        };
+
+                        if expected_small_bcs_id != small_bcs_id {
+                            let err = ErrorKind::InconsistentWorkingCopyEntry {
+                                actual: small_bcs_id,
+                                expected: expected_small_bcs_id,
+                            };
+                            return Err(err.into());
+                        }
+                    }
+                    Ok(false)
+                })
+                .right_future()
+            }
+        })
+        .boxify()
+    }
+
+    fn get_equivalent_working_copy(
+        &self,
+        _ctx: CoreContext,
+        source_repo_id: RepositoryId,
+        source_bcs_id: ChangesetId,
+        target_repo_id: RepositoryId,
+    ) -> BoxFuture<Option<WorkingCopyEquivalence>, Error> {
+        STATS::get_equivalent_working_copy.add_value(1);
+
+        cloned!(self.read_master_connection);
+        SelectWorkingCopyEquivalence::query(
+            &self.read_connection,
+            &source_repo_id,
+            &source_bcs_id,
+            &target_repo_id,
+        )
+        .and_then(move |rows| {
+            if !rows.is_empty() {
+                future::ok(rows.get(0).cloned()).left_future()
+            } else {
+                // Not found on the replica; retry on the master connection
+                SelectWorkingCopyEquivalence::query(
+                    &read_master_connection,
+                    &source_repo_id,
+                    &source_bcs_id,
+                    &target_repo_id,
+                )
+                .map(|rows| rows.get(0).cloned())
+                .right_future()
+            }
+        })
+        .map(move |maybe_row| match maybe_row {
+            Some(row) => {
+                let (large_repo_id, large_bcs_id, _small_repo_id, maybe_small_bcs_id) = row;
+
+                if target_repo_id == large_repo_id {
+                    Some(WorkingCopyEquivalence::WorkingCopy(large_bcs_id))
+                } else {
+                    match maybe_small_bcs_id {
+                        Some(small_bcs_id) => {
+                            Some(WorkingCopyEquivalence::WorkingCopy(small_bcs_id))
+                        }
+                        None => Some(WorkingCopyEquivalence::NoWorkingCopy),
+                    }
+                }
+            }
+            None => None,
+        })
+        .boxify()
+    }
 }
diff --git a/commit_rewriting/synced_commit_mapping/test/main.rs b/commit_rewriting/synced_commit_mapping/test/main.rs
index f8e62e0c64..e9d34d8d16 100644
--- a/commit_rewriting/synced_commit_mapping/test/main.rs
+++ b/commit_rewriting/synced_commit_mapping/test/main.rs
@@ -17,7 +17,8 @@ use context::CoreContext;
 use mononoke_types_mocks::changesetid as bonsai;
 use mononoke_types_mocks::repo::{REPO_ONE, REPO_ZERO};
 use synced_commit_mapping::{
-    SqlConstructors, SqlSyncedCommitMapping, SyncedCommitMapping, SyncedCommitMappingEntry,
+    EquivalentWorkingCopyEntry, SqlConstructors, SqlSyncedCommitMapping, SyncedCommitMapping,
+    SyncedCommitMappingEntry, WorkingCopyEquivalence,
 };
 
 fn add_and_get<M: SyncedCommitMapping>(fb: FacebookInit, mapping: M) {
@@ -39,6 +40,37 @@ fn add_and_get<M: SyncedCommitMapping>(fb: FacebookInit, mapping: M) {
         .expect("Adding same entry failed")
     );
 
+    let res = mapping
+        .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE)
+        .wait()
+        .expect("get equivalent wc failed, should succeed");
+
+    assert_eq!(
+        res,
+        Some(WorkingCopyEquivalence::WorkingCopy(bonsai::TWOS_CSID))
+    );
+
+    // insert another entry
+    let entry =
+        SyncedCommitMappingEntry::new(REPO_ZERO, bonsai::THREES_CSID, REPO_ONE, bonsai::FOURS_CSID);
+    assert_eq!(
+        true,
+        mapping
+            .add(ctx.clone(), entry.clone())
+            .wait()
+            .expect("Adding new entry failed")
+    );
+
+    let res = mapping
+        .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::THREES_CSID, REPO_ONE)
+        .wait()
+        .expect("get equivalent wc failed, should succeed");
+
+    assert_eq!(
+        res,
+        Some(WorkingCopyEquivalence::WorkingCopy(bonsai::FOURS_CSID))
+    );
+
     let result = mapping
         .get(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE)
         .wait()
@@ -60,6 +92,74 @@ fn missing<M: SyncedCommitMapping>(fb: FacebookInit, mapping: M) {
     assert_eq!(result, None);
 }
 
+fn equivalent_working_copy<M: SyncedCommitMapping>(fb: FacebookInit, mapping: M) {
+    let ctx = CoreContext::test_mock(fb);
+    let result = mapping
+        .get_equivalent_working_copy(ctx.clone(), REPO_ONE, bonsai::TWOS_CSID, REPO_ZERO)
+        .wait()
+        .expect("Failed to fetch equivalent working copy (should succeed with None instead)");
+    assert_eq!(result, None);
+
+    let entry = EquivalentWorkingCopyEntry {
+        large_repo_id: REPO_ZERO,
+        large_bcs_id: bonsai::ONES_CSID,
+        small_repo_id: REPO_ONE,
+        small_bcs_id: Some(bonsai::TWOS_CSID),
+    };
+    let result = mapping
+        .insert_equivalent_working_copy(ctx.clone(), entry.clone())
+        .wait()
+        .expect("Failed to insert working copy");
+    assert_eq!(result, true);
+
+    let result = mapping
+        .insert_equivalent_working_copy(ctx.clone(), entry)
+        .wait()
.expect("Failed to insert working copy"); + assert_eq!(result, false); + + let res = mapping + .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE) + .wait() + .expect("get equivalent wc failed, should succeed"); + + assert_eq!( + res, + Some(WorkingCopyEquivalence::WorkingCopy(bonsai::TWOS_CSID)) + ); + + let null_entry = EquivalentWorkingCopyEntry { + large_repo_id: REPO_ZERO, + large_bcs_id: bonsai::THREES_CSID, + small_repo_id: REPO_ONE, + small_bcs_id: None, + }; + + let result = mapping + .insert_equivalent_working_copy(ctx.clone(), null_entry) + .wait() + .expect("Failed to insert working copy"); + assert_eq!(result, true); + + let res = mapping + .get_equivalent_working_copy(ctx.clone(), REPO_ZERO, bonsai::THREES_CSID, REPO_ONE) + .wait() + .expect("get equivalent wc failed, should succeed"); + + assert_eq!(res, Some(WorkingCopyEquivalence::NoWorkingCopy)); + + let should_fail = EquivalentWorkingCopyEntry { + large_repo_id: REPO_ZERO, + large_bcs_id: bonsai::THREES_CSID, + small_repo_id: REPO_ONE, + small_bcs_id: Some(bonsai::TWOS_CSID), + }; + assert!(mapping + .insert_equivalent_working_copy(ctx.clone(), should_fail) + .wait() + .is_err()); +} + #[fbinit::test] fn test_add_and_get(fb: FacebookInit) { async_unit::tokio_unit_test(move || { @@ -73,3 +173,10 @@ fn test_missing(fb: FacebookInit) { missing(fb, SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap()) }); } + +#[fbinit::test] +fn test_equivalent_working_copy(fb: FacebookInit) { + async_unit::tokio_unit_test(move || { + equivalent_working_copy(fb, SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap()) + }); +}