enable a single process to work on multiple repos.

Summary:
This change enables the filler job to work on all available repos instead of a
single one. We can still dedicate the job to a specific repo (by crafting a
config with only that repo enabled), but we can now put the entire long tail of
low-traffic repos under a single job.

This requires D24110335 to land in configerator before it can work.

Reviewed By: krallin

Differential Revision: D24136239

fbshipit-source-id: 4b77d1667c37cc55f11c3087b02a09dbae29db0f
Mateusz Kwapich 2020-10-08 05:33:40 -07:00 committed by Facebook GitHub Bot
parent c02ee1b1d1
commit 42a783999d
13 changed files with 381 additions and 236 deletions
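
To make the summary concrete, here is a minimal, self-contained Rust sketch of the selection mechanism. The FillerMode enum, the enabled_repos helper, and the repo names are simplified stand-ins introduced only for illustration; the real code matches each repo's CommitcloudBookmarksFillerMode from the parsed configs against the mode the job runs in, as shown in the main.rs diff below.

// Editorial sketch only (not part of the commit): simplified types and
// hypothetical repo names illustrating mode-based repo selection.
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq)]
enum FillerMode {
    Disabled,
    Backfill,
    Forwardfill,
}

/// Keep only the repos whose configured filler mode matches the mode this
/// job instance is running in.
fn enabled_repos(configs: &HashMap<String, FillerMode>, mode: &FillerMode) -> Vec<String> {
    configs
        .iter()
        .filter_map(|(name, repo_mode)| (repo_mode == mode).then(|| name.clone()))
        .collect()
}

fn main() {
    let configs: HashMap<String, FillerMode> = vec![
        ("busy-repo".to_string(), FillerMode::Disabled),
        ("small-repo-1".to_string(), FillerMode::Forwardfill),
        ("small-repo-2".to_string(), FillerMode::Forwardfill),
    ]
    .into_iter()
    .collect();

    // A forwardfill instance picks up the whole long tail in one job; a config
    // with only one repo set to Forwardfill dedicates the job to that repo.
    let repos = enabled_repos(&configs, &FillerMode::Forwardfill);
    println!("filling bookmarks for: {:?}", repos);
}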

View File

@@ -1,4 +1,4 @@
// @generated SignedSource<<09b5a829b6febce1cbbc803c3ed5d77c>>
// @generated SignedSource<<14aa1e3032368c0faf151c7c8c9cd6f1>>
// DO NOT EDIT THIS FILE MANUALLY!
// This file is a mechanical copy of the version in the configerator repo. To
// modify it, edit the copy in the configerator repo instead and copy it over by
@@ -392,12 +392,19 @@ struct RawBundle2ReplayParams {
1: optional bool preserve_raw_bundle2,
}
enum RawCommitcloudBookmarksFiller {
DISABLED = 0,
BACKFILL = 1,
FORWARDFILL = 2,
}
struct RawInfinitepushParams {
1: bool allow_writes,
2: optional string namespace_pattern,
3: optional bool hydrate_getbundle_response,
4: optional bool populate_reverse_filler_queue,
5: optional string commit_scribe_category,
6: RawCommitcloudBookmarksFiller bookmarks_filler,
}
struct RawFilestoreParams {

View File

@@ -970,6 +970,7 @@ mod test {
[infinitepush]
allow_writes = true
namespace_pattern = "foobar/.+"
bookmarks_filler = 0
[filestore]
chunk_size = 768
@@ -1180,6 +1181,7 @@ mod test {
hydrate_getbundle_response: false,
populate_reverse_filler_queue: false,
commit_scribe_category: None,
bookmarks_filler: Default::default(),
},
list_keys_patterns_max: 123,
hook_max_file_size: 456,

View File

@@ -11,19 +11,19 @@ use std::convert::TryInto;
use anyhow::{anyhow, Context, Result};
use bookmarks_types::BookmarkName;
use metaconfig_types::{
BookmarkOrRegex, BookmarkParams, Bundle2ReplayParams, CacheWarmupParams, ComparableRegex,
DerivedDataConfig, HookBypass, HookConfig, HookManagerParams, HookParams,
InfinitepushNamespace, InfinitepushParams, LfsParams, PushParams, PushrebaseFlags,
PushrebaseParams, RepoClientKnobs, SegmentedChangelogConfig, ServiceWriteRestrictions,
SourceControlServiceMonitoring, SourceControlServiceParams, StorageConfig, UnodeVersion,
WireprotoLoggingConfig,
BookmarkOrRegex, BookmarkParams, Bundle2ReplayParams, CacheWarmupParams,
CommitcloudBookmarksFillerMode, ComparableRegex, DerivedDataConfig, HookBypass, HookConfig,
HookManagerParams, HookParams, InfinitepushNamespace, InfinitepushParams, LfsParams,
PushParams, PushrebaseFlags, PushrebaseParams, RepoClientKnobs, SegmentedChangelogConfig,
ServiceWriteRestrictions, SourceControlServiceMonitoring, SourceControlServiceParams,
StorageConfig, UnodeVersion, WireprotoLoggingConfig,
};
use mononoke_types::{MPath, PrefixTrie};
use regex::Regex;
use repos::{
RawBookmarkConfig, RawBundle2ReplayParams, RawCacheWarmupConfig, RawDerivedDataConfig,
RawHookConfig, RawHookManagerParams, RawInfinitepushParams, RawLfsParams, RawPushParams,
RawPushrebaseParams, RawRepoClientKnobs, RawSegmentedChangelogConfig,
RawBookmarkConfig, RawBundle2ReplayParams, RawCacheWarmupConfig, RawCommitcloudBookmarksFiller,
RawDerivedDataConfig, RawHookConfig, RawHookManagerParams, RawInfinitepushParams, RawLfsParams,
RawPushParams, RawPushrebaseParams, RawRepoClientKnobs, RawSegmentedChangelogConfig,
RawServiceWriteRestrictions, RawSourceControlServiceMonitoring, RawSourceControlServiceParams,
RawUnodeVersion, RawWireprotoLoggingConfig,
};
@@ -267,6 +267,14 @@ impl Convert for RawInfinitepushParams {
hydrate_getbundle_response: self.hydrate_getbundle_response.unwrap_or(false),
populate_reverse_filler_queue: self.populate_reverse_filler_queue.unwrap_or(false),
commit_scribe_category: self.commit_scribe_category,
bookmarks_filler: match self.bookmarks_filler {
RawCommitcloudBookmarksFiller::DISABLED => CommitcloudBookmarksFillerMode::DISABLED,
RawCommitcloudBookmarksFiller::BACKFILL => CommitcloudBookmarksFillerMode::BACKFILL,
RawCommitcloudBookmarksFiller::FORWARDFILL => {
CommitcloudBookmarksFillerMode::FORWARDFILL
}
_ => return Err(anyhow!("unknown bookmarks filler operation mode!")),
},
})
}
}

View File

@@ -975,6 +975,23 @@ impl InfinitepushNamespace {
}
}
/// Commit cloud bookmark filler operation mode for the repo.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CommitcloudBookmarksFillerMode {
/// No filling.
DISABLED = 0,
/// Backfill old entries.
BACKFILL = 1,
/// Fill the entries forward.
FORWARDFILL = 2,
}
impl Default for CommitcloudBookmarksFillerMode {
fn default() -> Self {
CommitcloudBookmarksFillerMode::DISABLED
}
}
/// Infinitepush configuration. Note that it is legal to not allow Infinitepush (server = false),
/// while still providing a namespace. Doing so will prevent regular pushes to the namespace, as
/// well as allow the creation of Infinitepush scratchbookmarks through e.g. replicating them from
@@ -997,6 +1014,9 @@ pub struct InfinitepushParams {
/// Scribe category we log new commits to
pub commit_scribe_category: Option<String>,
/// Bookmarks filler operation mode
pub bookmarks_filler: CommitcloudBookmarksFillerMode,
}
impl Default for InfinitepushParams {
@@ -1007,6 +1027,7 @@ impl Default for InfinitepushParams {
hydrate_getbundle_response: false,
populate_reverse_filler_queue: false,
commit_scribe_category: None,
bookmarks_filler: Default::default(),
}
}
}

View File

@@ -12,7 +12,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::Error;
use anyhow::{Context, Error};
use blobrepo_factory::{BlobstoreOptions, Caching, ReadOnlyStorage};
pub use bookmarks::BookmarkName;
use cached_config::ConfigStore;
@@ -87,7 +87,7 @@ impl Mononoke {
disabled_hooks: HashMap<String, HashSet<String>>,
) -> Result<Self, Error> {
let common_config = configs.common;
let repos = future::join_all(
let repos = future::try_join_all(
configs
.repos
.into_iter()
@@ -112,14 +112,14 @@ impl Mononoke {
disabled_hooks,
)
.await
.expect("failed to initialize repo");
.with_context(|| format!("could not initialize repo '{}'", &name))?;
debug!(logger, "Initialized {}", &name);
(name, Arc::new(repo))
Ok::<_, Error>((name, Arc::new(repo)))
}
}
}),
)
.await
.await?
.into_iter()
.collect();
Ok(Self { repos })

View File

@@ -674,7 +674,7 @@ impl RepoContext {
}
/// The configuration for the referenced repository.
pub(crate) fn config(&self) -> &RepoConfig {
pub fn config(&self) -> &RepoConfig {
&self.repo.config
}

View File

@@ -7,13 +7,12 @@ license = "GPLv2+"
include = ["schemas/**/*.sql", "src/**/*.rs"]
[dependencies]
blobrepo = { path = "../blobrepo" }
blobrepo_hg = { path = "../blobrepo/blobrepo_hg" }
bookmarks = { path = "../bookmarks" }
cmdlib = { path = "../cmdlib" }
context = { path = "../server/context" }
mercurial_types = { path = "../mercurial/types" }
metaconfig_parser = { path = "../metaconfig/parser" }
metaconfig_types = { path = "../metaconfig/types" }
mononoke_api = { path = "../mononoke_api" }
scuba_ext = { path = "../common/scuba_ext" }
sql_construct = { path = "../common/sql_construct" }
sql_ext = { path = "../common/rust/sql_ext" }
@@ -23,6 +22,7 @@ sql = { git = "https://github.com/facebookexperimental/rust-shed.git", branch =
stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
ascii = "1.0"
async-trait = "0.1.29"
chrono = { version = "0.4", features = ["serde"] }
clap = "2.33"
futures = { version = "0.3.5", features = ["async-await", "compat"] }

View File

@@ -9,19 +9,17 @@
#![feature(async_closure)]
#![deny(warnings)]
use anyhow::{format_err, Error, Result};
use bookmarks::BookmarkName;
use anyhow::{Error, Result};
use clap::{Arg, ArgMatches};
use cloned::cloned;
use cmdlib::{args, helpers::block_execute};
use fbinit::FacebookInit;
use futures::{compat::Future01CompatExt, future, stream::StreamExt};
use mercurial_types::HgChangesetId;
use metaconfig_types::RepoConfig;
use futures::{future, stream::StreamExt};
use metaconfig_parser::load_repo_configs;
use metaconfig_types::CommitcloudBookmarksFillerMode;
use mononoke_api::Mononoke;
use scuba_ext::ScubaSampleBuilder;
use sql_construct::{facebook::FbSqlConstruct, SqlConstruct};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::delay_for;
@@ -86,6 +84,8 @@ fn main(fb: FacebookInit) -> Result<()> {
let app = args::MononokeApp::new("Replay bookmarks from Mercurial into Mononoke")
.with_advanced_args_hidden()
.with_fb303_args()
.with_test_args()
.with_all_repos()
.build()
.arg(
Arg::with_name(ARG_CTX_SCUBA_TABLE)
@@ -152,14 +152,51 @@ fn main(fb: FacebookInit) -> Result<()> {
let matches = app.get_matches();
let readonly_storage = args::parse_readonly_storage(&matches);
args::init_cachelib(fb, &matches, None);
let caching = args::init_cachelib(fb, &matches, None);
let logger = args::init_logging(fb, &matches);
let queue = open_sql(fb, &matches, readonly_storage.0);
let (repo_name, RepoConfig { infinitepush, .. }) = args::get_config(fb, &matches)?;
let blobrepo = args::open_repo(fb, &logger, &matches);
let config_path = matches
.value_of("mononoke-config-path")
.expect("must set config path");
let repo_configs = load_repo_configs(fb, config_path)?;
let backfill = Backfill(matches.is_present(ARG_BACKFILL));
let mode = if backfill.0 {
CommitcloudBookmarksFillerMode::BACKFILL
} else {
CommitcloudBookmarksFillerMode::FORWARDFILL
};
let repo_names: Vec<_> = repo_configs
.repos
.iter()
.filter_map(|(name, config)| {
if config.infinitepush.bookmarks_filler == mode {
Some(name.clone())
} else {
None
}
})
.collect();
let config_store = args::maybe_init_config_store(fb, &logger, &matches)
.expect("failed to instantiate ConfigStore");
let mononoke = Mononoke::new(
fb,
logger.clone(),
repo_configs,
args::parse_mysql_options(&matches),
caching,
args::parse_readonly_storage(&matches),
args::parse_blobstore_options(&matches),
config_store,
args::parse_disabled_hooks_with_repo_prefix(&matches, &logger)?,
);
let buffer_size = BufferSize(args::get_usize(
&matches,
ARG_BUFFER_SIZE,
@@ -173,47 +210,24 @@ fn main(fb: FacebookInit) -> Result<()> {
let maybe_max_iterations = args::get_usize_opt(&matches, ARG_MAX_ITERATIONS);
let maybe_delay = args::get_u64_opt(&matches, ARG_DELAY);
let mut status_scuba = match matches.value_of(ARG_STATUS_SCUBA_TABLE) {
let status_scuba = match matches.value_of(ARG_STATUS_SCUBA_TABLE) {
Some(table) => ScubaSampleBuilder::new(fb, table),
None => ScubaSampleBuilder::with_discard(),
};
status_scuba
.add_common_server_data()
.add("reponame", repo_name.as_ref());
let infinitepush_namespace = infinitepush.namespace.ok_or(format_err!(
"Infinitepush is not enabled in repository {:?}",
repo_name
))?;
let main = async {
let (queue, blobrepo) = future::try_join(queue, blobrepo.compat()).await?;
let infinitepush_namespace = Arc::new(infinitepush_namespace);
let do_replay = {
cloned!(logger);
move |name: BookmarkName, hg_cs_id: HgChangesetId| {
sync_bookmark::sync_bookmark(
fb,
blobrepo.clone(),
logger.clone(),
infinitepush_namespace.clone(),
name,
hg_cs_id,
)
}
};
let (queue, mononoke) = future::try_join(queue, mononoke).await?;
let replay_fn = &sync_bookmark::SyncBookmark::new(&fb, &mononoke, &logger);
let stream = replay_stream::process_replay_stream(
&queue,
repo_name,
&repo_names,
backfill,
buffer_size,
queue_limit,
status_scuba,
logger.clone(),
do_replay,
&logger,
replay_fn,
);
let mut stream = match maybe_max_iterations {

View File

@@ -6,11 +6,11 @@
*/
use anyhow::{format_err, Error, Result};
use async_trait::async_trait;
use bookmarks::BookmarkName;
use chrono::Local;
use cloned::cloned;
use futures::stream::{self, Stream, StreamExt};
use futures::Future;
use mercurial_types::HgChangesetId;
use scuba_ext::ScubaSampleBuilder;
use slog::{info, Logger};
@@ -19,7 +19,7 @@ use std::fmt::Debug;
use crate::errors::ErrorKind;
use crate::sql_replay_bookmarks_queue::{
Backfill, BookmarkBatch, Entry, QueueLimit, SqlReplayBookmarksQueue,
Backfill, BookmarkBatch, Entry, QueueLimit, RepoName, SqlReplayBookmarksQueue,
};
define_stats! {
@@ -32,62 +32,70 @@ type History = Vec<Result<(), ErrorKind>>;
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug, Hash)]
pub struct BufferSize(pub usize);
async fn replay_one_bookmark<F, R>(
#[async_trait]
pub trait ReplayFn: Copy {
async fn replay(
&self,
repo: RepoName,
bookmark_name: BookmarkName,
hg_cs_id: HgChangesetId,
) -> Result<(), ErrorKind>;
}
async fn replay_one_bookmark<'a, R: ReplayFn>(
repo_name: RepoName,
bookmark: BookmarkName,
mut entries: Vec<Entry>,
do_replay: R,
) -> (BookmarkName, Result<Vec<Entry>>, History)
where
F: Future<Output = Result<(), ErrorKind>>,
R: Fn(BookmarkName, HgChangesetId) -> F + Sized + Clone,
{
replay: R,
) -> (RepoName, BookmarkName, Result<Vec<Entry>>, History) {
let mut history = vec![];
while let Some((_id, hg_cs_id, _timestamp)) = entries.last() {
let ret = do_replay(bookmark.clone(), hg_cs_id.clone()).await;
let ret = replay
.replay(repo_name.clone(), bookmark.clone(), hg_cs_id.clone())
.await;
let ok = ret.is_ok();
history.push(ret);
if ok {
return (bookmark, Ok(entries), history);
return (repo_name, bookmark, Ok(entries), history);
}
entries.pop();
}
let e = format_err!("No valid changeset to replay for bookmark: {:?}", bookmark);
(bookmark, Err(e), history)
(repo_name, bookmark, Err(e), history)
}
async fn process_replay_single_batch<F, R>(
queue: &SqlReplayBookmarksQueue,
repo_name: String,
async fn process_replay_single_batch<'a, R: ReplayFn>(
queue: &'a SqlReplayBookmarksQueue,
enabled_repo_names: &'a [String],
backfill: Backfill,
buffer_size: BufferSize,
queue_limit: QueueLimit,
status_scuba: ScubaSampleBuilder,
logger: Logger,
do_replay: R,
) -> Result<(), Error>
where
F: Future<Output = Result<(), ErrorKind>>,
R: Fn(BookmarkName, HgChangesetId) -> F + Sized + Clone,
{
let batch = queue.fetch_batch(&repo_name, backfill, queue_limit).await?;
logger: &Logger,
replay: R,
) -> Result<(), Error> {
let batch = queue
.fetch_batch(enabled_repo_names, backfill, queue_limit)
.await?;
STATS::batch_loaded.add_value(batch.len() as i64);
info!(logger, "Processing batch: {:?} entries", batch.len());
stream::iter(batch.into_iter())
.for_each_concurrent(buffer_size.0, {
cloned!(logger, mut status_scuba, queue, do_replay);
move |(bookmark, BookmarkBatch { dt, entries })| {
cloned!(logger, mut status_scuba, queue, do_replay);
cloned!(logger, mut status_scuba, queue);
move |((repo, bookmark), BookmarkBatch { dt, entries })| {
cloned!(logger, mut status_scuba, queue);
async move {
cloned!(queue, do_replay);
let (bookmark, outcome, history) =
replay_one_bookmark(bookmark, entries, do_replay).await;
cloned!(queue);
let (repo_name, bookmark, outcome, history) =
replay_one_bookmark(repo, bookmark, entries, replay).await;
info!(
logger,
"Outcome: bookmark: {:?}: success: {:?}",
"Outcome: repo: {:?}: bookmark: {:?}: success: {:?}",
repo_name,
bookmark,
outcome.is_ok()
);
@@ -98,6 +106,7 @@ where
.num_milliseconds();
status_scuba
.add("reponame", repo_name)
.add("bookmark", bookmark.into_string())
.add("history", format!("{:?}", history))
.add("success", outcome.is_ok())
@@ -116,32 +125,28 @@ where
Ok(())
}
pub fn process_replay_stream<'a, F, R>(
pub fn process_replay_stream<'a, R: ReplayFn + 'a>(
queue: &'a SqlReplayBookmarksQueue,
repo_name: String,
enabled_repo_names: &'a [String],
backfill: Backfill,
buffer_size: BufferSize,
queue_limit: QueueLimit,
status_scuba: ScubaSampleBuilder,
logger: Logger,
do_replay: R,
) -> impl Stream<Item = Result<(), Error>> + 'a
where
F: Future<Output = Result<(), ErrorKind>> + 'a,
R: Fn(BookmarkName, HgChangesetId) -> F + Sized + Clone + 'a,
{
logger: &'a Logger,
replay: R,
) -> impl Stream<Item = Result<(), Error>> + 'a {
stream::repeat(()).then({
move |_| {
cloned!(logger, status_scuba, repo_name, do_replay);
cloned!(status_scuba);
process_replay_single_batch(
queue,
repo_name,
enabled_repo_names,
backfill,
buffer_size,
queue_limit,
status_scuba,
logger,
do_replay,
replay,
)
}
})
@@ -159,22 +164,52 @@ mod test {
const BUFFER_SIZE: BufferSize = BufferSize(10);
const QUEUE_LIMIT: QueueLimit = QueueLimit(10);
async fn replay_success(_name: BookmarkName, _cs_id: HgChangesetId) -> Result<(), ErrorKind> {
Ok(())
}
async fn replay_fail(_name: BookmarkName, _cs_id: HgChangesetId) -> Result<(), ErrorKind> {
Err(ErrorKind::BlobRepoError(Error::msg("err")))
}
async fn replay_twos(_name: BookmarkName, cs_id: HgChangesetId) -> Result<(), ErrorKind> {
if cs_id == nodehash::TWOS_CSID {
#[derive(Copy, Clone)]
struct ReplaySuccess;
#[async_trait]
impl ReplayFn for ReplaySuccess {
async fn replay(
&self,
_repo: RepoName,
_name: BookmarkName,
_cs_id: HgChangesetId,
) -> Result<(), ErrorKind> {
Ok(())
} else {
}
}
#[derive(Copy, Clone)]
struct ReplayFail;
#[async_trait]
impl ReplayFn for ReplayFail {
async fn replay(
&self,
_repo: RepoName,
_name: BookmarkName,
_cs_id: HgChangesetId,
) -> Result<(), ErrorKind> {
Err(ErrorKind::BlobRepoError(Error::msg("err")))
}
}
#[derive(Copy, Clone)]
struct ReplayTwos;
#[async_trait]
impl ReplayFn for ReplayTwos {
async fn replay(
&self,
_repo: RepoName,
_name: BookmarkName,
cs_id: HgChangesetId,
) -> Result<(), ErrorKind> {
if cs_id == nodehash::TWOS_CSID {
Ok(())
} else {
Err(ErrorKind::BlobRepoError(Error::msg("err")))
}
}
}
fn scuba() -> ScubaSampleBuilder {
ScubaSampleBuilder::with_discard()
}
@@ -213,20 +248,22 @@ mod test {
process_replay_stream(
&queue,
repo.clone(),
vec![repo.clone()].as_slice(),
NOT_BACKFILL,
BUFFER_SIZE,
QUEUE_LIMIT,
scuba(),
logger(),
replay_success,
&logger(),
ReplaySuccess,
)
.boxed()
.next()
.await
.unwrap()?;
let real = queue.fetch_batch(&repo, NOT_BACKFILL, QUEUE_LIMIT).await?;
let real = queue
.fetch_batch(vec![repo.clone()].as_slice(), NOT_BACKFILL, QUEUE_LIMIT)
.await?;
assert_eq!(real, hashmap! {});
Ok(())
@@ -262,13 +299,13 @@ mod test {
process_replay_stream(
&queue,
repo.clone(),
vec![repo.clone()].as_slice(),
NOT_BACKFILL,
BUFFER_SIZE,
QUEUE_LIMIT,
scuba(),
logger(),
replay_fail,
&logger(),
ReplayFail,
)
.boxed()
.next()
@@ -276,10 +313,12 @@ mod test {
.unwrap()?;
let expected = hashmap! {
book1 => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())]),
book2 => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())]),
(repo.clone(), book1) => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())]),
(repo.clone(), book2) => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())]),
};
let real = queue.fetch_batch(&repo, NOT_BACKFILL, QUEUE_LIMIT).await?;
let real = queue
.fetch_batch(vec![repo.clone()].as_slice(), NOT_BACKFILL, QUEUE_LIMIT)
.await?;
assert_eq!(real, expected);
Ok(())
@@ -322,13 +361,13 @@ mod test {
process_replay_stream(
&queue,
repo.clone(),
vec![repo.clone()].as_slice(),
NOT_BACKFILL,
BUFFER_SIZE,
QUEUE_LIMIT,
scuba(),
logger(),
replay_twos,
&logger(),
ReplayTwos,
)
.boxed()
.next()
@@ -337,10 +376,12 @@ mod test {
let expected = hashmap! {
book1 => batch(t0(), vec![(3 as i64, nodehash::THREES_CSID, t0())]),
(repo.clone(), book1) => batch(t0(), vec![(3 as i64, nodehash::THREES_CSID, t0())]),
};
let real = queue.fetch_batch(&repo, NOT_BACKFILL, QUEUE_LIMIT).await?;
let real = queue
.fetch_batch(vec![repo.clone()].as_slice(), NOT_BACKFILL, QUEUE_LIMIT)
.await?;
assert_eq!(real, expected);
Ok(())

View File

@@ -22,7 +22,8 @@ pub struct BookmarkBatch {
pub dt: NaiveDateTime,
pub entries: Vec<Entry>,
}
pub type Batch = HashMap<BookmarkName, BookmarkBatch>;
pub type RepoName = String;
pub type Batch = HashMap<(RepoName, BookmarkName), BookmarkBatch>;
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Debug, Hash)]
pub struct QueueLimit(pub usize);
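
The batch is now keyed by (RepoName, BookmarkName) rather than by bookmark alone. As an editorial aside, here is a small self-contained sketch of what that keying means in practice, with plain Strings standing in for BookmarkName and the changeset id and with invented rows; the real grouping happens in fetch_batch further down in this file.

// Editorial sketch only (not part of the commit): rows fetched for several
// repos in one query are grouped per (repo, bookmark) pair, so entries from
// different repos never mix within a batch entry.
use std::collections::HashMap;

type RepoName = String;
type Bookmark = String;
type CsId = String;

fn group_rows(rows: Vec<(i64, Bookmark, CsId, RepoName)>) -> HashMap<(RepoName, Bookmark), Vec<(i64, CsId)>> {
    let mut batch: HashMap<(RepoName, Bookmark), Vec<(i64, CsId)>> = HashMap::new();
    for (id, bookmark, cs_id, repo) in rows {
        // Entries keep their queue order because the query sorts by (reponame, id).
        batch.entry((repo, bookmark)).or_default().push((id, cs_id));
    }
    batch
}

fn main() {
    let rows = vec![
        (1, "scratch/a".to_string(), "aaaa".to_string(), "repo1".to_string()),
        (2, "scratch/a".to_string(), "bbbb".to_string(), "repo1".to_string()),
        (3, "scratch/a".to_string(), "cccc".to_string(), "repo2".to_string()),
    ];
    for ((repo, bookmark), entries) in group_rows(rows) {
        println!("{} {} -> {:?}", repo, bookmark, entries);
    }
}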
@@ -37,11 +38,11 @@ queries! {
// this, and wouldn't want to have a bad batch of bookmarks at the bottom of the queue block
// the rest. If that becomes a problem, we can a) shard on BookmarkName, and b) update this
// code to have a limit, as long as we coalesce bookmarks we failed to sync in some way.
read FetchQueue(repo_name: String, backfill: i64, limit: usize) -> (i64, BookmarkName, String, String) {
"SELECT id, bookmark, node, created_at
read FetchQueue(backfill: i64, limit: usize, >list enabled_repo_names: String) -> (i64, BookmarkName, String, String, RepoName) {
"SELECT id, bookmark, node, created_at, reponame
FROM replaybookmarksqueue
WHERE reponame = {repo_name} AND synced = 0 AND backfill = {backfill}
ORDER BY id ASC
WHERE reponame IN {enabled_repo_names} AND synced = 0 AND backfill = {backfill}
ORDER BY reponame, id ASC
LIMIT {limit}"
}
@@ -74,20 +75,20 @@ impl SqlConstruct for SqlReplayBookmarksQueue {
impl SqlReplayBookmarksQueue {
pub async fn fetch_batch(
&self,
repo_name: &String,
enabled_repo_names: &[String],
backfill: Backfill,
limit: QueueLimit,
) -> Result<Batch, Error> {
let rows = FetchQueue::query(
&self.read_master_connection,
repo_name,
&(if backfill.0 { 1 } else { 0 }),
&limit.0,
enabled_repo_names,
)
.compat()
.await?;
let mut r = HashMap::new();
for (id, bookmark, hex_cs_id, dt) in rows.into_iter() {
for (id, bookmark, hex_cs_id, dt, repo_name) in rows.into_iter() {
let hex_cs_id = AsciiString::from_ascii(hex_cs_id)?;
let cs_id = HgChangesetId::from_ascii_str(&hex_cs_id)?;
@@ -98,7 +99,7 @@ impl SqlReplayBookmarksQueue {
// server's time (and this all appears to work properly), Naive dates should be OK.
let dt = NaiveDateTime::parse_from_str(&dt, DATE_TIME_FORMAT)?;
r.entry(bookmark)
r.entry((repo_name, bookmark))
.or_insert_with(|| BookmarkBatch {
dt: dt.clone(),
entries: vec![],
@@ -221,11 +222,13 @@ pub(crate) mod test {
];
insert_entries(&queue, &entries).await?;
let real = queue.fetch_batch(&repo, NOT_BACKFILL, QUEUE_LIMIT).await?;
let real = queue
.fetch_batch(vec![repo.clone()].as_slice(), NOT_BACKFILL, QUEUE_LIMIT)
.await?;
let expected = hashmap! {
book1 => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())]),
book2 => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())]),
(repo.clone(), book1) => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())]),
(repo, book2) => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())]),
};
assert_eq!(real, expected);
@@ -285,7 +288,9 @@ pub(crate) mod test {
];
insert_entries(&queue, &entries).await?;
let real = queue.fetch_batch(&repo, NOT_BACKFILL, QUEUE_LIMIT).await?;
let real = queue
.fetch_batch(vec![repo.clone()].as_slice(), NOT_BACKFILL, QUEUE_LIMIT)
.await?;
let book1_entries = vec![
(1 as i64, nodehash::ONES_CSID, t0()),
@@ -295,8 +300,8 @@ pub(crate) mod test {
];
let expected = hashmap! {
book1 => batch(t0(), book1_entries),
book2 => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())]),
(repo.clone(), book1) => batch(t0(), book1_entries),
(repo, book2) => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())]),
};
assert_eq!(real, expected);
@@ -355,11 +360,13 @@ pub(crate) mod test {
queue.release_entries(&release).await?;
let real = queue.fetch_batch(&repo, NOT_BACKFILL, QUEUE_LIMIT).await?;
let real = queue
.fetch_batch(vec![repo.clone()].as_slice(), NOT_BACKFILL, QUEUE_LIMIT)
.await?;
let expected = hashmap! {
book1 => batch(t0(), vec![(3 as i64, nodehash::THREES_CSID, t0())]),
book2 => batch(t0(), vec![(4 as i64, nodehash::FOURS_CSID, t0())]),
(repo.clone(), book1) => batch(t0(), vec![(3 as i64, nodehash::THREES_CSID, t0())]),
(repo, book2) => batch(t0(), vec![(4 as i64, nodehash::FOURS_CSID, t0())]),
};
assert_eq!(real, expected);
@@ -396,10 +403,12 @@ pub(crate) mod test {
];
insert_entries(&queue, &entries).await?;
let real = queue.fetch_batch(&repo1, NOT_BACKFILL, QUEUE_LIMIT).await?;
let real = queue
.fetch_batch(vec![repo1.clone()].as_slice(), NOT_BACKFILL, QUEUE_LIMIT)
.await?;
let expected = hashmap! {
book1 => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())]),
(repo1.clone(), book1) => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())]),
};
assert_eq!(real, expected);
@@ -434,14 +443,16 @@ pub(crate) mod test {
];
insert_entries(&queue, &entries).await?;
let real = queue.fetch_batch(&repo, NOT_BACKFILL, QUEUE_LIMIT).await?;
let real = queue
.fetch_batch(vec![repo.clone()].as_slice(), NOT_BACKFILL, QUEUE_LIMIT)
.await?;
let entries = vec![
(1 as i64, nodehash::ONES_CSID, t0()),
(2 as i64, nodehash::TWOS_CSID, t1()),
];
let expected = hashmap! { book1 => batch(t0(), entries) };
let expected = hashmap! { (repo.clone(), book1) => batch(t0(), entries) };
assert_eq!(real, expected);
@@ -493,12 +504,12 @@ pub(crate) mod test {
insert_entries(&queue, &entries).await?;
let real = queue
.fetch_batch(&repo, NOT_BACKFILL, QueueLimit(2))
.fetch_batch(vec![repo.clone()].as_slice(), NOT_BACKFILL, QueueLimit(2))
.await?;
let expected = hashmap! {
book1 => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())]),
book2 => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())]),
(repo.clone(), book1) => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())]),
(repo.clone(), book2) => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())]),
};
assert_eq!(real, expected);
@@ -533,15 +544,19 @@ pub(crate) mod test {
];
insert_entries(&queue, &entries).await?;
let real_backfill = queue.fetch_batch(&repo, BACKFILL, QUEUE_LIMIT).await?;
let real_not_backfill = queue.fetch_batch(&repo, NOT_BACKFILL, QUEUE_LIMIT).await?;
let real_backfill = queue
.fetch_batch(vec![repo.clone()].as_slice(), BACKFILL, QUEUE_LIMIT)
.await?;
let real_not_backfill = queue
.fetch_batch(vec![repo.clone()].as_slice(), NOT_BACKFILL, QUEUE_LIMIT)
.await?;
let expected_backfill = hashmap! {
book1.clone() => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())])
(repo.to_string(), book1.clone()) => batch(t0(), vec![(2 as i64, nodehash::TWOS_CSID, t0())])
};
let expected_not_backfill = hashmap! {
book1.clone() => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())])
(repo.to_string(), book1.clone()) => batch(t0(), vec![(1 as i64, nodehash::ONES_CSID, t0())])
};
assert_eq!(real_backfill, expected_backfill);

View File

@@ -5,19 +5,17 @@
* GNU General Public License version 2.
*/
use blobrepo::BlobRepo;
use blobrepo_hg::BlobRepoHg;
use crate::replay_stream::ReplayFn;
use anyhow::format_err;
use async_trait::async_trait;
use bookmarks::BookmarkName;
use cloned::cloned;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::compat::Future01CompatExt;
use futures::try_join;
use mercurial_types::HgChangesetId;
use metaconfig_types::InfinitepushNamespace;
use mononoke_api::{BookmarkFreshness, ChangesetSpecifier, CoreContext, Mononoke};
use slog::{info, Logger};
use stats::prelude::*;
use std::sync::Arc;
use crate::errors::ErrorKind;
@@ -30,68 +28,108 @@ define_stats! {
total: timeseries(Rate, Sum),
}
pub async fn sync_bookmark(
fb: FacebookInit,
blobrepo: BlobRepo,
logger: Logger,
infinitepush_namespace: Arc<InfinitepushNamespace>,
name: BookmarkName,
hg_cs_id: HgChangesetId,
) -> Result<(), ErrorKind> {
if !infinitepush_namespace.matches_bookmark(&name) {
return Err(ErrorKind::InvalidBookmarkForNamespace(name.clone()));
}
let ctx = CoreContext::new_with_logger(fb, logger.clone());
let (maybe_new_cs_id, maybe_old_cs_id) = try_join!(
blobrepo.get_bonsai_from_hg(ctx.clone(), hg_cs_id).compat(),
blobrepo.get_bonsai_bookmark(ctx.clone(), &name).compat()
)
.map_err(ErrorKind::BlobRepoError)?;
let res = async {
let new_cs_id = match maybe_new_cs_id {
Some(new_cs_id) => new_cs_id,
None => return Err(ErrorKind::HgChangesetDoesNotExist(hg_cs_id)),
};
cloned!(blobrepo, ctx, logger);
info!(
logger,
"Updating bookmark {:?}: {:?} -> {:?}",
name.clone(),
maybe_old_cs_id,
new_cs_id
);
let mut txn = blobrepo.update_bookmark_transaction(ctx);
match maybe_old_cs_id {
Some(old_cs_id) => {
STATS::update.add_value(1);
txn.update_scratch(&name, new_cs_id, old_cs_id)
}
None => {
STATS::create.add_value(1);
txn.create_scratch(&name, new_cs_id)
}
}
.map_err(ErrorKind::BlobRepoError)?;
let success = txn.commit().await.map_err(ErrorKind::BlobRepoError)?;
if !success {
return Err(ErrorKind::BookmarkTransactionFailed);
}
Ok(())
}
.await;
STATS::total.add_value(1);
if res.is_ok() {
STATS::success.add_value(1);
} else {
STATS::failure.add_value(1);
}
res
pub struct SyncBookmark<'a, 'b, 'c> {
fb: &'a FacebookInit,
mononoke: &'b Mononoke,
logger: &'c Logger,
}
impl<'a, 'b, 'c> SyncBookmark<'a, 'b, 'c> {
pub fn new(fb: &'a FacebookInit, mononoke: &'b Mononoke, logger: &'c Logger) -> Self {
SyncBookmark {
fb,
mononoke,
logger,
}
}
}
#[async_trait]
impl<'a, 'b, 'c> ReplayFn for &SyncBookmark<'a, 'b, 'c> {
async fn replay(
&self,
repo_name: String,
bookmark_name: BookmarkName,
hg_cs_id: HgChangesetId,
) -> Result<(), ErrorKind> {
let ctx = CoreContext::new_with_logger(self.fb.clone(), self.logger.clone());
let repo = self
.mononoke
.repo(ctx.clone(), &repo_name)
.await
.map_err(|e| ErrorKind::BlobRepoError(e.into()))?
.ok_or_else(|| format_err!("repo doesn't exist: {:?}", repo_name))
.map_err(ErrorKind::BlobRepoError)?;
let infinitepush_namespace = repo
.config()
.infinitepush
.namespace
.as_ref()
.ok_or_else(|| format_err!("Infinitepush is not enabled in repository {:?}", repo_name))
.map_err(ErrorKind::BlobRepoError)?;
if !infinitepush_namespace.matches_bookmark(&bookmark_name) {
return Err(ErrorKind::InvalidBookmarkForNamespace(
bookmark_name.clone(),
));
}
let (maybe_new_cs_id, maybe_old_cs) = try_join!(
repo.resolve_specifier(ChangesetSpecifier::Hg(hg_cs_id)),
repo.resolve_bookmark(bookmark_name.as_str(), BookmarkFreshness::MostRecent)
)
.map_err(|e| ErrorKind::BlobRepoError(e.into()))?;
let maybe_old_cs_id = maybe_old_cs.map(|cs| cs.id());
let res = async {
let new_cs_id = match maybe_new_cs_id {
Some(new_cs_id) => new_cs_id,
None => return Err(ErrorKind::HgChangesetDoesNotExist(hg_cs_id)),
};
cloned!(ctx, self.logger);
info!(
logger,
"Updating repo: {:?} {:?}: {:?} -> {:?}",
repo_name.clone(),
bookmark_name.clone(),
maybe_old_cs_id,
new_cs_id
);
let blobrepo = repo.blob_repo();
let mut txn = blobrepo.update_bookmark_transaction(ctx);
match maybe_old_cs_id {
Some(old_cs_id) => {
STATS::update.add_value(1);
txn.update_scratch(&bookmark_name, new_cs_id, old_cs_id)
}
None => {
STATS::create.add_value(1);
txn.create_scratch(&bookmark_name, new_cs_id)
}
}
.map_err(ErrorKind::BlobRepoError)?;
let success = txn.commit().await.map_err(ErrorKind::BlobRepoError)?;
if !success {
return Err(ErrorKind::BookmarkTransactionFailed);
}
Ok(())
}
.await;
STATS::total.add_value(1);
if res.is_ok() {
STATS::success.add_value(1);
} else {
STATS::failure.add_value(1);
}
res
}
}

View File

@@ -241,9 +241,10 @@ function mononoke_bookmarks_filler {
GLOG_minloglevel=5 "$MONONOKE_BOOKMARKS_FILLER" \
"${COMMON_ARGS[@]}" \
--repo-id $REPOID \
--mononoke-config-path "$TESTTMP"/mononoke-config \
"$@" "$sql_source" "$sql_name"
--test-instance \
--local-configerator-path="$TESTTMP/configerator" \
"$@" "$sql_source" "$sql_name" 2>&1 | grep mononoke_commitcloud_bookmarks_filler
}
function create_mutable_counters_sqlite3_db {
@@ -932,6 +933,7 @@ function write_infinitepush_config {
allow_writes = ${INFINITEPUSH_ALLOW_WRITES:-true}
hydrate_getbundle_response = ${INFINITEPUSH_HYDRATE_GETBUNDLE_RESPONSE:-false}
populate_reverse_filler_queue = ${INFINITEPUSH_POPULATE_REVERSE_FILLER_QUEUE:-false}
bookmarks_filler = 2
${namespace}
CONFIG
fi

View File

@@ -9,6 +9,7 @@
setup configuration
$ INFINITEPUSH_NAMESPACE_REGEX='^scratch/.+$' setup_common_config
$ setup_configerator_configs
$ cd $TESTTMP
setup repo
@@ -42,16 +43,14 @@ Create the replaybookmarks table
Run the filler with no work
$ mononoke_bookmarks_filler --max-iterations 1
* using repo "repo" repoid RepositoryId(0) (glob)
* Processing batch: 0 entries (glob)
Run the filler with valid work (create)
$ insert_replaybookmarks_entry repo "$BOOK_OK" "$NODE1"
$ mononoke_bookmarks_filler --max-iterations 1
* using repo "repo" repoid RepositoryId(0) (glob)
* Processing batch: 1 entries (glob)
* Updating bookmark BookmarkName { bookmark: "scratch/123" }: None -> ChangesetId(Blake2(d2ebff6a6aa240a684a4623afd028afd208d3f81f06f0e525b2fd11eb6ba47ac)) (glob)
* Outcome: bookmark: BookmarkName { bookmark: "scratch/123" }: success: true (glob)
* Updating repo: "repo" BookmarkName { bookmark: "scratch/123" }: None -> ChangesetId(Blake2(d2ebff6a6aa240a684a4623afd028afd208d3f81f06f0e525b2fd11eb6ba47ac)) (glob)
* Outcome: repo: "repo": bookmark: BookmarkName { bookmark: "scratch/123" }: success: true (glob)
$ mononoke_admin bookmarks get "$BOOK_OK"
* using repo "repo" repoid RepositoryId(0) (glob)
(HG) cb9a30b04b9df854f40d21fdac525408f3bd6c78
@@ -59,10 +58,9 @@ Run the filler with valid work (create)
Run the filler with valid work (update)
$ insert_replaybookmarks_entry repo "$BOOK_OK" "$NODE2"
$ mononoke_bookmarks_filler --max-iterations 1
* using repo "repo" repoid RepositoryId(0) (glob)
* Processing batch: 1 entries (glob)
* Updating bookmark BookmarkName { bookmark: "scratch/123" }: Some(ChangesetId(Blake2(d2ebff6a6aa240a684a4623afd028afd208d3f81f06f0e525b2fd11eb6ba47ac))) -> ChangesetId(Blake2(c97399683492face21a2dcc6c422e117ec67365b87ecb53c4152c0052945bdfe)) (glob)
* Outcome: bookmark: BookmarkName { bookmark: "scratch/123" }: success: true (glob)
* Updating repo: "repo" BookmarkName { bookmark: "scratch/123" }: Some(ChangesetId(Blake2(d2ebff6a6aa240a684a4623afd028afd208d3f81f06f0e525b2fd11eb6ba47ac))) -> ChangesetId(Blake2(c97399683492face21a2dcc6c422e117ec67365b87ecb53c4152c0052945bdfe)) (glob)
* Outcome: repo: "repo": bookmark: BookmarkName { bookmark: "scratch/123" }: success: true (glob)
$ mononoke_admin bookmarks get "$BOOK_OK"
* using repo "repo" repoid RepositoryId(0) (glob)
(HG) 86383633ba7ff1d50a8d2990f0b63d2401110c26
@@ -70,6 +68,5 @@ Run the filler with valid work (update)
Run the filler with valid work (bad bookmark)
$ insert_replaybookmarks_entry repo "$BOOK_BAD" "$NODE2"
$ mononoke_bookmarks_filler --max-iterations 1
* using repo "repo" repoid RepositoryId(0) (glob)
* Processing batch: 1 entries (glob)
* Outcome: bookmark: BookmarkName { bookmark: "master" }: success: false (glob)
* Outcome: repo: "repo": bookmark: BookmarkName { bookmark: "master" }: success: false (glob)