gotham_ext: move content streams into separate module

Summary:
The `gotham_ext::response` module was getting a bit large, so this diff moves `ContentMeta`, `ContentStream`, and `CompressedContentStream` into a new submodule, alongside the contents of the old `content_encoding` module. This way, the `response` module remains entirely centered around the `TryIntoResponse` trait (and the various body structs that implement that trait).

Later diffs in this stack will be adding an additional layer between the content streams and the body structs, at which point it probably doesn't make sense to have these right next to each other. Splitting them out now will allow for better code organization going forward.

Reviewed By: krallin

Differential Revision: D23777492

fbshipit-source-id: 86e598dcb37578d3b22217a2a65f1bde84d72215
This commit is contained in:
Arun Kulshreshtha 2020-09-18 01:32:09 -07:00 committed by Facebook GitHub Bot
parent 12b0140d9c
commit 6e5f95067a
7 changed files with 183 additions and 152 deletions

View File

@ -16,8 +16,9 @@ use once_cell::sync::Lazy;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use gotham_ext::{ use gotham_ext::{
content::ContentStream,
error::HttpError, error::HttpError,
response::{ContentStream, StreamBody, TryIntoResponse}, response::{StreamBody, TryIntoResponse},
}; };
use crate::errors::ErrorKind; use crate::errors::ErrorKind;

View File

@ -0,0 +1,12 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
pub mod encoding;
pub mod stream;
pub use encoding::{ContentCompression, ContentEncoding};
pub use stream::{CompressedContentStream, ContentMeta, ContentStream};

View File

@ -0,0 +1,161 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use std::pin::Pin;
use anyhow::Error;
use async_compression::stream::{BrotliEncoder, GzipEncoder, ZstdEncoder};
use bytes::Bytes;
use futures::{
stream::{BoxStream, Stream, StreamExt, TryStreamExt},
task::{Context, Poll},
};
use pin_project::pin_project;
use super::encoding::{ContentCompression, ContentEncoding};
pub trait ContentMeta {
/// Provide the content (i.e. Content-Encoding) for the underlying content. This will be sent
/// to the client.
fn content_encoding(&self) -> ContentEncoding;
/// Provide the length of the content in this stream, if available (i.e. Content-Length). If
/// provided, this must be the actual length of the stream. If missing, the transfer will be
/// chunked.
fn content_length(&self) -> Option<u64>;
}
#[pin_project]
pub struct CompressedContentStream<'a> {
inner: BoxStream<'a, Result<Bytes, Error>>,
content_compression: ContentCompression,
}
impl<'a> CompressedContentStream<'a> {
pub fn new<S>(inner: S, content_compression: ContentCompression) -> Self
where
S: Stream<Item = Result<Bytes, Error>> + Send + 'a,
{
use std::io;
let inner = inner.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
let inner = match content_compression {
ContentCompression::Zstd => ZstdEncoder::new(inner).map_err(Error::from).boxed(),
ContentCompression::Brotli => BrotliEncoder::new(inner).map_err(Error::from).boxed(),
ContentCompression::Gzip => GzipEncoder::new(inner).map_err(Error::from).boxed(),
};
Self {
inner,
content_compression,
}
}
}
impl ContentMeta for CompressedContentStream<'_> {
fn content_length(&self) -> Option<u64> {
None
}
fn content_encoding(&self) -> ContentEncoding {
ContentEncoding::Compressed(self.content_compression)
}
}
impl Stream for CompressedContentStream<'_> {
type Item = Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next_unpin(ctx)
}
}
#[pin_project]
pub struct ContentStream<S> {
#[pin]
inner: S,
content_length: Option<u64>,
}
impl<S> ContentStream<S> {
pub fn new(inner: S) -> Self {
Self {
inner,
content_length: None,
}
}
/// Set a Content-Length for this stream. This *must* match the exact size of the uncompressed
/// content that will be sent, since that is what the client will expect.
pub fn content_length(self, content_length: u64) -> Self {
Self {
content_length: Some(content_length),
..self
}
}
}
impl<S> ContentMeta for ContentStream<S> {
fn content_length(&self) -> Option<u64> {
self.content_length
}
fn content_encoding(&self) -> ContentEncoding {
ContentEncoding::Identity
}
}
impl<S> Stream for ContentStream<S>
where
S: Stream,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(ctx)
}
}
/// Provide an implementation of ContentMeta that propagates through Either (i.e. left_stream(),
/// right_stream()).
impl<A, B> ContentMeta for futures::future::Either<A, B>
where
A: ContentMeta,
B: ContentMeta,
{
fn content_length(&self) -> Option<u64> {
// left_stream(), right_stream() doesn't change the stream data.
match self {
Self::Left(a) => a.content_length(),
Self::Right(b) => b.content_length(),
}
}
fn content_encoding(&self) -> ContentEncoding {
// left_stream(), right_stream() doesn't change the stream data.
match self {
Self::Left(a) => a.content_encoding(),
Self::Right(b) => b.content_encoding(),
}
}
}
impl<S, F> ContentMeta for futures::stream::InspectOk<S, F>
where
S: ContentMeta,
{
fn content_length(&self) -> Option<u64> {
// inspect_ok doesn't change the stream data.
self.get_ref().content_length()
}
fn content_encoding(&self) -> ContentEncoding {
// inspect_ok doesn't change the stream data.
self.get_ref().content_encoding()
}
}

View File

@ -6,7 +6,7 @@
*/ */
pub mod body_ext; pub mod body_ext;
pub mod content_encoding; pub mod content;
pub mod error; pub mod error;
pub mod handler; pub mod handler;
pub mod middleware; pub mod middleware;

View File

