metalog: implement Repair

Summary:
Provide a way to repair the MetaLog. It's not just repairing 2 indexedlog
structures, but also checking the relationships between them.

Reviewed By: xavierd

Differential Revision: D18737904

fbshipit-source-id: fcf8ae56a1fdbb0561765701d962dfad4a8b5bd4
This commit is contained in:
Jun Wu 2019-12-09 14:13:16 -08:00 committed by Facebook Github Bot
parent 039e5a1f9d
commit 02b4902319
3 changed files with 258 additions and 0 deletions

View File

@ -14,4 +14,6 @@ zstore = { path = "../zstore" }
[dev-dependencies]
quickcheck = "0.9"
rand_chacha = "0.2"
rand_core = "0.5"
tempfile = "3.0.7"

View File

@ -14,3 +14,4 @@ mod metalog;
pub use crate::metalog::{resolver, CommitOptions, Id20, MetaLog};
pub use errors::{Error, Result};
pub use indexedlog::Repair;

View File

@ -8,10 +8,12 @@
use crate::{Error, Result};
use indexedlog::lock::ScopedDirLock;
use indexedlog::log as ilog;
use indexedlog::Repair;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fmt;
use std::io::Write;
use std::path::{Path, PathBuf};
pub use zstore::Id20;
use zstore::Zstore;
@ -222,6 +224,84 @@ impl MetaLog {
}
}
impl Repair<()> for MetaLog {
fn repair(path: impl AsRef<Path>) -> indexedlog::Result<String> {
let path = path.as_ref();
let _lock = ScopedDirLock::new(path);
let blobs_path = path.join("blobs");
let roots_path = path.join("roots");
// Repair indexedlog without considering their dependencies.
let mut message = format!(
"Checking blobs at {:?}:\n{}\nChecking roots at {:?}:\n{}\n",
&path,
Zstore::repair(&blobs_path)?,
&path,
Self::ilog_open_options().repair(&roots_path)?,
);
// Check referred objects by Root and rollback to a Root where all objects are present.
let blobs = Zstore::open(&blobs_path)
.map_err(|e| indexedlog::Error::from(("cannot reopen blobs after repair", e)))?;
let root_ids = MetaLog::list_roots(path)
.map_err(|e| indexedlog::Error::from(("cannot list root ids after repair", e)))?;
message += &format!("Checking blobs referred by {} Roots:\n", root_ids.len());
// Filter out good Root IDs.
let good_root_ids: Vec<Id20> = root_ids
.iter()
.filter(|root_id| match load_root(&blobs, **root_id) {
Ok(root) => root.map.iter().all(|(key, id)| match blobs.get(*id) {
Ok(Some(_)) => true,
_ => {
let desc = format!("Root {} ({})", root_id.to_hex(), root.message);
message +=
&format!("Key {:?} referred by {} cannot be read.\n", key, &desc);
false
}
}),
Err(_) => {
message += &format!("Root {} cannot be read.\n", root_id.to_hex());
false
}
})
.cloned()
.collect();
// Write out good Root IDs.
if good_root_ids.len() == root_ids.len() {
message += &format!("All Roots are verified.\n");
} else {
message += &format!(
"Removing {} bad Root IDs.\n",
root_ids.len() - good_root_ids.len()
);
// Write Root IDs to a backup in case something goes wrong.
(|| -> std::io::Result<()> {
let mut backup = std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(path.join("roots.backup"))?;
backup.write_all(
&root_ids.iter().map(|id| id.as_ref()).collect::<Vec<_>>()[..].concat(),
)
})()
.map_err(|e| indexedlog::Error::from(("cannot create backup", e)))?;
Self::ilog_open_options().delete_content(&roots_path)?;
let mut root_id_log = Self::ilog_open_options().open(&roots_path)?;
for root_id in &good_root_ids {
root_id_log.append(root_id.as_ref())?;
}
root_id_log.sync()?;
message += &format!("Rebuilt Root log with {} Root IDs.\n", good_root_ids.len());
}
Ok(message)
}
}
fn find_last_root_id(log: &ilog::Log) -> Result<Id20> {
for entry in log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)? {
// The linked list in the index is in the reversed order.
@ -306,7 +386,12 @@ pub mod resolver {
#[cfg(test)]
mod tests {
use super::*;
use indexedlog::DefaultOpenOptions;
use quickcheck::quickcheck;
use rand_chacha::ChaChaRng;
use rand_core::{RngCore, SeedableRng};
use std::fs;
use std::io::{Seek, SeekFrom, Write};
use tempfile::TempDir;
#[test]
@ -458,4 +543,174 @@ mod tests {
opts.timestamp = timestamp;
opts
}
// Repair
#[test]
fn test_repair() {
let dir = TempDir::new().unwrap();
let repair = || {
let path = format!("{:?}", &dir.path());
let path = &path[1..path.len() - 1]; // strip leading and trailing '"'
format!(
"\n{}",
MetaLog::repair(&dir)
.unwrap()
.lines()
// Remove 'Backed up' lines since they have dynamic file names.
.filter(|l| !l.contains("Backed up"))
.collect::<Vec<_>>()
.join("\n")
.replace(&path, "<path>")
// Normalize path difference on Windows.
.replace("\\\\", "/")
.trim_end()
)
};
let create_log = || {
let mut noise = [0u8; 4000];
ChaChaRng::seed_from_u64(0).fill_bytes(&mut noise);
let mut metalog = MetaLog::open(&dir, None).unwrap();
metalog.set("a", &noise[..]).unwrap();
metalog.set("a", b"1").unwrap();
metalog.commit(commit_opt("commit 1", 1)).unwrap();
metalog.set("b", b"2").unwrap();
metalog.set("c", &noise[..]).unwrap();
metalog.commit(commit_opt("commit 2", 2)).unwrap();
metalog.set("d", &noise[..]).unwrap();
metalog.commit(commit_opt("commit 3", 3)).unwrap();
metalog.set("e", &noise[..]).unwrap();
metalog.commit(commit_opt("commit 4", 4)).unwrap();
metalog.set("f", &noise[..]).unwrap();
metalog.commit(commit_opt("commit 5", 5)).unwrap();
};
let corrupt = |name: &str, offset: i64| pwrite(&dir.path().join(name), offset, b"cc");
// Empty log.
assert_eq!(
repair(),
r#"
Checking blobs at "<path>":
"<path>/blobs" does not exist. Nothing to repair.
Checking roots at "<path>":
"<path>/roots" does not exist. Nothing to repair.
Checking blobs referred by 1 Roots:
All Roots are verified."#
);
// Non-empty log.
create_log();
assert_eq!(
repair(),
r#"
Checking blobs at "<path>":
Verified 8 entries, 4650 bytes in log
Index "id" passed integrity check
Checking roots at "<path>":
Verified 5 entries, 142 bytes in log
Index "reverse" passed integrity check
Checking blobs referred by 6 Roots:
All Roots are verified."#
);
// Break the last Root ID in "roots", used by "commit 5".
corrupt("roots/log", -20);
assert_eq!(
repair(),
r#"
Checking blobs at "<path>":
Verified 8 entries, 4650 bytes in log
Index "id" passed integrity check
Checking roots at "<path>":
Verified first 4 entries, 116 of 142 bytes in log
Reset log size to 116
Rebuilt index "reverse"
Checking blobs referred by 5 Roots:
All Roots are verified."#
);
// Break the Root structure in "blobs", used by "commit 4" .
corrupt("blobs/log", -150);
assert_eq!(
repair(),
r#"
Checking blobs at "<path>":
Verified first 6 entries, 4491 of 4650 bytes in log
Reset log size to 4491
Rebuilt index "id"
Checking roots at "<path>":
Verified 4 entries, 116 bytes in log
Index "reverse" passed integrity check
Checking blobs referred by 5 Roots:
Root 93b756c5e512ebd0dd7c4dfdb17924287869ec33 cannot be read.
Removing 1 bad Root IDs.
Rebuilt Root log with 4 Root IDs."#
);
// Break the blob referred by commits. To do that, we need to reorder
// blobs in "blobs/" so the large "noise" blob is at the end.
let zpath = dir.path().join("blobs");
let blobs = {
let zlog = Zstore::default_open_options().open(&zpath).unwrap();
let mut blobs: Vec<Vec<u8>> = zlog.iter().map(|e| e.unwrap().to_vec()).collect();
blobs.sort_unstable_by_key(|b| b.len());
blobs
};
{
Zstore::default_open_options()
.delete_content(&zpath)
.unwrap();
let mut zlog = Zstore::default_open_options().open(&zpath).unwrap();
for blob in blobs {
zlog.append(blob).unwrap();
}
zlog.flush().unwrap();
}
// Now the last blob is the 4KB "noise" blob. Break it without breaking
// other blobs.
corrupt("blobs/log", -1000);
assert_eq!(
repair(),
r#"
Checking blobs at "<path>":
Verified first 5 entries, 424 of 4491 bytes in log
Reset log size to 424
Rebuilt index "id"
Checking roots at "<path>":
Verified 4 entries, 116 bytes in log
Index "reverse" passed integrity check
Checking blobs referred by 5 Roots:
Key "c" referred by Root c4d3e70640748daac548adb39b07818b0dc34e4f (commit 2) cannot be read.
Key "c" referred by Root b0f57751e2ec36db46dc3d38b88d538b40eebdb9 (commit 3) cannot be read.
Removing 2 bad Root IDs.
Rebuilt Root log with 3 Root IDs."#
);
}
fn pwrite(path: &Path, offset: i64, data: &[u8]) {
let mut file = fs::OpenOptions::new()
.write(true)
.read(true)
.open(path)
.unwrap();
if offset < 0 {
file.seek(SeekFrom::End(offset)).unwrap();
} else {
file.seek(SeekFrom::Start(offset as u64)).unwrap();
}
file.write_all(data).unwrap();
}
}