edenapi_service: automatically compress EdenApiHandler responses

Summary: The new `EdenApiHandler` framework for defining EdenAPI endpoints provides a common place where responses are encoded. This diff adds automatic content compression at this point, using the received `Accept-Encoding` header from the request to determine what compression, if any, should be used. As a result, all endpoints that implement `EdenApiHandler` will get compression for free.

Reviewed By: yancouto

Differential Revision: D30553242

fbshipit-source-id: 9eda54cbf81dd1e03abec769744c96b16fad64ea
This commit is contained in:
Arun Kulshreshtha 2021-09-08 11:32:12 -07:00 committed by Facebook GitHub Bot
parent 70e4650d9c
commit 354fe9e111
3 changed files with 49 additions and 8 deletions

View File

@ -10,7 +10,7 @@ use std::pin::Pin;
use anyhow::{Context, Error};
use edenapi_types::ToWire;
use futures::{stream::TryStreamExt, FutureExt};
use futures::{stream::TryStreamExt, FutureExt, Stream};
use gotham::{
handler::{HandlerError as GothamHandlerError, HandlerFuture},
middleware::state::StateMiddleware,
@ -24,8 +24,9 @@ use gotham::{
};
use gotham_derive::StateData;
use gotham_ext::{
content_encoding::ContentEncoding,
error::{ErrorFormatter, HttpError},
response::build_response,
response::{build_response, encode_stream, ResponseTryStreamExt, StreamBody, TryIntoResponse},
};
use hyper::{Body, Response};
use mime::Mime;
@ -33,7 +34,7 @@ use serde::{Deserialize, Serialize};
use crate::context::ServerContext;
use crate::middleware::RequestContext;
use crate::utils::{cbor_stream_filtered_errors, get_repo, parse_wire_request};
use crate::utils::{cbor_mime, get_repo, parse_wire_request, to_cbor_bytes};
mod bookmarks;
mod clone;
@ -197,6 +198,7 @@ async fn handler_wrapper<Handler: EdenApiHandler>(
let res = async {
let path = Handler::PathExtractor::take_from(&mut state);
let query_string = Handler::QueryStringExtractor::take_from(&mut state);
let content_encoding = ContentEncoding::from_state(&state);
state.put(HandlerInfo::new(path.repo(), Handler::API_METHOD));
@ -206,9 +208,7 @@ async fn handler_wrapper<Handler: EdenApiHandler>(
let repo = get_repo(&sctx, &rctx, path.repo(), None).await?;
let request = parse_wire_request::<<Handler::Request as ToWire>::Wire>(&mut state).await?;
match Handler::handler(repo, path, query_string, request).await {
Ok(responses) => Ok(cbor_stream_filtered_errors(
responses.map_ok(ToWire::to_wire),
)),
Ok(responses) => Ok(encode_response_stream(responses, content_encoding)),
Err(HandlerError::E500(err)) => Err(HttpError::e500(err)),
Err(HandlerError::E400(err)) => Err(HttpError::e400(err)),
}
@ -218,6 +218,20 @@ async fn handler_wrapper<Handler: EdenApiHandler>(
build_response(res, state, &JsonErrorFomatter)
}
/// Encode a stream of EdenAPI responses into its final on-wire representation.
///
/// This involves converting each item to its wire format, CBOR serializing them, and then
/// optionally compressing the resulting byte stream based on the specified Content-Encoding.
pub fn encode_response_stream<S, T>(stream: S, encoding: ContentEncoding) -> impl TryIntoResponse
where
S: Stream<Item = Result<T, Error>> + Send + 'static,
T: ToWire + Send + 'static,
{
let stream = stream.and_then(|item| async move { to_cbor_bytes(&item.to_wire()) });
let stream = encode_stream(stream, encoding, None).capture_first_err();
StreamBody::new(stream, cbor_mime())
}
// We use a struct here (rather than just a global function) just for the convenience
// of writing `Handlers::setup::<MyHandler>(route)`
// instead of `setup_handler::<MyHandler, _, _>(route)`, to make things clearer.

View File

@ -19,5 +19,5 @@ pub use response::{
build_error_response, build_response, BytesBody, EmptyBody, StreamBody, TryIntoResponse,
};
pub use response_meta::{BodyMeta, HeadersMeta, PendingResponseMeta, ResponseMeta};
pub use stream::{CompressedResponseStream, ResponseStream};
pub use stream::{encode_stream, CompressedResponseStream, ResponseStream};
pub use stream_ext::ResponseTryStreamExt;

View File

@ -11,13 +11,40 @@ use anyhow::Error;
use async_compression::tokio::bufread::{GzipEncoder, ZstdEncoder};
use bytes::Bytes;
use futures::{
future::Either,
stream::{BoxStream, Stream, StreamExt, TryStreamExt},
task::{Context, Poll},
};
use pin_project::pin_project;
use tokio_util::io::{ReaderStream, StreamReader};
use crate::content_encoding::ContentCompression;
use crate::content_encoding::{ContentCompression, ContentEncoding};
/// Create a response stream using the specified Content-Encoding.
///
/// The resulting stream may or may not be compressed depending on the chosen encoding. Optionally,
/// the caller can specify the value for the `Content-Length` header. This is only useful in cases
/// where the response isn't compressed (i.e., the encoding is set to `ContentEncoding::Identity`)
/// because otherwise, we would need to send the post-compression size of the content, which cannot
/// be known in advance.
pub fn encode_stream<S>(
stream: S,
encoding: ContentEncoding,
length: Option<u64>,
) -> Either<ResponseStream<S>, CompressedResponseStream<'static>>
where
S: Stream<Item = Result<Bytes, Error>> + Send + 'static,
{
match (encoding, length) {
(ContentEncoding::Identity, Some(size)) => ResponseStream::new(stream)
.set_content_length(size)
.left_stream(),
(ContentEncoding::Identity, None) => ResponseStream::new(stream).left_stream(),
(ContentEncoding::Compressed(c), _) => {
CompressedResponseStream::new(stream, c).right_stream()
}
}
}
#[pin_project]
pub struct CompressedResponseStream<'a> {