Mapping for commit syncing to identify commits by source repo

Summary: When we sync commits, we will need to be able to cross-reference the commits between the source repo and the target repo. Make it easy to track this mapping.

Reviewed By: StanislavGlebik

Differential Revision: D16896918

fbshipit-source-id: 3738cf2067291f16ce9b62509763ac526d1d35c2
Simon Farnsworth 2019-08-20 13:24:16 -07:00 committed by Facebook Github Bot
parent b576f2a738
commit 13008dece8
3 changed files with 400 additions and 0 deletions
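
The new SyncedCommitMapping trait introduced below can be exercised end to end
against an in-memory SQLite store. A minimal round-trip sketch (illustration
only; it mirrors the tests in this diff and assumes the same mock constants and
crates they use):

use futures::{future, lazy, Future};
use tokio;

use context::CoreContext;
use mononoke_types_mocks::changesetid as bonsai;
use mononoke_types_mocks::repo::{REPO_ONE, REPO_ZERO};
use synced_commit_mapping::{
    SqlConstructors, SqlSyncedCommitMapping, SyncedCommitMapping, SyncedCommitMappingEntry,
};

fn main() {
    tokio::run(lazy(|| {
        let ctx = CoreContext::test_mock();
        let mapping = SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap();

        // Record that large-repo commit ONES corresponds to small-repo commit TWOS.
        let entry = SyncedCommitMappingEntry::new(
            REPO_ZERO,
            bonsai::ONES_CSID,
            REPO_ONE,
            bonsai::TWOS_CSID,
        );
        assert!(mapping.add(ctx.clone(), entry).wait().unwrap());

        // The mapping can then be resolved from either side.
        let large = mapping
            .get(ctx, REPO_ONE, bonsai::TWOS_CSID, REPO_ZERO)
            .wait()
            .unwrap();
        assert_eq!(large, Some(bonsai::ONES_CSID));

        future::ok(())
    }));
}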


@@ -0,0 +1,9 @@
CREATE TABLE synced_commit_mapping (
  mapping_id INTEGER PRIMARY KEY AUTOINCREMENT,
  small_repo_id INTEGER NOT NULL,
  small_bcs_id BINARY(32) NOT NULL,
  large_repo_id INTEGER NOT NULL,
  large_bcs_id BINARY(32) NOT NULL,
  UNIQUE (large_repo_id, large_bcs_id, small_repo_id),
  UNIQUE (small_repo_id, small_bcs_id, large_repo_id)
);
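
The two UNIQUE constraints give the table one index per lookup direction, and
they are what make the INSERT OR IGNORE in the Rust code below report a
duplicate mapping as zero affected rows, while still allowing one commit to be
synced into several counterpart repos. A standalone sketch of that behaviour
(illustration only; rusqlite is a stand-in here, not a dependency of this
diff):

use rusqlite::{params, Connection};

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    // Path assumed for illustration; adjust to wherever the schema file lives.
    conn.execute_batch(include_str!("sqlite-synced-commit-mapping.sql"))?;

    let insert = "INSERT OR IGNORE INTO synced_commit_mapping \
                  (small_repo_id, small_bcs_id, large_repo_id, large_bcs_id) \
                  VALUES (?1, ?2, ?3, ?4)";

    // A fresh mapping inserts one row...
    assert_eq!(1, conn.execute(insert, params![1, vec![2u8; 32], 0, vec![1u8; 32]])?);
    // ...re-adding the identical mapping is ignored (0 affected rows)...
    assert_eq!(0, conn.execute(insert, params![1, vec![2u8; 32], 0, vec![1u8; 32]])?);
    // ...but the same small-repo commit may map into a different large repo.
    assert_eq!(1, conn.execute(insert, params![1, vec![2u8; 32], 2, vec![3u8; 32]])?);
    Ok(())
}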


@@ -0,0 +1,273 @@
// Copyright (c) 2018-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.

#![deny(warnings)]

use std::collections::BTreeMap;
use std::sync::Arc;

use sql::Connection;
pub use sql_ext::SqlConstructors;

use cloned::cloned;
use context::CoreContext;
use failure_ext::Error;
use futures::{future, Future};
use futures_ext::{BoxFuture, FutureExt};
use mononoke_types::{ChangesetId, RepositoryId};
use sql::queries;
use stats::{define_stats, Timeseries};

// TODO(simonfar): Once we've proven the concept, we want to cache these
define_stats! {
    prefix = "mononoke.synced_commit_mapping";
    gets: timeseries(RATE, SUM),
    gets_master: timeseries(RATE, SUM),
    adds: timeseries(RATE, SUM),
    get_alls: timeseries(RATE, SUM),
    get_alls_master: timeseries(RATE, SUM),
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct SyncedCommitMappingEntry {
    pub large_repo_id: RepositoryId,
    pub large_bcs_id: ChangesetId,
    pub small_repo_id: RepositoryId,
    pub small_bcs_id: ChangesetId,
}

impl SyncedCommitMappingEntry {
    pub fn new(
        large_repo_id: RepositoryId,
        large_bcs_id: ChangesetId,
        small_repo_id: RepositoryId,
        small_bcs_id: ChangesetId,
    ) -> Self {
        Self {
            large_repo_id,
            large_bcs_id,
            small_repo_id,
            small_bcs_id,
        }
    }
}

pub trait SyncedCommitMapping: Send + Sync {
    /// Given a full large/small mapping entry, store it in the DB.
    /// The returned future resolves to true if the mapping was saved, false otherwise.
    fn add(&self, ctx: CoreContext, entry: SyncedCommitMappingEntry) -> BoxFuture<bool, Error>;

    /// Find the commit that a given source commit maps to in the target repo
    fn get(
        &self,
        ctx: CoreContext,
        source_repo_id: RepositoryId,
        bcs_id: ChangesetId,
        target_repo_id: RepositoryId,
    ) -> BoxFuture<Option<ChangesetId>, Error>;

    /// Get all mapping entries for a given source commit
    fn get_all(
        &self,
        ctx: CoreContext,
        repo_id: RepositoryId,
        bcs_id: ChangesetId,
    ) -> BoxFuture<BTreeMap<RepositoryId, ChangesetId>, Error>;
}
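
// A hypothetical convenience wrapper (illustration only, not part of this
// diff): cross-reference a commit through the mapping and treat a missing
// entry as an error. Assumes `failure_ext` re-exports `err_msg` from the
// `failure` crate.
pub fn get_synced_or_err<M: SyncedCommitMapping>(
    ctx: CoreContext,
    mapping: &M,
    source_repo_id: RepositoryId,
    bcs_id: ChangesetId,
    target_repo_id: RepositoryId,
) -> BoxFuture<ChangesetId, Error> {
    mapping
        .get(ctx, source_repo_id, bcs_id, target_repo_id)
        .and_then(move |maybe_synced| {
            maybe_synced.ok_or_else(|| {
                failure_ext::err_msg(format!(
                    "commit {} in repo {} has not been synced to repo {}",
                    bcs_id, source_repo_id, target_repo_id
                ))
            })
        })
        .boxify()
}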

impl SyncedCommitMapping for Arc<dyn SyncedCommitMapping> {
    fn add(&self, ctx: CoreContext, entry: SyncedCommitMappingEntry) -> BoxFuture<bool, Error> {
        (**self).add(ctx, entry)
    }

    fn get(
        &self,
        ctx: CoreContext,
        source_repo_id: RepositoryId,
        bcs_id: ChangesetId,
        target_repo_id: RepositoryId,
    ) -> BoxFuture<Option<ChangesetId>, Error> {
        (**self).get(ctx, source_repo_id, bcs_id, target_repo_id)
    }

    fn get_all(
        &self,
        ctx: CoreContext,
        repo_id: RepositoryId,
        bcs_id: ChangesetId,
    ) -> BoxFuture<BTreeMap<RepositoryId, ChangesetId>, Error> {
        (**self).get_all(ctx, repo_id, bcs_id)
    }
}

#[derive(Clone)]
pub struct SqlSyncedCommitMapping {
    write_connection: Connection,
    read_connection: Connection,
    read_master_connection: Connection,
}

queries! {
    write InsertMapping(values: (
        large_repo_id: RepositoryId,
        large_bcs_id: ChangesetId,
        small_repo_id: RepositoryId,
        small_bcs_id: ChangesetId,
    )) {
        insert_or_ignore,
        "{insert_or_ignore} INTO synced_commit_mapping (large_repo_id, large_bcs_id, small_repo_id, small_bcs_id) VALUES {values}"
    }

    read SelectMapping(
        source_repo_id: RepositoryId,
        bcs_id: ChangesetId,
        target_repo_id: RepositoryId,
    ) -> (RepositoryId, ChangesetId, RepositoryId, ChangesetId) {
        "SELECT large_repo_id, large_bcs_id, small_repo_id, small_bcs_id
         FROM synced_commit_mapping
         WHERE (large_repo_id = {source_repo_id} AND large_bcs_id = {bcs_id} AND small_repo_id = {target_repo_id}) OR
               (small_repo_id = {source_repo_id} AND small_bcs_id = {bcs_id} AND large_repo_id = {target_repo_id})"
    }

    read SelectAllMapping(
        source_repo_id: RepositoryId,
        bcs_id: ChangesetId,
    ) -> (RepositoryId, ChangesetId, RepositoryId, ChangesetId) {
        "SELECT large_repo_id, large_bcs_id, small_repo_id, small_bcs_id
         FROM synced_commit_mapping
         WHERE (large_repo_id = {source_repo_id} AND large_bcs_id = {bcs_id}) OR
               (small_repo_id = {source_repo_id} AND small_bcs_id = {bcs_id})"
    }
}
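
// Note: both Select queries match the source commit on either side of the
// mapping, so a lookup works whether the source commit lives in the small or
// the large repo. The two UNIQUE constraints in the schema (one keyed from
// each side) are what keep both branches of these OR queries indexed.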

impl SqlConstructors for SqlSyncedCommitMapping {
    const LABEL: &'static str = "synced_commit_mapping";

    fn from_connections(
        write_connection: Connection,
        read_connection: Connection,
        read_master_connection: Connection,
    ) -> Self {
        Self {
            write_connection,
            read_connection,
            read_master_connection,
        }
    }

    fn get_up_query() -> &'static str {
        include_str!("../schemas/sqlite-synced-commit-mapping.sql")
    }
}

impl SyncedCommitMapping for SqlSyncedCommitMapping {
    fn add(&self, _ctx: CoreContext, entry: SyncedCommitMappingEntry) -> BoxFuture<bool, Error> {
        STATS::adds.add_value(1);

        let SyncedCommitMappingEntry {
            large_repo_id,
            large_bcs_id,
            small_repo_id,
            small_bcs_id,
        } = entry;

        InsertMapping::query(
            &self.write_connection,
            &[(&large_repo_id, &large_bcs_id, &small_repo_id, &small_bcs_id)],
        )
        .map(move |result| {
            // INSERT OR IGNORE reports 0 affected rows if the mapping was
            // already present, so this is true only for a fresh insert.
            result.affected_rows() == 1
        })
        .boxify()
    }

    fn get(
        &self,
        _ctx: CoreContext,
        source_repo_id: RepositoryId,
        bcs_id: ChangesetId,
        target_repo_id: RepositoryId,
    ) -> BoxFuture<Option<ChangesetId>, Error> {
        STATS::gets.add_value(1);

        SelectMapping::query(
            &self.read_connection,
            &source_repo_id,
            &bcs_id,
            &target_repo_id,
        )
        .and_then({
            cloned!(self.read_master_connection);
            move |rows| {
                // An empty result from the replica may just be replication
                // lag, so retry against the master before giving up.
                if rows.is_empty() {
                    STATS::gets_master.add_value(1);
                    SelectMapping::query(
                        &read_master_connection,
                        &source_repo_id,
                        &bcs_id,
                        &target_repo_id,
                    )
                    .left_future()
                } else {
                    future::ok(rows).right_future()
                }
            }
        })
        .map(move |rows| {
            if rows.len() == 1 {
                // The row holds both sides of the mapping; return whichever
                // changeset belongs to the target repo.
                let (large_repo_id, large_bcs_id, _small_repo_id, small_bcs_id) = rows[0];
                if target_repo_id == large_repo_id {
                    Some(large_bcs_id)
                } else {
                    Some(small_bcs_id)
                }
            } else {
                None
            }
        })
        .boxify()
    }

    fn get_all(
        &self,
        _ctx: CoreContext,
        repo_id: RepositoryId,
        bcs_id: ChangesetId,
    ) -> BoxFuture<BTreeMap<RepositoryId, ChangesetId>, Error> {
        STATS::get_alls.add_value(1);

        SelectAllMapping::query(&self.read_connection, &repo_id, &bcs_id)
            .and_then({
                cloned!(self.read_master_connection);
                move |rows| {
                    // Same replica-then-master fallback as in get() above.
                    if rows.is_empty() {
                        STATS::get_alls_master.add_value(1);
                        SelectAllMapping::query(&read_master_connection, &repo_id, &bcs_id)
                            .left_future()
                    } else {
                        future::ok(rows).right_future()
                    }
                }
            })
            .map(move |rows| {
                // For each row, keep the side of the mapping that is not the
                // queried repo, keyed by the repo the commit maps into.
                rows.into_iter()
                    .map(
                        |(large_repo_id, large_bcs_id, small_repo_id, small_bcs_id)| {
                            if repo_id == large_repo_id {
                                (small_repo_id, small_bcs_id)
                            } else {
                                (large_repo_id, large_bcs_id)
                            }
                        },
                    )
                    .collect()
            })
            .boxify()
    }
}


@@ -0,0 +1,118 @@
// Copyright (c) 2017-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.

//! Tests for the synced commits mapping.

#![deny(warnings)]

use futures::{future, lazy, Future};
use maplit::btreemap;
use tokio;

use context::CoreContext;
use mononoke_types_mocks::changesetid as bonsai;
use mononoke_types_mocks::repo::{REPO_ONE, REPO_TWO, REPO_ZERO};
use synced_commit_mapping::{
    SqlConstructors, SqlSyncedCommitMapping, SyncedCommitMapping, SyncedCommitMappingEntry,
};

fn add_and_get<M: SyncedCommitMapping>(mapping: M) {
    let ctx = CoreContext::test_mock();
    let entry =
        SyncedCommitMappingEntry::new(REPO_ZERO, bonsai::ONES_CSID, REPO_ONE, bonsai::TWOS_CSID);
    assert_eq!(
        true,
        mapping
            .add(ctx.clone(), entry.clone())
            .wait()
            .expect("Adding new entry failed")
    );
    assert_eq!(
        false,
        mapping
            .add(ctx.clone(), entry)
            .wait()
            .expect("Adding same entry failed")
    );

    let result = mapping
        .get(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID, REPO_ONE)
        .wait()
        .expect("Get failed");
    assert_eq!(result, Some(bonsai::TWOS_CSID));
    let result = mapping
        .get(ctx.clone(), REPO_ONE, bonsai::TWOS_CSID, REPO_ZERO)
        .wait()
        .expect("Get failed");
    assert_eq!(result, Some(bonsai::ONES_CSID));
}

fn get_all<M: SyncedCommitMapping>(mapping: M) {
    let ctx = CoreContext::test_mock();
    let entry =
        SyncedCommitMappingEntry::new(REPO_ZERO, bonsai::ONES_CSID, REPO_ONE, bonsai::TWOS_CSID);
    mapping
        .add(ctx.clone(), entry.clone())
        .wait()
        .expect("Adding new entry failed");
    let entry =
        SyncedCommitMappingEntry::new(REPO_ZERO, bonsai::ONES_CSID, REPO_TWO, bonsai::THREES_CSID);
    mapping
        .add(ctx.clone(), entry.clone())
        .wait()
        .expect("Adding new entry failed");

    let result = mapping
        .get_all(ctx.clone(), REPO_ZERO, bonsai::ONES_CSID)
        .wait()
        .expect("Get failed");
    assert_eq!(
        result,
        btreemap! {REPO_ONE => bonsai::TWOS_CSID, REPO_TWO => bonsai::THREES_CSID}
    );
    let result = mapping
        .get_all(ctx.clone(), REPO_ONE, bonsai::TWOS_CSID)
        .wait()
        .expect("Get failed");
    assert_eq!(result, btreemap! {REPO_ZERO => bonsai::ONES_CSID});
}

fn missing<M: SyncedCommitMapping>(mapping: M) {
    let ctx = CoreContext::test_mock();
    let result = mapping
        .get(ctx.clone(), REPO_ONE, bonsai::TWOS_CSID, REPO_ZERO)
        .wait()
        .expect("Failed to fetch missing changeset (should succeed with None instead)");
    assert_eq!(result, None);
}
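
// Each test runs its body inside tokio::run(lazy(...)) so that an executor and
// reactor are available while the body blocks on futures with wait();
// presumably parts of the SQL stack expect a runtime to be present (an
// assumption, inferred from the pattern these tests use).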

#[test]
fn test_add_and_get() {
    tokio::run(lazy(|| {
        future::ok(add_and_get(
            SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap(),
        ))
    }));
}

#[test]
fn test_missing() {
    tokio::run(lazy(|| {
        future::ok(missing(
            SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap(),
        ))
    }));
}

#[test]
fn test_get_all() {
    tokio::run(lazy(|| {
        future::ok(get_all(
            SqlSyncedCommitMapping::with_sqlite_in_memory().unwrap(),
        ))
    }));
}