From e7e21f9c30d25a1c9584de0adf4fb429810df393 Mon Sep 17 00:00:00 2001
From: Stanislau Hlebik
Date: Tue, 16 Jun 2020 03:31:23 -0700
Subject: [PATCH] mononoke: migrate megarepolib to new futures

Summary: I'm going to change megarepolib in the next diffs, so let's migrate it
to new futures first.

Reviewed By: krallin

Differential Revision: D22065216

fbshipit-source-id: b06fcac518d684f40b1d19fcef9118ca51236873
---
 .../cross_repo_sync/test_utils/lib.rs   |   5 +-
 eden/mononoke/megarepolib/Cargo.toml    |   4 +-
 eden/mononoke/megarepolib/src/common.rs | 144 +++++--------
 eden/mononoke/megarepolib/src/lib.rs    | 195 ++++++++----------
 4 files changed, 144 insertions(+), 204 deletions(-)
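The whole patch follows one mechanical recipe, sketched here on a toy function
(illustrative names only, not from this codebase): a futures 0.1 combinator
chain built from `and_then`/`map` and `cloned!` captures becomes an `async fn`,
and any call that still returns a 0.1 future is bridged with
`Future01CompatExt::compat()` before being awaited.

// Sketch only; assumes the dependency aliases from the Cargo.toml hunk below:
//   futures = { version = "0.3", features = ["async-await", "compat"] }
//   futures-old = { package = "futures", version = "0.1" }
//   anyhow = "1.0"
use anyhow::Error;
use futures::compat::Future01CompatExt;
use futures_old::future::{self as future_old, Future as OldFuture};

// Stand-in for an API that still returns a futures 0.1 future,
// e.g. `repo.get_hg_from_bonsai_changeset(...)`.
fn fetch_old(x: u32) -> impl OldFuture<Item = u32, Error = Error> {
    future_old::ok(x + 1)
}

// Before: fetch_old(x).and_then(|y| fetch_old(y)).map(|z| z * 2)
// After: sequential awaits; errors propagate with `?`, and no `cloned!`
// captures are needed because nothing has to outlive the closure chain.
async fn fetch_twice(x: u32) -> Result<u32, Error> {
    let y = fetch_old(x).compat().await?;
    let z = fetch_old(y).compat().await?;
    Ok(z * 2)
}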
diff --git a/eden/mononoke/commit_rewriting/cross_repo_sync/test_utils/lib.rs b/eden/mononoke/commit_rewriting/cross_repo_sync/test_utils/lib.rs
index 3dfa2a7cb5..2f23e1f512 100644
--- a/eden/mononoke/commit_rewriting/cross_repo_sync/test_utils/lib.rs
+++ b/eden/mononoke/commit_rewriting/cross_repo_sync/test_utils/lib.rs
@@ -199,13 +199,12 @@ pub async fn init_small_large_repo(
         mark_public: false,
     };
     let move_hg_cs = perform_move(
-        ctx.clone(),
-        megarepo.clone(),
+        &ctx,
+        &megarepo,
         second_bcs_id,
         Arc::new(prefix_mover),
         move_cs_args,
     )
-    .compat()
     .await?;
 
     let maybe_move_bcs_id = megarepo
diff --git a/eden/mononoke/megarepolib/Cargo.toml b/eden/mononoke/megarepolib/Cargo.toml
index 00cea2605e..d676a0ccd7 100644
--- a/eden/mononoke/megarepolib/Cargo.toml
+++ b/eden/mononoke/megarepolib/Cargo.toml
@@ -15,9 +15,9 @@ manifest = { path = "../manifest" }
 mercurial_types = { path = "../mercurial/types" }
 mononoke_types = { path = "../mononoke_types" }
 movers = { path = "../commit_rewriting/movers" }
-cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 anyhow = "1.0"
+futures = { version = "0.3", features = ["async-await", "compat"] }
 futures-old = { package = "futures", version = "0.1" }
 slog = { version = "2.5", features = ["max_level_debug"] }
 
@@ -25,7 +25,7 @@ slog = { version = "2.5", features = ["max_level_debug"] }
 blobrepo_factory = { path = "../blobrepo/factory" }
 fixtures = { path = "../tests/fixtures" }
 async_unit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
+cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
-futures = { version = "0.3", features = ["async-await", "compat"] }
 maplit = "1.0"
 tokio-compat = "0.1"
diff --git a/eden/mononoke/megarepolib/src/common.rs b/eden/mononoke/megarepolib/src/common.rs
index ffd38673c5..cb9b3c3720 100644
--- a/eden/mononoke/megarepolib/src/common.rs
+++ b/eden/mononoke/megarepolib/src/common.rs
@@ -8,13 +8,10 @@
 use anyhow::{format_err, Error};
 use blobrepo::{save_bonsai_changesets, BlobRepo};
 use bookmarks::{BookmarkName, BookmarkUpdateReason};
-use cloned::cloned;
 use context::CoreContext;
-use futures_old::future::{err, ok, Future};
-use futures_old::IntoFuture;
+use futures::compat::Future01CompatExt;
 use slog::info;
 
-use futures_ext::FutureExt;
 use mercurial_types::{HgChangesetId, MPath};
 use mononoke_types::{BonsaiChangeset, BonsaiChangesetMut, ChangesetId, DateTime, FileChange};
 use std::collections::BTreeMap;
@@ -28,13 +25,13 @@ pub struct ChangesetArgs {
     pub mark_public: bool,
 }
 
-pub fn create_and_save_changeset(
-    ctx: CoreContext,
-    repo: BlobRepo,
+pub async fn create_and_save_changeset(
+    ctx: &CoreContext,
+    repo: &BlobRepo,
     parents: Vec<ChangesetId>,
     file_changes: BTreeMap<MPath, Option<FileChange>>,
     changeset_args: ChangesetArgs,
-) -> impl Future<Item = HgChangesetId, Error = Error> {
+) -> Result<HgChangesetId, Error> {
     let ChangesetArgs {
         author,
         message,
@@ -42,101 +39,75 @@
         bookmark: maybe_bookmark,
         mark_public,
     } = changeset_args;
-    create_bonsai_changeset_only(parents, file_changes, author, message, datetime)
-        .and_then({
-            cloned!(ctx, repo);
-            move |bcs| save_and_maybe_mark_public(ctx, repo, bcs, mark_public)
-        })
-        .and_then({
-            cloned!(ctx, repo);
-            move |bcs_id| match maybe_bookmark {
-                Some(bookmark) => create_bookmark(ctx, repo, bookmark, bcs_id.clone())
-                    .map(move |_| bcs_id)
-                    .left_future(),
-                None => ok(bcs_id).right_future(),
-            }
-        })
-        .and_then({
-            cloned!(ctx, repo);
-            move |bcs_id| generate_hg_changeset(ctx, repo, bcs_id)
-        })
+    let bcs = create_bonsai_changeset_only(parents, file_changes, author, message, datetime)?;
+    let bcs_id = save_and_maybe_mark_public(&ctx, &repo, bcs, mark_public).await?;
+
+    if let Some(bookmark) = maybe_bookmark {
+        create_bookmark(ctx, repo, bookmark, bcs_id).await?;
+    }
+    generate_hg_changeset(ctx, repo, bcs_id).await
 }
 
-fn save_and_maybe_mark_public(
-    ctx: CoreContext,
-    repo: BlobRepo,
+async fn save_and_maybe_mark_public(
+    ctx: &CoreContext,
+    repo: &BlobRepo,
     bcs: BonsaiChangeset,
     mark_public: bool,
-) -> impl Future<Item = ChangesetId, Error = Error> {
+) -> Result<ChangesetId, Error> {
     let bcs_id = bcs.get_changeset_id();
-    save_bonsai_changesets(vec![bcs.clone()], ctx.clone(), repo.clone()).and_then({
-        cloned!(ctx, repo);
-        move |_| {
-            if mark_public {
-                repo.get_phases()
-                    .add_reachable_as_public(ctx.clone(), vec![bcs_id.clone()])
-                    .map(move |_| {
-                        info!(ctx.logger(), "Marked as public {:?}", bcs_id);
-                        bcs_id
-                    })
-                    .left_future()
-            } else {
-                ok(bcs_id).right_future()
-            }
-        }
-    })
+    save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
+        .compat()
+        .await?;
+
+    if mark_public {
+        repo.get_phases()
+            .add_reachable_as_public(ctx.clone(), vec![bcs_id])
+            .compat()
+            .await?;
+        info!(ctx.logger(), "Marked as public {:?}", bcs_id);
+    }
+    Ok(bcs_id)
 }
 
-fn generate_hg_changeset(
-    ctx: CoreContext,
-    repo: BlobRepo,
+async fn generate_hg_changeset(
+    ctx: &CoreContext,
+    repo: &BlobRepo,
     bcs_id: ChangesetId,
-) -> impl Future<Item = HgChangesetId, Error = Error> {
+) -> Result<HgChangesetId, Error> {
     info!(ctx.logger(), "Generating an HG equivalent of {:?}", bcs_id);
-    repo.get_hg_from_bonsai_changeset(ctx.clone(), bcs_id.clone())
-        .map({
-            cloned!(ctx);
-            move |hg_cs_id| {
-                info!(
-                    ctx.logger(),
-                    "Hg equivalent of {:?} is: {:?}", bcs_id, hg_cs_id
-                );
-                hg_cs_id
-            }
-        })
+    let hg_cs_id = repo
+        .get_hg_from_bonsai_changeset(ctx.clone(), bcs_id)
+        .compat()
+        .await?;
+
+    info!(
+        ctx.logger(),
+        "Hg equivalent of {:?} is: {:?}", bcs_id, hg_cs_id
+    );
+    Ok(hg_cs_id)
 }
 
-fn create_bookmark(
-    ctx: CoreContext,
-    repo: BlobRepo,
+async fn create_bookmark(
+    ctx: &CoreContext,
+    repo: &BlobRepo,
     bookmark: BookmarkName,
     bcs_id: ChangesetId,
-) -> impl Future<Item = (), Error = Error> {
+) -> Result<(), Error> {
     info!(
         ctx.logger(),
         "Setting bookmark {:?} to point to {:?}", bookmark, bcs_id
     );
-    let mut transaction = repo.clone().update_bookmark_transaction(ctx.clone());
-    if let Err(e) =
-        transaction.force_set(&bookmark, bcs_id.clone(), BookmarkUpdateReason::ManualMove)
-    {
-        return err(e).left_future();
+    let mut transaction = repo.update_bookmark_transaction(ctx.clone());
+    transaction.force_set(&bookmark, bcs_id, BookmarkUpdateReason::ManualMove)?;
+
+    let commit_result = transaction.commit().compat().await?;
+
+    if !commit_result {
+        Err(format_err!("Logical failure while setting {:?}", bookmark))
+    } else {
+        info!(ctx.logger(), "Setting bookmark {:?} finished", bookmark);
+        Ok(())
     }
-    transaction
-        .commit()
-        .and_then({
-            cloned!(ctx);
-            move |commit_result| {
-                if !commit_result {
-                    err(format_err!("Logical failure while setting {:?}", bookmark))
-                } else {
-                    info!(ctx.logger(), "Setting bookmark {:?} finished", bookmark);
-                    ok(bcs_id)
-                }
-            }
-        })
-        .map(|_| ())
-        .right_future()
 }
 
 fn create_bonsai_changeset_only(
@@ -145,7 +116,7 @@
     author: String,
     message: String,
     datetime: DateTime,
-) -> impl Future<Item = BonsaiChangeset, Error = Error> {
+) -> Result<BonsaiChangeset, Error> {
     BonsaiChangesetMut {
         parents,
         author: author.clone(),
@@ -157,5 +128,4 @@
         file_changes,
     }
     .freeze()
-    .into_future()
 }
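The lib.rs changes below apply the same recipe to streams:
`Stream01CompatExt::compat()` turns a 0.1 `Stream<Item = T, Error = E>` into a
0.3 stream of `Result<T, E>`, after which the `try_*` combinators
(`try_filter_map`, `try_fold`, `try_buffer_unordered`, `try_flatten`) take over
from their 0.1 counterparts. A self-contained sketch of that shape, with
invented names (`numbers_old`, `sum_even`):

// Sketch only; same dependency aliases as above.
use anyhow::Error;
use futures::compat::Stream01CompatExt;
use futures::TryStreamExt;
use futures_old::stream::{iter_ok, Stream as OldStream};

// Stand-in for a 0.1 stream such as `list_leaf_entries(...)`.
fn numbers_old() -> impl OldStream<Item = u32, Error = Error> {
    iter_ok(vec![1, 2, 3, 4])
}

async fn sum_even() -> Result<u32, Error> {
    numbers_old()
        // 0.1 Stream<Item = u32, Error = Error> becomes
        // 0.3 Stream<Item = Result<u32, Error>>.
        .compat()
        .try_filter_map(|n| async move { Ok(if n % 2 == 0 { Some(n) } else { None }) })
        .try_fold(0u32, |acc, n| async move { Ok(acc + n) })
        .await
}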
diff --git a/eden/mononoke/megarepolib/src/lib.rs b/eden/mononoke/megarepolib/src/lib.rs
index fb05d0e67b..25c6b1a678 100644
--- a/eden/mononoke/megarepolib/src/lib.rs
+++ b/eden/mononoke/megarepolib/src/lib.rs
@@ -8,11 +8,10 @@
 use anyhow::Error;
 use blobrepo::BlobRepo;
 use blobstore::Loadable;
-use cloned::cloned;
 use context::CoreContext;
-use futures_old::{
-    future::{ok, Future, FutureResult},
-    stream::{iter_ok, Stream},
+use futures::{
+    compat::{Future01CompatExt, Stream01CompatExt},
+    future, stream, Stream, StreamExt, TryStreamExt,
 };
 use manifest::ManifestOps;
 use mercurial_types::{
@@ -23,7 +22,6 @@ use mononoke_types::{ChangesetId, ContentId, FileChange, FileType};
 use movers::Mover;
 use slog::info;
 use std::collections::BTreeMap;
-use std::iter::Iterator;
 
 pub mod common;
 use crate::common::{create_and_save_changeset, ChangesetArgs};
@@ -55,116 +53,105 @@ fn get_file_changes(
     res
 }
 
-fn get_move_file_changes(
-    ctx: CoreContext,
-    repo: BlobRepo,
+fn get_move_file_changes<'a>(
+    ctx: &'a CoreContext,
+    repo: &'a BlobRepo,
     hg_cs: HgBlobChangeset,
     parent_cs_id: ChangesetId,
-    path_converter: Mover,
-) -> impl Stream<Item = (MPath, Option<FileChange>), Error = Error> {
+    path_converter: &'a Mover,
+) -> impl Stream<Item = Result<(MPath, Option<FileChange>), Error>> + 'a {
     hg_cs
         .manifestid()
         .list_leaf_entries(ctx.clone(), repo.get_blobstore())
-        .filter_map(move |(old_path, (file_type, filenode_id))| {
+        .compat()
+        .try_filter_map(move |(old_path, (file_type, filenode_id))| async move {
             let maybe_new_path = path_converter(&old_path).unwrap();
-            if Some(old_path.clone()) == maybe_new_path {
+            if Some(&old_path) == maybe_new_path.as_ref() {
                 // path does not need to be changed, drop from the stream
-                None
+                Ok(None)
             } else {
                 // path needs to be changed (or deleted), keep in the stream
-                Some((old_path.clone(), maybe_new_path, file_type, filenode_id))
+                Ok(Some((old_path, maybe_new_path, file_type, filenode_id)))
             }
         })
-        .map({
-            cloned!(ctx, repo, parent_cs_id);
+        .map_ok({
             move |(old_path, maybe_new_path, file_type, filenode_id)| {
-                filenode_id
-                    .load(ctx.clone(), repo.blobstore())
-                    .from_err()
-                    .map(move |file_envelope| {
-                        // Note: it is always safe to unwrap here, since
-                        // `HgFileEnvelope::get_size()` always returns `Some()`
-                        // The return type is `Option` to acommodate `HgManifestEnvelope`
-                        // which returns `None`.
-                        let file_size = file_envelope.get_size().unwrap();
-                        iter_ok(get_file_changes(
-                            old_path,
-                            maybe_new_path,
-                            file_type,
-                            file_size,
-                            file_envelope.content_id(),
-                            parent_cs_id,
-                        ))
-                    })
+                async move {
+                    let file_envelope = filenode_id
+                        .load(ctx.clone(), repo.blobstore())
+                        .compat()
+                        .await?;
+
+                    // Note: it is always safe to unwrap here, since
+                    // `HgFileEnvelope::get_size()` always returns `Some()`
+                    // The return type is `Option` to accommodate `HgManifestEnvelope`
+                    // which returns `None`.
+                    let file_size = file_envelope.get_size().unwrap();
+                    let stream = stream::iter(get_file_changes(
+                        old_path,
+                        maybe_new_path,
+                        file_type,
+                        file_size,
+                        file_envelope.content_id(),
+                        parent_cs_id,
+                    ));
+                    Ok(stream.map(Ok))
+                }
             }
         })
-        .buffer_unordered(BUFFER_SIZE)
-        .flatten()
+        .try_buffer_unordered(BUFFER_SIZE)
+        .try_flatten()
 }
 
-pub fn perform_move(
-    ctx: CoreContext,
-    repo: BlobRepo,
+pub async fn perform_move<'a>(
+    ctx: &'a CoreContext,
+    repo: &'a BlobRepo,
     parent_bcs_id: ChangesetId,
     path_converter: Mover,
     resulting_changeset_args: ChangesetArgs,
-) -> impl Future<Item = HgChangesetId, Error = Error> {
-    repo.clone()
+) -> Result<HgChangesetId, Error> {
+    let parent_hg_cs_id = repo
         .get_hg_from_bonsai_changeset(ctx.clone(), parent_bcs_id)
-        .and_then({
-            cloned!(repo, parent_bcs_id);
-            move |parent_hg_cs_id| {
-                parent_hg_cs_id
-                    .load(ctx.clone(), repo.blobstore())
-                    .from_err()
-                    .and_then({
-                        cloned!(ctx, parent_bcs_id, repo);
-                        move |parent_hg_cs| {
-                            get_move_file_changes(
-                                ctx.clone(),
-                                repo.clone(),
-                                parent_hg_cs,
-                                parent_bcs_id.clone(),
-                                path_converter,
-                            )
-                            .fold(vec![], {
-                                cloned!(ctx);
-                                move |mut collected, item| {
-                                    collected.push(item);
-                                    if collected.len() % REPORTING_INTERVAL_FILES == 0 {
-                                        info!(ctx.logger(), "Processed {} files", collected.len());
-                                    }
-                                    let res: FutureResult<Vec<_>, Error> = ok(collected);
-                                    res
-                                }
-                            })
-                            .and_then({
-                                cloned!(ctx, repo, parent_bcs_id);
-                                move |file_changes: Vec<(MPath, Option<FileChange>)>| {
-                                    let file_changes: BTreeMap<_, _> =
-                                        file_changes.into_iter().collect();
-                                    create_and_save_changeset(
-                                        ctx,
-                                        repo,
-                                        vec![parent_bcs_id],
-                                        file_changes,
-                                        resulting_changeset_args,
-                                    )
-                                }
-                            })
-                        }
-                    })
-            }
-        })
+        .compat()
+        .await?;
+
+    let parent_hg_cs = parent_hg_cs_id
+        .load(ctx.clone(), repo.blobstore())
+        .compat()
+        .await?;
+
+    let file_changes =
+        get_move_file_changes(&ctx, &repo, parent_hg_cs, parent_bcs_id, &path_converter)
+            .try_fold(BTreeMap::new(), {
+                move |mut collected, (path, file_change)| {
+                    collected.insert(path, file_change);
+                    if collected.len() % REPORTING_INTERVAL_FILES == 0 {
+                        info!(ctx.logger(), "Processed {} files", collected.len());
+                    }
+                    future::ready(Ok(collected))
+                }
+            })
+            .await?;
+
+    create_and_save_changeset(
+        &ctx,
+        &repo,
+        vec![parent_bcs_id],
+        file_changes,
+        resulting_changeset_args,
+    )
+    .await
 }
 
 #[cfg(test)]
 mod test {
     use super::*;
     use anyhow::Result;
+    use cloned::cloned;
     use fbinit::FacebookInit;
     use fixtures::many_files_dirs;
     use futures::compat::Future01CompatExt;
+    use futures_old::{stream::Stream, Future};
     use maplit::btreemap;
     use mercurial_types::HgChangesetId;
     use mononoke_types::{BonsaiChangeset, BonsaiChangesetMut, DateTime};
@@ -255,13 +242,12 @@
     async_unit::tokio_unit_test(async move {
         let (ctx, repo, _hg_cs_id, bcs_id, changeset_args) = prepare(fb).await;
         let newcs = perform_move(
-            ctx.clone(),
-            repo.clone(),
+            &ctx,
+            &repo,
             bcs_id,
             Arc::new(identity_mover),
             changeset_args,
         )
-        .compat()
         .await
         .unwrap();
         let newcs = get_bonsai_by_hg_cs_id(ctx.clone(), repo.clone(), newcs).await;
@@ -285,16 +271,9 @@ fn test_drop_file(fb: FacebookInit) {
     async_unit::tokio_unit_test(async move {
         let (ctx, repo, _hg_cs_id, bcs_id, changeset_args) = prepare(fb).await;
-        let newcs = perform_move(
-            ctx.clone(),
-            repo.clone(),
-            bcs_id,
-            Arc::new(skip_one),
-            changeset_args,
-        )
-        .compat()
-        .await
-        .unwrap();
+        let newcs = perform_move(&ctx, &repo, bcs_id, Arc::new(skip_one), changeset_args)
+            .await
+            .unwrap();
         let newcs = get_bonsai_by_hg_cs_id(ctx.clone(), repo.clone(), newcs).await;
 
         let BonsaiChangesetMut {
@@ -321,16 +300,9 @@ fn test_shift_path_by_one(fb: FacebookInit) {
     async_unit::tokio_unit_test(async move {
         let (ctx, repo, _hg_cs_id, bcs_id, changeset_args) = prepare(fb).await;
-        let newcs = perform_move(
-            ctx.clone(),
-            repo.clone(),
-            bcs_id,
-            Arc::new(shift_one),
-            changeset_args,
-        )
-        .compat()
-        .await
-        .unwrap();
+        let newcs = perform_move(&ctx, &repo, bcs_id, Arc::new(shift_one), changeset_args)
+            .await
+            .unwrap();
         let newcs = get_bonsai_by_hg_cs_id(ctx.clone(), repo.clone(), newcs).await;
 
         let BonsaiChangesetMut {
@@ -390,13 +362,12 @@
     async_unit::tokio_unit_test(async move {
         let (ctx, repo, old_hg_cs_id, old_bcs_id, changeset_args) = prepare(fb).await;
         let new_hg_cs_id = perform_move(
-            ctx.clone(),
-            repo.clone(),
+            &ctx,
+            &repo,
             old_bcs_id,
             Arc::new(shift_one_skip_another),
             changeset_args,
         )
-        .compat()
         .await
         .unwrap();
         let mut old_wc =
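The subtlest conversion above is in `get_move_file_changes`, where every stream
item fans out into a nested stream: `map_ok` wraps each entry in a future,
`try_buffer_unordered` resolves up to `BUFFER_SIZE` of those futures
concurrently, and `try_flatten` splices the resulting sub-streams back into a
single stream. A minimal sketch of that shape, with invented names (`expand`;
the `* 10` stands in for an async load):

// Sketch only; not part of the patch.
use anyhow::Error;
use futures::{stream, Stream, StreamExt, TryStreamExt};

fn expand(items: Vec<u32>) -> impl Stream<Item = Result<u32, Error>> {
    stream::iter(items)
        // Plain stream -> stream of Result, so the try_* combinators apply.
        .map(Ok)
        .map_ok(|n| async move {
            // Stand-in for an async load (cf. `filenode_id.load(...)` above).
            let loaded = n * 10;
            // Each input item expands into a sub-stream of results.
            Ok(stream::iter(vec![loaded, loaded + 1]).map(Ok))
        })
        // Run up to two of the per-item futures at once (cf. BUFFER_SIZE).
        .try_buffer_unordered(2)
        // Merge the per-item sub-streams into one output stream.
        .try_flatten()
}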