From 9f888ae48edc008445cff1c318b5277b0d56a058 Mon Sep 17 00:00:00 2001
From: Thomas Orozco <torozco@fb.com>
Date: Fri, 9 Aug 2019 01:57:35 -0700
Subject: [PATCH] mononoke/apiserver: stream file contents

Summary:
While investigating API server memory usage with HarveyHunt (made possible by
the fact that he enabled Strobelight there!), we eventually identified that the
root cause is that Actix keeps around a pool of response buffers after it
responds to requests.

The pool contains up to 128 buffers, and over time those will grow to the size
of the biggest content the API server has served in a single chunk.

So, if you're periodically serving 500MB chunks, then eventually you'll have
128 * 500MB worth of buffers in your pool. That's kinda sad for memory usage;
Harvey has a post coming up explaining this more in depth. Look for
`SharedBytesPool` in the Actix code if you're curious to know more.

One way to improve things is to avoid letting those buffers get too big, by
serving our responses in smaller chunks. Since we now have a Filestore that
chunks, we can do that, so that's what this patch does.

Now, this only works for files that are _actually_ chunked when the Filestore
returns them. Indeed, other files will have just one big chunk, so Actix will
still reuse a big buffer for them (though we _could_ fix this by re-chunking on
the fly when we read things). However, the plan here is to simply go back and
re-chunk existing large blobs in our repos, and then we're done.

As an aside, note that I didn't touch the Thrift API here. So, that API still
puts everything into a single buffer. That's not ideal, but it's not
catastrophic either, since that buffer gets freed (unlike the Actix response
buffers).

Reviewed By: fanzeyi

Differential Revision: D16709352

fbshipit-source-id: b01fba478118bd8f286e096cf52b8e7d7b7e8c42
---
 apiserver/src/actor/file_stream.rs | 61 ++++++++++++++++++++++++++++++
 apiserver/src/actor/mod.rs         |  1 +
 apiserver/src/actor/repo.rs        | 25 +++++-------
 apiserver/src/actor/response.rs    | 23 +++++------
 apiserver/src/thrift/mononoke.rs   | 41 +++++++++++++-------
 5 files changed, 108 insertions(+), 43 deletions(-)
 create mode 100644 apiserver/src/actor/file_stream.rs

diff --git a/apiserver/src/actor/file_stream.rs b/apiserver/src/actor/file_stream.rs
new file mode 100644
index 0000000000..3a0b3d367b
--- /dev/null
+++ b/apiserver/src/actor/file_stream.rs
@@ -0,0 +1,61 @@
+// Copyright (c) 2018-present, Facebook, Inc.
+// All Rights Reserved.
+//
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2 or any later version.
+
+// NOTE: This FileStream wrapper is a bit of a hack while we have an API server that builds on top
+// of a BlobRepo abstraction that flattens streams. Indeed, in the API server, we need to know
+// whether a file exists before we create a response using its stream of bytes. Otherwise, we tell
+// Actix to serve up a 200 before we know, and when Actix then reads the stream to send it to the
+// client, it gets a "not found" error. By then it's too late to serve a 404 to the client, and
+// Actix just closes the connection (it doesn't send _anything_ back, in fact).
+// So, we "fix" that by requiring that binary responses be created from a FileStream, and the only
+// way to create a FileStream is to give it a Stream, and doing that will poll for the first
+// element of the stream to see if it's an error (which effectively undoes the flattening BlobRepo
+// did). This is meant to be a temporary fix while we work out a better API for getting file
+// streams out of BlobRepo.
+//
+// If you'd like to refactor this, then the right way to test your fix is to ask a streaming
+// endpoint for something that doesn't exist. If you get a 404, you succeeded, congratulations! If
+// the connection is closed, or you get a 500, try again :(
+
+use bytes::Bytes;
+use failure::Error;
+use futures::{
+    stream::{iter_ok, once},
+    Future, Stream,
+};
+use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
+use mercurial_types::FileBytes;
+
+pub struct FileStream(BoxStream<Bytes, Error>);
+
+impl FileStream {
+    pub fn into_bytes_stream(self) -> BoxStream<Bytes, Error> {
+        self.0
+    }
+}
+
+pub trait IntoFileStream {
+    fn into_filestream(self) -> BoxFuture<FileStream, Error>;
+}
+
+impl<S> IntoFileStream for S
+where
+    S: Stream<Item = FileBytes, Error = Error> + Send + 'static,
+{
+    fn into_filestream(self) -> BoxFuture<FileStream, Error> {
+        self.map(FileBytes::into_bytes)
+            .into_future()
+            .map_err(|(err, _stream)| err)
+            .map(|(bytes, stream)| {
+                let stream = match bytes {
+                    Some(bytes) => once(Ok(bytes)).chain(stream).left_stream(),
+                    None => iter_ok(vec![]).right_stream(),
+                };
+                FileStream(stream.boxify())
+            })
+            .boxify()
+    }
+}
diff --git a/apiserver/src/actor/mod.rs b/apiserver/src/actor/mod.rs
index c7e2379e4c..2eccc06258 100644
--- a/apiserver/src/actor/mod.rs
+++ b/apiserver/src/actor/mod.rs
@@ -22,6 +22,7 @@ use metaconfig_parser::RepoConfigs;
 use crate::cache::CacheManager;
 use crate::errors::ErrorKind;
 
+mod file_stream;
 mod lfs;
 mod model;
 mod query;
diff --git a/apiserver/src/actor/repo.rs b/apiserver/src/actor/repo.rs
index 7eff11ceda..7c0a426ce6 100644
--- a/apiserver/src/actor/repo.rs
+++ b/apiserver/src/actor/repo.rs
@@ -48,6 +48,7 @@ use crate::cache::CacheManager;
 use crate::errors::ErrorKind;
 use crate::from_string as FS;
 
+use super::file_stream::IntoFileStream;
 use super::lfs::{build_response, BatchRequest};
 use super::model::{Entry, EntryWithSizeAndContentHash};
 use super::{MononokeRepoQuery, MononokeRepoResponse, Revision};
@@ -166,10 +167,8 @@ impl MononokeRepo {
             Content::File(stream)
             | Content::Executable(stream)
             | Content::Symlink(stream) => stream
-                .concat2() // TODO (T47717165): Stream file contents out.
-                .map(|file_bytes| MononokeRepoResponse::GetRawFile {
-                    content: file_bytes.into_bytes(),
-                })
+                .into_filestream()
+                .map(MononokeRepoResponse::GetRawFile)
                 .left_future(),
             _ => Err(ErrorKind::InvalidInput(path.to_string(), None).into())
                 .into_future()
@@ -231,12 +230,8 @@ impl MononokeRepo {
 
         self.repo
             .get_file_content(ctx, HgFileNodeId::new(blobhash))
-            .concat2() // TODO (T47717165): Stream file contents out.
-            .and_then(|file_bytes| {
-                Ok(MononokeRepoResponse::GetBlobContent {
-                    content: file_bytes.into_bytes(),
-                })
-            })
+            .into_filestream()
+            .map(MononokeRepoResponse::GetBlobContent)
            .from_err()
            .boxify()
     }
@@ -344,17 +339,15 @@ impl MononokeRepo {
         // TODO (T47378130): Use a more native filestore interface here.
         self.repo
             .get_file_content_id_by_sha256(ctx.clone(), sha256_oid)
-            .map({
+            .and_then({
                 cloned!(self.repo, ctx);
                 move |content_id| {
-                    let stream = repo
-                        .get_file_content_by_content_id(ctx, content_id)
-                        .map(|s| s.into_bytes())
-                        .boxify();
-                    MononokeRepoResponse::DownloadLargeFile(stream)
+                    repo.get_file_content_by_content_id(ctx, content_id)
+                        .into_filestream()
                 }
             })
             .from_err()
+            .map(MononokeRepoResponse::DownloadLargeFile)
             .boxify()
     }

diff --git a/apiserver/src/actor/response.rs b/apiserver/src/actor/response.rs
index 5779858b6c..0b6463433c 100644
--- a/apiserver/src/actor/response.rs
+++ b/apiserver/src/actor/response.rs
@@ -20,21 +20,15 @@ use types::{
     DataEntry, RepoPathBuf, WireHistoryEntry,
 };
 
+use super::file_stream::FileStream;
 use super::lfs::BatchResponse;
 use super::model::{Changeset, Entry, EntryWithSizeAndContentHash};
 
 type StreamingDataResponse = BoxStream<DataEntry, Error>;
 type StreamingHistoryResponse = BoxStream<(RepoPathBuf, WireHistoryEntry), Error>;
 
-// WARNING: Do not re-arrange the order of this enum.
 #[derive(Serialize, Deserialize)]
 pub enum MononokeRepoResponse {
-    GetRawFile {
-        content: Bytes,
-    },
-    GetBlobContent {
-        content: Bytes,
-    },
     ListDirectory {
         files: Vec<Entry>,
     },
@@ -57,7 +51,11 @@ pub enum MononokeRepoResponse {
 
     // NOTE: Please add serializable responses before this line
     #[serde(skip)]
-    DownloadLargeFile(BoxStream<Bytes, Error>),
+    GetRawFile(FileStream),
+    #[serde(skip)]
+    GetBlobContent(FileStream),
+    #[serde(skip)]
+    DownloadLargeFile(FileStream),
     #[serde(skip)]
     EdenGetData(DataResponse),
     #[serde(skip)]
@@ -107,11 +105,8 @@ where
         .body(Body::Streaming(stream as BodyStream))
 }
 
-fn streaming_binary_response<S>(stream: S) -> HttpResponse
-where
-    S: Stream<Item = Bytes, Error = Error> + Send + 'static,
-{
-    let stream = stream.from_err().boxify();
+fn streaming_binary_response(stream: FileStream) -> HttpResponse {
+    let stream = stream.into_bytes_stream().from_err().boxify();
 
     HttpResponse::Ok()
         .content_type("application/octet-stream")
@@ -127,7 +122,6 @@ impl Responder for MononokeRepoResponse {
         use self::MononokeRepoResponse::*;
 
         match self {
-            GetRawFile { content } | GetBlobContent { content } => Ok(binary_response(content)),
             ListDirectory { files } => Json(files).respond_to(req),
             GetTree { files } => Json(files).respond_to(req),
             GetChangeset { changeset } => Json(changeset).respond_to(req),
@@ -141,6 +135,7 @@ impl Responder for MononokeRepoResponse {
             })),
             LfsBatch { response } => Json(response).respond_to(req),
             UploadLargeFile {} => Ok(HttpResponse::Ok().into()),
+            GetRawFile(stream) | GetBlobContent(stream) => Ok(streaming_binary_response(stream)),
             DownloadLargeFile(stream) => Ok(streaming_binary_response(stream)),
             EdenGetData(response) => Ok(cbor_response(response)),
             EdenGetHistory(response) => Ok(cbor_response(response)),
diff --git a/apiserver/src/thrift/mononoke.rs b/apiserver/src/thrift/mononoke.rs
index 7417cf5e6a..720cdc9c70 100644
--- a/apiserver/src/thrift/mononoke.rs
+++ b/apiserver/src/thrift/mononoke.rs
@@ -23,7 +23,8 @@ use async_trait::async_trait;
 use cloned::cloned;
 use context::CoreContext;
 use failure::{err_msg, Error};
-use futures::{Future, IntoFuture};
+use futures::{Future, IntoFuture, Stream};
+use futures_ext::FutureExt;
 use futures_preview::compat::Future01CompatExt;
 use scuba_ext::ScubaSampleBuilder;
 use slog::Logger;
@@ -81,16 +82,17 @@ impl MononokeAPIServiceImpl {
         scuba
     }
 
-    async fn convert_and_call<P, F, Ret, Err>(
+    async fn convert_and_call<Params, Mapper, Mapped, Ret, Err>(
         &self,
         ctx: CoreContext,
-        params: P,
-        mapper: F,
+        params: Params,
+        mapper: Mapper,
     ) -> Result<Ret, Err>
     where
-        F: FnMut(MononokeRepoResponse) -> Result<Ret, ErrorKind>,
-        MononokeQuery: TryFrom<P, Error = Error>,
+        Mapper: FnMut(MononokeRepoResponse) -> Mapped,
+        MononokeQuery: TryFrom<Params, Error = Error>,
         Err: From<ErrorKind>,
+        Mapped: IntoFuture<Item = Ret, Error = ErrorKind>,
     {
         params
             .try_into()
@@ -141,11 +143,18 @@ impl MononokeApiservice for MononokeAPIServiceImpl {
             .convert_and_call(
                 ctx.clone(),
                 params,
-                |resp: MononokeRepoResponse| match resp {
-                    MononokeRepoResponse::GetRawFile { content } => Ok(content.to_vec()),
+                move |resp: MononokeRepoResponse| match resp {
+                    MononokeRepoResponse::GetRawFile(stream) => stream
+                        .into_bytes_stream()
+                        .from_err()
+                        .concat2()
+                        .map(|bytes| bytes.to_vec())
+                        .left_future(),
                     _ => Err(ErrorKind::InternalError(err_msg(
                         "Actor returned wrong response type to query".to_string(),
-                    ))),
+                    )))
+                    .into_future()
+                    .right_future(),
                 },
             )
             .await;
@@ -311,12 +320,18 @@ impl MononokeApiservice for MononokeAPIServiceImpl {
                 ctx.clone(),
                 params,
                 |resp: MononokeRepoResponse| match resp {
-                    MononokeRepoResponse::GetBlobContent { content } => Ok(MononokeBlob {
-                        content: content.to_vec(),
-                    }),
+                    MononokeRepoResponse::GetBlobContent(stream) => stream
+                        .into_bytes_stream()
+                        .from_err()
+                        .concat2()
+                        .map(|bytes| bytes.to_vec())
+                        .map(|content| MononokeBlob { content })
+                        .left_future(),
                     _ => Err(ErrorKind::InternalError(err_msg(
                         "Actor returned wrong response type to query".to_string(),
-                    ))),
+                    )))
+                    .into_future()
+                    .right_future(),
                 },
             )
             .await;
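
P.S. The IntoFileStream trick above boils down to `Stream::into_future()`, which resolves with the
first element plus the rest of the stream. Here is a minimal standalone sketch of that trick
against plain futures 0.1 (nothing below is Mononoke code; `peek_first` and `BoxedByteStream` are
illustrative names for this example only, and String stands in for a real error type):

use futures::{
    future::Future,
    stream::{iter_ok, once, Stream},
};

// A boxed stream of byte chunks, mirroring what FileStream wraps.
type BoxedByteStream = Box<dyn Stream<Item = Vec<u8>, Error = String> + Send>;

// Resolve to a stream only once the first element is known to be Ok, so an
// error like "not found" surfaces before any response status is committed.
fn peek_first<S>(s: S) -> impl Future<Item = BoxedByteStream, Error = String>
where
    S: Stream<Item = Vec<u8>, Error = String> + Send + 'static,
{
    s.into_future()
        // into_future() fails with (error, rest_of_stream); keep just the error.
        .map_err(|(err, _rest)| err)
        .map(|(first, rest)| -> BoxedByteStream {
            match first {
                // Re-attach the peeked element so no data is lost.
                Some(bytes) => Box::new(once(Ok(bytes)).chain(rest)),
                // An empty stream is fine: the file exists but is empty.
                None => Box::new(iter_ok::<Vec<Vec<u8>>, String>(vec![])),
            }
        })
}

fn main() {
    // Happy path: the first chunk is Ok, so we get the whole stream back.
    let ok = iter_ok::<Vec<Vec<u8>>, String>(vec![b"he".to_vec(), b"llo".to_vec()]);
    let stream = peek_first(ok).wait().expect("first chunk was Ok");
    let chunks: Vec<Vec<u8>> = stream.wait().collect::<Result<_, _>>().unwrap();
    assert_eq!(chunks.concat(), b"hello".to_vec());

    // Error path: the stream fails on its very first poll, the way a missing
    // file does once BlobRepo's flattened stream is actually read. The error
    // now shows up here, while a 404 can still be served.
    let missing = once::<Vec<u8>, String>(Err("not found".to_string()));
    assert_eq!(peek_first(missing).wait().err(), Some("not found".to_string()));
}

The property that matters is that the returned future fails before any stream is handed to Actix,
which is exactly the point at which the API server still has the option of replying with a 404.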