mirror of
https://github.com/facebook/sapling.git
synced 2024-10-09 16:31:02 +03:00
mononoke: add futures_watchdog, a crate to help find stalls
Summary: Like it says in the title. This adds a crate that provides a combinator that lets us easily find stalls caused by futures that stay in `poll()` for too long. The goal is to make this minimal overhead for whoever is using it: all you need is to import it + give it a logger. It automatically looks up the line where it's called and gives it back to you in logs. This uses the `track_caller` functionality to make this work. Reviewed By: farnz Differential Revision: D26250068 fbshipit-source-id: a1458e5adebac7eab6c2de458f679c7215147937
This commit is contained in:
parent
201a50db11
commit
c88a08b9df
@ -279,6 +279,7 @@ members = [
|
||||
"common/bounded_traversal",
|
||||
"common/copy_utils",
|
||||
"common/dedupmap",
|
||||
"common/futures_watchdog",
|
||||
"common/iterhelpers",
|
||||
"common/rust/caching_ext",
|
||||
"common/rust/slog_ext",
|
||||
|
@ -28,6 +28,7 @@ fastlog = { path = "../../derived_data/fastlog", version = "0.1.0" }
|
||||
filenodes = { path = "../../filenodes", version = "0.1.0" }
|
||||
filestore = { path = "../../filestore", version = "0.1.0" }
|
||||
fsnodes = { path = "../../derived_data/fsnodes", version = "0.1.0" }
|
||||
futures_watchdog = { path = "../../common/futures_watchdog", version = "0.1.0" }
|
||||
git_types = { path = "../../git/git_types", version = "0.1.0" }
|
||||
memblob = { path = "../../blobstore/memblob", version = "0.1.0" }
|
||||
mercurial_derived_data = { path = "../../derived_data/mercurial_derived_data", version = "0.1.0" }
|
||||
|
@ -35,6 +35,7 @@ use filenodes::Filenodes;
|
||||
use filestore::FilestoreConfig;
|
||||
use fsnodes::RootFsnodeId;
|
||||
use futures::{compat::Future01CompatExt, future, try_join};
|
||||
use futures_watchdog::WatchdogExt;
|
||||
use git_types::TreeHandle;
|
||||
use maplit::hashset;
|
||||
use memblob::Memblob;
|
||||
@ -142,7 +143,8 @@ impl<'a> BlobrepoBuilder<'a> {
|
||||
// FIXME: remove clone when make_metadata_sql_factory is async-await
|
||||
self.logger.clone(),
|
||||
)
|
||||
.compat();
|
||||
.compat()
|
||||
.watched(self.logger);
|
||||
|
||||
let blobstore = make_blobstore(
|
||||
self.fb,
|
||||
@ -152,7 +154,8 @@ impl<'a> BlobrepoBuilder<'a> {
|
||||
&self.blobstore_options,
|
||||
&self.logger,
|
||||
self.config_store,
|
||||
);
|
||||
)
|
||||
.watched(self.logger);
|
||||
|
||||
let (sql_factory, blobstore) = future::try_join(sql_factory, blobstore).await?;
|
||||
|
||||
@ -168,6 +171,7 @@ impl<'a> BlobrepoBuilder<'a> {
|
||||
self.reponame,
|
||||
self.blobstore_options.cachelib_options,
|
||||
)
|
||||
.watched(self.logger)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
13
eden/mononoke/common/futures_watchdog/Cargo.toml
Normal file
13
eden/mononoke/common/futures_watchdog/Cargo.toml
Normal file
@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "futures_watchdog"
|
||||
edition = "2018"
|
||||
version = "0.1.0"
|
||||
authors = ['Facebook']
|
||||
license = "GPLv2+"
|
||||
include = ["src/**/*.rs"]
|
||||
|
||||
[dependencies]
|
||||
futures = { version = "0.3.5", features = ["async-await", "compat"] }
|
||||
maybe-owned = "0.3.4"
|
||||
pin-project = "0.4"
|
||||
slog = { version = "2.5", features = ["max_level_debug"] }
|
113
eden/mononoke/common/futures_watchdog/src/lib.rs
Normal file
113
eden/mononoke/common/futures_watchdog/src/lib.rs
Normal file
@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Copyright (c) Facebook, Inc. and its affiliates.
|
||||
*
|
||||
* This software may be used and distributed according to the terms of the
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use futures::{
|
||||
future::Future,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use maybe_owned::MaybeOwned;
|
||||
use pin_project::pin_project;
|
||||
use slog::{Logger, Record};
|
||||
use std::pin::Pin;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[pin_project]
|
||||
pub struct WatchedFuture<R, F> {
|
||||
reporter: R,
|
||||
#[pin]
|
||||
inner: F,
|
||||
}
|
||||
|
||||
impl<R, F> Future for WatchedFuture<R, F>
|
||||
where
|
||||
R: Reporter,
|
||||
F: Future,
|
||||
{
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
let now = Instant::now();
|
||||
let ret = this.inner.poll(cx);
|
||||
this.reporter.report(now.elapsed());
|
||||
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Reporter {
|
||||
fn report(&self, poll: Duration);
|
||||
}
|
||||
|
||||
pub struct SlogReporter<'a> {
|
||||
logger: MaybeOwned<'a, Logger>,
|
||||
location: slog::RecordLocation,
|
||||
max_poll: Duration,
|
||||
}
|
||||
|
||||
impl Reporter for SlogReporter<'_> {
|
||||
fn report(&self, poll: Duration) {
|
||||
if poll <= self.max_poll {
|
||||
return;
|
||||
}
|
||||
|
||||
self.logger.log(&Record::new(
|
||||
&slog::RecordStatic {
|
||||
location: &self.location,
|
||||
level: slog::Level::Warning,
|
||||
tag: "futures_watchdog",
|
||||
},
|
||||
&format_args!("Slow poll() ran for {:?}", poll),
|
||||
slog::b!(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
pub trait WatchdogExt: Sized {
|
||||
#[track_caller]
|
||||
fn watched<'a, L>(self, logger: L) -> WatchedFuture<SlogReporter<'a>, Self>
|
||||
where
|
||||
L: Into<MaybeOwned<'a, Logger>>;
|
||||
}
|
||||
|
||||
impl<F> WatchdogExt for F
|
||||
where
|
||||
F: Future + Sized,
|
||||
{
|
||||
#[track_caller]
|
||||
fn watched<'a, L>(self, logger: L) -> WatchedFuture<SlogReporter<'a>, Self>
|
||||
where
|
||||
L: Into<MaybeOwned<'a, Logger>>,
|
||||
{
|
||||
let logger = logger.into();
|
||||
|
||||
let location = std::panic::Location::caller();
|
||||
|
||||
let location = slog::RecordLocation {
|
||||
file: location.file(),
|
||||
line: location.line(),
|
||||
column: location.column(),
|
||||
function: "",
|
||||
module: "",
|
||||
};
|
||||
|
||||
// This is a bit arbitrary but generally a very conservative default.
|
||||
let max_poll = Duration::from_millis(500);
|
||||
|
||||
let reporter = SlogReporter {
|
||||
logger,
|
||||
location,
|
||||
max_poll,
|
||||
};
|
||||
|
||||
WatchedFuture {
|
||||
reporter,
|
||||
inner: self,
|
||||
}
|
||||
}
|
||||
}
|
@ -22,6 +22,7 @@ derived_data = { path = "../derived_data", version = "0.1.0" }
|
||||
fastlog = { path = "../derived_data/fastlog", version = "0.1.0" }
|
||||
filestore = { path = "../filestore", version = "0.1.0" }
|
||||
fsnodes = { path = "../derived_data/fsnodes", version = "0.1.0" }
|
||||
futures_watchdog = { path = "../common/futures_watchdog", version = "0.1.0" }
|
||||
hook_manager_factory = { path = "../hooks/hook_manager_factory", version = "0.1.0" }
|
||||
hooks = { path = "../hooks", version = "0.1.0" }
|
||||
live_commit_sync_config = { path = "../commit_rewriting/live_commit_sync_config", version = "0.1.0" }
|
||||
|
@ -19,7 +19,8 @@ use cached_config::ConfigStore;
|
||||
use cloned::cloned;
|
||||
use fbinit::FacebookInit;
|
||||
use futures::future;
|
||||
use slog::{debug, info, Logger};
|
||||
use futures_watchdog::WatchdogExt;
|
||||
use slog::{debug, info, o, Logger};
|
||||
use sql_ext::facebook::MysqlOptions;
|
||||
pub use warm_bookmarks_cache::BookmarkUpdateDelay;
|
||||
|
||||
@ -87,6 +88,7 @@ impl Mononoke {
|
||||
async move {
|
||||
info!(&env.logger, "Initializing repo: {}", &name);
|
||||
let repo = Repo::new(env, name.clone(), config, common_config)
|
||||
.watched(env.logger.new(o!("repo" => name.clone())))
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("could not initialize repo '{}'", &name)
|
||||
|
@ -30,6 +30,7 @@ use filestore::{Alias, FetchKey};
|
||||
use futures::compat::{Future01CompatExt, Stream01CompatExt};
|
||||
use futures::stream::{Stream, StreamExt, TryStreamExt};
|
||||
use futures::try_join;
|
||||
use futures_watchdog::WatchdogExt;
|
||||
use hook_manager_factory::make_hook_manager;
|
||||
use hooks::HookManager;
|
||||
use itertools::Itertools;
|
||||
@ -180,7 +181,7 @@ impl Repo {
|
||||
&logger,
|
||||
&env.config_store,
|
||||
);
|
||||
let blob_repo = builder.build().await?;
|
||||
let blob_repo = builder.build().watched(&logger).await?;
|
||||
|
||||
let ctx = CoreContext::new_with_logger(env.fb, logger.clone());
|
||||
|
||||
@ -264,12 +265,12 @@ impl Repo {
|
||||
hook_manager,
|
||||
sql_read_write_status,
|
||||
) = try_join!(
|
||||
repo_permission_checker,
|
||||
service_permission_checker,
|
||||
skiplist_index,
|
||||
warm_bookmarks_cache,
|
||||
hook_manager,
|
||||
sql_read_write_status,
|
||||
repo_permission_checker.watched(&logger),
|
||||
service_permission_checker.watched(&logger),
|
||||
skiplist_index.watched(&logger),
|
||||
warm_bookmarks_cache.watched(&logger),
|
||||
hook_manager.watched(&logger),
|
||||
sql_read_write_status.watched(&logger),
|
||||
)?;
|
||||
|
||||
let readonly_fetcher = RepoReadWriteFetcher::new(
|
||||
|
@ -9,6 +9,7 @@ include = ["src/**/*.rs"]
|
||||
[dependencies]
|
||||
alpn = { path = "../alpn", version = "0.1.0" }
|
||||
cmdlib = { path = "../cmdlib", version = "0.1.0" }
|
||||
futures_watchdog = { path = "../common/futures_watchdog", version = "0.1.0" }
|
||||
monitoring = { path = "monitoring", version = "0.1.0" }
|
||||
mononoke_api = { path = "../mononoke_api", version = "0.1.0" }
|
||||
repo_listener = { path = "repo_listener", version = "0.1.0" }
|
||||
|
@ -14,6 +14,7 @@ use cloned::cloned;
|
||||
use cmdlib::{args, monitoring::ReadyFlagService};
|
||||
use fbinit::FacebookInit;
|
||||
use futures::channel::oneshot;
|
||||
use futures_watchdog::WatchdogExt;
|
||||
use mononoke_api::{
|
||||
BookmarkUpdateDelay, Mononoke, MononokeEnvironment, WarmBookmarksCacheDerivedData,
|
||||
};
|
||||
@ -161,7 +162,10 @@ fn main(fb: FacebookInit) -> Result<()> {
|
||||
warm_bookmarks_cache_delay: BookmarkUpdateDelay::Disallow,
|
||||
};
|
||||
|
||||
let mononoke = Mononoke::new(&env, config.clone()).await?;
|
||||
let mononoke = Mononoke::new(&env, config.clone())
|
||||
.watched(&root_log)
|
||||
.await?;
|
||||
info!(&root_log, "Built Mononoke");
|
||||
|
||||
repo_listener::create_repo_listeners(
|
||||
fb,
|
||||
|
@ -29,6 +29,7 @@ COMMON_ARGS=(
|
||||
--mysql-master-only
|
||||
--tunables-config "file:$TESTTMP/mononoke_tunables.json"
|
||||
--local-configerator-path "$TESTTMP/configerator"
|
||||
--log-exclude-tag "futures_watchdog"
|
||||
)
|
||||
if [[ -n "$MYSQL_CLIENT" ]]; then
|
||||
COMMON_ARGS+=(--use-mysql-client)
|
||||
|
Loading…
Reference in New Issue
Block a user