@ -8,12 +8,10 @@
use std::convert::TryInto; use std::convert::TryInto;
use anyhow::Error; use anyhow::Error;
use async_compression::stream::{BrotliEncoder, GzipEncoder, ZstdEncoder};
use bytes::Bytes; use bytes::Bytes;
use futures::{ use futures::{
channel::mpsc, channel::mpsc,
stream::{BoxStream, Stream, StreamExt, TryStreamExt}, stream::{Stream, StreamExt},
task::{Context, Poll},
}; };
use gotham::{handler::HandlerError, state::State}; use gotham::{handler::HandlerError, state::State};
use gotham_derive::StateData; use gotham_derive::StateData;
@ -22,10 +20,11 @@ use hyper::{
Body, Response, StatusCode, Body, Response, StatusCode,
}; };
use mime::Mime; use mime::Mime;
use pin_project::pin_project;
use std::pin::Pin;
use crate::content_encoding::{ContentCompression, ContentEncoding}; use crate::content::{
encoding::{ContentCompression, ContentEncoding},
stream::ContentMeta,
};
use crate::error::HttpError; use crate::error::HttpError;
use crate::middleware::PostRequestCallbacks; use crate::middleware::PostRequestCallbacks;
use crate::signal_stream::SignalStream; use crate::signal_stream::SignalStream;
@ -173,145 +172,3 @@ where
Ok(res.body(Body::wrap_stream(stream))?) Ok(res.body(Body::wrap_stream(stream))?)
} }
} }
pub trait ContentMeta {
/// Provide the content (i.e. Content-Encoding) for the underlying content. This will be sent
/// to the client.
fn content_encoding(&self) -> ContentEncoding;
/// Provide the length of the content in this stream, if available (i.e. Content-Length). If
/// provided, this must be the actual length of the stream. If missing, the transfer will be
/// chunked.
fn content_length(&self) -> Option<u64>;
}
#[pin_project]
pub struct CompressedContentStream<'a> {
inner: BoxStream<'a, Result<Bytes, Error>>,
content_compression: ContentCompression,
}
impl<'a> CompressedContentStream<'a> {
pub fn new<S>(inner: S, content_compression: ContentCompression) -> Self
where
S: Stream<Item = Result<Bytes, Error>> + Send + 'a,
{
use std::io;
let inner = inner.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
let inner = match content_compression {
ContentCompression::Zstd => ZstdEncoder::new(inner).map_err(Error::from).boxed(),
ContentCompression::Brotli => BrotliEncoder::new(inner).map_err(Error::from).boxed(),
ContentCompression::Gzip => GzipEncoder::new(inner).map_err(Error::from).boxed(),
};
Self {
inner,
content_compression,
}
}
}
impl ContentMeta for CompressedContentStream<'_> {
fn content_length(&self) -> Option<u64> {
None
}
fn content_encoding(&self) -> ContentEncoding {
ContentEncoding::Compressed(self.content_compression)
}
}
impl Stream for CompressedContentStream<'_> {
type Item = Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next_unpin(ctx)
}
}
#[pin_project]
pub struct ContentStream<S> {
#[pin]
inner: S,
content_length: Option<u64>,
}
impl<S> ContentStream<S> {
pub fn new(inner: S) -> Self {
Self {
inner,
content_length: None,
}
}
/// Set a Content-Length for this stream. This *must* match the exact size of the uncompressed
/// content that will be sent, since that is what the client will expect.
pub fn content_length(self, content_length: u64) -> Self {
Self {
content_length: Some(content_length),
..self
}
}
}
impl<S> ContentMeta for ContentStream<S> {
fn content_length(&self) -> Option<u64> {
self.content_length
}
fn content_encoding(&self) -> ContentEncoding {
ContentEncoding::Identity
}
}
impl<S> Stream for ContentStream<S>
where
S: Stream,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(ctx)
}
}
/// Provide an implementation of ContentMeta that propagates through Either (i.e. left_stream(),
/// right_stream()).
impl<A, B> ContentMeta for futures::future::Either<A, B>
where
A: ContentMeta,
B: ContentMeta,
{
fn content_length(&self) -> Option<u64> {
// left_stream(), right_stream() doesn't change the stream data.
match self {
Self::Left(a) => a.content_length(),
Self::Right(b) => b.content_length(),
}
}
fn content_encoding(&self) -> ContentEncoding {
// left_stream(), right_stream() doesn't change the stream data.
match self {
Self::Left(a) => a.content_encoding(),
Self::Right(b) => b.content_encoding(),
}
}
}
impl<S, F> ContentMeta for futures::stream::InspectOk<S, F>
where
S: ContentMeta,
{
fn content_length(&self) -> Option<u64> {
// inspect_ok doesn't change the stream data.
self.get_ref().content_length()
}
fn content_encoding(&self) -> ContentEncoding {
// inspect_ok doesn't change the stream data.
self.get_ref().content_encoding()
}
}

View File

@ -18,10 +18,10 @@ use serde::Deserialize;
use filestore::{self, Alias, FetchKey}; use filestore::{self, Alias, FetchKey};
use gotham_ext::{ use gotham_ext::{
content_encoding::ContentEncoding, content::{CompressedContentStream, ContentEncoding, ContentStream},
error::HttpError, error::HttpError,
middleware::ScubaMiddlewareState, middleware::ScubaMiddlewareState,
response::{CompressedContentStream, ContentStream, StreamBody, TryIntoResponse}, response::{StreamBody, TryIntoResponse},
}; };
use mononoke_types::{hash::Sha256, ContentId}; use mononoke_types::{hash::Sha256, ContentId};
use redactedblobstore::has_redaction_root_cause; use redactedblobstore::has_redaction_root_cause;