mononoke: remove dry_run functionality from backfill_derived_data

Summary:
It was never really used, and it's quite complicated and unnecessary. Let's
just remove it.

Reviewed By: krallin

Differential Revision: D29963129

fbshipit-source-id: d31ec788fe31e010dcc8f110431f4e4fbda21778
Stanislau Hlebik 2021-07-29 02:08:43 -07:00 committed by Facebook GitHub Bot
parent a67aef1a16
commit fbcb42a51f
3 changed files with 213 additions and 498 deletions


@@ -11,7 +11,6 @@ path = "../cmds/backfill_derived_data/main.rs"
[dependencies]
anyhow = "1.0"
async-trait = "0.1.45"
blame = { version = "0.1.0", path = "../derived_data/blame" }
blobrepo = { version = "0.1.0", path = "../blobrepo" }
blobrepo_override = { version = "0.1.0", path = "../blobrepo/override" }
@@ -49,12 +48,11 @@ tunables = { version = "0.1.0", path = "../tunables" }
unodes = { version = "0.1.0", path = "../derived_data/unodes" }
[dev-dependencies]
async-trait = "0.1.45"
blobrepo_hg = { version = "0.1.0", path = "../blobrepo/blobrepo_hg" }
fbinit-tokio = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "master" }
fixtures = { version = "0.1.0", path = "../tests/fixtures" }
maplit = "1.0"
mercurial_types = { version = "0.1.0", path = "../mercurial/types" }
test_repo_factory = { version = "0.1.0", path = "../repo_factory/test_repo_factory" }
tests_utils = { version = "0.1.0", path = "../tests/utils" }
[patch.crates-io]


@ -1,236 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::Error;
use async_trait::async_trait;
use blobrepo::BlobRepo;
use blobrepo_override::DangerousOverride;
use blobstore::{Blobstore, BlobstoreBytes, Loadable};
use cacheblob::MemWritesBlobstore;
use context::CoreContext;
use derived_data::BonsaiDerived;
use fsnodes::RootFsnodeId;
use futures::stream::{self, FuturesUnordered, StreamExt, TryStreamExt};
use manifest::ManifestOps;
use mononoke_types::{blob::BlobstoreValue, ChangesetId, FsnodeId, MononokeId};
use slog::{debug, info};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
#[async_trait]
pub trait Cleaner {
async fn clean(&mut self, cs_ids: Vec<ChangesetId>) -> Result<(), Error>;
}
pub struct FsnodeCleaner {
alive: HashSet<ChangesetId>,
children_count: HashMap<ChangesetId, u64>,
clean_period: u64,
commits_since_last_clean: u64,
ctx: CoreContext,
memblobstore: Arc<MemWritesBlobstore<Arc<dyn Blobstore>>>,
repo: BlobRepo,
}
impl FsnodeCleaner {
pub fn new(
ctx: CoreContext,
repo: BlobRepo,
children_count: HashMap<ChangesetId, u64>,
clean_period: u64,
) -> (Self, BlobRepo) {
let mut memblobstore = None;
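        // Wrap the repo blobstore in MemWritesBlobstore so that everything written
        // during the dry run (e.g. newly derived fsnodes) goes to an in-memory
        // cache instead of the real blobstore.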
let repo = repo.dangerous_override(|blobstore| -> Arc<dyn Blobstore> {
let blobstore = Arc::new(MemWritesBlobstore::new(blobstore));
memblobstore = Some(blobstore.clone());
blobstore
});
let memblobstore = memblobstore.unwrap();
let s = Self {
alive: HashSet::new(),
children_count,
clean_period,
commits_since_last_clean: 0,
ctx,
repo: repo.clone(),
memblobstore,
};
(s, repo)
}
async fn clean_cache(
&mut self,
entries_to_preserve: Vec<(String, BlobstoreBytes)>,
) -> Result<(), Error> {
let ctx = &self.ctx;
let repo = &self.repo;
{
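            // Purge every fsnode blob from the in-memory cache; the entries that
            // are still needed are written back below via `entries_to_preserve`.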
let mut cache = self.memblobstore.get_cache().lock().unwrap();
info!(ctx.logger(), "cache entries: {}", cache.len());
let mut to_delete = vec![];
{
for key in cache.keys() {
                    // That seems to be the best way of detecting whether it's an fsnode key or not...
if key.contains(FsnodeId::blobstore_key_prefix()) {
to_delete.push(key.clone());
}
}
}
for key in to_delete {
cache.remove(&key);
}
info!(ctx.logger(), "cache entries after cleanup: {}", cache.len());
}
info!(
ctx.logger(),
"finished cleanup, preserving {}",
entries_to_preserve.len()
);
stream::iter(entries_to_preserve)
.map(|(key, value)| {
debug!(ctx.logger(), "preserving: {}", key);
                // Note - it's important to use repo.get_blobstore() and not
                // the mem_writes blobstore directly. This is because repo.get_blobstore()
                // adds a few wrapper blobstores (e.g. the one that adds the repo prefix).
async move { repo.blobstore().put(ctx, key, value).await }
})
.map(Result::<_, Error>::Ok)
.try_for_each_concurrent(100, |f| async move { f.await })
.await
}
}
// Fsnode cleaner for dry-run backfill mode. Its job is to delete all fsnode entries
// except for those that can still be used to derive child commits.
//
// A commit is considered "alive" if it still has at least one child that hasn't
// been derived yet. We must keep all fsnode entries that can be referenced by any
// alive commit; however, we are free to delete any other entries.
// The fsnode cleaner works in the following way:
// 1) It gets a chunk of commits that were just derived and figures out which commits are still alive.
// 2) Periodically (i.e. after every `clean_period` commits) it removes fsnode entries that are no
//    longer reachable from alive commits.
#[async_trait]
impl Cleaner for FsnodeCleaner {
async fn clean(&mut self, cs_ids: Vec<ChangesetId>) -> Result<(), Error> {
for cs_id in cs_ids {
self.commits_since_last_clean += 1;
let parents = self
.repo
.get_changeset_parents_by_bonsai(self.ctx.clone(), cs_id)
.await?;
self.alive.insert(cs_id);
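            // Decrement each parent's remaining-children count; once all of its
            // children have been derived the parent no longer needs to stay alive.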
for p in parents {
let value = if let Some(value) = self.children_count.get_mut(&p) {
value
} else {
continue;
};
*value -= 1;
if *value == 0 {
self.alive.remove(&p);
}
}
}
if self.commits_since_last_clean >= self.clean_period {
self.commits_since_last_clean = 0;
let entries_to_preserve =
find_entries_to_preserve(&self.ctx, &self.repo, &self.alive).await?;
self.clean_cache(entries_to_preserve).await?;
}
Ok(())
}
}
// Finds entries that are still reachable from cs_to_preserve and returns
// the corresponding blobs that need to be saved.
async fn find_entries_to_preserve(
ctx: &CoreContext,
repo: &BlobRepo,
cs_to_preserve: &HashSet<ChangesetId>,
) -> Result<Vec<(String, BlobstoreBytes)>, Error> {
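    // For every commit that must be kept, list the tree entries of its root
    // fsnode and collect the corresponding manifest blobs.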
cs_to_preserve
.iter()
.map(|cs_id| async move {
let root_fsnode = RootFsnodeId::derive(&ctx, &repo, *cs_id).await?;
Result::<_, Error>::Ok(
root_fsnode
.fsnode_id()
.list_tree_entries(ctx.clone(), repo.get_blobstore())
.map_ok(move |(_, mf_id)| async move {
let mf = mf_id.load(ctx, &repo.get_blobstore()).await?;
Ok((mf_id.blobstore_key(), mf.into_blob().into()))
})
.try_buffer_unordered(100),
)
})
.collect::<FuturesUnordered<_>>()
.try_flatten()
.try_collect::<Vec<_>>()
.await
}
#[cfg(test)]
mod test {
use super::*;
use fbinit::FacebookInit;
use maplit::hashmap;
use tests_utils::CreateCommitContext;
async fn try_list_all_fsnodes(
ctx: &CoreContext,
repo: &BlobRepo,
cs_id: ChangesetId,
) -> Result<(), Error> {
let fsnode = RootFsnodeId::derive(&ctx, &repo, cs_id).await?;
fsnode
.fsnode_id()
.list_all_entries(ctx.clone(), repo.get_blobstore())
.try_collect::<Vec<_>>()
.await?;
Ok(())
}
#[fbinit::test]
async fn test_fsnode_cleaner(fb: FacebookInit) -> Result<(), Error> {
let ctx = CoreContext::test_mock(fb);
let repo = test_repo_factory::build_empty()?;
let root = CreateCommitContext::new_root(&ctx, &repo)
.add_file("file", "content")
.commit()
.await?;
let child = CreateCommitContext::new(&ctx, &repo, vec![root])
.add_file("file", "content2")
.commit()
.await?;
let clean_period = 1;
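        // With clean_period == 1 the cleaner purges the in-memory cache on every
        // call to clean(): fsnodes only reachable from `root` (no longer alive)
        // are dropped, while those reachable from `child` are preserved.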
let (cleaner, newrepo) = FsnodeCleaner::new(
ctx.clone(),
repo.clone(),
hashmap! { root => 1 },
clean_period,
);
let mut cleaner = cleaner;
let repo = newrepo;
cleaner.clean(vec![root, child]).await?;
assert!(try_list_all_fsnodes(&ctx, &repo, root).await.is_err());
assert!(try_list_all_fsnodes(&ctx, &repo, child).await.is_ok());
Ok(())
}
}


@@ -45,7 +45,7 @@ use skiplist::SkiplistIndex;
use slog::{info, Logger};
use stats::prelude::*;
use std::{
collections::{BTreeSet, HashMap, HashSet},
collections::{BTreeSet, HashSet},
fs,
path::Path,
sync::Arc,
@@ -54,7 +54,6 @@ use std::{
use time_ext::DurationExt;
use tunables::tunables;
mod dry_run;
mod slice;
mod warmup;
@@ -66,7 +65,6 @@ define_stats! {
const ARG_ALL_TYPES: &str = "all-types";
const ARG_DERIVED_DATA_TYPE: &str = "derived-data-type";
const ARG_DRY_RUN: &str = "dry-run";
const ARG_OUT_FILENAME: &str = "out-filename";
const ARG_SKIP: &str = "skip-changesets";
const ARG_LIMIT: &str = "limit";
@@ -123,227 +121,217 @@ async fn open_repo_maybe_unredacted(
#[fbinit::main]
fn main(fb: FacebookInit) -> Result<()> {
let app = args::MononokeAppBuilder::new("Utility to work with bonsai derived data")
.with_advanced_args_hidden()
.with_fb303_args()
.with_repo_required(RepoRequirement::AtLeastOne)
.build()
.about("Utility to work with bonsai derived data")
.subcommand(
SubCommand::with_name(SUBCOMMAND_BACKFILL)
.about("backfill derived data for public commits")
.arg(
Arg::with_name(ARG_DERIVED_DATA_TYPE)
.required(true)
.index(1)
.possible_values(POSSIBLE_DERIVED_TYPES)
.help("derived data type for which backfill will be run"),
)
.arg(
Arg::with_name(ARG_SKIP)
.long(ARG_SKIP)
.takes_value(true)
.help("skip this number of changesets"),
)
.arg(
Arg::with_name(ARG_LIMIT)
.long(ARG_LIMIT)
.takes_value(true)
.help("backfill at most this number of changesets"),
)
.arg(
Arg::with_name(ARG_REGENERATE)
.long(ARG_REGENERATE)
.help("regenerate derivations even if mapping contains changeset"),
)
.arg(
Arg::with_name(ARG_PREFETCHED_COMMITS_PATH)
.long(ARG_PREFETCHED_COMMITS_PATH)
.takes_value(true)
.required(false)
.help("a file with a list of bonsai changesets to backfill"),
)
.arg(
Arg::with_name(ARG_DRY_RUN)
.long(ARG_DRY_RUN)
.takes_value(false)
.required(false)
.help(
"Derives all data but writes it to memory. Note - requires --readonly",
),
)
.arg(
Arg::with_name(ARG_BATCH_SIZE)
.long(ARG_BATCH_SIZE)
.default_value(DEFAULT_BATCH_SIZE_STR)
.help("number of changesets in each derivation batch"),
)
.arg(
Arg::with_name(ARG_PARALLEL)
.long(ARG_PARALLEL)
.help("derive commits within a batch in parallel"),
)
.arg(
Arg::with_name(ARG_GAP_SIZE)
.long(ARG_GAP_SIZE)
.takes_value(true)
.help("size of gap to leave in derived data types that support gaps"),
),
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_TAIL)
.about("tail public commits and fill derived data")
.arg(
Arg::with_name(ARG_DERIVED_DATA_TYPE)
.required(false)
.multiple(true)
.index(1)
.possible_values(POSSIBLE_DERIVED_TYPES)
// TODO(stash): T66492899 remove unused value
.help("Unused, will be deleted soon"),
)
.arg(
Arg::with_name(ARG_USE_SHARED_LEASES)
.long(ARG_USE_SHARED_LEASES)
.takes_value(false)
.required(false)
.help(concat!(
"By default the derived data tailer doesn't compete with ",
"other mononoke services for a derived data lease, so ",
"it will derive the data even if another mononoke service ",
"(e.g. mononoke_server, scs_server, ...) are already ",
"deriving it.\n\n",
"This flag disables this behaviour, meaning this command ",
"will compete for the derived data lease with other ",
"mononoke services and start deriving only if the lease ",
"is obtained.",
)),
)
.arg(
Arg::with_name(ARG_STOP_ON_IDLE)
.long(ARG_STOP_ON_IDLE)
.help("Stop tailing or backfilling when there is nothing left"),
)
.arg(
Arg::with_name(ARG_BATCHED)
.long(ARG_BATCHED)
.takes_value(false)
.required(false)
.help("Use batched deriver instead of calling `::derive` periodically"),
)
.arg(
Arg::with_name(ARG_BATCH_SIZE)
.long(ARG_BATCH_SIZE)
.default_value(DEFAULT_BATCH_SIZE_STR)
.help("number of changesets in each derivation batch"),
)
.arg(
Arg::with_name(ARG_PARALLEL)
.long(ARG_PARALLEL)
.help("derive commits within a batch in parallel"),
)
.arg(
Arg::with_name(ARG_BACKFILL)
.long(ARG_BACKFILL)
.help("also backfill derived data types configured for backfilling"),
)
.arg(
Arg::with_name(ARG_SLICED)
.long(ARG_SLICED)
.help("pre-slice repository using the skiplist index when backfilling"),
)
.arg(
Arg::with_name(ARG_SLICE_SIZE)
.long(ARG_SLICE_SIZE)
.default_value(DEFAULT_SLICE_SIZE_STR)
.help("number of generations to include in each generation slice"),
)
.arg(
Arg::with_name(ARG_GAP_SIZE)
.long(ARG_GAP_SIZE)
.takes_value(true)
.help("size of gap to leave in derived data types that support gaps"),
),
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_PREFETCH_COMMITS)
.about("fetch commits metadata from the database and save them to a file")
.arg(
Arg::with_name(ARG_OUT_FILENAME)
.long(ARG_OUT_FILENAME)
.takes_value(true)
.required(true)
.help("file name where commits will be saved"),
),
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_SINGLE)
.about("backfill single changeset (mainly for performance testing purposes)")
.arg(
Arg::with_name(ARG_ALL_TYPES)
.long(ARG_ALL_TYPES)
.required(false)
.takes_value(false)
.help("derive all derived data types enabled for this repo"),
)
.arg(
Arg::with_name(ARG_CHANGESET)
.required(true)
.index(1)
.help("changeset by {hd|bonsai} hash or bookmark"),
)
.arg(
Arg::with_name(ARG_DERIVED_DATA_TYPE)
.required(false)
.index(2)
.conflicts_with(ARG_ALL_TYPES)
.possible_values(POSSIBLE_DERIVED_TYPES)
.help("derived data type for which backfill will be run"),
),
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_BACKFILL_ALL)
.about("backfill all/many derived data types at once")
.arg(
Arg::with_name(ARG_DERIVED_DATA_TYPE)
.possible_values(POSSIBLE_DERIVED_TYPES)
.required(false)
.takes_value(true)
.multiple(true)
.help(concat!(
"derived data type for which backfill will be run, ",
"all enabled and backfilling types if not specified",
)),
)
.arg(
Arg::with_name(ARG_BATCH_SIZE)
.long(ARG_BATCH_SIZE)
.default_value(DEFAULT_BATCH_SIZE_STR)
.help("number of changesets in each derivation batch"),
)
.arg(
Arg::with_name(ARG_PARALLEL)
.long(ARG_PARALLEL)
.help("derive commits within a batch in parallel"),
)
.arg(
Arg::with_name(ARG_SLICED).long(ARG_SLICED).help(
"pre-slice repository into generation slices using the skiplist index",
let app =
args::MononokeAppBuilder::new("Utility to work with bonsai derived data")
.with_advanced_args_hidden()
.with_fb303_args()
.with_repo_required(RepoRequirement::AtLeastOne)
.build()
.about("Utility to work with bonsai derived data")
.subcommand(
SubCommand::with_name(SUBCOMMAND_BACKFILL)
.about("backfill derived data for public commits")
.arg(
Arg::with_name(ARG_DERIVED_DATA_TYPE)
.required(true)
.index(1)
.possible_values(POSSIBLE_DERIVED_TYPES)
.help("derived data type for which backfill will be run"),
)
.arg(
Arg::with_name(ARG_SKIP)
.long(ARG_SKIP)
.takes_value(true)
.help("skip this number of changesets"),
)
.arg(
Arg::with_name(ARG_LIMIT)
.long(ARG_LIMIT)
.takes_value(true)
.help("backfill at most this number of changesets"),
)
.arg(
Arg::with_name(ARG_REGENERATE)
.long(ARG_REGENERATE)
.help("regenerate derivations even if mapping contains changeset"),
)
.arg(
Arg::with_name(ARG_PREFETCHED_COMMITS_PATH)
.long(ARG_PREFETCHED_COMMITS_PATH)
.takes_value(true)
.required(false)
.help("a file with a list of bonsai changesets to backfill"),
)
.arg(
Arg::with_name(ARG_BATCH_SIZE)
.long(ARG_BATCH_SIZE)
.default_value(DEFAULT_BATCH_SIZE_STR)
.help("number of changesets in each derivation batch"),
)
.arg(
Arg::with_name(ARG_PARALLEL)
.long(ARG_PARALLEL)
.help("derive commits within a batch in parallel"),
)
.arg(
Arg::with_name(ARG_GAP_SIZE)
.long(ARG_GAP_SIZE)
.takes_value(true)
.help("size of gap to leave in derived data types that support gaps"),
),
)
.arg(
Arg::with_name(ARG_SLICE_SIZE)
.long(ARG_SLICE_SIZE)
.default_value(DEFAULT_SLICE_SIZE_STR)
.help("number of generations to include in each generation slice"),
)
.arg(
Arg::with_name(ARG_GAP_SIZE)
.long(ARG_GAP_SIZE)
.takes_value(true)
.help("size of gap to leave in derived data types that support gaps"),
),
);
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_TAIL)
.about("tail public commits and fill derived data")
.arg(
Arg::with_name(ARG_DERIVED_DATA_TYPE)
.required(false)
.multiple(true)
.index(1)
.possible_values(POSSIBLE_DERIVED_TYPES)
// TODO(stash): T66492899 remove unused value
.help("Unused, will be deleted soon"),
)
.arg(
Arg::with_name(ARG_USE_SHARED_LEASES)
.long(ARG_USE_SHARED_LEASES)
.takes_value(false)
.required(false)
.help(concat!(
"By default the derived data tailer doesn't compete with ",
"other mononoke services for a derived data lease, so ",
"it will derive the data even if another mononoke service ",
"(e.g. mononoke_server, scs_server, ...) are already ",
"deriving it.\n\n",
"This flag disables this behaviour, meaning this command ",
"will compete for the derived data lease with other ",
"mononoke services and start deriving only if the lease ",
"is obtained.",
)),
)
.arg(
Arg::with_name(ARG_STOP_ON_IDLE)
.long(ARG_STOP_ON_IDLE)
.help("Stop tailing or backfilling when there is nothing left"),
)
.arg(
Arg::with_name(ARG_BATCHED)
.long(ARG_BATCHED)
.takes_value(false)
.required(false)
.help("Use batched deriver instead of calling `::derive` periodically"),
)
.arg(
Arg::with_name(ARG_BATCH_SIZE)
.long(ARG_BATCH_SIZE)
.default_value(DEFAULT_BATCH_SIZE_STR)
.help("number of changesets in each derivation batch"),
)
.arg(
Arg::with_name(ARG_PARALLEL)
.long(ARG_PARALLEL)
.help("derive commits within a batch in parallel"),
)
.arg(
Arg::with_name(ARG_BACKFILL)
.long(ARG_BACKFILL)
.help("also backfill derived data types configured for backfilling"),
)
.arg(
Arg::with_name(ARG_SLICED)
.long(ARG_SLICED)
.help("pre-slice repository using the skiplist index when backfilling"),
)
.arg(
Arg::with_name(ARG_SLICE_SIZE)
.long(ARG_SLICE_SIZE)
.default_value(DEFAULT_SLICE_SIZE_STR)
.help("number of generations to include in each generation slice"),
)
.arg(
Arg::with_name(ARG_GAP_SIZE)
.long(ARG_GAP_SIZE)
.takes_value(true)
.help("size of gap to leave in derived data types that support gaps"),
),
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_PREFETCH_COMMITS)
.about("fetch commits metadata from the database and save them to a file")
.arg(
Arg::with_name(ARG_OUT_FILENAME)
.long(ARG_OUT_FILENAME)
.takes_value(true)
.required(true)
.help("file name where commits will be saved"),
),
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_SINGLE)
.about("backfill single changeset (mainly for performance testing purposes)")
.arg(
Arg::with_name(ARG_ALL_TYPES)
.long(ARG_ALL_TYPES)
.required(false)
.takes_value(false)
.help("derive all derived data types enabled for this repo"),
)
.arg(
Arg::with_name(ARG_CHANGESET)
.required(true)
.index(1)
.help("changeset by {hd|bonsai} hash or bookmark"),
)
.arg(
Arg::with_name(ARG_DERIVED_DATA_TYPE)
.required(false)
.index(2)
.conflicts_with(ARG_ALL_TYPES)
.possible_values(POSSIBLE_DERIVED_TYPES)
.help("derived data type for which backfill will be run"),
),
)
.subcommand(
SubCommand::with_name(SUBCOMMAND_BACKFILL_ALL)
.about("backfill all/many derived data types at once")
.arg(
Arg::with_name(ARG_DERIVED_DATA_TYPE)
.possible_values(POSSIBLE_DERIVED_TYPES)
.required(false)
.takes_value(true)
.multiple(true)
.help(concat!(
"derived data type for which backfill will be run, ",
"all enabled and backfilling types if not specified",
)),
)
.arg(
Arg::with_name(ARG_BATCH_SIZE)
.long(ARG_BATCH_SIZE)
.default_value(DEFAULT_BATCH_SIZE_STR)
.help("number of changesets in each derivation batch"),
)
.arg(
Arg::with_name(ARG_PARALLEL)
.long(ARG_PARALLEL)
.help("derive commits within a batch in parallel"),
)
.arg(Arg::with_name(ARG_SLICED).long(ARG_SLICED).help(
"pre-slice repository into generation slices using the skiplist index",
))
.arg(
Arg::with_name(ARG_SLICE_SIZE)
.long(ARG_SLICE_SIZE)
.default_value(DEFAULT_SLICE_SIZE_STR)
.help("number of generations to include in each generation slice"),
)
.arg(
Arg::with_name(ARG_GAP_SIZE)
.long(ARG_GAP_SIZE)
.takes_value(true)
.help("size of gap to leave in derived data types that support gaps"),
),
);
let matches = app.get_matches(fb)?;
let logger = matches.logger();
let ctx = CoreContext::new_with_logger(fb, logger.clone());
@@ -431,7 +419,7 @@ async fn run_subcmd<'a>(
.map(|limit| limit.parse::<usize>())
.transpose()?;
let mut repo =
let repo =
open_repo_maybe_unredacted(fb, &logger, &matches, &derived_data_type).await?;
info!(
@@ -442,36 +430,6 @@ async fn run_subcmd<'a>(
let mut changesets = parse_serialized_commits(prefetched_commits_path)?;
changesets.sort_by_key(|cs_entry| cs_entry.gen);
let mut cleaner = None;
if sub_m.is_present(ARG_DRY_RUN) {
if !matches.readonly_storage().0 {
return Err(anyhow!("--dry-run requires readonly storage!"));
}
if derived_data_type != "fsnodes" {
return Err(anyhow!("unsupported dry run data type"));
}
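            // Count how many children each changeset has so the cleaner knows
            // when a parent's fsnodes can safely be discarded.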
let mut children_count = HashMap::new();
for entry in &changesets {
for p in &entry.parents {
*children_count.entry(*p).or_insert(0) += 1;
}
}
if derived_data_type == "fsnodes" {
let (new_cleaner, wrapped_repo) = dry_run::FsnodeCleaner::new(
ctx.clone(),
repo.blob_repo.clone(),
children_count,
10000,
);
repo.blob_repo = wrapped_repo;
cleaner = Some(new_cleaner);
}
}
let iter = changesets.into_iter().skip(skip);
let changesets = match maybe_limit {
Some(limit) => iter.take(limit).map(|entry| entry.cs_id).collect(),
@@ -497,7 +455,6 @@ async fn run_subcmd<'a>(
batch_size,
gap_size,
changesets,
cleaner,
)
.await
}
@@ -744,7 +701,6 @@ async fn subcommand_backfill(
batch_size: usize,
gap_size: Option<usize>,
changesets: Vec<ChangesetId>,
mut cleaner: Option<impl dry_run::Cleaner>,
) -> Result<()> {
let derived_utils = &derived_data_utils_for_backfill(&repo.blob_repo, derived_data_type)?;
@@ -823,9 +779,6 @@ async fn subcommand_backfill(
generated / total_duration.as_secs() as f32,
);
}
if let Some(ref mut cleaner) = cleaner {
cleaner.clean(chunk.to_vec()).await?;
}
}
Ok(())
}