remove terminator argument

Summary: now the terminator argument is unused - we can get rid of it.

Differential Revision: D22502594

fbshipit-source-id: e8ecec01002421baee38be0c7e048d08068f2d74
This commit is contained in:
Mateusz Kwapich 2020-07-23 07:31:40 -07:00 committed by Facebook GitHub Bot
parent 39c0b018ce
commit 52863fa3e3
2 changed files with 16 additions and 71 deletions

View File

@ -25,7 +25,6 @@ use manifest::{Entry, ManifestOps};
use mononoke_types::{ChangesetId, FileUnodeId, MPath, ManifestUnodeId};
use stats::prelude::*;
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::sync::Arc;
use thiserror::Error;
use time_ext::DurationExt;
@ -66,19 +65,6 @@ pub enum HistoryAcrossDeletions {
/// before they're added to the queue, see its docs for details. If you don't need to filter the
/// history you can provide `()` instead for default implementation.
///
/// Can accept a terminator function: a function on changeset id, that returns true if
/// the history fetching on the current branch has to be terminated.
/// The terminator will be called on changeset id when a fatslog batch is going to be
/// fetched for the changeset. If the terminator returns true, fastlog is not fetched,
/// which means that this history branch is terminated. Already prefetched commits are
/// still streamed.
/// It is possible that history is not linear and have 2 or more branches, terminator
/// can drop history fetching on one of the branches and still proceed with others.
/// Usage:
/// as history stream generally is not ordered by commit creation time (due to
/// the BFS order), it's still necessary to drop the stream if the history is
/// already older than the given time frame.
///
/// This is the public API of this crate i.e. what clients should use if they want to
/// fetch the history.
///
@ -109,19 +95,14 @@ pub enum HistoryAcrossDeletions {
/// Why to pop all nodes on the same depth and not just one commit at a time?
/// Because if history contains merges and parents for more than one node on the current depth
/// haven't been fetched yet, we can fetch them at the same time using FuturesUnordered.
pub async fn list_file_history<Terminator, TFut>(
pub async fn list_file_history(
ctx: CoreContext,
repo: BlobRepo,
path: Option<MPath>,
changeset_id: ChangesetId,
terminator: Option<Terminator>,
mut visitor: impl Visitor,
history_across_deletions: HistoryAcrossDeletions,
) -> Result<impl NewStream<Item = Result<ChangesetId, Error>>, FastlogError>
where
Terminator: Fn(ChangesetId) -> TFut + 'static + Clone + Send + Sync,
TFut: Future<Output = Result<bool, Error>> + Send,
{
) -> Result<impl NewStream<Item = Result<ChangesetId, Error>>, FastlogError> {
let mut top_history = vec![];
// get unode entry
let not_found_err = || {
@ -181,14 +162,13 @@ where
},
// unfold
move |state| {
cloned!(ctx, repo, path, terminator);
cloned!(ctx, repo, path);
async move {
do_history_unfold(
ctx.clone(),
repo.clone(),
path.clone(),
state,
terminator,
history_across_deletions,
)
.await
@ -377,18 +357,15 @@ struct TraversalState<V: Visitor> {
visitor: V,
}
async fn do_history_unfold<Terminator, TFut, V>(
async fn do_history_unfold<V>(
ctx: CoreContext,
repo: BlobRepo,
path: Option<MPath>,
state: TraversalState<V>,
terminator: Option<Terminator>,
history_across_deletions: HistoryAcrossDeletions,
) -> Result<Option<(Vec<ChangesetId>, TraversalState<V>)>, Error>
where
V: Visitor,
Terminator: Fn(ChangesetId) -> TFut + Clone,
TFut: Future<Output = Result<bool, Error>>,
{
let TraversalState {
mut history_graph,
@ -399,24 +376,15 @@ where
} = state;
if let Some(prefetch) = prefetch {
let terminate = match terminator {
Some(terminator) => terminator(prefetch.clone()).await?,
_ => false,
};
if !terminate {
prefetch_and_process_history(
&ctx,
&repo,
&mut visitor,
&path,
prefetch.clone(),
&mut history_graph,
)
.await?;
} else {
// We won't be processing that node as we skipped fetching its ancestors.
let _prefetch = bfs.pop_front();
};
prefetch_and_process_history(
&ctx,
&repo,
&mut visitor,
&path,
prefetch.clone(),
&mut history_graph,
)
.await?;
}
let mut history = vec![];
@ -640,7 +608,7 @@ mod test {
use blobrepo_factory::new_memblob_empty;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::future::{self, TryFutureExt};
use futures::future::TryFutureExt;
use tests_utils::CreateCommitContext;
#[fbinit::compat_test]
@ -673,13 +641,11 @@ mod test {
.compat()
.await?;
let terminator = |_cs_id| future::ready(Ok(false));
let history = list_file_history(
ctx,
repo,
path(filename),
top,
Some(terminator),
(),
HistoryAcrossDeletions::Track,
)
@ -748,13 +714,11 @@ mod test {
.compat()
.await?;
let terminator = |_cs_id| future::ready(Ok(false));
let history = list_file_history(
ctx,
repo,
path(filename),
top,
Some(terminator),
(),
HistoryAcrossDeletions::Track,
)
@ -815,13 +779,11 @@ mod test {
.compat()
.await?;
let terminator = |_cs_id| future::ready(Ok(false));
let history = list_file_history(
ctx,
repo,
path(filename),
prev_id,
Some(terminator),
(),
HistoryAcrossDeletions::Track,
)
@ -835,7 +797,7 @@ mod test {
}
#[fbinit::compat_test]
async fn test_list_history_terminator(fb: FacebookInit) -> Result<(), Error> {
async fn test_list_history_visitor(fb: FacebookInit) -> Result<(), Error> {
// Test history termination on one of the history branches.
// The main branch (top) and branch A have commits that change only single file.
//
@ -876,8 +838,6 @@ mod test {
let top = *main_branch.last().unwrap();
main_branch.reverse();
let terminator = Some(|_cs_id| future::ready(Ok(false)));
struct NothingVisitor;
#[async_trait]
impl Visitor for NothingVisitor {
@ -896,7 +856,6 @@ mod test {
repo.clone(),
filepath.clone(),
top.clone(),
terminator,
NothingVisitor {},
HistoryAcrossDeletions::Track,
)
@ -920,13 +879,11 @@ mod test {
Ok(cs_ids.into_iter().next().into_iter().collect())
}
};
let terminator = Some(|_cs_id| future::ready(Ok(false)));
let history = list_file_history(
ctx,
repo,
filepath,
top,
terminator,
SingleBranchOfHistoryVisitor {},
HistoryAcrossDeletions::Track,
)
@ -985,13 +942,11 @@ mod test {
let history = |cs_id, path| {
cloned!(ctx, repo);
async move {
let terminator = Some(|_cs_id| future::ready(Ok(false)));
let history_stream = list_file_history(
ctx.clone(),
repo.clone(),
path,
cs_id,
terminator,
(),
HistoryAcrossDeletions::Track,
)
@ -1114,13 +1069,11 @@ mod test {
let history = |cs_id, path| {
cloned!(ctx, repo);
async move {
let terminator = Some(|_cs_id| future::ready(Ok(false)));
let history_stream = list_file_history(
ctx.clone(),
repo.clone(),
path,
cs_id,
terminator,
(),
HistoryAcrossDeletions::Track,
)
@ -1184,13 +1137,11 @@ mod test {
.await?;
expected.push(bcs_id.clone());
let terminator = Some(|_cs_id| future::ready(Ok(false)));
let history_stream = list_file_history(
ctx.clone(),
repo.clone(),
MPath::new_opt(filename)?,
bcs_id,
terminator,
(),
HistoryAcrossDeletions::Track,
)
@ -1205,7 +1156,6 @@ mod test {
repo.clone(),
MPath::new_opt(filename)?,
bcs_id,
terminator,
(),
HistoryAcrossDeletions::DontTrack,
)
@ -1264,13 +1214,11 @@ mod test {
// |
// 0 <- creates "dir/1"
let terminator = Some(|_cs_id| future::ready(Ok(false)));
let history_stream = list_file_history(
ctx.clone(),
repo.clone(),
MPath::new_opt(filename)?,
bcs_id,
terminator,
(),
HistoryAcrossDeletions::Track,
)
@ -1286,7 +1234,6 @@ mod test {
repo.clone(),
MPath::new_opt(filename)?,
merge,
terminator,
(),
HistoryAcrossDeletions::Track,
)

View File

@ -23,7 +23,7 @@ use derived_data::BonsaiDerived;
use fastlog::{list_file_history, FastlogError, HistoryAcrossDeletions, Visitor};
use filestore::FetchKey;
use futures::compat::Future01CompatExt;
use futures::future::{self, try_join_all, FutureExt, Shared, TryFutureExt};
use futures::future::{try_join_all, FutureExt, Shared, TryFutureExt};
use futures::stream::{Stream, TryStreamExt};
use futures::try_join;
use futures_old::Future as FutureLegacy;
@ -366,7 +366,6 @@ impl ChangesetPathContext {
}
};
let cs_info_enabled = self.repo().derive_changeset_info_enabled();
let terminator = Some(|_cs_id| future::ready(Ok(false)));
let history_across_deletions = if follow_history_across_deletions {
HistoryAcrossDeletions::Track
@ -378,7 +377,6 @@ impl ChangesetPathContext {
repo,
mpath.cloned(),
self.changeset.id(),
terminator,
FilterVisitor {
cs_info_enabled,
until_timestamp,