From 866275e6262c2358017c3dcbaa9be18869b9e01f Mon Sep 17 00:00:00 2001
From: Mark Juggurnauth-Thomas
Date: Wed, 4 Nov 2020 08:37:54 -0800
Subject: [PATCH] fsnodes: convert fsnodes::collect_fsnode_subentries to new futures

Reviewed By: krallin

Differential Revision: D24726148

fbshipit-source-id: 7e37156b73aaea22280b6e3322520c5eab9e71fe
---
 eden/mononoke/derived_data/fsnodes/Cargo.toml |   2 +-
 eden/mononoke/derived_data/fsnodes/derive.rs  | 179 ++++++++----------
 2 files changed, 83 insertions(+), 98 deletions(-)

diff --git a/eden/mononoke/derived_data/fsnodes/Cargo.toml b/eden/mononoke/derived_data/fsnodes/Cargo.toml
index e6284bbfbf..fadba940d7 100644
--- a/eden/mononoke/derived_data/fsnodes/Cargo.toml
+++ b/eden/mononoke/derived_data/fsnodes/Cargo.toml
@@ -18,8 +18,8 @@ filestore = { path = "../../filestore" }
 manifest = { path = "../../manifest" }
 mononoke_types = { path = "../../mononoke_types" }
 repo_blobstore = { path = "../../blobrepo/repo_blobstore" }
+borrowed = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
-failure_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 sorted_vector_map = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
 anyhow = "1.0"
diff --git a/eden/mononoke/derived_data/fsnodes/derive.rs b/eden/mononoke/derived_data/fsnodes/derive.rs
index 23ba036ffe..da40c01944 100644
--- a/eden/mononoke/derived_data/fsnodes/derive.rs
+++ b/eden/mononoke/derived_data/fsnodes/derive.rs
@@ -8,21 +8,19 @@
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
 
-use anyhow::{format_err, Error, Result};
+use anyhow::{format_err, Context, Error, Result};
 use ascii::AsciiString;
 use blobrepo::BlobRepo;
 use blobstore::{Blobstore, Loadable};
+use borrowed::borrowed;
 use cloned::cloned;
 use context::CoreContext;
 use digest::Digest;
-use failure_ext::FutureFailureExt;
 use filestore::{get_metadata, FetchKey};
 use futures::channel::mpsc;
 use futures::compat::Future01CompatExt;
-use futures::future::{BoxFuture, FutureExt, TryFutureExt};
-use futures::stream::{FuturesUnordered, TryStreamExt};
-use futures_ext::FutureExt as _;
-use futures_old::{future, stream, Future, Stream};
+use futures::future::{BoxFuture, FutureExt};
+use futures::stream::{FuturesOrdered, FuturesUnordered, TryStreamExt};
 use manifest::{derive_manifest_with_io_sender, Entry, LeafInfo, TreeInfo};
 use mononoke_types::fsnode::{Fsnode, FsnodeDirectory, FsnodeEntry, FsnodeFile, FsnodeSummary};
 use mononoke_types::hash::{Sha1, Sha256};
@@ -122,10 +120,10 @@ pub async fn prefetch_content_metadata(
 
 /// Collect all the subentries for a new fsnode, re-using entries from the
 /// parent fsnodes to avoid fetching too much.
-fn collect_fsnode_subentries(
-    ctx: CoreContext,
-    blobstore: RepoBlobstore,
-    prefetched_content_metadata: Arc<HashMap<ContentId, ContentMetadata>>,
+async fn collect_fsnode_subentries(
+    ctx: &CoreContext,
+    blobstore: &RepoBlobstore,
+    prefetched_content_metadata: &HashMap<ContentId, ContentMetadata>,
     parents: Vec<FsnodeId>,
     subentries: BTreeMap<
         MPathElement,
         (
             Option<Option<FsnodeSummary>>,
             Entry<FsnodeId, (ContentId, FileType)>,
         ),
     >,
-) -> impl Future<Item = Vec<(MPathElement, FsnodeEntry)>, Error = Error> {
-    // Load the parent fsnodes
-    stream::futures_unordered(parents.into_iter().map({
-        cloned!(ctx, blobstore);
-        move |fsnode_id| {
-            fsnode_id
-                .load(ctx.clone(), &blobstore)
-                .compat()
-                .context(ErrorKind::MissingParent(fsnode_id))
-        }
-    }))
-    .collect()
-    .from_err()
-    .and_then({
-        cloned!(ctx, blobstore);
-        move |parent_fsnodes| {
-            // Collect all entries from the parent fsnodes as a cache.
-            let mut file_cache = HashMap::new();
-            let mut dir_cache = HashMap::new();
-            for parent_fsnode in parent_fsnodes.into_iter() {
-                for (_elem, entry) in parent_fsnode.list() {
-                    match entry {
-                        FsnodeEntry::File(file) => {
-                            file_cache
-                                .entry((*file.content_id(), *file.file_type()))
-                                .or_insert(file.clone());
-                        }
-                        FsnodeEntry::Directory(dir) => {
-                            dir_cache.entry(*dir.id()).or_insert(dir.clone());
-                        }
-                    }
+) -> Result<Vec<(MPathElement, FsnodeEntry)>> {
+    // Load the parent fsnodes and collect their entries into a cache
+    let mut file_cache = HashMap::new();
+    let mut dir_cache = HashMap::new();
+    let mut parent_fsnodes = parents
+        .into_iter()
+        .map({
+            move |fsnode_id| async move {
+                fsnode_id
+                    .load(ctx.clone(), blobstore)
+                    .await
+                    .context(ErrorKind::MissingParent(fsnode_id))
+            }
+        })
+        .collect::<FuturesUnordered<_>>();
+    while let Some(parent_fsnode) = parent_fsnodes.try_next().await? {
+        for (_elem, entry) in parent_fsnode.list() {
+            match entry {
+                FsnodeEntry::File(file) => {
+                    file_cache
+                        .entry((*file.content_id(), *file.file_type()))
+                        .or_insert(file.clone());
+                }
+                FsnodeEntry::Directory(dir) => {
+                    dir_cache.entry(*dir.id()).or_insert(dir.clone());
                 }
             }
+        }
+    }
 
-            // Find or fetch the `FsnodeEntry` for each of the subentries.
-            stream::futures_ordered(subentries.into_iter().map(move |(elem, (summary, entry))| {
+    // Find (from the traversal or the cache) or fetch (from the blobstore)
+    // the `FsnodeEntry` for each of the subentries.
+    borrowed!(file_cache, dir_cache);
+    subentries
+        .into_iter()
+        .map(move |(elem, (summary, entry))| {
+            async move {
                 match entry {
                     Entry::Tree(fsnode_id) => {
                         if let Some(Some(summary)) = summary {
                             // The subdirectory was just created. Use the
                             // summary we just calculated.
-                            future::ok((
+                            Ok((
                                 elem.clone(),
                                 FsnodeEntry::Directory(FsnodeDirectory::new(fsnode_id, summary)),
                             ))
-                            .boxify()
                         } else if let Some(entry) = dir_cache.get(&fsnode_id) {
                             // The subdirectory was already in this
                             // directory. Use the cached entry.
-                            future::ok((elem.clone(), FsnodeEntry::Directory(entry.clone())))
-                                .boxify()
+                            Ok((elem.clone(), FsnodeEntry::Directory(entry.clone())))
                         } else {
                             // Some other directory is being used. Fetch its
                             // summary from the blobstore.
-                            fsnode_id
-                                .load(ctx.clone(), &blobstore)
-                                .compat()
-                                .with_context({
-                                    cloned!(elem);
-                                    move || {
+                            let fsnode =
+                                fsnode_id.load(ctx.clone(), blobstore).await.with_context({
+                                    || {
                                         ErrorKind::MissingSubentry(
                                             String::from_utf8_lossy(elem.as_ref()).to_string(),
                                             fsnode_id,
                                         )
                                     }
-                                })
-                                .from_err()
-                                .map({
-                                    cloned!(elem, fsnode_id);
-                                    move |fsnode| {
-                                        let entry = FsnodeEntry::Directory(FsnodeDirectory::new(
-                                            fsnode_id,
-                                            fsnode.summary().clone(),
-                                        ));
-                                        (elem, entry)
-                                    }
-                                })
-                                .boxify()
+                                })?;
+
+                            let entry = FsnodeEntry::Directory(FsnodeDirectory::new(
+                                fsnode_id,
+                                fsnode.summary().clone(),
+                            ));
+                            Ok((elem.clone(), entry))
                         }
                     }
                     Entry::Leaf(content_id_and_file_type) => {
                         if let Some(entry) = file_cache.get(&content_id_and_file_type) {
                             // The file was already in this directory. Use
                             // the cached entry.
-                            future::ok((elem.clone(), FsnodeEntry::File(entry.clone()))).boxify()
+                            Ok((elem.clone(), FsnodeEntry::File(entry.clone())))
                         } else {
                             // Some other file is being used. Use the
                             // metadata we prefetched to create a new entry.
@@ -231,17 +219,18 @@ fn collect_fsnode_subentries(
                                     metadata.sha1,
                                     metadata.sha256,
                                 ));
-                                future::ok((elem.clone(), entry)).boxify()
+                                Ok((elem.clone(), entry))
                             } else {
-                                future::err(ErrorKind::MissingContent(content_id).into()).boxify()
+                                Err(ErrorKind::MissingContent(content_id).into())
                             }
                         }
                     }
                 }
-            }))
-            .collect()
-        }
-    })
+            }
+        })
+        .collect::<FuturesOrdered<_>>()
+        .try_collect()
+        .await
 }
 
 /// Create a new fsnode for the tree described by `tree_info`.
@@ -253,13 +242,12 @@ async fn create_fsnode(
     tree_info: TreeInfo<FsnodeId, (ContentId, FileType), Option<FsnodeSummary>>,
 ) -> Result<(Option<FsnodeSummary>, FsnodeId)> {
     let entries = collect_fsnode_subentries(
-        ctx.clone(),
-        blobstore.clone(),
-        prefetched_content_metadata,
+        &ctx,
+        &blobstore,
+        prefetched_content_metadata.as_ref(),
         tree_info.parents,
         tree_info.subentries,
     )
-    .compat()
     .await?;
 
     // Build a summary of the entries and store it as the new fsnode.
@@ -404,6 +392,7 @@ mod test {
     use crate::mapping::get_file_changes;
     use fbinit::FacebookInit;
     use fixtures::{linear, many_files_dirs};
+    use futures::compat::Stream01CompatExt;
     use std::str::FromStr;
     use test_utils::{get_bonsai_changeset, iterate_all_entries};
    use tokio_compat::runtime::Runtime;
@@ -430,13 +419,12 @@
 
         // Make sure the fsnodes describe the full manifest.
         let all_fsnodes: BTreeMap<_, _> = runtime
-            .block_on(
+            .block_on_std(
                 iterate_all_entries(ctx.clone(), repo.clone(), Entry::Tree(root_fsnode_id))
-                    .collect(),
+                    .compat()
+                    .try_collect(),
             )
-            .unwrap()
-            .into_iter()
-            .collect();
+            .unwrap();
         assert_eq!(
             all_fsnodes.keys().collect::<Vec<_>>(),
             vec![
@@ -487,13 +475,12 @@
 
         // Make sure the fsnodes describe the full manifest.
         let all_fsnodes: BTreeMap<_, _> = runtime
-            .block_on(
+            .block_on_std(
                 iterate_all_entries(ctx.clone(), repo.clone(), Entry::Tree(root_fsnode_id))
-                    .collect(),
+                    .compat()
+                    .try_collect(),
             )
-            .unwrap()
-            .into_iter()
-            .collect();
+            .unwrap();
         assert_eq!(
             all_fsnodes.keys().collect::<Vec<_>>(),
             vec![
@@ -574,13 +561,12 @@
 
         // Make sure the fsnodes describe the full manifest.
         let all_fsnodes: BTreeMap<_, _> = runtime
-            .block_on(
+            .block_on_std(
                 iterate_all_entries(ctx.clone(), repo.clone(), Entry::Tree(root_fsnode_id))
-                    .collect(),
+                    .compat()
+                    .try_collect(),
            )
-            .unwrap()
-            .into_iter()
-            .collect();
+            .unwrap();
         assert_eq!(
             all_fsnodes.keys().collect::<Vec<_>>(),
             vec![
@@ -760,13 +746,12 @@
 
         // Make sure the fsnodes describe the full manifest.
        let all_fsnodes: BTreeMap<_, _> = runtime
-            .block_on(
+            .block_on_std(
                 iterate_all_entries(ctx.clone(), repo.clone(), Entry::Tree(root_fsnode_id))
-                    .collect(),
+                    .compat()
+                    .try_collect(),
             )
-            .unwrap()
-            .into_iter()
-            .collect();
+            .unwrap();
         assert_eq!(
             all_fsnodes.keys().collect::<Vec<_>>(),
             vec![
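
The conversion pattern this patch applies is easier to see in isolation. Below is a minimal, self-contained sketch of the same old-futures-to-new-futures migration; it is illustrative only, not Mononoke code: `load`, `double_all`, the u32 payloads, and the tokio runtime are hypothetical stand-ins for the blobstore loads, and it assumes the `anyhow`, `futures` (0.3), and `tokio` (with the "macros" feature) crates.

use anyhow::Result;
use futures::stream::{FuturesOrdered, FuturesUnordered, TryStreamExt};

// Hypothetical fallible async load, standing in for `fsnode_id.load(...)`.
async fn load(id: u32) -> Result<u32> {
    Ok(id * 2)
}

async fn double_all(ids: Vec<u32>) -> Result<Vec<u32>> {
    // Completion-order pass (FuturesUnordered + try_next), like the parent
    // fsnode loads above: all futures run concurrently and their results are
    // folded into shared state in whatever order they finish.
    let mut loads = ids
        .iter()
        .map(|&id| load(id))
        .collect::<FuturesUnordered<_>>();
    let mut total = 0;
    while let Some(value) = loads.try_next().await? {
        total += value;
    }

    // Order-preserving pass (FuturesOrdered + try_collect), like the
    // subentries: the futures still run concurrently, but the output Vec
    // keeps the input order.
    ids.into_iter()
        .map(move |id| async move { Ok(load(id).await? + total) })
        .collect::<FuturesOrdered<_>>()
        .try_collect()
        .await
}

#[tokio::main]
async fn main() -> Result<()> {
    // Loads yield [2, 4, 6] (total 12), so the second pass yields each
    // load plus the first-pass total.
    assert_eq!(double_all(vec![1, 2, 3]).await?, vec![14, 16, 18]);
    Ok(())
}

As in the patch, `FuturesUnordered` suits the cache-filling pass where completion order is irrelevant, while `FuturesOrdered` preserves the `BTreeMap` iteration order of the subentries that the resulting fsnode must reproduce.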