Change serve_forever (and thus LFS and SCS servers) to new futures

Summary: As part of the tokio-0.2 migration effort, prepare `serve_forever` users to rely only on standard futures.

Reviewed By: krallin

Differential Revision: D19396792

fbshipit-source-id: 324272dd62c3d37a255cdb6f0eb825ca09420371
Simon Farnsworth 2020-01-21 16:10:07 -08:00 committed by Facebook Github Bot
parent 563f267f8c
commit 7ab550edff
3 changed files with 85 additions and 163 deletions
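With this change, `serve_forever` takes the `server` as a standard `Future<Output = ()> + Send + 'static` and the `shutdown` step as a plain `Future<Output = ()>` instead of an `FnOnce() -> bool` callback. A minimal caller sketch distilled from the two call sites below (hypothetical grace/timeout values, and assuming `serve_forever` is in scope from the cmdlib helpers):

// Hypothetical caller, not part of this commit: illustrates the post-migration
// call shape, where both `server` and `shutdown` are std futures.
use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
use std::time::Duration;

use anyhow::Error;
use futures_preview::channel::oneshot;
use slog::Logger;

fn run(runtime: tokio_compat::runtime::Runtime, logger: Logger) -> Result<(), Error> {
    let will_exit = Arc::new(AtomicBool::new(false));
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();

    // `server`: any `Future<Output = ()> + Send + 'static`; here it just runs
    // until the shutdown channel fires.
    let server = async move {
        let _ = shutdown_rx.await;
    };

    serve_forever(
        runtime,
        server,
        &logger,
        // quiesce: mark the server as exiting; requests are still accepted.
        move || will_exit.store(true, Ordering::Relaxed),
        Duration::from_secs(10), // shutdown_grace_period (hypothetical value)
        // shutdown: a future awaited under `shutdown_timeout` once the grace
        // period has elapsed, replacing the old `FnOnce() -> bool` callback.
        async move {
            let _ = shutdown_tx.send(());
        },
        Duration::from_secs(10), // shutdown_timeout (hypothetical value)
    )
}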

View File

@@ -6,18 +6,20 @@
* directory of this source tree.
*/
use std::{cmp::min, fs, future::Future, io, path::Path, str::FromStr, thread, time::Duration};
use std::{cmp::min, fs, future::Future, io, path::Path, str::FromStr, time::Duration};
use anyhow::{bail, format_err, Context, Error, Result};
use clap::ArgMatches;
use cloned::cloned;
use fbinit::FacebookInit;
use futures::sync::oneshot::Receiver;
use futures::{future as old_future, sync, Future as OldFuture, IntoFuture};
use futures_ext::{BoxFuture, FutureExt};
use panichandler::Fate;
use signal_hook::{iterator::Signals, SIGINT, SIGTERM};
use futures::{Future as OldFuture, IntoFuture};
use futures_ext::{BoxFuture, FutureExt as OldFutureExt};
use futures_preview::{future, StreamExt, TryFutureExt};
use slog::{debug, error, info, Logger};
use tokio_preview::{
signal::unix::{signal, SignalKind},
time,
};
use crate::args;
use crate::monitoring;
@@ -30,7 +32,7 @@ use mercurial_types::{HgChangesetId, HgManifestId};
use metaconfig_types::MetadataDBConfig;
use mononoke_types::ChangesetId;
use sql_ext::MysqlOptions;
use stats::{schedule_stats_aggregation, schedule_stats_aggregation_preview};
use stats::schedule_stats_aggregation_preview;
pub const ARG_SHUTDOWN_GRACE_PERIOD: &str = "shutdown-grace-period";
pub const ARG_FORCE_SHUTDOWN_PERIOD: &str = "force-shutdown-period";
@@ -317,108 +319,61 @@ pub fn create_runtime(
/// called. This should perform any steps required to quiesce the
/// server. Requests should still be accepted.
///
/// After the configured quiesce timeout, the `server` future is
/// cancelled, and the `shutdown` callback is called. This should do
/// any additional work to stop accepting connections and wait until all
/// outstanding requests have been handled.
/// After the configured quiesce timeout, the `shutdown` future is awaited.
/// This should do any additional work to stop accepting connections and wait
/// until all outstanding requests have been handled. The `server` future
/// continues to run while `shutdown` is being awaited.
///
/// Currently the `shutdown` callback can return `true` to indicate that
/// the runtime should wait until it is idle before shutting down. Note
/// that this option will be removed in Tokio 0.2.
///
/// When `shutdown` completes, or when the force shutdown timer expires, the
/// runtime will be shutdown and the process will exit.
pub fn serve_forever<Server, QuiesceFn, ShutdownFn>(
runtime: tokio_compat::runtime::Runtime,
/// Once `shutdown` returns, the `server` future is cancelled, and the process
/// exits. If `shutdown_timeout` is exceeded, an error is returned.
pub fn serve_forever<Server, QuiesceFn, ShutdownFut>(
mut runtime: tokio_compat::runtime::Runtime,
server: Server,
logger: &Logger,
quiesce: QuiesceFn,
shutdown_grace_period: Duration,
shutdown: ShutdownFn,
shutdown: ShutdownFut,
shutdown_timeout: Duration,
) -> Result<(), Error>
where
Server: OldFuture<Item = (), Error = ()> + Send + 'static,
Server: Future<Output = ()> + Send + 'static,
QuiesceFn: FnOnce(),
ShutdownFn: FnOnce() -> bool,
ShutdownFut: Future<Output = ()>,
{
// Block until receiving a signal that tells us to exit.
let block = || -> Result<(), Error> {
let signals = Signals::new(&[SIGTERM, SIGINT])?;
for signal in signals.forever() {
info!(&logger, "Signalled: {}", signal);
break;
}
Ok(())
};
block_on_fn(
runtime,
server,
logger,
block,
quiesce,
shutdown_grace_period,
shutdown,
shutdown_timeout,
)?;
runtime.block_on_std(async {
let mut terminate = signal(SignalKind::terminate())?;
let mut interrupt = signal(SignalKind::interrupt())?;
// This future becomes ready when we receive a termination signal
let signalled = future::select(terminate.next(), interrupt.next());
Ok(())
}
let stats_agg = schedule_stats_aggregation_preview()
.map_err(|_| Error::msg("Failed to create stats aggregation worker"))?;
// Note: this returns a JoinHandle, which we drop, thus detaching the task
// It thus does not count towards shutdown_on_idle below
tokio_preview::task::spawn(stats_agg);
pub fn block_on_fn<Server, QuiesceFn, ShutdownFn, BlockFn>(
runtime: tokio_compat::runtime::Runtime,
server: Server,
logger: &Logger,
block: BlockFn,
quiesce: QuiesceFn,
shutdown_grace_period: Duration,
shutdown: ShutdownFn,
shutdown_timeout: Duration,
) -> Result<(), Error>
where
Server: OldFuture<Item = (), Error = ()> + Send + 'static,
QuiesceFn: FnOnce(),
ShutdownFn: FnOnce() -> bool,
BlockFn: FnOnce() -> Result<(), Error>,
{
let (shutdown_pub, shutdown_sub) = sync::oneshot::channel::<()>();
let main = join_stats_agg(server, shutdown_sub)?;
runtime.spawn(main);
// Spawn the server onto its own task
tokio_preview::task::spawn(server);
block()?;
// Now wait for the termination signal
signalled.await;
// Shutting down: wait for the grace period.
quiesce();
info!(
&logger,
"Waiting {}s before shutting down server",
shutdown_grace_period.as_secs(),
);
thread::sleep(shutdown_grace_period);
// Shutting down: wait for the grace period.
quiesce();
info!(
&logger,
"Waiting {}s before shutting down server",
shutdown_grace_period.as_secs(),
);
info!(&logger, "Shutting down...");
let _ = shutdown_pub.send(());
time::delay_for(shutdown_grace_period).await;
// Create a background thread to panic if we fail to shutdown within the timeout.
panichandler::set_panichandler(Fate::Abort);
thread::spawn(move || {
thread::sleep(shutdown_timeout);
panic!("Timed out shutting down runtime");
});
if shutdown() {
runtime
.shutdown_on_idle()
.wait()
.map_err(|_| Error::msg("Failed to shutdown runtime!"))?;
} else {
runtime
.shutdown_now()
.wait()
.map_err(|_| Error::msg("Failed to shutdown runtime!"))?;
}
Ok(())
info!(&logger, "Shutting down...");
time::timeout(shutdown_timeout, shutdown)
.map_err(|_| Error::msg("Timed out shutting down server"))
.await
})
}
/// Executes the future and waits for it to finish.
@@ -457,35 +412,6 @@ where
})
}
/// Join the future with stats aggregator and return a new joined future
fn join_stats_agg<F>(future: F, shutdown_sub: Receiver<()>) -> Result<BoxFuture<(), ()>, Error>
where
F: OldFuture<Item = (), Error = ()> + Send + 'static,
{
let stats_agg = schedule_stats_aggregation()
.map_err(|_| Error::msg("Failed to create stats aggregation worker"))?
.discard();
let main = (stats_agg, future)
.into_future()
.select2(shutdown_sub)
.then({
move |res| -> Result<(), ()> {
match res {
Ok(old_future::Either::B(_)) => Ok(()),
Err(old_future::Either::A(_)) => Err(()),
_ => {
// NOTE: We need to panic here, because otherwise main is going to be blocked on
// waiting for a signal forever. This shouldn't normally ever happen.
unreachable!("Terminated or signal listener was dropped.")
}
}
}
});
Ok(main.boxify())
}
#[cfg(test)]
mod test {
use super::*;
@@ -493,7 +419,6 @@ mod test {
use anyhow::Error;
use futures_preview::future::lazy;
use slog_glog_fmt;
use tokio_timer::sleep;
fn create_logger() -> Logger {
slog_glog_fmt::facebook_logger().unwrap()
@@ -522,23 +447,4 @@
let res = block_execute(future, fb, "test_app", &logger, &matches);
assert!(res.is_err());
}
#[test]
fn test_block_on_fn_shutsdown() {
let logger = create_logger();
let matches = exec_matches();
let runtime = args::init_runtime(&matches).unwrap();
let server = sleep(Duration::from_secs(42)).discard();
block_on_fn(
runtime,
server,
&logger,
|| -> Result<(), Error> { Ok(()) },
|| (),
Duration::from_secs(0),
|| true,
Duration::from_secs(10),
)
.unwrap();
}
}

View File

@@ -17,10 +17,16 @@ use cloned::cloned;
use fbinit::FacebookInit;
use futures::{Future, IntoFuture};
use futures_ext::FutureExt as Futures01Ext;
use futures_preview::TryFutureExt;
use futures_util::{compat::Future01CompatExt, future::try_join_all, try_join};
use futures_preview::{
channel::oneshot,
compat::Future01CompatExt,
future::{lazy, select, try_join_all},
FutureExt, TryFutureExt,
};
// TODO: When we get rid of old futures, this can come from `futures` (can't while new futures is called `futures_preview`)
use futures_util::try_join;
use gotham::{bind_server, bind_server_with_pre_state};
use slog::{info, warn};
use slog::{error, info, warn};
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
@@ -285,7 +291,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
let mut runtime = args::init_runtime(&matches)?;
let repos: HashMap<_, _> = runtime
.block_on(try_join_all(futs).compat())?
.block_on_std(try_join_all(futs))?
.into_iter()
.collect();
@@ -410,20 +416,28 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
};
info!(&logger, "Listening on {:?}", addr);
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
serve_forever(
runtime,
server,
select(
server.compat().map_err({
let logger = logger.clone();
move |e| error!(&logger, "Unhandled error: {:?}", e)
}),
shutdown_rx,
)
.map(|_| ()),
&logger,
move || will_exit.store(true, Ordering::Relaxed),
args::get_shutdown_grace_period(&matches)?,
move || {
// Currently we rely on `shutdown_on_idle` waiting for all
// in-flight requests to complete.
//
// TODO: in preparation for migration to Tokio 0.2, wait
// until all requests have completed.
true
},
lazy(move |_| {
let _ = shutdown_tx.send(());
// Currently we kill off in-flight requests as soon as we've closed the listener.
// If this is a problem in prod, this would be the point at which to wait
// for all connections to shut down.
// To do this properly, we'd need to track the `Connection` futures that Gotham
// gets from Hyper, tell them to gracefully shutdown, then wait for them to complete
}),
args::get_shutdown_timeout(&matches)?,
)?;
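The `server` future handed to `serve_forever` above is the Gotham listener raced against a oneshot receiver, and the `shutdown` future simply fires that channel. A standalone sketch of the same oneshot-plus-select wiring (the `listener` stand-in and `main` wrapper are illustrative, not from this commit):

// Sketch of the oneshot + select pattern used above; `listener` stands in for
// the bound Gotham server future and never completes on its own.
use futures_preview::{
    channel::oneshot,
    executor::block_on,
    future::{self, lazy},
    FutureExt,
};

fn main() {
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();

    // The real code puts the Gotham listener here.
    let listener = future::pending::<()>();

    // Passed to `serve_forever` as `server`: finishes as soon as either side
    // of the select finishes, i.e. when the shutdown channel fires.
    let server = future::select(listener, shutdown_rx).map(|_| ());

    // Passed to `serve_forever` as `shutdown`: lazily fires the channel when
    // first polled, tearing down the listener side of the select.
    let shutdown = lazy(move |_| {
        let _ = shutdown_tx.send(());
    });

    // `serve_forever` normally drives these; run them in order here.
    block_on(async {
        shutdown.await;
        server.await;
    });
}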

View File

@@ -21,7 +21,7 @@ use fb303::server::make_FacebookService_server;
use fb303_core::server::make_BaseService_server;
use fbinit::FacebookInit;
use futures_ext::FutureExt as Futures01Ext;
use futures_preview::{FutureExt, TryFutureExt};
use futures_preview::{compat::Future01CompatExt, FutureExt, TryFutureExt};
use metaconfig_parser::RepoConfigs;
use mononoke_api::{CoreContext, Mononoke};
use panichandler::Fate;
@@ -31,6 +31,7 @@ use srserver::service_framework::{
BuildModule, Fb303Module, ProfileModule, ServiceFramework, ThriftStatsModule,
};
use srserver::{ThriftServer, ThriftServerBuilder};
use tokio_preview::task;
mod commit_id;
mod errors;
@@ -168,16 +169,17 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
.expect("failed to start thrift service");
serve_forever(
runtime,
monitoring_forever.discard(),
monitoring_forever.discard().compat().map(|_| ()),
&logger,
move || will_exit.store(true, Ordering::Relaxed),
args::get_shutdown_grace_period(&matches)?,
move || {
// Calling `stop` blocks until the service has completed all requests.
service_framework.stop();
// Runtime should shutdown now.
false
async {
// Note that async blocks are lazy, so this isn't called until first poll
let _ = task::spawn_blocking(move || {
// Calling `stop` blocks until the service has completed all requests.
service_framework.stop();
})
.await;
},
args::get_shutdown_timeout(&matches)?,
)?;
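`service_framework.stop()` is a blocking call, so the shutdown future above hands it to `spawn_blocking` rather than stalling one of the async runtime's worker threads while `serve_forever` awaits it. A minimal sketch of that pattern, with a hypothetical `blocking_stop` standing in for the real stop call:

// Sketch only: `blocking_stop` is a stand-in for any blocking teardown call
// such as `service_framework.stop()`.
use std::future::Future;
use tokio_preview::task;

fn blocking_stop() {
    // Pretend this blocks until all in-flight requests have drained.
    std::thread::sleep(std::time::Duration::from_millis(100));
}

fn shutdown_future() -> impl Future<Output = ()> {
    async {
        // Async blocks are lazy: nothing runs until `serve_forever` first
        // polls this future, after the grace period.
        let _ = task::spawn_blocking(blocking_stop).await;
    }
}

The future returned here would then be passed as the `shutdown` argument, just like the async block above.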