mirror of
https://github.com/facebook/sapling.git
synced 2024-10-05 14:28:17 +03:00
remove manual_scrub
Summary: This tool was used for the initial population of the XDB blobstore. It has been superseded by the walker, and can now be deleted. Differential Revision: D45112828 fbshipit-source-id: 134e2c2b761fd27a9b096df4d64404bc14c89c43
This commit is contained in:
parent
80f85f9f0b
commit
d91fbd8a26
@ -20,10 +20,6 @@ test = false
|
||||
name = "check_git_wc"
|
||||
path = "cmds/check_git_wc/main.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "manual_scrub"
|
||||
path = "cmds/manual_scrub/main.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "packer"
|
||||
path = "cmds/packer/main.rs"
|
||||
@ -49,10 +45,8 @@ name = "streaming_clone_warmup"
|
||||
path = "cmds/streaming_clone_warmup/main.rs"
|
||||
|
||||
[dependencies]
|
||||
ahash = "0.8"
|
||||
anyhow = "1.0.65"
|
||||
ascii = "1.0"
|
||||
async-compression = { version = "0.3.14", features = ["brotli", "bzip2", "deflate", "futures-io", "gzip", "tokio", "zlib", "zstd"] }
|
||||
blobrepo = { version = "0.1.0", path = "blobrepo" }
|
||||
blobrepo_utils = { version = "0.1.0", path = "blobrepo_utils" }
|
||||
blobstore = { version = "0.1.0", path = "blobstore" }
|
||||
@ -74,8 +68,6 @@ cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rus
|
||||
cmdlib = { version = "0.1.0", path = "cmdlib" }
|
||||
cmdlib_logging = { version = "0.1.0", path = "cmdlib/log" }
|
||||
context = { version = "0.1.0", path = "server/context" }
|
||||
dashmap = { version = "5.4", features = ["raw-api", "rayon", "serde"] }
|
||||
derive_more = "0.99.17"
|
||||
facet = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
|
||||
failure_ext = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
|
||||
fbinit = { version = "0.1.2", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
|
||||
@ -106,9 +98,7 @@ sql_construct = { version = "0.1.0", path = "common/sql_construct" }
|
||||
sql_ext = { version = "0.1.0", path = "common/rust/sql_ext" }
|
||||
sqlblob = { version = "0.1.0", path = "blobstore/sqlblob" }
|
||||
streaming_clone = { version = "0.1.0", path = "repo_client/streaming_clone" }
|
||||
tempfile = "3.5"
|
||||
tokio = { version = "1.25.0", features = ["full", "test-util", "tracing"] }
|
||||
tokio-stream = { version = "0.1.4", features = ["fs", "io-util", "net", "signal", "sync", "time"] }
|
||||
toml = "=0.5.8"
|
||||
|
||||
[patch.crates-io]
|
||||
|
@ -10,8 +10,6 @@
|
||||
//! after the file content was copied. So it should be used only in the urgent situations
|
||||
//! like e.g. repo corruption. List of keys are passed in a file, and this list of keys
|
||||
//! can be generated by any tool e.g. walker.
|
||||
//! It's similar to manual_scrub tool, with the exception that manual_scrub preserves the repoid
|
||||
//! prefix for the blob, while this tool either strips it or ignores it.
|
||||
|
||||
use anyhow::anyhow;
|
||||
use anyhow::Context;
|
||||
|
@ -1,53 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This software may be used and distributed according to the terms of the
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use std::ffi::OsStr;
|
||||
use std::fs::read_to_string;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Error;
|
||||
use slog::info;
|
||||
use slog::Logger;
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FileCheckpoint {
|
||||
pub file_name: PathBuf,
|
||||
}
|
||||
|
||||
impl FileCheckpoint {
|
||||
pub fn new(file_name: &OsStr) -> Self {
|
||||
let mut buf = PathBuf::new();
|
||||
buf.push(file_name);
|
||||
Self { file_name: buf }
|
||||
}
|
||||
|
||||
pub fn read(&self) -> Result<Option<String>, Error> {
|
||||
if self.file_name.exists() {
|
||||
return read_to_string(&self.file_name)
|
||||
.map(Some)
|
||||
.context("couldn't read checkpoint");
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub fn update(&self, logger: &Logger, key: &str) -> Result<(), Error> {
|
||||
let tempfile = NamedTempFile::new_in(
|
||||
self.file_name
|
||||
.parent()
|
||||
.context("no parent dir for checkpoint file")?,
|
||||
)?;
|
||||
tempfile.as_file().write_all(key.as_bytes())?;
|
||||
let file = tempfile.persist(&self.file_name)?;
|
||||
// This is expensive, but we only call it every PROGRESS_INTERVAL_SECS seconds
|
||||
file.sync_all()?;
|
||||
info!(logger, "checkpointed {}", key);
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,253 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This software may be used and distributed according to the terms of the
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use std::ffi::OsStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Error;
|
||||
use anyhow::Result;
|
||||
use async_compression::tokio::write::ZstdEncoder;
|
||||
use async_compression::Level;
|
||||
use blobstore_factory::make_blobstore;
|
||||
use blobstore_factory::ScrubAction;
|
||||
use blobstore_factory::SrubWriteOnly;
|
||||
use clap_old::Arg;
|
||||
use cmdlib::args;
|
||||
use cmdlib::args::ArgType;
|
||||
use context::CoreContext;
|
||||
use futures::channel::mpsc;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::stream::StreamExt;
|
||||
use futures::stream::TryStreamExt;
|
||||
use slog::info;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::stdin;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
|
||||
mod checkpoint;
|
||||
mod progress;
|
||||
mod scrub;
|
||||
mod tracker;
|
||||
|
||||
use crate::checkpoint::FileCheckpoint;
|
||||
use crate::scrub::scrub;
|
||||
|
||||
const ARG_STORAGE_CONFIG_NAME: &str = "storage-config-name";
|
||||
const ARG_SCHEDULED_MAX: &str = "scheduled-max";
|
||||
|
||||
const ARG_SUCCESSFUL_KEYS: &str = "success-keys-output";
|
||||
const ARG_MISSING_KEYS: &str = "missing-keys-output";
|
||||
const ARG_ERROR_KEYS: &str = "error-keys-output";
|
||||
const ARG_CHECKPOINT_KEY: &str = "checkpoint-key-file";
|
||||
const ARG_ZSTD_LEVEL: &str = "keys-zstd-level";
|
||||
const ARG_QUIET: &str = "quiet";
|
||||
|
||||
async fn bridge_to_file<W: AsyncWriteExt + Unpin>(
|
||||
mut file: W,
|
||||
mut recv: mpsc::Receiver<String>,
|
||||
) -> Result<()> {
|
||||
while let Some(string) = recv.next().await {
|
||||
file.write_all(string.as_bytes()).await?;
|
||||
file.write_all(b"\n").await?;
|
||||
}
|
||||
let _ = file.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_errors<W: AsyncWriteExt + Unpin>(
|
||||
mut file: W,
|
||||
mut recv: mpsc::Receiver<(String, Error)>,
|
||||
) -> Result<()> {
|
||||
while let Some((key, err)) = recv.next().await {
|
||||
eprintln!("Error: {:?}", err.context(format!("Scrubbing key {}", key)));
|
||||
file.write_all(key.as_bytes()).await?;
|
||||
file.write_all(b"\n").await?;
|
||||
}
|
||||
let _ = file.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_file(
|
||||
file_name: &OsStr,
|
||||
zstd_level: Option<i32>,
|
||||
) -> Result<Box<dyn AsyncWrite + Unpin + Send>, Error> {
|
||||
let file = File::create(file_name).await?;
|
||||
if let Some(zstd_level) = zstd_level {
|
||||
Ok(Box::new(ZstdEncoder::with_quality(
|
||||
file,
|
||||
Level::Precise(zstd_level as u32),
|
||||
)))
|
||||
} else {
|
||||
Ok(Box::new(file))
|
||||
}
|
||||
}
|
||||
|
||||
#[fbinit::main]
|
||||
fn main(fb: fbinit::FacebookInit) -> Result<()> {
|
||||
let app = args::MononokeAppBuilder::new("manual scrub")
|
||||
.with_advanced_args_hidden()
|
||||
.with_all_repos()
|
||||
.with_arg_types(vec![ArgType::Scrub])
|
||||
.with_scrub_action_default(Some(ScrubAction::Repair))
|
||||
.with_scrub_action_on_missing_write_only_default(Some(SrubWriteOnly::Scrub))
|
||||
.build()
|
||||
.arg(
|
||||
Arg::with_name(ARG_STORAGE_CONFIG_NAME)
|
||||
.long(ARG_STORAGE_CONFIG_NAME)
|
||||
.takes_value(true)
|
||||
.required(true)
|
||||
.help("the name of the storage config to scrub"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name(ARG_SCHEDULED_MAX)
|
||||
.long(ARG_SCHEDULED_MAX)
|
||||
.takes_value(true)
|
||||
.required(false)
|
||||
.help("Maximum number of scrub keys to attempt to execute at once. Default 100."),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name(ARG_SUCCESSFUL_KEYS)
|
||||
.long(ARG_SUCCESSFUL_KEYS)
|
||||
.takes_value(true)
|
||||
.required_unless(ARG_CHECKPOINT_KEY)
|
||||
.help("A file to write successfully scrubbed key IDs to"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name(ARG_MISSING_KEYS)
|
||||
.long(ARG_MISSING_KEYS)
|
||||
.takes_value(true)
|
||||
.required(true)
|
||||
.help("A file to write missing data key IDs to"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name(ARG_ERROR_KEYS)
|
||||
.long(ARG_ERROR_KEYS)
|
||||
.takes_value(true)
|
||||
.required(true)
|
||||
.help("A file to write error fetching data key IDs to"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name(ARG_CHECKPOINT_KEY)
|
||||
.long(ARG_CHECKPOINT_KEY)
|
||||
.takes_value(true)
|
||||
.required(false)
|
||||
.help("A file to write checkpoint key to"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name(ARG_ZSTD_LEVEL)
|
||||
.long(ARG_ZSTD_LEVEL)
|
||||
.takes_value(true)
|
||||
.required(false)
|
||||
.help("Pass a level to Zstd compress the output files, 0 means Zstd default"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name(ARG_QUIET)
|
||||
.long(ARG_QUIET)
|
||||
.takes_value(false)
|
||||
.required(false)
|
||||
.help("Run quietly with minimal progress logging"),
|
||||
);
|
||||
|
||||
let (matches, runtime) = app.get_matches(fb)?;
|
||||
let logger = matches.logger();
|
||||
let config_store = matches.config_store();
|
||||
|
||||
let scheduled_max = args::get_usize_opt(&matches, ARG_SCHEDULED_MAX).unwrap_or(100);
|
||||
|
||||
let storage_config = args::load_storage_configs(config_store, &matches)
|
||||
.context("Could not read storage configs")?
|
||||
.storage
|
||||
.remove(
|
||||
matches
|
||||
.value_of(ARG_STORAGE_CONFIG_NAME)
|
||||
.context("No storage config name")?,
|
||||
)
|
||||
.context("Requested storage config not found")?;
|
||||
|
||||
let mysql_options = matches.mysql_options();
|
||||
let blobstore_options = matches.blobstore_options();
|
||||
let ctx = CoreContext::new_for_bulk_processing(fb, logger.clone());
|
||||
|
||||
let success_file_name = matches.value_of_os(ARG_SUCCESSFUL_KEYS);
|
||||
let missing_keys_file_name = matches
|
||||
.value_of_os(ARG_MISSING_KEYS)
|
||||
.context("No missing data output file")?;
|
||||
let errors_file_name = matches
|
||||
.value_of_os(ARG_ERROR_KEYS)
|
||||
.context("No errored keys output file")?;
|
||||
let zstd_level = args::get_i32_opt(&matches, ARG_ZSTD_LEVEL);
|
||||
let quiet = matches.is_present(ARG_QUIET);
|
||||
let checkpoint = matches
|
||||
.value_of_os(ARG_CHECKPOINT_KEY)
|
||||
.map(FileCheckpoint::new);
|
||||
|
||||
let scrub = async move {
|
||||
let blobstore = make_blobstore(
|
||||
fb,
|
||||
storage_config.blobstore,
|
||||
mysql_options,
|
||||
blobstore_factory::ReadOnlyStorage(false),
|
||||
blobstore_options,
|
||||
logger,
|
||||
config_store,
|
||||
&blobstore_factory::default_scrub_handler(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if !quiet {
|
||||
info!(logger, "Scrubbing blobstore: {}", blobstore);
|
||||
}
|
||||
|
||||
let stdin = BufReader::new(stdin());
|
||||
let mut output_handles = FuturesUnordered::new();
|
||||
let success = if let Some(success_file_name) = success_file_name {
|
||||
let (send, recv) = mpsc::channel(100);
|
||||
let file = create_file(success_file_name, zstd_level).await?;
|
||||
output_handles.push(tokio::spawn(bridge_to_file(file, recv)));
|
||||
Some(send)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let missing = {
|
||||
let (send, recv) = mpsc::channel(100);
|
||||
let file = create_file(missing_keys_file_name, zstd_level).await?;
|
||||
output_handles.push(tokio::spawn(bridge_to_file(file, recv)));
|
||||
send
|
||||
};
|
||||
let error = {
|
||||
let (send, recv) = mpsc::channel(100);
|
||||
let file = create_file(errors_file_name, zstd_level).await?;
|
||||
output_handles.push(tokio::spawn(handle_errors(file, recv)));
|
||||
send
|
||||
};
|
||||
|
||||
let res = scrub(
|
||||
&blobstore,
|
||||
&ctx,
|
||||
tokio_stream::wrappers::LinesStream::new(stdin.lines()).map_err(Error::from),
|
||||
success,
|
||||
missing,
|
||||
error,
|
||||
checkpoint,
|
||||
scheduled_max,
|
||||
quiet,
|
||||
)
|
||||
.await
|
||||
.context("Scrub failed");
|
||||
|
||||
while let Some(task_result) = output_handles.try_next().await? {
|
||||
task_result.context("Writing output files failed")?;
|
||||
}
|
||||
res
|
||||
};
|
||||
|
||||
runtime.block_on(scrub)
|
||||
}
|
@ -1,101 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This software may be used and distributed according to the terms of the
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use std::fmt;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Error;
|
||||
use derive_more::Add;
|
||||
use derive_more::Sub;
|
||||
use slog::info;
|
||||
use slog::Logger;
|
||||
|
||||
#[derive(Add, Sub, Clone, Copy, Default, Debug)]
|
||||
pub struct Progress {
|
||||
pub success: u64,
|
||||
pub missing: u64,
|
||||
pub error: u64,
|
||||
pub skipped: u64,
|
||||
pub bytes: u64,
|
||||
}
|
||||
|
||||
// Log at most every N seconds
|
||||
const PROGRESS_INTERVAL_SECS: u64 = 30;
|
||||
|
||||
impl fmt::Display for Progress {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}, {}, {}, {}, {}, {}",
|
||||
self.success,
|
||||
self.missing,
|
||||
self.error,
|
||||
self.total(),
|
||||
self.skipped,
|
||||
self.bytes,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Progress {
|
||||
pub fn total(&self) -> u64 {
|
||||
self.success + self.missing + self.error
|
||||
}
|
||||
|
||||
pub fn legend(&self, logger: &Logger) {
|
||||
info!(
|
||||
logger,
|
||||
"period, rate/s, seconds, success, missing, error, total, skipped, bytes, bytes/s"
|
||||
);
|
||||
}
|
||||
|
||||
// Returns time of last log, if any
|
||||
pub fn record(
|
||||
&self,
|
||||
logger: &Logger,
|
||||
quiet: bool,
|
||||
started: Instant,
|
||||
prev: Option<(Progress, Instant)>,
|
||||
is_final: bool,
|
||||
) -> Result<Option<Instant>, Error> {
|
||||
let log_period = |period, run: &Self, period_secs| {
|
||||
let per_sec = if period_secs > 0 {
|
||||
run.total() / period_secs
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let bytes_per_sec = if period_secs > 0 {
|
||||
run.bytes / period_secs
|
||||
} else {
|
||||
0
|
||||
};
|
||||
info!(
|
||||
logger,
|
||||
"{}, {:06}, {}, {}, {}", period, per_sec, period_secs, run, bytes_per_sec
|
||||
);
|
||||
};
|
||||
|
||||
let now = Instant::now();
|
||||
let run_secs = now.duration_since(started).as_secs();
|
||||
|
||||
if let Some((prev, prev_t)) = prev {
|
||||
// keep log volume down
|
||||
let delta_secs = now.duration_since(prev_t).as_secs();
|
||||
if delta_secs < PROGRESS_INTERVAL_SECS && !is_final {
|
||||
return Ok(None);
|
||||
}
|
||||
if !quiet {
|
||||
log_period("run", self, run_secs);
|
||||
let delta = *self - prev;
|
||||
log_period("delta", &delta, delta_secs);
|
||||
}
|
||||
} else if !quiet {
|
||||
log_period("run", self, run_secs);
|
||||
}
|
||||
Ok(Some(now))
|
||||
}
|
||||
}
|
@ -1,162 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This software may be used and distributed according to the terms of the
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Error;
|
||||
use anyhow::Result;
|
||||
use blobstore::Blobstore;
|
||||
use cloned::cloned;
|
||||
use context::CoreContext;
|
||||
use futures::channel::mpsc;
|
||||
use futures::future;
|
||||
use futures::future::FutureExt;
|
||||
use futures::sink::SinkExt;
|
||||
use futures::stream::Stream;
|
||||
use futures::stream::StreamExt;
|
||||
use futures::stream::TryStreamExt;
|
||||
|
||||
use crate::checkpoint::FileCheckpoint;
|
||||
use crate::progress::Progress;
|
||||
use crate::tracker::Tracker;
|
||||
|
||||
const PROGRESS_SAMPLE_KEYS: u64 = 1000;
|
||||
|
||||
async fn scrub_key<B: Blobstore + Clone + 'static>(
|
||||
blobstore: &B,
|
||||
ctx: &CoreContext,
|
||||
key: String,
|
||||
success: Option<mpsc::Sender<String>>,
|
||||
mut missing: mpsc::Sender<String>,
|
||||
mut error: mpsc::Sender<(String, Error)>,
|
||||
) -> Result<(Progress, String)> {
|
||||
let handle = {
|
||||
cloned!(ctx, key, blobstore);
|
||||
tokio::task::spawn(async move { blobstore.get(&ctx, &key).await })
|
||||
};
|
||||
let res = handle.await?;
|
||||
let mut progress = Progress::default();
|
||||
{
|
||||
cloned!(key);
|
||||
match res {
|
||||
Ok(None) => {
|
||||
missing.send(key).await?;
|
||||
progress.missing += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
error.send((key, e)).await?;
|
||||
progress.error += 1;
|
||||
}
|
||||
Ok(Some(v)) => {
|
||||
if let Some(mut success) = success {
|
||||
success.send(key).await?;
|
||||
}
|
||||
progress.success += 1;
|
||||
progress.bytes += v.as_bytes().len() as u64;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok((progress, key))
|
||||
}
|
||||
|
||||
pub async fn scrub<B: Blobstore + Clone + 'static>(
|
||||
blobstore: &B,
|
||||
ctx: &CoreContext,
|
||||
keys: impl Stream<Item = Result<String>>,
|
||||
success: Option<mpsc::Sender<String>>,
|
||||
missing: mpsc::Sender<String>,
|
||||
error: mpsc::Sender<(String, Error)>,
|
||||
checkpoint: Option<FileCheckpoint>,
|
||||
scheduled_max: usize,
|
||||
quiet: bool,
|
||||
) -> Result<()> {
|
||||
let init = Progress::default();
|
||||
let started = Instant::now();
|
||||
if !quiet {
|
||||
init.legend(ctx.logger());
|
||||
}
|
||||
|
||||
let mut starting_key = checkpoint
|
||||
.as_ref()
|
||||
.and_then(|cp| cp.read().transpose())
|
||||
.transpose()?;
|
||||
|
||||
// keep a tracker so that we can use buffered_unordered but still checkpoint safely
|
||||
let tracker = Tracker::with_capacity(scheduled_max);
|
||||
let mut pos: u64 = 0;
|
||||
|
||||
let (run, last_update, cp, last_key) = keys
|
||||
.map_ok(|key| {
|
||||
tracker.insert(key.clone(), pos);
|
||||
pos += 1;
|
||||
key
|
||||
})
|
||||
.map(|key| match key {
|
||||
Ok(key) => {
|
||||
if let Some(start) = starting_key.as_ref() {
|
||||
if start == &key {
|
||||
let _ = starting_key.take();
|
||||
}
|
||||
let mut progress = Progress::default();
|
||||
progress.skipped += 1;
|
||||
return future::ready(Ok((progress, key))).right_future();
|
||||
}
|
||||
scrub_key(
|
||||
blobstore,
|
||||
ctx,
|
||||
key,
|
||||
success.clone(),
|
||||
missing.clone(),
|
||||
error.clone(),
|
||||
)
|
||||
.left_future()
|
||||
}
|
||||
Err(e) => future::ready(Err(e)).right_future(),
|
||||
})
|
||||
.buffer_unordered(scheduled_max)
|
||||
.try_fold(
|
||||
(init, Some((init, started)), checkpoint, None),
|
||||
|(run, mut prev, checkpoint, _prev_key), (latest, key)| {
|
||||
let tracker = &tracker;
|
||||
async move {
|
||||
tracker.mark_done(&key)?;
|
||||
let run = run + latest;
|
||||
// overkill to check time elapsed every key, so sample
|
||||
if run.total() % PROGRESS_SAMPLE_KEYS == 0 {
|
||||
if let Some(updated) =
|
||||
run.record(ctx.logger(), quiet, started, prev, false)?
|
||||
{
|
||||
let best_done = tracker.compact();
|
||||
match (best_done, checkpoint.as_ref()) {
|
||||
(Some(done_key), Some(checkpoint)) if run.success > 0 => {
|
||||
checkpoint.update(ctx.logger(), &done_key)?;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
prev = Some((run, updated));
|
||||
}
|
||||
}
|
||||
Ok((run, prev, checkpoint, Some(key)))
|
||||
}
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Record progress at finish
|
||||
run.record(ctx.logger(), quiet, started, last_update, true)?;
|
||||
|
||||
// Record the last update
|
||||
if run.success > 0 {
|
||||
match (cp.as_ref(), last_key) {
|
||||
(Some(cp), Some(last_key)) => cp.update(ctx.logger(), &last_key)?,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
@ -1,87 +0,0 @@
|
||||
/*
|
||||
* Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
*
|
||||
* This software may be used and distributed according to the terms of the
|
||||
* GNU General Public License version 2.
|
||||
*/
|
||||
|
||||
use ahash::RandomState;
|
||||
use anyhow::bail;
|
||||
use anyhow::Error;
|
||||
use dashmap::DashMap;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TrackData {
|
||||
pos: u64,
|
||||
done: bool,
|
||||
}
|
||||
|
||||
// Track whether a key is done or pending
|
||||
pub struct Tracker {
|
||||
state: DashMap<String, TrackData, RandomState>,
|
||||
}
|
||||
|
||||
impl Tracker {
|
||||
pub fn with_capacity(capacity: usize) -> Self {
|
||||
Self {
|
||||
state: DashMap::with_capacity_and_hasher(capacity, RandomState::default()),
|
||||
}
|
||||
}
|
||||
|
||||
// Start tracking a key
|
||||
pub fn insert(&self, key: String, pos: u64) {
|
||||
self.state.insert(key, TrackData { pos, done: false });
|
||||
}
|
||||
|
||||
pub fn mark_done(&self, key: &str) -> Result<(), Error> {
|
||||
if let Some(mut tracked) = self.state.get_mut(key) {
|
||||
tracked.done = true;
|
||||
} else {
|
||||
bail!("No inflight entry for {}, may have duplicates", key);
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Finds latest done key, if any, before the earliest pending key, and prunes the state before the done key.
|
||||
pub fn compact(&self) -> Option<String> {
|
||||
let mut earliest_pending = None;
|
||||
for i in self.state.iter() {
|
||||
let tracked = i.value();
|
||||
if !tracked.done {
|
||||
let replace = if let Some(ref best) = earliest_pending {
|
||||
tracked.pos < *best
|
||||
} else {
|
||||
true
|
||||
};
|
||||
if replace {
|
||||
earliest_pending.replace(tracked.pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut best_done = None;
|
||||
for i in self.state.iter() {
|
||||
let tracked = i.value();
|
||||
let in_bound = if let Some(bound) = earliest_pending.as_ref() {
|
||||
tracked.pos < *bound
|
||||
} else {
|
||||
true
|
||||
};
|
||||
if tracked.done && in_bound {
|
||||
let replace = if let Some((_, best)) = best_done.as_ref() {
|
||||
tracked.pos > *best
|
||||
} else {
|
||||
true
|
||||
};
|
||||
if replace {
|
||||
best_done.replace((i.key().clone(), tracked.pos));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove the used entries
|
||||
if let Some((_, done_pos)) = best_done.as_ref() {
|
||||
self.state.retain(|_k, v| v.pos > *done_pos);
|
||||
}
|
||||
best_done.map(|(key, _)| key)
|
||||
}
|
||||
}
|
@ -1304,14 +1304,6 @@ function bonsai_verify {
|
||||
"$@"
|
||||
}
|
||||
|
||||
# Runs the manual_scrub binary with the shared cache/common arguments and the
# test's mononoke config path, followed by any caller-supplied arguments.
function manual_scrub {
  local -a scrub_args=(
    "${CACHE_ARGS[@]}"
    "${COMMON_ARGS[@]}"
    --mononoke-config-path "$TESTTMP/mononoke-config"
    "$@"
  )
  GLOG_minloglevel=5 "$MONONOKE_MANUAL_SCRUB" "${scrub_args[@]}"
}
|
||||
|
||||
function s_client {
|
||||
/usr/local/fbcode/platform009/bin/openssl s_client \
|
||||
-connect "$(mononoke_address)" \
|
||||
|
@ -25,7 +25,6 @@ MONONOKE_BINS = {
|
||||
"MONONOKE_GITIMPORT": "gitimport",
|
||||
"MONONOKE_HG_SYNC": "mononoke_hg_sync_job",
|
||||
"MONONOKE_IMPORT": "import",
|
||||
"MONONOKE_MANUAL_SCRUB": "manual_scrub",
|
||||
"MONONOKE_MICROWAVE_BUILDER": "builder",
|
||||
"MONONOKE_PACKER": "packer",
|
||||
"MONONOKE_RECHUNKER": "rechunker",
|
||||
|
@ -1,143 +0,0 @@
|
||||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
#
|
||||
# This software may be used and distributed according to the terms of the
|
||||
# GNU General Public License found in the LICENSE file in the root
|
||||
# directory of this source tree.
|
||||
|
||||
setup
|
||||
$ . "${TEST_FIXTURES}/library.sh"
|
||||
|
||||
setup configuration
|
||||
$ MULTIPLEXED=2 default_setup_blobimport "blob_files"
|
||||
hg repo
|
||||
o C [draft;rev=2;26805aba1e60]
|
||||
│
|
||||
o B [draft;rev=1;112478962961]
|
||||
│
|
||||
o A [draft;rev=0;426bada5c675]
|
||||
$
|
||||
blobimporting
|
||||
|
||||
Run a heal
|
||||
$ mononoke_blobstore_healer -q --iteration-limit=1 --heal-min-age-secs=0 --storage-id=blobstore --sync-queue-limit=100 2>&1 > /dev/null
|
||||
|
||||
Failure time - this key will not exist
|
||||
$ echo fake-key | manual_scrub --storage-config-name blobstore --checkpoint-key-file=checkpoint.txt --error-keys-output errors --missing-keys-output missing --success-keys-output success 2>&1 | strip_glog
|
||||
Scrubbing blobstore: WalScrubBlobstore[WAL MultiplexedBlobstore[normal [(BlobstoreId(0):"Fileblob"), (BlobstoreId(1):"Fileblob"), (BlobstoreId(2):"Fileblob")], write only []]]
|
||||
period, rate/s, seconds, success, missing, error, total, skipped, bytes, bytes/s
|
||||
run, *, *, 0, 1, 0, 1, 0, * (glob)
|
||||
delta, *, *, 0, 1, 0, 1, 0, * (glob)
|
||||
$ wc -l success missing errors
|
||||
0 success
|
||||
1 missing
|
||||
0 errors
|
||||
1 total
|
||||
$ cat missing
|
||||
fake-key
|
||||
$ [ ! -r checkpoint.txt ] || echo "not expecting checkpoint as no successes"
|
||||
|
||||
Success time - these keys will exist and be scrubbed
|
||||
$ manual_scrub --storage-config-name blobstore --quiet --scheduled-max=1 --checkpoint-key-file=checkpoint.txt --error-keys-output errors --missing-keys-output missing --success-keys-output success <<EOF 2>&1 | strip_glog
|
||||
> repo0000.content.blake2.55662471e2a28db8257939b2f9a2d24e65b46a758bac12914a58f17dcde6905f
|
||||
> repo0000.hgchangeset.sha1.26805aba1e600a82e93661149f2313866a221a7b
|
||||
> EOF
|
||||
checkpointed repo0000.hgchangeset.sha1.26805aba1e600a82e93661149f2313866a221a7b
|
||||
$ sort < success
|
||||
repo0000.content.blake2.55662471e2a28db8257939b2f9a2d24e65b46a758bac12914a58f17dcde6905f
|
||||
repo0000.hgchangeset.sha1.26805aba1e600a82e93661149f2313866a221a7b
|
||||
$ wc -l success missing errors
|
||||
2 success
|
||||
0 missing
|
||||
0 errors
|
||||
2 total
|
||||
$ cat checkpoint.txt
|
||||
repo0000.hgchangeset.sha1.26805aba1e600a82e93661149f2313866a221a7b (no-eol)
|
||||
|
||||
Continue from checkpoint
|
||||
$ manual_scrub --storage-config-name blobstore --scheduled-max=1 --checkpoint-key-file=checkpoint.txt --error-keys-output errors --missing-keys-output missing --success-keys-output success <<EOF 2>&1 | strip_glog
|
||||
> repo0000.content.blake2.55662471e2a28db8257939b2f9a2d24e65b46a758bac12914a58f17dcde6905f
|
||||
> repo0000.hgchangeset.sha1.26805aba1e600a82e93661149f2313866a221a7b
|
||||
> repo0000.hgfilenode.sha1.35e7525ce3a48913275d7061dd9a867ffef1e34d
|
||||
> EOF
|
||||
Scrubbing blobstore: WalScrubBlobstore[WAL MultiplexedBlobstore[normal [(BlobstoreId(0):"Fileblob"), (BlobstoreId(1):"Fileblob"), (BlobstoreId(2):"Fileblob")], write only []]]
|
||||
period, rate/s, seconds, success, missing, error, total, skipped, bytes, bytes/s
|
||||
run, *, *, 1, 0, 0, 1, 2, * (glob)
|
||||
delta, *, *, 1, 0, 0, 1, 2, * (glob)
|
||||
checkpointed repo0000.hgfilenode.sha1.35e7525ce3a48913275d7061dd9a867ffef1e34d
|
||||
$ sort < success
|
||||
repo0000.hgfilenode.sha1.35e7525ce3a48913275d7061dd9a867ffef1e34d
|
||||
$ wc -l success missing errors
|
||||
1 success
|
||||
0 missing
|
||||
0 errors
|
||||
1 total
|
||||
$ cat checkpoint.txt
|
||||
repo0000.hgfilenode.sha1.35e7525ce3a48913275d7061dd9a867ffef1e34d (no-eol)
|
||||
|
||||
Do same run with compressed key output
|
||||
$ manual_scrub --storage-config-name blobstore --quiet --keys-zstd-level=9 --error-keys-output errors --missing-keys-output missing --success-keys-output success <<EOF
|
||||
> repo0000.hgchangeset.sha1.26805aba1e600a82e93661149f2313866a221a7b
|
||||
> repo0000.content.blake2.55662471e2a28db8257939b2f9a2d24e65b46a758bac12914a58f17dcde6905f
|
||||
> repo0000.hgfilenode.sha1.35e7525ce3a48913275d7061dd9a867ffef1e34d
|
||||
> EOF
|
||||
$ cat success | zstd -d | wc -l
|
||||
3
|
||||
|
||||
Do same run without specifying the optional success file when checkpointing
|
||||
$ rm success
|
||||
$ manual_scrub --storage-config-name blobstore --scheduled-max=1 --checkpoint-key-file=checkpoint2.txt --quiet --error-keys-output errors --missing-keys-output missing <<EOF 2>&1 | strip_glog
|
||||
> repo0000.hgchangeset.sha1.26805aba1e600a82e93661149f2313866a221a7b
|
||||
> repo0000.content.blake2.55662471e2a28db8257939b2f9a2d24e65b46a758bac12914a58f17dcde6905f
|
||||
> repo0000.hgfilenode.sha1.35e7525ce3a48913275d7061dd9a867ffef1e34d
|
||||
> EOF
|
||||
checkpointed repo0000.hgfilenode.sha1.35e7525ce3a48913275d7061dd9a867ffef1e34d
|
||||
|
||||
Demonstrate that a key exists
|
||||
$ ls "$TESTTMP/blobstore/0/blobs/blob-repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0"
|
||||
$TESTTMP/blobstore/0/blobs/blob-repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0
|
||||
|
||||
Delete it
|
||||
$ rm "$TESTTMP/blobstore/0/blobs/blob-repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0"
|
||||
|
||||
Check that healer queue is empty
|
||||
$ read_blobstore_wal_queue_size
|
||||
0
|
||||
|
||||
Scrub restores it
|
||||
$ manual_scrub --storage-config-name blobstore --quiet --error-keys-output errors --missing-keys-output missing --success-keys-output success <<EOF
|
||||
> repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0
|
||||
> EOF
|
||||
* scrub: blobstore_id BlobstoreId(0) repaired for repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0 (glob)
|
||||
$ wc -l success missing errors
|
||||
1 success
|
||||
0 missing
|
||||
0 errors
|
||||
1 total
|
||||
$ cat success
|
||||
repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0
|
||||
|
||||
Demonstrate it's back
|
||||
$ ls "$TESTTMP/blobstore/0/blobs/blob-repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0"
|
||||
$TESTTMP/blobstore/0/blobs/blob-repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0
|
||||
|
||||
Check that healer queue is empty
|
||||
$ read_blobstore_wal_queue_size
|
||||
0
|
||||
|
||||
Damage the contents of blobstore 0 to demonstrate error handling
|
||||
The error will group blobstores 1 and 2 together, and leave blobstore 0 in its own group
|
||||
$ echo "foo" > "$TESTTMP/blobstore/0/blobs/blob-repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0"
|
||||
$ manual_scrub --storage-config-name blobstore --quiet --error-keys-output errors --missing-keys-output missing --success-keys-output success <<EOF
|
||||
> repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0
|
||||
> EOF
|
||||
Error: Scrubbing key repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0
|
||||
* (glob)
|
||||
Caused by:
|
||||
Different blobstores have different values for this item: [{*}, {*}] are grouped by content, {} do not have (glob)
|
||||
$ wc -l success missing errors
|
||||
0 success
|
||||
0 missing
|
||||
1 errors
|
||||
1 total
|
||||
$ cat errors
|
||||
repo0000.hgchangeset.sha1.426bada5c67598ca65036d57d9e4b64b0c1ce7a0
|
Loading…
Reference in New Issue
Block a user