mononoke: new get_file_history method in API

Summary:
A simple implementation of `file_history` using batched unodes. It adds only two extra parameters: `skip` and `limit`.
The current implementation is deliberately minimal: it queries the batched history only once and builds the result from `skip`, `limit`, and the first history batch.
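
For illustration, here is a hypothetical caller of the new client method (not part of this diff; it assumes only the `MononokeAPIClient::get_file_history` signature added below, and drives the `futures` 0.1 `BoxFuture` with `wait()` for simplicity):

```rust
use futures::Future; // futures 0.1, for `wait()`

// `client` is an already-constructed MononokeAPIClient; the revision and
// path values are placeholders.
let history = client
    .get_file_history(
        "master".to_string(),            // revision: commit hash or bookmark
        "fbcode/foo/bar.rs".to_string(), // path to the file
        100,                             // limit: number of history commits
        0,                               // skip: number of latest commits to skip
    )
    .wait() // block on the future, purely for the example
    .expect("get_file_history failed");

// MononokeFileHistory wraps a list of MononokeChangeset values.
for commit in history.history {
    println!("{}", commit.commit_hash);
}
```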

Reviewed By: StanislavGlebik

Differential Revision: D17262719

fbshipit-source-id: 054944ec6d1ea0c75d879d33798e20720b35ae1a
Aida Getoeva 2019-09-11 15:51:56 -07:00 committed by Facebook Github Bot
parent 16f0f7a6e4
commit d23e2b2348
8 changed files with 386 additions and 40 deletions

View File

@@ -11,10 +11,11 @@ use futures_ext::BoxFuture;
use apiserver_thrift::client::{make_MononokeAPIService, MononokeAPIService};
use apiserver_thrift::types::{
MononokeBlob, MononokeBranches, MononokeChangeset, MononokeDirectory, MononokeDirectoryUnodes,
- MononokeGetBlobParams, MononokeGetBranchesParams, MononokeGetChangesetParams,
- MononokeGetLastCommitOnPathParams, MononokeGetRawParams, MononokeGetTreeParams,
- MononokeIsAncestorParams, MononokeListDirectoryParams, MononokeListDirectoryUnodesParams,
- MononokeNodeHash, MononokeRevision, MononokeTreeHash,
+ MononokeFileHistory, MononokeGetBlobParams, MononokeGetBranchesParams,
+ MononokeGetChangesetParams, MononokeGetFileHistoryParams, MononokeGetLastCommitOnPathParams,
+ MononokeGetRawParams, MononokeGetTreeParams, MononokeIsAncestorParams,
+ MononokeListDirectoryParams, MononokeListDirectoryUnodesParams, MononokeNodeHash,
+ MononokeRevision, MononokeTreeHash,
};
use srclient::SRChannelBuilder;
@@ -69,6 +70,22 @@ impl MononokeAPIClient {
})
}
pub fn get_file_history(
&self,
revision: String,
path: String,
limit: i32,
skip: i32,
) -> BoxFuture<MononokeFileHistory, failure_ext::Error> {
self.inner.get_file_history(&MononokeGetFileHistoryParams {
repo: self.repo.clone(),
revision: MononokeRevision::commit_hash(revision),
path: path.into_bytes(),
limit,
skip,
})
}
pub fn get_last_commit_on_path(
&self,
revision: String,

View File

@@ -7,6 +7,7 @@
use std::string::String;
use clap::{App, Arg, ArgMatches, SubCommand};
use cmdlib::args;
use futures::{Future, IntoFuture};
use futures_ext::{BoxFuture, FutureExt};
@@ -53,6 +54,28 @@ fn get_branches(client: MononokeAPIClient) -> BoxFuture<(), ()> {
.boxify()
}
fn get_file_history(client: MononokeAPIClient, matches: &ArgMatches<'_>) -> BoxFuture<(), ()> {
let path = matches.value_of("path").expect("must provide path");
let revision = matches.value_of("revision").expect("must provide revision");
let limit = args::get_i32(&matches, "limit", 100);
let skip = args::get_i32(&matches, "skip", 0);
client
.get_file_history(revision.to_string(), path.to_string(), limit, skip)
.and_then(|r| {
Ok(
serde_json::to_string(&r)
.unwrap_or("Error converting response to json".to_string()),
)
})
.map_err(|e| eprintln!("error: {}", e))
.map(|res| {
println!("{}", res);
})
.boxify()
}
fn get_last_commit_on_path(
client: MononokeAPIClient,
matches: &ArgMatches<'_>,
@@ -212,6 +235,40 @@ fn main() -> Result<(), ()> {
),
)
.subcommand(SubCommand::with_name("get_branches").about("get all branches"))
.subcommand(
SubCommand::with_name("get_file_history")
.about("get history of changes for the given file")
.arg(
Arg::with_name("revision")
.short("c")
.long("revision")
.value_name("HASH")
.help("hash/bookmark of the revision you want to query")
.required(true),
)
.arg(
Arg::with_name("path")
.short("p")
.long("path")
.value_name("PATH")
.help("path to the file or directory")
.required(true),
)
.arg(
Arg::with_name("limit")
.short("l")
.value_name("NUM")
.help("number of history commits to query")
.required(true),
)
.arg(
Arg::with_name("skip")
.short("s")
.value_name("NUM")
.help("number of the latest commits in history to skip")
.default_value("0"),
),
)
.subcommand(
SubCommand::with_name("get_last_commit_on_path")
.about("get the last commit cantaining changes on the path")
@@ -330,6 +387,8 @@ fn main() -> Result<(), ()> {
get_changeset(client, matches)
} else if let Some(_) = matches.subcommand_matches("get_branches") {
get_branches(client)
} else if let Some(matches) = matches.subcommand_matches("get_file_history") {
get_file_history(client, matches)
} else if let Some(matches) = matches.subcommand_matches("get_last_commit_on_path") {
get_last_commit_on_path(client, matches)
} else if let Some(matches) = matches.subcommand_matches("list_directory") {

View File

@@ -54,6 +54,14 @@ struct MononokeGetBranchesParams{
1: string repo,
}
struct MononokeGetFileHistoryParams{
1: string repo,
2: MononokeRevision revision,
3: binary path,
4: i32 limit,
5: i32 skip,
}
struct MononokeGetLastCommitOnPathParams{
1: string repo,
2: MononokeRevision revision,
@@ -114,6 +122,10 @@ struct MononokeFile {
5: optional string content_sha1,
}
struct MononokeFileHistory {
1: list<MononokeChangeset> history,
}
struct MononokeDirectoryUnodes {
1: list<MononokeEntryUnodes> entries,
}
@@ -144,6 +156,9 @@ service MononokeAPIService extends source_control.SourceControlService {
MononokeBranches get_branches(1: MononokeGetBranchesParams params)
throws (1: MononokeAPIException e),
MononokeFileHistory get_file_history(1: MononokeGetFileHistoryParams params)
throws (1: MononokeAPIException e),
MononokeChangeset get_last_commit_on_path(1: MononokeGetLastCommitOnPathParams params)
throws (1: MononokeAPIException e),

View File

@@ -13,9 +13,9 @@ use serde_derive::Serialize;
use apiserver_thrift::types::{
MononokeGetBlobParams, MononokeGetBranchesParams, MononokeGetChangesetParams,
- MononokeGetLastCommitOnPathParams, MononokeGetRawParams, MononokeGetTreeParams,
- MononokeIsAncestorParams, MononokeListDirectoryParams, MononokeListDirectoryUnodesParams,
- MononokeRevision,
+ MononokeGetFileHistoryParams, MononokeGetLastCommitOnPathParams, MononokeGetRawParams,
+ MononokeGetTreeParams, MononokeIsAncestorParams, MononokeListDirectoryParams,
+ MononokeListDirectoryUnodesParams, MononokeRevision,
};
use types::api::{DataRequest, HistoryRequest, TreeRequest};
@@ -51,6 +51,12 @@ pub enum MononokeRepoQuery {
revision: Revision,
},
GetBranches,
GetFileHistory {
path: String,
revision: Revision,
limit: i32,
skip: i32,
},
GetLastCommitOnPath {
path: String,
revision: Revision,
@@ -121,6 +127,26 @@ impl TryFrom<MononokeGetBranchesParams> for MononokeQuery {
}
}
impl TryFrom<MononokeGetFileHistoryParams> for MononokeQuery {
type Error = Error;
fn try_from(params: MononokeGetFileHistoryParams) -> Result<MononokeQuery, Self::Error> {
let repo = params.repo;
let path = String::from_utf8(params.path)?;
let limit = params.limit;
let skip = params.skip;
params.revision.try_into().map(|rev| MononokeQuery {
repo,
kind: MononokeRepoQuery::GetFileHistory {
path,
revision: rev,
limit,
skip,
},
})
}
}
impl TryFrom<MononokeGetLastCommitOnPathParams> for MononokeQuery {
type Error = Error;

View File

@@ -6,6 +6,7 @@
use std::time::{Duration, Instant};
use std::{
cmp,
collections::{BTreeMap, HashMap},
convert::{TryFrom, TryInto},
sync::{Arc, RwLock},
@@ -20,10 +21,11 @@ use context::CoreContext;
use derive_unode_manifest::derived_data_unodes::{RootUnodeManifestId, RootUnodeManifestMapping};
use derived_data::BonsaiDerived;
use failure::Error;
use fastlog::{prefetch_history, RootFastlog, RootFastlogMapping};
use futures::{
future::{self, err, join_all, ok},
lazy,
- stream::{iter_ok, repeat, FuturesUnordered},
+ stream::{futures_ordered, iter_ok, repeat, FuturesUnordered},
Future, IntoFuture, Stream,
};
use futures_ext::{try_boxfuture, BoxFuture, FutureExt, StreamExt};
@@ -34,7 +36,9 @@ use repo_client::gettreepack_entries;
use slog::{debug, Logger};
use time_ext::DurationExt;
- use mercurial_types::{manifest::Content, HgChangesetId, HgEntry, HgFileNodeId, HgManifestId};
+ use mercurial_types::{
+ blobs::HgBlobChangeset, manifest::Content, HgChangesetId, HgEntry, HgFileNodeId, HgManifestId,
+ };
use metaconfig_types::{CommonConfig, RepoConfig};
use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt};
use stats::{define_stats, Timeseries};
@@ -43,7 +47,10 @@ use types::{
DataEntry, Key, RepoPathBuf, WireHistoryEntry,
};
- use mononoke_types::{ChangesetId, FileUnodeId, MPath, ManifestUnodeId, RepositoryId};
+ use mononoke_types::{
+ fastlog_batch::max_entries_in_fastlog_batch, ChangesetId, FileUnodeId, MPath, ManifestUnodeId,
+ RepositoryId,
+ };
use reachabilityindex::ReachabilityIndex;
use skiplist::{deserialize_skiplist_index, SkiplistIndex};
@@ -64,6 +71,7 @@ define_stats! {
get_tree: timeseries(RATE, SUM),
get_changeset: timeseries(RATE, SUM),
get_branches: timeseries(RATE, SUM),
get_file_history: timeseries(RATE, SUM),
get_last_commit_on_path: timeseries(RATE, SUM),
is_ancestor: timeseries(RATE, SUM),
eden_get_data: timeseries(RATE, SUM),
@@ -233,7 +241,7 @@ impl MononokeRepo {
}
}
- fn get_root_manifest_unode_entry(
+ fn get_unode_entry(
&self,
ctx: CoreContext,
revision: Revision,
@@ -274,6 +282,37 @@ impl MononokeRepo {
.boxify()
}
fn do_get_last_commit_on_path(
&self,
ctx: CoreContext,
revision: Revision,
path: String,
) -> BoxFuture<HgBlobChangeset, ErrorKind> {
cloned!(ctx, self.repo);
let blobstore = repo.get_blobstore();
self.get_unode_entry(ctx.clone(), revision, path)
.and_then({
cloned!(blobstore, ctx);
move |entry| entry.load(ctx, &blobstore).map_err(Error::from).from_err()
})
.and_then({
cloned!(ctx, repo);
move |unode| {
let changeset_id = match unode {
ManifestEntry::Tree(mf_unode) => mf_unode.linknode().clone(),
ManifestEntry::Leaf(file_unode) => file_unode.linknode().clone(),
};
repo.get_hg_from_bonsai_changeset(ctx, changeset_id)
.from_err()
}
})
.and_then(move |hg_changeset_id| {
repo.get_changeset_by_changesetid(ctx.clone(), hg_changeset_id)
.from_err()
})
.boxify()
}
fn get_raw_file(
&self,
ctx: CoreContext,
@@ -346,6 +385,148 @@ impl MononokeRepo {
.boxify()
}
fn get_file_history(
&self,
ctx: CoreContext,
revision: Revision,
path: String,
limit: i32,
skip: i32,
) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
STATS::get_file_history.add_value(1);
let limit = limit as usize;
let skip = skip as usize;
// for now we fetch only one history batch
let max_entries = max_entries_in_fastlog_batch();
if skip >= max_entries {
return future::err(ErrorKind::InvalidInput(
format!("cannot skip {}, batch size is {}", skip, max_entries),
None,
))
.boxify();
}
if limit + skip > max_entries {
return future::err(ErrorKind::InvalidInput(
format!("cannot fetch {}, batch size is {}", limit, max_entries),
None,
))
.boxify();
}
if limit == 0 {
return future::err(ErrorKind::InvalidInput(
"0 commits requested".to_string(),
None,
))
.boxify();
}
// it's not necessary to fetch history in this case; we just need the most recent commit
if skip == 0 && limit == 1 {
return self
.do_get_last_commit_on_path(ctx.clone(), revision, path)
.and_then(move |changeset| {
changeset
.try_into()
.map_err(Error::from)
.map_err(ErrorKind::from)
})
.map(move |changeset| MononokeRepoResponse::GetFileHistory {
history: vec![changeset],
})
.boxify();
}
cloned!(ctx, self.repo);
let bcs_id_fut = self.get_bonsai_id_from_revision(ctx.clone(), revision.clone());
self.get_unode_entry(ctx.clone(), revision.clone(), path.clone())
.join(bcs_id_fut)
.and_then({
cloned!(ctx, repo);
move |(entry, bcs_id)| {
// optimistically try to fetch history for a unode
prefetch_history(ctx.clone(), repo.clone(), entry)
.map_err(Error::from)
.from_err()
.and_then({
move |maybe_history| match maybe_history {
Some(history) => ok(history).left_future(),
// if there is no history, let's try to derive batched fastlog data
// and fetch history again
None => {
let fastlog_derived_mapping = Arc::new(
RootFastlogMapping::new(Arc::new(repo.get_blobstore())),
);
RootFastlog::derive(
ctx.clone(),
repo.clone(),
fastlog_derived_mapping,
bcs_id,
)
.map_err(ErrorKind::InternalError)
.and_then({
cloned!(ctx, repo);
move |_| {
prefetch_history(ctx.clone(), repo.clone(), entry)
.map_err(Error::from)
.from_err()
}
})
.and_then(move |maybe_history| {
maybe_history.ok_or(ErrorKind::NotFound(
format!("{:?} {:?}", revision, path),
None,
))
})
.right_future()
}
}
})
}
})
.and_then({
cloned!(ctx, repo);
move |history| {
let number = cmp::min(history.len(), skip + limit);
if number < skip {
// we skip more commits than the history has
ok(vec![]).left_future()
} else {
let changeset_ids: Vec<_> = history[skip..number]
.into_iter()
.map(|(cs_id, _)| *cs_id)
.collect();
repo.get_hg_bonsai_mapping(ctx.clone(), changeset_ids)
.from_err()
.right_future()
}
}
})
.and_then({
move |hg_bcs_id_mapping| {
let mut history_chunk_fut = vec![];
for (hg_changeset_id, _) in hg_bcs_id_mapping {
cloned!(ctx, repo);
history_chunk_fut.push(
repo.get_changeset_by_changesetid(ctx.clone(), hg_changeset_id)
.from_err()
.and_then(move |changeset| {
changeset
.try_into()
.map_err(Error::from)
.map_err(ErrorKind::from)
}),
);
}
futures_ordered(history_chunk_fut).collect()
}
})
.map(move |history_chunk| MononokeRepoResponse::GetFileHistory {
history: history_chunk,
})
.boxify()
}
fn get_last_commit_on_path(
&self,
ctx: CoreContext,
@@ -354,28 +535,7 @@ impl MononokeRepo {
) -> BoxFuture<MononokeRepoResponse, ErrorKind> {
STATS::get_last_commit_on_path.add_value(1);
cloned!(ctx, self.repo);
- let blobstore = repo.get_blobstore();
- self.get_root_manifest_unode_entry(ctx.clone(), revision, path)
- .and_then({
- cloned!(blobstore, ctx);
- move |entry| entry.load(ctx, &blobstore).map_err(Error::from).from_err()
- })
- .and_then({
- cloned!(ctx, repo);
- move |unode| {
- let changeset_id = match unode {
- ManifestEntry::Tree(mf_unode) => mf_unode.linknode().clone(),
- ManifestEntry::Leaf(file_unode) => file_unode.linknode().clone(),
- };
- repo.get_hg_from_bonsai_changeset(ctx, changeset_id)
- .from_err()
- }
- })
- .and_then(move |hg_changeset_id| {
- repo.get_changeset_by_changesetid(ctx.clone(), hg_changeset_id)
- .from_err()
- })
+ self.do_get_last_commit_on_path(ctx.clone(), revision, path)
.and_then(move |changeset| {
changeset
.try_into()
@@ -429,7 +589,7 @@ impl MononokeRepo {
cloned!(ctx, self.repo);
let blobstore = repo.get_blobstore();
- self.get_root_manifest_unode_entry(ctx.clone(), revision, path.clone())
+ self.get_unode_entry(ctx.clone(), revision, path.clone())
.and_then({
cloned!(blobstore, ctx);
move |entry| match entry {
@@ -725,6 +885,12 @@ impl MononokeRepo {
GetTree { hash } => self.get_tree(ctx, hash),
GetChangeset { revision } => self.get_changeset(ctx, revision),
GetBranches => self.get_branches(ctx),
GetFileHistory {
revision,
path,
limit,
skip,
} => self.get_file_history(ctx, revision, path, limit, skip),
GetLastCommitOnPath { revision, path } => {
self.get_last_commit_on_path(ctx, revision, path)
}
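
Aside: the pagination step in `get_file_history` above clamps the window to what the fetched batch actually contains (`cmp::min(history.len(), skip + limit)`) and returns nothing when `skip` reaches past the batch. Below is a standalone sketch of just that windowing arithmetic; the function name and data are hypothetical, only the clamping logic mirrors the diff:

```rust
use std::cmp;

// Mirrors the skip/limit windowing in get_file_history: clamp the end of
// the window to the batch length, and bail out if skip overshoots it.
fn page<T: Clone>(history: &[T], skip: usize, limit: usize) -> Vec<T> {
    let number = cmp::min(history.len(), skip + limit);
    if number < skip {
        // we skip more commits than the history has
        Vec::new()
    } else {
        history[skip..number].to_vec()
    }
}

fn main() {
    let commits = ["c5", "c4", "c3", "c2", "c1"]; // newest first
    assert_eq!(page(&commits, 1, 2), ["c4", "c3"]);
    assert_eq!(page(&commits, 4, 3), ["c1"]); // window clamped to the batch
    assert!(page(&commits, 9, 2).is_empty()); // skip overshoots the batch
}
```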

View File

@@ -42,6 +42,9 @@ pub enum MononokeRepoResponse {
GetBranches {
branches: BTreeMap<String, String>,
},
GetFileHistory {
history: Vec<Changeset>,
},
GetLastCommitOnPath {
commit: Changeset,
},
@@ -125,6 +128,7 @@ impl Responder for MononokeRepoResponse {
GetTree { files } => Json(files).respond_to(req),
GetChangeset { changeset } => Json(changeset).respond_to(req),
GetBranches { branches } => Json(branches).respond_to(req),
GetFileHistory { history } => Json(history).respond_to(req),
GetLastCommitOnPath { commit } => Json(commit).respond_to(req),
IsAncestor { answer } => Ok(binary_response({
if answer {

View File

@@ -9,15 +9,15 @@ use std::{convert::TryFrom, convert::TryInto, mem::size_of, sync::Arc};
use crate::errors::ErrorKind;
use apiserver_thrift::server::MononokeApiservice;
use apiserver_thrift::services::mononoke_apiservice::{
- GetBlobExn, GetBranchesExn, GetChangesetExn, GetLastCommitOnPathExn, GetRawExn, GetTreeExn,
- IsAncestorExn, ListDirectoryExn, ListDirectoryUnodesExn,
+ GetBlobExn, GetBranchesExn, GetChangesetExn, GetFileHistoryExn, GetLastCommitOnPathExn,
+ GetRawExn, GetTreeExn, IsAncestorExn, ListDirectoryExn, ListDirectoryUnodesExn,
};
use apiserver_thrift::types::{
MononokeAPIException, MononokeBlob, MononokeBranches, MononokeChangeset, MononokeDirectory,
- MononokeDirectoryUnodes, MononokeGetBlobParams, MononokeGetBranchesParams,
- MononokeGetChangesetParams, MononokeGetLastCommitOnPathParams, MononokeGetRawParams,
- MononokeGetTreeParams, MononokeIsAncestorParams, MononokeListDirectoryParams,
- MononokeListDirectoryUnodesParams, MononokeRevision,
+ MononokeDirectoryUnodes, MononokeFileHistory, MononokeGetBlobParams, MononokeGetBranchesParams,
+ MononokeGetChangesetParams, MononokeGetFileHistoryParams, MononokeGetLastCommitOnPathParams,
+ MononokeGetRawParams, MononokeGetTreeParams, MononokeIsAncestorParams,
+ MononokeListDirectoryParams, MononokeListDirectoryUnodesParams, MononokeRevision,
};
use apiserver_thrift::MononokeRevision::UnknownField;
use async_trait::async_trait;
@@ -243,6 +243,54 @@ impl MononokeApiservice for MononokeAPIServiceImpl {
resp
}
async fn get_file_history(
&self,
params: MononokeGetFileHistoryParams,
) -> Result<MononokeFileHistory, GetFileHistoryExn> {
let scuba = self.create_scuba_logger(
Some(params.path.clone()),
Some(params.revision.clone()),
params.repo.clone(),
);
let ctx = self.create_ctx(scuba);
let resp = self
.convert_and_call(
ctx.clone(),
params,
|resp: MononokeRepoResponse| match resp {
MononokeRepoResponse::GetFileHistory { history } => Ok(MononokeFileHistory {
history: history
.into_iter()
.map(|commit| MononokeChangeset::from(commit))
.collect(),
}),
_ => Err(ErrorKind::InternalError(err_msg(
"Actor returned wrong response type to query".to_string(),
))),
},
)
.await;
log_response_size(
ctx.scuba().clone(),
resp.as_ref()
.map(|resp| {
resp.history
.iter()
.map(|commit| {
commit.commit_hash.as_bytes().len()
+ commit.message.len()
+ commit.author.as_bytes().len()
+ size_of::<i64>()
})
.sum()
})
.unwrap_or(0),
);
resp
}
async fn get_last_commit_on_path(
&self,
params: MononokeGetLastCommitOnPathParams,

View File

@@ -801,6 +801,17 @@ pub fn get_u64_opt<'a>(matches: &ArgMatches<'a>, key: &str) -> Option<u64> {
})
}
#[inline]
pub fn get_i32<'a>(matches: &ArgMatches<'a>, key: &str, default: i32) -> i32 {
matches
.value_of(key)
.map(|val| {
val.parse::<i32>()
.expect(&format!("{} must be an integer", key))
})
.unwrap_or(default)
}
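
A self-contained sketch of how this helper behaves with clap 2.x; the demo app and values are hypothetical, and `get_i32` is reproduced from the diff so the example runs standalone:

```rust
use clap::{App, Arg, ArgMatches};

// Copy of the helper above, reproduced so the example compiles on its own.
fn get_i32(matches: &ArgMatches<'_>, key: &str, default: i32) -> i32 {
    matches
        .value_of(key)
        .map(|val| val.parse::<i32>().expect(&format!("{} must be an integer", key)))
        .unwrap_or(default)
}

fn main() {
    let app = || App::new("demo").arg(Arg::with_name("limit").short("l").takes_value(true));

    // Value supplied on the command line: parsed as i32.
    let m = app().get_matches_from(vec!["demo", "-l", "42"]);
    assert_eq!(get_i32(&m, "limit", 100), 42);

    // Flag absent: falls back to the default.
    let m = app().get_matches_from(vec!["demo"]);
    assert_eq!(get_i32(&m, "limit", 100), 100);

    // A non-integer value ("-l foo") would panic: "limit must be an integer".
}
```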
#[inline]
pub fn get_i64_opt<'a>(matches: &ArgMatches<'a>, key: &str) -> Option<i64> {
matches.value_of(key).map(|val| {