mononoke/apiserver: stream file contents

Summary:
While investigating API server memory usage with HarveyHunt (made possible by the fact that he enabled Strobelight there!), we eventually identified the root cause: Actix keeps a pool of response buffers around after it responds to requests.

The pool contains up to 128 buffers, and over time each of those grows to the size of the biggest content the API server has served up in a single chunk. So, if you're periodically serving 500MB chunks, then eventually you'll have 128 buffers of 500MB each sitting in your pool. That's kinda sad for memory usage; Harvey has a post coming up explaining this more in-depth. Look for `SharedBytesPool` in the Actix code if you're curious to know more.
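
To make that concrete, here is a minimal sketch of the failure mode. This is illustrative plain-`Vec` code, not Actix's actual `SharedBytesPool` implementation:

```rust
// A recycling buffer pool in the spirit of SharedBytesPool (illustrative only).
struct BufferPool {
    buffers: Vec<Vec<u8>>,
}

impl BufferPool {
    const MAX_POOLED: usize = 128;

    fn get(&mut self) -> Vec<u8> {
        self.buffers.pop().unwrap_or_default()
    }

    fn release(&mut self, mut buf: Vec<u8>) {
        buf.clear(); // drops the contents, but keeps the allocation
        if self.buffers.len() < Self::MAX_POOLED {
            self.buffers.push(buf);
        }
    }
}

fn main() {
    let mut pool = BufferPool { buffers: Vec::new() };

    // Serve one 500MB response in a single chunk...
    let mut buf = pool.get();
    buf.resize(500 * 1024 * 1024, 0);
    pool.release(buf);

    // ...and the pool now pins a 500MB allocation indefinitely. Repeat
    // across 128 buffers and ~64GB sits idle.
    assert!(pool.buffers[0].capacity() >= 500 * 1024 * 1024);
}
```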

One way to improve things is to avoid letting those buffers get too big, by serving our responses in smaller chunks. Since we now have a Filestore that chunks, we can do that, so that's what this patch does.

Now, this only works for files that are _actually_ chunked when the Filestore returns them. Indeed, other files will still come back as just one big chunk, so Actix will still end up recycling a big buffer for them (though we _could_ fix this by re-chunking on the fly when we read things; see the sketch below). However, the plan here is to simply go back and re-chunk existing large blobs in our repos, and then we're done.
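
For reference, on-the-fly re-chunking would look something like this. This is a hypothetical helper in the futures 0.1 style the API server uses, not part of this patch; `Bytes::split_to` makes each split O(1), so no data is copied:

```rust
use bytes::Bytes;
use failure::Error;
use futures::{stream::iter_ok, Stream};

// Hypothetical: cap the size of any single chunk flowing into a response.
fn rechunk<S>(input: S, max: usize) -> impl Stream<Item = Bytes, Error = Error>
where
    S: Stream<Item = Bytes, Error = Error>,
{
    input
        .map(move |mut bytes| {
            let mut chunks = Vec::new();
            while bytes.len() > max {
                // split_to hands back the front `max` bytes and advances
                // `bytes` past them, without copying the data.
                chunks.push(bytes.split_to(max));
            }
            chunks.push(bytes);
            iter_ok::<_, Error>(chunks)
        })
        .flatten()
}
```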

As an aside, note that I didn't touch the Thrift API here. So, that API still puts everything into a single buffer. That's not ideal, but it's not catastrophic since that buffer gets freed (unlike the Actix response buffers).

Reviewed By: fanzeyi

Differential Revision: D16709352

fbshipit-source-id: b01fba478118bd8f286e096cf52b8e7d7b7e8c42
Thomas Orozco 2019-08-09 01:57:35 -07:00 committed by Facebook Github Bot
parent ce79695ce3
commit 9f888ae48e
5 changed files with 108 additions and 43 deletions

View File

@@ -0,0 +1,61 @@
+// Copyright (c) 2018-present, Facebook, Inc.
+// All Rights Reserved.
+//
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2 or any later version.
+
+// NOTE: This FileStream wrapper is a bit of a hack while we have an API server that builds on top
+// of a BlobRepo abstraction that flattens streams. Indeed, in the API server, we need to know if a
+// file is going to exist before we create a response using its stream of bytes. Otherwise, when we
+// tell Actix to serve a response using that stream, we don't know if the file exists, so we tell
+// Actix to serve up a 200. When Actix tries reading the stream to send it to the client, it gets a
+// "not found" error, but by then it's too late to serve a 404 to the client, and Actix just closes
+// the connection (it doesn't send _anything_ back, in fact). So, we "fix" that by requiring that
+// binary responses have to be created from a FileStream, and the only way to create a FileStream
+// is to give it a Stream, and doing that will poll for the first element of the stream to see if
+// it's an error (which effectively undoes the flattening BlobRepo did). This is meant to be a
+// temporary fix while we work out a better API for getting file streams out of Blobrepo.
+//
+// If you'd like to refactor this, then the right way to test your fix is to ask a streaming
+// endpoint for something that doesn't exist. If you get a 404, you succeeded, congratulations! If
+// the connection is closed, or you get a 500, try again :(
+
+use bytes::Bytes;
+use failure::Error;
+use futures::{
+    stream::{iter_ok, once},
+    Future, Stream,
+};
+use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
+use mercurial_types::FileBytes;
+
+pub struct FileStream(BoxStream<Bytes, Error>);
+
+impl FileStream {
+    pub fn into_bytes_stream(self) -> BoxStream<Bytes, Error> {
+        self.0
+    }
+}
+
+pub trait IntoFileStream {
+    fn into_filestream(self) -> BoxFuture<FileStream, Error>;
+}
+
+impl<S> IntoFileStream for S
+where
+    S: Stream<Item = FileBytes, Error = Error> + Send + 'static,
+{
+    fn into_filestream(self) -> BoxFuture<FileStream, Error> {
+        self.map(FileBytes::into_bytes)
+            .into_future()
+            .map_err(|(err, _stream)| err)
+            .map(|(bytes, stream)| {
+                let stream = match bytes {
+                    Some(bytes) => once(Ok(bytes)).chain(stream).left_stream(),
+                    None => iter_ok(vec![]).right_stream(),
+                };
+                FileStream(stream.boxify())
+            })
+            .boxify()
+    }
+}
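
For illustration, a caller would consume this along the lines of the sketch below. It is hypothetical (`ok_streaming_response`, `not_found_response`, and `is_not_found` stand in for the API server's actual helpers), but it shows the point of eagerly polling: a missing file fails the future here, while we can still pick the status code, rather than failing mid-stream after a 200 has already gone out:

```rust
fn serve_file(stream: BoxStream<FileBytes, Error>) -> BoxFuture<HttpResponse, Error> {
    stream
        .into_filestream()
        .then(|result| match result {
            // The first chunk polled fine: safe to commit to a 200 and stream.
            Ok(fs) => Ok(ok_streaming_response(fs.into_bytes_stream())),
            // The error surfaced before any response was started.
            Err(ref e) if is_not_found(e) => Ok(not_found_response()),
            Err(e) => Err(e),
        })
        .boxify()
}
```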

View File

@@ -22,6 +22,7 @@ use metaconfig_parser::RepoConfigs;
 use crate::cache::CacheManager;
 use crate::errors::ErrorKind;
 
+mod file_stream;
 mod lfs;
 mod model;
 mod query;

View File

@@ -48,6 +48,7 @@ use crate::cache::CacheManager;
 use crate::errors::ErrorKind;
 use crate::from_string as FS;
 
+use super::file_stream::IntoFileStream;
 use super::lfs::{build_response, BatchRequest};
 use super::model::{Entry, EntryWithSizeAndContentHash};
 use super::{MononokeRepoQuery, MononokeRepoResponse, Revision};
@@ -166,10 +167,8 @@ impl MononokeRepo {
             Content::File(stream)
             | Content::Executable(stream)
             | Content::Symlink(stream) => stream
-                .concat2() // TODO (T47717165): Stream file contents out.
-                .map(|file_bytes| MononokeRepoResponse::GetRawFile {
-                    content: file_bytes.into_bytes(),
-                })
+                .into_filestream()
+                .map(MononokeRepoResponse::GetRawFile)
                 .left_future(),
             _ => Err(ErrorKind::InvalidInput(path.to_string(), None).into())
                 .into_future()
@@ -231,12 +230,8 @@ impl MononokeRepo {
 
         self.repo
             .get_file_content(ctx, HgFileNodeId::new(blobhash))
-            .concat2() // TODO (T47717165): Stream file contents out.
-            .and_then(|file_bytes| {
-                Ok(MononokeRepoResponse::GetBlobContent {
-                    content: file_bytes.into_bytes(),
-                })
-            })
+            .into_filestream()
+            .map(MononokeRepoResponse::GetBlobContent)
             .from_err()
             .boxify()
     }
@@ -344,17 +339,15 @@ impl MononokeRepo {
         // TODO (T47378130): Use a more native filestore interface here.
         self.repo
             .get_file_content_id_by_sha256(ctx.clone(), sha256_oid)
-            .map({
+            .and_then({
                 cloned!(self.repo, ctx);
                 move |content_id| {
-                    let stream = repo
-                        .get_file_content_by_content_id(ctx, content_id)
-                        .map(|s| s.into_bytes())
-                        .boxify();
-                    MononokeRepoResponse::DownloadLargeFile(stream)
+                    repo.get_file_content_by_content_id(ctx, content_id)
+                        .into_filestream()
                 }
             })
             .from_err()
+            .map(MononokeRepoResponse::DownloadLargeFile)
             .boxify()
     }
 

View File

@@ -20,21 +20,15 @@ use types::{
     DataEntry, RepoPathBuf, WireHistoryEntry,
 };
-
+use super::file_stream::FileStream;
 use super::lfs::BatchResponse;
 use super::model::{Changeset, Entry, EntryWithSizeAndContentHash};
 
 type StreamingDataResponse = BoxStream<DataEntry, Error>;
 type StreamingHistoryResponse = BoxStream<(RepoPathBuf, WireHistoryEntry), Error>;
 
 // WARNING: Do not re-arrange the order of this enum.
 #[derive(Serialize, Deserialize)]
 pub enum MononokeRepoResponse {
-    GetRawFile {
-        content: Bytes,
-    },
-    GetBlobContent {
-        content: Bytes,
-    },
     ListDirectory {
         files: Vec<Entry>,
     },
@@ -57,7 +51,11 @@ pub enum MononokeRepoResponse {
 
     // NOTE: Please add serializable responses before this line
     #[serde(skip)]
-    DownloadLargeFile(BoxStream<Bytes, Error>),
+    GetRawFile(FileStream),
+    #[serde(skip)]
+    GetBlobContent(FileStream),
+    #[serde(skip)]
+    DownloadLargeFile(FileStream),
     #[serde(skip)]
     EdenGetData(DataResponse),
     #[serde(skip)]
@@ -107,11 +105,8 @@
         .body(Body::Streaming(stream as BodyStream))
 }
 
-fn streaming_binary_response<S>(stream: S) -> HttpResponse
-where
-    S: Stream<Item = Bytes, Error = Error> + Send + 'static,
-{
-    let stream = stream.from_err().boxify();
+fn streaming_binary_response(stream: FileStream) -> HttpResponse {
+    let stream = stream.into_bytes_stream().from_err().boxify();
 
     HttpResponse::Ok()
         .content_type("application/octet-stream")
@@ -127,7 +122,6 @@ impl Responder for MononokeRepoResponse {
         use self::MononokeRepoResponse::*;
 
         match self {
-            GetRawFile { content } | GetBlobContent { content } => Ok(binary_response(content)),
             ListDirectory { files } => Json(files).respond_to(req),
             GetTree { files } => Json(files).respond_to(req),
             GetChangeset { changeset } => Json(changeset).respond_to(req),
@@ -141,6 +135,7 @@ impl Responder for MononokeRepoResponse {
             })),
             LfsBatch { response } => Json(response).respond_to(req),
             UploadLargeFile {} => Ok(HttpResponse::Ok().into()),
+            GetRawFile(stream) | GetBlobContent(stream) => Ok(streaming_binary_response(stream)),
             DownloadLargeFile(stream) => Ok(streaming_binary_response(stream)),
             EdenGetData(response) => Ok(cbor_response(response)),
             EdenGetHistory(response) => Ok(cbor_response(response)),

View File

@@ -23,7 +23,8 @@ use async_trait::async_trait;
 use cloned::cloned;
 use context::CoreContext;
 use failure::{err_msg, Error};
-use futures::{Future, IntoFuture};
+use futures::{Future, IntoFuture, Stream};
+use futures_ext::FutureExt;
 use futures_preview::compat::Future01CompatExt;
 use scuba_ext::ScubaSampleBuilder;
 use slog::Logger;
@@ -81,16 +82,17 @@
         scuba
     }
 
-    async fn convert_and_call<F, P, Ret, Err>(
+    async fn convert_and_call<Params, Mapper, Mapped, Ret, Err>(
         &self,
         ctx: CoreContext,
-        params: P,
-        mapper: F,
+        params: Params,
+        mapper: Mapper,
     ) -> Result<Ret, Err>
     where
-        F: FnMut(MononokeRepoResponse) -> Result<Ret, ErrorKind>,
-        MononokeQuery: TryFrom<P, Error = Error>,
+        Mapper: FnMut(MononokeRepoResponse) -> Mapped,
+        MononokeQuery: TryFrom<Params, Error = Error>,
         Err: From<MononokeAPIException>,
+        Mapped: IntoFuture<Item = Ret, Error = ErrorKind>,
     {
         params
             .try_into()
@@ -141,11 +143,18 @@ impl MononokeApiservice for MononokeAPIServiceImpl {
             .convert_and_call(
                 ctx.clone(),
                 params,
-                |resp: MononokeRepoResponse| match resp {
-                    MononokeRepoResponse::GetRawFile { content } => Ok(content.to_vec()),
+                move |resp: MononokeRepoResponse| match resp {
+                    MononokeRepoResponse::GetRawFile(stream) => stream
+                        .into_bytes_stream()
+                        .from_err()
+                        .concat2()
+                        .map(|bytes| bytes.to_vec())
+                        .left_future(),
                     _ => Err(ErrorKind::InternalError(err_msg(
                         "Actor returned wrong response type to query".to_string(),
-                    ))),
+                    )))
+                    .into_future()
+                    .right_future(),
                 },
             )
             .await;
@@ -311,12 +320,18 @@
                 ctx.clone(),
                 params,
                 |resp: MononokeRepoResponse| match resp {
-                    MononokeRepoResponse::GetBlobContent { content } => Ok(MononokeBlob {
-                        content: content.to_vec(),
-                    }),
+                    MononokeRepoResponse::GetBlobContent(stream) => stream
+                        .into_bytes_stream()
+                        .from_err()
+                        .concat2()
+                        .map(|bytes| bytes.to_vec())
+                        .map(|content| MononokeBlob { content })
+                        .left_future(),
                     _ => Err(ErrorKind::InternalError(err_msg(
                         "Actor returned wrong response type to query".to_string(),
-                    ))),
+                    )))
+                    .into_future()
+                    .right_future(),
                 },
             )
             .await;