http-client: restructure response interface

Summary:
Restructure the interface of `http-client::AsyncResponse` to make it easier to avoid misuse.

Specifically, both async and non-async responses now consist of two parts: a head (represented by the new `Head` type) and a body. This solves the problem of being able to access the response headers while consuming the response body: there is now an `into_parts` method on `AsyncResponse` that returns `(Head, AsyncBody)`, decoupling ownership of the parts. This approach was inspired by `hyper::Response`.

Previously, this was accomplished by allowing the body to be moved out of the response and replaced with an empty body. This meant that subsequent calls could incorrectly receive an empty body.

Additionally, `AsyncBody` is now an actual type (instead of an alias) which exposes `raw` and `decoded` methods for accessing the body stream. This makes it very explicit what's happening under the hood, and also minimizes the chance of the user forgetting to decode the response.

The new interface looks like:

```
(head, body) = res.into_parts();

// Choose one of the following:
let decoded_content = body.decoded(); // Automatically decompressed content.
let cbor_content = body.cbor(); // Content as deserialized CBOR entries.
let raw_content = body.raw(); // Raw on-wire content.

// Can still access response headers and status.
let status = head.status();

```

One-line usage is still possible with this interface:
```
let content = res.into_body().decoded().try_concat().await?;
```

Reviewed By: yancouto

Differential Revision: D30436322

fbshipit-source-id: 59911afc34b356a9e3295828ac63da5e295f77a6
This commit is contained in:
Arun Kulshreshtha 2021-08-20 10:33:31 -07:00 committed by Facebook GitHub Bot
parent 14dd336b44
commit 422c57a963
6 changed files with 161 additions and 108 deletions

View File

