mirror of
https://github.com/facebook/sapling.git
synced 2024-10-05 06:18:07 +03:00
Back out "sharded_map: implement a way to create a sharded map node from independent values and other sharded map nodes"
Summary: Original commit changeset: 4a6f549cd250 Original Phabricator Diff: D48954392 Reviewed By: YousefSalama, sggutier Differential Revision: D49330473 fbshipit-source-id: 7a7d5a4e50cc668e64478ee04724fa06afa54116
This commit is contained in:
parent
b92771a3f6
commit
a9338d324b
@ -30,7 +30,6 @@ use futures::FutureExt;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use futures::TryStreamExt;
|
||||
use itertools::Either;
|
||||
use itertools::Itertools;
|
||||
use nonzero_ext::nonzero;
|
||||
use once_cell::sync::OnceCell;
|
||||
@ -322,152 +321,6 @@ impl<Value: MapValue> ShardedMapNode<Value> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new sharded map node from a map of values and other sharded map nodes. The key for every input sharded map node is
|
||||
/// a prefix that's prepended to it, which represents that keys that have this prefix are all contained in this sharded map node.
|
||||
/// Returns an error if the key for a sharded map node is a prefix of any other key in the input map.
|
||||
#[async_recursion]
|
||||
pub async fn from_entries(
|
||||
ctx: &CoreContext,
|
||||
blobstore: &impl Blobstore,
|
||||
entries: BTreeMap<Bytes, Either<Value, ShardedMapEdge<Value>>>,
|
||||
) -> Result<Self> {
|
||||
let shard_size = Self::shard_size()?;
|
||||
|
||||
let node_size: usize = entries
|
||||
.values()
|
||||
.map(|entry| match entry {
|
||||
Either::Left(_value) => 1,
|
||||
Either::Right(map) => map.size,
|
||||
})
|
||||
.sum();
|
||||
|
||||
if node_size <= shard_size {
|
||||
if ShardedMapNode::has_conflicts(&entries) {
|
||||
bail!("Cannot create sharded map node with conflicting entries");
|
||||
}
|
||||
|
||||
let entries_futures = entries
|
||||
.into_iter()
|
||||
.map(|(key, entry)| async move {
|
||||
match entry {
|
||||
Either::Left(value) => {
|
||||
Ok(vec![(SmallVec::from_slice(key.as_ref()), value)])
|
||||
}
|
||||
Either::Right(map) => {
|
||||
map.load_child(ctx, blobstore)
|
||||
.await?
|
||||
.into_entries(ctx, blobstore)
|
||||
.map_ok(|(mut map_key, map_value)| {
|
||||
map_key.insert_from_slice(0, key.as_ref());
|
||||
(map_key, map_value)
|
||||
})
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(Self::Terminal {
|
||||
values: stream::iter(entries_futures)
|
||||
.buffer_unordered(100)
|
||||
.map_ok(|v| stream::iter(v).map(Ok))
|
||||
.try_flatten()
|
||||
.try_collect()
|
||||
.await?,
|
||||
})
|
||||
} else {
|
||||
let mut keys_iter = entries.keys();
|
||||
let longest_common_prefix = match keys_iter.next() {
|
||||
None => return Ok(Default::default()),
|
||||
Some(first_key) => SmallBinary::from_slice(
|
||||
keys_iter.fold(first_key.as_ref(), |longest_common_prefix, key| {
|
||||
common_prefix(longest_common_prefix, key)
|
||||
}),
|
||||
),
|
||||
};
|
||||
|
||||
let mut partitioned: BTreeMap<
|
||||
u8,
|
||||
BTreeMap<Bytes, Either<Value, ShardedMapEdge<Value>>>,
|
||||
> = Default::default();
|
||||
let mut current_value = None;
|
||||
let is_single_entry = entries.len() == 1;
|
||||
|
||||
for (key, entry) in entries {
|
||||
let stripped_key = key.strip_prefix(longest_common_prefix.as_ref()).unwrap();
|
||||
match stripped_key.split_first() {
|
||||
None => match entry {
|
||||
Either::Left(value) => {
|
||||
current_value = Some(value);
|
||||
}
|
||||
Either::Right(edge) => {
|
||||
if is_single_entry {
|
||||
let mut map = edge.load_child(ctx, blobstore).await?;
|
||||
map.prepend(SmallVec::from_slice(key.as_ref()));
|
||||
return Ok(map);
|
||||
} else {
|
||||
bail!("Cannot create sharded map node with conflicting entries");
|
||||
}
|
||||
}
|
||||
},
|
||||
Some((first, rest)) => {
|
||||
partitioned
|
||||
.entry(*first)
|
||||
.or_default()
|
||||
.insert(key.slice_ref(rest), entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let edges_futures = partitioned
|
||||
.into_iter()
|
||||
.map(|(next_byte, entries)| async move {
|
||||
let node = Self::from_entries(ctx, blobstore, entries).await?;
|
||||
Ok((
|
||||
next_byte,
|
||||
ShardedMapEdge {
|
||||
size: node.size(),
|
||||
child: match node {
|
||||
Self::Intermediate { .. } => ShardedMapChild::Inlined(node),
|
||||
Self::Terminal { .. } => ShardedMapChild::Id(
|
||||
node.into_blob().store(ctx, blobstore).await?,
|
||||
),
|
||||
},
|
||||
},
|
||||
))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let edges = stream::iter(edges_futures)
|
||||
.buffer_unordered(100)
|
||||
.try_collect::<SortedVectorMap<_, _>>()
|
||||
.await?;
|
||||
|
||||
Ok(Self::intermediate(
|
||||
longest_common_prefix,
|
||||
current_value,
|
||||
edges,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if there exists a key corresponding to a map that is also a prefix of any other key in the input map.
|
||||
fn has_conflicts(entries: &BTreeMap<Bytes, Either<Value, ShardedMapEdge<Value>>>) -> bool {
|
||||
// It's sufficient to only check adjacent entries, because if some key is a prefix of another,
|
||||
// then it's also a prefix of all keys in between them in sorted order.
|
||||
entries
|
||||
.iter()
|
||||
.zip(entries.iter().skip(1))
|
||||
.any(
|
||||
|(first_entry, second_entry)| match (first_entry, second_entry) {
|
||||
((_, Either::Left(_first_value)), (_, Either::Left(_second_value))) => false,
|
||||
((first_key, _), (second_key, _)) => {
|
||||
first_key.starts_with(second_key) || second_key.starts_with(first_key)
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Create a new map from this map with given replacements. It is a generalization of
|
||||
/// adding and removing, and should be faster than doing all operations separately.
|
||||
/// It does not rely on the added keys not existing or the removed keys existing.
|
||||
@ -1138,37 +991,6 @@ mod test {
|
||||
#[derive(Clone)]
|
||||
struct MapHelper(TestShardedMap, CoreContext, Memblob);
|
||||
impl MapHelper {
|
||||
async fn from_entries(
|
||||
ctx: CoreContext,
|
||||
blobstore: Memblob,
|
||||
entries: Vec<(&str, Either<MyType, ShardedMapEdge<MyType>>)>,
|
||||
) -> Result<Self> {
|
||||
let map = Self(
|
||||
ShardedMapNode::from_entries(
|
||||
&ctx,
|
||||
&blobstore,
|
||||
entries
|
||||
.into_iter()
|
||||
.map(|(key, entry)| (Bytes::copy_from_slice(key.as_bytes()), entry))
|
||||
.collect(),
|
||||
)
|
||||
.await?,
|
||||
ctx,
|
||||
blobstore,
|
||||
);
|
||||
map.validate().await?;
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
async fn into_edge(self) -> Result<ShardedMapEdge<MyType>> {
|
||||
let MapHelper(map, _, _) = self;
|
||||
|
||||
Ok(ShardedMapEdge {
|
||||
size: map.size(),
|
||||
child: ShardedMapChild::Inlined(map),
|
||||
})
|
||||
}
|
||||
|
||||
fn size(&self) -> usize {
|
||||
self.0.size()
|
||||
}
|
||||
@ -1217,31 +1039,6 @@ mod test {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_example_map(&self) -> Result<()> {
|
||||
self.assert_entries(EXAMPLE_ENTRIES).await?;
|
||||
|
||||
self.assert_prefix_entries("", EXAMPLE_ENTRIES).await?;
|
||||
self.assert_prefix_entries("aba", &EXAMPLE_ENTRIES[0..8])
|
||||
.await?;
|
||||
self.assert_prefix_entries("abaca", &EXAMPLE_ENTRIES[1..6])
|
||||
.await?;
|
||||
self.assert_prefix_entries("omi", &EXAMPLE_ENTRIES[8..10])
|
||||
.await?;
|
||||
self.assert_prefix_entries("om", &EXAMPLE_ENTRIES[8..])
|
||||
.await?;
|
||||
self.assert_prefix_entries("o", &EXAMPLE_ENTRIES[8..])
|
||||
.await?;
|
||||
self.assert_prefix_entries("ban", &[]).await?;
|
||||
|
||||
self.assert_intermediate(2);
|
||||
self.child('a').await?.assert_intermediate(2);
|
||||
self.child('o').await?.assert_terminal(4);
|
||||
self.child('a').await?.child('c').await?.assert_terminal(5);
|
||||
self.child('a').await?.child('l').await?.assert_terminal(2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn add_remove(
|
||||
&mut self,
|
||||
to_add: &[(&str, i32)],
|
||||
@ -1425,7 +1222,20 @@ mod test {
|
||||
let blobstore = Memblob::default();
|
||||
|
||||
let map = MapHelper(example_map(), ctx, blobstore);
|
||||
map.assert_example_map().await
|
||||
map.assert_entries(EXAMPLE_ENTRIES).await?;
|
||||
map.assert_prefix_entries("", EXAMPLE_ENTRIES).await?;
|
||||
map.assert_prefix_entries("aba", &EXAMPLE_ENTRIES[0..8])
|
||||
.await?;
|
||||
map.assert_prefix_entries("abaca", &EXAMPLE_ENTRIES[1..6])
|
||||
.await?;
|
||||
map.assert_prefix_entries("omi", &EXAMPLE_ENTRIES[8..10])
|
||||
.await?;
|
||||
map.assert_prefix_entries("om", &EXAMPLE_ENTRIES[8..])
|
||||
.await?;
|
||||
map.assert_prefix_entries("o", &EXAMPLE_ENTRIES[8..])
|
||||
.await?;
|
||||
map.assert_prefix_entries("ban", &[]).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_all_keys(
|
||||
@ -1706,224 +1516,6 @@ mod test {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
async fn test_from_entries_only_values(fb: FacebookInit) -> Result<()> {
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
let blobstore = Memblob::default();
|
||||
|
||||
let map = MapHelper::from_entries(
|
||||
ctx,
|
||||
blobstore,
|
||||
EXAMPLE_ENTRIES
|
||||
.iter()
|
||||
.map(|(key, value)| (*key, Either::Left(MyType(*value))))
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
map.assert_example_map().await
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
async fn test_from_entries_only_maps(fb: FacebookInit) -> Result<()> {
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
let blobstore = Memblob::default();
|
||||
|
||||
let map_ab = MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
EXAMPLE_ENTRIES[0..8]
|
||||
.iter()
|
||||
.map(|(key, value)| (&key[2..], Either::Left(MyType(*value))))
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let map_omi = MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
EXAMPLE_ENTRIES[8..10]
|
||||
.iter()
|
||||
.map(|(key, value)| (&key[3..], Either::Left(MyType(*value))))
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let map_omu = MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
EXAMPLE_ENTRIES[10..12]
|
||||
.iter()
|
||||
.map(|(key, value)| (&key[3..], Either::Left(MyType(*value))))
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let map = MapHelper::from_entries(
|
||||
ctx,
|
||||
blobstore,
|
||||
vec![
|
||||
("ab", Either::Right(map_ab.into_edge().await?)),
|
||||
("omi", Either::Right(map_omi.into_edge().await?)),
|
||||
("omu", Either::Right(map_omu.into_edge().await?)),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
|
||||
map.assert_example_map().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
async fn test_from_entries_maps_and_values(fb: FacebookInit) -> Result<()> {
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
let blobstore = Memblob::default();
|
||||
|
||||
let map_ab = MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
EXAMPLE_ENTRIES[0..8]
|
||||
.iter()
|
||||
.map(|(key, value)| (&key[2..], Either::Left(MyType(*value))))
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let map = MapHelper::from_entries(
|
||||
ctx,
|
||||
blobstore,
|
||||
std::iter::once(("ab", Either::Right(map_ab.into_edge().await?)))
|
||||
.chain(
|
||||
EXAMPLE_ENTRIES[8..]
|
||||
.iter()
|
||||
.map(|(key, value)| (*key, Either::Left(MyType(*value)))),
|
||||
)
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
map.assert_example_map().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
async fn test_from_entries_conflict_detection(fb: FacebookInit) -> Result<()> {
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
let blobstore = Memblob::default();
|
||||
|
||||
let map_first_six = MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
EXAMPLE_ENTRIES[0..6]
|
||||
.iter()
|
||||
.map(|(key, value)| (&key[2..], Either::Left(MyType(*value))))
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let map_last_six = MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
EXAMPLE_ENTRIES[6..12]
|
||||
.iter()
|
||||
.map(|(key, value)| (*key, Either::Left(MyType(*value))))
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert!(
|
||||
MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
vec![
|
||||
("ab", Either::Right(map_first_six.into_edge().await?)),
|
||||
("", Either::Right(map_last_six.into_edge().await?)),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
let map_ab = MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
EXAMPLE_ENTRIES[0..8]
|
||||
.iter()
|
||||
.map(|(key, value)| (&key[2..], Either::Left(MyType(*value))))
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let map_om = MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
EXAMPLE_ENTRIES[8..12]
|
||||
.iter()
|
||||
.map(|(key, value)| (*key, Either::Left(MyType(*value))))
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert!(
|
||||
MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
vec![
|
||||
("ab", Either::Right(map_ab.clone().into_edge().await?)),
|
||||
("om", Either::Right(map_om.clone().into_edge().await?)),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.is_ok()
|
||||
);
|
||||
|
||||
assert!(
|
||||
MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
vec![
|
||||
("ab", Either::Right(map_ab.clone().into_edge().await?)),
|
||||
("om", Either::Right(map_om.clone().into_edge().await?)),
|
||||
("abababab", Either::Left(MyType(100))),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
assert!(
|
||||
MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
vec![
|
||||
("ab", Either::Right(map_ab.clone().into_edge().await?)),
|
||||
("om", Either::Right(map_om.clone().into_edge().await?)),
|
||||
("zz", Either::Left(MyType(100))),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.is_ok()
|
||||
);
|
||||
|
||||
assert!(
|
||||
MapHelper::from_entries(
|
||||
ctx.clone(),
|
||||
blobstore.clone(),
|
||||
vec![
|
||||
("ab", Either::Right(map_ab.clone().into_edge().await?)),
|
||||
("om", Either::Right(map_om.clone().into_edge().await?)),
|
||||
("omo", Either::Left(MyType(100))),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[fbinit::test]
|
||||
fn round_trip_quickcheck(fb: FacebookInit) {
|
||||
let ctx = CoreContext::test_mock(fb);
|
||||
|
Loading…
Reference in New Issue
Block a user