mononoke/repo_import: add hg sync checker

Summary:
Related diff: D22816538 (3abc4312af)

In the repo_import tool, once we move a bookmark to reveal commits to users, we want to check whether hg_sync has received those commits. To do this, we extract the largest log id from the bookmarks_update_log table and compare it with the mutable_counters value associated with hg_sync. If the counter value is greater than or equal to the log id, we can move the bookmark to the next batch of commits. Otherwise, we sleep, retry fetching the mutable_counters value, and compare the two again.
mutable_counters is an SQL table that tracks bookmark update log entries with a counter.
This diff adds the functionality to extract the mutable_counters value for hg_sync.
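
For illustration, the check described above reduces to a polling loop like the following sketch. The helpers fetch_largest_log_id and fetch_latest_replayed_id are hypothetical stand-ins for the real bookmarks_update_log and mutable_counters queries, not Mononoke APIs:

    use anyhow::Error;
    use tokio::time::{delay_for, Duration};

    // Hypothetical stand-ins for the real queries (illustrative only).
    async fn fetch_largest_log_id() -> Result<u64, Error> { Ok(1) }
    async fn fetch_latest_replayed_id() -> Result<u64, Error> { Ok(0) }

    // Block until hg_sync's replayed counter catches up with the largest
    // bookmarks_update_log id, sleeping between retries.
    async fn wait_for_hg_sync(sleep_time: u64) -> Result<(), Error> {
        let largest_id = fetch_largest_log_id().await?;
        loop {
            let replayed_id = fetch_latest_replayed_id().await?;
            if replayed_id >= largest_id {
                return Ok(()); // safe to move the bookmark to the next batch
            }
            delay_for(Duration::from_secs(sleep_time)).await;
        }
    }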

======================
SQL query fix:
In the previous diff (D22816538 (3abc4312af)) we didn't cover the case where the query returns no id (e.g. the bookmarks_update_log table is empty), in which case get_largest_log_id should return None. This diff fixes that error.
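
The underlying behaviour, as a minimal sketch (illustrative only, not the Mononoke query layer): SELECT MAX(id) over zero rows yields a single row containing NULL, so each fetched row has to decode as Option<u64> rather than u64:

    // Sketch of the None case: rows come back as tuples of Option<u64>.
    fn largest_log_id(rows: Vec<(Option<u64>,)>) -> Result<Option<u64>, String> {
        match rows.into_iter().next() {
            Some(row) => Ok(row.0), // Ok(None) when the table is empty
            None => Err("Failed to query largest log id".to_string()),
        }
    }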

Reviewed By: StanislavGlebik

Differential Revision: D22864223

fbshipit-source-id: f3690263b4eebfe151e50b01a13b0193009e3bfa
Viet Hung Nguyen 2020-08-03 04:00:06 -07:00 committed by Facebook GitHub Bot
parent 3bd5ec74b0
commit 578207d0dc
5 changed files with 235 additions and 23 deletions


@@ -184,7 +184,7 @@ queries! {
         OFFSET {offset}"
     }

-    read GetLargestLogId(repo_id: RepositoryId) -> (u64) {
+    read GetLargestLogId(repo_id: RepositoryId) -> (Option<u64>) {
        "SELECT MAX(id)
         FROM bookmarks_update_log
         WHERE repo_id = {repo_id}"
@@ -559,9 +559,20 @@ impl BookmarkUpdateLog for SqlBookmarks {
             &self.connections.read_connection
         };

-        GetLargestLogId::query(&connection, &self.repo_id)
+        let query = GetLargestLogId::query(&connection, &self.repo_id)
             .compat()
-            .map_ok(|entries| entries.first().map(|entry| entry.0))
-            .boxed()
+            .boxed();
+        query
+            .and_then(move |entries| {
+                let entry = entries.into_iter().next();
+                match entry {
+                    Some(count) => future::ok(count.0),
+                    None => {
+                        let msg = format!("Failed to query largest log id");
+                        future::err(Error::msg(msg))
+                    }
+                }
+            })
+            .boxed()
     }
 }


@@ -1138,6 +1138,13 @@ fn test_get_largest_log_id(fb: FacebookInit) {
         .with_repo_id(REPO_ZERO);
     let name_1 = create_bookmark_name("book");

+    assert_eq!(
+        bookmarks
+            .get_largest_log_id(ctx.clone(), Freshness::MostRecent)
+            .await
+            .unwrap(),
+        None
+    );
     let mut txn = bookmarks.create_transaction(ctx.clone());
     txn.force_set(&name_1, ONES_CSID, BookmarkUpdateReason::TestMove, None)
         .unwrap();


@@ -18,6 +18,7 @@ import_tools = { path = "../git/import_tools" }
 mercurial_types = { path = "../mercurial/types" }
 mononoke_types = { path = "../mononoke_types" }
 movers = { path = "../commit_rewriting/movers" }
+mutable_counters = { path = "../mutable_counters" }
 topo_sort = { path = "../common/topo_sort" }
 fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 anyhow = "1.0"
@@ -31,5 +32,8 @@ tokio = { version = "=0.2.13", features = ["full"] }
 [dev-dependencies]
 blobrepo_factory = { path = "../blobrepo/factory" }
 blobstore = { path = "../blobstore" }
+mercurial_types-mocks = { path = "../mercurial/types/mocks" }
+mononoke_types-mocks = { path = "../mononoke_types/mocks" }
+sql_construct = { path = "../common/sql_construct" }
 tests_utils = { path = "../tests/utils" }
 tokio-compat = "0.1"


@@ -9,7 +9,7 @@
 use anyhow::{format_err, Error};
 use blobrepo::{save_bonsai_changesets, BlobRepo};
 use blobrepo_hg::BlobRepoHg;
-use bookmarks::{BookmarkName, BookmarkUpdateReason};
+use bookmarks::{BookmarkName, BookmarkUpdateLog, BookmarkUpdateReason, Freshness};
 use clap::Arg;
 use cmdlib::args;
 use cmdlib::helpers::block_execute;
@@ -26,10 +26,12 @@ use import_tools::{GitimportPreferences, GitimportTarget};
 use mercurial_types::{HgChangesetId, MPath};
 use mononoke_types::{BonsaiChangeset, ChangesetId};
 use movers::DefaultAction;
+use mutable_counters::{MutableCounters, SqlMutableCounters};
 use serde::{Deserialize, Serialize};
 use serde_json;
 use slog::info;
 use std::collections::HashMap;
+use std::convert::TryInto;
 use std::num::NonZeroUsize;
 use std::path::Path;
 use tokio::{process, time};
@@ -44,6 +46,7 @@ const ARG_PHAB_CHECK_DISABLED: &str = "disable-phabricator-check";
 const ARG_X_REPO_CHECK_DISABLED: &str = "disable-x-repo-check";
 const ARG_HG_SYNC_CHECK_DISABLED: &str = "disable-hg-sync-check";
 const ARG_SLEEP_TIME: &str = "sleep-time";
+const LATEST_REPLAYED_REQUEST_KEY: &'static str = "latest-replayed-request";

 #[derive(Deserialize, Clone, Debug)]
 struct GraphqlQueryObj {
@@ -156,6 +159,7 @@ async fn move_bookmark(
     bookmark_suffix: &str,
     checker_flags: &CheckerFlags<'_>,
     sleep_time: u64,
+    mutable_counters: &SqlMutableCounters,
 ) -> Result<(), Error> {
     if shifted_bcs.is_empty() {
         return Err(format_err!("There is no bonsai changeset present"));
@@ -201,31 +205,94 @@
             ctx.logger(),
             "Set bookmark {:?} to point to {:?}", bookmark, curr_csid
         );
-        // if a check is disabled, we have already passed the check
-        let mut passed_phab_check = checker_flags.phab_check_disabled;
-        let mut _passed_x_repo_check = checker_flags.x_repo_check_disabled;
-        let mut _passed_hg_sync_check = checker_flags.hg_sync_check_disabled;
         let hg_csid = repo
             .get_hg_from_bonsai_changeset(ctx.clone(), curr_csid)
             .compat()
             .await?;
-        while !passed_phab_check {
-            let call_sign = checker_flags.call_sign.as_ref().unwrap();
-            passed_phab_check = phabricator_commit_check(&call_sign, &hg_csid).await?;
-            if !passed_phab_check {
-                info!(
-                    ctx.logger(),
-                    "Phabricator hasn't parsed commit: {:?}", hg_csid
-                );
-                time::delay_for(time::Duration::from_secs(sleep_time)).await;
-            }
-        }
+        check_dependent_systems(
+            &ctx,
+            &repo,
+            &checker_flags,
+            hg_csid,
+            sleep_time,
+            &mutable_counters,
+        )
+        .await?;
         old_csid = curr_csid;
     }
     Ok(())
 }

+async fn check_dependent_systems(
+    ctx: &CoreContext,
+    repo: &BlobRepo,
+    checker_flags: &CheckerFlags<'_>,
+    hg_csid: HgChangesetId,
+    sleep_time: u64,
+    mutable_counters: &SqlMutableCounters,
+) -> Result<(), Error> {
+    // if a check is disabled, we have already passed the check
+    let mut passed_phab_check = checker_flags.phab_check_disabled;
+    let mut _passed_x_repo_check = checker_flags.x_repo_check_disabled;
+    let mut passed_hg_sync_check = checker_flags.hg_sync_check_disabled;
+    let repo_id = repo.get_repoid();
+    while !passed_phab_check {
+        let call_sign = checker_flags.call_sign.as_ref().unwrap();
+        passed_phab_check = phabricator_commit_check(&call_sign, &hg_csid).await?;
+        if !passed_phab_check {
+            info!(
+                ctx.logger(),
+                "Phabricator hasn't parsed commit: {:?}", hg_csid
+            );
+            time::delay_for(time::Duration::from_secs(sleep_time)).await;
+        }
+    }
+    let largest_id = match repo
+        .attribute_expected::<dyn BookmarkUpdateLog>()
+        .get_largest_log_id(ctx.clone(), Freshness::MostRecent)
+        .await?
+    {
+        Some(id) => id,
+        None => return Err(format_err!("Couldn't fetch id from bookmarks update log")),
+    };
+    /*
+        In mutable counters table we store the latest bookmark id replayed by mercurial with
+        LATEST_REPLAYED_REQUEST_KEY key. We use this key to extract the latest replayed id
+        and compare it with the largest bookmark log id after we move the bookmark.
+        If the replayed id is larger or equal to the bookmark id, we can try to move the bookmark
+        to the next batch of commits
+    */
+    while !passed_hg_sync_check {
+        let mut_counters_value = match mutable_counters
+            .get_counter(ctx.clone(), repo_id, LATEST_REPLAYED_REQUEST_KEY)
+            .compat()
+            .await?
+        {
+            Some(value) => value,
+            None => {
+                return Err(format_err!(
+                    "Couldn't fetch the counter value from mutable_counters for repo_id {:?}",
+                    repo_id
+                ))
+            }
+        };
+        passed_hg_sync_check = largest_id <= mut_counters_value.try_into().unwrap();
+        if !passed_hg_sync_check {
+            info!(
+                ctx.logger(),
+                "Waiting for {} to be replayed to hg, the latest replayed is {}",
+                largest_id,
+                mut_counters_value
+            );
+            time::delay_for(time::Duration::from_secs(sleep_time)).await;
+        }
+    }
+    Ok(())
+}

 async fn phabricator_commit_check(call_sign: &str, hg_csid: &HgChangesetId) -> Result<bool, Error> {
     let commit_id = format!("r{}{}", call_sign, hg_csid);
     let query = "query($commit: String!) {
@@ -408,6 +475,9 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
     block_execute(
         async {
             let repo = repo.compat().await?;
+            let mutable_counters = args::open_sql::<SqlMutableCounters>(ctx.fb, &matches)
+                .compat()
+                .await?;
             let mut shifted_bcs = rewrite_file_paths(&ctx, &repo, &path, &prefix).await?;
             shifted_bcs = sort_bcs(&shifted_bcs)?;
             derive_bonsais(&ctx, &repo, &shifted_bcs).await?;
@@ -419,6 +489,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
                 &bookmark_suffix,
                 &checker_flags,
                 sleep_time,
+                &mutable_counters,
             )
             .await
         },
@@ -432,15 +503,27 @@
 #[cfg(test)]
 mod tests {
-    use crate::{move_bookmark, sort_bcs, CheckerFlags};
+    use crate::{
+        check_dependent_systems, move_bookmark, sort_bcs, CheckerFlags, LATEST_REPLAYED_REQUEST_KEY,
+    };
     use anyhow::Result;
     use blobstore::Loadable;
     use bookmarks::{BookmarkName, BookmarkUpdateLog, BookmarkUpdateReason, Freshness};
     use context::CoreContext;
     use fbinit::FacebookInit;
-    use futures::stream::TryStreamExt;
+    use futures::{compat::Future01CompatExt, stream::TryStreamExt};
+    use mercurial_types_mocks::nodehash::ONES_CSID as HG_CSID;
+    use mononoke_types_mocks::changesetid::{ONES_CSID as MON_CSID, TWOS_CSID};
+    use mutable_counters::{MutableCounters, SqlMutableCounters};
+    use sql_construct::SqlConstruct;
+    use std::time::Duration;
     use tests_utils::drawdag::create_from_dag;
+    use tokio::time;

+    fn create_bookmark_name(book: &str) -> BookmarkName {
+        BookmarkName::new(book.to_string()).unwrap()
+    }

     #[fbinit::compat_test]
     async fn move_bookmark_test(fb: FacebookInit) -> Result<()> {
@@ -455,6 +538,7 @@ mod tests {
             call_sign,
         };
         let sleep_time = 1;
+        let mutable_counters = SqlMutableCounters::with_sqlite_in_memory().unwrap();
         let changesets = create_from_dag(
             &ctx,
             &blob_repo,
@@ -476,6 +560,7 @@ mod tests {
             "test_repo",
             &checker_flags,
             sleep_time,
+            &mutable_counters,
         )
         .await?;
         // Check the bookmark moves created BookmarkLogUpdate entries
@@ -504,4 +589,109 @@ mod tests {
         );
         Ok(())
     }

+    /*
+                        largest_id  mutable_counters value  assert
+        No action       None        None                    Error
+        Move bookmark   1           None                    Error
+        Set counter     1           1                       Ok(())
+        Move bookmark   2           1                       infinite loop -> timeout
+        Set counter     2           2                       Ok(())
+    */
+    #[fbinit::compat_test]
+    async fn hg_sync_check_test(fb: FacebookInit) -> Result<()> {
+        let ctx = CoreContext::test_mock(fb);
+        let repo = blobrepo_factory::new_memblob_empty(None)?;
+        let checker_flags = CheckerFlags {
+            phab_check_disabled: true,
+            x_repo_check_disabled: true,
+            hg_sync_check_disabled: false,
+            call_sign: None,
+        };
+        let sleep_time = 1;
+        let mutable_counters = SqlMutableCounters::with_sqlite_in_memory().unwrap();
+        let repo_id = repo.get_repoid();
+        let bookmark = create_bookmark_name("book");
+
+        assert!(check_dependent_systems(
+            &ctx,
+            &repo,
+            &checker_flags,
+            HG_CSID,
+            sleep_time,
+            &mutable_counters
+        )
+        .await
+        .is_err());
+
+        let mut txn = repo.update_bookmark_transaction(ctx.clone());
+        txn.create(&bookmark, MON_CSID, BookmarkUpdateReason::TestMove, None)?;
+        txn.commit().await.unwrap();
+
+        assert!(check_dependent_systems(
+            &ctx,
+            &repo,
+            &checker_flags,
+            HG_CSID,
+            sleep_time,
+            &mutable_counters
+        )
+        .await
+        .is_err());
+
+        mutable_counters
+            .set_counter(ctx.clone(), repo_id, LATEST_REPLAYED_REQUEST_KEY, 1, None)
+            .compat()
+            .await?;
+
+        check_dependent_systems(
+            &ctx,
+            &repo,
+            &checker_flags,
+            HG_CSID,
+            sleep_time,
+            &mutable_counters,
+        )
+        .await?;
+
+        let mut txn = repo.update_bookmark_transaction(ctx.clone());
+        txn.update(
+            &bookmark,
+            TWOS_CSID,
+            MON_CSID,
+            BookmarkUpdateReason::TestMove,
+            None,
+        )?;
+        txn.commit().await.unwrap();
+
+        let timed_out = time::timeout(
+            Duration::from_millis(2000),
+            check_dependent_systems(
+                &ctx,
+                &repo,
+                &checker_flags,
+                HG_CSID,
+                sleep_time,
+                &mutable_counters,
+            ),
+        )
+        .await
+        .is_err();
+        assert!(timed_out);
+
+        mutable_counters
+            .set_counter(ctx.clone(), repo_id, LATEST_REPLAYED_REQUEST_KEY, 2, None)
+            .compat()
+            .await?;
+
+        check_dependent_systems(
+            &ctx,
+            &repo,
+            &checker_flags,
+            HG_CSID,
+            sleep_time,
+            &mutable_counters,
+        )
+        .await?;
+        Ok(())
+    }
 }


@@ -35,7 +35,7 @@

   # Import it into Mononoke
   $ cd "$TESTTMP"
-  $ repo_import "$GIT_REPO" --dest-path "new_dir/new_repo" --batch-size 3 --bookmark-suffix "new_repo" --disable-phabricator-check
+  $ repo_import "$GIT_REPO" --dest-path "new_dir/new_repo" --batch-size 3 --bookmark-suffix "new_repo" --disable-phabricator-check --disable-hg-sync-check
   * using repo "repo" repoid RepositoryId(0) (glob)
   * Created ce435b03d4ef526648f8654c61e26ae5cc1069cc => ChangesetId(Blake2(f7cbf75d9c08ff96896ed2cebd0327aa514e58b1dd9901d50129b9e08f4aa062)) (glob)
   * Created 2c01e4a5658421e2bfcd08e31d9b69399319bcd3 => ChangesetId(Blake2(f7708ed066b1c23591f862148e0386ec704a450e572154cc52f87ca0e394a0fb)) (glob)