mirror of
https://github.com/facebook/sapling.git
synced 2025-01-08 14:46:47 +03:00
mononoke/fastreplay: sample Scuba logging
Summary: This updates fastreplay to support sampling of the samples we log to Scuba. This lets us control the volume we log to Scuba. Reviewed By: farnz Differential Revision: D19722223 fbshipit-source-id: 2e43f201a3e5930f5f6a29749c35e0e0dea341d2
This commit is contained in:
parent
142b803911
commit
84aebf00f4
@ -6,8 +6,10 @@
|
||||
* directory of this source tree.
|
||||
*/
|
||||
|
||||
use anyhow::{Context, Error};
|
||||
use serde::Deserialize;
|
||||
use std::convert::TryInto;
|
||||
use std::convert::TryFrom;
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use fastreplay_structs::FastReplayConfig as RawFastReplayConfig;
|
||||
|
||||
@ -26,6 +28,7 @@ impl Default for FastReplayConfig {
|
||||
inner: RawFastReplayConfig {
|
||||
admission_rate: 100,
|
||||
max_concurrency: 50,
|
||||
scuba_sampling_target: 1,
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -36,9 +39,28 @@ impl FastReplayConfig {
|
||||
self.inner.admission_rate
|
||||
}
|
||||
|
||||
pub fn max_concurrency(&self) -> u64 {
|
||||
// NOTE: The config comes as an i64. it should be > 0 since we validate that, but let's be
|
||||
pub fn max_concurrency(&self) -> Result<NonZeroU64, Error> {
|
||||
// NOTE: The config comes as an i64. It should be > 0 since we validate that, but let's be
|
||||
// safe if not.
|
||||
self.inner.max_concurrency.try_into().unwrap_or(50)
|
||||
NonZeroU64::new(u64::try_from(self.inner.max_concurrency)?)
|
||||
.ok_or_else(|| Error::msg("invalid scuba_sampling_target"))
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"While converting {:?} to max_concurrency",
|
||||
self.inner.max_concurrency
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn scuba_sampling_target(&self) -> Result<NonZeroU64, Error> {
|
||||
// NOTE: The config comes as an i64. Same as above.
|
||||
NonZeroU64::new(u64::try_from(self.inner.scuba_sampling_target)?)
|
||||
.ok_or_else(|| Error::msg("invalid scuba_sampling_target"))
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"While converting {:?} to scuba_sampling_target",
|
||||
self.inner.scuba_sampling_target
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ use std::sync::Arc;
|
||||
|
||||
pub struct FastReplayDispatcher {
|
||||
fb: FacebookInit,
|
||||
scuba: ScubaSampleBuilder,
|
||||
logger: Logger,
|
||||
repo: MononokeRepo,
|
||||
wireproto_logging: Arc<WireprotoLogging>,
|
||||
@ -30,7 +29,6 @@ impl FastReplayDispatcher {
|
||||
pub fn new(
|
||||
fb: FacebookInit,
|
||||
logger: Logger,
|
||||
scuba: ScubaSampleBuilder,
|
||||
repo: MononokeRepo,
|
||||
remote_args_blobstore: Option<Arc<dyn Blobstore>>,
|
||||
hash_validation_percentage: usize,
|
||||
@ -41,7 +39,6 @@ impl FastReplayDispatcher {
|
||||
Ok(Self {
|
||||
fb,
|
||||
logger,
|
||||
scuba,
|
||||
repo,
|
||||
wireproto_logging: Arc::new(noop_wireproto),
|
||||
remote_args_blobstore,
|
||||
@ -49,8 +46,8 @@ impl FastReplayDispatcher {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn client(&self) -> RepoClient {
|
||||
let logging = LoggingContainer::new(self.logger.clone(), self.scuba.clone());
|
||||
pub fn client(&self, scuba: ScubaSampleBuilder) -> RepoClient {
|
||||
let logging = LoggingContainer::new(self.logger.clone(), scuba);
|
||||
let session = SessionContainer::new_with_defaults(self.fb);
|
||||
|
||||
RepoClient::new(
|
||||
@ -69,7 +66,7 @@ impl FastReplayDispatcher {
|
||||
|
||||
pub async fn load_remote_args(&self, key: String) -> Result<String, Error> {
|
||||
let session = SessionContainer::new_with_defaults(self.fb);
|
||||
let ctx = session.new_context(self.logger.clone(), self.scuba.clone());
|
||||
let ctx = session.new_context(self.logger.clone(), ScubaSampleBuilder::with_discard());
|
||||
|
||||
let blobstore = self
|
||||
.remote_args_blobstore
|
||||
|
@ -107,19 +107,20 @@ async fn dispatch(
|
||||
.await
|
||||
.context("While parsing request")?;
|
||||
|
||||
scuba.add("reponame", reponame);
|
||||
let client = dispatcher.client(scuba.clone());
|
||||
|
||||
let stream = match parsed_req {
|
||||
Request::Gettreepack(args) => dispatcher.client().gettreepack(args.0).compat(),
|
||||
Request::Getbundle(args) => dispatcher.client().getbundle(args.0).compat(),
|
||||
Request::GetpackV1(args) => dispatcher
|
||||
.client()
|
||||
Request::Gettreepack(args) => client.gettreepack(args.0).compat(),
|
||||
Request::Getbundle(args) => client.getbundle(args.0).compat(),
|
||||
Request::GetpackV1(args) => client
|
||||
.getpackv1(Box::new(
|
||||
stream::iter(args.entries.into_iter().map(Ok).collect::<Vec<_>>())
|
||||
.boxed()
|
||||
.compat(),
|
||||
))
|
||||
.compat(),
|
||||
Request::GetpackV2(args) => dispatcher
|
||||
.client()
|
||||
Request::GetpackV2(args) => client
|
||||
.getpackv2(Box::new(
|
||||
stream::iter(args.entries.into_iter().map(Ok).collect::<Vec<_>>())
|
||||
.boxed()
|
||||
@ -128,8 +129,6 @@ async fn dispatch(
|
||||
.compat(),
|
||||
};
|
||||
|
||||
scuba.add("reponame", reponame);
|
||||
|
||||
scuba.add("command", req.normal.command.as_ref());
|
||||
if let Some(args) = req.normal.args.as_ref() {
|
||||
scuba.add("command_args", args.as_ref());
|
||||
@ -230,6 +229,8 @@ async fn bootstrap_repositories<'a>(
|
||||
let logger = logger.new(o!("repo" => name.clone()));
|
||||
|
||||
let bootstrap_ctx = {
|
||||
let mut scuba = scuba.clone();
|
||||
scuba.add("reponame", name.clone());
|
||||
let session = SessionContainer::new_with_defaults(fb);
|
||||
session.new_context(logger.clone(), scuba.clone())
|
||||
};
|
||||
@ -288,7 +289,6 @@ async fn bootstrap_repositories<'a>(
|
||||
let dispatcher = FastReplayDispatcher::new(
|
||||
fb,
|
||||
logger.clone(),
|
||||
scuba.clone(),
|
||||
repo,
|
||||
remote_args_blobstore,
|
||||
hash_validation_percentage,
|
||||
@ -358,7 +358,7 @@ async fn fastreplay<R: AsyncRead + Unpin>(
|
||||
|
||||
let config = opts.config.get();
|
||||
|
||||
if load > config.max_concurrency() {
|
||||
if load > config.max_concurrency()?.get() {
|
||||
warn!(
|
||||
&logger,
|
||||
"Waiting for some requests to complete (load: {})...", load
|
||||
@ -381,7 +381,9 @@ async fn fastreplay<R: AsyncRead + Unpin>(
|
||||
count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// NOTE: We clone values here because we need a 'static future to spawn.
|
||||
cloned!(logger, scuba, repos, opts.aliases, count);
|
||||
cloned!(logger, mut scuba, repos, opts.aliases, count);
|
||||
|
||||
scuba.sampled(config.scuba_sampling_target()?);
|
||||
|
||||
let task = async move {
|
||||
defer!({
|
||||
|
@ -52,6 +52,7 @@ Check logging structure
|
||||
"recorded_duration_us": *, (glob)
|
||||
"replay_delay_s": * (glob)
|
||||
"replay_response_size": *, (glob)
|
||||
"sample_rate": 1,
|
||||
"time": * (glob)
|
||||
},
|
||||
"normal": {
|
||||
@ -73,7 +74,8 @@ Check logging structure
|
||||
$ cat > "$live_config" << EOF
|
||||
> {
|
||||
> "admission_rate": 0,
|
||||
> "max_concurrency": 10
|
||||
> "max_concurrency": 10,
|
||||
> "scuba_sampling_target": 1
|
||||
> }
|
||||
> EOF
|
||||
$ fastreplay --live-config "file:${live_config}" --debug < "$WIREPROTO_LOGGING_PATH" 2>&1 | grep "not admitted"
|
||||
@ -87,7 +89,8 @@ Check logging structure
|
||||
$ cat > "$live_config" << EOF
|
||||
> {
|
||||
> "admission_rate": 100,
|
||||
> "max_concurrency": 1
|
||||
> "max_concurrency": 1,
|
||||
> "scuba_sampling_target": 1
|
||||
> }
|
||||
> EOF
|
||||
$ quiet fastreplay --live-config "file:${live_config}" --debug --scuba-log-file "$fastreplay_log" < "$WIREPROTO_LOGGING_PATH"
|
||||
|
Loading…
Reference in New Issue
Block a user