add an option to limit commit_history to descendants of commit

Summary: This will be used for commits_between replacement

Differential Revision: D22234236

fbshipit-source-id: c0c8550d97a9e8b42034d605e24ff54251fbd13e
This commit is contained in:
Mateusz Kwapich 2020-06-30 08:08:15 -07:00 committed by Facebook GitHub Bot
parent 66a810a68c
commit 15256a91be
5 changed files with 114 additions and 24 deletions

View File

@ -21,7 +21,7 @@ use context::CoreContext;
use derived_data::BonsaiDerived;
use fsnodes::RootFsnodeId;
use futures::compat::{Future01CompatExt, Stream01CompatExt};
use futures::future::{self, try_join, FutureExt, Shared};
use futures::future::{self, try_join, try_join_all, FutureExt, Shared};
use futures::stream::{self, Stream, StreamExt, TryStreamExt};
use manifest::{Diff as ManifestDiff, Entry as ManifestEntry, ManifestOps, PathOrPrefix};
use maplit::hashset;
@ -312,7 +312,8 @@ impl ChangesetContext {
Ok(bonsai.file_changes)
}
/// Returns `true` if this commit is an ancestor of `other_commit`.
/// Returns `true` if this commit is an ancestor of `other_commit`. A commit is considered its
/// own ancestor for the purpose of this call.
pub async fn is_ancestor_of(&self, other_commit: ChangesetId) -> Result<bool, MononokeError> {
let is_ancestor_of = self
.repo()
@ -607,9 +608,21 @@ impl ChangesetContext {
pub async fn history(
&self,
until_timestamp: Option<i64>,
descendants_of: Option<ChangesetId>,
) -> impl Stream<Item = Result<ChangesetContext, MononokeError>> + '_ {
let descendants_of = descendants_of.map(|id| Self::new(self.repo().clone(), id));
if let Some(ancestor) = descendants_of.as_ref() {
// If the the start commit is not descendant of the argument exit early.
match ancestor.is_ancestor_of(self.id()).await {
Ok(false) => return stream::empty().boxed(),
Err(e) => return stream::once(async { Err(e) }).boxed(),
_ => (),
}
}
let cs_info_enabled = self.repo.derive_changeset_info_enabled();
// Helper allowing us to terminate walk when we reach `until_timestamp`.
let terminate = until_timestamp.map(move |until_timestamp| {
move |changeset_id| async move {
let info = if cs_info_enabled {
@ -635,24 +648,52 @@ impl ChangesetContext {
// starting state
(hashset! { self.id() }, VecDeque::from(vec![self.id()])),
// unfold
move |(mut visited, mut queue)| async move {
move |(mut visited, mut queue)| {
cloned!(descendants_of);
async move {
if let Some(changeset_id) = queue.pop_front() {
// Terminate in two cases:
// 1. When `until_timestamp` is reached
if let Some(terminate) = terminate {
if terminate(changeset_id).await? {
return Ok(Some((None, (visited, queue))));
}
}
let parents = self
// 2. When we reach the `descendants_of` ancestor
if let Some(ancestor) = descendants_of.as_ref() {
if changeset_id == ancestor.id() {
return Ok(Some((Some(changeset_id), (visited, queue))));
}
}
let mut parents = self
.repo()
.blob_repo()
.get_changeset_parents_by_bonsai(self.ctx().clone(), changeset_id)
.compat()
.await?;
if parents.len() > 1 {
if let Some(ancestor) = descendants_of.as_ref() {
// In case of merge, find out which branches are worth traversing by
// doing ancestry check.
parents =
try_join_all(parents.into_iter().map(|parent| async move {
Ok::<_, MononokeError>((
parent,
ancestor.is_ancestor_of(parent).await?,
))
}))
.await?
.into_iter()
.filter_map(|(parent, ancestry)| ancestry.then_some(parent))
.collect();
}
}
queue.extend(parents.into_iter().filter(|parent| visited.insert(*parent)));
Ok(Some((Some(changeset_id), (visited, queue))))
} else {
Ok::<_, MononokeError>(None)
}
}
},
)
.try_filter_map(move |changeset_id| {

View File

@ -6,6 +6,7 @@
*/
#![feature(backtrace)]
#![feature(bool_to_option)]
#![deny(warnings)]
use std::collections::HashMap;

View File

@ -299,7 +299,7 @@ async fn commit_history(fb: FacebookInit) -> Result<()> {
// The commit history includes all commits, including empty ones.
let history: Vec<_> = cs
.history(None)
.history(None, None)
.await
.and_then(|cs| async move { Ok(cs.id()) })
.try_collect()
@ -330,7 +330,7 @@ async fn commit_history(fb: FacebookInit) -> Result<()> {
.await?
.expect("changeset exists");
let history: Vec<_> = cs
.history(None)
.history(None, None)
.await
.and_then(|cs| async move { Ok(cs.id()) })
.try_collect()
@ -350,7 +350,7 @@ async fn commit_history(fb: FacebookInit) -> Result<()> {
// Setting until_timestamp omits some commits.
let history: Vec<_> = cs
.history(Some(2500))
.history(Some(2500), None)
.await
.and_then(|cs| async move { Ok(cs.id()) })
.try_collect()
@ -365,5 +365,32 @@ async fn commit_history(fb: FacebookInit) -> Result<()> {
]
);
// Setting descendendants_of omits some commits.
let cs = repo
.changeset(ChangesetSpecifier::Bonsai(changesets["c2"]))
.await?
.expect("changeset exists");
let history: Vec<_> = cs
.history(Some(2500), Some(changesets["b2"]))
.await
.and_then(|cs| async move { Ok(cs.id()) })
.try_collect()
.await?;
assert_eq!(
history,
vec![
changesets["c2"],
changesets["m2"],
changesets["e2"],
changesets["e3"],
changesets["a4"],
changesets["b3"],
changesets["c1"],
changesets["e1"],
changesets["m1"],
changesets["b2"],
]
);
Ok(())
}

View File

@ -321,7 +321,12 @@ impl SourceControlServiceImpl {
commit: thrift::CommitSpecifier,
params: thrift::CommitHistoryParams,
) -> Result<thrift::CommitHistoryResponse, errors::ServiceError> {
let (_repo, changeset) = self.repo_changeset(ctx, &commit).await?;
let (repo, changeset) = self.repo_changeset(ctx, &commit).await?;
let descendants_of = if let Some(descendants_of) = params.descendants_of {
Some(self.changeset_id(&repo, &descendants_of).await?)
} else {
None
};
let limit: usize = check_range_and_convert("limit", params.limit, 0..)?;
let skip: usize = check_range_and_convert("skip", params.skip, 0..)?;
@ -348,7 +353,7 @@ impl SourceControlServiceImpl {
.into());
}
let history_stream = changeset.history(after_timestamp).await;
let history_stream = changeset.history(after_timestamp, descendants_of).await;
let history = collect_history(
history_stream,
skip,

View File

@ -14,8 +14,8 @@ use fbinit::FacebookInit;
use futures_stats::{FutureStats, TimedFutureExt};
use identity::Identity;
use mononoke_api::{
ChangesetContext, ChangesetSpecifier, CoreContext, FileContext, FileId, Mononoke, RepoContext,
SessionContainer, TreeContext, TreeId,
ChangesetContext, ChangesetId, ChangesetSpecifier, CoreContext, FileContext, FileId, Mononoke,
RepoContext, SessionContainer, TreeContext, TreeId,
};
use mononoke_types::hash::{Sha1, Sha256};
use permission_checker::{MononokeIdentity, MononokeIdentitySet};
@ -28,6 +28,7 @@ use srserver::RequestContext;
use stats::prelude::*;
use time_ext::DurationExt;
use crate::commit_id::CommitIdExt;
use crate::errors;
use crate::from_request::FromRequest;
use crate::params::AddScubaParams;
@ -175,6 +176,21 @@ impl SourceControlServiceImpl {
Ok((repo, changeset))
}
/// Get the changeset id specified by a `thrift::CommitId`.
pub(crate) async fn changeset_id(
&self,
repo: &RepoContext,
id: &thrift::CommitId,
) -> Result<ChangesetId, errors::ServiceError> {
let changeset_specifier = ChangesetSpecifier::from_request(&id)?;
Ok(repo
.resolve_specifier(changeset_specifier)
.await?
.ok_or_else(|| {
errors::commit_not_found(format!("repo={} commit={}", repo.name(), id.to_string()))
})?)
}
/// Get the repo and tree specified by a `thrift::TreeSpecifier`.
///
/// Returns `None` if the tree is specified by commit path and that path