edenapi_server: log errors during streaming responses

Summary: Use the new `ForwardErr` stream combinator to log errors that occur during a streaming response. Right now, they are just printed to stderr, but in the future we should also do other things such as logging them to Scuba. This diff supersedes the approach from D22720957.

Reviewed By: krallin

Differential Revision: D23780215

fbshipit-source-id: 8d2267f1166e665a62a167a6d95bb0b1797b5767
This commit is contained in:
Arun Kulshreshtha 2020-09-18 22:35:55 -07:00 committed by Facebook GitHub Bot
parent 7f803818ee
commit 71674d87c5
7 changed files with 62 additions and 25 deletions

View File

@ -49,7 +49,7 @@ pub async fn location_to_hash(state: &mut State) -> Result<impl TryIntoResponse,
));
let sctx = ServerContext::borrow_from(state);
let rctx = RequestContext::borrow_from(state);
let rctx = RequestContext::borrow_from(state).clone();
let hg_repo_ctx = get_repo(&sctx, &rctx, &params.repo).await?;
@ -59,7 +59,7 @@ pub async fn location_to_hash(state: &mut State) -> Result<impl TryIntoResponse,
.into_iter()
.map(move |location| translate_location(hg_repo_ctx.clone(), location));
let response = stream::iter(hgid_list).buffer_unordered(MAX_CONCURRENT_FETCHES_PER_REQUEST);
Ok(cbor_stream(response))
Ok(cbor_stream(rctx, response))
}
pub async fn revlog_data(state: &mut State) -> Result<impl TryIntoResponse, HttpError> {
@ -71,7 +71,7 @@ pub async fn revlog_data(state: &mut State) -> Result<impl TryIntoResponse, Http
));
let sctx = ServerContext::borrow_from(state);
let rctx = RequestContext::borrow_from(state);
let rctx = RequestContext::borrow_from(state).clone();
let hg_repo_ctx = get_repo(&sctx, &rctx, &params.repo).await?;
@ -82,7 +82,7 @@ pub async fn revlog_data(state: &mut State) -> Result<impl TryIntoResponse, Http
.map(move |hg_id| commit_revlog_data(hg_repo_ctx.clone(), hg_id));
let response =
stream::iter(revlog_commits).buffer_unordered(MAX_CONCURRENT_FETCHES_PER_REQUEST);
Ok(cbor_stream(response))
Ok(cbor_stream(rctx, response))
}
async fn translate_location(

View File

@ -38,13 +38,13 @@ pub async fn complete_trees(state: &mut State) -> Result<impl TryIntoResponse, H
state.put(HandlerInfo::new(&params.repo, EdenApiMethod::CompleteTrees));
let rctx = RequestContext::borrow_from(state);
let rctx = RequestContext::borrow_from(state).clone();
let sctx = ServerContext::borrow_from(state);
let repo = get_repo(&sctx, &rctx, &params.repo).await?;
let request = parse_cbor_request(state).await?;
Ok(cbor_stream(fetch_trees_under_path(&repo, request)?))
Ok(cbor_stream(rctx, fetch_trees_under_path(&repo, request)?))
}
/// Fetch the complete tree under the specified path.

View File

@ -38,13 +38,13 @@ pub async fn files(state: &mut State) -> Result<impl TryIntoResponse, HttpError>
state.put(HandlerInfo::new(&params.repo, EdenApiMethod::Files));
let rctx = RequestContext::borrow_from(state);
let rctx = RequestContext::borrow_from(state).clone();
let sctx = ServerContext::borrow_from(state);
let repo = get_repo(&sctx, &rctx, &params.repo).await?;
let request = parse_cbor_request(state).await?;
Ok(cbor_stream(fetch_all_files(repo, request)))
Ok(cbor_stream(rctx, fetch_all_files(repo, request)))
}
/// Fetch files for all of the requested keys concurrently.

View File

@ -45,13 +45,13 @@ pub async fn history(state: &mut State) -> Result<impl TryIntoResponse, HttpErro
state.put(HandlerInfo::new(&params.repo, EdenApiMethod::History));
let rctx = RequestContext::borrow_from(state);
let rctx = RequestContext::borrow_from(state).clone();
let sctx = ServerContext::borrow_from(state);
let repo = get_repo(&sctx, &rctx, &params.repo).await?;
let request = parse_cbor_request(state).await?;
Ok(cbor_stream(fetch_history(repo, request).await))
Ok(cbor_stream(rctx, fetch_history(repo, request).await))
}
/// Fetch history for all of the requested files concurrently.

View File

