From cb1ef172ff2623ef44cf10b49fe8de7305986232 Mon Sep 17 00:00:00 2001
From: Arun Kulshreshtha
Date: Mon, 1 Apr 2019 20:12:09 -0700
Subject: [PATCH] Add batch data fetching endpoints for Eden API

Summary: Add new API server endpoints for performing batch fetches of data
and history. The requests and responses are currently serialized as JSON,
though the format can be trivially changed later to something like CBOR (or
any other format supported by serde).

Reviewed By: xavierd

Differential Revision: D14573735

fbshipit-source-id: ac46aa8a55db44b695c53f9e32cc4ea2e17504c8
---
 apiserver/src/actor/query.rs    |  3 ++
 apiserver/src/actor/repo.rs     | 95 ++++++++++++++++++++++++++++++---
 apiserver/src/actor/response.rs | 10 ++++
 apiserver/src/main.rs           | 65 +++++++++++++++++++++-
 mercurial_types/src/nodehash.rs |  6 +++
 5 files changed, 169 insertions(+), 10 deletions(-)

diff --git a/apiserver/src/actor/query.rs b/apiserver/src/actor/query.rs
index 4c1b24987b..d29d90fa0e 100644
--- a/apiserver/src/actor/query.rs
+++ b/apiserver/src/actor/query.rs
@@ -17,6 +17,7 @@ use apiserver_thrift::types::{
     MononokeGetRawParams, MononokeGetTreeParams, MononokeIsAncestorParams,
     MononokeListDirectoryParams, MononokeRevision,
 };
+use types::{FileDataRequest, FileHistoryRequest};
 
 use super::lfs::BatchRequest;
 
@@ -70,6 +71,8 @@ pub enum MononokeRepoQuery {
         oid: String,
         body: Bytes,
     },
+    EdenGetData(FileDataRequest),
+    EdenGetHistory(FileHistoryRequest),
 }
 
 pub struct MononokeQuery {
diff --git a/apiserver/src/actor/repo.rs b/apiserver/src/actor/repo.rs
index 2b2cae0dfa..80d26cb1dd 100644
--- a/apiserver/src/actor/repo.rs
+++ b/apiserver/src/actor/repo.rs
@@ -15,25 +15,30 @@ use cachelib::LruCachePool;
 use cloned::cloned;
 use context::CoreContext;
 use failure::Error;
-use futures::future::{join_all, ok};
-use futures::Stream;
-use futures::{Future, IntoFuture};
+use futures::{
+    future::{join_all, ok},
+    lazy,
+    stream::iter_ok,
+    Future, IntoFuture, Stream,
+};
 use futures_ext::{try_boxfuture, BoxFuture, FutureExt, StreamExt};
 use http::uri::Uri;
-use mercurial_types::manifest::Content;
 use mononoke_api;
 use remotefilelog;
 use scuba_ext::ScubaSampleBuilder;
-use slog::Logger;
+use slog::{debug, Logger};
 use sshrelay::SshEnvVars;
 use tracing::TraceContext;
 use uuid::Uuid;
 
-use mercurial_types::{HgChangesetId, HgFileNodeId, HgManifestId};
+use mercurial_types::{manifest::Content, HgChangesetId, HgFileNodeId, HgManifestId};
 use metaconfig_types::RepoConfig;
-use types::WireHistoryEntry;
+use types::{
+    FileDataRequest, FileDataResponse, FileHistoryRequest, FileHistoryResponse, Key,
+    WireHistoryEntry,
+};
 
-use mononoke_types::{FileContents, RepositoryId};
+use mononoke_types::{FileContents, MPath, RepositoryId};
 use reachabilityindex::ReachabilityIndex;
 use skiplist::{deserialize_skiplist_map, SkiplistIndex};
 
@@ -46,6 +51,7 @@ use super::{MononokeRepoQuery, MononokeRepoResponse, Revision};
 
 pub struct MononokeRepo {
     repo: BlobRepo,
+    logger: Logger,
     skiplist_index: Arc<SkiplistIndex>,
     sha1_cache: Option<LruCachePool>,
 }
@@ -99,6 +105,7 @@ impl MononokeRepo {
         };
         skiplist_index.map(|skiplist_index| Self {
             repo,
+            logger,
             skiplist_index,
             sha1_cache,
         })
@@ -405,6 +412,74 @@ impl MononokeRepo {
             .boxify()
     }
 
+    fn eden_get_data(
+        &self,
+        ctx: CoreContext,
+        keys: Vec<Key>,
+    ) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
+        let mut fetches = Vec::new();
+        for key in keys {
+            let filenode = HgFileNodeId::new(key.node().clone().into());
+            let fut = self.repo.get_raw_hg_content(ctx.clone(), filenode, false);
+
+            let logger = self.logger.clone();
+            let fut = lazy(move || {
+                debug!(&logger, "fetching data for key: {}", &key);
+                fut.map(move |blob| (key, blob.into_inner()))
+            });
+
+            fetches.push(fut);
+        }
+
+        iter_ok(fetches)
+            .buffer_unordered(10)
+            .collect()
+            .map(|files| MononokeRepoResponse::EdenGetData {
+                response: FileDataResponse::new(files),
+            })
+            .from_err()
+            .boxify()
+    }
+
+    fn eden_get_history(
+        &self,
+        ctx: CoreContext,
+        keys: Vec<Key>,
+        depth: Option<u32>,
+    ) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
+        let mut fetches = Vec::new();
+        for key in keys {
+            let ctx = ctx.clone();
+            let repo = self.repo.clone();
+            let filenode = HgFileNodeId::new(key.node().clone().into());
+            let logger = self.logger.clone();
+
+            let fut = MPath::new(key.name())
+                .into_future()
+                .from_err()
+                .and_then(move |path| {
+                    debug!(&logger, "fetching history for key: {}", &key);
+                    remotefilelog::get_file_history(ctx, repo, filenode, path, depth)
+                        .map(move |entry| {
+                            let entry = WireHistoryEntry::from(entry);
+                            (key.name().to_vec(), entry)
+                        })
+                        .collect()
+                        .from_err()
+                });
+
+            fetches.push(fut);
+        }
+
+        iter_ok(fetches)
+            .buffer_unordered(10)
+            .collect()
+            .map(|history| MononokeRepoResponse::EdenGetHistory {
+                response: FileHistoryResponse::new(history.into_iter().flatten()),
+            })
+            .boxify()
+    }
+
     pub fn send_query(
         &self,
         ctx: CoreContext,
@@ -437,6 +512,10 @@ impl MononokeRepo {
                 lfs_url,
             } => self.lfs_batch(repo_name, req, lfs_url),
             UploadLargeFile { oid, body } => self.upload_large_file(ctx, oid, body),
+            EdenGetData(FileDataRequest { keys }) => self.eden_get_data(ctx, keys),
+            EdenGetHistory(FileHistoryRequest { keys, depth }) => {
+                self.eden_get_history(ctx, keys, depth)
+            }
         }
     }
 }
diff --git a/apiserver/src/actor/response.rs b/apiserver/src/actor/response.rs
index 72995acc78..64fc929bf0 100644
--- a/apiserver/src/actor/response.rs
+++ b/apiserver/src/actor/response.rs
@@ -10,6 +10,8 @@ use actix_web::{self, dev::BodyStream, Body, HttpRequest, HttpResponse, Json, Responder};
 use bytes::Bytes;
 use futures::Stream;
 
+use types::{FileDataResponse, FileHistoryResponse};
+
 use super::lfs::BatchResponse;
 use super::model::{Changeset, Entry, EntryWithSizeAndContentHash};
 
@@ -50,6 +52,12 @@ pub enum MononokeRepoResponse {
         response: BatchResponse,
     },
     UploadLargeFile {},
+    EdenGetData {
+        response: FileDataResponse,
+    },
+    EdenGetHistory {
+        response: FileHistoryResponse,
+    },
 }
 
 fn binary_response(content: Bytes) -> HttpResponse {
@@ -90,6 +98,8 @@ impl Responder for MononokeRepoResponse {
             DownloadLargeFile { content } => Ok(binary_response(content.into())),
             LfsBatch { response } => Json(response).respond_to(req),
             UploadLargeFile {} => Ok(HttpResponse::Ok().into()),
+            EdenGetData { response } => Json(response).respond_to(req),
+            EdenGetHistory { response } => Json(response).respond_to(req),
         }
     }
 }
diff --git a/apiserver/src/main.rs b/apiserver/src/main.rs
index deeb101c68..c69dd7defd 100644
--- a/apiserver/src/main.rs
+++ b/apiserver/src/main.rs
@@ -7,13 +7,15 @@
 #![deny(warnings)]
 #![feature(try_from)]
 
+use std::sync::Arc;
+
 use actix_web::{http::header, server, App, HttpRequest, HttpResponse, Json, Path, State};
 use bytes::Bytes;
 use clap::{value_t, Arg};
 use failure::Fallible;
 use futures::Future;
 use http::uri::{Authority, Parts, PathAndQuery, Scheme, Uri};
-use std::sync::Arc;
+use tokio::runtime::Runtime;
 use uuid::Uuid;
 
 use context::CoreContext;
@@ -26,8 +28,8 @@ use slog::{info, o, Drain, Level, Logger};
 use slog_glog_fmt::{kv_categorizer, kv_defaults, GlogFormat};
 use slog_logview::LogViewDrain;
 use sshrelay::SshEnvVars;
-use tokio::runtime::Runtime;
 use tracing::TraceContext;
+use types::{FileDataRequest, FileHistoryRequest};
 
 mod actor;
 mod errors;
@@ -43,6 +45,7 @@ use crate::middleware::ScubaMiddleware;
 
 mod config {
     pub const SCUBA_TABLE: &str = "mononoke_apiserver";
+    pub const MAX_PAYLOAD_SIZE: usize = 1024 * 1024 * 1024;
 }
 
 // Currently logging and scuba is handled using the middleware service
@@ -336,6 +339,52 @@ fn upload_large_file(
     )
 }
 
+#[derive(Deserialize)]
+struct EdenGetDataParams {
+    repo: String,
+}
+
+fn eden_get_data(
+    (state, params, req): (
+        State<HttpServerState>,
+        Path<EdenGetDataParams>,
+        Json<FileDataRequest>,
+    ),
+) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
+    let params = params.into_inner();
+    let req = req.into_inner();
+    state.mononoke.send_query(
+        prepare_fake_ctx(&state),
+        MononokeQuery {
+            repo: params.repo,
+            kind: MononokeRepoQuery::EdenGetData(req),
+        },
+    )
+}
+
+#[derive(Deserialize)]
+struct EdenGetHistoryParams {
+    repo: String,
+}
+
+fn eden_get_history(
+    (state, params, req): (
+        State<HttpServerState>,
+        Path<EdenGetHistoryParams>,
+        Json<FileHistoryRequest>,
+    ),
+) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
+    let params = params.into_inner();
+    let req = req.into_inner();
+    state.mononoke.send_query(
+        prepare_fake_ctx(&state),
+        MononokeQuery {
+            repo: params.repo,
+            kind: MononokeRepoQuery::EdenGetHistory(req),
+        },
+    )
+}
+
 fn setup_logger(debug: bool) -> Logger {
     let level = if debug { Level::Debug } else { Level::Info };
 
@@ -583,6 +632,18 @@ fn main() -> Fallible<()> {
             .resource("/lfs/upload/{oid}", |r| {
                 r.method(http::Method::PUT).with_async(upload_large_file)
             })
+            .resource("/eden/data", |r| {
+                r.method(http::Method::POST)
+                    .with_async_config(eden_get_data, |cfg| {
+                        (cfg.0).2.limit(config::MAX_PAYLOAD_SIZE);
+                    })
+            })
+            .resource("/eden/history", |r| {
+                r.method(http::Method::POST)
+                    .with_async_config(eden_get_history, |cfg| {
+                        (cfg.0).2.limit(config::MAX_PAYLOAD_SIZE);
+                    })
+            })
     })
 });
diff --git a/mercurial_types/src/nodehash.rs b/mercurial_types/src/nodehash.rs
index 9610318a5f..6e12205ce5 100644
--- a/mercurial_types/src/nodehash.rs
+++ b/mercurial_types/src/nodehash.rs
@@ -125,6 +125,12 @@ impl From<HgNodeHash> for HgTypesNode {
     }
 }
 
+impl From<HgTypesNode> for HgNodeHash {
+    fn from(node: HgTypesNode) -> Self {
+        Self::from_bytes(node.as_ref()).unwrap()
+    }
+}
+
 struct StringVisitor;
 
 impl<'de> serde::de::Visitor<'de> for StringVisitor {
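
Example usage (reviewer note): a minimal client-side sketch of a batch data
fetch against the new endpoint. The JSON body shape follows from
`EdenGetData(FileDataRequest { keys })` above, but the exact serialization of
`types::Key` (field names, hex vs. raw-byte node encoding), the host, port,
repo name, and the `reqwest` dependency are illustrative assumptions, not
part of this patch.

    // Hypothetical client-side mirrors of the wire types; the real
    // serde encoding of `types::Key` may differ.
    use serde_derive::Serialize;

    #[derive(Serialize)]
    struct Key {
        name: String, // repo-relative file path
        node: String, // filenode hash (assumed 40-char hex here)
    }

    #[derive(Serialize)]
    struct FileDataRequest {
        keys: Vec<Key>,
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Batch request for a single file; real requests would carry many keys.
        let req = FileDataRequest {
            keys: vec![Key {
                name: "some/file.txt".to_string(),
                node: "1234567890123456789012345678901234567890".to_string(),
            }],
        };

        // The handlers pull `repo` out of the URL path, so the full route is
        // presumably `/{repo}/eden/data`; host, port, and repo name are made up.
        let resp = reqwest::Client::new()
            .post("http://localhost:8000/somerepo/eden/data")
            .json(&req)
            .send()?;

        println!("status: {}", resp.status());
        Ok(())
    }

`/eden/history` works the same way, with an additional optional `depth` field
in the request body to bound the length of each returned file history.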