Add batch data fetching endpoints for Eden API

Summary: Add new API server endpoints for performing batch fetches of data and history. The requests and responses are currently serialized as JSON, though the format can be trivially changed later to something like CBOR (or any other format supported by serde).

Reviewed By: xavierd

Differential Revision: D14573735

fbshipit-source-id: ac46aa8a55db44b695c53f9e32cc4ea2e17504c8
Author: Arun Kulshreshtha (committed by Facebook Github Bot)
Date: 2019-04-01 20:12:09 -07:00
Parent: 54ca1d89cc
Commit: cb1ef172ff
5 changed files with 169 additions and 10 deletions
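
The summary notes that the new endpoints speak JSON. As a rough sketch only — the serde layout of `Key`, `FileDataRequest`, and `FileDataResponse` in the `types` crate is not shown in this diff, so the struct and field names below are assumptions — a batch data request could be built and serialized like this:

// Illustration only: hypothetical mirror of the wire types. The real
// definitions live in Eden's `types` crate and may name fields differently.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Key {
    name: String, // repo path of the file
    node: String, // hex-encoded filenode hash
}

#[derive(Serialize, Deserialize)]
struct FileDataRequest {
    keys: Vec<Key>,
}

fn main() -> serde_json::Result<()> {
    let req = FileDataRequest {
        keys: vec![Key {
            name: "fbcode/eden/README".to_string(),
            node: "a".repeat(40),
        }],
    };
    // Prints a POST body like: {"keys":[{"name":"fbcode/eden/README","node":"aaa..."}]}
    println!("{}", serde_json::to_string(&req)?);
    Ok(())
}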


@@ -17,6 +17,7 @@ use apiserver_thrift::types::{
    MononokeGetRawParams, MononokeGetTreeParams, MononokeIsAncestorParams,
    MononokeListDirectoryParams, MononokeRevision,
};
use types::{FileDataRequest, FileHistoryRequest};
use super::lfs::BatchRequest;
@@ -70,6 +71,8 @@ pub enum MononokeRepoQuery {
        oid: String,
        body: Bytes,
    },
    EdenGetData(FileDataRequest),
    EdenGetHistory(FileHistoryRequest),
}
pub struct MononokeQuery {


@@ -15,25 +15,30 @@ use cachelib::LruCachePool;
use cloned::cloned;
use context::CoreContext;
use failure::Error;
-use futures::future::{join_all, ok};
-use futures::Stream;
-use futures::{Future, IntoFuture};
+use futures::{
+    future::{join_all, ok},
+    lazy,
+    stream::iter_ok,
+    Future, IntoFuture, Stream,
+};
use futures_ext::{try_boxfuture, BoxFuture, FutureExt, StreamExt};
use http::uri::Uri;
-use mercurial_types::manifest::Content;
use mononoke_api;
use remotefilelog;
use scuba_ext::ScubaSampleBuilder;
-use slog::Logger;
+use slog::{debug, Logger};
use sshrelay::SshEnvVars;
use tracing::TraceContext;
use uuid::Uuid;
-use mercurial_types::{HgChangesetId, HgFileNodeId, HgManifestId};
+use mercurial_types::{manifest::Content, HgChangesetId, HgFileNodeId, HgManifestId};
use metaconfig_types::RepoConfig;
-use types::WireHistoryEntry;
+use types::{
+    FileDataRequest, FileDataResponse, FileHistoryRequest, FileHistoryResponse, Key,
+    WireHistoryEntry,
+};
-use mononoke_types::{FileContents, RepositoryId};
+use mononoke_types::{FileContents, MPath, RepositoryId};
use reachabilityindex::ReachabilityIndex;
use skiplist::{deserialize_skiplist_map, SkiplistIndex};
@@ -46,6 +51,7 @@ use super::{MononokeRepoQuery, MononokeRepoResponse, Revision};
pub struct MononokeRepo {
    repo: BlobRepo,
    logger: Logger,
    skiplist_index: Arc<SkiplistIndex>,
    sha1_cache: Option<LruCachePool>,
}
@@ -99,6 +105,7 @@ impl MononokeRepo {
        };
        skiplist_index.map(|skiplist_index| Self {
            repo,
            logger,
            skiplist_index,
            sha1_cache,
        })
@@ -405,6 +412,74 @@ impl MononokeRepo {
            .boxify()
    }

    fn eden_get_data(
        &self,
        ctx: CoreContext,
        keys: Vec<Key>,
    ) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
        let mut fetches = Vec::new();
        for key in keys {
            let filenode = HgFileNodeId::new(key.node().clone().into());
            let fut = self.repo.get_raw_hg_content(ctx.clone(), filenode, false);
            let logger = self.logger.clone();
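            // Wrap the fetch in a lazy future so the debug log fires only
            // when buffer_unordered actually starts polling this fetch.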
            let fut = lazy(move || {
                debug!(&logger, "fetching data for key: {}", &key);
                fut.map(move |blob| (key, blob.into_inner()))
            });
            fetches.push(fut);
        }
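        // Issue up to 10 fetches concurrently and collect all (key, data)
        // pairs into a single batch response.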
        iter_ok(fetches)
            .buffer_unordered(10)
            .collect()
            .map(|files| MononokeRepoResponse::EdenGetData {
                response: FileDataResponse::new(files),
            })
            .from_err()
            .boxify()
    }

    fn eden_get_history(
        &self,
        ctx: CoreContext,
        keys: Vec<Key>,
        depth: Option<u32>,
    ) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
        let mut fetches = Vec::new();
        for key in keys {
            let ctx = ctx.clone();
            let repo = self.repo.clone();
            let filenode = HgFileNodeId::new(key.node().clone().into());
            let logger = self.logger.clone();
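            // Parse the key's path into an MPath, then stream this file's
            // history entries up to the optional depth limit.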
            let fut = MPath::new(key.name())
                .into_future()
                .from_err()
                .and_then(move |path| {
                    debug!(&logger, "fetching history for key: {}", &key);
                    remotefilelog::get_file_history(ctx, repo, filenode, path, depth)
                        .map(move |entry| {
                            let entry = WireHistoryEntry::from(entry);
                            (key.name().to_vec(), entry)
                        })
                        .collect()
                        .from_err()
                });
            fetches.push(fut);
        }
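        // Run up to 10 history fetches concurrently; each yields a list of
        // (path, entry) pairs, which are flattened into a single response.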
        iter_ok(fetches)
            .buffer_unordered(10)
            .collect()
            .map(|history| MononokeRepoResponse::EdenGetHistory {
                response: FileHistoryResponse::new(history.into_iter().flatten()),
            })
            .boxify()
    }

    pub fn send_query(
        &self,
        ctx: CoreContext,
@@ -437,6 +512,10 @@ impl MononokeRepo {
                lfs_url,
            } => self.lfs_batch(repo_name, req, lfs_url),
            UploadLargeFile { oid, body } => self.upload_large_file(ctx, oid, body),
            EdenGetData(FileDataRequest { keys }) => self.eden_get_data(ctx, keys),
            EdenGetHistory(FileHistoryRequest { keys, depth }) => {
                self.eden_get_history(ctx, keys, depth)
            }
        }
    }
}


@@ -10,6 +10,8 @@ use actix_web::{self, dev::BodyStream, Body, HttpRequest, HttpResponse, Json, Responder};
use bytes::Bytes;
use futures::Stream;
use types::{FileDataResponse, FileHistoryResponse};
use super::lfs::BatchResponse;
use super::model::{Changeset, Entry, EntryWithSizeAndContentHash};
@@ -50,6 +52,12 @@ pub enum MononokeRepoResponse {
        response: BatchResponse,
    },
    UploadLargeFile {},
    EdenGetData {
        response: FileDataResponse,
    },
    EdenGetHistory {
        response: FileHistoryResponse,
    },
}
fn binary_response(content: Bytes) -> HttpResponse {
@@ -90,6 +98,8 @@ impl Responder for MononokeRepoResponse {
            DownloadLargeFile { content } => Ok(binary_response(content.into())),
            LfsBatch { response } => Json(response).respond_to(req),
            UploadLargeFile {} => Ok(HttpResponse::Ok().into()),
            EdenGetData { response } => Json(response).respond_to(req),
            EdenGetHistory { response } => Json(response).respond_to(req),
        }
    }
}


@@ -7,13 +7,15 @@
#![deny(warnings)]
#![feature(try_from)]
-use std::sync::Arc;
use actix_web::{http::header, server, App, HttpRequest, HttpResponse, Json, Path, State};
use bytes::Bytes;
use clap::{value_t, Arg};
use failure::Fallible;
use futures::Future;
+use http::uri::{Authority, Parts, PathAndQuery, Scheme, Uri};
+use std::sync::Arc;
+use tokio::runtime::Runtime;
use uuid::Uuid;
use context::CoreContext;
@@ -26,8 +28,8 @@ use slog::{info, o, Drain, Level, Logger};
use slog_glog_fmt::{kv_categorizer, kv_defaults, GlogFormat};
use slog_logview::LogViewDrain;
use sshrelay::SshEnvVars;
-use tokio::runtime::Runtime;
use tracing::TraceContext;
+use types::{FileDataRequest, FileHistoryRequest};
mod actor;
mod errors;
@@ -43,6 +45,7 @@ use crate::middleware::ScubaMiddleware;
mod config {
    pub const SCUBA_TABLE: &str = "mononoke_apiserver";
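    // 1 GiB (1024^3 bytes); used below to raise the JSON body-size limit on
    // the new batch endpoints.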
    pub const MAX_PAYLOAD_SIZE: usize = 1024 * 1024 * 1024;
}
// Currently, logging and Scuba are handled using the middleware service
@@ -336,6 +339,52 @@ fn upload_large_file(
    )
}

#[derive(Deserialize)]
struct EdenGetDataParams {
    repo: String,
}

fn eden_get_data(
    (state, params, req): (
        State<HttpServerState>,
        Path<EdenGetDataParams>,
        Json<FileDataRequest>,
    ),
) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
    let params = params.into_inner();
    let req = req.into_inner();
    state.mononoke.send_query(
        prepare_fake_ctx(&state),
        MononokeQuery {
            repo: params.repo,
            kind: MononokeRepoQuery::EdenGetData(req),
        },
    )
}
#[derive(Deserialize)]
struct EdenGetHistoryParams {
    repo: String,
}

fn eden_get_history(
    (state, params, req): (
        State<HttpServerState>,
        Path<EdenGetHistoryParams>,
        Json<FileHistoryRequest>,
    ),
) -> impl Future<Item = MononokeRepoResponse, Error = ErrorKind> {
    let params = params.into_inner();
    let req = req.into_inner();
    state.mononoke.send_query(
        prepare_fake_ctx(&state),
        MononokeQuery {
            repo: params.repo,
            kind: MononokeRepoQuery::EdenGetHistory(req),
        },
    )
}
fn setup_logger(debug: bool) -> Logger {
    let level = if debug { Level::Debug } else { Level::Info };
@@ -583,6 +632,18 @@ fn main() -> Fallible<()> {
            .resource("/lfs/upload/{oid}", |r| {
                r.method(http::Method::PUT).with_async(upload_large_file)
            })
.resource("/eden/data", |r| {
r.method(http::Method::POST)
.with_async_config(eden_get_data, |cfg| {
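                        // cfg.0 is the tuple of extractor configs mirroring the
                        // handler's argument tuple; index 2 targets the
                        // Json<FileDataRequest> extractor and raises its
                        // default body-size limit.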
                        (cfg.0).2.limit(config::MAX_PAYLOAD_SIZE);
                    })
            })
            .resource("/eden/history", |r| {
                r.method(http::Method::POST)
                    .with_async_config(eden_get_history, |cfg| {
                        (cfg.0).2.limit(config::MAX_PAYLOAD_SIZE);
                    })
            })
        })
    });


@@ -125,6 +125,12 @@ impl From<HgNodeHash> for HgTypesNode {
    }
}

impl From<HgTypesNode> for HgNodeHash {
    fn from(node: HgTypesNode) -> Self {
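        // Both sides wrap a 20-byte hg node hash, so this conversion should
        // be infallible; from_bytes only fails on a length mismatch.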
        Self::from_bytes(node.as_ref()).unwrap()
    }
}
struct StringVisitor;
impl<'de> serde::de::Visitor<'de> for StringVisitor {