revisionstore: auto-delete when we have too much pack data

Summary:
In order to keep the hgcache size bounded, we need to keep track of pack
file sizes even during normal operations and delete excess packs.

This has the negative side effect of deleting data that is still needed if the
operation is legitimately huge, but we'd rather pay the extra download time than
fill up the entire disk.
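
As an illustration of the eviction policy this diff adds in PackStoreInner::delete_old_packs
(sort the on-disk packs newest-first by mtime, keep packs until the byte budget is spent,
delete everything older), here is a minimal standalone Rust sketch. It is only a sketch:
prune_packs and its parameters are made-up names, and the real store deletes packs through
Repackable::delete, which also removes each pack's companion index file.

use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

/// Remove the oldest packs in `pack_dir` so that only the newest packs that
/// fit within `max_bytes` remain. `extension` stands in for "datapack" or
/// "histpack".
fn prune_packs(pack_dir: &Path, extension: &str, max_bytes: u64) -> io::Result<()> {
    // Collect (path, mtime, size) for every pack file we can stat; unreadable
    // entries are skipped, mirroring the lenient error handling in the diff.
    let mut entries: Vec<(PathBuf, SystemTime, u64)> = Vec::new();
    for entry in fs::read_dir(pack_dir)? {
        let entry = entry?;
        let path = entry.path();
        if path.extension().and_then(|e| e.to_str()) != Some(extension) {
            continue;
        }
        let (modified, len) = match entry.metadata() {
            Ok(m) => match m.modified() {
                Ok(t) => (t, m.len()),
                Err(_) => continue,
            },
            Err(_) => continue,
        };
        entries.push((path, modified, len));
    }

    // Newest first; keep packs until the budget is spent, delete the rest.
    entries.sort_by(|a, b| b.1.cmp(&a.1));
    let mut kept = 0u64;
    for (path, _modified, size) in entries {
        if kept >= max_bytes {
            // Over budget: drop this pack. The real code also deletes the
            // pack's index file via Repackable::delete.
            let _ = fs::remove_file(&path);
        } else {
            kept += size;
        }
    }
    Ok(())
}

In the actual change, this pruning is triggered from add_pack whenever the running
current_bytes counter exceeds the configured limit, and is followed by a rescan that
rebuilds the in-memory pack list and byte count.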

Reviewed By: quark-zju

Differential Revision: D23486922

fbshipit-source-id: d21be095a8671d2bfc794c85918f796358dc4834
Durham Goode 2020-09-08 11:31:09 -07:00 committed by Facebook GitHub Bot
parent 717d10958f
commit 2919268555
9 changed files with 180 additions and 35 deletions
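
The limits themselves come from the packs config section: ContentStoreBuilder reads
packs.maxdatabytes and MetadataStoreBuilder reads packs.maxhistorybytes, while the local
(non-cache) pack stores always pass None, so only the shared hgcache is bounded. A
hypothetical hgrc sketch, with byte values picked purely for illustration (the tests
below set packs.maxdatabytes directly via --config):

[packs]
# Cap the shared datapack cache in the hgcache (parsed as a ByteCount).
maxdatabytes = 4294967296
# Cap the shared histpack cache in the hgcache (parsed as a ByteCount).
maxhistorybytes = 1073741824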


@ -217,14 +217,14 @@ py_class!(class datapackstore |py| {
data store: Box<DataPackStore>;
data path: PathBuf;
def __new__(_cls, path: &PyPath, deletecorruptpacks: bool = false) -> PyResult<datapackstore> {
def __new__(_cls, path: &PyPath, deletecorruptpacks: bool = false, maxbytes: Option<u64> = None) -> PyResult<datapackstore> {
let corruption_policy = if deletecorruptpacks {
CorruptionPolicy::REMOVE
} else {
CorruptionPolicy::IGNORE
};
datapackstore::create_instance(py, Box::new(DataPackStore::new(path, corruption_policy)), path.to_path_buf())
datapackstore::create_instance(py, Box::new(DataPackStore::new(path, corruption_policy, maxbytes)), path.to_path_buf())
}
def get(&self, name: PyPathBuf, node: &PyBytes) -> PyResult<PyBytes> {
@ -310,14 +310,14 @@ py_class!(class historypackstore |py| {
data store: Box<HistoryPackStore>;
data path: PathBuf;
def __new__(_cls, path: PyPathBuf, deletecorruptpacks: bool = false) -> PyResult<historypackstore> {
def __new__(_cls, path: PyPathBuf, deletecorruptpacks: bool = false, maxbytes: Option<u64> = None) -> PyResult<historypackstore> {
let corruption_policy = if deletecorruptpacks {
CorruptionPolicy::REMOVE
} else {
CorruptionPolicy::IGNORE
};
historypackstore::create_instance(py, Box::new(HistoryPackStore::new(path.as_path(), corruption_policy)), path.to_path_buf())
historypackstore::create_instance(py, Box::new(HistoryPackStore::new(path.as_path(), corruption_policy, maxbytes)), path.to_path_buf())
}
def getnodeinfo(&self, name: PyPathBuf, node: &PyBytes) -> PyResult<PyTuple> {


@ -43,7 +43,7 @@ pub fn run(opts: DebugstoreOpts, io: &mut IO, repo: Repo) -> Result<u8> {
None => return Err(errors::Abort("remotefilelog.reponame is not set".into()).into()),
};
let fullpath = format!("{}/{}/packs", cachepath, reponame);
let packstore = Box::new(DataPackStore::new(fullpath, CorruptionPolicy::IGNORE));
let packstore = Box::new(DataPackStore::new(fullpath, CorruptionPolicy::IGNORE, None));
let fullpath = format!("{}/{}/indexedlogdatastore", cachepath, reponame);
let indexedstore = Box::new(IndexedLogHgIdDataStore::new(fullpath, &config).unwrap());
let mut unionstore: UnionHgIdDataStore<Box<dyn HgIdDataStore>> = UnionHgIdDataStore::new();


@ -264,6 +264,10 @@ impl<'a> ContentStoreBuilder<'a> {
ByteCount::from(4 * (1024 ^ 3))
})?
.value();
let max_bytes = self
.config
.get_opt::<ByteCount>("packs", "maxdatabytes")?
.map(|v| v.value());
let mut datastore: UnionHgIdDataStore<Arc<dyn HgIdDataStore>> = UnionHgIdDataStore::new();
let mut blob_stores: UnionContentDataStore<Arc<dyn ContentDataStore>> =
@ -273,6 +277,7 @@ impl<'a> ContentStoreBuilder<'a> {
&cache_packs_path,
CorruptionPolicy::REMOVE,
max_pending_bytes,
max_bytes,
)?);
let shared_indexedlogdatastore = Arc::new(IndexedLogHgIdDataStore::new(
get_indexedlogdatastore_path(&cache_path)?,
@ -328,6 +333,7 @@ impl<'a> ContentStoreBuilder<'a> {
get_packs_path(&unsuffixed_local_path, &self.suffix)?,
CorruptionPolicy::IGNORE,
max_pending_bytes,
None,
)?);
let local_lfs_store = Arc::new(LfsStore::local(&local_path.unwrap(), self.config)?);


@ -439,6 +439,10 @@ impl Repackable for DataPack {
result2?;
Ok(())
}
fn size(&self) -> u64 {
self.mmap.len() as u64
}
}
struct DataPackIterator<'a> {


@ -386,6 +386,10 @@ impl Repackable for HistoryPack {
result2?;
Ok(())
}
fn size(&self) -> u64 {
self.mmap.len() as u64
}
}
struct HistoryPackIterator<'a> {


@ -12,7 +12,10 @@ use std::{
use anyhow::{format_err, Result};
use configparser::{config::ConfigSet, hg::ConfigSetHgExt};
use configparser::{
config::ConfigSet,
hg::{ByteCount, ConfigSetHgExt},
};
use types::{Key, NodeInfo};
use crate::{
@ -184,12 +187,17 @@ impl<'a> MetadataStoreBuilder<'a> {
let max_pending: u64 = self
.config
.get_or("packs", "maxhistorypending", || 10000000)?;
let max_bytes = self
.config
.get_opt::<ByteCount>("packs", "maxhistorybytes")?
.map(|v| v.value());
let cache_packs_path = get_cache_packs_path(self.config, &self.suffix)?;
let shared_pack_store = Arc::new(MutableHistoryPackStore::new(
&cache_packs_path,
CorruptionPolicy::REMOVE,
max_pending,
max_bytes,
)?);
let mut historystore: UnionHgIdHistoryStore<Arc<dyn HgIdHistoryStore>> =
UnionHgIdHistoryStore::new();
@ -225,6 +233,7 @@ impl<'a> MetadataStoreBuilder<'a> {
get_packs_path(&local_path, &self.suffix)?,
CorruptionPolicy::IGNORE,
max_pending,
None,
)?);
historystore.add(local_pack_store.clone());


@ -118,6 +118,8 @@ struct PackStoreInner<T> {
scan_frequency: Duration,
last_scanned: RefCell<Instant>,
packs: RefCell<LruStore<T>>,
max_bytes: Option<u64>,
current_bytes: AtomicU64,
}
/// A `PackStore` automatically keeps track of packfiles in a given directory. New on-disk
@ -134,6 +136,7 @@ struct PackStoreOptions {
scan_frequency: Duration,
extension: &'static str,
corruption_policy: CorruptionPolicy,
max_bytes: Option<u64>,
}
impl PackStoreOptions {
@ -143,6 +146,7 @@ impl PackStoreOptions {
scan_frequency: Duration::from_secs(10),
extension: "",
corruption_policy: CorruptionPolicy::IGNORE,
max_bytes: None,
}
}
@ -169,6 +173,11 @@ impl PackStoreOptions {
self
}
fn max_bytes(mut self, max_bytes: Option<u64>) -> Self {
self.max_bytes = max_bytes;
self
}
fn build<T>(self) -> PackStore<T> {
let now = Instant::now();
let force_rescan = now - self.scan_frequency;
@ -181,12 +190,14 @@ impl PackStoreOptions {
corruption_policy: self.corruption_policy,
last_scanned: RefCell::new(force_rescan),
packs: RefCell::new(LruStore::new()),
max_bytes: self.max_bytes,
current_bytes: AtomicU64::new(0),
}),
}
}
}
impl<T> PackStore<T> {
impl<T: LocalStore + Repackable + StoreFromPath> PackStore<T> {
/// Force a rescan to be performed. Since rescans are expensive, this should only be called for
/// out-of-process created packfiles.
pub fn force_rescan(&self) {
@ -197,8 +208,24 @@ impl<T> PackStore<T> {
}
/// Add a packfile to this store.
fn add_pack(&self, pack: T) {
self.inner.lock().packs.borrow_mut().add(pack);
fn add_pack(&self, pack: T) -> Result<()> {
let inner = self.inner.lock();
let size = pack.size();
inner.packs.borrow_mut().add(pack);
let current_bytes = inner.current_bytes.fetch_add(size, Ordering::SeqCst) + size;
if let Some(max_bytes) = inner.max_bytes {
if current_bytes > max_bytes {
if inner.delete_old_packs().is_ok() {
let _ = inner.rescan()?;
} else {
// If the delete fails, give up and move on. We don't want to block the user
// operation on this maintenance work.
}
}
}
Ok(())
}
}
@ -207,10 +234,15 @@ impl DataPackStore {
///
/// Only use for data that can be recovered from the network; corrupted datapacks will be
/// automatically removed from disk.
pub fn new<P: AsRef<Path>>(pack_dir: P, corruption_policy: CorruptionPolicy) -> Self {
pub fn new<P: AsRef<Path>>(
pack_dir: P,
corruption_policy: CorruptionPolicy,
max_bytes: Option<u64>,
) -> Self {
PackStoreOptions::new()
.directory(pack_dir)
.corruption_policy(corruption_policy)
.max_bytes(max_bytes)
.extension("datapack")
.build()
}
@ -221,10 +253,15 @@ impl HistoryPackStore {
///
/// Only use for data that can be recovered from the network; corrupted datapacks will be
/// automatically removed from disk.
pub fn new<P: AsRef<Path>>(pack_dir: P, corruption_policy: CorruptionPolicy) -> Self {
pub fn new<P: AsRef<Path>>(
pack_dir: P,
corruption_policy: CorruptionPolicy,
max_bytes: Option<u64>,
) -> Self {
PackStoreOptions::new()
.directory(pack_dir)
.corruption_policy(corruption_policy)
.max_bytes(max_bytes)
.extension("histpack")
.build()
}
@ -235,13 +272,51 @@ impl<T: LocalStore + Repackable + StoreFromPath> PackStoreInner<T> {
fn rescan(&self) -> Result<()> {
let mut new_packs = Vec::new();
let mut new_size = 0;
for entry in self.get_pack_paths()?.into_iter() {
if let Ok(pack) = T::from_path(&entry.path()) {
new_size += pack.size();
new_packs.push(pack);
}
}
self.packs.replace(new_packs.into());
self.current_bytes.store(new_size, Ordering::SeqCst);
Ok(())
}
fn delete_old_packs(&self) -> Result<()> {
if let Some(max_bytes) = self.max_bytes {
let mut entries = vec![];
for entry in self.get_pack_paths()? {
let metadata = match entry.metadata() {
Ok(m) => m,
Err(_) => continue,
};
let modified = match metadata.modified() {
Ok(m) => m,
Err(_) => continue,
};
entries.push((entry, modified, metadata.len()));
}
// Sort by reverse modified to get them in newest first order.
entries.sort_by(|a, b| b.1.cmp(&a.1));
let mut size = 0;
for entry in entries.into_iter() {
if size >= max_bytes {
// Delete the remaining packs
match T::from_path(&entry.0.path()) {
Ok(pack) => pack.delete()?,
Err(_) => continue,
};
} else {
size += entry.2;
}
}
}
Ok(())
}
@ -413,8 +488,13 @@ impl MutableDataPackStore {
pack_dir: impl AsRef<Path>,
corruption_policy: CorruptionPolicy,
max_pending_bytes: u64,
max_bytes: Option<u64>,
) -> Result<Self> {
let pack_store = Arc::new(DataPackStore::new(pack_dir.as_ref(), corruption_policy));
let pack_store = Arc::new(DataPackStore::new(
pack_dir.as_ref(),
corruption_policy,
max_bytes,
));
let mutable_pack = Arc::new(MutableDataPack::new(pack_dir, DataPackVersion::One)?);
let mut union_store: UnionHgIdDataStore<Arc<dyn HgIdDataStore>> = UnionHgIdDataStore::new();
union_store.add(pack_store.clone());
@ -438,7 +518,7 @@ impl MutableDataPackStore {
let mut result_packs = self.result_packs.lock();
for path in paths {
let datapack = DataPack::new(path.as_path())?;
self.inner.pack_store.add_pack(datapack);
self.inner.pack_store.add_pack(datapack)?;
result_packs.push(path);
}
}
@ -504,8 +584,13 @@ impl MutableHistoryPackStore {
pack_dir: impl AsRef<Path>,
corruption_policy: CorruptionPolicy,
max_pending: u64,
max_bytes: Option<u64>,
) -> Result<Self> {
let pack_store = Arc::new(HistoryPackStore::new(pack_dir.as_ref(), corruption_policy));
let pack_store = Arc::new(HistoryPackStore::new(
pack_dir.as_ref(),
corruption_policy,
max_bytes,
));
let mutable_pack = Arc::new(MutableHistoryPack::new(pack_dir, HistoryPackVersion::One)?);
let mut union_store: UnionHgIdHistoryStore<Arc<dyn HgIdHistoryStore>> =
UnionHgIdHistoryStore::new();
@ -530,7 +615,7 @@ impl MutableHistoryPackStore {
let mut result_packs = self.result_packs.lock();
for path in paths {
let histpack = HistoryPack::new(path.as_path())?;
self.inner.pack_store.add_pack(histpack);
self.inner.pack_store.add_pack(histpack)?;
result_packs.push(path);
}
}
@ -601,7 +686,7 @@ mod tests {
);
make_datapack(&tempdir, &vec![revision.clone()]);
let store = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE);
let store = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, None);
let stored = store.get(StoreKey::hgid(k))?;
assert_eq!(
stored,
@ -625,7 +710,7 @@ mod tests {
);
make_datapack(&tempdir, &vec![revision.clone()]);
let store = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE);
let store = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, None);
let missing = store.get_missing(&vec![StoreKey::from(k)])?;
assert_eq!(missing.len(), 0);
Ok(())
@ -634,7 +719,7 @@ mod tests {
#[test]
fn test_datapack_created_after() -> Result<()> {
let tempdir = TempDir::new()?;
let store = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE);
let store = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, None);
let k = key("a", "2");
let revision = (
@ -737,7 +822,7 @@ mod tests {
fn test_histpack() -> Result<()> {
let mut rng = ChaChaRng::from_seed([0u8; 32]);
let tempdir = TempDir::new()?;
let store = HistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE);
let store = HistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE, None);
let nodes = get_nodes(&mut rng);
make_historypack(&tempdir, &nodes);
@ -775,7 +860,7 @@ mod tests {
);
make_datapack(&tempdir, &vec![revision2.clone()]);
let packstore = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE);
let packstore = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, None);
let k2 = StoreKey::hgid(k2);
let _ = packstore.get(k2.clone())?;
@ -797,7 +882,7 @@ mod tests {
let tempdir = TempDir::new()?;
let mut non_present_tempdir = tempdir.into_path();
non_present_tempdir.push("non_present");
let store = HistoryPackStore::new(&non_present_tempdir, CorruptionPolicy::REMOVE);
let store = HistoryPackStore::new(&non_present_tempdir, CorruptionPolicy::REMOVE, None);
let store = store.inner.lock();
store.rescan()
@ -830,7 +915,7 @@ mod tests {
.set_len(datapack.metadata().unwrap().len() / 2)
.unwrap();
let packstore = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE);
let packstore = DataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, None);
let k1 = StoreKey::hgid(k1);
assert_eq!(
packstore.get(k1.clone()).unwrap(),
@ -865,7 +950,7 @@ mod tests {
assert_eq!(read_dir(&tempdir)?.count(), 2);
let packstore = DataPackStore::new(&tempdir, CorruptionPolicy::IGNORE);
let packstore = DataPackStore::new(&tempdir, CorruptionPolicy::IGNORE, None);
let k1 = StoreKey::hgid(k1);
assert_eq!(packstore.get(k1.clone())?, StoreResult::NotFound(k1));
@ -876,7 +961,7 @@ mod tests {
#[test]
fn test_add_flush() -> Result<()> {
let tempdir = TempDir::new()?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000)?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000, None)?;
let k1 = key("a", "2");
let delta = Delta {
@ -893,7 +978,7 @@ mod tests {
#[test]
fn test_add_get_delta() -> Result<()> {
let tempdir = TempDir::new()?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000)?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000, None)?;
let k1 = key("a", "2");
let delta = Delta {
@ -911,7 +996,7 @@ mod tests {
#[test]
fn test_add_flush_get_delta() -> Result<()> {
let tempdir = TempDir::new()?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000)?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000, None)?;
let k1 = key("a", "2");
let delta = Delta {
@ -930,7 +1015,8 @@ mod tests {
#[test]
fn test_histpack_add_get() -> Result<()> {
let tempdir = TempDir::new()?;
let packstore = MutableHistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000)?;
let packstore =
MutableHistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000, None)?;
let mut rng = ChaChaRng::from_seed([0u8; 32]);
let nodes = get_nodes(&mut rng);
@ -948,7 +1034,8 @@ mod tests {
#[test]
fn test_histpack_add_flush_get() -> Result<()> {
let tempdir = TempDir::new()?;
let packstore = MutableHistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000)?;
let packstore =
MutableHistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000, None)?;
let mut rng = ChaChaRng::from_seed([0u8; 32]);
let nodes = get_nodes(&mut rng);
@ -968,7 +1055,7 @@ mod tests {
#[test]
fn test_histpack_auto_flush() -> Result<()> {
let tempdir = TempDir::new()?;
let packstore = MutableHistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 0)?;
let packstore = MutableHistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 0, None)?;
let mut rng = ChaChaRng::from_seed([0u8; 32]);
let nodes = get_nodes(&mut rng);
@ -989,7 +1076,7 @@ mod tests {
#[test]
fn test_datapack_auto_flush() -> Result<()> {
let tempdir = TempDir::new()?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 0)?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 0, None)?;
let k1 = key("a", "1");
let delta1 = Delta {
@ -1022,7 +1109,7 @@ mod tests {
#[test]
fn test_datapack_flush_empty() -> Result<()> {
let tempdir = TempDir::new()?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000)?;
let packstore = MutableDataPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000, None)?;
packstore.flush()?;
Ok(())
}
@ -1030,7 +1117,8 @@ mod tests {
#[test]
fn test_histpack_flush_empty() -> Result<()> {
let tempdir = TempDir::new()?;
let packstore = MutableHistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000)?;
let packstore =
MutableHistoryPackStore::new(&tempdir, CorruptionPolicy::REMOVE, 1000, None)?;
packstore.flush()?;
Ok(())
}


@ -52,6 +52,7 @@ pub trait ToKeys {
pub trait Repackable {
fn delete(self) -> Result<()>;
fn size(&self) -> u64;
}
fn repack_datapack(data_pack: &DataPack, mut_pack: &mut MutableDataPack) -> Result<()> {


@ -24,9 +24,6 @@
$ hgcloneshallow ssh://user@dummy/master shallow -q
1 files fetched over 1 fetches - (1 misses, 0.00% hit ratio) over *s (glob) (?)
$ cd shallow
$ cd ..
$ cd shallow
$ find $CACHEDIR/master/packs | sort
$TESTTMP/hgcache/master/packs
@ -45,3 +42,39 @@
$TESTTMP/hgcache/master/packs/276d308429d0303762befa376788300f0310f90e.histidx
$TESTTMP/hgcache/master/packs/276d308429d0303762befa376788300f0310f90e.histpack
$TESTTMP/hgcache/master/packs/repacklock
Cleanup old packs during writes when we're over the threshold
$ cd ../master
$ echo 12345678901234567890123456789012345678901234567890 > a
$ echo 12345678901234567890123456789012345678901234567890 > b
$ echo 12345678901234567890123456789012345678901234567890 > c
$ echo 12345678901234567890123456789012345678901234567890 > d
$ echo 12345678901234567890123456789012345678901234567890 > e
$ hg commit -Aqm "add a bunch of files"
$ cd ../shallow
$ hg pull -q
$ clearcache
$ hg up -q tip --config packs.maxdatapendingbytes=30
$ ls_l $CACHEDIR/master/packs | grep datapack
-r--r--r-- 144 *.datapack (glob)
-r--r--r-- 80 *.datapack (glob)
-r--r--r-- 80 *.datapack (glob)
-r--r--r-- 80 *.datapack (glob)
-r--r--r-- 80 *.datapack (glob)
$ hg up -q null
$ clearcache
$ hg up -q tip --config packs.maxdatapendingbytes=30 --config packs.maxdatabytes=120
$ ls_l $CACHEDIR/master/packs | grep datapack
-r--r--r-- 65 *.datapack (glob)
-r--r--r-- 80 *.datapack (glob)
$ hg up -q null
$ clearcache
$ hg up -q tip --config packs.maxdatapendingbytes=30 --config packs.maxdatabytes=200
$ ls_l $CACHEDIR/master/packs | grep datapack
-r--r--r-- 65 *.datapack (glob)
-r--r--r-- 80 *.datapack (glob)
-r--r--r-- 80 *.datapack (glob)