mirror of
https://github.com/facebook/sapling.git
synced 2024-10-05 14:28:17 +03:00
Back out "bssm: optimize derivation by reusing sharded map nodes"
Summary: Original commit changeset: 6d0873232e51 Original Phabricator Diff: D48954394 Reviewed By: sggutier Differential Revision: D49330478 fbshipit-source-id: c8087c9ccc85aa9e46c03b90aac346f5c6095dfa
This commit is contained in:
parent
30c83c02ec
commit
42ccd24356
@ -15,7 +15,6 @@ anyhow = "1.0.71"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1.71"
|
||||
blobstore = { version = "0.1.0", path = "../../blobstore" }
|
||||
bytes = { version = "1.1", features = ["serde"] }
|
||||
cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
|
||||
context = { version = "0.1.0", path = "../../server/context" }
|
||||
derived_data = { version = "0.1.0", path = ".." }
|
||||
|
@ -7,34 +7,36 @@
|
||||
|
||||
//! See docs/basename_suffix_skeleton_manifest.md for more information
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use blobstore::Blobstore;
|
||||
use blobstore::Loadable;
|
||||
use blobstore::Storable;
|
||||
use bytes::Bytes;
|
||||
use cloned::cloned;
|
||||
use context::CoreContext;
|
||||
use derived_data_manager::DerivationContext;
|
||||
use futures::future::FutureExt;
|
||||
use itertools::Either;
|
||||
use futures::stream;
|
||||
use futures::StreamExt;
|
||||
use futures::TryStreamExt;
|
||||
use manifest::derive_manifest_with_io_sender;
|
||||
use manifest::flatten_subentries;
|
||||
use manifest::Entry;
|
||||
use manifest::LeafInfo;
|
||||
use manifest::TreeInfo;
|
||||
use manifest::TreeInfoSubentries;
|
||||
use mononoke_types::basename_suffix_skeleton_manifest::BasenameSuffixSkeletonManifest;
|
||||
use mononoke_types::basename_suffix_skeleton_manifest::BssmDirectory;
|
||||
use mononoke_types::basename_suffix_skeleton_manifest::BssmEntry;
|
||||
use mononoke_types::sharded_map::ShardedMapNode;
|
||||
use mononoke_types::sharded_map::ShardedTrieMap;
|
||||
use mononoke_types::BasenameSuffixSkeletonManifestId;
|
||||
use mononoke_types::BlobstoreValue;
|
||||
use mononoke_types::BonsaiChangeset;
|
||||
use mononoke_types::ContentId;
|
||||
use mononoke_types::FileType;
|
||||
use mononoke_types::MPathElement;
|
||||
use mononoke_types::NonRootMPath;
|
||||
use mononoke_types::TrieMap;
|
||||
use skeleton_manifest::mapping::get_file_changes;
|
||||
|
||||
use crate::mapping::RootBasenameSuffixSkeletonManifest;
|
||||
@ -51,6 +53,20 @@ fn get_fixed_up_changes(
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn split_by_root_dir<X>(
|
||||
changes: Vec<(NonRootMPath, X)>,
|
||||
) -> HashMap<MPathElement, Vec<(NonRootMPath, X)>> {
|
||||
let mut map = HashMap::new();
|
||||
for (p, x) in changes {
|
||||
let (root_dir, rest) = p.split_first();
|
||||
let rest = rest.expect("We always add a sentinel suffix to the path");
|
||||
map.entry(root_dir.clone())
|
||||
.or_insert_with(Vec::new)
|
||||
.push((rest, x));
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
async fn empty_mf(
|
||||
ctx: &CoreContext,
|
||||
blobstore: &impl Blobstore,
|
||||
@ -79,89 +95,28 @@ async fn inner_derive(
|
||||
// create_tree
|
||||
{
|
||||
cloned!(ctx, blobstore);
|
||||
move |info: TreeInfo<TreeId, IntermediateLeafId, Ctx, ShardedTrieMap<BssmEntry>>,
|
||||
move |info: TreeInfo<
|
||||
TreeId,
|
||||
IntermediateLeafId,
|
||||
Ctx,
|
||||
TrieMap<Entry<TreeId, LeafId>>,
|
||||
>,
|
||||
fut_sender| {
|
||||
cloned!(ctx, blobstore);
|
||||
async move {
|
||||
let (rollup_count, subentries): (_, BTreeMap<_, _>) = match info.subentries {
|
||||
TreeInfoSubentries::AllSubentries(subentries) => {
|
||||
let mut rollup_count = 0;
|
||||
let subentries = subentries
|
||||
.into_iter()
|
||||
.inspect(|(_path, (_ctx, entry))| match entry {
|
||||
Entry::Tree(tree) => rollup_count += tree.rollup_count as i64,
|
||||
Entry::Leaf(_) => rollup_count += 1,
|
||||
})
|
||||
.map(|(path, (_ctx, entry))| {
|
||||
let entry = match entry {
|
||||
Entry::Leaf(()) => BssmEntry::File,
|
||||
Entry::Tree(entry) => BssmEntry::Directory(entry),
|
||||
};
|
||||
(Bytes::copy_from_slice(path.as_ref()), Either::Left(entry))
|
||||
})
|
||||
.collect();
|
||||
(rollup_count, subentries)
|
||||
}
|
||||
TreeInfoSubentries::ReusedMapsAndSubentries {
|
||||
produced_subentries_and_reused_maps,
|
||||
consumed_subentries,
|
||||
} => {
|
||||
// The sum of the rollup counts for the parents (-1 to remove the directories of the parents' themselves)
|
||||
// minus the rollup counts for the consumed subentries results in the sum of rollup counts for the reused maps.
|
||||
let mut rollup_count = info
|
||||
.parents
|
||||
.iter()
|
||||
.map(|parent| parent.rollup_count as i64 - 1)
|
||||
.chain(consumed_subentries.into_iter().map(|entry| match entry {
|
||||
Entry::Tree(tree) => -(tree.rollup_count as i64),
|
||||
Entry::Leaf(_) => -1,
|
||||
}))
|
||||
.sum();
|
||||
|
||||
let subentries = produced_subentries_and_reused_maps
|
||||
.into_iter()
|
||||
// Add the rollup counts for all the produced subentries.
|
||||
.inspect(|(_prefix, entry_or_map)| match entry_or_map {
|
||||
Either::Left((_ctx, Entry::Tree(tree))) => {
|
||||
rollup_count += tree.rollup_count as i64;
|
||||
}
|
||||
Either::Left((_ctx, Entry::Leaf(_))) => {
|
||||
rollup_count += 1;
|
||||
}
|
||||
Either::Right(_map) => {}
|
||||
})
|
||||
.flat_map(|(prefix, entry_or_map)| match entry_or_map {
|
||||
Either::Left((_maybe_ctx, entry)) => {
|
||||
let entry = match entry {
|
||||
Entry::Leaf(()) => BssmEntry::File,
|
||||
Entry::Tree(entry) => BssmEntry::Directory(entry),
|
||||
};
|
||||
vec![(prefix, Either::Left(entry))]
|
||||
}
|
||||
Either::Right(map) => match map {
|
||||
ShardedTrieMap::Edge(edge) => {
|
||||
vec![(prefix, Either::Right(edge))]
|
||||
}
|
||||
ShardedTrieMap::Trie(trie) => trie
|
||||
.into_iter()
|
||||
.map(|(mut key, entry)| {
|
||||
key.insert_from_slice(0, prefix.as_ref());
|
||||
(key, Either::Left(entry))
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
},
|
||||
})
|
||||
.map(|(key, entry)| (Bytes::copy_from_slice(&key), entry))
|
||||
.collect();
|
||||
(rollup_count, subentries)
|
||||
}
|
||||
};
|
||||
|
||||
let mf = BasenameSuffixSkeletonManifest {
|
||||
subentries: ShardedMapNode::from_entries(&ctx, &blobstore, subentries)
|
||||
.await?,
|
||||
};
|
||||
let entries = flatten_subentries(&ctx, &(), info.subentries).await?.map(
|
||||
|(path_el, (_maybe_ctx, entry_in))| {
|
||||
let entry = match entry_in {
|
||||
Entry::Leaf(()) => BssmEntry::File,
|
||||
Entry::Tree(entry) => BssmEntry::Directory(entry),
|
||||
};
|
||||
(path_el, Some(entry))
|
||||
},
|
||||
);
|
||||
|
||||
let (mf, rollup_count) = BasenameSuffixSkeletonManifest::empty()
|
||||
.update(&ctx, &blobstore, entries.collect())
|
||||
.await?;
|
||||
let entry = {
|
||||
let blob = mf.into_blob();
|
||||
let id = *blob.id();
|
||||
@ -198,12 +153,48 @@ pub(crate) async fn derive_single(
|
||||
bonsai: BonsaiChangeset,
|
||||
parents: Vec<RootBasenameSuffixSkeletonManifest>,
|
||||
) -> Result<RootBasenameSuffixSkeletonManifest> {
|
||||
const CONCURRENCY: usize = 100;
|
||||
let parents = parents.into_iter().map(|root| root.0).collect::<Vec<_>>();
|
||||
let changes = get_fixed_up_changes(&bonsai);
|
||||
let blobstore = derivation_ctx.blobstore();
|
||||
|
||||
let root = inner_derive(ctx, blobstore, parents, changes).await?;
|
||||
|
||||
// TODO(T123518092): Once T123518092 is done, this optimisation can be removed and
|
||||
// we can call `inner_derive` as in the `else` clause.
|
||||
let root = if parents.len() <= 1 {
|
||||
let (parent, previous_rollup) = match parents.into_iter().next() {
|
||||
Some(p) => (Some(p.load(ctx, blobstore).await?), p.rollup_count),
|
||||
None => (None, 1),
|
||||
};
|
||||
let entries_to_update = stream::iter(split_by_root_dir(changes))
|
||||
.map(|(root_dir, changes)| async {
|
||||
let new_parent = match parent.as_ref() {
|
||||
None => vec![],
|
||||
Some(p) => match p.lookup(ctx, blobstore, &root_dir).await? {
|
||||
Some(BssmEntry::Directory(dir)) => vec![dir],
|
||||
None => vec![],
|
||||
other => anyhow::bail!("Invalid directory {:?}", other),
|
||||
},
|
||||
};
|
||||
Ok((
|
||||
root_dir,
|
||||
inner_derive(ctx, blobstore, new_parent, changes)
|
||||
.await?
|
||||
.map(BssmEntry::Directory),
|
||||
))
|
||||
})
|
||||
.buffer_unordered(CONCURRENCY)
|
||||
.try_collect()
|
||||
.await?;
|
||||
let (mf, rollup_diff) = parent
|
||||
.unwrap_or_else(BasenameSuffixSkeletonManifest::empty)
|
||||
.update(ctx, blobstore, entries_to_update)
|
||||
.await?;
|
||||
Some(BssmDirectory {
|
||||
id: mf.into_blob().store(ctx, blobstore).await?,
|
||||
rollup_count: ((previous_rollup as i64) + rollup_diff) as u64,
|
||||
})
|
||||
} else {
|
||||
inner_derive(ctx, blobstore, parents, changes).await?
|
||||
};
|
||||
Ok(RootBasenameSuffixSkeletonManifest(match root {
|
||||
Some(root) => root,
|
||||
// Only happens on empty repo
|
||||
|
@ -26,7 +26,6 @@ use mononoke_types::fsnode::Fsnode;
|
||||
use mononoke_types::fsnode::FsnodeEntry;
|
||||
use mononoke_types::fsnode::FsnodeFile;
|
||||
use mononoke_types::path::MPath;
|
||||
use mononoke_types::sharded_map::ShardedTrieMap;
|
||||
use mononoke_types::skeleton_manifest::SkeletonManifest;
|
||||
use mononoke_types::skeleton_manifest::SkeletonManifestEntry;
|
||||
use mononoke_types::unode::ManifestUnode;
|
||||
@ -177,7 +176,7 @@ fn to_mf_entry(entry: BssmEntry) -> Entry<BssmDirectory, ()> {
|
||||
impl<Store: Blobstore> AsyncManifest<Store> for BasenameSuffixSkeletonManifest {
|
||||
type TreeId = BssmDirectory;
|
||||
type LeafId = ();
|
||||
type TrieMapType = ShardedTrieMap<BssmEntry>;
|
||||
type TrieMapType = TrieMap<Entry<BssmDirectory, ()>>;
|
||||
|
||||
async fn list(
|
||||
&self,
|
||||
@ -219,35 +218,13 @@ impl<Store: Blobstore> AsyncManifest<Store> for BasenameSuffixSkeletonManifest {
|
||||
|
||||
async fn into_trie_map(
|
||||
self,
|
||||
_ctx: &CoreContext,
|
||||
_blobstore: &Store,
|
||||
ctx: &CoreContext,
|
||||
blobstore: &Store,
|
||||
) -> Result<Self::TrieMapType> {
|
||||
Ok(ShardedTrieMap::new(self.subentries))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<Store: Blobstore> TrieMapOps<Store, Entry<BssmDirectory, ()>> for ShardedTrieMap<BssmEntry> {
|
||||
async fn expand(
|
||||
self,
|
||||
ctx: &CoreContext,
|
||||
blobstore: &Store,
|
||||
) -> Result<(Option<Entry<BssmDirectory, ()>>, Vec<(u8, Self)>)> {
|
||||
let (entry, children) = self.expand(ctx, blobstore).await?;
|
||||
Ok((entry.map(to_mf_entry), children))
|
||||
}
|
||||
|
||||
async fn into_stream(
|
||||
self,
|
||||
ctx: &CoreContext,
|
||||
blobstore: &Store,
|
||||
) -> Result<BoxStream<'async_trait, Result<(SmallVec<[u8; 24]>, Entry<BssmDirectory, ()>)>>>
|
||||
{
|
||||
Ok(self
|
||||
.into_stream(ctx, blobstore)
|
||||
.await?
|
||||
self.into_subentries(ctx, blobstore)
|
||||
.map_ok(|(path, entry)| (path, to_mf_entry(entry)))
|
||||
.boxed())
|
||||
.try_collect()
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user