diff --git a/eden/mononoke/edenapi_server/src/handlers/data.rs b/eden/mononoke/edenapi_server/src/handlers/data.rs
index 3d93561f63..e1175fc6cd 100644
--- a/eden/mononoke/edenapi_server/src/handlers/data.rs
+++ b/eden/mononoke/edenapi_server/src/handlers/data.rs
@@ -5,22 +5,25 @@
  * GNU General Public License version 2.
  */
 
-use anyhow::Context;
-use futures::{stream::FuturesUnordered, TryStreamExt};
+use anyhow::{Context, Error};
+use futures::{stream, Stream, StreamExt};
 use gotham::state::{FromState, State};
 use gotham_derive::{StateData, StaticResponseExtender};
 use serde::Deserialize;
 
-use edenapi_types::{DataEntry, DataRequest, DataResponse};
+use edenapi_types::{DataEntry, DataRequest};
 use gotham_ext::{error::HttpError, response::TryIntoResponse};
 use mercurial_types::{HgFileNodeId, HgManifestId, HgNodeHash};
 use mononoke_api::hg::{HgDataContext, HgDataId, HgRepoContext};
 use types::Key;
 
 use crate::context::ServerContext;
-use crate::errors::{ErrorKind, MononokeErrorExt};
+use crate::errors::ErrorKind;
 use crate::middleware::RequestContext;
-use crate::utils::{cbor_response, get_repo, parse_cbor_request};
+use crate::utils::{cbor_stream, get_repo, parse_cbor_request};
+
+/// XXX: This number was chosen arbitrarily.
+const MAX_CONCURRENT_FETCHES_PER_REQUEST: usize = 10;
 
 #[derive(Debug, Deserialize, StateData, StaticResponseExtender)]
 pub struct DataParams {
@@ -46,42 +49,39 @@ async fn data<ID: HgDataId>(state: &mut State) -> Result<impl TryIntoResponse, HttpError> {
     let request = parse_cbor_request(state).await?;
 
-    let response = get_all_entries::<ID>(&repo, request).await?;
-    cbor_response(response)
+    Ok(cbor_stream(fetch_all::<ID>(repo, request)))
 }
 
 /// Fetch data for all of the requested keys concurrently.
-async fn get_all_entries<ID: HgDataId>(
-    repo: &HgRepoContext,
+fn fetch_all<ID: HgDataId>(
+    repo: HgRepoContext,
     request: DataRequest,
-) -> Result<DataResponse, HttpError> {
-    let fetches = FuturesUnordered::new();
-    for key in request.keys {
-        fetches.push(get_data_entry::<ID>(repo.clone(), key));
-    }
-    let entries = fetches.try_collect::<Vec<_>>().await?;
+) -> impl Stream<Item = Result<DataEntry, Error>> {
+    let fetches = request
+        .keys
+        .into_iter()
+        .map(move |key| fetch::<ID>(repo.clone(), key));
 
-    Ok(DataResponse::new(entries))
+    stream::iter(fetches).buffer_unordered(MAX_CONCURRENT_FETCHES_PER_REQUEST)
 }
 
 /// Fetch requested data for a single key.
 /// Note that this function consumes the repo context in order
 /// to construct a file/tree context for the requested blob.
-async fn get_data_entry<ID: HgDataId>(
-    repo: HgRepoContext,
-    key: Key,
-) -> Result<DataEntry, HttpError> {
+async fn fetch<ID: HgDataId>(repo: HgRepoContext, key: Key) -> Result<DataEntry, Error> {
     let id = ID::from_node_hash(HgNodeHash::from(key.hgid));
 
     let ctx = id
         .context(repo)
         .await
-        .map_err(|e| e.into_http_error(ErrorKind::DataFetchFailed(key.clone())))?
-        .with_context(|| ErrorKind::KeyDoesNotExist(key.clone()))
-        .map_err(HttpError::e404)?;
+        .with_context(|| ErrorKind::DataFetchFailed(key.clone()))?
+        .with_context(|| ErrorKind::KeyDoesNotExist(key.clone()))?;
 
-    let data = ctx.content().await.map_err(HttpError::e500)?;
+    let data = ctx
+        .content()
+        .await
+        .with_context(|| ErrorKind::DataFetchFailed(key.clone()))?;
     let parents = ctx.hg_parents().into();
 
     Ok(DataEntry::new(key, data, parents))
diff --git a/eden/mononoke/edenapi_server/src/handlers/history.rs b/eden/mononoke/edenapi_server/src/handlers/history.rs
index 7ef44c4c83..e3ab9587a4 100644
--- a/eden/mononoke/edenapi_server/src/handlers/history.rs
+++ b/eden/mononoke/edenapi_server/src/handlers/history.rs
@@ -7,27 +7,31 @@
 
 use std::convert::TryFrom;
 
-use anyhow::Context;
+use anyhow::{Context, Error};
 use futures::{
-    stream::{BoxStream, FuturesUnordered},
-    StreamExt, TryStreamExt,
+    stream::{self, BoxStream},
+    Stream, StreamExt, TryStreamExt,
 };
 use gotham::state::{FromState, State};
 use gotham_derive::{StateData, StaticResponseExtender};
 use serde::Deserialize;
 
-use edenapi_types::{HistoryRequest, HistoryResponse, HistoryResponseChunk, WireHistoryEntry};
+use cloned::cloned;
+use edenapi_types::{HistoryRequest, HistoryResponseChunk, WireHistoryEntry};
 use gotham_ext::{error::HttpError, response::TryIntoResponse};
 use mercurial_types::{HgFileNodeId, HgNodeHash};
 use mononoke_api::hg::HgRepoContext;
 use types::Key;
 
 use crate::context::ServerContext;
-use crate::errors::{ErrorKind, MononokeErrorExt};
+use crate::errors::ErrorKind;
 use crate::middleware::RequestContext;
-use crate::utils::{cbor_response, get_repo, parse_cbor_request, to_mpath};
+use crate::utils::{cbor_stream, get_repo, parse_cbor_request, to_mpath};
 
-type HistoryStream = BoxStream<'static, Result<WireHistoryEntry, HttpError>>;
+type HistoryStream = BoxStream<'static, Result<WireHistoryEntry, Error>>;
+
+/// XXX: This number was chosen arbitrarily.
+const MAX_CONCURRENT_FETCHES_PER_REQUEST: usize = 10;
 
 #[derive(Debug, Deserialize, StateData, StaticResponseExtender)]
 pub struct HistoryParams {
@@ -41,67 +45,55 @@ pub async fn history(state: &mut State) -> Result<impl TryIntoResponse, HttpError> {
     let request = parse_cbor_request(state).await?;
 
-    let response = get_history(&repo, request).await?;
-    cbor_response(response)
+    Ok(cbor_stream(fetch_history(repo, request)))
 }
 
 /// Fetch history for all of the requested files concurrently.
-async fn get_history(
-    repo: &HgRepoContext,
+fn fetch_history(
+    repo: HgRepoContext,
     request: HistoryRequest,
-) -> Result<HistoryResponse, HttpError> {
-    let chunk_stream = FuturesUnordered::new();
-    for key in request.keys {
-        // Save the path for inclusion in the response.
-        let path = key.path.clone();
+) -> impl Stream<Item = Result<HistoryResponseChunk, Error>> {
+    let HistoryRequest { keys, length } = request;
 
-        // Build a stream of history entries for a single file.
-        let entry_stream = single_key_history(repo, key, request.length).await?;
-
-        // Build a future that buffers the stream and resolves
-        // to a HistoryResponseChunk for this file.
-        let chunk_fut = async {
-            let entries = entry_stream.try_collect().await?;
+    let fetches = keys.into_iter().map(move |key| {
+        // Construct a Future that buffers the full history for this key.
+        // This should be OK since the history entries are relatively
+        // small, so unless the history is extremely long, the total
+        // amount of buffered data should be reasonable.
+        cloned!(repo);
+        async move {
+            let path = key.path.clone();
+            let stream = fetch_history_for_key(repo, key, length).await?;
+            let entries = stream.try_collect().await?;
             Ok(HistoryResponseChunk { path, entries })
-        };
+        }
+    });
 
-        chunk_stream.push(chunk_fut);
-    }
-
-    // TODO(kulshrax): Don't buffer the results here.
-    let chunks = chunk_stream.try_collect().await?;
-    let response = HistoryResponse { chunks };
-
-    Ok(response)
+    stream::iter(fetches).buffer_unordered(MAX_CONCURRENT_FETCHES_PER_REQUEST)
 }
 
-async fn single_key_history(
-    repo: &HgRepoContext,
+async fn fetch_history_for_key(
+    repo: HgRepoContext,
     key: Key,
     length: Option<u32>,
-) -> Result<HistoryStream, HttpError> {
+) -> Result<HistoryStream, Error> {
     let filenode_id = HgFileNodeId::new(HgNodeHash::from(key.hgid));
-    let mpath = to_mpath(&key.path)
-        .map_err(HttpError::e400)?
-        .context(ErrorKind::UnexpectedEmptyPath)
-        .map_err(HttpError::e400)?;
+    let mpath = to_mpath(&key.path)?.context(ErrorKind::UnexpectedEmptyPath)?;
 
     let file = repo
         .file(filenode_id)
         .await
-        .map_err(|e| e.into_http_error(ErrorKind::DataFetchFailed(key.clone())))?
-        .with_context(|| ErrorKind::KeyDoesNotExist(key.clone()))
-        .map_err(HttpError::e404)?;
+        .with_context(|| ErrorKind::DataFetchFailed(key.clone()))?
+        .with_context(|| ErrorKind::KeyDoesNotExist(key.clone()))?;
 
     // Fetch the file's history and convert the entries into
     // the expected on-the-wire format.
     let history = file
         .history(mpath, length)
-        .map_err(move |e| e.into_http_error(ErrorKind::HistoryFetchFailed(key.clone())))
-        // XXX: Use async block because TryStreamExt::and_then
-        // requires the closure to return a TryFuture.
-        .and_then(|entry| async { WireHistoryEntry::try_from(entry).map_err(HttpError::e500) })
+        .err_into::<Error>()
+        .map_err(move |e| e.context(ErrorKind::HistoryFetchFailed(key.clone())))
+        .and_then(|entry| async { WireHistoryEntry::try_from(entry) })
         .boxed();
 
     Ok(history)
diff --git a/eden/mononoke/edenapi_server/src/handlers/subtree.rs b/eden/mononoke/edenapi_server/src/handlers/subtree.rs
index cf5a4a871c..8307ed80d8 100644
--- a/eden/mononoke/edenapi_server/src/handlers/subtree.rs
+++ b/eden/mononoke/edenapi_server/src/handlers/subtree.rs
@@ -6,12 +6,12 @@
  */
 
 use anyhow::Error;
-use futures::TryStreamExt;
+use futures::{Stream, TryStreamExt};
 use gotham::state::{FromState, State};
 use gotham_derive::{StateData, StaticResponseExtender};
 use serde::Deserialize;
 
-use edenapi_types::{DataEntry, DataResponse, TreeRequest};
+use edenapi_types::{DataEntry, TreeRequest};
 use gotham_ext::{error::HttpError, response::TryIntoResponse};
 use mercurial_types::{HgManifestId, HgNodeHash};
 use mononoke_api::{
@@ -21,9 +21,9 @@ use mononoke_api::{
 use types::Key;
 
 use crate::context::ServerContext;
-use crate::errors::{ErrorKind, MononokeErrorExt};
+use crate::errors::ErrorKind;
 use crate::middleware::RequestContext;
-use crate::utils::{cbor_response, get_repo, parse_cbor_request, to_hg_path, to_mononoke_path};
+use crate::utils::{cbor_stream, get_repo, parse_cbor_request, to_hg_path, to_mononoke_path};
 
 #[derive(Debug, Deserialize, StateData, StaticResponseExtender)]
 pub struct SubTreeParams {
@@ -37,9 +37,8 @@ pub async fn subtree(state: &mut State) -> Result<impl TryIntoResponse, HttpError> {
     let request = parse_cbor_request(state).await?;
 
-    let response = get_complete_subtree(&repo, request).await?;
-    cbor_response(response)
+    Ok(cbor_stream(get_complete_subtree(&repo, request)?))
 }
 
-async fn get_complete_subtree(
+fn get_complete_subtree(
     repo: &HgRepoContext,
     request: TreeRequest,
-) -> Result<DataResponse, HttpError> {
+) -> Result<impl Stream<Item = Result<DataEntry, Error>>, HttpError> {
     let path = to_mononoke_path(request.rootdir).map_err(HttpError::e400)?;
 
     let root_nodes = request
@@ -72,20 +71,13 @@
         .map(|hgid| HgManifestId::new(HgNodeHash::from(hgid)))
         .collect::<Vec<_>>();
 
-    let entries = repo
+    let stream = repo
         .trees_under_path(path, root_nodes, base_nodes, request.depth)
-        .map_err(|e| e.into_http_error(ErrorKind::SubtreeRequestFailed))
-        .and_then(move |(tree, path)| async {
-            // XXX: Even though this function isn't async, we need to
-            // use an async block because `and_then()` requires a Future.
-            data_entry_for_tree(tree, path).map_err(HttpError::e500)
-        })
-        // TODO(kulshrax): Change this method to return a stream
-        // instead of buffering the data entires.
-        .try_collect::<Vec<_>>()
-        .await?;
+        .err_into::<Error>()
+        .map_err(|e| e.context(ErrorKind::SubtreeRequestFailed))
+        .and_then(move |(tree, path)| async { data_entry_for_tree(tree, path) });
 
-    Ok(DataResponse::new(entries))
+    Ok(stream)
 }
 
 fn data_entry_for_tree(tree: HgTreeContext, path: MononokePath) -> Result<DataEntry, Error> {
diff --git a/eden/mononoke/edenapi_server/src/utils/cbor.rs b/eden/mononoke/edenapi_server/src/utils/cbor.rs
index beadbb6e50..470a5c4796 100644
--- a/eden/mononoke/edenapi_server/src/utils/cbor.rs
+++ b/eden/mononoke/edenapi_server/src/utils/cbor.rs
@@ -9,18 +9,19 @@
 use anyhow::{Context, Error};
 use bytes::Bytes;
+use futures::{Stream, TryStreamExt};
 use gotham::state::State;
 use mime::Mime;
 use once_cell::sync::Lazy;
 use serde::{de::DeserializeOwned, Serialize};
 
-use crate::errors::ErrorKind;
-
 use gotham_ext::{
     error::HttpError,
-    response::{BytesBody, TryIntoResponse},
+    response::{StreamBody, TryIntoResponse},
 };
 
+use crate::errors::ErrorKind;
+
 use super::get_request_body;
 
 static CBOR_MIME: Lazy<Mime> = Lazy::new(|| "application/cbor".parse().unwrap());
@@ -35,9 +36,18 @@ pub fn to_cbor_bytes<S: Serialize>(s: S) -> Result<Bytes, Error> {
         .context(ErrorKind::SerializationFailed)
 }
 
-pub fn cbor_response<S: Serialize>(s: S) -> Result<impl TryIntoResponse, HttpError> {
-    let bytes = to_cbor_bytes(s).map_err(HttpError::e500)?;
-    Ok(BytesBody::new(bytes, cbor_mime()))
+/// Serialize each item of the input stream as CBOR and return
+/// a streaming response. Note that although the input stream
+/// can fail, the error type is `anyhow::Error` rather than
+/// `HttpError` because the HTTP status code would have already
+/// been sent at the time of the failure.
+pub fn cbor_stream<S, T>(stream: S) -> impl TryIntoResponse
+where
+    S: Stream<Item = Result<T, Error>> + Send + 'static,
+    T: Serialize + Send + 'static,
+{
+    let byte_stream = stream.and_then(|item| async { to_cbor_bytes(item) });
+    StreamBody::new(byte_stream, cbor_mime())
 }
 
 pub async fn parse_cbor_request<R: DeserializeOwned>(state: &mut State) -> Result<R, HttpError> {
diff --git a/eden/mononoke/edenapi_server/src/utils/mod.rs b/eden/mononoke/edenapi_server/src/utils/mod.rs
index acdbb503ef..ea66fb244d 100644
--- a/eden/mononoke/edenapi_server/src/utils/mod.rs
+++ b/eden/mononoke/edenapi_server/src/utils/mod.rs
@@ -21,7 +21,7 @@ use crate::middleware::RequestContext;
 pub mod cbor;
 pub mod convert;
 
-pub use cbor::{cbor_mime, cbor_response, parse_cbor_request, to_cbor_bytes};
+pub use cbor::{cbor_mime, cbor_stream, parse_cbor_request, to_cbor_bytes};
 pub use convert::{to_hg_path, to_mononoke_path, to_mpath};
 
 pub async fn get_repo(
diff --git a/eden/scm/lib/edenapi/tools/read_res/src/main.rs b/eden/scm/lib/edenapi/tools/read_res/src/main.rs
index 5927dfe178..196d867c50 100644
--- a/eden/scm/lib/edenapi/tools/read_res/src/main.rs
+++ b/eden/scm/lib/edenapi/tools/read_res/src/main.rs
@@ -19,9 +19,10 @@ use std::path::PathBuf;
 
 use anyhow::{anyhow, Result};
 use serde::de::DeserializeOwned;
+use serde_cbor::Deserializer;
 use structopt::StructOpt;
 
-use edenapi_types::{DataResponse, HistoryResponse, Validity, WireHistoryEntry};
+use edenapi_types::{DataEntry, HistoryResponseChunk, Validity, WireHistoryEntry};
 use types::{Key, Parents, RepoPathBuf};
 
 #[derive(Debug, StructOpt)]
@@ -107,8 +108,8 @@ fn cmd_data(args: DataArgs) -> Result<()> {
 }
 
 fn cmd_data_ls(args: DataLsArgs) -> Result<()> {
-    let response: DataResponse = read_input(args.input)?;
-    for entry in response.entries {
+    let entries: Vec<DataEntry> = read_input(args.input)?;
+    for entry in entries {
         println!("{}", entry.key());
     }
     Ok(())
@@ -119,9 +120,8 @@ fn cmd_data_cat(args: DataCatArgs) -> Result<()> {
     let hgid = args.hgid.parse()?;
     let key = Key::new(path, hgid);
 
-    let response: DataResponse = read_input(args.input)?;
-    let entry = response
-        .entries
+    let entries: Vec<DataEntry> = read_input(args.input)?;
+    let entry = entries
         .into_iter()
         .find(|entry| entry.key() == &key)
         .ok_or_else(|| anyhow!("Key not found"))?;
@@ -130,8 +130,8 @@ fn cmd_data_check(args: DataCheckArgs) -> Result<()> {
-    let response: DataResponse = read_input(args.input)?;
-    for entry in response.entries {
+    let entries: Vec<DataEntry> = read_input(args.input)?;
+    for entry in entries {
         match entry.data().1 {
             Validity::Valid => {}
             Validity::Redacted => {
@@ -156,10 +156,10 @@ fn cmd_history(args: HistoryArgs) -> Result<()> {
 }
 
 fn cmd_history_ls(args: HistLsArgs) -> Result<()> {
-    let response: HistoryResponse = read_input(args.input)?;
+    let chunks: Vec<HistoryResponseChunk> = read_input(args.input)?;
     // Deduplicate and sort paths.
     let mut paths = BTreeSet::new();
-    for chunk in response.chunks {
+    for chunk in chunks {
         paths.insert(chunk.path.into_string());
     }
     for path in paths {
@@ -169,8 +169,8 @@ fn cmd_history_show(args: HistShowArgs) -> Result<()> {
-    let response: HistoryResponse = read_input(args.input)?;
-    let map = make_history_map(response);
+    let chunks: Vec<HistoryResponseChunk> = read_input(args.input)?;
+    let map = make_history_map(chunks);
     match args.file {
         Some(ref path) => match map.get(path) {
             Some(entries) => print_history(path, entries, args.count),
@@ -185,9 +185,11 @@ fn cmd_history_show(args: HistShowArgs) -> Result<()> {
     Ok(())
 }
 
-fn make_history_map(response: HistoryResponse) -> BTreeMap<String, Vec<WireHistoryEntry>> {
+fn make_history_map(
+    chunks: impl IntoIterator<Item = HistoryResponseChunk>,
+) -> BTreeMap<String, Vec<WireHistoryEntry>> {
     let mut map = BTreeMap::new();
-    for chunk in response.chunks {
+    for chunk in chunks {
         map.entry(chunk.path.into_string())
             .or_insert_with(Vec::new)
             .extend_from_slice(&chunk.entries);
@@ -218,16 +220,20 @@ fn print_history(path: &str, entries: &[WireHistoryEntry], counts_only: bool) {
     }
 }
 
-fn read_input<T: DeserializeOwned>(path: Option<PathBuf>) -> Result<T> {
+fn read_input<T: DeserializeOwned>(path: Option<PathBuf>) -> Result<Vec<T>> {
     Ok(match path {
        Some(path) => {
             eprintln!("Reading from file: {:?}", &path);
             let file = File::open(&path)?;
-            serde_cbor::from_reader(file)?
+            Deserializer::from_reader(file)
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()?
         }
         None => {
             eprintln!("Reading from stdin");
-            serde_cbor::from_reader(stdin())?
+            Deserializer::from_reader(stdin())
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()?
         }
     })
 }
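
A note on the concurrency pattern shared by `fetch_all` and `fetch_history` above: instead of pushing futures into a `FuturesUnordered` and collecting everything, the new code turns the request keys into an iterator of futures and lets `buffer_unordered` keep at most `MAX_CONCURRENT_FETCHES_PER_REQUEST` of them in flight, yielding results as they complete. A minimal self-contained sketch of that pattern (the `fetch_one` function and the numbers are illustrative, not from this diff):

```rust
use futures::{executor::block_on, stream, StreamExt};

// Stand-in for a per-key fetch such as `fetch::<ID>(repo.clone(), key)`.
async fn fetch_one(key: u32) -> Result<u32, String> {
    Ok(key * 2)
}

fn main() {
    // Cap on in-flight fetches, mirroring MAX_CONCURRENT_FETCHES_PER_REQUEST.
    const MAX_CONCURRENT: usize = 10;

    // An iterator of not-yet-polled futures, one per key.
    let fetches = (0..100u32).map(fetch_one);

    // Drive up to MAX_CONCURRENT futures at a time; results arrive in
    // completion order, which is fine for a stream of independent entries.
    let results: Vec<Result<u32, String>> =
        block_on(stream::iter(fetches).buffer_unordered(MAX_CONCURRENT).collect());

    assert_eq!(results.len(), 100);
}
```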
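
The `cbor_stream` helper works because CBOR values are self-delimiting: each serialized item can simply be concatenated onto the response body with no framing or enclosing array. A sketch of the same transformation outside the server (using `serde_cbor` directly; `StreamBody` and the gotham wiring are omitted, and the error type here is `serde_cbor::Error` rather than `anyhow::Error`):

```rust
use bytes::Bytes;
use futures::{executor::block_on, stream, StreamExt, TryStreamExt};

fn main() {
    // A fallible stream of serializable items, as produced by the handlers.
    let items = stream::iter(vec![Ok::<u32, serde_cbor::Error>(1), Ok(2), Ok(3)]);

    // Mirror cbor_stream: serialize each item to bytes as it flows through.
    let byte_stream = items.and_then(|item| async move {
        serde_cbor::to_vec(&item).map(Bytes::from)
    });

    let chunks: Vec<Result<Bytes, _>> = block_on(byte_stream.collect());
    assert_eq!(chunks.len(), 3);
}
```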
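
On the consumer side, this is why `read_res` can no longer deserialize a single `DataResponse` or `HistoryResponse` value: the body is now a back-to-back sequence of CBOR values, which `serde_cbor::Deserializer::into_iter` decodes one at a time, exactly as the updated `read_input` does. A sketch of that round trip with a hypothetical `Item` type:

```rust
use serde::{Deserialize, Serialize};
use serde_cbor::Deserializer;

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Item {
    id: u32,
}

fn main() -> Result<(), serde_cbor::Error> {
    // Simulate a streamed response: values written back to back,
    // with no enclosing array or envelope.
    let mut body = Vec::new();
    serde_cbor::to_writer(&mut body, &Item { id: 1 })?;
    serde_cbor::to_writer(&mut body, &Item { id: 2 })?;

    // Decode the concatenated values one at a time.
    let items: Vec<Item> = Deserializer::from_slice(&body)
        .into_iter()
        .collect::<Result<_, _>>()?;

    assert_eq!(items, vec![Item { id: 1 }, Item { id: 2 }]);
    Ok(())
}
```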