mononoke: per wireproto command timeout

Summary:
Previously we had a timeout per session, i.e. multiple wireproto commands
shared the same timeout. That had a few disadvantages:

1) The main disadvantage was that if the connection timed out we didn't log
stats such as the number of files, response size etc., and we didn't log
parameters to scribe. The latter is an even bigger problem, because we usually
want to replay the requests that were slow and timed out, not the requests that
finished quickly.

2) The less important disadvantage is that we have clients that make a small
request to the server and then keep the connection open for a long time.
Eventually we kill the connection and log it as an error. With this change the
connection stays open until the client closes it. That might potentially be a
problem, and if that's the case we can reintroduce a per-connection timeout.

Initially I was planning to use tokio::timer::Timeout to implement all the
timeouts, but it behaves differently for streams - it only allows setting a
per-item timeout, while we want a timeout for the whole stream
(https://docs.rs/tokio/0.1/tokio/timer/struct.Timeout.html#futures-and-streams).
To overcome this I implemented a simple combinator, StreamWithTimeout, which
does exactly what I want.
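
As a rough illustration, here is a minimal usage sketch of the new combinator.
It assumes the futures 0.1 / tokio 0.1 APIs used in this diff and that the
futures_ext crate exporting StreamExt and StreamTimeoutError is available; the
stream and the durations are made up for the example. Unlike
tokio::timer::Timeout, which only bounds the wait for each individual item,
whole_stream_timeout bounds the lifetime of the entire stream:

use std::time::{Duration, Instant};

use failure::err_msg;
use futures::Stream;
use futures_ext::{StreamExt, StreamTimeoutError};
use tokio::timer::Interval;

fn main() {
    let mut runtime = tokio::runtime::Runtime::new().unwrap();

    // A stream that ticks once a second and would need ~10s to finish on its own.
    let slow_stream = Interval::new(Instant::now(), Duration::from_secs(1))
        .map_err(|_| err_msg("timer error"))
        .take(10);

    // The *whole* stream must finish within 3 seconds, regardless of how
    // quickly the individual items arrive.
    let bounded = slow_stream
        .whole_stream_timeout(Duration::from_secs(3))
        .collect();

    match runtime.block_on(bounded) {
        Err(StreamTimeoutError::Timeout) => println!("stream timed out as expected"),
        Err(StreamTimeoutError::Error(err)) => println!("stream failed: {}", err),
        Ok(items) => println!("stream finished with {} items", items.len()),
    }
}

A per-item timeout would never fire here because items keep arriving every
second; the whole-stream deadline is what cuts the slow request off, which is
the same behaviour the new unit test below checks.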

Reviewed By: HarveyHunt

Differential Revision: D13731966

fbshipit-source-id: 211240267c7568cedd18af08155d94bf9246ecc3
Stanislau Hlebik, 2019-01-18 08:33:10 -08:00, committed by Facebook Github Bot
parent cf3b9b55eb
commit b909f2bc9c
3 changed files with 140 additions and 32 deletions


@@ -26,6 +26,7 @@ extern crate tokio_threadpool;
use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::{future, Async, AsyncSink, Future, IntoFuture, Poll, Sink, Stream};
use tokio::timer::Delay;
use tokio_io::codec::{Decoder, Encoder};
use tokio_io::AsyncWrite;
use tokio_threadpool::blocking;
@@ -48,6 +49,8 @@ pub use futures_ordered::{futures_ordered, FuturesOrdered};
pub use select_all::{select_all, SelectAll};
pub use stream_wrappers::{BoxStreamWrapper, CollectNoConsume, StreamWrapper, TakeWhile};
use std::time::{Duration, Instant};
/// Map `Item` and `Error` to `()`
///
/// Adapt an existing `Future` to return unit `Item` and `Error`, while still
@@ -266,6 +269,18 @@ pub trait StreamExt: Stream {
    {
        StreamEither::B(self)
    }

    // It's different from tokio::timer::Timeout in that it sets a timeout on the whole Stream,
    // not just on a single Stream item
    fn whole_stream_timeout(self, duration: Duration) -> StreamWithTimeout<Self>
    where
        Self: Sized,
    {
        StreamWithTimeout {
            stream: self,
            delay: Delay::new(Instant::now() + duration),
        }
    }
}
impl<T> StreamExt for T where T: Stream {}
@@ -437,6 +452,42 @@ impl<In: Stream> Stream for ReturnRemainder<In> {
    }
}

pub enum StreamTimeoutError {
    Error(failure::Error),
    Timeout,
}

pub struct StreamWithTimeout<S> {
    delay: Delay,
    stream: S,
}

impl<S: Stream<Error = failure::Error>> Stream for StreamWithTimeout<S> {
    type Item = S::Item;
    type Error = StreamTimeoutError;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.delay.poll() {
            Ok(Async::Ready(())) => {
                return Err(StreamTimeoutError::Timeout);
            }
            Err(err) => {
                return Err(StreamTimeoutError::Error(failure::err_msg(format!(
                    "internal error: timeout failed {}",
                    err
                ))));
            }
            _ => {}
        };

        match self.stream.poll() {
            Ok(Async::Ready(item)) => Ok(Async::Ready(item)),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(err) => Err(StreamTimeoutError::Error(err)),
        }
    }
}
/// A convenience macro for working with `io::Result<T>` from the `Read` and
/// `Write` traits.
///
@@ -849,4 +900,40 @@ mod test {
assert_eq!(res.len(), messages_num);
})
}
    #[test]
    fn whole_stream_timeout_test() {
        use futures::Stream;
        use std::sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        };
        use tokio::timer::Interval;

        let count = Arc::new(AtomicUsize::new(0));
        let mut runtime = tokio::runtime::Runtime::new().unwrap();
        let f = Interval::new(Instant::now(), Duration::new(1, 0))
            .map({
                let count = count.clone();
                move |item| {
                    count.fetch_add(1, Ordering::Relaxed);
                    item
                }
            })
            .map_err(|_| failure::err_msg("error"))
            .take(10)
            .whole_stream_timeout(Duration::new(3, 0))
            .collect();

        let res = runtime.block_on(f);
        assert!(res.is_err());
        match res {
            Err(StreamTimeoutError::Timeout) => {}
            _ => {
                panic!("expected timeout");
            }
        };
        assert!(count.load(Ordering::Relaxed) < 5);
    }
}


@@ -12,11 +12,12 @@ use std::iter::FromIterator;
use std::mem;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use bytes::{BufMut, Bytes, BytesMut};
use failure::err_msg;
use futures::{future, stream, stream::empty, Async, Future, IntoFuture, Poll, Stream};
use futures_ext::{select_all, BoxFuture, BoxStream, FutureExt, StreamExt};
use futures_ext::{select_all, BoxFuture, BoxStream, FutureExt, StreamExt, StreamTimeoutError};
use futures_stats::{StreamStats, Timed, TimedStreamTrait};
use itertools::Itertools;
use stats::Histogram;
@@ -41,6 +42,8 @@ use reachabilityindex::LeastCommonAncestorsHint;
use scribe::ScribeClient;
use scuba_ext::{ScribeClientImplementation, ScubaSampleBuilder, ScubaSampleBuilderExt};
use serde_json;
use tokio::timer::timeout::Error as TimeoutError;
use tokio::util::FutureExt as TokioFutureExt;
use tracing::Traced;
use blobrepo::BlobRepo;
@@ -95,6 +98,24 @@ where
.join(",")
}
fn timeout_duration() -> Duration {
    Duration::from_secs(15 * 60)
}

fn process_timeout_error(err: TimeoutError<Error>) -> Error {
    match err.into_inner() {
        Some(err) => err,
        None => err_msg("timeout"),
    }
}

fn process_stream_timeout_error(err: StreamTimeoutError) -> Error {
    match err {
        StreamTimeoutError::Error(err) => err,
        StreamTimeoutError::Timeout => err_msg("timeout"),
    }
}

fn wireprotocaps() -> Vec<String> {
    vec![
        "lookup".to_string(),
@@ -525,6 +546,8 @@ impl HgCommands for RepoClient {
.collect()
})
.collect()
.timeout(timeout_duration())
.map_err(process_timeout_error)
.traced(self.ctx.trace(), ops::BETWEEN, trace_args!())
.timed(move |stats, _| {
scuba_logger
@@ -548,6 +571,8 @@ impl HgCommands for RepoClient {
.collect()
.map(|v| v.into_iter().collect())
.from_err()
.timeout(timeout_duration())
.map_err(process_timeout_error)
.traced(self.ctx.trace(), ops::HEADS, trace_args!())
.timed(move |stats, _| {
scuba_logger
@@ -620,6 +645,8 @@ impl HgCommands for RepoClient {
};
lookup_fut
.timeout(timeout_duration())
.map_err(process_timeout_error)
.traced(self.ctx.trace(), ops::LOOKUP, trace_args!())
.timed(move |stats, _| {
scuba_logger
@@ -660,6 +687,8 @@ impl HgCommands for RepoClient {
.collect();
known_nodes
})
.timeout(timeout_duration())
.map_err(process_timeout_error)
.traced(self.ctx.trace(), ops::KNOWN, trace_args!())
.timed(move |stats, known_nodes| {
if let Ok(known) = known_nodes {
@@ -698,6 +727,8 @@ impl HgCommands for RepoClient {
Ok(res) => res.boxify(),
Err(err) => stream::once(Err(err)).boxify(),
}
.whole_stream_timeout(timeout_duration())
.map_err(process_stream_timeout_error)
.traced(self.ctx.trace(), ops::GETBUNDLE, trace_args!())
.timed(move |stats, _| {
STATS::getbundle_ms.add_value(stats.completion_time.as_millis_unchecked() as i64);
@@ -720,6 +751,8 @@ impl HgCommands for RepoClient {
let mut scuba_logger = self.prepared_ctx(ops::HELLO, None).scuba().clone();
future::ok(res)
.timeout(timeout_duration())
.map_err(process_timeout_error)
.traced(self.ctx.trace(), ops::HELLO, trace_args!())
.timed(move |stats, _| {
scuba_logger
@@ -750,6 +783,8 @@ impl HgCommands for RepoClient {
.map(|(name, value)| (Vec::from(name.to_string()), value));
HashMap::from_iter(bookiter)
})
.timeout(timeout_duration())
.map_err(process_timeout_error)
.traced(self.ctx.trace(), ops::LISTKEYS, trace_args!())
.timed(move |stats, _| {
scuba_logger
@@ -792,7 +827,9 @@ impl HgCommands for RepoClient {
self.phases_hint.clone(),
);
res.traced(self.ctx.trace(), ops::UNBUNDLE, trace_args!())
res.timeout(timeout_duration())
.map_err(process_timeout_error)
.traced(self.ctx.trace(), ops::UNBUNDLE, trace_args!())
.timed(move |stats, _| {
if let Ok(counters) = serde_json::to_string(&ctx.perf_counters()) {
scuba_logger.add("extra_context", counters);
@@ -817,6 +854,8 @@ impl HgCommands for RepoClient {
let mut wireproto_logger = self.wireproto_logger(ops::GETTREEPACK, Some(args));
self.gettreepack_untimed(params)
.whole_stream_timeout(timeout_duration())
.map_err(process_stream_timeout_error)
.traced(self.ctx.trace(), ops::GETTREEPACK, trace_args!())
.inspect({
cloned!(self.ctx);
@@ -902,6 +941,8 @@ impl HgCommands for RepoClient {
.set_max_counter("getfiles_max_file_size", len);
}
})
.whole_stream_timeout(timeout_duration())
.map_err(process_stream_timeout_error)
.timed({
cloned!(self.ctx);
move |stats, _| {
@@ -989,6 +1030,8 @@ impl HgCommands for RepoClient {
}
})
.flatten_stream()
.whole_stream_timeout(timeout_duration())
.map_err(process_stream_timeout_error)
.boxify()
}
}


@@ -7,7 +7,7 @@
use std::mem;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::time::Instant;
use failure::{prelude::*, SlogKVError};
use futures::{Future, Sink, Stream};
@@ -17,7 +17,6 @@ use slog_kvfilter::KVFilter;
use slog_term;
use stats::Histogram;
use time_ext::DurationExt;
use tokio::util::FutureExt as TokioFutureExt;
use tracing::{TraceContext, Traced};
use uuid::Uuid;
@@ -146,13 +145,7 @@ pub fn request_handler(
// If we got an error at this point, then catch it and print a message
endres
// Don't wait for more that 15 mins for a request
.timeout(Duration::from_secs(15 * 60))
.traced(
&trace,
"wireproto request",
trace_args!(),
)
.traced(&trace, "wireproto request", trace_args!())
.timed(move |stats, result| {
let mut wireproto_calls = wireproto_calls.lock().expect("lock poisoned");
let wireproto_calls = mem::replace(&mut *wireproto_calls, Vec::new());
@@ -164,31 +157,16 @@
match result {
Ok(_) => scuba_logger.log_with_msg("Request finished - Success", None),
Err(err) => if err.is_inner() {
Err(err) => {
scuba_logger.log_with_msg("Request finished - Failure", format!("{:#?}", err));
} else if err.is_elapsed() {
scuba_logger.log_with_msg("Request finished - Timeout", None);
} else {
scuba_logger.log_with_msg(
"Request finished - Unexpected timer error",
format!("{:#?}", err),
);
},
}
}
scuba_logger.log_with_trace(&trace)
})
.map_err(move |err| {
if err.is_inner() {
error!(ctx.logger(), "Command failed";
SlogKVError(err.into_inner().unwrap()),
"remote" => "true");
} else if err.is_elapsed() {
error!(ctx.logger(), "Timeout while handling request";
"remote" => "true");
} else {
crit!(ctx.logger(), "Unexpected error";
SlogKVError(err.into_timer().unwrap().into()),
"remote" => "true");
}
error!(ctx.logger(), "Command failed";
SlogKVError(err),
"remote" => "true"
);
})
}