mononoke/lfs_server: automatically consume HTTP response bodies when dropped

Summary:
If we don't read the body for a response, then Hyper cannot return the
connection to the pool. So, let's do it automatically upon dropping. This will
typically happen when we send a request to upstream then don't read the
response.

I seem to remember this used to work fine at some point, but looking at the
code I think it's actually broken now and we don't reuse upstream connections
if we skip waiting for upstream in a batch request. So, let's fix it once and
for all with a more robust abstraction.

Reviewed By: HarveyHunt

Differential Revision: D22206742

fbshipit-source-id: 2da1c008556e1d964c1cc337d58f06f8d691a916
This commit is contained in:
Thomas Orozco 2020-06-24 09:59:09 -07:00 committed by Facebook GitHub Bot
parent b60ff4403f
commit 76606260c2

View File

@ -15,7 +15,10 @@ use std::sync::{
use anyhow::{Context, Error};
use bytes::Bytes;
use cached_config::ConfigHandle;
use futures::stream::{Stream, TryStreamExt};
use futures::{
future,
stream::{Stream, StreamExt, TryStreamExt},
};
use gotham::state::{FromState, State};
use gotham_derive::StateData;
use gotham_ext::body_ext::BodyExt;
@ -26,6 +29,7 @@ use http::{
use hyper::{Body, Request};
use permission_checker::{ArcPermissionChecker, MononokeIdentitySet};
use slog::Logger;
use tokio::runtime::Handle;
use blobrepo::BlobRepo;
use context::CoreContext;
@ -190,14 +194,20 @@ pub struct RepositoryRequestContext {
client: HttpClient,
}
pub struct HttpClientResponse<S> {
pub struct HttpClientResponse<S: Stream<Item = Result<Bytes, Error>> + Send + 'static> {
headers: HeaderMap,
body: S,
body: Option<S>,
handle: Handle,
}
impl<S: Stream<Item = Result<Bytes, Error>>> HttpClientResponse<S> {
pub async fn concat(self) -> Result<Bytes, Error> {
let body = self.body.try_concat_body(&self.headers)?.await?;
impl<S: Stream<Item = Result<Bytes, Error>> + Send + 'static> HttpClientResponse<S> {
pub async fn concat(mut self) -> Result<Bytes, Error> {
let body = self
.body
.take()
.expect("Body cannot be missing")
.try_concat_body(&self.headers)?
.await?;
Ok(body)
}
@ -206,8 +216,17 @@ impl<S: Stream<Item = Result<Bytes, Error>>> HttpClientResponse<S> {
Ok(())
}
pub fn into_inner(self) -> S {
self.body
pub fn into_inner(mut self) -> S {
self.body.take().expect("Body cannot be missing")
}
}
impl<S: Stream<Item = Result<Bytes, Error>> + Send + 'static> Drop for HttpClientResponse<S> {
fn drop(&mut self) {
if let Some(body) = self.body.take() {
let discard = body.for_each(|_| future::ready(()));
self.handle.spawn(discard);
}
}
}
@ -281,7 +300,8 @@ impl RepositoryRequestContext {
// our own wrapper type that wraps the response and the headers.
Ok(HttpClientResponse {
headers: head.headers,
body: body.map_err(Error::from),
body: Some(body.map_err(Error::from)),
handle: Handle::current(),
})
};