mononoke: prepare walker blobstore for multiple repo jobs

Summary: Allows the walker blobstore code to be used by more than one blobrepo.  This is a step to reduce the number of jobs needed to scrub small repos.

Reviewed By: StanislavGlebik

Differential Revision: D25422937

fbshipit-source-id: e2d11239f172f50680bb6e10dd60026c9e6c3c3d
This commit is contained in:
Alex Hornby 2020-12-23 02:05:59 -08:00 committed by Facebook GitHub Bot
parent a7658c112e
commit 9985458fa1
3 changed files with 58 additions and 48 deletions

View File

@ -33,7 +33,6 @@ mononoke_types = { path = "../mononoke_types" }
multiplexedblob = { path = "../blobstore/multiplexedblob" }
newfilenodes = { path = "../newfilenodes" }
phases = { path = "../phases" }
prefixblob = { path = "../blobstore/prefixblob" }
samplingblob = { path = "../blobstore/samplingblob" }
scuba_ext = { path = "../common/scuba_ext" }
skeleton_manifest = { path = "../derived_data/skeleton_manifest" }
@ -61,7 +60,6 @@ filetime = "0.2.9"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }
hex = "0.4"
inlinable_string = "0.1"
internment = {version = "0.4.1", features = ["serde"]}
itertools = "0.8"
once_cell = "1.4"

View File

