diff --git a/eden/mononoke/derived_data/fastlog/ops.rs b/eden/mononoke/derived_data/fastlog/ops.rs index abee6a703b..a6f7fc5b8f 100644 --- a/eden/mononoke/derived_data/fastlog/ops.rs +++ b/eden/mononoke/derived_data/fastlog/ops.rs @@ -25,7 +25,6 @@ use manifest::{Entry, ManifestOps}; use mononoke_types::{ChangesetId, FileUnodeId, MPath, ManifestUnodeId}; use stats::prelude::*; use std::collections::{HashMap, HashSet, VecDeque}; -use std::future::Future; use std::sync::Arc; use thiserror::Error; use time_ext::DurationExt; @@ -66,19 +65,6 @@ pub enum HistoryAcrossDeletions { /// before they're added to the queue, see its docs for details. If you don't need to filter the /// history you can provide `()` instead for default implementation. /// -/// Can accept a terminator function: a function on changeset id, that returns true if -/// the history fetching on the current branch has to be terminated. -/// The terminator will be called on changeset id when a fatslog batch is going to be -/// fetched for the changeset. If the terminator returns true, fastlog is not fetched, -/// which means that this history branch is terminated. Already prefetched commits are -/// still streamed. -/// It is possible that history is not linear and have 2 or more branches, terminator -/// can drop history fetching on one of the branches and still proceed with others. -/// Usage: -/// as history stream generally is not ordered by commit creation time (due to -/// the BFS order), it's still necessary to drop the stream if the history is -/// already older than the given time frame. -/// /// This is the public API of this crate i.e. what clients should use if they want to /// fetch the history. /// @@ -109,19 +95,14 @@ pub enum HistoryAcrossDeletions { /// Why to pop all nodes on the same depth and not just one commit at a time? /// Because if history contains merges and parents for more than one node on the current depth /// haven't been fetched yet, we can fetch them at the same time using FuturesUnordered. -pub async fn list_file_history( +pub async fn list_file_history( ctx: CoreContext, repo: BlobRepo, path: Option, changeset_id: ChangesetId, - terminator: Option, mut visitor: impl Visitor, history_across_deletions: HistoryAcrossDeletions, -) -> Result>, FastlogError> -where - Terminator: Fn(ChangesetId) -> TFut + 'static + Clone + Send + Sync, - TFut: Future> + Send, -{ +) -> Result>, FastlogError> { let mut top_history = vec![]; // get unode entry let not_found_err = || { @@ -181,14 +162,13 @@ where }, // unfold move |state| { - cloned!(ctx, repo, path, terminator); + cloned!(ctx, repo, path); async move { do_history_unfold( ctx.clone(), repo.clone(), path.clone(), state, - terminator, history_across_deletions, ) .await @@ -377,18 +357,15 @@ struct TraversalState { visitor: V, } -async fn do_history_unfold( +async fn do_history_unfold( ctx: CoreContext, repo: BlobRepo, path: Option, state: TraversalState, - terminator: Option, history_across_deletions: HistoryAcrossDeletions, ) -> Result, TraversalState)>, Error> where V: Visitor, - Terminator: Fn(ChangesetId) -> TFut + Clone, - TFut: Future>, { let TraversalState { mut history_graph, @@ -399,24 +376,15 @@ where } = state; if let Some(prefetch) = prefetch { - let terminate = match terminator { - Some(terminator) => terminator(prefetch.clone()).await?, - _ => false, - }; - if !terminate { - prefetch_and_process_history( - &ctx, - &repo, - &mut visitor, - &path, - prefetch.clone(), - &mut history_graph, - ) - .await?; - } else { - // We won't be processing that node as we skipped fetching its ancestors. - let _prefetch = bfs.pop_front(); - }; + prefetch_and_process_history( + &ctx, + &repo, + &mut visitor, + &path, + prefetch.clone(), + &mut history_graph, + ) + .await?; } let mut history = vec![]; @@ -640,7 +608,7 @@ mod test { use blobrepo_factory::new_memblob_empty; use context::CoreContext; use fbinit::FacebookInit; - use futures::future::{self, TryFutureExt}; + use futures::future::TryFutureExt; use tests_utils::CreateCommitContext; #[fbinit::compat_test] @@ -673,13 +641,11 @@ mod test { .compat() .await?; - let terminator = |_cs_id| future::ready(Ok(false)); let history = list_file_history( ctx, repo, path(filename), top, - Some(terminator), (), HistoryAcrossDeletions::Track, ) @@ -748,13 +714,11 @@ mod test { .compat() .await?; - let terminator = |_cs_id| future::ready(Ok(false)); let history = list_file_history( ctx, repo, path(filename), top, - Some(terminator), (), HistoryAcrossDeletions::Track, ) @@ -815,13 +779,11 @@ mod test { .compat() .await?; - let terminator = |_cs_id| future::ready(Ok(false)); let history = list_file_history( ctx, repo, path(filename), prev_id, - Some(terminator), (), HistoryAcrossDeletions::Track, ) @@ -835,7 +797,7 @@ mod test { } #[fbinit::compat_test] - async fn test_list_history_terminator(fb: FacebookInit) -> Result<(), Error> { + async fn test_list_history_visitor(fb: FacebookInit) -> Result<(), Error> { // Test history termination on one of the history branches. // The main branch (top) and branch A have commits that change only single file. // @@ -876,8 +838,6 @@ mod test { let top = *main_branch.last().unwrap(); main_branch.reverse(); - let terminator = Some(|_cs_id| future::ready(Ok(false))); - struct NothingVisitor; #[async_trait] impl Visitor for NothingVisitor { @@ -896,7 +856,6 @@ mod test { repo.clone(), filepath.clone(), top.clone(), - terminator, NothingVisitor {}, HistoryAcrossDeletions::Track, ) @@ -920,13 +879,11 @@ mod test { Ok(cs_ids.into_iter().next().into_iter().collect()) } }; - let terminator = Some(|_cs_id| future::ready(Ok(false))); let history = list_file_history( ctx, repo, filepath, top, - terminator, SingleBranchOfHistoryVisitor {}, HistoryAcrossDeletions::Track, ) @@ -985,13 +942,11 @@ mod test { let history = |cs_id, path| { cloned!(ctx, repo); async move { - let terminator = Some(|_cs_id| future::ready(Ok(false))); let history_stream = list_file_history( ctx.clone(), repo.clone(), path, cs_id, - terminator, (), HistoryAcrossDeletions::Track, ) @@ -1114,13 +1069,11 @@ mod test { let history = |cs_id, path| { cloned!(ctx, repo); async move { - let terminator = Some(|_cs_id| future::ready(Ok(false))); let history_stream = list_file_history( ctx.clone(), repo.clone(), path, cs_id, - terminator, (), HistoryAcrossDeletions::Track, ) @@ -1184,13 +1137,11 @@ mod test { .await?; expected.push(bcs_id.clone()); - let terminator = Some(|_cs_id| future::ready(Ok(false))); let history_stream = list_file_history( ctx.clone(), repo.clone(), MPath::new_opt(filename)?, bcs_id, - terminator, (), HistoryAcrossDeletions::Track, ) @@ -1205,7 +1156,6 @@ mod test { repo.clone(), MPath::new_opt(filename)?, bcs_id, - terminator, (), HistoryAcrossDeletions::DontTrack, ) @@ -1264,13 +1214,11 @@ mod test { // | // 0 <- creates "dir/1" - let terminator = Some(|_cs_id| future::ready(Ok(false))); let history_stream = list_file_history( ctx.clone(), repo.clone(), MPath::new_opt(filename)?, bcs_id, - terminator, (), HistoryAcrossDeletions::Track, ) @@ -1286,7 +1234,6 @@ mod test { repo.clone(), MPath::new_opt(filename)?, merge, - terminator, (), HistoryAcrossDeletions::Track, ) diff --git a/eden/mononoke/mononoke_api/src/changeset_path.rs b/eden/mononoke/mononoke_api/src/changeset_path.rs index bc254c25a0..566a1ea3ad 100644 --- a/eden/mononoke/mononoke_api/src/changeset_path.rs +++ b/eden/mononoke/mononoke_api/src/changeset_path.rs @@ -23,7 +23,7 @@ use derived_data::BonsaiDerived; use fastlog::{list_file_history, FastlogError, HistoryAcrossDeletions, Visitor}; use filestore::FetchKey; use futures::compat::Future01CompatExt; -use futures::future::{self, try_join_all, FutureExt, Shared, TryFutureExt}; +use futures::future::{try_join_all, FutureExt, Shared, TryFutureExt}; use futures::stream::{Stream, TryStreamExt}; use futures::try_join; use futures_old::Future as FutureLegacy; @@ -366,7 +366,6 @@ impl ChangesetPathContext { } }; let cs_info_enabled = self.repo().derive_changeset_info_enabled(); - let terminator = Some(|_cs_id| future::ready(Ok(false))); let history_across_deletions = if follow_history_across_deletions { HistoryAcrossDeletions::Track @@ -378,7 +377,6 @@ impl ChangesetPathContext { repo, mpath.cloned(), self.changeset.id(), - terminator, FilterVisitor { cs_info_enabled, until_timestamp,