admin: add --ordered to skeleton manifest tree command

Summary:
Add the `--ordered` flag to `mononoke_admin skeleton-manifests tree`.  This
uses `bounded_traversal_ordered_stream` to list the manifest in order.

Reviewed By: mitrandir77

Differential Revision: D26197359

fbshipit-source-id: 2f95471abfccd514d713b2092844d271bc732498
This commit is contained in:
Mark Juggurnauth-Thomas 2021-02-02 08:58:07 -08:00 committed by Facebook GitHub Bot
parent 792d18eef6
commit 78b8a6ca75
7 changed files with 320 additions and 63 deletions

View File

@ -19,7 +19,7 @@ use cmdlib::{
use context::CoreContext;
use fbinit::FacebookInit;
use futures::{compat::Future01CompatExt, stream::StreamExt};
use manifest::{Entry, ManifestOps, PathOrPrefix};
use manifest::{Entry, ManifestOps, ManifestOrderedOps, PathOrPrefix};
use mononoke_types::skeleton_manifest::SkeletonManifestEntry;
use mononoke_types::{ChangesetId, MPath};
@ -32,6 +32,7 @@ const COMMAND_LIST: &str = "list";
const ARG_CSID: &str = "csid";
const ARG_PATH: &str = "path";
const ARG_IF_DERIVED: &str = "if-derived";
const ARG_ORDERED: &str = "ordered";
pub fn build_subcommand<'a, 'b>() -> App<'a, 'b> {
SubCommand::with_name(SKELETON_MANIFESTS)
@ -49,6 +50,11 @@ pub fn build_subcommand<'a, 'b>() -> App<'a, 'b> {
.help("only list the manifests if they are already derived")
.long(ARG_IF_DERIVED),
)
.arg(
Arg::with_name(ARG_ORDERED)
.help("list the manifest in order")
.long(ARG_ORDERED),
)
.arg(Arg::with_name(ARG_PATH).help("path")),
)
.subcommand(
@ -88,7 +94,8 @@ pub async fn subcommand_skeleton_manifests<'a>(
.compat()
.await?;
let fetch_derived = matches.is_present(ARG_IF_DERIVED);
subcommand_tree(&ctx, &repo, csid, path, fetch_derived).await?;
let ordered = matches.is_present(ARG_ORDERED);
subcommand_tree(&ctx, &repo, csid, path, fetch_derived, ordered).await?;
Ok(())
}
(COMMAND_LIST, Some(matches)) => {
@ -148,17 +155,30 @@ async fn subcommand_tree(
csid: ChangesetId,
path: Option<MPath>,
fetch_derived: bool,
ordered: bool,
) -> Result<(), Error> {
let root = derive_or_fetch::<RootSkeletonManifestId>(ctx, repo, csid, fetch_derived).await?;
info!(ctx.logger(), "ROOT: {:?}", root);
info!(ctx.logger(), "PATH: {:?}", path);
let mut stream = root.skeleton_manifest_id().find_entries(
let mut stream = if ordered {
root.skeleton_manifest_id()
.find_entries_ordered(
ctx.clone(),
repo.get_blobstore(),
vec![PathOrPrefix::Prefix(path)],
);
)
.left_stream()
} else {
root.skeleton_manifest_id()
.find_entries(
ctx.clone(),
repo.get_blobstore(),
vec![PathOrPrefix::Prefix(path)],
)
.right_stream()
};
while let Some((path, entry)) = stream.next().await.transpose()? {
match entry {

View File

@ -18,6 +18,7 @@ async-stream = "0.3"
async-trait = "0.1.29"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
maplit = "1.0"
nonzero_ext = "0.2"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_derive = "1.0"

View File

@ -12,15 +12,18 @@ pub use crate::derive::{derive_manifest, derive_manifest_with_io_sender, LeafInf
pub use crate::implicit_deletes::get_implicit_deletes;
pub use crate::ops::{
find_intersection_of_diffs, find_intersection_of_diffs_and_parents, Diff, ManifestOps,
PathOrPrefix,
};
pub use crate::types::{Entry, Manifest, PathTree, Traced};
pub use crate::ordered_ops::ManifestOrderedOps;
pub use crate::select::PathOrPrefix;
pub use crate::types::{Entry, Manifest, OrderedManifest, PathTree, Traced};
pub use blobstore::StoreLoadable;
mod bonsai;
mod derive;
mod implicit_deletes;
mod ops;
mod ordered_ops;
mod select;
mod types;
#[cfg(test)]

View File

@ -5,7 +5,8 @@
* GNU General Public License version 2.
*/
use crate::{Entry, Manifest, PathTree, StoreLoadable};
use crate::select::select_path_tree;
use crate::{Entry, Manifest, PathOrPrefix, PathTree, StoreLoadable};
use anyhow::Error;
use borrowed::borrowed;
use cloned::cloned;
@ -29,24 +30,6 @@ pub enum Diff<Entry> {
Changed(Option<MPath>, Entry, Entry),
}
#[derive(Debug, Clone)]
pub enum PathOrPrefix {
Path(Option<MPath>),
Prefix(Option<MPath>),
}
impl From<MPath> for PathOrPrefix {
fn from(path: MPath) -> Self {
PathOrPrefix::Path(Some(path))
}
}
impl From<Option<MPath>> for PathOrPrefix {
fn from(path: Option<MPath>) -> Self {
PathOrPrefix::Path(path)
}
}
pub trait ManifestOps<Store>
where
Store: Sync + Send + Clone + 'static,
@ -73,41 +56,7 @@ where
I: IntoIterator<Item = P>,
PathOrPrefix: From<P>,
{
enum Select {
Single, // single entry selected
Recursive, // whole subtree selected
Skip, // not selected
}
impl Select {
fn is_selected(&self) -> bool {
match self {
Select::Single | Select::Recursive => true,
Select::Skip => false,
}
}
fn is_recursive(&self) -> bool {
match self {
Select::Recursive => true,
_ => false,
}
}
}
impl Default for Select {
fn default() -> Select {
Select::Skip
}
}
let selector: PathTree<Select> = paths_or_prefixes
.into_iter()
.map(|path_or_prefix| match PathOrPrefix::from(path_or_prefix) {
PathOrPrefix::Path(path) => (path, Select::Single),
PathOrPrefix::Prefix(path) => (path, Select::Recursive),
})
.collect();
let selector = select_path_tree(paths_or_prefixes);
let init = Some((self.clone(), selector, None, false));
(async_stream::stream! {

View File

@ -0,0 +1,151 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use std::marker::Unpin;
use anyhow::Error;
use borrowed::borrowed;
use bounded_traversal::OrderedTraversal;
use context::CoreContext;
use futures::pin_mut;
use futures::stream::{BoxStream, StreamExt};
use mononoke_types::MPath;
use nonzero_ext::nonzero;
use crate::select::select_path_tree;
use crate::{Entry, Manifest, OrderedManifest, PathOrPrefix, PathTree, StoreLoadable};
pub trait ManifestOrderedOps<Store>
where
Store: Sync + Send + Clone + 'static,
Self: StoreLoadable<Store> + Clone + Send + Sync + Eq + Unpin + 'static,
<Self as StoreLoadable<Store>>::Value: Manifest<TreeId = Self> + OrderedManifest + Send,
<<Self as StoreLoadable<Store>>::Value as Manifest>::LeafId: Clone + Send + Eq + Unpin,
{
fn find_entries_ordered<I, P>(
&self,
ctx: CoreContext,
store: Store,
paths_or_prefixes: I,
) -> BoxStream<
'static,
Result<
(
Option<MPath>,
Entry<Self, <<Self as StoreLoadable<Store>>::Value as Manifest>::LeafId>,
),
Error,
>,
>
where
I: IntoIterator<Item = P>,
PathOrPrefix: From<P>,
{
let selector = select_path_tree(paths_or_prefixes);
// Schedule a maximum of 256 concurrently unfolding directories.
let schedule_max = nonzero!(256usize);
// Allow queueing of up to 2,560 items, which would be 10 items per
// directory at the maximum concurrency level. Experiments show this
// is a good balance of queueing items while not spending too long
// determining what can be scheduled.
let queue_max = nonzero!(2560usize);
let init = Some((queue_max.get(), (self.clone(), selector, None, false)));
(async_stream::stream! {
let store = &store;
borrowed!(ctx, store);
let s = bounded_traversal::bounded_traversal_ordered_stream(
schedule_max,
queue_max,
init,
move |(manifest_id, selector, path, recursive)| {
let PathTree {
subentries,
value: select,
} = selector;
async move {
let manifest = manifest_id.load(ctx, &store).await?;
let mut output = Vec::new();
if recursive || select.is_recursive() {
output.push(OrderedTraversal::Output((
path.clone(),
Entry::Tree(manifest_id),
)));
for (name, entry) in manifest.list_weighted() {
let path = Some(MPath::join_opt_element(path.as_ref(), &name));
match entry {
Entry::Leaf(leaf) => {
output.push(OrderedTraversal::Output((
path.clone(),
Entry::Leaf(leaf),
)));
}
Entry::Tree((weight, manifest_id)) => {
output.push(OrderedTraversal::Recurse(
weight,
(manifest_id, Default::default(), path, true),
));
}
}
}
} else {
if select.is_selected() {
output.push(OrderedTraversal::Output((
path.clone(),
Entry::Tree(manifest_id),
)));
}
for (name, selector) in subentries {
if let Some(entry) = manifest.lookup_weighted(&name) {
let path = Some(MPath::join_opt_element(path.as_ref(), &name));
match entry {
Entry::Leaf(leaf) => {
if selector.value.is_selected() {
output.push(OrderedTraversal::Output((
path.clone(),
Entry::Leaf(leaf),
)));
}
}
Entry::Tree((weight, manifest_id)) => {
output.push(OrderedTraversal::Recurse(
weight,
(manifest_id, selector, path, false),
));
}
}
}
}
}
Ok::<_, Error>(output)
}
},
);
pin_mut!(s);
while let Some(value) = s.next().await {
yield value;
}
})
.boxed()
}
}
impl<TreeId, Store> ManifestOrderedOps<Store> for TreeId
where
Store: Sync + Send + Clone + 'static,
Self: StoreLoadable<Store> + Clone + Send + Sync + Eq + Unpin + 'static,
<Self as StoreLoadable<Store>>::Value: Manifest<TreeId = Self> + OrderedManifest + Send,
<<Self as StoreLoadable<Store>>::Value as Manifest>::LeafId: Send + Clone + Eq + Unpin,
{
}

View File

@ -0,0 +1,75 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use mononoke_types::MPath;
use crate::PathTree;
#[derive(Debug, Clone)]
pub enum PathOrPrefix {
Path(Option<MPath>),
Prefix(Option<MPath>),
}
impl From<MPath> for PathOrPrefix {
fn from(path: MPath) -> Self {
PathOrPrefix::Path(Some(path))
}
}
impl From<Option<MPath>> for PathOrPrefix {
fn from(path: Option<MPath>) -> Self {
PathOrPrefix::Path(path)
}
}
pub(crate) enum Select {
/// Single entry selected
Single,
/// Whole substree selected
Recursive,
/// Not selected
Skip,
}
impl Select {
pub(crate) fn is_selected(&self) -> bool {
match self {
Select::Single | Select::Recursive => true,
Select::Skip => false,
}
}
pub(crate) fn is_recursive(&self) -> bool {
match self {
Select::Recursive => true,
_ => false,
}
}
}
impl Default for Select {
fn default() -> Select {
Select::Skip
}
}
pub(crate) fn select_path_tree<I, P>(paths_or_prefixes: I) -> PathTree<Select>
where
I: IntoIterator<Item = P>,
PathOrPrefix: From<P>,
{
paths_or_prefixes
.into_iter()
.map(|path_or_prefix| match PathOrPrefix::from(path_or_prefix) {
PathOrPrefix::Path(path) => (path, Select::Single),
PathOrPrefix::Prefix(path) => (path, Select::Recursive),
})
.collect()
}

View File

@ -106,6 +106,64 @@ fn convert_skeleton_manifest(
}
}
pub type Weight = usize;
pub trait OrderedManifest: Manifest {
fn lookup_weighted(
&self,
name: &MPathElement,
) -> Option<Entry<(Weight, <Self as Manifest>::TreeId), <Self as Manifest>::LeafId>>;
fn list_weighted(
&self,
) -> Box<
dyn Iterator<
Item = (
MPathElement,
Entry<(Weight, <Self as Manifest>::TreeId), <Self as Manifest>::LeafId>,
),
>,
>;
}
impl OrderedManifest for SkeletonManifest {
fn lookup_weighted(
&self,
name: &MPathElement,
) -> Option<Entry<(Weight, <Self as Manifest>::TreeId), <Self as Manifest>::LeafId>> {
self.lookup(name).map(convert_skeleton_manifest_weighted)
}
fn list_weighted(
&self,
) -> Box<
dyn Iterator<
Item = (
MPathElement,
Entry<(Weight, <Self as Manifest>::TreeId), <Self as Manifest>::LeafId>,
),
>,
> {
let v: Vec<_> = self
.list()
.map(|(basename, entry)| (basename.clone(), convert_skeleton_manifest_weighted(entry)))
.collect();
Box::new(v.into_iter())
}
}
fn convert_skeleton_manifest_weighted(
skeleton_entry: &SkeletonManifestEntry,
) -> Entry<(Weight, SkeletonManifestId), ()> {
match skeleton_entry {
SkeletonManifestEntry::File => Entry::Leaf(()),
SkeletonManifestEntry::Directory(skeleton_directory) => {
let summary = skeleton_directory.summary();
let weight = summary.descendant_files_count + summary.descendant_dirs_count;
Entry::Tree((weight as Weight, skeleton_directory.id().clone()))
}
}
}
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
pub enum Entry<T, L> {
Tree(T),