mirror of
https://github.com/facebook/sapling.git
synced 2024-10-11 09:17:30 +03:00
mononoke/cache_warmup: log stats to Scuba
Summary: I'd like to see the amount of data loaded into cache (i.e. blobstore gets) made by cache_warmup. This allows for this. While in there, I also removed the separate `Logger` that was being passed in `cache_warmup` since we have one in `CoreContext`. Reviewed By: HarveyHunt Differential Revision: D19160133 fbshipit-source-id: edddf40009f6fa39de6d149fd8eebe42b6e17402
This commit is contained in:
parent
fef360b284
commit
9e393883d3
@ -13,11 +13,13 @@ use cloned::cloned;
|
||||
use context::CoreContext;
|
||||
use futures::{future, Future, IntoFuture, Stream};
|
||||
use futures_ext::{spawn_future, FutureExt};
|
||||
use futures_stats::Timed;
|
||||
use manifest::{Entry, ManifestOps};
|
||||
use mercurial_types::{HgChangesetId, HgFileNodeId, RepoPath};
|
||||
use metaconfig_types::CacheWarmupParams;
|
||||
use revset::AncestorsNodeStream;
|
||||
use slog::{debug, info, Logger};
|
||||
use scuba_ext::ScubaSampleBuilderExt;
|
||||
use slog::{debug, info};
|
||||
use tracing::{trace_args, Traced};
|
||||
use upload_trace::{manifold_thrift::thrift::RequestContext, UploadTrace};
|
||||
|
||||
@ -41,7 +43,6 @@ fn blobstore_and_filenodes_warmup(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
revision: HgChangesetId,
|
||||
logger: Logger,
|
||||
) -> impl Future<Item = (), Error = Error> {
|
||||
// TODO(stash): Arbitrary number. Tweak somehow?
|
||||
let buffer_size = 100;
|
||||
@ -73,18 +74,21 @@ fn blobstore_and_filenodes_warmup(
|
||||
})
|
||||
.buffered(buffer_size)
|
||||
.for_each({
|
||||
cloned!(logger);
|
||||
cloned!(ctx);
|
||||
let mut i = 0;
|
||||
move |_| {
|
||||
i += 1;
|
||||
if i % 10000 == 0 {
|
||||
debug!(logger, "manifests warmup: fetched {}th entry", i);
|
||||
debug!(ctx.logger(), "manifests warmup: fetched {}th entry", i);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.map(move |()| {
|
||||
debug!(logger, "finished manifests warmup");
|
||||
.map({
|
||||
cloned!(ctx);
|
||||
move |()| {
|
||||
debug!(ctx.logger(), "finished manifests warmup");
|
||||
}
|
||||
})
|
||||
.traced(
|
||||
&ctx.trace(),
|
||||
@ -99,9 +103,8 @@ fn changesets_warmup(
|
||||
start_rev: HgChangesetId,
|
||||
repo: BlobRepo,
|
||||
cs_limit: usize,
|
||||
logger: Logger,
|
||||
) -> impl Future<Item = (), Error = Error> {
|
||||
info!(logger, "about to start warming up changesets cache");
|
||||
info!(ctx.logger(), "about to start warming up changesets cache");
|
||||
|
||||
repo.get_bonsai_from_hg(ctx.clone(), start_rev)
|
||||
.and_then({
|
||||
@ -111,13 +114,13 @@ fn changesets_warmup(
|
||||
}
|
||||
})
|
||||
.and_then({
|
||||
let ctx = ctx.clone();
|
||||
cloned!(ctx);
|
||||
move |start_rev| {
|
||||
AncestorsNodeStream::new(ctx, &repo.get_changeset_fetcher(), start_rev)
|
||||
AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), start_rev)
|
||||
.take(cs_limit as u64)
|
||||
.collect()
|
||||
.map(move |_| {
|
||||
debug!(logger, "finished changesets warmup");
|
||||
debug!(ctx.logger(), "finished changesets warmup");
|
||||
})
|
||||
}
|
||||
})
|
||||
@ -129,41 +132,47 @@ fn do_cache_warmup(
|
||||
repo: BlobRepo,
|
||||
bookmark: BookmarkName,
|
||||
commit_limit: usize,
|
||||
logger: Logger,
|
||||
) -> impl Future<Item = (), Error = Error> {
|
||||
let ctx = ctx.clone_and_reset();
|
||||
|
||||
repo.get_bookmark(ctx.clone(), &bookmark)
|
||||
.and_then({
|
||||
let logger = logger.clone();
|
||||
let repo = repo.clone();
|
||||
let ctx = ctx.clone();
|
||||
cloned!(repo, ctx);
|
||||
move |bookmark_rev| match bookmark_rev {
|
||||
Some(bookmark_rev) => {
|
||||
let blobstore_warmup = spawn_future(blobstore_and_filenodes_warmup(
|
||||
ctx.clone(),
|
||||
repo.clone(),
|
||||
bookmark_rev,
|
||||
logger.clone(),
|
||||
));
|
||||
let cs_warmup = spawn_future(changesets_warmup(
|
||||
ctx,
|
||||
bookmark_rev,
|
||||
repo,
|
||||
commit_limit,
|
||||
logger,
|
||||
));
|
||||
let cs_warmup =
|
||||
spawn_future(changesets_warmup(ctx, bookmark_rev, repo, commit_limit));
|
||||
blobstore_warmup.join(cs_warmup).map(|_| ()).left_future()
|
||||
}
|
||||
None => {
|
||||
info!(logger, "{} bookmark not found!", bookmark);
|
||||
info!(ctx.logger(), "{} bookmark not found!", bookmark);
|
||||
Err(errors::ErrorKind::BookmarkNotFound(bookmark).into())
|
||||
.into_future()
|
||||
.right_future()
|
||||
}
|
||||
}
|
||||
})
|
||||
.map(move |()| {
|
||||
info!(logger, "finished initial warmup");
|
||||
()
|
||||
.map({
|
||||
cloned!(ctx);
|
||||
move |()| {
|
||||
info!(ctx.logger(), "finished initial warmup");
|
||||
()
|
||||
}
|
||||
})
|
||||
.timed({
|
||||
cloned!(ctx);
|
||||
move |stats, _| {
|
||||
let mut scuba = ctx.scuba().clone();
|
||||
scuba.add_future_stats(&stats);
|
||||
ctx.perf_counters().insert_perf_counters(&mut scuba);
|
||||
scuba.log_with_msg("Cache warmup complete", None);
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.traced(&ctx.trace(), "cache_warmup", trace_args! {})
|
||||
.map({
|
||||
@ -200,17 +209,12 @@ pub fn cache_warmup(
|
||||
ctx: CoreContext,
|
||||
repo: BlobRepo,
|
||||
cache_warmup: Option<CacheWarmupParams>,
|
||||
logger: Logger,
|
||||
) -> impl Future<Item = (), Error = Error> {
|
||||
match cache_warmup {
|
||||
Some(cache_warmup) => do_cache_warmup(
|
||||
ctx,
|
||||
repo,
|
||||
cache_warmup.bookmark,
|
||||
cache_warmup.commit_limit,
|
||||
logger.clone(),
|
||||
)
|
||||
.left_future(),
|
||||
Some(cache_warmup) => {
|
||||
do_cache_warmup(ctx, repo, cache_warmup.bookmark, cache_warmup.commit_limit)
|
||||
.left_future()
|
||||
}
|
||||
None => Ok(()).into_future().right_future(),
|
||||
}
|
||||
}
|
||||
|
@ -361,6 +361,11 @@ impl CoreContext {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn clone_and_reset(&self) -> Self {
|
||||
self.session
|
||||
.new_context(self.logger().clone(), self.scuba().clone())
|
||||
}
|
||||
|
||||
pub fn with_mutated_scuba(
|
||||
&self,
|
||||
sample: impl FnOnce(ScubaSampleBuilder) -> ScubaSampleBuilder,
|
||||
|
@ -392,14 +392,13 @@ pub fn repo_handlers(
|
||||
);
|
||||
|
||||
info!(logger, "Warming up cache");
|
||||
let initial_warmup = cache_warmup(
|
||||
ctx.clone(),
|
||||
blobrepo.clone(),
|
||||
cache_warmup_params,
|
||||
listen_log.clone(),
|
||||
)
|
||||
.chain_err(format!("while warming up cache for repo: {}", reponame))
|
||||
.from_err();
|
||||
let initial_warmup =
|
||||
cache_warmup(ctx.clone(), blobrepo.clone(), cache_warmup_params)
|
||||
.chain_err(format!(
|
||||
"while warming up cache for repo: {}",
|
||||
reponame
|
||||
))
|
||||
.from_err();
|
||||
|
||||
let mutable_counters = Arc::new(sql_mutable_counters);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user