@ -210,9 +210,9 @@ impl Client {
// entries. This allows multiplexing the streams using `select_all`.
let streams = responses.into_iter().map(|fut| {
stream::once(async move {
let mut res = raise_for_status(fut.await?).await?;
let res = raise_for_status(fut.await?).await?;
tracing::debug!("{:?}", ResponseMeta::from(&res));
Ok::<_, EdenApiError>(res.cbor::<T>().err_into())
Ok::<_, EdenApiError>(res.into_body().cbor::<T>().err_into())
})
.try_flatten()
.boxed()
@ -628,12 +628,13 @@ impl EdenApi for Client {
let url = self.url(paths::FULL_IDMAP_CLONE_DATA, Some(&repo))?;
let req = self.configure(Request::post(url))?;
let mut async_response = req
let async_response = req
.send_async()
.await
.context("error receiving async response")?;
let response_bytes = async_response
.body()
.into_body()
.decoded()
.try_fold(Vec::new(), |mut acc, v| {
if let Some(callback) = &mut progress {
// strictly speaking not correct because it does not count overhead
@ -1103,12 +1104,13 @@ fn split_into_batches<T>(
}
}
async fn raise_for_status(mut res: AsyncResponse) -> Result<AsyncResponse, EdenApiError> {
if res.status().as_u16() < 400 {
async fn raise_for_status(res: AsyncResponse) -> Result<AsyncResponse, EdenApiError> {
let status = res.status();
if status.as_u16() < 400 {
return Ok(res);
}
let body = res.body().try_concat().await?;
let body = res.into_body().decoded().try_concat().await?;
let mut message = String::from_utf8_lossy(&body).into_owned();
if message.len() >= 9 && &*message[..9].to_lowercase() == "<!doctype" {
@ -1118,10 +1120,7 @@ async fn raise_for_status(mut res: AsyncResponse) -> Result<AsyncResponse, EdenA
message.push_str("... (truncated)")
}
Err(EdenApiError::HttpError {
status: res.status(),
message,
})
Err(EdenApiError::HttpError { status, message })
}
fn raise_not_implemented<T>() -> Result<T, EdenApiError> {

View File

@ -114,11 +114,11 @@ fn read_input() -> Result<Vec<u8>> {
Ok(buf)
}
async fn write_response(mut res: AsyncResponse) -> Result<()> {
async fn write_response(res: AsyncResponse) -> Result<()> {
eprintln!("Status: {:?} {}", res.version(), res.status());
eprintln!("{:?}", res.headers());
let body = res.body().try_concat().await?;
let body = res.into_body().decoded().try_concat().await?;
if atty::is(atty::Stream::Stdout) {
println!("{}", String::from_utf8_lossy(&body).escape_default())

View File

@ -337,7 +337,7 @@ mod tests {
let client = HttpClient::new();
let stats = client.send(vec![req1, req2, req3], |res| {
let res = res.unwrap();
assert_eq!(res.status, StatusCode::CREATED);
assert_eq!(res.head.status, StatusCode::CREATED);
assert!(not_received.remove(&*res.body));
Ok(())
})?;
@ -460,8 +460,8 @@ mod tests {
not_received.insert(body3.to_vec());
for res in responses {
assert_eq!(res.status, StatusCode::CREATED);
let body = res.body.try_concat().await?;
assert_eq!(res.head.status, StatusCode::CREATED);
let body = res.into_body().raw().try_concat().await?;
assert!(not_received.remove(&*body));
}

View File

@ -817,14 +817,15 @@ mod tests {
mock.assert();
assert_eq!(res.status, StatusCode::OK);
assert_eq!(res.head.status, StatusCode::OK);
assert_eq!(&*res.body, &b"Hello, world!"[..]);
assert_eq!(
res.headers.get(header::CONTENT_TYPE).unwrap(),
res.head.headers.get(header::CONTENT_TYPE).unwrap(),
HeaderValue::from_static("text/plain")
);
assert_eq!(
res.headers
res.head
.headers
.get(HeaderName::from_bytes(b"X-Served-By")?)
.unwrap(),
HeaderValue::from_static("mock")
@ -851,19 +852,20 @@ mod tests {
mock.assert();
assert_eq!(res.status, StatusCode::OK);
assert_eq!(res.head.status, StatusCode::OK);
assert_eq!(
res.headers.get(header::CONTENT_TYPE).unwrap(),
res.head.headers.get(header::CONTENT_TYPE).unwrap(),
HeaderValue::from_static("text/plain")
);
assert_eq!(
res.headers
res.head
.headers
.get(HeaderName::from_bytes(b"X-Served-By")?)
.unwrap(),
HeaderValue::from_static("mock")
);
let body = res.body.try_concat().await?;
let body = res.into_body().raw().try_concat().await?;
assert_eq!(&*body, &b"Hello, world!"[..]);
Ok(())
@ -883,14 +885,15 @@ mod tests {
mock.assert();
assert_eq!(res.status, StatusCode::OK);
assert_eq!(res.head.status, StatusCode::OK);
assert!(res.body.is_empty());
assert_eq!(
res.headers.get(header::CONTENT_TYPE).unwrap(),
res.head.headers.get(header::CONTENT_TYPE).unwrap(),
HeaderValue::from_static("text/plain")
);
assert_eq!(
res.headers
res.head
.headers
.get(HeaderName::from_bytes(b"X-Served-By")?)
.unwrap(),
HeaderValue::from_static("mock")
@ -913,7 +916,7 @@ mod tests {
let res = Request::post(url).body(body.as_bytes()).send()?;
mock.assert();
assert_eq!(res.status, StatusCode::CREATED);
assert_eq!(res.head.status, StatusCode::CREATED);
Ok(())
}
@ -933,7 +936,7 @@ mod tests {
let res = Request::post(url).body(body_bytes).send()?;
mock.assert();
assert_eq!(res.status, StatusCode::CREATED);
assert_eq!(res.head.status, StatusCode::CREATED);
Ok(())
}
@ -955,7 +958,7 @@ mod tests {
.send()?;
mock.assert();
assert_eq!(res.status, StatusCode::CREATED);
assert_eq!(res.head.status, StatusCode::CREATED);
Ok(())
}
@ -977,7 +980,7 @@ mod tests {
let res = Request::post(url).json(&body)?.send()?;
mock.assert();
assert_eq!(res.status, StatusCode::CREATED);
assert_eq!(res.head.status, StatusCode::CREATED);
Ok(())
}
@ -1006,7 +1009,7 @@ mod tests {
let res = Request::post(url).cbor(&body)?.send()?;
mock.assert();
assert_eq!(res.status, StatusCode::CREATED);
assert_eq!(res.head.status, StatusCode::CREATED);
Ok(())
}

View File

@ -7,7 +7,6 @@
use std::convert::TryFrom;
use std::io::Cursor;
use std::mem;
use std::pin::Pin;
use anyhow::anyhow;
@ -28,14 +27,13 @@ use crate::request::Encoding;
use crate::stream::{BufferedStream, CborStream};
#[derive(Debug)]
pub struct Response {
pub struct Head {
pub(crate) version: Version,
pub(crate) status: StatusCode,
pub(crate) headers: HeaderMap,
pub(crate) body: Vec<u8>,
}
impl Response {
impl Head {
/// Get the HTTP version of the response.
pub fn version(&self) -> Version {
self.version
@ -51,15 +49,48 @@ impl Response {
&self.headers
}
/// Get the response's encoding from the Content-Encoding header.
pub fn encoding(&self) -> Result<Encoding, HttpClientError> {
self.headers
.get(header::CONTENT_ENCODING)
.map(|encoding| Ok(encoding.to_str()?.into()))
.unwrap_or(Ok(Encoding::Identity))
.map_err(|_: header::ToStrError| {
HttpClientError::BadResponse(anyhow!("Invalid Content-Encoding"))
})
}
}
#[derive(Debug)]
pub struct Response {
pub(crate) head: Head,
pub(crate) body: Vec<u8>,
}
impl Response {
/// Get the HTTP version of the response.
pub fn version(&self) -> Version {
self.head.version
}
/// Get the HTTP status code of the response.
pub fn status(&self) -> StatusCode {
self.head.status
}
/// Get the response's headers.
pub fn headers(&self) -> &HeaderMap {
&self.head.headers
}
/// Get the response's body.
pub fn body(&self) -> &[u8] {
&self.body
}
/// Move the response's body out of the response.
/// Subsequent calls will return an empty body.
pub fn take_body(&mut self) -> Vec<u8> {
mem::take(&mut self.body)
/// Split the
pub fn into_parts(self) -> (Head, Vec<u8>) {
(self.head, self.body)
}
/// Deserialize the response body from JSON.
@ -87,9 +118,11 @@ impl TryFrom<&mut Buffered> for Response {
};
Ok(Self {
version,
status,
headers: buffered.take_headers(),
head: Head {
version,
status,
headers: buffered.take_headers(),
},
body: buffered.take_body(),
})
}
@ -108,19 +141,73 @@ macro_rules! decode {
}};
}
pub type AsyncBody = Pin<Box<dyn Stream<Item = Result<Vec<u8>, HttpClientError>> + Send + 'static>>;
pub type CborStreamBody<T> = CborStream<T, AsyncBody, Vec<u8>, HttpClientError>;
pub type BufferedStreamBody = BufferedStream<AsyncBody, Vec<u8>, HttpClientError>;
pub type ByteStream =
Pin<Box<dyn Stream<Item = Result<Vec<u8>, HttpClientError>> + Send + 'static>>;
pub struct AsyncBody {
// This is a `Result` so that an invalid Content-Encoding header does not prevent the caller
// from accessing the raw body stream if desired. The error will only be propagated if the
// caller actually wants to decode the body stream.
encoding: Result<Encoding, HttpClientError>,
body: ByteStream,
}
pub type CborStreamBody<T> = CborStream<T, ByteStream, Vec<u8>, HttpClientError>;
pub type BufferedStreamBody = BufferedStream<ByteStream, Vec<u8>, HttpClientError>;
impl AsyncBody {
/// Get a stream of the response's body content.
///
/// This method is the preferred way of accessing the response's body stream. The data will be
/// automatically decoded based on the response's Content-Encoding header.
pub fn decoded(self) -> ByteStream {
let Self { encoding, body } = self;
stream::once(async move {
Ok(match encoding? {
Encoding::Identity => body.boxed(),
Encoding::Brotli => decode!(BrotliDecoder, body),
Encoding::Deflate => decode!(DeflateDecoder, body),
Encoding::Gzip => decode!(GzipDecoder, body),
Encoding::Zstd => decode!(ZstdDecoder, body),
other => {
return Err(HttpClientError::BadResponse(anyhow!(
"Unsupported Content-Encoding: {:?}",
other
)));
}
})
})
.try_flatten()
.boxed()
}
/// Get a stream of the response's raw on-the-wire content.
///
/// Note that the caller is responsible for decoding the response if it is compressed. Most
/// callers will want to use the `decoded` method instead which does this automatically.
pub fn raw(self) -> ByteStream {
self.body
}
/// Attempt to deserialize the incoming data as a stream of CBOR values.
pub fn cbor<T: DeserializeOwned>(self) -> CborStreamBody<T> {
CborStream::new(self.decoded())
}
/// Create a buffered body stream that ensures that all yielded chunks
/// (except the last) are at least as large as the given chunk size.
pub fn buffered(self, size: usize) -> BufferedStreamBody {
BufferedStream::new(self.decoded(), size)
}
}
pub struct AsyncResponse {
pub(crate) version: Version,
pub(crate) status: StatusCode,
pub(crate) headers: HeaderMap,
pub(crate) head: Head,
pub(crate) body: AsyncBody,
}
impl AsyncResponse {
pub async fn new(streams: ResponseStreams) -> Result<Self, HttpClientError> {
pub(crate) async fn new(streams: ResponseStreams) -> Result<Self, HttpClientError> {
let ResponseStreams {
headers_rx,
body_rx,
@ -174,77 +261,40 @@ impl AsyncResponse {
.try_filter(|chunk| future::ready(!chunk.is_empty()))
.boxed();
Ok(Self {
let head = Head {
version,
status,
headers,
body,
})
};
let encoding = head.encoding();
let body = AsyncBody { encoding, body };
Ok(Self { head, body })
}
/// Get the HTTP version of the response.
pub fn version(&self) -> Version {
self.version
self.head.version
}
/// Get the HTTP status code of the response.
pub fn status(&self) -> StatusCode {
self.status
self.head.status
}
/// Get the response's headers.
pub fn headers(&self) -> &HeaderMap {
&self.headers
&self.head.headers
}
/// Get the response's raw body stream, consisting of the raw bytes from
/// the wire. The data may be compresssed, depending on the value of the
/// `Content-Encoding` header.
pub fn raw_body(&mut self) -> AsyncBody {
mem::replace(&mut self.body, stream::empty().boxed())
/// Consume the response and obtain its body stream.
pub fn into_body(self) -> AsyncBody {
self.body
}
/// Get the response's body stream. This will move the body stream out of
/// the response; subsequent calls will return an empty body stream.
pub fn body(&mut self) -> AsyncBody {
let body = self.raw_body();
let encoding = self
.headers
.get(header::CONTENT_ENCODING)
.map(|encoding| Ok(encoding.to_str()?.into()))
.unwrap_or(Ok(Encoding::Identity))
.map_err(|_: header::ToStrError| {
HttpClientError::BadResponse(anyhow!("Invalid Content-Encoding"))
});
stream::once(async move {
Ok(match encoding? {
Encoding::Identity => body.boxed(),
Encoding::Brotli => decode!(BrotliDecoder, body),
Encoding::Deflate => decode!(DeflateDecoder, body),
Encoding::Gzip => decode!(GzipDecoder, body),
Encoding::Zstd => decode!(ZstdDecoder, body),
other => {
return Err(HttpClientError::BadResponse(anyhow!(
"Unsupported Content-Encoding: {:?}",
other
)));
}
})
})
.try_flatten()
.boxed()
}
/// Attempt to deserialize the incoming data as a stream of CBOR values.
pub fn cbor<T: DeserializeOwned>(&mut self) -> CborStreamBody<T> {
CborStream::new(self.body())
}
/// Create a buffered body stream that ensures that all yielded chunks
/// (except the last) are at least as large as the given chunk size.
pub fn buffered(&mut self, size: usize) -> BufferedStreamBody {
BufferedStream::new(self.body(), size)
/// Split the response into its head and body.
pub fn into_parts(self) -> (Head, AsyncBody) {
(self.head, self.body)
}
}
@ -272,14 +322,14 @@ mod tests {
.create();
let url = Url::parse(&mockito::server_url())?.join("test")?;
let mut res = Request::get(url)
let res = Request::get(url)
.accept_encoding(Encoding::all())
.send_async()
.await?;
mock.assert();
let body = res.body().try_concat().await?;
let body = res.into_body().decoded().try_concat().await?;
assert_eq!(&*body, &uncompressed[..]);
Ok(())

View File

@ -1145,16 +1145,16 @@ impl LfsRemoteInner {
.await
.map_err(|_| TransferError::Timeout(request_timeout))?;
let mut reply = match reply {
let reply = match reply {
Some(r) => r?,
None => {
return Err(TransferError::EndOfStream);
}
};
let status = reply.status();
let headers = reply.headers().clone();
let (head, body) = reply.into_parts();
let status = head.status();
if !status.is_success() {
return Err(TransferError::HttpStatus(status));
}
@ -1162,11 +1162,12 @@ impl LfsRemoteInner {
check_status(status)?;
let start = Instant::now();
let mut body = reply.body();
let mut body = body.decoded();
let mut chunks: Vec<Vec<u8>> = vec![];
while let Some(res) = timeout(request_timeout, body.next()).await.transpose() {
let chunk = res.map_err(|_| {
let request_id = headers
let request_id = head
.headers()
.get("x-request-id")
.and_then(|c| std::str::from_utf8(c.as_bytes()).ok())
.unwrap_or("?")