@ -5,7 +5,7 @@
* GNU General Public License version 2.
*/
use crate::validate::{CHECK_FAIL, CHECK_TYPE, NODE_KEY, REPO};
use crate::validate::{CHECK_FAIL, CHECK_TYPE, ERROR_MSG, NODE_KEY, REPO};
use anyhow::{format_err, Error};
use blobstore::{Blobstore, BlobstoreMetadata};
@ -15,17 +15,16 @@ use blobstore_factory::{
use cached_config::ConfigStore;
use context::CoreContext;
use fbinit::FacebookInit;
use inlinable_string::InlinableString;
use metaconfig_types::{BlobConfig, BlobstoreId, ScrubAction};
use mononoke_types::{repo::REPO_PREFIX_REGEX, RepositoryId};
use multiplexedblob::{LoggingScrubHandler, ScrubHandler};
use prefixblob::PrefixBlobstore;
use samplingblob::{SamplingBlobstore, SamplingHandler};
use scuba::value::{NullScubaValue, ScubaValue};
use scuba_ext::MononokeScubaSampleBuilder;
use slog::Logger;
use sql_ext::facebook::MysqlOptions;
use stats::prelude::*;
use std::{convert::From, sync::Arc};
use std::{collections::HashMap, convert::From, str::FromStr, sync::Arc};
define_stats! {
prefix = "mononoke.walker";
@ -38,7 +37,7 @@ pub const BLOBSTORE_ID: &'static str = "blobstore_id";
pub struct StatsScrubHandler {
scuba: MononokeScubaSampleBuilder,
subcommand_stats_key: &'static str,
repo_stats_key: String,
repo_id_to_name: HashMap<RepositoryId, String>,
inner: LoggingScrubHandler,
}
@ -47,17 +46,24 @@ impl StatsScrubHandler {
quiet: bool,
scuba: MononokeScubaSampleBuilder,
subcommand_stats_key: &'static str,
repo_stats_key: String,
repo_id_to_name: HashMap<RepositoryId, String>,
) -> Self {
Self {
scuba,
subcommand_stats_key,
repo_stats_key,
inner: LoggingScrubHandler::new(quiet),
repo_id_to_name,
}
}
}
pub fn get_repo_id_from_key(key: &str) -> Result<Option<RepositoryId>, Error> {
REPO_PREFIX_REGEX
.captures(&key)
.and_then(|m| m.get(1).map(|m| RepositoryId::from_str(m.as_str())))
.transpose()
}
impl ScrubHandler for StatsScrubHandler {
fn on_repair(
&self,
@ -75,10 +81,41 @@ impl ScrubHandler for StatsScrubHandler {
None => ScubaValue::Null(NullScubaValue::Int),
};
self.scuba
.clone()
// If we start to run in multi-repo mode this will need to be prefix aware instead
.add(REPO, self.repo_stats_key.clone())
let mut scuba = self.scuba.clone();
let repo_id = get_repo_id_from_key(key);
match repo_id {
Ok(Some(repo_id)) => {
if let Some(repo_name) = self.repo_id_to_name.get(&repo_id) {
scuba.add(REPO, repo_name.clone());
if is_repaired {
STATS::scrub_repaired.add_value(
1,
(
self.subcommand_stats_key,
blobstore_id.to_string(),
repo_name.clone(),
),
);
} else {
STATS::scrub_repair_required.add_value(
1,
(
self.subcommand_stats_key,
blobstore_id.to_string(),
repo_name.clone(),
),
);
}
}
}
Ok(_) => {}
Err(e) => {
scuba.add(ERROR_MSG, format!("{:?}", e));
}
}
scuba
.add(BLOBSTORE_ID, blobstore_id)
// TODO parse out NodeType from string key prefix if we can. Or better, make blobstore keys typed?
.add(NODE_KEY, key)
@ -87,25 +124,6 @@ impl ScrubHandler for StatsScrubHandler {
.add("session", ctx.session().metadata().session_id().to_string())
.add("ctime", ctime)
.log();
if is_repaired {
STATS::scrub_repaired.add_value(
1,
(
self.subcommand_stats_key,
blobstore_id.to_string(),
self.repo_stats_key.clone(),
),
);
} else {
STATS::scrub_repair_required.add_value(
1,
(
self.subcommand_stats_key,
blobstore_id.to_string(),
self.repo_stats_key.clone(),
),
);
}
}
}
@ -144,14 +162,12 @@ pub async fn open_blobstore(
mysql_options: MysqlOptions,
blob_config: BlobConfig,
inner_blobstore_id: Option<u64>,
// TODO(ahornby) take multiple prefix for when scrubbing multiple repos
prefix: Option<String>,
readonly_storage: ReadOnlyStorage,
scrub_action: Option<ScrubAction>,
blobstore_sampler: Option<Arc<dyn SamplingHandler>>,
scuba_builder: MononokeScubaSampleBuilder,
walk_stats_key: &'static str,
repo_stats_key: String,
repo_id_to_name: HashMap<RepositoryId, String>,
blobstore_options: BlobstoreOptions,
logger: Logger,
config_store: &ConfigStore,
@ -163,7 +179,7 @@ pub async fn open_blobstore(
false,
scuba_builder.clone(),
walk_stats_key,
repo_stats_key.clone(),
repo_id_to_name.clone(),
)) as Arc<dyn ScrubHandler>
});
@ -183,9 +199,11 @@ pub async fn open_blobstore(
// Make sure the repair stats are set to zero for each store.
// Without this the new stats only show up when a repair is needed (i.e. as they get incremented),
// which makes them harder to monitor on (no datapoints rather than a zero datapoint at start).
for s in &[STATS::scrub_repaired, STATS::scrub_repair_required] {
for (id, _ty, _config) in &blobstores {
s.add_value(0, (walk_stats_key, id.to_string(), repo_stats_key.clone()));
for name in repo_id_to_name.values() {
for s in &[STATS::scrub_repaired, STATS::scrub_repair_required] {
for (id, _ty, _config) in &blobstores {
s.add_value(0, (walk_stats_key, id.to_string(), name.clone()));
}
}
}
@ -257,12 +275,5 @@ pub async fn open_blobstore(
None => Arc::new(blobstore) as Arc<dyn blobstore::Blobstore>,
};
if let Some(prefix) = prefix {
return Ok(Arc::new(PrefixBlobstore::new(
blobstore,
InlinableString::from(prefix),
)));
}
Ok(blobstore)
}

View File

@ -994,19 +994,20 @@ pub fn setup_common<'a>(
.map(ScrubAction::from_str)
.transpose()?;
let repo_id_to_name = HashMap::from_iter(Some((config.repoid, repo_name.clone())));
// Open the blobstore explicitly so we can do things like run on one side of a multiplex
let blobstore = blobstore::open_blobstore(
fb,
mysql_options.clone(),
storage_config.blobstore,
inner_blobstore_id,
None,
readonly_storage,
scrub_action,
blobstore_sampler,
scuba_builder.clone(),
walk_stats_key,
repo_name.clone(),
repo_id_to_name,
blobstore_options.clone(),
logger.clone(),
config_store,