edenapi_server: return streaming responses

Summary: Make most of the handlers on the EdenAPI server return streaming responses.

Reviewed By: krallin

Differential Revision: D22014652

fbshipit-source-id: 177e540e1372e7dfcba73c594614f0225da3a10f
Arun Kulshreshtha authored on 2020-06-15 09:09:31 -07:00, committed by Facebook GitHub Bot
parent 33b5dd5c41
commit 7d72408ade
6 changed files with 114 additions and 114 deletions
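The core of the change is the same in each handler: instead of fetching every entry, collecting the results into one response struct, and serializing it in a single shot, the handler now maps each requested key to a fetch future, runs those futures with bounded concurrency, and serializes each resulting entry as its own CBOR value. A minimal self-contained sketch of that shape (the Entry type and fetch_one function are hypothetical stand-ins, not the Mononoke code):

use anyhow::Error;
use futures::{stream, Stream, StreamExt, TryStreamExt};
use serde::Serialize;

#[derive(Serialize)]
struct Entry {
    key: String,
}

// Stand-in for a blobstore fetch.
async fn fetch_one(key: String) -> Result<Entry, Error> {
    Ok(Entry { key })
}

// Old shape: buffer all entries, then serialize one aggregate response.
async fn fetch_buffered(keys: Vec<String>) -> Result<Vec<u8>, Error> {
    let entries: Vec<Entry> = stream::iter(keys.into_iter().map(fetch_one))
        .buffer_unordered(10)
        .try_collect()
        .await?;
    Ok(serde_cbor::to_vec(&entries)?)
}

// New shape: yield each entry as an independently serialized CBOR value
// as soon as its fetch completes.
fn fetch_streaming(keys: Vec<String>) -> impl Stream<Item = Result<Vec<u8>, Error>> {
    stream::iter(keys.into_iter().map(fetch_one))
        .buffer_unordered(10)
        .and_then(|entry| async move { Ok(serde_cbor::to_vec(&entry)?) })
}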


@@ -5,22 +5,25 @@
  * GNU General Public License version 2.
  */
 
-use anyhow::Context;
-use futures::{stream::FuturesUnordered, TryStreamExt};
+use anyhow::{Context, Error};
+use futures::{stream, Stream, StreamExt};
 use gotham::state::{FromState, State};
 use gotham_derive::{StateData, StaticResponseExtender};
 use serde::Deserialize;
 
-use edenapi_types::{DataEntry, DataRequest, DataResponse};
+use edenapi_types::{DataEntry, DataRequest};
 use gotham_ext::{error::HttpError, response::TryIntoResponse};
 use mercurial_types::{HgFileNodeId, HgManifestId, HgNodeHash};
 use mononoke_api::hg::{HgDataContext, HgDataId, HgRepoContext};
 use types::Key;
 
 use crate::context::ServerContext;
-use crate::errors::{ErrorKind, MononokeErrorExt};
+use crate::errors::ErrorKind;
 use crate::middleware::RequestContext;
-use crate::utils::{cbor_response, get_repo, parse_cbor_request};
+use crate::utils::{cbor_stream, get_repo, parse_cbor_request};
+
+/// XXX: This number was chosen arbitrarily.
+const MAX_CONCURRENT_FETCHES_PER_REQUEST: usize = 10;
 
 #[derive(Debug, Deserialize, StateData, StaticResponseExtender)]
 pub struct DataParams {
@@ -46,42 +49,39 @@ async fn data<ID: HgDataId>(state: &mut State) -> Result<impl TryIntoResponse, H
     let repo = get_repo(&sctx, &rctx, &params.repo).await?;
     let request = parse_cbor_request(state).await?;
-    let response = get_all_entries::<ID>(&repo, request).await?;
-    cbor_response(response)
+    Ok(cbor_stream(fetch_all::<ID>(repo, request)))
 }
 
 /// Fetch data for all of the requested keys concurrently.
-async fn get_all_entries<ID: HgDataId>(
-    repo: &HgRepoContext,
+fn fetch_all<ID: HgDataId>(
+    repo: HgRepoContext,
     request: DataRequest,
-) -> Result<DataResponse, HttpError> {
-    let fetches = FuturesUnordered::new();
-
-    for key in request.keys {
-        fetches.push(get_data_entry::<ID>(repo.clone(), key));
-    }
-
-    let entries = fetches.try_collect::<Vec<_>>().await?;
+) -> impl Stream<Item = Result<DataEntry, Error>> {
+    let fetches = request
+        .keys
+        .into_iter()
+        .map(move |key| fetch::<ID>(repo.clone(), key));
 
-    Ok(DataResponse::new(entries))
+    stream::iter(fetches).buffer_unordered(MAX_CONCURRENT_FETCHES_PER_REQUEST)
 }
 
 /// Fetch requested data for a single key.
 /// Note that this function consumes the repo context in order
 /// to construct a file/tree context for the requested blob.
-async fn get_data_entry<ID: HgDataId>(
-    repo: HgRepoContext,
-    key: Key,
-) -> Result<DataEntry, HttpError> {
+async fn fetch<ID: HgDataId>(repo: HgRepoContext, key: Key) -> Result<DataEntry, Error> {
    let id = ID::from_node_hash(HgNodeHash::from(key.hgid));
 
    let ctx = id
        .context(repo)
        .await
-        .map_err(|e| e.into_http_error(ErrorKind::DataFetchFailed(key.clone())))?
-        .with_context(|| ErrorKind::KeyDoesNotExist(key.clone()))
-        .map_err(HttpError::e404)?;
+        .with_context(|| ErrorKind::DataFetchFailed(key.clone()))?
+        .with_context(|| ErrorKind::KeyDoesNotExist(key.clone()))?;
 
-    let data = ctx.content().await.map_err(HttpError::e500)?;
+    let data = ctx
+        .content()
+        .await
+        .with_context(|| ErrorKind::DataFetchFailed(key.clone()))?;
    let parents = ctx.hg_parents().into();
 
    Ok(DataEntry::new(key, data, parents))
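The rewritten fetch leans on the fact that anyhow::Context is implemented for both Result and Option, so a Result<Option<_>, _> lookup can attach a distinct error at each layer and collapse to a single Result without going through HttpError. A stand-alone illustration of the pattern (the lookup function and error messages here are made up):

use anyhow::{Context, Error};

// Stand-in for `id.context(repo).await`, which yields
// `Result<Option<...>, ...>` in the real code.
fn lookup(key: &str) -> Result<Option<String>, std::io::Error> {
    Ok(if key == "present" {
        Some("contents".to_string())
    } else {
        None
    })
}

fn fetch(key: &str) -> Result<String, Error> {
    let value = lookup(key)
        // `with_context` on the `Result`: the fetch itself failed.
        .with_context(|| format!("data fetch failed: {}", key))?
        // `with_context` on the inner `Option`: the fetch succeeded,
        // but the key was not found.
        .with_context(|| format!("key does not exist: {}", key))?;
    Ok(value)
}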


@@ -7,27 +7,31 @@
 
 use std::convert::TryFrom;
 
-use anyhow::Context;
+use anyhow::{Context, Error};
 use futures::{
-    stream::{BoxStream, FuturesUnordered},
-    StreamExt, TryStreamExt,
+    stream::{self, BoxStream},
+    Stream, StreamExt, TryStreamExt,
 };
 use gotham::state::{FromState, State};
 use gotham_derive::{StateData, StaticResponseExtender};
 use serde::Deserialize;
 
-use edenapi_types::{HistoryRequest, HistoryResponse, HistoryResponseChunk, WireHistoryEntry};
+use cloned::cloned;
+use edenapi_types::{HistoryRequest, HistoryResponseChunk, WireHistoryEntry};
 use gotham_ext::{error::HttpError, response::TryIntoResponse};
 use mercurial_types::{HgFileNodeId, HgNodeHash};
 use mononoke_api::hg::HgRepoContext;
 use types::Key;
 
 use crate::context::ServerContext;
-use crate::errors::{ErrorKind, MononokeErrorExt};
+use crate::errors::ErrorKind;
 use crate::middleware::RequestContext;
-use crate::utils::{cbor_response, get_repo, parse_cbor_request, to_mpath};
+use crate::utils::{cbor_stream, get_repo, parse_cbor_request, to_mpath};
 
-type HistoryStream = BoxStream<'static, Result<WireHistoryEntry, HttpError>>;
+type HistoryStream = BoxStream<'static, Result<WireHistoryEntry, Error>>;
+
+/// XXX: This number was chosen arbitrarily.
+const MAX_CONCURRENT_FETCHES_PER_REQUEST: usize = 10;
 
 #[derive(Debug, Deserialize, StateData, StaticResponseExtender)]
 pub struct HistoryParams {
@@ -41,67 +45,55 @@ pub async fn history(state: &mut State) -> Result<impl TryIntoResponse, HttpErro
     let repo = get_repo(&sctx, &rctx, &params.repo).await?;
     let request = parse_cbor_request(state).await?;
-    let response = get_history(&repo, request).await?;
-    cbor_response(response)
+    Ok(cbor_stream(fetch_history(repo, request).await))
 }
 
 /// Fetch history for all of the requested files concurrently.
-async fn get_history(
-    repo: &HgRepoContext,
+async fn fetch_history(
+    repo: HgRepoContext,
     request: HistoryRequest,
-) -> Result<HistoryResponse, HttpError> {
-    let chunk_stream = FuturesUnordered::new();
-
-    for key in request.keys {
-        // Save the path for inclusion in the response.
+) -> impl Stream<Item = Result<HistoryResponseChunk, Error>> {
+    let HistoryRequest { keys, length } = request;
+
+    let fetches = keys.into_iter().map(move |key| {
+        // Construct a Future that buffers the full history for this key.
+        // This should be OK since the history entries are relatively
+        // small, so unless the history is extremely long, the total
+        // amount of buffered data should be reasonable.
+        cloned!(repo);
+        async move {
             let path = key.path.clone();
-
-        // Build a stream of history entries for a single file.
-        let entry_stream = single_key_history(repo, key, request.length).await?;
-
-        // Build a future that buffers the stream and resolves
-        // to a HistoryResponseChunk for this file.
-        let chunk_fut = async {
-            let entries = entry_stream.try_collect().await?;
+            let stream = fetch_history_for_key(repo, key, length).await?;
+            let entries = stream.try_collect().await?;
             Ok(HistoryResponseChunk { path, entries })
-        };
-
-        chunk_stream.push(chunk_fut);
-    }
+        }
+    });
 
-    // TODO(kulshrax): Don't buffer the results here.
-    let chunks = chunk_stream.try_collect().await?;
-    let response = HistoryResponse { chunks };
-    Ok(response)
+    stream::iter(fetches).buffer_unordered(MAX_CONCURRENT_FETCHES_PER_REQUEST)
 }
 
-async fn single_key_history(
-    repo: &HgRepoContext,
+async fn fetch_history_for_key(
+    repo: HgRepoContext,
     key: Key,
     length: Option<u32>,
-) -> Result<HistoryStream, HttpError> {
+) -> Result<HistoryStream, Error> {
     let filenode_id = HgFileNodeId::new(HgNodeHash::from(key.hgid));
-    let mpath = to_mpath(&key.path)
-        .map_err(HttpError::e400)?
-        .context(ErrorKind::UnexpectedEmptyPath)
-        .map_err(HttpError::e400)?;
+    let mpath = to_mpath(&key.path)?.context(ErrorKind::UnexpectedEmptyPath)?;
 
     let file = repo
         .file(filenode_id)
         .await
-        .map_err(|e| e.into_http_error(ErrorKind::DataFetchFailed(key.clone())))?
-        .with_context(|| ErrorKind::KeyDoesNotExist(key.clone()))
-        .map_err(HttpError::e404)?;
+        .with_context(|| ErrorKind::DataFetchFailed(key.clone()))?
+        .with_context(|| ErrorKind::KeyDoesNotExist(key.clone()))?;
 
     // Fetch the file's history and convert the entries into
     // the expected on-the-wire format.
     let history = file
         .history(mpath, length)
-        .map_err(move |e| e.into_http_error(ErrorKind::HistoryFetchFailed(key.clone())))
         // XXX: Use async block because TryStreamExt::and_then
         // requires the closure to return a TryFuture.
-        .and_then(|entry| async { WireHistoryEntry::try_from(entry).map_err(HttpError::e500) })
+        .err_into::<Error>()
+        .map_err(move |e| e.context(ErrorKind::HistoryFetchFailed(key.clone())))
+        .and_then(|entry| async { WireHistoryEntry::try_from(entry) })
         .boxed();
 
     Ok(history)
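Note the shape of fetch_history: each file's (typically small) history is still buffered into one chunk, but the chunks themselves are produced as a stream with bounded concurrency, so the full response is never held in memory at once. The same shape in a self-contained sketch, where Chunk and entry_stream are hypothetical stand-ins for HistoryResponseChunk and file.history(...):

use anyhow::Error;
use futures::{stream, Stream, StreamExt, TryStreamExt};

struct Chunk {
    path: String,
    entries: Vec<u64>,
}

// Stand-in for `file.history(...)`: a fallible stream of entries.
fn entry_stream(_path: &str) -> impl Stream<Item = Result<u64, Error>> {
    let entries: Vec<Result<u64, Error>> = vec![Ok(1), Ok(2)];
    stream::iter(entries)
}

fn fetch_chunks(paths: Vec<String>) -> impl Stream<Item = Result<Chunk, Error>> {
    let fetches = paths.into_iter().map(|path| async move {
        // Buffer one file's history into a single chunk; only the
        // chunks are streamed, capped at ten in flight at a time.
        let entries = entry_stream(&path).try_collect().await?;
        Ok::<_, Error>(Chunk { path, entries })
    });
    stream::iter(fetches).buffer_unordered(10)
}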


@@ -6,12 +6,12 @@
  */
 
 use anyhow::Error;
-use futures::TryStreamExt;
+use futures::{Stream, TryStreamExt};
 use gotham::state::{FromState, State};
 use gotham_derive::{StateData, StaticResponseExtender};
 use serde::Deserialize;
 
-use edenapi_types::{DataEntry, DataResponse, TreeRequest};
+use edenapi_types::{DataEntry, TreeRequest};
 use gotham_ext::{error::HttpError, response::TryIntoResponse};
 use mercurial_types::{HgManifestId, HgNodeHash};
 use mononoke_api::{
@@ -21,9 +21,9 @@ use mononoke_api::{
 use types::Key;
 
 use crate::context::ServerContext;
-use crate::errors::{ErrorKind, MononokeErrorExt};
+use crate::errors::ErrorKind;
 use crate::middleware::RequestContext;
-use crate::utils::{cbor_response, get_repo, parse_cbor_request, to_hg_path, to_mononoke_path};
+use crate::utils::{cbor_stream, get_repo, parse_cbor_request, to_hg_path, to_mononoke_path};
 
 #[derive(Debug, Deserialize, StateData, StaticResponseExtender)]
 pub struct SubTreeParams {
@@ -37,9 +37,8 @@ pub async fn subtree(state: &mut State) -> Result<impl TryIntoResponse, HttpErro
     let repo = get_repo(&sctx, &rctx, &params.repo).await?;
     let request = parse_cbor_request(state).await?;
-    let response = get_complete_subtree(&repo, request).await?;
-    cbor_response(response)
+    Ok(cbor_stream(get_complete_subtree(&repo, request)?))
 }
 
 /// Fetch all of the nodes for the subtree under the specified
@@ -54,10 +53,10 @@ pub async fn subtree(state: &mut State) -> Result<impl TryIntoResponse, HttpErro
 /// a fairly expensive way to request trees. When possible, clients
 /// should prefer explicitly request individual tree nodes via the
 /// more lightweight `/trees` endpoint.
-async fn get_complete_subtree(
+fn get_complete_subtree(
     repo: &HgRepoContext,
     request: TreeRequest,
-) -> Result<DataResponse, HttpError> {
+) -> Result<impl Stream<Item = Result<DataEntry, Error>>, HttpError> {
     let path = to_mononoke_path(request.rootdir).map_err(HttpError::e400)?;
 
     let root_nodes = request
@@ -72,20 +71,13 @@ async fn get_complete_subtree(
         .map(|hgid| HgManifestId::new(HgNodeHash::from(hgid)))
         .collect::<Vec<_>>();
 
-    let entries = repo
+    let stream = repo
         .trees_under_path(path, root_nodes, base_nodes, request.depth)
-        .map_err(|e| e.into_http_error(ErrorKind::SubtreeRequestFailed))
-        .and_then(move |(tree, path)| async {
-            // XXX: Even though this function isn't async, we need to
-            // use an async block because `and_then()` requires a Future.
-            data_entry_for_tree(tree, path).map_err(HttpError::e500)
-        })
-        // TODO(kulshrax): Change this method to return a stream
-        // instead of buffering the data entires.
-        .try_collect::<Vec<_>>()
-        .await?;
+        .err_into::<Error>()
+        .map_err(|e| e.context(ErrorKind::SubtreeRequestFailed))
+        .and_then(move |(tree, path)| async { data_entry_for_tree(tree, path) });
 
-    Ok(DataResponse::new(entries))
+    Ok(stream)
 }
 
 fn data_entry_for_tree(tree: HgTreeContext, path: MononokePath) -> Result<DataEntry, Error> {
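The error handling on the tree stream follows a pattern worth calling out: TryStreamExt::err_into widens the stream's concrete error into anyhow::Error, map_err attaches a request-level context, and and_then converts each item, all without ever collecting the stream. A minimal sketch with made-up item types (the real stream yields trees, not integers):

use anyhow::Error;
use futures::{stream, Stream, TryStreamExt};

// Stand-in for `repo.trees_under_path(...)`: a stream with a concrete
// error type that must be widened before context can be attached.
fn tree_stream() -> impl Stream<Item = Result<u32, std::io::Error>> {
    let items: Vec<Result<u32, std::io::Error>> = vec![Ok(7), Ok(8)];
    stream::iter(items)
}

fn entries() -> impl Stream<Item = Result<u32, Error>> {
    tree_stream()
        .err_into::<Error>() // io::Error -> anyhow::Error
        .map_err(|e| e.context("subtree request failed"))
        // Stand-in for `data_entry_for_tree`; the async block is only
        // needed because `and_then` expects a future.
        .and_then(|tree| async move { Ok(tree * 2) })
}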


@@ -9,18 +9,19 @@
 
 use anyhow::{Context, Error};
 use bytes::Bytes;
+use futures::{Stream, TryStreamExt};
 use gotham::state::State;
 use mime::Mime;
 use once_cell::sync::Lazy;
 use serde::{de::DeserializeOwned, Serialize};
 
-use crate::errors::ErrorKind;
-
 use gotham_ext::{
     error::HttpError,
-    response::{BytesBody, TryIntoResponse},
+    response::{StreamBody, TryIntoResponse},
 };
 
+use crate::errors::ErrorKind;
+
 use super::get_request_body;
 
 static CBOR_MIME: Lazy<Mime> = Lazy::new(|| "application/cbor".parse().unwrap());
@@ -35,9 +36,18 @@ pub fn to_cbor_bytes<S: Serialize>(s: S) -> Result<Bytes, Error> {
         .context(ErrorKind::SerializationFailed)
 }
 
-pub fn cbor_response<S: Serialize>(s: S) -> Result<impl TryIntoResponse, HttpError> {
-    let bytes = to_cbor_bytes(s).map_err(HttpError::e500)?;
-    Ok(BytesBody::new(bytes, cbor_mime()))
+/// Serialize each item of the input stream as CBOR and return
+/// a streaming response. Note that although the input stream
+/// can fail, the error type is `anyhow::Error` rather than
+/// `HttpError` because the HTTP status code would have already
+/// been sent at the time of the failure.
+pub fn cbor_stream<S, T>(stream: S) -> impl TryIntoResponse
+where
+    S: Stream<Item = Result<T, Error>> + Send + 'static,
+    T: Serialize + Send + 'static,
+{
+    let byte_stream = stream.and_then(|item| async { to_cbor_bytes(item) });
+    StreamBody::new(byte_stream, cbor_mime())
 }
 
 pub async fn parse_cbor_request<R: DeserializeOwned>(state: &mut State) -> Result<R, HttpError> {
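Because cbor_stream writes each item as its own top-level CBOR value, with no length prefix or delimiter between items, a reader has to decode the body incrementally. A sketch of the resulting framing, assuming the serde_cbor crate (strings stand in for the real wire types):

use serde_cbor::Deserializer;

fn main() -> Result<(), serde_cbor::Error> {
    // Serialize each item separately and concatenate the bytes,
    // as the streaming response body does.
    let mut body = Vec::new();
    for item in &["one", "two", "three"] {
        body.extend(serde_cbor::to_vec(item)?);
    }

    // Recover the items with an incremental deserializer; a single
    // `serde_cbor::from_slice` call would fail on the trailing data.
    let items: Vec<String> = Deserializer::from_slice(&body)
        .into_iter()
        .collect::<Result<_, _>>()?;
    assert_eq!(items, ["one", "two", "three"]);
    Ok(())
}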


@@ -21,7 +21,7 @@ use crate::middleware::RequestContext;
 pub mod cbor;
 pub mod convert;
 
-pub use cbor::{cbor_mime, cbor_response, parse_cbor_request, to_cbor_bytes};
+pub use cbor::{cbor_mime, cbor_stream, parse_cbor_request, to_cbor_bytes};
 pub use convert::{to_hg_path, to_mononoke_path, to_mpath};
 
 pub async fn get_repo(


@@ -19,9 +19,10 @@ use std::path::PathBuf;
 
 use anyhow::{anyhow, Result};
 use serde::de::DeserializeOwned;
+use serde_cbor::Deserializer;
 use structopt::StructOpt;
 
-use edenapi_types::{DataResponse, HistoryResponse, Validity, WireHistoryEntry};
+use edenapi_types::{DataEntry, HistoryResponseChunk, Validity, WireHistoryEntry};
 use types::{Key, Parents, RepoPathBuf};
 
 #[derive(Debug, StructOpt)]
@@ -107,8 +108,8 @@ fn cmd_data(args: DataArgs) -> Result<()> {
 }
 
 fn cmd_data_ls(args: DataLsArgs) -> Result<()> {
-    let response: DataResponse = read_input(args.input)?;
-    for entry in response.entries {
+    let entries: Vec<DataEntry> = read_input(args.input)?;
+    for entry in entries {
         println!("{}", entry.key());
     }
     Ok(())
@@ -119,9 +120,8 @@ fn cmd_data_cat(args: DataCatArgs) -> Result<()> {
     let hgid = args.hgid.parse()?;
     let key = Key::new(path, hgid);
 
-    let response: DataResponse = read_input(args.input)?;
-    let entry = response
-        .entries
+    let entries: Vec<DataEntry> = read_input(args.input)?;
+    let entry = entries
         .into_iter()
         .find(|entry| entry.key() == &key)
         .ok_or_else(|| anyhow!("Key not found"))?;
@@ -130,8 +130,8 @@ fn cmd_data_cat(args: DataCatArgs) -> Result<()> {
 }
 
 fn cmd_data_check(args: DataCheckArgs) -> Result<()> {
-    let response: DataResponse = read_input(args.input)?;
-    for entry in response.entries {
+    let entries: Vec<DataEntry> = read_input(args.input)?;
+    for entry in entries {
         match entry.data().1 {
             Validity::Valid => {}
             Validity::Redacted => {
@@ -156,10 +156,10 @@ fn cmd_history(args: HistoryArgs) -> Result<()> {
 }
 
 fn cmd_history_ls(args: HistLsArgs) -> Result<()> {
-    let response: HistoryResponse = read_input(args.input)?;
+    let chunks: Vec<HistoryResponseChunk> = read_input(args.input)?;
     // Deduplicate and sort paths.
     let mut paths = BTreeSet::new();
-    for chunk in response.chunks {
+    for chunk in chunks {
         paths.insert(chunk.path.into_string());
     }
     for path in paths {
@@ -169,8 +169,8 @@ fn cmd_history_ls(args: HistLsArgs) -> Result<()> {
 }
 
 fn cmd_history_show(args: HistShowArgs) -> Result<()> {
-    let response: HistoryResponse = read_input(args.input)?;
-    let map = make_history_map(response);
+    let chunks: Vec<HistoryResponseChunk> = read_input(args.input)?;
+    let map = make_history_map(chunks);
     match args.file {
         Some(ref path) => match map.get(path) {
             Some(entries) => print_history(path, entries, args.count),
@@ -185,9 +185,11 @@ fn cmd_history_show(args: HistShowArgs) -> Result<()> {
     Ok(())
 }
 
-fn make_history_map(response: HistoryResponse) -> BTreeMap<String, Vec<WireHistoryEntry>> {
+fn make_history_map(
+    chunks: impl IntoIterator<Item = HistoryResponseChunk>,
+) -> BTreeMap<String, Vec<WireHistoryEntry>> {
     let mut map = BTreeMap::new();
-    for chunk in response.chunks {
+    for chunk in chunks {
         map.entry(chunk.path.into_string())
             .or_insert_with(Vec::new)
             .extend_from_slice(&chunk.entries);
@@ -218,16 +220,20 @@ fn print_history(path: &str, entries: &[WireHistoryEntry], counts_only: bool) {
     }
 }
 
-fn read_input<T: DeserializeOwned>(path: Option<PathBuf>) -> Result<T> {
+fn read_input<T: DeserializeOwned>(path: Option<PathBuf>) -> Result<Vec<T>> {
     Ok(match path {
         Some(path) => {
             eprintln!("Reading from file: {:?}", &path);
             let file = File::open(&path)?;
-            serde_cbor::from_reader(file)?
+            Deserializer::from_reader(file)
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()?
         }
         None => {
             eprintln!("Reading from stdin");
-            serde_cbor::from_reader(stdin())?
+            Deserializer::from_reader(stdin())
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()?
        }
    })
 }
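The read_input change mirrors the new wire format: the body is now a sequence of top-level CBOR values rather than one aggregate DataResponse or HistoryResponse, so a single from_reader call no longer works. The same logic as a stand-alone helper over any Read source (read_values is a hypothetical name, not part of the tool):

use std::io::Read;

use anyhow::Result;
use serde::de::DeserializeOwned;
use serde_cbor::Deserializer;

// Read zero or more concatenated top-level CBOR values, as produced by
// the server's streaming responses. Decoding stops at the first error.
fn read_values<T: DeserializeOwned, R: Read>(reader: R) -> Result<Vec<T>> {
    let values = Deserializer::from_reader(reader)
        .into_iter()
        .collect::<Result<Vec<T>, _>>()?;
    Ok(values)
}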