mononoke: migrate megarepolib to new futures

Summary:
I'm going to change this code in the next diffs, so let's migrate it to new
futures first.

Reviewed By: krallin

Differential Revision: D22065216

fbshipit-source-id: b06fcac518d684f40b1d19fcef9118ca51236873
Author: Stanislau Hlebik, 2020-06-16 03:31:23 -07:00 (committed by Facebook GitHub Bot)
parent 262ce6f362
commit e7e21f9c30
4 changed files with 144 additions and 204 deletions
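For context, the core pattern in this migration is the standard futures 0.1 to 0.3 bridge: functions returning old-style Future<Item = T, Error = E> values become async fns returning Result<T, E>, and any remaining 0.1 futures are converted at the boundary with .compat() from futures::compat::Future01CompatExt. A minimal self-contained sketch of the pattern (the fetch_old/fetch_new helpers are hypothetical, not Mononoke code):

    use anyhow::Error;
    use futures::compat::Future01CompatExt; // bridges futures 0.1 -> 0.3
    use futures_old::future::{ok, Future};  // futures 0.1, renamed in Cargo.toml

    // Old style: a futures-0.1 combinator-based function.
    fn fetch_old(x: u32) -> impl Future<Item = u32, Error = Error> {
        ok(x + 1)
    }

    // New style: an async fn that awaits the old future through the
    // compat layer until fetch_old itself is migrated.
    async fn fetch_new(x: u32) -> Result<u32, Error> {
        fetch_old(x).compat().await
    }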


@@ -199,13 +199,12 @@ pub async fn init_small_large_repo(
mark_public: false,
};
let move_hg_cs = perform_move(
ctx.clone(),
megarepo.clone(),
&ctx,
&megarepo,
second_bcs_id,
Arc::new(prefix_mover),
move_cs_args,
)
.compat()
.await?;
let maybe_move_bcs_id = megarepo

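Call sites change in two ways, as above: perform_move now borrows the context and repo instead of cloning them, and, since it is an async fn returning Result, the .compat() bridge at the call site disappears. Schematically (argument lists abbreviated with ...):

    // Before: old-futures fn, bridged at the call site.
    // let move_hg_cs = perform_move(ctx.clone(), megarepo.clone(), ...).compat().await?;
    // After: async fn taking borrows, awaited directly.
    // let move_hg_cs = perform_move(&ctx, &megarepo, ...).await?;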

@@ -15,9 +15,9 @@ manifest = { path = "../manifest" }
mercurial_types = { path = "../mercurial/types" }
mononoke_types = { path = "../mononoke_types" }
movers = { path = "../commit_rewriting/movers" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"
futures = { version = "0.3", features = ["async-await", "compat"] }
futures-old = { package = "futures", version = "0.1" }
slog = { version = "2.5", features = ["max_level_debug"] }
@@ -25,7 +25,7 @@ slog = { version = "2.5", features = ["max_level_debug"] }
blobrepo_factory = { path = "../blobrepo/factory" }
fixtures = { path = "../tests/fixtures" }
async_unit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures = { version = "0.3", features = ["async-await", "compat"] }
maplit = "1.0"
tokio-compat = "0.1"

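The Cargo.toml hunk above shows the dependency-renaming trick that makes an incremental migration possible: futures-old = { package = "futures", version = "0.1" } pulls in the 0.1 crate under the import name futures_old, so both futures versions coexist in the crate while files are converted one by one. A minimal sketch of the two crates used side by side (demo is a hypothetical function):

    use anyhow::Error;
    use futures::compat::Future01CompatExt; // futures 0.3, the "new futures"
    use futures_old::future::ok;            // futures 0.1, under its renamed import

    async fn demo() -> Result<u32, Error> {
        // A 0.1 future built via the renamed crate, awaited through compat.
        ok::<_, Error>(42).compat().await
    }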

@@ -8,13 +8,10 @@
use anyhow::{format_err, Error};
use blobrepo::{save_bonsai_changesets, BlobRepo};
use bookmarks::{BookmarkName, BookmarkUpdateReason};
use cloned::cloned;
use context::CoreContext;
use futures_old::future::{err, ok, Future};
use futures_old::IntoFuture;
use futures::compat::Future01CompatExt;
use slog::info;
use futures_ext::FutureExt;
use mercurial_types::{HgChangesetId, MPath};
use mononoke_types::{BonsaiChangeset, BonsaiChangesetMut, ChangesetId, DateTime, FileChange};
use std::collections::BTreeMap;
@@ -28,13 +25,13 @@ pub struct ChangesetArgs {
pub mark_public: bool,
}
pub fn create_and_save_changeset(
ctx: CoreContext,
repo: BlobRepo,
pub async fn create_and_save_changeset(
ctx: &CoreContext,
repo: &BlobRepo,
parents: Vec<ChangesetId>,
file_changes: BTreeMap<MPath, Option<FileChange>>,
changeset_args: ChangesetArgs,
) -> impl Future<Item = HgChangesetId, Error = Error> {
) -> Result<HgChangesetId, Error> {
let ChangesetArgs {
author,
message,
@@ -42,101 +39,75 @@ pub fn create_and_save_changeset(
bookmark: maybe_bookmark,
mark_public,
} = changeset_args;
create_bonsai_changeset_only(parents, file_changes, author, message, datetime)
.and_then({
cloned!(ctx, repo);
move |bcs| save_and_maybe_mark_public(ctx, repo, bcs, mark_public)
})
.and_then({
cloned!(ctx, repo);
move |bcs_id| match maybe_bookmark {
Some(bookmark) => create_bookmark(ctx, repo, bookmark, bcs_id.clone())
.map(move |_| bcs_id)
.left_future(),
None => ok(bcs_id).right_future(),
}
})
.and_then({
cloned!(ctx, repo);
move |bcs_id| generate_hg_changeset(ctx, repo, bcs_id)
})
let bcs = create_bonsai_changeset_only(parents, file_changes, author, message, datetime)?;
let bcs_id = save_and_maybe_mark_public(&ctx, &repo, bcs, mark_public).await?;
if let Some(bookmark) = maybe_bookmark {
create_bookmark(ctx, repo, bookmark, bcs_id).await?;
}
generate_hg_changeset(ctx, repo, bcs_id).await
}
fn save_and_maybe_mark_public(
ctx: CoreContext,
repo: BlobRepo,
async fn save_and_maybe_mark_public(
ctx: &CoreContext,
repo: &BlobRepo,
bcs: BonsaiChangeset,
mark_public: bool,
) -> impl Future<Item = ChangesetId, Error = Error> {
) -> Result<ChangesetId, Error> {
let bcs_id = bcs.get_changeset_id();
save_bonsai_changesets(vec![bcs.clone()], ctx.clone(), repo.clone()).and_then({
cloned!(ctx, repo);
move |_| {
if mark_public {
repo.get_phases()
.add_reachable_as_public(ctx.clone(), vec![bcs_id.clone()])
.map(move |_| {
info!(ctx.logger(), "Marked as public {:?}", bcs_id);
bcs_id
})
.left_future()
} else {
ok(bcs_id).right_future()
}
}
})
save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
.compat()
.await?;
if mark_public {
repo.get_phases()
.add_reachable_as_public(ctx.clone(), vec![bcs_id])
.compat()
.await?;
info!(ctx.logger(), "Marked as public {:?}", bcs_id);
}
Ok(bcs_id)
}
fn generate_hg_changeset(
ctx: CoreContext,
repo: BlobRepo,
async fn generate_hg_changeset(
ctx: &CoreContext,
repo: &BlobRepo,
bcs_id: ChangesetId,
) -> impl Future<Item = HgChangesetId, Error = Error> {
) -> Result<HgChangesetId, Error> {
info!(ctx.logger(), "Generating an HG equivalent of {:?}", bcs_id);
repo.get_hg_from_bonsai_changeset(ctx.clone(), bcs_id.clone())
.map({
cloned!(ctx);
move |hg_cs_id| {
info!(
ctx.logger(),
"Hg equivalent of {:?} is: {:?}", bcs_id, hg_cs_id
);
hg_cs_id
}
})
let hg_cs_id = repo
.get_hg_from_bonsai_changeset(ctx.clone(), bcs_id)
.compat()
.await?;
info!(
ctx.logger(),
"Hg equivalent of {:?} is: {:?}", bcs_id, hg_cs_id
);
Ok(hg_cs_id)
}
fn create_bookmark(
ctx: CoreContext,
repo: BlobRepo,
async fn create_bookmark(
ctx: &CoreContext,
repo: &BlobRepo,
bookmark: BookmarkName,
bcs_id: ChangesetId,
) -> impl Future<Item = (), Error = Error> {
) -> Result<(), Error> {
info!(
ctx.logger(),
"Setting bookmark {:?} to point to {:?}", bookmark, bcs_id
);
let mut transaction = repo.clone().update_bookmark_transaction(ctx.clone());
if let Err(e) =
transaction.force_set(&bookmark, bcs_id.clone(), BookmarkUpdateReason::ManualMove)
{
return err(e).left_future();
let mut transaction = repo.update_bookmark_transaction(ctx.clone());
transaction.force_set(&bookmark, bcs_id, BookmarkUpdateReason::ManualMove)?;
let commit_result = transaction.commit().compat().await?;
if !commit_result {
Err(format_err!("Logical failure while setting {:?}", bookmark))
} else {
info!(ctx.logger(), "Setting bookmark {:?} finished", bookmark);
Ok(())
}
transaction
.commit()
.and_then({
cloned!(ctx);
move |commit_result| {
if !commit_result {
err(format_err!("Logical failure while setting {:?}", bookmark))
} else {
info!(ctx.logger(), "Setting bookmark {:?} finished", bookmark);
ok(bcs_id)
}
}
})
.map(|_| ())
.right_future()
}
fn create_bonsai_changeset_only(
@@ -145,7 +116,7 @@ fn create_bonsai_changeset_only(
author: String,
message: String,
datetime: DateTime,
) -> impl Future<Item = BonsaiChangeset, Error = Error> {
) -> Result<BonsaiChangeset, Error> {
BonsaiChangesetMut {
parents,
author: author.clone(),
@@ -157,5 +128,4 @@ fn create_bonsai_changeset_only(
file_changes,
}
.freeze()
.into_future()
}

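Every change in this file follows the same shape: and_then chains with cloned! captures become straight-line awaits with borrowed arguments, conditional branches drop the left_future()/right_future() juggling in favor of plain if/else, and create_bonsai_changeset_only, which only wraps the synchronous freeze() call, loses into_future() and returns Result directly. A condensed sketch of that shape, with hypothetical step_a/step_b helpers standing in for the save/bookmark/generate steps:

    use anyhow::Error;

    async fn step_a(x: u32) -> Result<u32, Error> { Ok(x + 1) }
    async fn step_b(x: u32) -> Result<u32, Error> { Ok(x * 2) }

    // Before: step_a(x).and_then(move |y| step_b(y)) with cloned! captures
    // and left_future()/right_future() for the conditional branch.
    // After: ordinary sequential control flow.
    async fn pipeline(x: u32, both: bool) -> Result<u32, Error> {
        let y = step_a(x).await?;
        if both {
            return step_b(y).await;
        }
        Ok(y)
    }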

@@ -8,11 +8,10 @@
use anyhow::Error;
use blobrepo::BlobRepo;
use blobstore::Loadable;
use cloned::cloned;
use context::CoreContext;
use futures_old::{
future::{ok, Future, FutureResult},
stream::{iter_ok, Stream},
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
future, stream, Stream, StreamExt, TryStreamExt,
};
use manifest::ManifestOps;
use mercurial_types::{
@@ -23,7 +22,6 @@ use mononoke_types::{ChangesetId, ContentId, FileChange, FileType};
use movers::Mover;
use slog::info;
use std::collections::BTreeMap;
use std::iter::Iterator;
pub mod common;
use crate::common::{create_and_save_changeset, ChangesetArgs};
@@ -55,116 +53,105 @@ fn get_file_changes(
res
}
fn get_move_file_changes(
ctx: CoreContext,
repo: BlobRepo,
fn get_move_file_changes<'a>(
ctx: &'a CoreContext,
repo: &'a BlobRepo,
hg_cs: HgBlobChangeset,
parent_cs_id: ChangesetId,
path_converter: Mover,
) -> impl Stream<Item = (MPath, Option<FileChange>), Error = Error> {
path_converter: &'a Mover,
) -> impl Stream<Item = Result<(MPath, Option<FileChange>), Error>> + 'a {
hg_cs
.manifestid()
.list_leaf_entries(ctx.clone(), repo.get_blobstore())
.filter_map(move |(old_path, (file_type, filenode_id))| {
.compat()
.try_filter_map(move |(old_path, (file_type, filenode_id))| async move {
let maybe_new_path = path_converter(&old_path).unwrap();
if Some(old_path.clone()) == maybe_new_path {
if Some(&old_path) == maybe_new_path.as_ref() {
// path does not need to be changed, drop from the stream
None
Ok(None)
} else {
// path needs to be changed (or deleted), keep in the stream
Some((old_path.clone(), maybe_new_path, file_type, filenode_id))
Ok(Some((old_path, maybe_new_path, file_type, filenode_id)))
}
})
.map({
cloned!(ctx, repo, parent_cs_id);
.map_ok({
move |(old_path, maybe_new_path, file_type, filenode_id)| {
filenode_id
.load(ctx.clone(), repo.blobstore())
.from_err()
.map(move |file_envelope| {
// Note: it is always safe to unwrap here, since
// `HgFileEnvelope::get_size()` always returns `Some()`
// The return type is `Option` to acommodate `HgManifestEnvelope`
// which returns `None`.
let file_size = file_envelope.get_size().unwrap();
iter_ok(get_file_changes(
old_path,
maybe_new_path,
file_type,
file_size,
file_envelope.content_id(),
parent_cs_id,
))
})
async move {
let file_envelope = filenode_id
.load(ctx.clone(), repo.blobstore())
.compat()
.await?;
// Note: it is always safe to unwrap here, since
// `HgFileEnvelope::get_size()` always returns `Some()`
// The return type is `Option` to accommodate `HgManifestEnvelope`
// which returns `None`.
let file_size = file_envelope.get_size().unwrap();
let stream = stream::iter(get_file_changes(
old_path,
maybe_new_path,
file_type,
file_size,
file_envelope.content_id(),
parent_cs_id,
));
Ok(stream.map(Ok))
}
}
})
.buffer_unordered(BUFFER_SIZE)
.flatten()
.try_buffer_unordered(BUFFER_SIZE)
.try_flatten()
}
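The stream pipeline above is the 0.1-to-0.3 stream analogue of the future changes: compat() (via Stream01CompatExt) turns the 0.1 list_leaf_entries stream into a 0.3 TryStream, filter_map becomes try_filter_map with an async closure, and the map/buffer_unordered/flatten tail becomes map_ok/try_buffer_unordered/try_flatten. A self-contained sketch of that pipeline shape, using plain integers instead of manifest entries:

    use anyhow::Error;
    use futures::stream::{self, StreamExt, TryStreamExt};

    fn doubled_evens() -> impl futures::Stream<Item = Result<u32, Error>> {
        stream::iter(vec![Ok::<u32, Error>(1), Ok(2), Ok(3), Ok(4)])
            .try_filter_map(|x| async move {
                // Drop odd items from the stream, keep the evens.
                Ok::<_, Error>(if x % 2 == 0 { Some(x) } else { None })
            })
            .map_ok(|x| async move {
                // Each kept item fans out into a small inner stream of results.
                Ok::<_, Error>(stream::iter(vec![x, x]).map(Ok::<u32, Error>))
            })
            .try_buffer_unordered(2)
            .try_flatten()
    }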
pub fn perform_move(
ctx: CoreContext,
repo: BlobRepo,
pub async fn perform_move<'a>(
ctx: &'a CoreContext,
repo: &'a BlobRepo,
parent_bcs_id: ChangesetId,
path_converter: Mover,
resulting_changeset_args: ChangesetArgs,
) -> impl Future<Item = HgChangesetId, Error = Error> {
repo.clone()
) -> Result<HgChangesetId, Error> {
let parent_hg_cs_id = repo
.get_hg_from_bonsai_changeset(ctx.clone(), parent_bcs_id)
.and_then({
cloned!(repo, parent_bcs_id);
move |parent_hg_cs_id| {
parent_hg_cs_id
.load(ctx.clone(), repo.blobstore())
.from_err()
.and_then({
cloned!(ctx, parent_bcs_id, repo);
move |parent_hg_cs| {
get_move_file_changes(
ctx.clone(),
repo.clone(),
parent_hg_cs,
parent_bcs_id.clone(),
path_converter,
)
.fold(vec![], {
cloned!(ctx);
move |mut collected, item| {
collected.push(item);
if collected.len() % REPORTING_INTERVAL_FILES == 0 {
info!(ctx.logger(), "Processed {} files", collected.len());
}
let res: FutureResult<Vec<_>, Error> = ok(collected);
res
}
})
.and_then({
cloned!(ctx, repo, parent_bcs_id);
move |file_changes: Vec<(MPath, Option<FileChange>)>| {
let file_changes: BTreeMap<_, _> =
file_changes.into_iter().collect();
create_and_save_changeset(
ctx,
repo,
vec![parent_bcs_id],
file_changes,
resulting_changeset_args,
)
}
})
}
})
}
})
.compat()
.await?;
let parent_hg_cs = parent_hg_cs_id
.load(ctx.clone(), repo.blobstore())
.compat()
.await?;
let file_changes =
get_move_file_changes(&ctx, &repo, parent_hg_cs, parent_bcs_id, &path_converter)
.try_fold(BTreeMap::new(), {
move |mut collected, (path, file_change)| {
collected.insert(path, file_change);
if collected.len() % REPORTING_INTERVAL_FILES == 0 {
info!(ctx.logger(), "Processed {} files", collected.len());
}
future::ready(Ok(collected))
}
})
.await?;
create_and_save_changeset(
&ctx,
&repo,
vec![parent_bcs_id],
file_changes,
resulting_changeset_args,
)
.await
}
#[cfg(test)]
mod test {
use super::*;
use anyhow::Result;
use cloned::cloned;
use fbinit::FacebookInit;
use fixtures::many_files_dirs;
use futures::compat::Future01CompatExt;
use futures_old::{stream::Stream, Future};
use maplit::btreemap;
use mercurial_types::HgChangesetId;
use mononoke_types::{BonsaiChangeset, BonsaiChangesetMut, DateTime};
@@ -255,13 +242,12 @@ mod test {
async_unit::tokio_unit_test(async move {
let (ctx, repo, _hg_cs_id, bcs_id, changeset_args) = prepare(fb).await;
let newcs = perform_move(
ctx.clone(),
repo.clone(),
&ctx,
&repo,
bcs_id,
Arc::new(identity_mover),
changeset_args,
)
.compat()
.await
.unwrap();
let newcs = get_bonsai_by_hg_cs_id(ctx.clone(), repo.clone(), newcs).await;
@@ -285,16 +271,9 @@
fn test_drop_file(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let (ctx, repo, _hg_cs_id, bcs_id, changeset_args) = prepare(fb).await;
let newcs = perform_move(
ctx.clone(),
repo.clone(),
bcs_id,
Arc::new(skip_one),
changeset_args,
)
.compat()
.await
.unwrap();
let newcs = perform_move(&ctx, &repo, bcs_id, Arc::new(skip_one), changeset_args)
.await
.unwrap();
let newcs = get_bonsai_by_hg_cs_id(ctx.clone(), repo.clone(), newcs).await;
let BonsaiChangesetMut {
@@ -321,16 +300,9 @@
fn test_shift_path_by_one(fb: FacebookInit) {
async_unit::tokio_unit_test(async move {
let (ctx, repo, _hg_cs_id, bcs_id, changeset_args) = prepare(fb).await;
let newcs = perform_move(
ctx.clone(),
repo.clone(),
bcs_id,
Arc::new(shift_one),
changeset_args,
)
.compat()
.await
.unwrap();
let newcs = perform_move(&ctx, &repo, bcs_id, Arc::new(shift_one), changeset_args)
.await
.unwrap();
let newcs = get_bonsai_by_hg_cs_id(ctx.clone(), repo.clone(), newcs).await;
let BonsaiChangesetMut {
@@ -390,13 +362,12 @@
async_unit::tokio_unit_test(async move {
let (ctx, repo, old_hg_cs_id, old_bcs_id, changeset_args) = prepare(fb).await;
let new_hg_cs_id = perform_move(
ctx.clone(),
repo.clone(),
&ctx,
&repo,
old_bcs_id,
Arc::new(shift_one_skip_another),
changeset_args,
)
.compat()
.await
.unwrap();
let mut old_wc =