mononoke: walk repos by public changeset chunks

Summary:
For large repos it is desirable to walk them in chunks, as a prerequisite for clearing state caches between chunks to reduce memory usage, and for checkpointing between chunks so that an interrupted scrub can resume.

Chunks are fetched within the repo's changeset id bounds in newest-first order, which means the newest data is scrubbed first. Any edges discovered during the walk that point outside the current chunk are deferred until the later chunk that covers them.

This change adds chunking and tests it for core bonsai data; following diffs add it for other types.
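
To illustrate the chunk-and-defer shape, here is a minimal stand-alone sketch (made-up u64 ids and a plain parent map; the real walker uses interned ChangesetIds and the concurrent WalkState shown further down):

use std::collections::{HashMap, HashSet};

// Hypothetical ids: a larger value means a newer changeset.
type CsId = u64;

// Walk `all_ids` (already sorted newest-first) in fixed-size chunks, deferring any
// parent edge whose target lies outside the current chunk until the chunk that
// contains the target. Returns the number of parent edges actually walked.
fn walk_in_chunks(
    all_ids: &[CsId],
    parents: &HashMap<CsId, Vec<CsId>>,
    chunk_size: usize,
) -> usize {
    // target changeset -> (source, target) edges waiting for the chunk holding the target
    let mut deferred: HashMap<CsId, HashSet<(CsId, CsId)>> = HashMap::new();
    let mut walked_edges = 0;
    for chunk in all_ids.chunks(chunk_size) {
        let members: HashSet<CsId> = chunk.iter().copied().collect();
        // Replay edges deferred by earlier chunks whose target is now in scope.
        for id in &members {
            if let Some(edges) = deferred.remove(id) {
                walked_edges += edges.len();
            }
        }
        // Walk the chunk itself, parking edges that leave it.
        for &id in chunk {
            for &p in parents.get(&id).into_iter().flatten() {
                if members.contains(&p) {
                    walked_edges += 1;
                } else {
                    deferred.entry(p).or_default().insert((id, p));
                }
            }
        }
    }
    // After the final (oldest) chunk nothing should remain deferred.
    assert!(deferred.is_empty());
    walked_edges
}

fn main() {
    // Three commits 0 <- 1 <- 2 (2 newest), chunk size 2: the edge 2 -> 1 is walked in
    // chunk {2, 1}, while 1 -> 0 is deferred until chunk {0}.
    let parents = HashMap::from([(2, vec![1]), (1, vec![0]), (0, vec![])]);
    assert_eq!(walk_in_chunks(&[2, 1, 0], &parents, 2), 2);
}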

Reviewed By: StanislavGlebik

Differential Revision: D25742295

fbshipit-source-id: b989abdf2ca367cf9b10f45d9f932eba55ee6dae
Alex Hornby 2021-01-22 03:10:40 -08:00 committed by Facebook GitHub Bot
parent 8490ead952
commit cc3eb431b6
11 changed files with 335 additions and 16 deletions

View File

@ -0,0 +1,49 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License found in the LICENSE file in the root
# directory of this source tree.
$ . "${TEST_FIXTURES}/library.sh"
setup configuration
$ default_setup_pre_blobimport "blob_files"
hg repo
o C [draft;rev=2;26805aba1e60]
o B [draft;rev=1;112478962961]
o A [draft;rev=0;426bada5c675]
$
$ blobimport repo-hg/.hg repo --derived-data-type=blame --derived-data-type=changeset_info --derived-data-type=deleted_manifest --derived-data-type=fastlog --derived-data-type=fsnodes --derived-data-type=skeleton_manifests --derived-data-type=unodes
bonsai core data, deep, unchunked. This is the base case
$ mononoke_walker -L sizing scrub -q -b master_bookmark -I bonsai 2>&1 | strip_glog
Walking edge types [BookmarkToChangeset, ChangesetToBonsaiParent, ChangesetToFileContent]
Walking node types [Bookmark, Changeset, FileContent]
Seen,Loaded: 7,7
* Type:Walked,Checks,Children Bookmark:1,1,2 Changeset:3,* FileContent:3,3,0 (glob)
bonsai core data, chunked, shallow. Shallow walk with chunked commits should still visit all changesets, but no bookmark
$ mononoke_walker -L sizing scrub -q -p Changeset --chunk-size=2 -I shallow -i bonsai -i FileContent 2>&1 | strip_glog
Walking edge types [ChangesetToFileContent]
Walking node types [Changeset, FileContent]
Seen,Loaded: 4,4
* Type:Walked,Checks,Children Changeset:2,*,4 FileContent:2,2,0 (glob)
Deferred: 0
Seen,Loaded: 2,2
* Type:Walked,Checks,Children Changeset:3,*,6 FileContent:3,3,0 (glob)
Deferred: 0
Completed in 2 chunks of 2
bonsai core data, chunked, deep. Should still visit all changesets, but no bookmark, second chunk has one deferred edge to process
$ mononoke_walker -L sizing scrub -q -p Changeset --chunk-size=2 -I deep -i bonsai -i FileContent 2>&1 | strip_glog
Walking edge types [ChangesetToBonsaiParent, ChangesetToFileContent]
Walking node types [Changeset, FileContent]
Seen,Loaded: 4,4
* Type:Walked,Checks,Children Changeset:2,*,4 FileContent:2,2,0 (glob)
Deferred: 1
Seen,Loaded: 3,3
* Type:Walked,Checks,Children Changeset:4,*,6 FileContent:3,*,0 (glob)
Deferred: 0
Completed in 2 chunks of 2
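
To see where the deep walk's "Deferred: 1" comes from: with the history A <- B <- C (C newest) and newest-first chunks of 2, the first chunk is {C, B}, so B's parent edge to A leaves the chunk and is parked; the second chunk {A} then picks it up. A hand-worked sketch of just that bookkeeping (string names stand in for the changesets; this is an illustration, not part of the test file):

use std::collections::HashSet;

fn main() {
    // Newest-first chunks of size 2 over the C, B, A history from the setup above.
    let chunks: [&[&str]; 2] = [&["C", "B"], &["A"]];
    let parent = |c: &str| match c {
        "C" => Some("B"),
        "B" => Some("A"),
        _ => None,
    };
    let mut deferred: HashSet<(&str, &str)> = HashSet::new();
    for chunk in chunks {
        let members: HashSet<&str> = chunk.iter().copied().collect();
        // Edges deferred by an earlier chunk are consumed once their target is in scope.
        deferred.retain(|(_, target)| !members.contains(target));
        for &c in chunk {
            if let Some(p) = parent(c) {
                if !members.contains(p) {
                    // B -> A leaves the first chunk {C, B}: this is the "Deferred: 1" above.
                    deferred.insert((c, p));
                }
            }
        }
        println!("Deferred: {}", deferred.len()); // prints 1, then 0, matching the test
    }
}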

View File

@ -15,6 +15,7 @@ blobstore = { path = "../blobstore" }
blobstore_factory = { path = "../blobstore/factory" }
bookmarks = { path = "../bookmarks" }
bounded_traversal = { path = "../common/bounded_traversal" }
bulkops = { path = "../bulkops" }
changeset_info = { path = "../derived_data/changeset_info" }
cmdlib = { path = "../cmdlib" }
context = { path = "../server/context" }

View File

@ -455,6 +455,7 @@ async fn run_one(
job_params,
repo_params,
type_params,
sub_params.public_changeset_chunk_by,
walk_state,
make_sink,
)

View File

@ -213,6 +213,21 @@ impl<T> WalkVisitor<(WalkKeyOptPath, WalkPayloadMtime, Option<StepStats>), PathT
where
T: SampleTrigger<WalkKeyOptPath> + Send + Sync,
{
fn start_chunk(
&self,
chunk_members: &HashSet<ChangesetId>,
) -> Result<HashSet<OutgoingEdge>, Error> {
self.inner.start_chunk(chunk_members)
}
fn end_chunks(&self) -> Result<(), Error> {
self.inner.end_chunks()
}
fn num_deferred(&self) -> usize {
self.inner.num_deferred()
}
fn start_step(
&self,
mut ctx: CoreContext,
@ -311,6 +326,21 @@ impl<T> WalkVisitor<(Node, Option<NodeData>, Option<StepStats>), EmptyRoute>
where
T: SampleTrigger<Node> + Send + Sync,
{
fn start_chunk(
&self,
chunk_members: &HashSet<ChangesetId>,
) -> Result<HashSet<OutgoingEdge>, Error> {
self.inner.start_chunk(chunk_members)
}
fn end_chunks(&self) -> Result<(), Error> {
self.inner.end_chunks()
}
fn num_deferred(&self) -> usize {
self.inner.num_deferred()
}
fn start_step(
&self,
mut ctx: CoreContext,

View File

@ -450,6 +450,7 @@ async fn run_one(
job_params,
repo_params,
type_params,
sub_params.public_changeset_chunk_by,
walk_state,
make_sink,
)

View File

@ -56,6 +56,7 @@ use strum_macros::{AsRefStr, EnumString, EnumVariantNames};
// Per repo things we don't pass into the walk
pub struct RepoSubcommandParams {
pub progress_state: ProgressStateMutex<ProgressStateCountByType<StepStats, ProgressSummary>>,
pub public_changeset_chunk_by: HashSet<NodeType>,
}
// These don't vary per repo
@ -162,6 +163,7 @@ const CHUNK_BY_PUBLIC_NODE_TYPES: &[NodeType] = &[
NodeType::PhaseMapping,
NodeType::Changeset,
NodeType::ChangesetInfo,
NodeType::ChangesetInfoMapping,
NodeType::FsnodeMapping,
NodeType::SkeletonManifestMapping,
NodeType::UnodeMapping,
@ -1145,6 +1147,7 @@ pub async fn setup_common<'a>(
repo,
walk_roots.clone(),
public_changeset_chunk_size,
public_changeset_chunk_by.clone(),
include_edge_types.clone(),
include_node_types.clone(),
progress_options,
@ -1187,6 +1190,7 @@ async fn setup_repo<'a>(
resolved: &'a ResolvedRepo,
walk_roots: Vec<OutgoingEdge>,
public_changeset_chunk_size: Option<usize>,
mut public_changeset_chunk_by: HashSet<NodeType>,
include_edge_types: HashSet<EdgeType>,
mut include_node_types: HashSet<NodeType>,
progress_options: ProgressOptions,
@ -1209,10 +1213,18 @@ async fn setup_repo<'a>(
}
});
public_changeset_chunk_by.retain(|t| {
if let Some(t) = t.derived_data_name() {
resolved.config.derived_data_config.is_enabled(t)
} else {
true
}
});
let mut root_node_types: HashSet<_> =
walk_roots.iter().map(|e| e.label.outgoing_type()).collect();
if public_changeset_chunk_size.is_some() {
root_node_types.insert(NodeType::Changeset);
root_node_types.extend(public_changeset_chunk_by.iter().cloned());
}
let (include_edge_types, include_node_types) =
@ -1266,7 +1278,10 @@ async fn setup_repo<'a>(
));
Ok((
RepoSubcommandParams { progress_state },
RepoSubcommandParams {
progress_state,
public_changeset_chunk_by,
},
RepoWalkParams {
repo: repo.await?,
logger: logger.clone(),

View File

@ -388,6 +388,7 @@ async fn run_one(
job_params,
repo_params,
type_params,
sub_params.public_changeset_chunk_by,
walk_state,
make_sink,
)

View File

@ -9,7 +9,7 @@ use crate::graph::{EdgeType, Node, NodeData, NodeType, UnodeFlags, WrappedPath};
use crate::walk::{expand_checked_nodes, EmptyRoute, OutgoingEdge, VisitOne, WalkVisitor};
use ahash::RandomState;
use anyhow::Error;
use anyhow::{bail, Error};
use array_init::array_init;
use async_trait::async_trait;
use context::CoreContext;
@ -136,7 +136,9 @@ where
}
}
type StateMap<T> = DashMap<T, (), RandomState>;
type ValueMap<K, V> = DashMap<K, V, RandomState>;
type StateMap<K> = DashMap<K, (), RandomState>;
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct UnodeInterned<T> {
id: InternedId<T>,
@ -158,6 +160,8 @@ pub struct WalkState {
unode_file_ids: InternMap<FileUnodeId, InternedId<FileUnodeId>>,
unode_manifest_ids: InternMap<ManifestUnodeId, InternedId<ManifestUnodeId>>,
// State
chunk_bcs: StateMap<InternedId<ChangesetId>>,
deferred_bcs: ValueMap<InternedId<ChangesetId>, HashSet<OutgoingEdge>>,
visited_bcs: StateMap<InternedId<ChangesetId>>,
visited_bcs_mapping: StateMap<InternedId<ChangesetId>>,
public_not_visited: StateMap<InternedId<ChangesetId>>,
@ -212,6 +216,8 @@ impl WalkState {
unode_file_ids: InternMap::with_hasher(fac.clone()),
unode_manifest_ids: InternMap::with_hasher(fac.clone()),
// State
chunk_bcs: StateMap::with_hasher(fac.clone()),
deferred_bcs: ValueMap::with_hasher(fac.clone()),
visited_bcs: StateMap::with_hasher(fac.clone()),
visited_bcs_mapping: StateMap::with_hasher(fac.clone()),
public_not_visited: StateMap::with_hasher(fac.clone()),
@ -246,12 +252,27 @@ impl WalkState {
fn record<K>(&self, visited: &StateMap<K>, k: &K) -> bool
where
K: Eq + Hash + Copy,
K: Eq + Hash + Clone,
{
if visited.contains_key(k) {
false
} else {
visited.insert(*k, ()).is_none()
visited.insert(k.clone(), ()).is_none()
}
}
fn record_multi<K, V>(&self, multi_map: &ValueMap<K, HashSet<V>>, k: K, v: &V) -> bool
where
K: Eq + Hash + Clone,
V: Eq + Hash + Clone,
{
let mut entry = multi_map.entry(k).or_insert_with(HashSet::default);
let values = entry.value_mut();
// No insert_with in HashSet, so do it ourselves
if values.contains(v) {
false
} else {
values.insert(v.clone())
}
}
@ -335,6 +356,14 @@ impl WalkState {
fn get_visit_count(&self, t: &NodeType) -> usize {
self.visit_count[*t as usize].load(Ordering::Acquire)
}
fn chunk_contains(&self, id: InternedId<ChangesetId>) -> bool {
if self.chunk_bcs.is_empty() {
true
} else {
self.chunk_bcs.contains_key(&id)
}
}
}
#[async_trait]
@ -386,7 +415,17 @@ impl VisitOne for WalkState {
Node::Bookmark(_) => true,
Node::PublishedBookmarks(_) => true,
// Bonsai
Node::Changeset(k) => self.record(&self.visited_bcs, &self.bcs_ids.interned(&k.inner)),
Node::Changeset(k) => {
let id = self.bcs_ids.interned(&k.inner);
if self.chunk_contains(id) {
self.record(&self.visited_bcs, &id)
} else {
if !self.visited_bcs.contains_key(&id) {
self.record_multi(&self.deferred_bcs, id, outgoing);
}
false
}
}
Node::BonsaiHgMapping(k) => {
if let Some(id) = self.bcs_ids.get(&k.inner) {
// Does not insert, see record_resolved_visit
@ -505,6 +544,50 @@ impl VisitOne for WalkState {
}
impl WalkVisitor<(Node, Option<NodeData>, Option<StepStats>), EmptyRoute> for WalkState {
fn start_chunk(
&self,
new_chunk_bcs: &HashSet<ChangesetId>,
) -> Result<HashSet<OutgoingEdge>, Error> {
// Reset self.chunk_bcs
let mut chunk_interned = HashSet::new();
for bcs_id in new_chunk_bcs {
let i = self.bcs_ids.interned(&bcs_id);
chunk_interned.insert(i);
self.chunk_bcs.insert(i, ());
}
self.chunk_bcs.retain(|k, _v| chunk_interned.contains(k));
// Check for items that were outside the chunk now being inside
let mut in_new_chunk = HashSet::new();
for e in self.deferred_bcs.iter() {
if !chunk_interned.contains(&e.key()) {
continue;
}
in_new_chunk.extend(e.value().iter().cloned());
}
self.deferred_bcs
.retain(|k, _v| !chunk_interned.contains(k));
Ok(in_new_chunk)
}
fn end_chunks(&self) -> Result<(), Error> {
if !self.deferred_bcs.is_empty() {
bail!(
"Unexpected remaining edges to walk {:?}",
self.deferred_bcs
.iter()
.map(|e| e.value().clone())
.collect::<Vec<_>>()
);
}
Ok(())
}
fn num_deferred(&self) -> usize {
self.deferred_bcs.len()
}
fn start_step(
&self,
ctx: CoreContext,

View File

@ -5,21 +5,69 @@
* GNU General Public License version 2.
*/
use crate::graph::{ChangesetKey, Node, NodeType};
use crate::log;
use crate::setup::JobWalkParams;
use crate::walk::{walk_exact, RepoWalkParams, RepoWalkTypeParams, StepRoute, WalkVisitor};
use crate::walk::{
walk_exact, OutgoingEdge, RepoWalkParams, RepoWalkTypeParams, StepRoute, WalkVisitor,
};
use anyhow::Error;
use anyhow::{bail, Error};
use bulkops::{Direction, PublicChangesetBulkFetch, MAX_FETCH_STEP, MIN_FETCH_STEP};
use cloned::cloned;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::{future::Future, stream::BoxStream};
use futures::{
future::{self, Future},
stream::{self, BoxStream, StreamExt, TryStreamExt},
};
use mononoke_types::ChangesetId;
use slog::info;
use std::collections::HashSet;
use tokio::time::{Duration, Instant};
// We can choose to go direct from the ChangesetId to types keyed by it without loading the Changeset
fn roots_for_chunk(
ids: HashSet<ChangesetId>,
root_types: &HashSet<NodeType>,
) -> Result<Vec<OutgoingEdge>, Error> {
let mut edges = vec![];
for id in ids {
for r in root_types {
let n = match r {
NodeType::BonsaiHgMapping => Node::BonsaiHgMapping(ChangesetKey {
inner: id,
filenode_known_derived: false,
}),
NodeType::PhaseMapping => Node::PhaseMapping(id),
NodeType::Changeset => Node::Changeset(ChangesetKey {
inner: id,
filenode_known_derived: false,
}),
NodeType::ChangesetInfo => Node::ChangesetInfo(id),
NodeType::ChangesetInfoMapping => Node::ChangesetInfoMapping(id),
NodeType::FsnodeMapping => Node::FsnodeMapping(id),
NodeType::SkeletonManifestMapping => Node::SkeletonManifestMapping(id),
NodeType::UnodeMapping => Node::UnodeMapping(id),
_ => bail!("Unsupported root type for chunking {:?}", r),
};
if let Some(edge_type) = n.get_type().root_edge_type() {
let edge = OutgoingEdge::new(edge_type, n);
edges.push(edge);
} else {
bail!("Unsupported node type for root edges {:?}", n.get_type());
}
}
}
Ok(edges)
}
pub async fn walk_exact_tail<RunFac, SinkFac, SinkOut, V, VOut, Route>(
fb: FacebookInit,
job_params: JobWalkParams,
mut repo_params: RepoWalkParams,
type_params: RepoWalkTypeParams,
public_changeset_chunk_by: HashSet<NodeType>,
visitor: V,
make_run: RunFac,
) -> Result<(), Error>
@ -39,13 +87,79 @@ where
let session_text = ctx.session().metadata().session_id().to_string();
repo_params.scuba_builder.add("session", session_text);
let walk_output = {
cloned!(ctx, repo_params);
walk_exact(ctx, visitor, job_params, repo_params, type_params)
let chunk_params = job_params
.public_changeset_chunk_size
.map(|chunk_size| {
// Don't SQL fetch in really small or large chunks
let chunk_size = chunk_size as u64;
let fetch_step = if chunk_size < MIN_FETCH_STEP {
MIN_FETCH_STEP
} else if chunk_size > MAX_FETCH_STEP {
MAX_FETCH_STEP
} else {
chunk_size
};
let heads_fetcher = PublicChangesetBulkFetch::new(
repo_params.repo.get_repoid(),
repo_params.repo.get_changesets_object(),
repo_params.repo.get_phases(),
)
.with_read_from_master(false)
.with_step(fetch_step);
heads_fetcher.map(|v| (chunk_size as usize, v))
})
.transpose()?;
let is_chunking = chunk_params.is_some();
// Done in separate step so that heads_fetcher stays live in chunk_params
let chunk_stream = if let Some((chunk_size, heads_fetcher)) = &chunk_params {
heads_fetcher
.fetch_ids(&ctx, Direction::NewestFirst, None)
// TODO(ahornby) use chunk bounds for checkpointing
.map_ok(|(cs_id, _chunk_bounds)| cs_id)
.chunks(*chunk_size)
.map(move |v| v.into_iter().collect::<Result<HashSet<_>, Error>>())
.left_stream()
} else {
stream::once(future::ok(HashSet::new())).right_stream()
};
let make_sink = make_run(&ctx, &repo_params);
make_sink(walk_output).await?;
let chunk_count: usize = chunk_stream
.map(|chunk_members| {
let chunk_members = chunk_members?;
cloned!(mut repo_params);
let extra_roots = visitor.start_chunk(&chunk_members)?.into_iter();
let chunk_roots = roots_for_chunk(chunk_members, &public_changeset_chunk_by)?;
repo_params.walk_roots.extend(chunk_roots);
repo_params.walk_roots.extend(extra_roots);
Ok(repo_params)
})
.and_then(|repo_params| {
cloned!(ctx, job_params, make_run, type_params, visitor);
let make_sink = make_run(&ctx, &repo_params);
let logger = repo_params.logger.clone();
let walk_output =
walk_exact(ctx, visitor.clone(), job_params, repo_params, type_params);
async move {
let res = make_sink(walk_output).await?;
if is_chunking {
info!(logger, #log::LOADED, "Deferred: {}", visitor.num_deferred());
}
Ok::<_, Error>(res)
}
})
.try_fold(0, |acc, _| future::ok(acc + 1))
.await?;
visitor.end_chunks()?;
if let Some((chunk_size, _heads_fetcher)) = &chunk_params {
info!(
repo_params.logger, #log::LOADED,
"Completed in {} chunks of {}", chunk_count, chunk_size
);
};
match tail_secs {
Some(interval) => {

View File

@ -314,6 +314,21 @@ impl StepRoute for ValidateRoute {
impl WalkVisitor<(Node, Option<CheckData>, Option<StepStats>), ValidateRoute>
for ValidatingVisitor
{
fn start_chunk(
&self,
chunk_members: &HashSet<ChangesetId>,
) -> Result<HashSet<OutgoingEdge>, Error> {
self.inner.start_chunk(chunk_members)
}
fn end_chunks(&self) -> Result<(), Error> {
self.inner.end_chunks()
}
fn num_deferred(&self) -> usize {
self.inner.num_deferred()
}
fn start_step(
&self,
ctx: CoreContext,
@ -731,6 +746,7 @@ async fn run_one(
job_params,
repo_params,
type_params,
sub_params.public_changeset_chunk_by,
stateful_visitor,
make_sink,
)

View File

@ -128,6 +128,15 @@ pub trait VisitOne {
// partially derived types (it can see the node_data)
#[auto_impl(Arc)]
pub trait WalkVisitor<VOut, Route>: VisitOne {
fn start_chunk(
&self,
chunk_members: &HashSet<ChangesetId>,
) -> Result<HashSet<OutgoingEdge>, Error>;
fn end_chunks(&self) -> Result<(), Error>;
fn num_deferred(&self) -> usize;
// Called before the step is attempted
fn start_step(
&self,
@ -436,7 +445,6 @@ async fn bonsai_changeset_step<V: VisitOne>(
key: &ChangesetKey<ChangesetId>,
) -> Result<StepOutput, Error> {
let bcs_id = &key.inner;
// TODO(ahornby) check bonsai is in the chunk to be processed when chunking
// Get the data, and add direct file data for this bonsai changeset
let bcs = bcs_id.load(ctx, repo.blobstore()).await?;
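
start_chunk, end_chunks and num_deferred are declared on WalkVisitor itself (which is #[auto_impl(Arc)]), so the wrapping visitors in this diff (sampling, validating) simply forward them to the inner WalkState. A cut-down sketch of that delegation shape, using a hypothetical trait rather than the real WalkVisitor signature:

use std::collections::HashSet;

// Hypothetical, reduced version of the chunk-related methods on WalkVisitor.
trait ChunkHooks {
    fn start_chunk(&self, members: &HashSet<u64>) -> HashSet<u64>;
    fn end_chunks(&self) -> Result<(), String>;
    fn num_deferred(&self) -> usize;
}

// A wrapping visitor keeps its own concerns (sampling, validation) and delegates
// all chunk bookkeeping to the state it wraps, as the impls above do.
struct Wrapping<I> {
    inner: I,
}

impl<I: ChunkHooks> ChunkHooks for Wrapping<I> {
    fn start_chunk(&self, members: &HashSet<u64>) -> HashSet<u64> {
        self.inner.start_chunk(members)
    }
    fn end_chunks(&self) -> Result<(), String> {
        self.inner.end_chunks()
    }
    fn num_deferred(&self) -> usize {
        self.inner.num_deferred()
    }
}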