fsnodes: convert fsnodes::collect_fsnode_subentries to new futures

Reviewed By: krallin

Differential Revision: D24726148

fbshipit-source-id: 7e37156b73aaea22280b6e3322520c5eab9e71fe
This commit is contained in:
Mark Juggurnauth-Thomas 2020-11-04 08:37:54 -08:00 committed by Facebook GitHub Bot
parent 320f0edfe5
commit 866275e626
2 changed files with 83 additions and 98 deletions

View File

@ -18,8 +18,8 @@ filestore = { path = "../../filestore" }
manifest = { path = "../../manifest" }
mononoke_types = { path = "../../mononoke_types" }
repo_blobstore = { path = "../../blobrepo/repo_blobstore" }
borrowed = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
cloned = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
failure_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
futures_ext = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
sorted_vector_map = { git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
anyhow = "1.0"

View File

@ -8,21 +8,19 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use anyhow::{format_err, Error, Result};
use anyhow::{format_err, Context, Error, Result};
use ascii::AsciiString;
use blobrepo::BlobRepo;
use blobstore::{Blobstore, Loadable};
use borrowed::borrowed;
use cloned::cloned;
use context::CoreContext;
use digest::Digest;
use failure_ext::FutureFailureExt;
use filestore::{get_metadata, FetchKey};
use futures::channel::mpsc;
use futures::compat::Future01CompatExt;
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
use futures::stream::{FuturesUnordered, TryStreamExt};
use futures_ext::FutureExt as _;
use futures_old::{future, stream, Future, Stream};
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesOrdered, FuturesUnordered, TryStreamExt};
use manifest::{derive_manifest_with_io_sender, Entry, LeafInfo, TreeInfo};
use mononoke_types::fsnode::{Fsnode, FsnodeDirectory, FsnodeEntry, FsnodeFile, FsnodeSummary};
use mononoke_types::hash::{Sha1, Sha256};
@ -122,10 +120,10 @@ pub async fn prefetch_content_metadata(
/// Collect all the subentries for a new fsnode, re-using entries the parent
/// fsnodes to avoid fetching too much.
fn collect_fsnode_subentries(
ctx: CoreContext,
blobstore: RepoBlobstore,
prefetched_content_metadata: Arc<HashMap<ContentId, ContentMetadata>>,
async fn collect_fsnode_subentries(
ctx: &CoreContext,
blobstore: &RepoBlobstore,
prefetched_content_metadata: &HashMap<ContentId, ContentMetadata>,
parents: Vec<FsnodeId>,
subentries: BTreeMap<
MPathElement,
@ -134,91 +132,81 @@ fn collect_fsnode_subentries(
Entry<FsnodeId, (ContentId, FileType)>,
),
>,
) -> impl Future<Item = Vec<(MPathElement, FsnodeEntry)>, Error = Error> {
// Load the parent fsnodes
stream::futures_unordered(parents.into_iter().map({
cloned!(ctx, blobstore);
move |fsnode_id| {
fsnode_id
.load(ctx.clone(), &blobstore)
.compat()
.context(ErrorKind::MissingParent(fsnode_id))
}
}))
.collect()
.from_err()
.and_then({
cloned!(ctx, blobstore);
move |parent_fsnodes| {
// Collect all entries from the parent fsnodes as a cache.
let mut file_cache = HashMap::new();
let mut dir_cache = HashMap::new();
for parent_fsnode in parent_fsnodes.into_iter() {
for (_elem, entry) in parent_fsnode.list() {
match entry {
FsnodeEntry::File(file) => {
file_cache
.entry((*file.content_id(), *file.file_type()))
.or_insert(file.clone());
}
FsnodeEntry::Directory(dir) => {
dir_cache.entry(*dir.id()).or_insert(dir.clone());
}
}
) -> Result<Vec<(MPathElement, FsnodeEntry)>> {
// Load the parent fsnodes and collect their entries into a cache
let mut file_cache = HashMap::new();
let mut dir_cache = HashMap::new();
let mut parent_fsnodes = parents
.into_iter()
.map({
move |fsnode_id| async move {
fsnode_id
.load(ctx.clone(), blobstore)
.await
.context(ErrorKind::MissingParent(fsnode_id))
}
})
.collect::<FuturesUnordered<_>>();
while let Some(parent_fsnode) = parent_fsnodes.try_next().await? {
for (_elem, entry) in parent_fsnode.list() {
match entry {
FsnodeEntry::File(file) => {
file_cache
.entry((*file.content_id(), *file.file_type()))
.or_insert(file.clone());
}
FsnodeEntry::Directory(dir) => {
dir_cache.entry(*dir.id()).or_insert(dir.clone());
}
}
}
}
// Find or fetch the `FsnodeEntry` for each of the subentries.
stream::futures_ordered(subentries.into_iter().map(move |(elem, (summary, entry))| {
// Find (from the traversal or the cache) or fetch (from the blobstore)
// the `FsnodeEntry` for each of the subentries.
borrowed!(file_cache, dir_cache);
subentries
.into_iter()
.map(move |(elem, (summary, entry))| {
async move {
match entry {
Entry::Tree(fsnode_id) => {
if let Some(Some(summary)) = summary {
// The subdirectory was just created. Use the
// summary we just calculated.
future::ok((
Ok((
elem.clone(),
FsnodeEntry::Directory(FsnodeDirectory::new(fsnode_id, summary)),
))
.boxify()
} else if let Some(entry) = dir_cache.get(&fsnode_id) {
// The subdirectory was already in this
// directory. Use the cached entry.
future::ok((elem.clone(), FsnodeEntry::Directory(entry.clone())))
.boxify()
Ok((elem.clone(), FsnodeEntry::Directory(entry.clone())))
} else {
// Some other directory is being used. Fetch its
// summary from the blobstore.
fsnode_id
.load(ctx.clone(), &blobstore)
.compat()
.with_context({
cloned!(elem);
move || {
let fsnode =
fsnode_id.load(ctx.clone(), blobstore).await.with_context({
|| {
ErrorKind::MissingSubentry(
String::from_utf8_lossy(elem.as_ref()).to_string(),
fsnode_id,
)
}
})
.from_err()
.map({
cloned!(elem, fsnode_id);
move |fsnode| {
let entry = FsnodeEntry::Directory(FsnodeDirectory::new(
fsnode_id,
fsnode.summary().clone(),
));
(elem, entry)
}
})
.boxify()
})?;
let entry = FsnodeEntry::Directory(FsnodeDirectory::new(
fsnode_id,
fsnode.summary().clone(),
));
Ok((elem.clone(), entry))
}
}
Entry::Leaf(content_id_and_file_type) => {
if let Some(entry) = file_cache.get(&content_id_and_file_type) {
// The file was already in this directory. Use
// the cached entry.
future::ok((elem.clone(), FsnodeEntry::File(entry.clone()))).boxify()
Ok((elem.clone(), FsnodeEntry::File(entry.clone())))
} else {
// Some other file is being used. Use the
// metadata we prefetched to create a new entry.
@ -231,17 +219,18 @@ fn collect_fsnode_subentries(
metadata.sha1,
metadata.sha256,
));
future::ok((elem.clone(), entry)).boxify()
Ok((elem.clone(), entry))
} else {
future::err(ErrorKind::MissingContent(content_id).into()).boxify()
Err(ErrorKind::MissingContent(content_id).into())
}
}
}
}
}))
.collect()
}
})
}
})
.collect::<FuturesOrdered<_>>()
.try_collect()
.await
}
/// Create a new fsnode for the tree described by `tree_info`.
@ -253,13 +242,12 @@ async fn create_fsnode(
tree_info: TreeInfo<FsnodeId, (ContentId, FileType), Option<FsnodeSummary>>,
) -> Result<(Option<FsnodeSummary>, FsnodeId)> {
let entries = collect_fsnode_subentries(
ctx.clone(),
blobstore.clone(),
prefetched_content_metadata,
&ctx,
&blobstore,
prefetched_content_metadata.as_ref(),
tree_info.parents,
tree_info.subentries,
)
.compat()
.await?;
// Build a summary of the entries and store it as the new fsnode.
@ -404,6 +392,7 @@ mod test {
use crate::mapping::get_file_changes;
use fbinit::FacebookInit;
use fixtures::{linear, many_files_dirs};
use futures::compat::Stream01CompatExt;
use std::str::FromStr;
use test_utils::{get_bonsai_changeset, iterate_all_entries};
use tokio_compat::runtime::Runtime;
@ -430,13 +419,12 @@ mod test {
// Make sure the fsnodes describe the full manifest.
let all_fsnodes: BTreeMap<_, _> = runtime
.block_on(
.block_on_std(
iterate_all_entries(ctx.clone(), repo.clone(), Entry::Tree(root_fsnode_id))
.collect(),
.compat()
.try_collect(),
)
.unwrap()
.into_iter()
.collect();
.unwrap();
assert_eq!(
all_fsnodes.keys().collect::<Vec<_>>(),
vec![
@ -487,13 +475,12 @@ mod test {
// Make sure the fsnodes describe the full manifest.
let all_fsnodes: BTreeMap<_, _> = runtime
.block_on(
.block_on_std(
iterate_all_entries(ctx.clone(), repo.clone(), Entry::Tree(root_fsnode_id))
.collect(),
.compat()
.try_collect(),
)
.unwrap()
.into_iter()
.collect();
.unwrap();
assert_eq!(
all_fsnodes.keys().collect::<Vec<_>>(),
vec![
@ -574,13 +561,12 @@ mod test {
// Make sure the fsnodes describe the full manifest.
let all_fsnodes: BTreeMap<_, _> = runtime
.block_on(
.block_on_std(
iterate_all_entries(ctx.clone(), repo.clone(), Entry::Tree(root_fsnode_id))
.collect(),
.compat()
.try_collect(),
)
.unwrap()
.into_iter()
.collect();
.unwrap();
assert_eq!(
all_fsnodes.keys().collect::<Vec<_>>(),
vec![
@ -760,13 +746,12 @@ mod test {
// Make sure the fsnodes describe the full manifest.
let all_fsnodes: BTreeMap<_, _> = runtime
.block_on(
.block_on_std(
iterate_all_entries(ctx.clone(), repo.clone(), Entry::Tree(root_fsnode_id))
.collect(),
.compat()
.try_collect(),
)
.unwrap()
.into_iter()
.collect();
.unwrap();
assert_eq!(
all_fsnodes.keys().collect::<Vec<_>>(),
vec![