diff --git a/eden/mononoke/edenapi_server/src/utils/cbor.rs b/eden/mononoke/edenapi_server/src/utils/cbor.rs index 7c19912f06..214e1ac304 100644 --- a/eden/mononoke/edenapi_server/src/utils/cbor.rs +++ b/eden/mononoke/edenapi_server/src/utils/cbor.rs @@ -16,8 +16,9 @@ use once_cell::sync::Lazy; use serde::{de::DeserializeOwned, Serialize}; use gotham_ext::{ + content::ContentStream, error::HttpError, - response::{ContentStream, StreamBody, TryIntoResponse}, + response::{StreamBody, TryIntoResponse}, }; use crate::errors::ErrorKind; diff --git a/eden/mononoke/gotham_ext/src/content_encoding.rs b/eden/mononoke/gotham_ext/src/content/encoding.rs similarity index 100% rename from eden/mononoke/gotham_ext/src/content_encoding.rs rename to eden/mononoke/gotham_ext/src/content/encoding.rs diff --git a/eden/mononoke/gotham_ext/src/content/mod.rs b/eden/mononoke/gotham_ext/src/content/mod.rs new file mode 100644 index 0000000000..d2d04d4006 --- /dev/null +++ b/eden/mononoke/gotham_ext/src/content/mod.rs @@ -0,0 +1,12 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This software may be used and distributed according to the terms of the + * GNU General Public License version 2. + */ + +pub mod encoding; +pub mod stream; + +pub use encoding::{ContentCompression, ContentEncoding}; +pub use stream::{CompressedContentStream, ContentMeta, ContentStream}; diff --git a/eden/mononoke/gotham_ext/src/content/stream.rs b/eden/mononoke/gotham_ext/src/content/stream.rs new file mode 100644 index 0000000000..1b667d367f --- /dev/null +++ b/eden/mononoke/gotham_ext/src/content/stream.rs @@ -0,0 +1,161 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * This software may be used and distributed according to the terms of the + * GNU General Public License version 2. + */ + +use std::pin::Pin; + +use anyhow::Error; +use async_compression::stream::{BrotliEncoder, GzipEncoder, ZstdEncoder}; +use bytes::Bytes; +use futures::{ + stream::{BoxStream, Stream, StreamExt, TryStreamExt}, + task::{Context, Poll}, +}; +use pin_project::pin_project; + +use super::encoding::{ContentCompression, ContentEncoding}; + +pub trait ContentMeta { + /// Provide the content (i.e. Content-Encoding) for the underlying content. This will be sent + /// to the client. + fn content_encoding(&self) -> ContentEncoding; + + /// Provide the length of the content in this stream, if available (i.e. Content-Length). If + /// provided, this must be the actual length of the stream. If missing, the transfer will be + /// chunked. + fn content_length(&self) -> Option; +} + +#[pin_project] +pub struct CompressedContentStream<'a> { + inner: BoxStream<'a, Result>, + content_compression: ContentCompression, +} + +impl<'a> CompressedContentStream<'a> { + pub fn new(inner: S, content_compression: ContentCompression) -> Self + where + S: Stream> + Send + 'a, + { + use std::io; + + let inner = inner.map_err(|e| io::Error::new(io::ErrorKind::Other, e)); + + let inner = match content_compression { + ContentCompression::Zstd => ZstdEncoder::new(inner).map_err(Error::from).boxed(), + ContentCompression::Brotli => BrotliEncoder::new(inner).map_err(Error::from).boxed(), + ContentCompression::Gzip => GzipEncoder::new(inner).map_err(Error::from).boxed(), + }; + + Self { + inner, + content_compression, + } + } +} + +impl ContentMeta for CompressedContentStream<'_> { + fn content_length(&self) -> Option { + None + } + + fn content_encoding(&self) -> ContentEncoding { + ContentEncoding::Compressed(self.content_compression) + } +} + +impl Stream for CompressedContentStream<'_> { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next_unpin(ctx) + } +} + +#[pin_project] +pub struct ContentStream { + #[pin] + inner: S, + content_length: Option, +} + +impl ContentStream { + pub fn new(inner: S) -> Self { + Self { + inner, + content_length: None, + } + } + + /// Set a Content-Length for this stream. This *must* match the exact size of the uncompressed + /// content that will be sent, since that is what the client will expect. + pub fn content_length(self, content_length: u64) -> Self { + Self { + content_length: Some(content_length), + ..self + } + } +} + +impl ContentMeta for ContentStream { + fn content_length(&self) -> Option { + self.content_length + } + + fn content_encoding(&self) -> ContentEncoding { + ContentEncoding::Identity + } +} + +impl Stream for ContentStream +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(ctx) + } +} + +/// Provide an implementation of ContentMeta that propagates through Either (i.e. left_stream(), +/// right_stream()). +impl ContentMeta for futures::future::Either +where + A: ContentMeta, + B: ContentMeta, +{ + fn content_length(&self) -> Option { + // left_stream(), right_stream() doesn't change the stream data. + match self { + Self::Left(a) => a.content_length(), + Self::Right(b) => b.content_length(), + } + } + + fn content_encoding(&self) -> ContentEncoding { + // left_stream(), right_stream() doesn't change the stream data. + match self { + Self::Left(a) => a.content_encoding(), + Self::Right(b) => b.content_encoding(), + } + } +} + +impl ContentMeta for futures::stream::InspectOk +where + S: ContentMeta, +{ + fn content_length(&self) -> Option { + // inspect_ok doesn't change the stream data. + self.get_ref().content_length() + } + + fn content_encoding(&self) -> ContentEncoding { + // inspect_ok doesn't change the stream data. + self.get_ref().content_encoding() + } +} diff --git a/eden/mononoke/gotham_ext/src/lib.rs b/eden/mononoke/gotham_ext/src/lib.rs index 8d3f56d08c..c0103a20c4 100644 --- a/eden/mononoke/gotham_ext/src/lib.rs +++ b/eden/mononoke/gotham_ext/src/lib.rs @@ -6,7 +6,7 @@ */ pub mod body_ext; -pub mod content_encoding; +pub mod content; pub mod error; pub mod handler; pub mod middleware; diff --git a/eden/mononoke/gotham_ext/src/response.rs b/eden/mononoke/gotham_ext/src/response.rs index d67a2d3b1f..cd286e1a98 100644 --- a/eden/mononoke/gotham_ext/src/response.rs +++ b/eden/mononoke/gotham_ext/src/response.rs @@ -8,12 +8,10 @@ use std::convert::TryInto; use anyhow::Error; -use async_compression::stream::{BrotliEncoder, GzipEncoder, ZstdEncoder}; use bytes::Bytes; use futures::{ channel::mpsc, - stream::{BoxStream, Stream, StreamExt, TryStreamExt}, - task::{Context, Poll}, + stream::{Stream, StreamExt}, }; use gotham::{handler::HandlerError, state::State}; use gotham_derive::StateData; @@ -22,10 +20,11 @@ use hyper::{ Body, Response, StatusCode, }; use mime::Mime; -use pin_project::pin_project; -use std::pin::Pin; -use crate::content_encoding::{ContentCompression, ContentEncoding}; +use crate::content::{ + encoding::{ContentCompression, ContentEncoding}, + stream::ContentMeta, +}; use crate::error::HttpError; use crate::middleware::PostRequestCallbacks; use crate::signal_stream::SignalStream; @@ -173,145 +172,3 @@ where Ok(res.body(Body::wrap_stream(stream))?) } } - -pub trait ContentMeta { - /// Provide the content (i.e. Content-Encoding) for the underlying content. This will be sent - /// to the client. - fn content_encoding(&self) -> ContentEncoding; - - /// Provide the length of the content in this stream, if available (i.e. Content-Length). If - /// provided, this must be the actual length of the stream. If missing, the transfer will be - /// chunked. - fn content_length(&self) -> Option; -} - -#[pin_project] -pub struct CompressedContentStream<'a> { - inner: BoxStream<'a, Result>, - content_compression: ContentCompression, -} - -impl<'a> CompressedContentStream<'a> { - pub fn new(inner: S, content_compression: ContentCompression) -> Self - where - S: Stream> + Send + 'a, - { - use std::io; - - let inner = inner.map_err(|e| io::Error::new(io::ErrorKind::Other, e)); - - let inner = match content_compression { - ContentCompression::Zstd => ZstdEncoder::new(inner).map_err(Error::from).boxed(), - ContentCompression::Brotli => BrotliEncoder::new(inner).map_err(Error::from).boxed(), - ContentCompression::Gzip => GzipEncoder::new(inner).map_err(Error::from).boxed(), - }; - - Self { - inner, - content_compression, - } - } -} - -impl ContentMeta for CompressedContentStream<'_> { - fn content_length(&self) -> Option { - None - } - - fn content_encoding(&self) -> ContentEncoding { - ContentEncoding::Compressed(self.content_compression) - } -} - -impl Stream for CompressedContentStream<'_> { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_next_unpin(ctx) - } -} - -#[pin_project] -pub struct ContentStream { - #[pin] - inner: S, - content_length: Option, -} - -impl ContentStream { - pub fn new(inner: S) -> Self { - Self { - inner, - content_length: None, - } - } - - /// Set a Content-Length for this stream. This *must* match the exact size of the uncompressed - /// content that will be sent, since that is what the client will expect. - pub fn content_length(self, content_length: u64) -> Self { - Self { - content_length: Some(content_length), - ..self - } - } -} - -impl ContentMeta for ContentStream { - fn content_length(&self) -> Option { - self.content_length - } - - fn content_encoding(&self) -> ContentEncoding { - ContentEncoding::Identity - } -} - -impl Stream for ContentStream -where - S: Stream, -{ - type Item = S::Item; - - fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_next(ctx) - } -} - -/// Provide an implementation of ContentMeta that propagates through Either (i.e. left_stream(), -/// right_stream()). -impl ContentMeta for futures::future::Either -where - A: ContentMeta, - B: ContentMeta, -{ - fn content_length(&self) -> Option { - // left_stream(), right_stream() doesn't change the stream data. - match self { - Self::Left(a) => a.content_length(), - Self::Right(b) => b.content_length(), - } - } - - fn content_encoding(&self) -> ContentEncoding { - // left_stream(), right_stream() doesn't change the stream data. - match self { - Self::Left(a) => a.content_encoding(), - Self::Right(b) => b.content_encoding(), - } - } -} - -impl ContentMeta for futures::stream::InspectOk -where - S: ContentMeta, -{ - fn content_length(&self) -> Option { - // inspect_ok doesn't change the stream data. - self.get_ref().content_length() - } - - fn content_encoding(&self) -> ContentEncoding { - // inspect_ok doesn't change the stream data. - self.get_ref().content_encoding() - } -} diff --git a/eden/mononoke/lfs_server/src/download.rs b/eden/mononoke/lfs_server/src/download.rs index 3559b064dd..3baa7603a1 100644 --- a/eden/mononoke/lfs_server/src/download.rs +++ b/eden/mononoke/lfs_server/src/download.rs @@ -18,10 +18,10 @@ use serde::Deserialize; use filestore::{self, Alias, FetchKey}; use gotham_ext::{ - content_encoding::ContentEncoding, + content::{CompressedContentStream, ContentEncoding, ContentStream}, error::HttpError, middleware::ScubaMiddlewareState, - response::{CompressedContentStream, ContentStream, StreamBody, TryIntoResponse}, + response::{StreamBody, TryIntoResponse}, }; use mononoke_types::{hash::Sha256, ContentId}; use redactedblobstore::has_redaction_root_cause;