convert save_bonsai_changesets to new type futures

Summary:
- convert save_bonsai_changesets to new type futures
- `blobrepo:blobrepo` is free from old futures deps

Reviewed By: StanislavGlebik

Differential Revision: D25197060

fbshipit-source-id: 910bd3f9674094b56e1133d7799cefea56c84123
Pavel Aslanov authored on 2020-11-30 11:58:29 -08:00, committed by Facebook GitHub Bot
parent 5a091f1b6a
commit 48b6813a06
21 changed files with 86 additions and 146 deletions
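
The change follows the usual futures 0.1 → 0.3 migration pattern: functions that returned `impl Future<Item = (), Error = Error>` become `async fn`s returning `Result<(), Error>`, and callers drop the `.compat()` shim. A minimal sketch of the before/after calling convention, using the same crate alias this diff uses (`futures-old = { package = "futures", version = "0.1" }`); the function bodies and the tokio runtime are illustrative, not Mononoke code:

use anyhow::Error;
use futures::compat::Future01CompatExt; // futures 0.3, "compat" feature
use futures_old::future::{self as old_future, Future as OldFuture}; // futures 0.1

// Before: an eagerly-built futures 0.1 future.
fn save_old() -> impl OldFuture<Item = (), Error = Error> {
    old_future::ok(())
}

// After: an async fn, i.e. a std::future::Future returning Result.
async fn save_new() -> Result<(), Error> {
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    save_old().compat().await?; // 0.1 futures need the compat shim to be awaited
    save_new().await?; // new-style futures are awaited directly
    Ok(())
}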

View File

@@ -10,7 +10,7 @@ use anyhow::{Error, Result};
 use blobrepo::{save_bonsai_changesets, BlobRepo};
 use blobstore::Storable;
 use context::CoreContext;
-use futures::{compat::Future01CompatExt, future, stream, TryStreamExt};
+use futures::{future, stream, TryStreamExt};
 use mononoke_types::{
     BlobstoreValue, BonsaiChangesetMut, ChangesetId, DateTime, FileChange, FileContents, FileType,
     MPath, MPathElement,
@@ -145,9 +145,7 @@ impl GenManifest {
             None => Err(Error::msg("empty changes iterator")),
             Some(csid) => {
                 store_changes.try_for_each(|_| future::ok(())).await?;
-                save_bonsai_changesets(changesets, ctx, repo)
-                    .compat()
-                    .await?;
+                save_bonsai_changesets(changesets, ctx, repo).await?;
                 Ok(csid)
             }
         }

View File

@@ -30,11 +30,9 @@ repo_blobstore = { path = "repo_blobstore" }
 topo_sort = { path = "../common/topo_sort" }
 type_map = { path = "../common/type_map" }
 cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
-futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 stats = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 anyhow = "1.0"
 futures = { version = "0.3.5", features = ["async-await", "compat"] }
-futures-old = { package = "futures", version = "0.1" }
 
 [dev-dependencies]
 benchmark_lib = { path = "../benchmark" }
@@ -53,10 +51,12 @@ mercurial_types-mocks = { path = "../mercurial/types/mocks" }
 scuba_ext = { path = "../common/scuba_ext" }
 tests_utils = { path = "../tests/utils" }
 fbinit = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
+futures_ext = { package = "futures_01_ext", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 ascii = "1.0"
 assert_matches = "1.3"
 async-trait = "0.1.29"
 bytes = { version = "0.5", features = ["serde"] }
+futures-old = { package = "futures", version = "0.1" }
 maplit = "1.0"
 rand = { version = "0.7", features = ["small_rng"] }
 rand_distr = "0.2"

View File

@@ -21,12 +21,10 @@ use cloned::cloned;
 use context::CoreContext;
 use filestore::FilestoreConfig;
 use futures::{
-    compat::Future01CompatExt, future::BoxFuture, FutureExt, Stream, TryFutureExt, TryStreamExt,
-};
-use futures_ext::FutureExt as _;
-use futures_old::{
-    future::{loop_fn, ok, Future as OldFuture, Loop},
-    stream::{self, FuturesUnordered, Stream as OldStream},
+    compat::Future01CompatExt,
+    future::{try_join, BoxFuture},
+    stream::FuturesUnordered,
+    FutureExt, Stream, TryStreamExt,
 };
 use metaconfig_types::DerivedDataConfig;
 use mononoke_types::{
@@ -418,11 +416,11 @@ impl BlobRepo {
 /// This function uploads bonsai changesets objects to blobstore in parallel, and then does
 /// sequential writes to the changesets table. Parents of the changesets should already be saved
 /// in the repository.
-pub fn save_bonsai_changesets(
+pub async fn save_bonsai_changesets(
     bonsai_changesets: Vec<BonsaiChangeset>,
     ctx: CoreContext,
     repo: BlobRepo,
-) -> impl OldFuture<Item = (), Error = Error> {
+) -> Result<(), Error> {
     let complete_changesets = repo.get_changesets_object();
     let blobstore = repo.get_blobstore();
     let repoid = repo.get_repoid();
@@ -436,42 +434,29 @@ pub fn save_bonsai_changesets(
         parents_to_check.remove(&bcs.get_changeset_id());
     }
 
-    let parents_to_check = stream::futures_unordered(parents_to_check.into_iter().map({
-        cloned!(ctx, repo);
-        move |p| {
-            cloned!(ctx, repo);
-            async move {
-                let exists = repo.changeset_exists_by_bonsai(ctx, p).await?;
-                Ok(exists)
-            }
-            .boxed()
-            .compat()
-            .and_then(move |exists| {
-                if exists {
-                    Ok(())
-                } else {
-                    Err(format_err!("Commit {} does not exist in the repo", p))
-                }
-            })
-        }
-    }))
-    .collect();
+    let parents_to_check = parents_to_check
+        .into_iter()
+        .map({
+            |p| {
+                cloned!(ctx, repo);
+                async move {
+                    let exists = repo.changeset_exists_by_bonsai(ctx, p).await?;
+                    if exists {
+                        Ok(())
+                    } else {
+                        Err(format_err!("Commit {} does not exist in the repo", p))
+                    }
+                }
+            }
+        })
+        .collect::<FuturesUnordered<_>>()
+        .try_collect::<Vec<_>>();
 
     let bonsai_changesets: HashMap<_, _> = bonsai_changesets
         .into_iter()
        .map(|bcs| (bcs.get_changeset_id(), bcs))
        .collect();
 
-    // Order of inserting bonsai changesets objects doesn't matter, so we can join them
-    let mut bonsai_object_futs = FuturesUnordered::new();
-    for bcs in bonsai_changesets.values() {
-        bonsai_object_futs.push(save_bonsai_changeset_object(
-            ctx.clone(),
-            blobstore.clone(),
-            bcs.clone(),
-        ));
-    }
-    let bonsai_objects = bonsai_object_futs.collect();
-
     // Order of inserting entries in changeset table matters though, so we first need to
     // topologically sort commits.
     let mut bcs_parents = HashMap::new();
@@ -491,36 +476,41 @@ pub fn save_bonsai_changesets(
                 parents: bcs.parents().into_iter().collect(),
             };
-            bonsai_complete_futs.push(complete_changesets.add(ctx.clone(), completion_record));
+            cloned!(ctx, complete_changesets);
+            bonsai_complete_futs.push(async move {
+                complete_changesets
+                    .add(ctx, completion_record)
+                    .compat()
+                    .await
+            });
         }
     }
 
-    bonsai_objects
-        .join(parents_to_check)
-        .and_then(move |_| {
-            loop_fn(bonsai_complete_futs.into_iter(), move |mut futs| {
-                match futs.next() {
-                    Some(fut) => fut
-                        .and_then(move |_| ok(Loop::Continue(futs)))
-                        .left_future(),
-                    None => ok(Loop::Break(())).right_future(),
-                }
-            })
-        })
-        .and_then(|_| ok(()))
-}
-
-pub fn save_bonsai_changeset_object(
-    ctx: CoreContext,
-    blobstore: RepoBlobstore,
-    bonsai_cs: BonsaiChangeset,
-) -> impl OldFuture<Item = (), Error = Error> {
-    let bonsai_blob = bonsai_cs.into_blob();
-    let bcs_id = bonsai_blob.id().clone();
-    let blobstore_key = bcs_id.blobstore_key();
-
-    async move { blobstore.put(&ctx, blobstore_key, bonsai_blob.into()).await }
-        .boxed()
-        .compat()
-        .map(|_| ())
+    // Order of inserting bonsai changesets objects doesn't matter, so we can join them
+    let bonsai_objects = bonsai_changesets
+        .into_iter()
+        .map({
+            |(_, bcs)| {
+                cloned!(ctx, blobstore);
+                async move {
+                    let bonsai_blob = bcs.into_blob();
+                    let bcs_id = bonsai_blob.id().clone();
+                    let blobstore_key = bcs_id.blobstore_key();
+                    blobstore
+                        .put(&ctx, blobstore_key, bonsai_blob.into())
+                        .await?;
+                    Ok(())
+                }
+            }
+        })
+        .collect::<FuturesUnordered<_>>()
+        .try_collect::<Vec<_>>();
+
+    try_join(bonsai_objects, parents_to_check).await?;
+
+    for bonsai_complete in bonsai_complete_futs {
+        bonsai_complete.await?;
+    }
+
+    Ok(())
 }
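
The rewritten body above keeps the function's two-phase shape in new-style futures: parent-existence checks and blob uploads run concurrently and are joined, then the order-sensitive changesets-table inserts are awaited one at a time. A minimal sketch of that pattern; the item type and the check/upload closures are placeholders, not Mononoke APIs:

use anyhow::{format_err, Error};
use futures::{future::try_join, stream::FuturesUnordered, TryStreamExt};

async fn save_all(items: Vec<u64>) -> Result<(), Error> {
    // Concurrent phase: completion order doesn't matter, so collect the
    // async blocks into a FuturesUnordered and drain it with try_collect.
    let checks = items
        .iter()
        .map(|&i| async move {
            if i % 2 == 0 {
                Ok(())
            } else {
                Err(format_err!("item {} failed its precondition", i))
            }
        })
        .collect::<FuturesUnordered<_>>()
        .try_collect::<Vec<_>>();
    let uploads = items
        .iter()
        .map(|&i| async move { Ok::<_, Error>(i) })
        .collect::<FuturesUnordered<_>>()
        .try_collect::<Vec<_>>();

    // Drive both lazy futures together, failing fast on the first error.
    try_join(checks, uploads).await?;

    // Sequential phase: like the topologically sorted changesets-table
    // inserts, these complete one at a time, in order.
    for i in items {
        let _ = i; // e.g. insert record i into a table here
    }
    Ok(())
}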

View File

@@ -643,23 +643,24 @@ async fn test_get_manifest_from_bonsai(fb: FacebookInit) {
     let get_entries = {
         cloned!(ctx, repo);
-        move |ms_hash: HgManifestId| -> BoxFuture<HashMap<String, Entry<HgManifestId, (FileType, HgFileNodeId)>>, Error> {
-            cloned!(ctx, repo);
-            async move {
-                ms_hash.load(&ctx, repo.blobstore()).await
-            }
-            .boxed()
-            .compat()
-            .from_err()
-            .map(|ms| {
-                Manifest::list(&ms)
-                    .map(|(name, entry)| {
-                        (String::from_utf8(Vec::from(name.as_ref())).unwrap(), entry)
-                    })
-                    .collect::<HashMap<_, _>>()
-            })
-            .boxify()
-        }
+        move |ms_hash: HgManifestId| -> BoxFuture<
+            HashMap<String, Entry<HgManifestId, (FileType, HgFileNodeId)>>,
+            Error,
+        > {
+            cloned!(ctx, repo);
+            async move { ms_hash.load(&ctx, repo.blobstore()).await }
+                .boxed()
+                .compat()
+                .from_err()
+                .map(|ms| {
+                    Manifest::list(&ms)
+                        .map(|(name, entry)| {
+                            (String::from_utf8(Vec::from(name.as_ref())).unwrap(), entry)
+                        })
+                        .collect::<HashMap<_, _>>()
+                })
+                .boxify()
+        }
     };
 
     // #CONTENT
@@ -1040,7 +1041,6 @@ async fn test_hg_commit_generation_simple(fb: FacebookInit) {
     let bcs_id = bcs.get_changeset_id();
     let ctx = CoreContext::test_mock(fb);
     blobrepo::save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
     let hg_cs_id = repo
@@ -1084,7 +1084,6 @@ async fn test_hg_commit_generation_stack(fb: FacebookInit) {
     let top_of_stack = changesets.last().unwrap().clone().get_changeset_id();
     let ctx = CoreContext::test_mock(fb);
     blobrepo::save_bonsai_changesets(changesets, ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -1112,7 +1111,6 @@ async fn test_hg_commit_generation_one_after_another(fb: FacebookInit) {
     let second_bcs = create_bonsai_changeset(vec![first_bcs_id]);
     let second_bcs_id = second_bcs.get_changeset_id();
     blobrepo::save_bonsai_changesets(vec![first_bcs, second_bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -1215,7 +1213,6 @@ async fn test_hg_commit_generation_uneven_branch(fb: FacebookInit) {
         ctx.clone(),
         repo.clone(),
     )
-    .compat()
     .await
     .unwrap();
 

View File

@@ -67,7 +67,6 @@ pub async fn subcommand_create_bonsai<'a>(
         Err(e) => return Err(SubcommandError::Error(anyhow!(e))),
     };
-
     let blobrepo = args::open_repo(fb, &logger, &matches).await?;
 
     for (_, change) in bcs.file_changes() {
         if let Some(change) = change {
@@ -84,7 +83,6 @@ pub async fn subcommand_create_bonsai<'a>(
     }
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), blobrepo.clone())
-        .compat()
         .map_err(|e| SubcommandError::Error(anyhow!(e)))
         .await?;
     let hg_cs = blobrepo

View File

@@ -374,9 +374,7 @@ async fn create_empty_commit_for_mapping_change(
         .freeze()?;
 
     let large_cs_id = bcs.get_changeset_id();
-    save_bonsai_changesets(vec![bcs], ctx.clone(), large_repo.0.clone())
-        .compat()
-        .await?;
+    save_bonsai_changesets(vec![bcs], ctx.clone(), large_repo.0.clone()).await?;
 
     Ok(Large(large_cs_id))
 }

View File

@@ -261,7 +261,6 @@ impl Limits {
             args::get_and_parse_opt::<NonZeroU64>(sub_m, ARG_TOTAL_SIZE_LIMIT);
         let maybe_lfs_threshold = args::get_and_parse_opt::<NonZeroU64>(sub_m, ARG_LFS_THRESHOLD);
-
         Self {
             file_num_limit: maybe_file_num_limit,
             total_size_limit: maybe_total_size_limit,
@@ -422,9 +421,7 @@ async fn create_changesets(
         parent = cs_id;
     }
 
-    save_bonsai_changesets(changesets, ctx.clone(), repo.clone())
-        .compat()
-        .await?;
+    save_bonsai_changesets(changesets, ctx.clone(), repo.clone()).await?;
 
     Ok(cs_ids)
 }

View File

@@ -972,7 +972,6 @@ async fn init_repos(
         let rewritten = rewritten.freeze()?;
 
         save_bonsai_changesets(vec![rewritten.clone()], ctx.clone(), target_repo.clone())
-            .compat()
             .await?;
         rewritten.get_changeset_id()
     }

View File

@@ -1679,9 +1679,7 @@ pub async fn upload_commits<'a>(
         })
         .collect();
     uploader.try_for_each_concurrent(100, identity).await?;
 
-    save_bonsai_changesets(rewritten_list.clone(), ctx.clone(), target_repo.clone())
-        .compat()
-        .await?;
+    save_bonsai_changesets(rewritten_list.clone(), ctx.clone(), target_repo.clone()).await?;
     Ok(())
 }

View File

@@ -140,7 +140,6 @@ async fn create_initial_commit_with_contents<'a>(
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -174,7 +173,6 @@ async fn create_empty_commit(ctx: CoreContext, repo: &BlobRepo) -> ChangesetId {
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -459,7 +457,6 @@ async fn create_commit_from_parent_and_changes<'a>(
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -497,7 +494,6 @@ async fn update_master_file(ctx: CoreContext, repo: &BlobRepo) -> ChangesetId {
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -672,7 +668,6 @@ async fn megarepo_copy_file(
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -1056,7 +1051,6 @@ async fn update_linear_1_file(ctx: CoreContext, repo: &BlobRepo) -> ChangesetId
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -1982,7 +1976,6 @@ async fn create_merge(
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 

View File

@@ -74,9 +74,7 @@ async fn save_and_maybe_mark_public(
     mark_public: bool,
 ) -> Result<ChangesetId, Error> {
     let bcs_id = bcs.get_changeset_id();
-    save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
-        .await?;
+    save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone()).await?;
 
     if mark_public {
         repo.get_phases()

View File

@@ -464,7 +464,7 @@ mod tests {
     use derived_data_test_utils::bonsai_changeset_from_hg;
     use fbinit::FacebookInit;
     use fixtures::{many_files_dirs, store_files};
-    use futures::{compat::Future01CompatExt, pin_mut, stream::iter, Stream, TryStreamExt};
+    use futures::{pin_mut, stream::iter, Stream, TryStreamExt};
     use maplit::btreemap;
     use mononoke_types::{BonsaiChangeset, BonsaiChangesetMut, DateTime, FileChange, MPath};
     use tests_utils::CreateCommitContext;
@@ -1031,7 +1031,6 @@ mod tests {
             .unwrap();
 
         save_bonsai_changesets(vec![bcs.clone()], CoreContext::test_mock(fb), repo.clone())
-            .compat()
             .await
             .unwrap();
         bcs

View File

@@ -392,7 +392,6 @@ mod tests {
     use blobrepo_factory::new_memblob_empty;
     use fbinit::FacebookInit;
     use fixtures::store_files;
-    use futures::compat::Future01CompatExt;
     use manifest::PathOrPrefix;
     use maplit::btreemap;
     use mononoke_types::{
@@ -603,7 +602,6 @@ mod tests {
             .unwrap();
 
         save_bonsai_changesets(vec![bcs.clone()], CoreContext::test_mock(fb), repo.clone())
-            .compat()
             .await
             .unwrap();
         bcs

View File

@@ -229,7 +229,6 @@ mod tests {
     let bcs = create_bonsai_changeset(vec![]);
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -327,7 +326,6 @@ mod tests {
     let latest = parents.get(0).unwrap();
 
     save_bonsai_changesets(bonsais, ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -374,7 +372,6 @@ mod tests {
     let latest = parents.get(0).unwrap();
 
     save_bonsai_changesets(bonsais, ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -446,7 +443,6 @@ mod tests {
     bonsais.push(bcs);
 
     save_bonsai_changesets(bonsais, ctx.clone(), repo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -641,9 +637,7 @@ mod tests {
     println!("e = {}", e.get_changeset_id());
     bonsais.push(e.clone());
-    save_bonsai_changesets(bonsais, ctx.clone(), repo.clone())
-        .compat()
-        .await?;
+    save_bonsai_changesets(bonsais, ctx.clone(), repo.clone()).await?;
 
     verify_all_entries_for_commit(&ctx, &repo, e.get_changeset_id()).await;
 
     Ok(())

View File

@@ -927,7 +927,7 @@ mod tests {
             .unwrap();
 
         runtime
-            .block_on(save_bonsai_changesets(
+            .block_on_std(save_bonsai_changesets(
                vec![bcs.clone()],
                CoreContext::test_mock(fb),
                repo.clone(),
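
Since save_bonsai_changesets now returns a std future rather than a futures 0.1 future, this test switches the runtime call from block_on to block_on_std. A minimal sketch, assuming the tokio-compat 0.1 runtime that provides both methods (the hunk does not show this file's runtime type):

use anyhow::Error;
use tokio_compat::runtime::Runtime;

fn main() -> Result<(), Error> {
    let mut runtime = Runtime::new()?;
    // block_on drives a futures 0.1 future; block_on_std drives a
    // std::future::Future, which is what async fns return.
    let result: Result<(), Error> = runtime.block_on_std(async { Ok(()) });
    result
}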

View File

@@ -14,7 +14,6 @@ use chrono::{DateTime, FixedOffset};
 use context::CoreContext;
 use filestore::{FetchKey, StoreRequest};
 use futures::{
-    compat::Future01CompatExt,
     future::try_join3,
     stream::{self, FuturesOrdered, FuturesUnordered, Stream, TryStreamExt},
 };
@@ -449,7 +448,6 @@ impl RepoWriteContext {
             self.ctx().clone(),
             self.blob_repo().clone(),
         )
-        .compat()
         .await?;
         Ok(ChangesetContext::new(self.repo.clone(), new_changeset_id))
     }

View File

@@ -917,9 +917,7 @@ async fn create_rebased_changesets(
         rebased.push(bcs_new);
     }
 
-    save_bonsai_changesets(rebased, ctx.clone(), repo.clone())
-        .compat()
-        .await?;
+    save_bonsai_changesets(rebased, ctx.clone(), repo.clone()).await?;
     Ok((
         remapping
             .get(&head)

View File

@@ -19,7 +19,6 @@ use bytes::Bytes;
 use chrono::Utc;
 use context::CoreContext;
 use futures::{
-    compat::Future01CompatExt,
     future::try_join,
     stream::{FuturesUnordered, TryStreamExt},
 };

View File

@@ -198,9 +198,7 @@ async fn rewrite_file_paths(
     bonsai_changesets = sort_bcs(&bonsai_changesets)?;
     let bcs_ids = get_cs_ids(&bonsai_changesets);
     info!(ctx.logger(), "Saving shifted bonsai changesets");
-    save_bonsai_changesets(bonsai_changesets, ctx.clone(), repo.clone())
-        .compat()
-        .await?;
+    save_bonsai_changesets(bonsai_changesets, ctx.clone(), repo.clone()).await?;
     info!(ctx.logger(), "Saved shifted bonsai changesets");
     Ok(bcs_ids)
 }
@@ -570,9 +568,7 @@ async fn merge_imported_commit(
         "Created merge bonsai: {} and changeset: {:?}", merged_cs_id, merged_cs
     );
 
-    save_bonsai_changesets(vec![merged_cs], ctx.clone(), repo.clone())
-        .compat()
-        .await?;
+    save_bonsai_changesets(vec![merged_cs], ctx.clone(), repo.clone()).await?;
     info!(ctx.logger(), "Finished merging");
     Ok(merged_cs_id)
 }
@@ -1055,9 +1051,7 @@ async fn repo_import(
         bonsai_values.iter().map(|(id, _)| id.clone()).collect();
 
     info!(ctx.logger(), "Saving gitimported bonsai changesets");
-    save_bonsai_changesets(gitimport_bcs.clone(), ctx.clone(), repo.clone())
-        .compat()
-        .await?;
+    save_bonsai_changesets(gitimport_bcs.clone(), ctx.clone(), repo.clone()).await?;
     info!(ctx.logger(), "Saved gitimported bonsai changesets");
 
     recovery_fields.import_stage = ImportStage::RewritePaths;

View File

@@ -109,7 +109,6 @@ async fn create_bonsai_changeset_from_test_data(
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx.clone(), blobrepo.clone())
-        .compat()
         .await
         .unwrap();
 
@@ -1576,7 +1575,6 @@ pub async fn save_diamond_commits(
         ctx.clone(),
         repo.clone(),
     )
-    .compat()
     .await
     .map(move |()| fourth_bcs_id)
 }

View File

@@ -279,9 +279,7 @@ impl<'a> CreateCommitContext<'a> {
         let bcs = bcs.freeze()?;
         let bcs_id = bcs.get_changeset_id();
-        save_bonsai_changesets(vec![bcs], self.ctx.clone(), self.repo.clone())
-            .compat()
-            .await?;
+        save_bonsai_changesets(vec![bcs], self.ctx.clone(), self.repo.clone()).await?;
 
         Ok(bcs_id)
     }
 }
@@ -591,7 +589,6 @@ pub async fn create_commit(
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx, repo.clone())
-        .compat()
         .await
         .unwrap();
     bcs_id
@@ -619,7 +616,6 @@ pub async fn create_commit_with_date(
 
     let bcs_id = bcs.get_changeset_id();
     save_bonsai_changesets(vec![bcs], ctx, repo.clone())
-        .compat()
         .await
         .unwrap();
     bcs_id