From 71674d87c5336028ff8aab666361d8326d4dce5b Mon Sep 17 00:00:00 2001 From: Arun Kulshreshtha Date: Fri, 18 Sep 2020 22:35:55 -0700 Subject: [PATCH] edenapi_server: log errors during streaming responses Summary: Use the new `ForwardErr` stream combinator to log errors that occur during a streaming response. Right now, they are just printed to stderr, but in the future we should also do other things such as logging them to Scuba. This diff supersedes the approach from D22720957. Reviewed By: krallin Differential Revision: D23780215 fbshipit-source-id: 8d2267f1166e665a62a167a6d95bb0b1797b5767 --- .../edenapi_server/src/handlers/commit.rs | 8 ++-- .../src/handlers/complete_trees.rs | 4 +- .../edenapi_server/src/handlers/files.rs | 4 +- .../edenapi_server/src/handlers/history.rs | 4 +- .../edenapi_server/src/handlers/trees.rs | 4 +- .../src/middleware/request_context.rs | 38 ++++++++++++++++--- .../mononoke/edenapi_server/src/utils/cbor.rs | 25 ++++++++---- 7 files changed, 62 insertions(+), 25 deletions(-) diff --git a/eden/mononoke/edenapi_server/src/handlers/commit.rs b/eden/mononoke/edenapi_server/src/handlers/commit.rs index 177cce2c71..a4e884151f 100644 --- a/eden/mononoke/edenapi_server/src/handlers/commit.rs +++ b/eden/mononoke/edenapi_server/src/handlers/commit.rs @@ -49,7 +49,7 @@ pub async fn location_to_hash(state: &mut State) -> Result Result Result { @@ -71,7 +71,7 @@ pub async fn revlog_data(state: &mut State) -> Result Result Result Result state.put(HandlerInfo::new(¶ms.repo, EdenApiMethod::Files)); - let rctx = RequestContext::borrow_from(state); + let rctx = RequestContext::borrow_from(state).clone(); let sctx = ServerContext::borrow_from(state); let repo = get_repo(&sctx, &rctx, ¶ms.repo).await?; let request = parse_cbor_request(state).await?; - Ok(cbor_stream(fetch_all_files(repo, request))) + Ok(cbor_stream(rctx, fetch_all_files(repo, request))) } /// Fetch files for all of the requested keys concurrently. diff --git a/eden/mononoke/edenapi_server/src/handlers/history.rs b/eden/mononoke/edenapi_server/src/handlers/history.rs index fc08700e02..0ac5c9e71a 100644 --- a/eden/mononoke/edenapi_server/src/handlers/history.rs +++ b/eden/mononoke/edenapi_server/src/handlers/history.rs @@ -45,13 +45,13 @@ pub async fn history(state: &mut State) -> Result Result state.put(HandlerInfo::new(¶ms.repo, EdenApiMethod::Trees)); - let rctx = RequestContext::borrow_from(state); + let rctx = RequestContext::borrow_from(state).clone(); let sctx = ServerContext::borrow_from(state); let repo = get_repo(&sctx, &rctx, ¶ms.repo).await?; let request = parse_cbor_request(state).await?; - Ok(cbor_stream(fetch_all_trees(repo, request))) + Ok(cbor_stream(rctx, fetch_all_trees(repo, request))) } /// Fetch trees for all of the requested keys concurrently. diff --git a/eden/mononoke/edenapi_server/src/middleware/request_context.rs b/eden/mononoke/edenapi_server/src/middleware/request_context.rs index ec0b8248bd..f046182b4f 100644 --- a/eden/mononoke/edenapi_server/src/middleware/request_context.rs +++ b/eden/mononoke/edenapi_server/src/middleware/request_context.rs @@ -5,25 +5,53 @@ * GNU General Public License version 2. */ +use anyhow::Error; +use futures::{ + channel::mpsc::{self, Sender}, + prelude::*, +}; use gotham::state::{request_id, FromState, State}; use gotham_derive::StateData; use hyper::{Body, Response}; -use slog::{o, Logger}; +use slog::{error, o, Logger}; +use cloned::cloned; use context::{CoreContext, SessionContainer}; use fbinit::FacebookInit; use gotham_ext::middleware::{ClientIdentity, Middleware}; use scuba::ScubaSampleBuilder; use sshrelay::Metadata; +const ERROR_CHANNEL_CAPACITY: usize = 1000; + #[derive(StateData, Clone)] pub struct RequestContext { pub ctx: CoreContext, + pub logger: Logger, + pub error_tx: Sender, } impl RequestContext { - fn new(ctx: CoreContext) -> Self { - Self { ctx } + async fn new(ctx: CoreContext, logger: Logger) -> Self { + let (error_tx, mut error_rx) = mpsc::channel(ERROR_CHANNEL_CAPACITY); + + let rctx = Self { + ctx, + logger, + error_tx, + }; + + // Spawn error logging task. + let _ = tokio::spawn({ + cloned!(rctx); + async move { + while let Some(error) = error_rx.next().await { + error!(&rctx.logger, "{:?}", error); + } + } + }); + + rctx } } @@ -53,9 +81,9 @@ impl Middleware for RequestContextMiddleware { let request_id = request_id(&state); let logger = self.logger.new(o!("request_id" => request_id.to_string())); - let ctx = session.new_context(logger, ScubaSampleBuilder::with_discard()); + let ctx = session.new_context(logger.clone(), ScubaSampleBuilder::with_discard()); - state.put(RequestContext::new(ctx)); + state.put(RequestContext::new(ctx, logger).await); None } diff --git a/eden/mononoke/edenapi_server/src/utils/cbor.rs b/eden/mononoke/edenapi_server/src/utils/cbor.rs index 214e1ac304..97545ed443 100644 --- a/eden/mononoke/edenapi_server/src/utils/cbor.rs +++ b/eden/mononoke/edenapi_server/src/utils/cbor.rs @@ -9,7 +9,7 @@ use anyhow::{Context, Error}; use bytes::Bytes; -use futures::{Stream, TryStreamExt}; +use futures::prelude::*; use gotham::state::State; use mime::Mime; use once_cell::sync::Lazy; @@ -19,9 +19,11 @@ use gotham_ext::{ content::ContentStream, error::HttpError, response::{StreamBody, TryIntoResponse}, + stream_ext::GothamTryStreamExt, }; use crate::errors::ErrorKind; +use crate::middleware::RequestContext; use super::get_request_body; @@ -37,18 +39,25 @@ pub fn to_cbor_bytes(s: S) -> Result { .context(ErrorKind::SerializationFailed) } -/// Serialize each item of the input stream as CBOR and return -/// a streaming response. Note that although the input stream -/// can fail, the error type is `anyhow::Error` rather than -/// `HttpError` because the HTTP status code would have already -/// been sent at the time of the failure. -pub fn cbor_stream(stream: S) -> impl TryIntoResponse +/// Serialize each item of the input stream as CBOR and return a streaming +/// response. Any errors yielded by the stream will be filtered out and reported +/// to the request context; this ensures that a mid-stream error will not +/// prematurely terminate the response. +pub fn cbor_stream(rctx: RequestContext, stream: S) -> impl TryIntoResponse where S: Stream> + Send + 'static, T: Serialize + Send + 'static, { let byte_stream = stream.and_then(|item| async { to_cbor_bytes(item) }); - StreamBody::new(ContentStream::new(byte_stream), cbor_mime()) + let content_stream = ContentStream::new(byte_stream).forward_err(rctx.error_tx); + + // XXX: This is a hack to turn this back into a TryStream that implements ContentMeta. + // Ordinarily, ContentStreams should not be nested like this. + // + // TODO(kulshrax): Delete this line once ContentStream accepts plain Streams. + let content_stream = ContentStream::new(content_stream.map(>::Ok)); + + StreamBody::new(content_stream, cbor_mime()) } pub async fn parse_cbor_request(state: &mut State) -> Result {