mononoke: walker: make WrappedPath available in stream output

Summary:
Make the repo path available as an Option<WrappedPath> in the stream output, in preparation for using it in the corpus dumper to write to disk.

The path is an Option as not all nodes have an associated file system path (e.g. BonsaiChangeset).

The headline changes are in sampling.rs and sizing.rs.  The progress.rs change slightly generalises the stream to allow any type convertible to Node as the main walk identifier in the output stream; a minimal sketch of that conversion follows.
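
To illustrate the idea, here is a minimal standalone sketch (stand-in struct definitions, not the walker's real types): the walk key becomes the pair (Node, Option<WrappedPath>), and any consumer that only needs the Node borrows it back out through a From bound, mirroring the new bound on progress_stream.

// Stand-ins for the walker's Node and WrappedPath types (assumed shapes).
#[derive(Clone, Debug)]
struct Node(String);
#[derive(Clone, Debug)]
struct WrappedPath(String);

// The walk key now carries the node plus its optional repo path.
type WalkKey = (Node, Option<WrappedPath>);

// Same trick as in sampling.rs below: borrow a Node out of whatever
// key type the stream uses, so progress reporting keeps working.
impl<'a> From<&'a WalkKey> for &'a Node {
    fn from((n, _p): &'a WalkKey) -> &'a Node {
        n
    }
}

// A consumer that only needs the Node stays generic over the key type K.
fn report_step<K>(key: &K)
where
    for<'b> &'b Node: From<&'b K>,
{
    let n: &Node = key.into();
    println!("stepped to {:?}", n);
}

fn main() {
    let key: WalkKey = (
        Node("BonsaiChangeset".to_string()),
        None, // e.g. a BonsaiChangeset has no file system path
    );
    report_step(&key);
}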

Some refactors done as part of this:
* NodeSamplingHandler is renamed to WalkSampleMapping to reflect that this is what it stores.
* WalkSampleMapping's generic parameters are extended to take both a key and a sample type.
* NodeSamplingHandler::start_node() is moved to a map_keys() method on a new SampleTrigger trait, so that SamplingWalkVisitor doesn't need the full WalkSampleMapping generic parameters (a simplified sketch of the split follows this list).
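
Roughly, the trait/mapping split has the following shape. This is a simplified, single-threaded sketch with stand-in types: the real WalkSampleMapping in the diff below uses DashMap for concurrent access and the real SamplingKey comes from the CoreContext.

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Mutex;

// Stand-in for the walker's SamplingKey (assumed: a small copyable id).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct SamplingKey(u64);

// The visitor only needs to know how to register a key; it doesn't need
// the full generic parameters of the mapping itself.
trait SampleTrigger<K> {
    fn map_keys(&self, sample_key: SamplingKey, walk_key: K);
}

// Simplified mapping: SamplingKey -> sample, plus walk key -> SamplingKey.
struct WalkSampleMapping<K, T> {
    inflight: Mutex<HashMap<SamplingKey, T>>,
    inflight_reverse: Mutex<HashMap<K, SamplingKey>>,
}

impl<K, T> WalkSampleMapping<K, T>
where
    K: Eq + Hash,
{
    fn new() -> Self {
        Self {
            inflight: Mutex::new(HashMap::new()),
            inflight_reverse: Mutex::new(HashMap::new()),
        }
    }

    // Stop tracking a step and hand back its accumulated sample.
    fn complete_step(&self, walk_key: &K) -> Option<T> {
        let sample_key = self.inflight_reverse.lock().unwrap().remove(walk_key)?;
        self.inflight.lock().unwrap().remove(&sample_key)
    }
}

impl<K, T> SampleTrigger<K> for WalkSampleMapping<K, T>
where
    K: Eq + Hash,
    T: Default,
{
    fn map_keys(&self, sample_key: SamplingKey, walk_key: K) {
        self.inflight.lock().unwrap().insert(sample_key, T::default());
        self.inflight_reverse.lock().unwrap().insert(walk_key, sample_key);
    }
}

fn main() {
    let mapping: WalkSampleMapping<String, Vec<u8>> = WalkSampleMapping::new();
    mapping.map_keys(SamplingKey(1), "some/walk/key".to_string());
    assert!(mapping.complete_step(&"some/walk/key".to_string()).is_some());
}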

Reviewed By: krallin

Differential Revision: D20835662

fbshipit-source-id: 58db622dc63d7f869a092739d1187a34b77219f6
Author: Alex Hornby, 2020-05-11 11:58:27 -07:00 (committed by Facebook GitHub Bot)
Parent: 2222bd9f19
Commit: 0f8beabdb9
4 changed files with 192 additions and 120 deletions

progress.rs

@@ -387,25 +387,32 @@ impl<Inner> Clone for ProgressStateMutex<Inner> {
}
}
// Print some status update, passing on all data unchanged
pub fn progress_stream<InStream, PS, ND, SS>(
// Log some status update, passing on all data unchanged
pub fn progress_stream<InStream, PS, ND, SS, K>(
quiet: bool,
progress_state: &PS,
s: InStream,
) -> impl Stream<Item = Result<(Node, Option<ND>, Option<SS>), Error>>
) -> impl Stream<Item = Result<(K, Option<ND>, Option<SS>), Error>>
where
InStream: Stream<Item = Result<(Node, Option<ND>, Option<SS>), Error>> + 'static + Send,
InStream: Stream<Item = Result<(K, Option<ND>, Option<SS>), Error>> + 'static + Send,
PS: 'static + Send + Clone + ProgressRecorder<SS> + ProgressReporter,
K: 'static,
// Make sure we can convert from K reference to Node reference
for<'b> &'b Node: From<&'b K>,
{
s.map({
let progress_state = progress_state.clone();
move |r| {
r.and_then(|(n, data_opt, stats_opt)| {
progress_state.record_step(&n, stats_opt.as_ref());
if !quiet {
progress_state.report_throttled();
r.and_then(|(key, data_opt, stats_opt)| {
{
let k: &K = &key;
let n: &Node = k.into();
progress_state.record_step(n, stats_opt.as_ref());
if !quiet {
progress_state.report_throttled();
}
}
Ok((n, data_opt, stats_opt))
Ok((key, data_opt, stats_opt))
})
}
})

sampling.rs

@@ -11,13 +11,17 @@ use crate::walk::{OutgoingEdge, WalkVisitor};
use context::{CoreContext, SamplingKey};
use dashmap::DashMap;
use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, fmt, hash, sync::Arc};
pub trait SampleTrigger<K> {
fn map_keys(&self, key: SamplingKey, walk_key: K);
}
#[derive(Debug)]
pub struct SamplingWalkVisitor<T> {
inner: WalkStateCHashMap,
sample_node_types: HashSet<NodeType>,
sampler: Arc<NodeSamplingHandler<T>>,
sampler: Arc<T>,
sample_rate: u64,
sample_offset: u64,
}
@@ -27,7 +31,7 @@ impl<T> SamplingWalkVisitor<T> {
include_node_types: HashSet<NodeType>,
include_edge_types: HashSet<EdgeType>,
sample_node_types: HashSet<NodeType>,
sampler: Arc<NodeSamplingHandler<T>>,
sampler: Arc<T>,
sample_rate: u64,
sample_offset: u64,
) -> Self {
@@ -108,10 +112,24 @@ impl PathTrackingRoute {
}
}
impl<T> WalkVisitor<(Node, Option<NodeData>, Option<StepStats>), PathTrackingRoute>
for SamplingWalkVisitor<T>
// Map the key type so progress reporting works
impl<'a> From<&'a (Node, Option<WrappedPath>)> for &'a Node {
fn from((n, _p): &'a (Node, Option<WrappedPath>)) -> &'a Node {
n
}
}
impl<T>
WalkVisitor<
(
(Node, Option<WrappedPath>),
Option<NodeData>,
Option<StepStats>,
),
PathTrackingRoute,
> for SamplingWalkVisitor<T>
where
T: Default,
T: SampleTrigger<(Node, Option<WrappedPath>)>,
{
fn start_step(
&self,
@@ -120,16 +138,16 @@ where
step: &OutgoingEdge,
) -> CoreContext {
if self.sample_node_types.contains(&step.target.get_type()) {
let repo_path = PathTrackingRoute::evolve_path(
route.and_then(|r| r.path.as_ref()),
step.path.as_ref(),
&step.target,
);
let should_sample = match self.sample_rate {
0 => false,
1 => true,
sample_rate => {
let sampling_fingerprint = PathTrackingRoute::evolve_path(
route.and_then(|r| r.path.as_ref()),
step.path.as_ref(),
&step.target,
)
.map_or_else(
let sampling_fingerprint = repo_path.map_or_else(
|| step.target.sampling_fingerprint(),
|r| r.sampling_fingerprint(),
);
@@ -140,9 +158,10 @@ where
};
if should_sample {
ctx = ctx.clone_and_sample(SamplingKey::new());
ctx.sampling_key()
.map(|k| self.sampler.start_node(*k, step.target.clone()));
let sampling_key = SamplingKey::new();
ctx = ctx.clone_and_sample(sampling_key);
self.sampler
.map_keys(sampling_key, (step.target.clone(), repo_path.cloned()));
}
}
self.inner.start_step(ctx, route.map(|_| &()), step)
@@ -156,23 +175,28 @@ where
route: Option<PathTrackingRoute>,
outgoing: Vec<OutgoingEdge>,
) -> (
(Node, Option<NodeData>, Option<StepStats>),
(
(Node, Option<WrappedPath>),
Option<NodeData>,
Option<StepStats>,
),
PathTrackingRoute,
Vec<OutgoingEdge>,
) {
let inner_route = route.as_ref().map(|_| ());
let route = PathTrackingRoute::evolve(route, resolved.path.as_ref(), &resolved.target);
let (vout, _inner_route, outgoing) =
let ((n, nd, stats), _inner_route, outgoing) =
self.inner
.visit(ctx, resolved, node_data, inner_route, outgoing);
(vout, route, outgoing)
(((n, route.path.clone()), nd, stats), route, outgoing)
}
}
// Super simple sampling visitor impl for scrubbing
impl<T> WalkVisitor<(Node, Option<NodeData>, Option<StepStats>), ()> for SamplingWalkVisitor<T>
where
T: Default,
T: SampleTrigger<Node>,
{
fn start_step(
&self,
@@ -193,10 +217,9 @@ where
};
if should_sample {
ctx = ctx.clone_and_sample(SamplingKey::new());
if let Some(k) = ctx.sampling_key() {
self.sampler.start_node(*k, step.target.clone())
}
let sampling_key = SamplingKey::new();
ctx = ctx.clone_and_sample(sampling_key);
self.sampler.map_keys(sampling_key, step.target.clone());
}
}
self.inner.start_step(ctx, route.map(|_| &()), step)
@@ -218,18 +241,54 @@ where
}
}
// Map from a SamplingKey to the sample type T,
// and from a graph-level step S to the SamplingKey
#[derive(Debug)]
pub struct NodeSamplingHandler<T> {
pub struct WalkSampleMapping<S, T>
where
S: Eq + fmt::Debug + hash::Hash,
{
// T can keep a one to many mapping, e.g. some nodes like
// chunked files have multiple blobstore keys
inflight: DashMap<SamplingKey, T>,
// 1:1 relationship, each node has one SamplingKey
inflight_reverse: DashMap<Node, SamplingKey>,
// 1:1 relationship, each step has one SamplingKey
inflight_reverse: DashMap<S, SamplingKey>,
}
impl<T> NodeSamplingHandler<T>
impl<T> SampleTrigger<Node> for WalkSampleMapping<Node, T>
where
T: Default,
{
fn map_keys(&self, sample_key: SamplingKey, walk_key: Node) {
self.inflight.insert(sample_key, T::default());
self.inflight_reverse.insert(walk_key, sample_key);
}
}
impl<T> SampleTrigger<(Node, Option<WrappedPath>)> for WalkSampleMapping<Node, T>
where
T: Default,
{
fn map_keys(&self, sample_key: SamplingKey, walk_key: (Node, Option<WrappedPath>)) {
self.inflight.insert(sample_key, T::default());
self.inflight_reverse.insert(walk_key.0, sample_key);
}
}
impl<T> SampleTrigger<(Node, Option<WrappedPath>)>
for WalkSampleMapping<(Node, Option<WrappedPath>), T>
where
T: Default,
{
fn map_keys(&self, sample_key: SamplingKey, walk_key: (Node, Option<WrappedPath>)) {
self.inflight.insert(sample_key, T::default());
self.inflight_reverse.insert(walk_key, sample_key);
}
}
impl<S, T> WalkSampleMapping<S, T>
where
S: Eq + fmt::Debug + hash::Hash,
{
pub fn new() -> Self {
Self {
@@ -243,24 +302,18 @@ where
&self.inflight
}
// Called from the visitor start_step
pub fn start_node(&self, key: SamplingKey, node: Node) {
self.inflight.insert(key, T::default());
self.inflight_reverse.insert(node, key);
}
pub fn is_sampling(&self, node: &Node) -> bool {
self.inflight_reverse.contains_key(node)
}
// Needs to be called to stop tracking the node and thus free memory.
// Can be called from the visitor visit, or in the stream processing
// walk output.
pub fn complete_node(&self, node: &Node) -> Option<T> {
let reverse_mapping = self.inflight_reverse.remove(node);
pub fn complete_step(&self, s: &S) -> Option<T> {
let reverse_mapping = self.inflight_reverse.remove(s);
reverse_mapping
.as_ref()
.and_then(|(_k, sample_key)| self.inflight.remove(sample_key))
.map(|(_k, v)| v)
}
pub fn is_sampling(&self, s: &S) -> bool {
self.inflight_reverse.contains_key(s)
}
}

scrub.rs

@@ -11,7 +11,7 @@ use crate::progress::{
progress_stream, report_state, ProgressReporter, ProgressReporterUnprotected,
ProgressStateCountByType, ProgressStateMutex,
};
use crate::sampling::{NodeSamplingHandler, SamplingWalkVisitor};
use crate::sampling::{SamplingWalkVisitor, WalkSampleMapping};
use crate::setup::{
parse_node_types, setup_common, DEFAULT_INCLUDE_NODE_TYPES, EXCLUDE_SAMPLE_NODE_TYPE_ARG,
INCLUDE_SAMPLE_NODE_TYPE_ARG, LIMIT_DATA_FETCH_ARG, PROGRESS_INTERVAL_ARG,
@@ -53,8 +53,8 @@ struct ScrubStats {
blobstore_keys: u64,
}
impl ScrubStats {
fn new(sample: Option<ScrubSample>) -> Self {
impl From<Option<&ScrubSample>> for ScrubStats {
fn from(sample: Option<&ScrubSample>) -> Self {
sample
.map(|sample| ScrubStats {
blobstore_keys: sample.data.values().len() as u64,
@@ -75,7 +75,7 @@ fn loading_stream<InStream, SS>(
limit_data_fetch: bool,
scheduled_max: usize,
s: InStream,
sampler: Arc<NodeSamplingHandler<ScrubSample>>,
sampler: Arc<WalkSampleMapping<Node, ScrubSample>>,
) -> impl Stream<Item = Result<(Node, Option<NodeData>, Option<ScrubStats>), Error>>
where
InStream: Stream<Item = Result<(Node, Option<NodeData>, Option<SS>), Error>> + 'static + Send,
@@ -88,7 +88,7 @@ where
file_bytes_stream
.try_fold(0, |acc, file_bytes| future::ok(acc + file_bytes.size()))
.map_ok(move |num_bytes| {
let size = ScrubStats::new(sampler.complete_node(&n));
let size = ScrubStats::from(sampler.complete_step(&n).as_ref());
(
n,
Some(NodeData::FileContent(FileContentData::Consumed(num_bytes))),
@@ -100,7 +100,7 @@ where
data_opt => {
let size = data_opt
.as_ref()
.map(|_d| ScrubStats::new(sampler.complete_node(&n)));
.map(|_d| ScrubStats::from(sampler.complete_step(&n).as_ref()));
future::ok((n, data_opt, size)).right_future()
}
})
@@ -120,7 +120,7 @@ impl Default for ScrubSample {
}
}
impl SamplingHandler for NodeSamplingHandler<ScrubSample> {
impl SamplingHandler for WalkSampleMapping<Node, ScrubSample> {
fn sample_get(
&self,
ctx: CoreContext,
@@ -291,7 +291,7 @@ pub async fn scrub_objects<'a>(
matches: &'a ArgMatches<'a>,
sub_m: &'a ArgMatches<'a>,
) -> Result<(), Error> {
let scrub_sampler = Arc::new(NodeSamplingHandler::<ScrubSample>::new());
let scrub_sampler = Arc::new(WalkSampleMapping::<Node, ScrubSample>::new());
let (datasources, walk_params) = setup_common(
SCRUB,

sizing.rs

@@ -5,12 +5,12 @@
* GNU General Public License version 2.
*/
use crate::graph::{FileContentData, Node, NodeData, NodeType};
use crate::graph::{FileContentData, Node, NodeData, NodeType, WrappedPath};
use crate::progress::{
progress_stream, report_state, ProgressReporter, ProgressReporterUnprotected,
ProgressStateCountByType, ProgressStateMutex,
};
use crate::sampling::{NodeSamplingHandler, PathTrackingRoute, SamplingWalkVisitor};
use crate::sampling::{PathTrackingRoute, SamplingWalkVisitor, WalkSampleMapping};
use crate::setup::{
parse_node_types, setup_common, COMPRESSION_BENEFIT, COMPRESSION_LEVEL_ARG,
DEFAULT_INCLUDE_NODE_TYPES, EXCLUDE_SAMPLE_NODE_TYPE_ARG, INCLUDE_SAMPLE_NODE_TYPE_ARG,
@@ -88,74 +88,86 @@ fn size_sampling_stream<InStream, InStats>(
scheduled_max: usize,
s: InStream,
compressor_type: CompressorType,
sampler: Arc<NodeSamplingHandler<SizingSample>>,
sampler: Arc<WalkSampleMapping<Node, SizingSample>>,
) -> impl Stream<Item = Result<(Node, Option<NodeData>, Option<SizingStats>), Error>>
where
InStream:
Stream<Item = Result<(Node, Option<NodeData>, Option<InStats>), Error>> + 'static + Send,
InStream: Stream<
Item = Result<
(
(Node, Option<WrappedPath>),
Option<NodeData>,
Option<InStats>,
),
Error,
>,
>
+ 'static
+ Send,
InStats: 'static + Send,
{
s.map_ok(move |(n, data_opt, _stats_opt)| match (&n, data_opt) {
(Node::FileContent(_content_id), Some(NodeData::FileContent(fc)))
if sampler.is_sampling(&n) =>
{
match fc {
FileContentData::Consumed(_num_loaded_bytes) => {
future::ok(_num_loaded_bytes).left_future()
s.map_ok(
move |((n, _path), data_opt, _stats_opt)| match (&n, data_opt) {
(Node::FileContent(_content_id), Some(NodeData::FileContent(fc)))
if sampler.is_sampling(&n) =>
{
match fc {
FileContentData::Consumed(_num_loaded_bytes) => {
future::ok(_num_loaded_bytes).left_future()
}
// Consume the stream to make sure we loaded all blobs
FileContentData::ContentStream(file_bytes_stream) => file_bytes_stream
.try_fold(0, |acc, file_bytes| future::ok(acc + file_bytes.size()))
.right_future(),
}
// Consume the stream to make sure we loaded all blobs
FileContentData::ContentStream(file_bytes_stream) => file_bytes_stream
.try_fold(0, |acc, file_bytes| future::ok(acc + file_bytes.size()))
.right_future(),
}
.and_then({
cloned!(sampler);
move |fs_stream_size| {
// Report the blobstore sizes in sizing stats, more accurate than stream sizes, as headers included
let sizes = sampler
.complete_node(&n)
.map(|sizing_sample| {
sizing_sample.data.values().try_fold(
SizingStats::default(),
|acc, v| {
try_compress(v.as_bytes(), compressor_type)
.map(|sizes| acc + sizes)
},
.and_then({
cloned!(sampler);
move |fs_stream_size| {
// Report the blobstore sizes in sizing stats, more accurate than stream sizes, as headers included
let sizes = sampler
.complete_step(&n)
.map(|sizing_sample| {
sizing_sample.data.values().try_fold(
SizingStats::default(),
|acc, v| {
try_compress(v.as_bytes(), compressor_type)
.map(|sizes| acc + sizes)
},
)
})
.transpose();
future::ready(sizes.map(|sizes| {
// Report the filestore stream's bytes size in the Consumed node
(
n,
Some(NodeData::FileContent(FileContentData::Consumed(
fs_stream_size,
))),
sizes,
)
})
.transpose();
future::ready(sizes.map(|sizes| {
// Report the filestore stream's bytes size in the Consumed node
(
n,
Some(NodeData::FileContent(FileContentData::Consumed(
fs_stream_size,
))),
sizes,
)
}))
}
})
.left_future()
}
(_, data_opt) => {
// Report the blobstore sizes in sizing stats, more accurate than stream sizes, as headers included
let sizes = sampler
.complete_node(&n)
.map(|sizing_sample| {
sizing_sample
.data
.values()
.try_fold(SizingStats::default(), |acc, v| {
try_compress(v.as_bytes(), compressor_type).map(|sizes| acc + sizes)
})
}))
}
})
.transpose();
.left_future()
}
(_, data_opt) => {
// Report the blobstore sizes in sizing stats, more accurate than stream sizes, as headers included
let sizes = sampler
.complete_step(&n)
.map(|sizing_sample| {
sizing_sample
.data
.values()
.try_fold(SizingStats::default(), |acc, v| {
try_compress(v.as_bytes(), compressor_type).map(|sizes| acc + sizes)
})
})
.transpose();
future::ready(sizes.map(|sizes| (n, data_opt, sizes))).right_future()
}
})
future::ready(sizes.map(|sizes| (n, data_opt, sizes))).right_future()
}
},
)
.try_buffer_unordered(scheduled_max)
}
@@ -247,7 +259,7 @@ impl Default for SizingSample {
}
}
impl SamplingHandler for NodeSamplingHandler<SizingSample> {
impl SamplingHandler for WalkSampleMapping<Node, SizingSample> {
fn sample_get(
&self,
ctx: CoreContext,
@@ -270,7 +282,7 @@ pub async fn compression_benefit<'a>(
matches: &'a ArgMatches<'a>,
sub_m: &'a ArgMatches<'a>,
) -> Result<(), Error> {
let sizing_sampler = Arc::new(NodeSamplingHandler::<SizingSample>::new());
let sizing_sampler = Arc::new(WalkSampleMapping::<Node, SizingSample>::new());
let (datasources, walk_params) = setup_common(
COMPRESSION_BENEFIT,