@ -38,13 +38,13 @@ pub async fn trees(state: &mut State) -> Result<impl TryIntoResponse, HttpError>
state.put(HandlerInfo::new(&params.repo, EdenApiMethod::Trees));
let rctx = RequestContext::borrow_from(state);
let rctx = RequestContext::borrow_from(state).clone();
let sctx = ServerContext::borrow_from(state);
let repo = get_repo(&sctx, &rctx, &params.repo).await?;
let request = parse_cbor_request(state).await?;
Ok(cbor_stream(fetch_all_trees(repo, request)))
Ok(cbor_stream(rctx, fetch_all_trees(repo, request)))
}
/// Fetch trees for all of the requested keys concurrently.

View File

@ -5,25 +5,53 @@
* GNU General Public License version 2.
*/
use anyhow::Error;
use futures::{
channel::mpsc::{self, Sender},
prelude::*,
};
use gotham::state::{request_id, FromState, State};
use gotham_derive::StateData;
use hyper::{Body, Response};
use slog::{o, Logger};
use slog::{error, o, Logger};
use cloned::cloned;
use context::{CoreContext, SessionContainer};
use fbinit::FacebookInit;
use gotham_ext::middleware::{ClientIdentity, Middleware};
use scuba::ScubaSampleBuilder;
use sshrelay::Metadata;
const ERROR_CHANNEL_CAPACITY: usize = 1000;
#[derive(StateData, Clone)]
pub struct RequestContext {
pub ctx: CoreContext,
pub logger: Logger,
pub error_tx: Sender<Error>,
}
impl RequestContext {
fn new(ctx: CoreContext) -> Self {
Self { ctx }
async fn new(ctx: CoreContext, logger: Logger) -> Self {
let (error_tx, mut error_rx) = mpsc::channel(ERROR_CHANNEL_CAPACITY);
let rctx = Self {
ctx,
logger,
error_tx,
};
// Spawn error logging task.
let _ = tokio::spawn({
cloned!(rctx);
async move {
while let Some(error) = error_rx.next().await {
error!(&rctx.logger, "{:?}", error);
}
}
});
rctx
}
}
@ -53,9 +81,9 @@ impl Middleware for RequestContextMiddleware {
let request_id = request_id(&state);
let logger = self.logger.new(o!("request_id" => request_id.to_string()));
let ctx = session.new_context(logger, ScubaSampleBuilder::with_discard());
let ctx = session.new_context(logger.clone(), ScubaSampleBuilder::with_discard());
state.put(RequestContext::new(ctx));
state.put(RequestContext::new(ctx, logger).await);
None
}

View File

@ -9,7 +9,7 @@
use anyhow::{Context, Error};
use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use futures::prelude::*;
use gotham::state::State;
use mime::Mime;
use once_cell::sync::Lazy;
@ -19,9 +19,11 @@ use gotham_ext::{
content::ContentStream,
error::HttpError,
response::{StreamBody, TryIntoResponse},
stream_ext::GothamTryStreamExt,
};
use crate::errors::ErrorKind;
use crate::middleware::RequestContext;
use super::get_request_body;
@ -37,18 +39,25 @@ pub fn to_cbor_bytes<S: Serialize>(s: S) -> Result<Bytes, Error> {
.context(ErrorKind::SerializationFailed)
}
/// Serialize each item of the input stream as CBOR and return
/// a streaming response. Note that although the input stream
/// can fail, the error type is `anyhow::Error` rather than
/// `HttpError` because the HTTP status code would have already
/// been sent at the time of the failure.
pub fn cbor_stream<S, T>(stream: S) -> impl TryIntoResponse
/// Serialize each item of the input stream as CBOR and return a streaming
/// response. Any errors yielded by the stream will be filtered out and reported
/// to the request context; this ensures that a mid-stream error will not
/// prematurely terminate the response.
pub fn cbor_stream<S, T>(rctx: RequestContext, stream: S) -> impl TryIntoResponse
where
S: Stream<Item = Result<T, Error>> + Send + 'static,
T: Serialize + Send + 'static,
{
let byte_stream = stream.and_then(|item| async { to_cbor_bytes(item) });
StreamBody::new(ContentStream::new(byte_stream), cbor_mime())
let content_stream = ContentStream::new(byte_stream).forward_err(rctx.error_tx);
// XXX: This is a hack to turn this back into a TryStream that implements ContentMeta.
// Ordinarily, ContentStreams should not be nested like this.
//
// TODO(kulshrax): Delete this line once ContentStream accepts plain Streams.
let content_stream = ContentStream::new(content_stream.map(<Result<_, anyhow::Error>>::Ok));
StreamBody::new(content_stream, cbor_mime())
}
pub async fn parse_cbor_request<R: DeserializeOwned>(state: &mut State) -> Result<R, HttpError> {