convert bookmark methods to new type futures

Summary: convert bookmark methods to new type futures

Reviewed By: ahornby

Differential Revision: D25196555

fbshipit-source-id: b41014937e1dd10ad839aca5011f199ee6007827
Pavel Aslanov 2020-11-27 11:09:33 -08:00 committed by Facebook GitHub Bot
parent ac33b17233
commit ca4b5cf073
18 changed files with 47 additions and 70 deletions
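
Note: every hunk below applies the same futures 0.1 -> 0.3 migration pattern: methods returning impl OldFuture<Item = T, Error = E> become async fns returning Result<T, E>, streams move the error into the item type, and .compat() bridges remain only where one world still has to talk to the other. A minimal standalone sketch of the pattern (illustrative names, not Mononoke code):

```rust
use anyhow::Error;
use futures::stream::{self, Stream, TryStreamExt};

// Old style (futures 0.1): fn value() -> impl OldFuture<Item = u64, Error = Error>.
// New style: a plain async fn returning Result.
async fn value() -> Result<u64, Error> {
    Ok(42)
}

// Old style: impl OldStream<Item = u64, Error = Error>.
// New style: the error moves into the item type, and fallible combinators
// come from TryStreamExt instead of the 0.1 Stream trait.
fn entries() -> impl Stream<Item = Result<u64, Error>> {
    stream::iter(vec![Ok(1), Ok(2), Ok(3)])
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let v = value().await?; // was: value().compat().await? across the bridge
    let all: Vec<u64> = entries().try_collect().await?; // was: .collect().compat().await?
    println!("{} {:?}", v, all);
    Ok(())
}
```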

View File

@@ -290,11 +290,10 @@ impl BlobRepo {
         max_rec: u32,
         offset: Option<u32>,
         freshness: Freshness,
-    ) -> impl OldStream<Item = (Option<ChangesetId>, BookmarkUpdateReason, Timestamp), Error = Error>
+    ) -> impl Stream<Item = Result<(Option<ChangesetId>, BookmarkUpdateReason, Timestamp), Error>>
     {
         self.attribute_expected::<dyn BookmarkUpdateLog>()
             .list_bookmark_log_entries(ctx.clone(), name, max_rec, offset, freshness)
-            .compat()
     }

     pub fn read_next_bookmark_log_entries(
@@ -303,21 +302,20 @@ impl BlobRepo {
         id: u64,
         limit: u64,
         freshness: Freshness,
-    ) -> impl OldStream<Item = BookmarkUpdateLogEntry, Error = Error> {
+    ) -> impl Stream<Item = Result<BookmarkUpdateLogEntry, Error>> {
         self.attribute_expected::<dyn BookmarkUpdateLog>()
             .read_next_bookmark_log_entries(ctx, id, limit, freshness)
-            .compat()
     }

-    pub fn count_further_bookmark_log_entries(
+    pub async fn count_further_bookmark_log_entries(
         &self,
         ctx: CoreContext,
         id: u64,
         exclude_reason: Option<BookmarkUpdateReason>,
-    ) -> impl OldFuture<Item = u64, Error = Error> {
+    ) -> Result<u64, Error> {
         self.attribute_expected::<dyn BookmarkUpdateLog>()
             .count_further_bookmark_log_entries(ctx, id, exclude_reason)
-            .compat()
+            .await
     }

     pub fn update_bookmark_transaction(&self, ctx: CoreContext) -> Box<dyn BookmarkTransaction> {
@@ -328,16 +326,19 @@ impl BlobRepo {
     // Returns the generation number of a changeset
     // note: it returns Option because changeset might not exist
-    pub fn get_generation_number(
+    pub async fn get_generation_number(
         &self,
         ctx: CoreContext,
         cs: ChangesetId,
-    ) -> impl OldFuture<Item = Option<Generation>, Error = Error> {
+    ) -> Result<Option<Generation>, Error> {
         STATS::get_generation_number.add_value(1);
-        self.inner
+        let result = self
+            .inner
             .changesets
             .get(ctx, self.get_repoid(), cs)
-            .map(|res| res.map(|res| Generation::new(res.gen)))
-            .compat()
+            .await?;
+        Ok(result.map(|res| Generation::new(res.gen)))
     }

     pub fn get_changeset_fetcher(&self) -> Arc<dyn ChangesetFetcher> {
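
With the converted signatures above, call sites await the async fns directly and drive the stream-returning methods with TryStreamExt. A hypothetical caller (a sketch reusing the BlobRepo, CoreContext and Freshness types from this file, not code from the commit):

```rust
use anyhow::Error;
use futures::stream::TryStreamExt;

async fn summarize_log(repo: &BlobRepo, ctx: CoreContext) -> Result<(), Error> {
    // Stream of Result items, collected with try_collect instead of the
    // old .collect().compat() chain.
    let entries: Vec<_> = repo
        .read_next_bookmark_log_entries(ctx.clone(), 0, 100, Freshness::MostRecent)
        .try_collect()
        .await?;

    // Converted async fn, awaited directly with no .compat() bridge.
    let remaining = repo
        .count_further_bookmark_log_entries(ctx, 0, None)
        .await?;

    println!("{} entries fetched, {} more in the log", entries.len(), remaining);
    Ok(())
}
```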

View File

@@ -24,7 +24,7 @@ use derived_data::BonsaiDerived;
 use fsnodes::RootFsnodeId;
 use futures::{
     channel::oneshot,
-    compat::{Future01CompatExt, Stream01CompatExt},
+    compat::Future01CompatExt,
     future::{self, select, BoxFuture, FutureExt as NewFutureExt},
     stream::{self, FuturesUnordered, StreamExt, TryStreamExt},
 };
@@ -330,7 +330,6 @@ pub async fn find_all_underived_and_latest_derived(
             Some(prev_limit),
             Freshness::MaybeStale,
         )
-        .compat()
         .try_collect::<Vec<_>>()
         .await?;
@@ -1273,9 +1272,8 @@ mod tests {
         assert_eq!(false, (warmer.is_warm)(&ctx, &repo, &master_cs_id).await?);
         let gen_num = repo
             .get_generation_number(ctx.clone(), master_cs_id)
-            .compat();
-        let maybe_gen_num = gen_num.await?;
-        let gen_num = maybe_gen_num.ok_or(anyhow!("gen num for {} not found", master_cs_id))?;
+            .await?
+            .ok_or_else(|| anyhow!("gen num for {} not found", master_cs_id))?;
         mutable_counters
             .set_counter(
                 ctx.clone(),

View File

@@ -79,9 +79,9 @@ async fn fetch_generation_number(
     repo: &BlobRepo,
     cs_id: ChangesetId,
 ) -> Result<Generation, Error> {
-    let gen_num = repo.get_generation_number(ctx.clone(), cs_id).compat();
-    let maybe_gen_num = gen_num.await?;
-    maybe_gen_num.ok_or(anyhow!("gen num for {} not found", cs_id))
+    repo.get_generation_number(ctx.clone(), cs_id)
+        .await?
+        .ok_or_else(|| anyhow!("gen num for {} not found", cs_id))
 }

 async fn check_if_present_in_hg(
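
The rewrite also trades ok_or(anyhow!(...)) for ok_or_else(|| anyhow!(...)). Option::ok_or takes its error by value, so the message would be formatted even when the value is present; ok_or_else only runs the closure on the None path. In isolation:

```rust
use anyhow::{anyhow, Error};

fn require_gen_num(maybe_gen_num: Option<u64>, cs_id: &str) -> Result<u64, Error> {
    // The closure (and the string formatting inside it) only runs on None.
    maybe_gen_num.ok_or_else(|| anyhow!("gen num for {} not found", cs_id))
}
```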

View File

@@ -11,7 +11,7 @@ use bookmarks::Freshness;
 use clap::{App, Arg, ArgMatches, SubCommand};
 use cloned::cloned;
 use context::CoreContext;
-use futures::{compat::Future01CompatExt, TryFutureExt, TryStreamExt};
+use futures::{compat::Future01CompatExt, StreamExt, TryFutureExt, TryStreamExt};
 use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
 use futures_old::{future, Future, IntoFuture, Stream};
 use mercurial_types::HgChangesetId;
@@ -188,6 +188,8 @@ fn list_hg_bookmark_log_entries(
         Error = Error,
     > {
     repo.list_bookmark_log_entries(ctx.clone(), name, max_rec, None, Freshness::MostRecent)
+        .boxed()
+        .compat()
         .map({
             cloned!(ctx, repo);
             move |(cs_id, rs, ts)| match cs_id {
@@ -250,6 +252,8 @@ fn handle_log<'a>(
             None,
             Freshness::MostRecent,
         )
+        .boxed()
+        .compat()
         .map(move |rows| {
             let (cs_id, reason, timestamp) = rows;
             let cs_id_str = match cs_id {
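
This admin command still exposes futures 0.1 interfaces (note the futures_ext::BoxFuture import and the old-style Item/Error return above), so the now-0.3 streams are bridged back with .boxed().compat(): boxing produces an Unpin stream, which the compat adapter can then expose as an 0.1 Stream with the Item and Error split back apart. A standalone sketch of that bridge, assuming futures_old renames futures 0.1 as this repo's Cargo.toml does:

```rust
use anyhow::Error;
use futures::stream::{self, StreamExt, TryStreamExt};
use futures_old::Stream as OldStream; // futures 0.1

fn old_style_entries() -> impl OldStream<Item = u64, Error = Error> {
    // A 0.3 stream of Result items ...
    stream::iter(vec![Ok::<_, Error>(1), Ok(2)])
        // ... boxed so the adapter gets an Unpin stream to poll ...
        .boxed()
        // ... and adapted into an 0.1 Stream<Item = u64, Error = Error>.
        .compat()
}
```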

View File

@@ -444,9 +444,7 @@ async fn maybe_update_highest_imported_generation_number(
         blobimport_lib::HIGHEST_IMPORTED_GEN_NUM,
     )
     .compat();
-    let new_gen_num = blobrepo
-        .get_generation_number(ctx.clone(), latest_imported_cs_id)
-        .compat();
+    let new_gen_num = blobrepo.get_generation_number(ctx.clone(), latest_imported_cs_id);
     let (maybe_highest_imported_gen_num, new_gen_num) =
         try_join(maybe_highest_imported_gen_num, new_gen_num).await?;
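
Only one side of this try_join still needs a bridge: the counter lookup stays a futures 0.1 call lifted with .compat(), while get_generation_number is now natively 0.3. The mix works because .compat() yields an ordinary 0.3 future. The same shape in miniature:

```rust
use anyhow::Error;
use futures::{compat::Future01CompatExt, future::try_join};
use futures_old::future as old_future; // futures 0.1

async fn demo() -> Result<(), Error> {
    // Leftover 0.1 future, lifted into the 0.3 world.
    let legacy = old_future::ok::<u64, Error>(1).compat();
    // Native 0.3 future.
    let native = async { Ok::<u64, Error>(2) };
    // Both are TryFutures with the same error type, so they can be
    // awaited concurrently.
    let (a, b) = try_join(legacy, native).await?;
    assert_eq!(a + b, 3);
    Ok(())
}
```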

View File

@@ -38,12 +38,8 @@ use cross_repo_sync::{
     find_toposorted_unsynced_ancestors, CandidateSelectionHint, CommitSyncContext,
     CommitSyncOutcome, CommitSyncer,
 };
-use futures::{
-    compat::Future01CompatExt,
-    future::{FutureExt, TryFutureExt},
-};
+use futures::{compat::Future01CompatExt, FutureExt, TryFutureExt, TryStreamExt};
 use futures_old::future::Future;
-use futures_old::stream::Stream as OldStream;
 use metaconfig_types::MetadataDatabaseConfig;
 use mononoke_types::{ChangesetId, RepositoryId};
 use mutable_counters::{MutableCounters, SqlMutableCounters};
@@ -104,7 +100,7 @@ where
             u64::max_value()
         }
     };
-    let next_entries = commit_syncer
+    let next_entries: Vec<_> = commit_syncer
         .get_source_repo()
         .read_next_bookmark_log_entries(
             ctx.clone(),
@@ -112,8 +108,7 @@ where
             log_entries_limit,
             Freshness::MostRecent,
         )
-        .collect()
-        .compat()
+        .try_collect()
         .await?;

     if next_entries.is_empty() {
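
One wrinkle in this hunk: futures 0.1's .collect() always built a Vec, but TryStreamExt::try_collect is generic over the target collection, which is why converted bindings gain an explicit Vec<_> annotation (or a turbofish, as elsewhere in this commit). For example:

```rust
use anyhow::Error;
use futures::stream::{self, TryStreamExt};

async fn demo() -> Result<(), Error> {
    let log = stream::iter(vec![Ok::<_, Error>(1), Ok(2), Ok(3)]);
    // Without the Vec<_> annotation the target collection is ambiguous:
    // try_collect can build anything implementing Default + Extend.
    let entries: Vec<_> = log.try_collect().await?;
    assert_eq!(entries.len(), 3);
    Ok(())
}
```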

View File

@@ -26,7 +26,7 @@ use futures::{
     stream::{self, StreamExt, TryStreamExt},
     try_join,
 };
-use futures_old::{stream::Stream as Stream_old, Future};
+use futures_old::Future;
 use live_commit_sync_config::{CfgrLiveCommitSyncConfig, LiveCommitSyncConfig};
 use mercurial_types::HgChangesetId;
 use mononoke_types::ChangesetId;
@@ -181,11 +181,9 @@ where
     let source_repo = commit_syncer.get_source_repo();

     let next_entry = source_repo
         .read_next_bookmark_log_entries(ctx.clone(), counter as u64, 1, Freshness::MostRecent)
-        .collect()
-        .compat();
-    let remaining_entries = source_repo
-        .count_further_bookmark_log_entries(ctx.clone(), counter as u64, None)
-        .compat();
+        .try_collect::<Vec<_>>();
+    let remaining_entries =
+        source_repo.count_further_bookmark_log_entries(ctx.clone(), counter as u64, None);

     let (next_entry, remaining_entries) = try_join!(next_entry, remaining_entries)?;

     let delay_secs = next_entry

View File

@@ -23,11 +23,7 @@ use cross_repo_sync::{
 use dbbookmarks::SqlBookmarksBuilder;
 use fbinit::FacebookInit;
 use fixtures::linear;
-use futures::{
-    compat::Future01CompatExt,
-    future::{FutureExt, TryFutureExt},
-    TryStreamExt,
-};
+use futures::{compat::Future01CompatExt, FutureExt, TryFutureExt, TryStreamExt};
 use futures_ext::spawn_future;
 use futures_old::{future, stream::Stream as OldStream};
 use manifest::{Entry, ManifestOps};
@@ -96,10 +92,9 @@ fn test_sync_entries(fb: FacebookInit) -> Result<(), Error> {
     let source_repo = commit_syncer.get_source_repo();
     let target_repo = commit_syncer.get_target_repo();

-    let next_log_entries = source_repo
+    let next_log_entries: Vec<_> = source_repo
         .read_next_bookmark_log_entries(ctx.clone(), 0, 1000, Freshness::MostRecent)
-        .collect()
-        .compat()
+        .try_collect()
         .await?;

     // Sync entries starting from counter 0. sync_entries() function should skip
@@ -475,11 +470,10 @@ async fn backsync_and_verify_master_wc(
     let target_repo = commit_syncer.get_target_repo();
     let ctx = CoreContext::test_mock(fb);

-    let next_log_entries = commit_syncer
+    let next_log_entries: Vec<_> = commit_syncer
         .get_source_repo()
         .read_next_bookmark_log_entries(ctx.clone(), 0, 1000, Freshness::MaybeStale)
-        .collect()
-        .compat()
+        .try_collect()
         .await?;

     let latest_log_id = next_log_entries.len() as i64;

View File

@@ -21,11 +21,9 @@ cached_config = { git = "https://github.com/facebookexperimental/rust-shed.git",
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
clap = "2.33"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }
slog = { version = "2.5", features = ["max_level_debug"] }
thiserror = "1.0"
tokio = { version = "=0.2.13", features = ["full"] }
[dev-dependencies]

View File

@@ -18,8 +18,7 @@ use cross_repo_sync::{
     CommitSyncOutcome, CommitSyncer, Syncers,
 };
 use fbinit::FacebookInit;
-use futures::{compat::Future01CompatExt, future};
-use futures_old::Stream;
+use futures::{future, TryStreamExt};
 use live_commit_sync_config::CONFIGERATOR_PUSHREDIRECT_ENABLE;
 use mononoke_types::ChangesetId;
 use pushredirect_enable::types::MononokePushRedirectEnable;
@@ -219,7 +218,7 @@ async fn check_large_bookmark_history<M: SyncedCommitMapping + Clone + 'static>(
     let large_repo = small_to_large.get_large_repo();

     // Log entries are sorted newest to oldest
-    let log_entries = large_repo
+    let log_entries: Vec<_> = large_repo
         .list_bookmark_log_entries(
             ctx.clone(),
             large_bookmark.clone(),
@@ -227,8 +226,7 @@ async fn check_large_bookmark_history<M: SyncedCommitMapping + Clone + 'static>(
             None,
             Freshness::MostRecent,
         )
-        .collect()
-        .compat()
+        .try_collect()
         .await?;
     if let Some((_, _, latest_timestamp)) = log_entries.get(0) {

View File

@@ -235,7 +235,6 @@ async fn tail<
     let remaining_entries = commit_syncer
         .get_source_repo()
         .count_further_bookmark_log_entries(ctx.clone(), start_id as u64, None)
-        .compat()
         .await?;

     if log_entries.is_empty() {

View File

@@ -487,8 +487,8 @@ async fn validate_if_new_repo_merge(
     p1: ChangesetId,
     p2: ChangesetId,
 ) -> Result<Vec<ChangesetId>, Error> {
-    let p1gen = repo.get_generation_number(ctx.clone(), p1).compat();
-    let p2gen = repo.get_generation_number(ctx.clone(), p2).compat();
+    let p1gen = repo.get_generation_number(ctx.clone(), p1);
+    let p2gen = repo.get_generation_number(ctx.clone(), p2);
     let (p1gen, p2gen) = try_join!(p1gen, p2gen)?;
     // FIXME: this code has an assumption that parent with a smaller generation number is a
     // parent that introduces a new repo. This is usually the case, however it might not be true
@@ -644,7 +644,6 @@ mod test {
     use cross_repo_sync_test_utils::init_small_large_repo;
     use fbinit::FacebookInit;
     use futures::TryStreamExt;
-    use futures_old::stream::Stream;
     use maplit::hashset;
     use mutable_counters::MutableCounters;
     use tests_utils::{bookmark, resolve_cs_id, CreateCommitContext};
@@ -1015,15 +1014,14 @@ mod test {
         println!("start from: {}", start_from);
         let read_all = 65536;
-        let log_entries = smallrepo
+        let log_entries: Vec<_> = smallrepo
             .read_next_bookmark_log_entries(
                 ctx.clone(),
                 start_from as u64,
                 read_all,
                 Freshness::MostRecent,
             )
-            .collect()
-            .compat()
+            .try_collect()
             .await?;
         println!(

View File

@@ -296,7 +296,6 @@ impl ChangesetContext {
         self.repo()
             .blob_repo()
             .get_generation_number(self.ctx().clone(), self.id)
-            .compat()
             .await?
             .ok_or_else(|| {
                 MononokeError::NotAvailable(format!("Generation number missing for {:?}", &self.id))

View File

@@ -580,7 +580,6 @@ impl Repo {
         let maybe_gen_num = self
             .blob_repo
             .get_generation_number(ctx.clone(), *cs_id)
-            .compat()
             .await?;
         maybe_gen_num.ok_or(format_err!("gen num for {} not found", cs_id))
     }

View File

@@ -560,7 +560,6 @@ async fn find_closest_root(
         async move {
             let gen_num = repo
                 .get_generation_number(ctx.clone(), *root)
-                .compat()
                 .await?
                 .ok_or(PushrebaseError::from(
                     PushrebaseInternalError::RootNotFound(*root),

View File

@@ -877,7 +877,6 @@ async fn log_commits_to_scribe(
         async move {
             let get_generation = async {
                 repo.get_generation_number(ctx.clone(), changeset_id)
-                    .compat()
                     .await?
                     .ok_or_else(|| Error::msg("No generation number found"))
             };

View File

@@ -72,7 +72,6 @@ pub async fn assert_changesets_sequence<I>(
         let expected_generation = repo
             .clone()
             .get_generation_number(ctx.clone(), expected)
-            .compat()
             .await
             .expect("Unexpected error");
@@ -91,7 +90,6 @@ pub async fn assert_changesets_sequence<I>(
         let node_generation = repo
             .clone()
             .get_generation_number(ctx.clone(), expected)
-            .compat()
             .await
             .expect("Unexpected error");

View File

@@ -35,8 +35,10 @@ impl ChangesetFetcher for TestChangesetFetcher {
         ctx: CoreContext,
         cs_id: ChangesetId,
     ) -> BoxFuture<Generation, Error> {
-        self.repo
-            .get_generation_number(ctx, cs_id)
+        let repo = self.repo.clone();
+        async move { repo.get_generation_number(ctx, cs_id).await }
+            .boxed()
+            .compat()
             .and_then(move |genopt| genopt.ok_or_else(|| format_err!("{} not found", cs_id)))
             .boxify()
     }
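
The ChangesetFetcher trait still demands a futures 0.1 BoxFuture, so this impl goes the opposite direction: wrap the new async call in an async move block, box it, bridge it with .compat(), and keep chaining 0.1 combinators before .boxify(). A standalone sketch of the same shape (MyRepo and lookup are hypothetical stand-ins; BoxFuture and boxify come from the futures_ext crate used here):

```rust
use anyhow::{format_err, Error};
use futures::{FutureExt, TryFutureExt}; // .boxed() and .compat()
use futures_ext::{BoxFuture, FutureExt as OldFutureExt}; // 0.1 BoxFuture and .boxify()
use futures_old::Future as OldFuture; // 0.1 .and_then()

#[derive(Clone)]
struct MyRepo;

impl MyRepo {
    // Stand-in for a converted async method such as get_generation_number.
    async fn lookup(&self, _id: u64) -> Result<Option<u64>, Error> {
        Ok(Some(42))
    }
}

fn old_style_lookup(repo: MyRepo, id: u64) -> BoxFuture<u64, Error> {
    // async block -> 0.3 future; .boxed() makes it Unpin; .compat() turns it
    // into an 0.1 future, after which 0.1 combinators apply as before.
    async move { repo.lookup(id).await }
        .boxed()
        .compat()
        .and_then(move |maybe| maybe.ok_or_else(|| format_err!("{} not found", id)))
        .boxify()
}
```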