remove wireproto logging used in traffic replay

Summary:
Traffic replay is gone. Now we can delete code that dumped wireproto traffic.

The logging that's left could still be somewhat useful: https://fburl.com/scuba/mononoke_test_perf/uismnrv9
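
A minimal before/after sketch of the main API change (the call-site names `ctx`, `stats` and `json_params` are taken from the repo_client diff below; the snippet itself is illustrative and not part of the commit):

// before: the context and the JSON-encoded args were threaded through so the
// request could be dumped for traffic replay
command_logger.finalize_command(ctx, &stats, Some(&json_params));
// after: only the timing stats are logged, to Scuba
command_logger.finalize_command(&stats);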

Reviewed By: HarveyHunt

Differential Revision: D33898167

fbshipit-source-id: f5f12f3626c578ef90db99a45e5749fe8a94049f
Jan Mazur 2022-02-23 06:43:01 -08:00 committed by Facebook GitHub Bot
parent dd7871293c
commit 9eb31562d7
13 changed files with 31 additions and 473 deletions

View File

@ -1,4 +1,4 @@
// @generated SignedSource<<6b99a7a00dc996d894251726c4e30b11>>
// @generated SignedSource<<b2d2c0d4013f8eb421d0be15b74a2a75>>
// DO NOT EDIT THIS FILE MANUALLY!
// This file is a mechanical copy of the version in the configerator repo. To
// modify it, edit the copy in the configerator repo instead and copy it over by
@ -120,7 +120,6 @@ struct RawRepoConfig {
18: optional RawPushParams push,
19: optional RawPushrebaseParams pushrebase,
20: optional RawLfsParams lfs,
21: optional RawWireprotoLoggingConfig wireproto_logging,
22: optional i64 hash_validation_percentage,
23: optional string skiplist_index_blobstore_key,
24: optional RawBundle2ReplayParams bundle2_replay_params,
@ -549,13 +548,6 @@ struct RawCommitSyncConfig {
4: optional string version_name,
} (rust.exhaustive)
struct RawWireprotoLoggingConfig {
1: optional string scribe_category,
2: optional string storage_config,
3: optional i64 remote_arg_size_threshold,
4: optional string local_path,
} (rust.exhaustive)
struct RawSourceControlServiceParams {
// Whether ordinary users can write through the source control service.
1: bool permit_writes,

View File

@ -149,7 +149,6 @@ fn parse_with_repo_definition(
push,
pushrebase,
lfs,
wireproto_logging,
hash_validation_percentage,
skiplist_index_blobstore_key,
bundle2_replay_params,
@ -197,11 +196,6 @@ fn parse_with_repo_definition(
.ok_or_else(|| anyhow!("missing storage_config from configuration"))?,
)?;
let wireproto_logging = wireproto_logging
.map(|raw| crate::convert::repo::convert_wireproto_logging_config(raw, get_storage))
.transpose()?
.unwrap_or_default();
let cache_warmup = cache_warmup.convert()?;
let hook_manager_params = hook_manager_params.convert()?;
@ -287,7 +281,6 @@ fn parse_with_repo_definition(
push,
pushrebase,
lfs,
wireproto_logging,
hash_validation_percentage,
readonly,
redaction,
@ -460,7 +453,7 @@ mod test {
PushrebaseParams, RemoteDatabaseConfig, RemoteMetadataDatabaseConfig, RepoClientKnobs,
SegmentedChangelogConfig, ShardableRemoteDatabaseConfig, ShardedRemoteDatabaseConfig,
SmallRepoCommitSyncConfig, SourceControlServiceMonitoring, SourceControlServiceParams,
UnodeVersion, WireprotoLoggingConfig,
UnodeVersion,
};
use mononoke_types::MPath;
use nonzero_ext::nonzero;
@ -710,10 +703,6 @@ mod test {
repo_client_use_warm_bookmarks_cache=true
phabricator_callsign="FBS"
[wireproto_logging]
scribe_category="category"
storage_config="main"
[cache_warmup]
bookmark="master"
commit_limit=100
@ -1029,14 +1018,6 @@ mod test {
rollout_percentage: 56,
generate_lfs_blob_in_hg_sync_job: true,
},
wireproto_logging: WireprotoLoggingConfig {
scribe_category: Some("category".to_string()),
storage_config_and_threshold: Some((
main_storage_config.clone(),
crate::convert::repo::DEFAULT_ARG_SIZE_THRESHOLD,
)),
local_path: None,
},
hash_validation_percentage: 0,
readonly: RepoReadOnly::ReadWrite,
redaction: Redaction::Enabled,
@ -1143,7 +1124,6 @@ mod test {
push: Default::default(),
pushrebase: Default::default(),
lfs: Default::default(),
wireproto_logging: Default::default(),
hash_validation_percentage: 0,
readonly: RepoReadOnly::ReadWrite,
redaction: Redaction::Enabled,

View File

@ -17,7 +17,7 @@ use metaconfig_types::{
HookManagerParams, HookParams, InfinitepushNamespace, InfinitepushParams, LfsParams,
PushParams, PushrebaseFlags, PushrebaseParams, RepoClientKnobs, SegmentedChangelogConfig,
ServiceWriteRestrictions, SourceControlServiceMonitoring, SourceControlServiceParams,
StorageConfig, UnodeVersion, WireprotoLoggingConfig,
UnodeVersion,
};
use mononoke_types::{ChangesetId, MPath, PrefixTrie};
use regex::Regex;
@ -26,49 +26,12 @@ use repos::{
RawDerivedDataTypesConfig, RawHookConfig, RawHookManagerParams, RawInfinitepushParams,
RawLfsParams, RawPushParams, RawPushrebaseParams, RawRepoClientKnobs,
RawSegmentedChangelogConfig, RawServiceWriteRestrictions, RawSourceControlServiceMonitoring,
RawSourceControlServiceParams, RawWireprotoLoggingConfig,
RawSourceControlServiceParams,
};
use crate::convert::Convert;
use crate::errors::ConfigurationError;
pub(crate) const DEFAULT_ARG_SIZE_THRESHOLD: u64 = 500_000;
pub(crate) fn convert_wireproto_logging_config(
raw: RawWireprotoLoggingConfig,
get_storage: impl Fn(&str) -> Result<StorageConfig>,
) -> Result<WireprotoLoggingConfig> {
let RawWireprotoLoggingConfig {
scribe_category,
storage_config: wireproto_storage_config,
remote_arg_size_threshold,
local_path,
} = raw;
let storage_config_and_threshold = match (wireproto_storage_config, remote_arg_size_threshold) {
(Some(storage_config), Some(threshold)) => Some((storage_config, threshold as u64)),
(None, Some(_threshold)) => {
return Err(anyhow!(
"Invalid configuration: wireproto threshold is specified, but storage config is not"
));
}
(Some(storage_config), None) => Some((storage_config, DEFAULT_ARG_SIZE_THRESHOLD)),
(None, None) => None,
};
let storage_config_and_threshold = storage_config_and_threshold
.map(|(storage_config, threshold)| {
get_storage(&storage_config).map(|config| (config, threshold))
})
.transpose()?;
Ok(WireprotoLoggingConfig {
scribe_category,
storage_config_and_threshold,
local_path,
})
}
impl Convert for RawCacheWarmupConfig {
type Output = CacheWarmupParams;

View File

@ -163,9 +163,6 @@ pub struct RepoConfig {
pub pushrebase: PushrebaseParams,
/// LFS configuration options
pub lfs: LfsParams,
/// Configuration for logging all wireproto requests with full arguments.
/// Used for replay on shadow tier.
pub wireproto_logging: WireprotoLoggingConfig,
/// What percent of read request verifies that returned content matches the hash
pub hash_validation_percentage: usize,
/// Should this repo reject write attempts
@ -1292,33 +1289,6 @@ pub struct SmallRepoPermanentConfig {
pub bookmark_prefix: AsciiString,
}
/// Configuration for logging wireproto commands and arguments
/// This is used by traffic replay script to replay on prod traffic on shadow tier
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WireprotoLoggingConfig {
/// Scribe category to log to
pub scribe_category: Option<String>,
/// Storage config to store wireproto arguments. The arguments can be quite big,
/// so storing separately would make sense.
/// Second parameter is threshold. If wireproto arguments are bigger than this threshold
/// then they will be stored in remote storage defined by first parameter. Note that if
/// `storage_config_and_threshold` is not specified then wireproto wireproto arguments will
/// be inlined
pub storage_config_and_threshold: Option<(StorageConfig, u64)>,
/// Local path where to log replay data that would be sent to Scribe.
pub local_path: Option<String>,
}
impl Default for WireprotoLoggingConfig {
fn default() -> Self {
Self {
scribe_category: None,
storage_config_and_threshold: None,
local_path: None,
}
}
}
/// Source Control Service options
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SourceControlServiceParams {

View File

@ -5,78 +5,20 @@
* GNU General Public License version 2.
*/
use anyhow::Error;
use blobstore::{Blobstore, BlobstoreBytes};
use chrono::Utc;
use cloned::cloned;
use context::{CoreContext, PerfCounters, SessionId};
use fbinit::FacebookInit;
use futures::{compat::Future01CompatExt, FutureExt, TryFutureExt};
use futures_01_ext::FutureExt as _;
use futures_old::{future, Future};
use context::{CoreContext, PerfCounters};
use futures_stats::{FutureStats, StreamStats};
use hgproto::GettreepackArgs;
use iterhelpers::chunk_by_accumulation;
use mercurial_types::HgManifestId;
use mononoke_types::MPath;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
#[cfg(fbcode_build)]
use scribe::ScribeClient;
use scuba_ext::ScubaVerbosityLevel;
use scuba_ext::{MononokeScubaSampleBuilder, ScribeClientImplementation, ScubaValue};
use stats::prelude::*;
use scuba_ext::{MononokeScubaSampleBuilder, ScubaValue};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use time_ext::DurationExt;
define_stats! {
prefix = "mononoke.repo_client.logging";
wireproto_blobstore_success: timeseries(Rate, Sum),
wireproto_blobstore_failure: timeseries(Rate, Sum),
wireproto_scribe_success: timeseries(Rate, Sum),
wireproto_scribe_failure: timeseries(Rate, Sum),
wireproto_serialization_failure: timeseries(Rate, Sum),
}
const COLUMN_SIZE_LIMIT: usize = 500_1000;
const FULL_ARGS_LOG_TAG: &str = "Full Command Args";
pub struct WireprotoLogging {
reponame: String,
scribe_args: Option<(ScribeClientImplementation, String)>,
blobstore_and_threshold: Option<(Arc<dyn Blobstore>, u64)>,
scuba_builder: MononokeScubaSampleBuilder,
}
impl WireprotoLogging {
pub fn new(
fb: FacebookInit,
reponame: String,
scribe_category: Option<String>,
blobstore_and_threshold: Option<(Arc<dyn Blobstore>, u64)>,
log_file: Option<&str>,
) -> Result<Self, Error> {
let scribe_args = scribe_category.map(|cat| (ScribeClientImplementation::new(fb), cat));
// We use a Scuba sample builder to produce samples to log. We also use that to allow
// logging to a file: we never log to an actual Scuba category here.
let mut scuba_builder = MononokeScubaSampleBuilder::with_discard();
scuba_builder.add_common_server_data();
if let Some(log_file) = log_file {
scuba_builder = scuba_builder.with_log_file(log_file)?;
}
Ok(Self {
reponame,
scribe_args,
blobstore_and_threshold,
scuba_builder,
})
}
}
#[derive(Copy, Clone)]
pub enum CommandStats<'a> {
Future(&'a FutureStats),
@ -84,13 +26,6 @@ pub enum CommandStats<'a> {
}
impl<'a> CommandStats<'a> {
pub fn completion_time(&self) -> Duration {
match self {
Self::Future(ref stats) => stats.completion_time,
Self::Stream(ref stats) => stats.completion_time,
}
}
fn insert_stats<'b>(
&self,
scuba: &'b mut MononokeScubaSampleBuilder,
@ -114,33 +49,18 @@ impl<'a> From<&'a StreamStats> for CommandStats<'a> {
}
}
/// Logs wireproto requests both to scuba and scribe.
/// Scuba logs are used for analysis of performance of both shadow and prod Mononoke tiers
/// Scribe logs are used for replaying prod wireproto requests on shadow tier. So
/// Scribe logging should be disabled on shadow tier.
/// Logs wireproto requests to scuba.
/// Scuba logs are used for analysis of performance.
#[must_use = "A CommandLogger does not do anything if you don't use it"]
pub struct CommandLogger {
inner: ScubaOnlyCommandLogger,
command: String,
/// This scribe category main purpose is to tail the prod requests and replay them on the
/// shadow tier.
wireproto: Arc<WireprotoLogging>,
}
impl CommandLogger {
pub fn new(
ctx: CoreContext,
command: String,
wireproto: Arc<WireprotoLogging>,
request_perf_counters: Arc<PerfCounters>,
) -> Self {
pub fn new(ctx: CoreContext, request_perf_counters: Arc<PerfCounters>) -> Self {
let inner = ScubaOnlyCommandLogger::new(ctx, request_perf_counters);
Self {
inner,
command,
wireproto,
}
Self { inner }
}
/// Opts-out of replaying the wireproto request on the shadow tier.
@ -149,24 +69,8 @@ impl CommandLogger {
self.inner
}
pub fn finalize_command<'a>(
self,
ctx: CoreContext,
stats: impl Into<CommandStats<'a>>,
args: Option<&serde_json::Value>,
) {
let stats = stats.into();
let Self {
inner,
command,
wireproto,
} = self;
let session_id = inner.ctx.metadata().session_id().clone();
inner.log_command_processed(stats);
do_wireproto_logging(ctx, wireproto, command, session_id, stats, args);
pub fn finalize_command<'a>(self, stats: impl Into<CommandStats<'a>>) {
self.inner.log_command_processed(stats.into());
}
pub fn add_scuba_extra(&mut self, k: impl Into<String>, v: impl Into<ScubaValue>) {
@ -220,112 +124,6 @@ impl ScubaOnlyCommandLogger {
}
}
fn do_wireproto_logging<'a>(
ctx: CoreContext,
wireproto: Arc<WireprotoLogging>,
command: String,
session_id: SessionId,
stats: CommandStats<'a>,
args: Option<&serde_json::Value>,
) {
let args = args
.map(|a| a.to_string())
.unwrap_or_else(|| "".to_string());
// Use a MononokeScubaSampleBuilder to build a sample to send in Scribe. Reach into the other Scuba
// sample to grab a few datapoints from there as well.
let mut builder = wireproto.scuba_builder.clone();
builder
.add("command", command)
.add("duration", stats.completion_time().as_micros_unchecked())
.add("source_control_server_type", "mononoke")
.add("mononoke_session_uuid", session_id.into_string())
.add("reponame", wireproto.reponame.clone());
if let Some(client_hostname) = ctx.session().metadata().client_hostname() {
builder.add("client_hostname", client_hostname.clone());
}
let f = future::lazy(move || {
let prepare_fut = match wireproto.blobstore_and_threshold {
Some((ref blobstore, ref remote_arg_size_threshold)) => {
if args.len() as u64 > *remote_arg_size_threshold {
// Key is generated randomly. Another option would be to
// take a hash of arguments, but I don't want to spend cpu cycles on
// computing hashes. Random string should be good enough.
let key = format!(
"wireproto_replay.{}.{}",
Utc::now().to_rfc3339(),
generate_random_string(16),
);
{
cloned!(ctx, blobstore, key);
async move {
blobstore
.put(&ctx, key, BlobstoreBytes::from_bytes(args))
.await
}
}
.boxed()
.compat()
.map(move |()| {
STATS::wireproto_blobstore_success.add_value(1);
builder.add("remote_args", key);
builder
})
.inspect_err(|_| {
STATS::wireproto_blobstore_failure.add_value(1);
})
.left_future()
} else {
builder.add("args", args);
future::ok(builder).right_future()
}
}
None => {
builder.add("args", args);
future::ok(builder).right_future()
}
};
prepare_fut
.map(move |mut builder| {
// We use the Scuba sample and log it to Scribe, then we also log in the Scuba
// sample, but this is built using discard(), so at most it'll log to a file for
// debug / tests.
let sample = builder.get_sample();
// We can't really do anything with the errors, so let's just log them
if let Some((ref scribe_client, ref scribe_category)) = wireproto.scribe_args {
if let Ok(sample_json) = sample.to_json() {
let res = scribe_client.offer(scribe_category, &sample_json.to_string());
if res.is_ok() {
STATS::wireproto_scribe_success.add_value(1);
} else {
STATS::wireproto_scribe_failure.add_value(1);
}
} else {
STATS::wireproto_serialization_failure.add_value(1);
}
}
builder.log();
})
.or_else(|_| Result::<_, Error>::Ok(()))
});
tokio::spawn(f.compat());
}
fn generate_random_string(len: usize) -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.map(char::from)
.take(len)
.collect()
}
fn debug_format_directory<T: AsRef<[u8]>>(directory: &T) -> String {
String::from_utf8_lossy(&hgproto::batch::escape(directory)).to_string()
}

View File

@ -92,7 +92,6 @@ mod monitor;
mod session_bookmarks_cache;
mod tests;
pub use logging::WireprotoLogging;
use logging::{
debug_format_manifest, debug_format_path, log_getpack_params_verbose,
log_gettreepack_params_verbose, CommandLogger,
@ -164,10 +163,6 @@ fn gettreepack_scuba_sampling_rate(params: &GettreepackArgs) -> SamplingRate {
}
}
fn debug_format_nodes<'a>(nodes: impl IntoIterator<Item = &'a HgChangesetId>) -> String {
nodes.into_iter().map(|node| format!("{}", node)).join(" ")
}
fn debug_format_manifests<'a>(nodes: impl IntoIterator<Item = &'a HgManifestId>) -> String {
nodes.into_iter().map(debug_format_manifest).join(" ")
}
@ -197,18 +192,6 @@ fn debug_format_directories<'a, T: AsRef<[u8]> + 'a>(
String::from_utf8_lossy(out.as_ref()).to_string()
}
// Generic for HashSet, Vec, etc...
fn format_utf8_bytes_list<T, C>(entries: C) -> String
where
T: AsRef<[u8]>,
C: IntoIterator<Item = T>,
{
entries
.into_iter()
.map(|entry| String::from_utf8_lossy(entry.as_ref()).into_owned())
.join(",")
}
lazy_static! {
static ref SLOW_REQUEST_THRESHOLD: Duration = Duration::from_secs(1);
}
@ -412,7 +395,6 @@ pub struct RepoClient {
// We currently fix it by caching bookmarks at the beginning of discovery.
// TODO: T45411456 Fix this by teaching the client to expect extra commits to correspond to the bookmarks.
session_bookmarks_cache: Arc<SessionBookmarkCache>,
wireproto_logging: Arc<WireprotoLogging>,
maybe_push_redirector_args: Option<PushRedirectorArgs>,
force_lfs: Arc<AtomicBool>,
unhydrated_commits: Arc<AtomicBool>,
@ -429,7 +411,6 @@ impl RepoClient {
session: SessionContainer,
logging: LoggingContainer,
preserve_raw_bundle2: bool,
wireproto_logging: Arc<WireprotoLogging>,
maybe_push_redirector_args: Option<PushRedirectorArgs>,
knobs: RepoClientKnobs,
maybe_backup_repo_source: Option<BlobRepo>,
@ -442,7 +423,6 @@ impl RepoClient {
logging,
preserve_raw_bundle2,
session_bookmarks_cache,
wireproto_logging,
maybe_push_redirector_args,
force_lfs: Arc::new(AtomicBool::new(false)),
unhydrated_commits: Arc::new(AtomicBool::new(false)),
@ -506,12 +486,7 @@ impl RepoClient {
self.session
.new_context_with_scribe(logger, scuba, self.logging.scribe().clone());
let command_logger = CommandLogger::new(
ctx.clone(),
command.to_owned(),
self.wireproto_logging.clone(),
self.request_perf_counters.clone(),
);
let command_logger = CommandLogger::new(ctx.clone(), self.request_perf_counters.clone());
(ctx, command_logger)
}
@ -920,8 +895,7 @@ impl RepoClient {
);
log_getpack_params_verbose(&ctx, &encoded_params);
let json_params = json! {encoded_params};
command_logger.finalize_command(ctx, &stats, Some(&json_params));
command_logger.finalize_command(&stats);
future::ready(())
}
@ -1487,14 +1461,6 @@ impl HgCommands for RepoClient {
// @wireprotocommand('getbundle', '*')
fn getbundle(&self, args: GetbundleArgs) -> BoxStream<BytesOld, Error> {
self.command_stream(ops::GETBUNDLE, UNSAMPLED, |ctx, command_logger| {
let value = json!({
"bundlecaps": format_utf8_bytes_list(&args.bundlecaps),
"common": debug_format_nodes(&args.common),
"heads": debug_format_nodes(&args.heads),
"listkeys": format_utf8_bytes_list(&args.listkeys),
});
let value = json!(vec![value]);
let s = self
.create_bundle(ctx.clone(), args)
.compat()
@ -1502,11 +1468,10 @@ impl HgCommands for RepoClient {
.yield_periodically()
.flatten_err()
.timed({
cloned!(ctx);
move |stats| {
STATS::getbundle_ms
.add_value(stats.completion_time.as_millis_unchecked() as i64);
command_logger.finalize_command(ctx, &stats, Some(&value));
command_logger.finalize_command(&stats);
future::ready(())
}
})
@ -1871,14 +1836,13 @@ impl HgCommands for RepoClient {
}
})
.timed({
cloned!(ctx);
move |stats| {
if stats.completion_time > *SLOW_REQUEST_THRESHOLD {
command_logger.add_trimmed_scuba_extra("command_args", &args);
}
STATS::gettreepack_ms
.add_value(stats.completion_time.as_millis_unchecked() as i64);
command_logger.finalize_command(ctx, &stats, Some(&args));
command_logger.finalize_command(&stats);
future::ready(())
}
})
@ -2023,7 +1987,7 @@ impl HgCommands for RepoClient {
.flatten_err()
.map_ok(bytes_ext::copy_from_new)
.timed(|stats| {
command_logger.finalize_command(ctx, &stats, None);
command_logger.finalize_command(&stats);
future::ready(())
})
.boxed()
@ -2114,7 +2078,7 @@ impl HgCommands for RepoClient {
}
STATS::getcommitdata_ms
.add_value(stats.completion_time.as_millis_unchecked() as i64);
command_logger.finalize_command(ctx, &stats, Some(&args));
command_logger.finalize_command(&stats);
future::ready(())
})
.boxed()

View File

@ -431,16 +431,12 @@ async fn run_and_check_if_lfs(
MononokeScubaSampleBuilder::with_discard(),
);
let noop_wireproto =
WireprotoLogging::new(ctx.fb, mononoke_repo.reponame().clone(), None, None, None)?;
let repo_client = RepoClient::new(
mononoke_repo,
ctx.session().clone(),
logging,
false, // Don't preserve raw bundle 2 (we don't push)
Arc::new(noop_wireproto),
None, // No PushRedirectorArgs
None, // No PushRedirectorArgs
Default::default(),
None, // No backup repo source
);

View File

@ -13,7 +13,7 @@
mod client;
mod errors;
pub use client::{fetch_treepack_part_input, gettreepack_entries, RepoClient, WireprotoLogging};
pub use client::{fetch_treepack_part_input, gettreepack_entries, RepoClient};
pub use getbundle_response::{
find_commits_to_send, find_new_draft_commits_and_derive_filenodes_for_public_roots,
};

View File

@ -20,11 +20,15 @@ mod wireproto_sink;
pub use crate::connection_acceptor::wait_for_connections_closed;
use crate::connection_acceptor::connection_acceptor;
use crate::repo_handlers::repo_handlers;
use anyhow::{Context as _, Result};
use blobstore_factory::ReadOnlyStorage;
use cached_config::ConfigStore;
use cmdlib::monitoring::ReadyFlagService;
use fbinit::FacebookInit;
use futures::channel::oneshot;
use metaconfig_types::CommonConfig;
use mononoke_api::Mononoke;
use openssl::ssl::SslAcceptor;
use rate_limiting::RateLimitEnvironment;
@ -35,20 +39,12 @@ use sql_ext::facebook::MysqlOptions;
use std::path::PathBuf;
use std::sync::{atomic::AtomicBool, Arc};
use blobstore_factory::BlobstoreOptions;
use cmdlib::monitoring::ReadyFlagService;
use metaconfig_types::CommonConfig;
use crate::connection_acceptor::connection_acceptor;
use crate::repo_handlers::repo_handlers;
const CONFIGERATOR_RATE_LIMITING_CONFIG: &str = "scm/mononoke/ratelimiting/ratelimits";
pub async fn create_repo_listeners<'a>(
fb: FacebookInit,
common_config: CommonConfig,
mononoke: Mononoke,
blobstore_options: &'a BlobstoreOptions,
mysql_options: &'a MysqlOptions,
root_log: Logger,
sockname: String,
@ -79,11 +75,9 @@ pub async fn create_repo_listeners<'a>(
let handlers = repo_handlers(
fb,
&mononoke,
blobstore_options,
mysql_options,
readonly_storage,
&root_log,
config_store,
scuba,
)
.await?;

View File

@ -5,28 +5,23 @@
* GNU General Public License version 2.
*/
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{format_err, Context, Error};
use backsyncer::{open_backsyncer_dbs, TargetRepoDbs};
use blobrepo::BlobRepo;
use blobstore_factory::{make_blobstore, BlobstoreOptions, ReadOnlyStorage};
use blobstore_factory::ReadOnlyStorage;
use cache_warmup::cache_warmup;
use cached_config::ConfigStore;
use cloned::cloned;
use context::CoreContext;
use fbinit::FacebookInit;
use metaconfig_types::{
BackupRepoConfig, CommonCommitSyncConfig, RepoClientKnobs, WireprotoLoggingConfig,
};
use metaconfig_types::{BackupRepoConfig, CommonCommitSyncConfig, RepoClientKnobs};
use mononoke_api::Mononoke;
use mononoke_types::RepositoryId;
use repo_client::{MononokeRepo, PushRedirectorArgs, WireprotoLogging};
use repo_client::{MononokeRepo, PushRedirectorArgs};
use scuba_ext::MononokeScubaSampleBuilder;
use slog::{debug, info, o, Logger};
use sql_construct::SqlConstructFromMetadataDatabaseConfig;
use sql_ext::facebook::MysqlOptions;
use std::collections::HashMap;
use synced_commit_mapping::SqlSyncedCommitMapping;
@ -42,7 +37,6 @@ use crate::errors::ErrorKind;
struct IncompleteRepoHandler {
logger: Logger,
scuba: MononokeScubaSampleBuilder,
wireproto_logging: Arc<WireprotoLogging>,
repo: MononokeRepo,
preserve_raw_bundle2: bool,
maybe_incomplete_push_redirector_args: Option<IncompletePushRedirectorArgs>,
@ -95,7 +89,6 @@ impl IncompleteRepoHandler {
let IncompleteRepoHandler {
logger,
scuba,
wireproto_logging,
repo,
preserve_raw_bundle2,
maybe_incomplete_push_redirector_args,
@ -124,7 +117,6 @@ impl IncompleteRepoHandler {
Ok(RepoHandler {
logger,
scuba,
wireproto_logging,
repo,
preserve_raw_bundle2,
maybe_push_redirector_args,
@ -152,7 +144,6 @@ fn try_find_repo_by_name<'a>(
pub struct RepoHandler {
pub logger: Logger,
pub scuba: MononokeScubaSampleBuilder,
pub wireproto_logging: Arc<WireprotoLogging>,
pub repo: MononokeRepo,
pub preserve_raw_bundle2: bool,
pub maybe_push_redirector_args: Option<PushRedirectorArgs>,
@ -163,11 +154,9 @@ pub struct RepoHandler {
pub async fn repo_handlers<'a>(
fb: FacebookInit,
mononoke: &'a Mononoke,
blobstore_options: &'a BlobstoreOptions,
mysql_options: &'a MysqlOptions,
readonly_storage: ReadOnlyStorage,
root_log: &Logger,
config_store: &'a ConfigStore,
scuba: &MononokeScubaSampleBuilder,
) -> Result<HashMap<String, RepoHandler>, Error> {
let futs = mononoke.repos().map(|repo| async move {
@ -184,7 +173,6 @@ pub async fn repo_handlers<'a>(
let cache_warmup_params = config.cache_warmup.clone();
let db_config = config.storage_config.metadata.clone();
let preserve_raw_bundle2 = config.bundle2_replay_params.preserve_raw_bundle2.clone();
let wireproto_logging = config.wireproto_logging.clone();
let common_commit_sync_config = repo
.live_commit_sync_config()
@ -212,17 +200,6 @@ pub async fn repo_handlers<'a>(
readonly_storage.0,
)?;
let wireproto_logging = create_wireproto_logging(
fb,
reponame.clone(),
blobstore_options,
mysql_options,
readonly_storage,
wireproto_logging,
logger.clone(),
config_store,
);
let backsyncer_dbs = open_backsyncer_dbs(
ctx.clone(),
blobrepo.clone(),
@ -233,15 +210,15 @@ pub async fn repo_handlers<'a>(
info!(
logger,
"Creating MononokeRepo, CommitSyncMapping, WireprotoLogging, TargetRepoDbs, \
"Creating MononokeRepo, CommitSyncMapping, TargetRepoDbs, \
WarmBookmarksCache"
);
let mononoke_repo =
MononokeRepo::new(ctx.fb, repo.clone(), mysql_options, readonly_storage);
let (mononoke_repo, wireproto_logging, backsyncer_dbs) =
futures::future::try_join3(mononoke_repo, wireproto_logging, backsyncer_dbs).await?;
let (mononoke_repo, backsyncer_dbs) =
futures::future::try_join(mononoke_repo, backsyncer_dbs).await?;
let maybe_incomplete_push_redirector_args = common_commit_sync_config.and_then({
cloned!(logger);
@ -277,7 +254,6 @@ pub async fn repo_handlers<'a>(
IncompleteRepoHandler {
logger,
scuba: scuba.clone(),
wireproto_logging: Arc::new(wireproto_logging),
repo: mononoke_repo,
preserve_raw_bundle2,
maybe_incomplete_push_redirector_args,
@ -312,53 +288,3 @@ async fn build_repo_handlers(
}
Ok(res)
}
async fn create_wireproto_logging<'a>(
fb: FacebookInit,
reponame: String,
blobstore_options: &'a BlobstoreOptions,
mysql_options: &'a MysqlOptions,
readonly_storage: ReadOnlyStorage,
wireproto_logging_config: WireprotoLoggingConfig,
logger: Logger,
config_store: &'a ConfigStore,
) -> Result<WireprotoLogging, Error> {
let WireprotoLoggingConfig {
storage_config_and_threshold,
scribe_category,
local_path,
} = wireproto_logging_config;
let blobstore_and_threshold = match storage_config_and_threshold {
Some((storage_config, threshold)) => {
if readonly_storage.0 {
return Err(format_err!(
"failed to create blobstore for wireproto logging because storage is readonly",
));
}
let blobstore = make_blobstore(
fb,
storage_config.blobstore,
mysql_options,
readonly_storage,
blobstore_options,
&logger,
config_store,
&blobstore_factory::default_scrub_handler(),
None,
)
.await?;
Some((blobstore, threshold))
}
None => None,
};
WireprotoLogging::new(
fb,
reponame,
scribe_category,
blobstore_and_threshold,
local_path.as_ref().map(|p| p.as_ref()),
)
}

View File

@ -81,7 +81,6 @@ pub async fn request_handler(
let RepoHandler {
logger,
mut scuba,
wireproto_logging,
repo,
preserve_raw_bundle2,
maybe_push_redirector_args,
@ -150,7 +149,6 @@ pub async fn request_handler(
session.clone(),
logging,
preserve_raw_bundle2,
wireproto_logging,
maybe_push_redirector_args,
repo_client_knobs,
maybe_backup_repo_source,

View File

@ -115,7 +115,6 @@ fn main(fb: FacebookInit) -> Result<()> {
let env = app.environment();
let mysql_options = env.mysql_options.clone();
let readonly_storage = env.readonly_storage.clone();
let blobstore_options = env.blobstore_options.clone();
let scuba = env.scuba_sample_builder.clone();
let warm_bookmarks_cache_scuba = env.warm_bookmarks_cache_scuba_sample_builder.clone();
@ -143,7 +142,6 @@ fn main(fb: FacebookInit) -> Result<()> {
fb,
common,
mononoke,
&blobstore_options,
&mysql_options,
root_log,
host_port,

View File

@ -878,27 +878,6 @@ list_keys_patterns_max=$LIST_KEYS_PATTERNS_MAX
CONFIG
fi
if [[ -n "${WIREPROTO_LOGGING_PATH:-}" ]]; then
cat >> "repos/$reponame/server.toml" <<CONFIG
[wireproto_logging]
local_path="$WIREPROTO_LOGGING_PATH"
CONFIG
if [[ -n "${WIREPROTO_LOGGING_BLOBSTORE:-}" ]]; then
cat >> "repos/$reponame/server.toml" <<CONFIG
storage_config="traffic_replay_blobstore"
remote_arg_size_threshold=0
[storage.traffic_replay_blobstore.metadata.local]
local_db_path="$TESTTMP/monsql"
[storage.traffic_replay_blobstore.blobstore.blob_files]
path = "$WIREPROTO_LOGGING_BLOBSTORE"
CONFIG
fi
fi
# path = "$TESTTMP/traffic-replay-blobstore"
if [[ -n "${ONLY_FAST_FORWARD_BOOKMARK:-}" ]]; then
cat >> "repos/$reponame/server.toml" <<CONFIG
[[bookmarks]]