revisionstore: revamp repack to ease file format migration

Summary:
One of the main drawbacks of the current version of repack is that it writes
the data back to a packfile, making it hard to change the file format. Two
file format changes are currently ongoing: moving away from packfiles
entirely, and moving LFS pointers out of the packfiles into a separate storage.

While an ad-hoc solution could be designed for this purpose, repack can
fulfill this goal easily by simply writing to the ContentStore; the
configuration of the ContentStore then decides where this data ends up
being written.
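
As a rough sketch (the caller and variable names below are illustrative, not
part of this change), the new Rust entry point takes the stores and writes
through whichever backends they are configured with:

    use std::{path::PathBuf, sync::Arc};
    use revisionstore::{repack, ContentStore, MetadataStore, RepackKind, RepackLocation};

    // Hypothetical caller: the stores are assumed to already be built from the
    // user's configuration, so repacked entries may land in packfiles, an
    // indexedlog store, or the LFS store, depending on that configuration.
    fn repack_shared_cache(
        pack_dir: PathBuf,
        content: Arc<ContentStore>,
        metadata: Arc<MetadataStore>,
    ) -> anyhow::Result<()> {
        repack(
            pack_dir,
            Some((content, metadata)),
            RepackKind::Incremental,
            RepackLocation::Shared,
        )
    }

Passing None for the stores falls back to the legacy pack-to-pack repack.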

The main drawback of this change is the unfortunate duplication of code it
adds. I'm sure there is a way to avoid it by introducing new traits, but I
decided against that for now for the sake of code readability.

Reviewed By: DurhamG

Differential Revision: D20567118

fbshipit-source-id: d67282dae31db93739e50f8cc64f9ecce92d2d30
Xavier Deguillard 2020-03-26 18:59:47 -07:00 committed by Facebook GitHub Bot
parent cf650d232a
commit d404b0a228
14 changed files with 438 additions and 143 deletions

View File

@ -1344,8 +1344,8 @@ def resolveonto(repo, ontoarg):
def _createpackstore(ui, packpath):
datastore = datapack.makedatapackstore(ui, packpath)
histstore = historypack.makehistorypackstore(ui, packpath)
datastore = datapack.makedatapackstore(ui, packpath, True)
histstore = historypack.makehistorypackstore(ui, packpath, True)
return datastore, histstore

View File

@ -96,10 +96,11 @@ class basepackstore(object):
# Default cache size limit for the pack files.
DEFAULTCACHESIZE = 100
def __init__(self, ui, path, deletecorruptpacks=False):
def __init__(self, ui, path, shared, deletecorruptpacks=False):
self.ui = ui
self.path = path
self.deletecorruptpacks = deletecorruptpacks
self.shared = shared
# lastrefesh is 0 so we'll immediately check for new packs on the first
# failure.
@ -108,6 +109,11 @@ class basepackstore(object):
self.packs = _cachebackedpacks([], self.DEFAULTCACHESIZE)
self.packspath = set()
def repackstore(self, incremental=True):
from .repack import runrepacklegacy
runrepacklegacy(self.ui, self.path, incremental, self.shared)
def _getavailablepackfiles(self, currentpacks=None):
"""For each pack file (a index/data file combo), yields:
(full path without extension, mtime, size)

View File

@ -30,9 +30,9 @@ class datapackstore(basepack.basepackstore):
INDEXSUFFIX = INDEXSUFFIX
PACKSUFFIX = PACKSUFFIX
def __init__(self, ui, path, deletecorruptpacks=False):
def __init__(self, ui, path, shared, deletecorruptpacks=False):
super(datapackstore, self).__init__(
ui, path, deletecorruptpacks=deletecorruptpacks
ui, path, shared, deletecorruptpacks=deletecorruptpacks
)
def getpack(self, path):
@ -71,15 +71,12 @@ class datapackstore(basepack.basepackstore):
def add(self, name, node, data):
raise RuntimeError("cannot add to datapackstore")
def repackstore(self, incremental=True):
revisionstore.repackincrementaldatapacks(self.path)
def makedatapackstore(ui, path, deletecorruptpacks=False):
def makedatapackstore(ui, path, shared, deletecorruptpacks=False):
if ui.configbool("remotefilelog", "userustpackstore", False):
return revisionstore.datapackstore(path, deletecorruptpacks)
else:
return datapackstore(ui, path, deletecorruptpacks)
return datapackstore(ui, path, shared, deletecorruptpacks=deletecorruptpacks)
class memdatapack(object):

View File

@ -44,9 +44,9 @@ class historypackstore(basepack.basepackstore):
INDEXSUFFIX = INDEXSUFFIX
PACKSUFFIX = PACKSUFFIX
def __init__(self, ui, path, deletecorruptpacks=False):
def __init__(self, ui, path, shared, deletecorruptpacks=False):
super(historypackstore, self).__init__(
ui, path, deletecorruptpacks=deletecorruptpacks
ui, path, shared, deletecorruptpacks=deletecorruptpacks
)
def getpack(self, path):
@ -66,15 +66,12 @@ class historypackstore(basepack.basepackstore):
"cannot add to historypackstore (%s:%s)" % (filename, hex(node))
)
def repackstore(self):
revisionstore.repackincrementalhistpacks(self.path)
def makehistorypackstore(ui, path, deletecorruptpacks=False):
def makehistorypackstore(ui, path, shared, deletecorruptpacks=False):
if ui.configbool("remotefilelog", "userustpackstore", False):
return revisionstore.historypackstore(path, deletecorruptpacks)
else:
return historypackstore(ui, path, deletecorruptpacks)
return historypackstore(ui, path, shared, deletecorruptpacks=deletecorruptpacks)
class memhistorypack(object):

View File

@ -673,12 +673,14 @@ class remotefileslog(filelog.fileslog):
"""Packs are more efficient (to read from) cache stores."""
repo = self.repo
def makepackstore(datastores, historystores, packpath, deletecorrupt=False):
def makepackstore(
datastores, historystores, packpath, deletecorrupt=False, shared=True
):
packcontentstore = makedatapackstore(
repo.ui, packpath, deletecorruptpacks=deletecorrupt
repo.ui, packpath, deletecorruptpacks=deletecorrupt, shared=shared
)
packmetadatastore = makehistorypackstore(
repo.ui, packpath, deletecorruptpacks=deletecorrupt
repo.ui, packpath, deletecorruptpacks=deletecorrupt, shared=shared
)
datastores.append(packcontentstore)
historystores.append(packmetadatastore)
@ -692,13 +694,14 @@ class remotefileslog(filelog.fileslog):
self.sharedhistorystores,
spackpath,
deletecorrupt=True,
shared=True,
)
lpackpath = shallowutil.getlocalpackpath(
repo.svfs.vfs.base, constants.FILEPACK_CATEGORY
)
lpackcontent, lpackmetadata = makepackstore(
self.localdatastores, self.localhistorystores, lpackpath
self.localdatastores, self.localhistorystores, lpackpath, shared=False
)
return (spackcontent, spackmetadata, lpackcontent, lpackmetadata)

View File

@ -44,34 +44,38 @@ def backgroundrepack(repo, incremental=True):
runshellcommand(cmd, encoding.environ)
def _runrustrepack(repo, packpath, incremental):
def _runrustrepack(ui, packpath, stores, incremental, shared):
if not os.path.isdir(packpath):
return
if incremental:
repacks = [
revisionstore.repackincrementaldatapacks,
revisionstore.repackincrementalhistpacks,
]
else:
repacks = [revisionstore.repackdatapacks, revisionstore.repackhistpacks]
for dorepack in repacks:
try:
dorepack(packpath)
revisionstore.repack(packpath, stores, not incremental, shared)
except Exception as e:
repo.ui.log("repack_failure", msg=str(e), traceback=traceback.format_exc())
ui.log("repack_failure", msg=str(e), traceback=traceback.format_exc())
if "Repack successful but with errors" not in str(e):
raise
def _runrepack(repo, packpath, incremental, shared):
if repo.fileslog._ruststore:
stores = (repo.fileslog.contentstore, repo.fileslog.metadatastore)
else:
stores = None
_runrustrepack(repo.ui, packpath, stores, incremental, shared)
def runrepacklegacy(ui, packpath, incremental, shared):
_runrustrepack(ui, packpath, None, incremental, shared)
def _shareddatastoresrepack(repo, incremental):
if util.safehasattr(repo.fileslog, "shareddatastores") or repo.fileslog._ruststore:
packpath = shallowutil.getcachepackpath(repo, constants.FILEPACK_CATEGORY)
limit = repo.ui.configbytes("remotefilelog", "cachelimit", "10GB")
_cleanuppacks(repo.ui, packpath, limit)
_runrustrepack(repo, packpath, incremental)
_runrepack(repo, packpath, incremental, True)
def _localdatarepack(repo, incremental):
@ -83,12 +87,12 @@ def _localdatarepack(repo, incremental):
)
_cleanuppacks(repo.ui, packpath, 0)
_runrustrepack(repo, packpath, incremental)
_runrepack(repo, packpath, incremental, False)
def _manifestrepack(repo, incremental):
if repo.ui.configbool("treemanifest", "server"):
_runrustrepack(repo, repo.localvfs.join("cache/packs/manifests"), incremental)
_runrepack(repo, repo.localvfs.join("cache/packs/manifests"), incremental)
elif util.safehasattr(repo.manifestlog, "datastore"):
localdata, shareddata = _getmanifeststores(repo)
lpackpath, ldstores, lhstores = localdata
@ -101,7 +105,7 @@ def _manifestrepack(repo, incremental):
else 0
)
_cleanuppacks(repo.ui, packpath, limit)
_runrustrepack(repo, packpath, incremental)
runrepacklegacy(repo.ui, packpath, incremental, shared)
# Repack the shared manifest store
_domanifestrepack(spackpath, sdstores, shstores, True)

View File

@ -765,7 +765,7 @@ def setuptreestores(repo, mfl):
ondemandstore = ondemandtreedatastore(repo)
# Data store
datastore = makedatapackstore(ui, packpath)
datastore = makedatapackstore(ui, packpath, False)
revlogstore = manifestrevlogstore(repo)
mfl.revlogstore = revlogstore
@ -791,7 +791,7 @@ def setuptreestores(repo, mfl):
)
# History store
historystore = makehistorypackstore(ui, packpath)
historystore = makehistorypackstore(ui, packpath, False)
mfl.historystore = unionmetadatastore(
historystore, revlogstore, mutablelocalstore, ondemandstore
)
@ -828,8 +828,8 @@ def setuptreestores(repo, mfl):
)
# Data store
datastore = makedatapackstore(ui, packpath, deletecorruptpacks=True)
localdatastore = makedatapackstore(ui, localpackpath)
datastore = makedatapackstore(ui, packpath, True, deletecorruptpacks=True)
localdatastore = makedatapackstore(ui, localpackpath, False)
datastores = [datastore, localdatastore, mutablelocalstore, mutablesharedstore]
if demanddownload:
datastores.append(remotestore)
@ -844,8 +844,10 @@ def setuptreestores(repo, mfl):
mfl.localdatastores = [localdatastore]
# History store
sharedhistorystore = makehistorypackstore(ui, packpath, deletecorruptpacks=True)
localhistorystore = makehistorypackstore(ui, localpackpath)
sharedhistorystore = makehistorypackstore(
ui, packpath, True, deletecorruptpacks=True
)
localhistorystore = makehistorypackstore(ui, localpackpath, False)
mfl.sharedhistorystores = [sharedhistorystore]
mfl.localhistorystores = [localhistorystore]

View File

@ -20,16 +20,15 @@ use anyhow::{format_err, Error};
use cpython::*;
use parking_lot::RwLock;
use cpython_ext::{PyErr, PyPath, PyPathBuf, ResultPyErrExt, Str};
use cpython_ext::{PyErr, PyNone, PyPath, PyPathBuf, ResultPyErrExt, Str};
use pyconfigparser::config;
use revisionstore::{
repack::{filter_incrementalpacks, list_packs, repack_datapacks, repack_historypacks},
ContentStore, ContentStoreBuilder, CorruptionPolicy, DataPack, DataPackStore, DataPackVersion,
Delta, HgIdDataStore, HgIdHistoryStore, HgIdMutableDeltaStore, HgIdMutableHistoryStore,
HgIdRemoteStore, HistoryPack, HistoryPackStore, HistoryPackVersion, IndexedLogHgIdDataStore,
IndexedLogHgIdHistoryStore, IndexedlogRepair, LocalStore, MemcacheStore, Metadata,
MetadataStore, MetadataStoreBuilder, MutableDataPack, MutableHistoryPack, RemoteDataStore,
RemoteHistoryStore, StoreKey,
repack, ContentStore, ContentStoreBuilder, CorruptionPolicy, DataPack, DataPackStore,
DataPackVersion, Delta, HgIdDataStore, HgIdHistoryStore, HgIdMutableDeltaStore,
HgIdMutableHistoryStore, HgIdRemoteStore, HistoryPack, HistoryPackStore, HistoryPackVersion,
IndexedLogHgIdDataStore, IndexedLogHgIdHistoryStore, IndexedlogRepair, LocalStore,
MemcacheStore, Metadata, MetadataStore, MetadataStoreBuilder, MutableDataPack,
MutableHistoryPack, RemoteDataStore, RemoteHistoryStore, RepackKind, RepackLocation, StoreKey,
};
use types::{Key, NodeInfo};
@ -71,71 +70,44 @@ pub fn init_module(py: Python, package: &str) -> PyResult<PyModule> {
m.add_class::<memcachestore>(py)?;
m.add(
py,
"repackdatapacks",
py_fn!(py, repackdata(packpath: &PyPath)),
)?;
m.add(
"repack",
py_fn!(
py,
"repackincrementaldatapacks",
py_fn!(py, incremental_repackdata(packpath: &PyPath)),
)?;
m.add(
py,
"repackhistpacks",
py_fn!(py, repackhist(packpath: &PyPath)),
)?;
m.add(
py,
"repackincrementalhistpacks",
py_fn!(py, incremental_repackhist(packpath: &PyPath)),
repack_py(
packpath: &PyPath,
stores: Option<(contentstore, metadatastore)>,
full: bool,
shared: bool
)
),
)?;
Ok(m)
}
/// Helper function to de-serialize and re-serialize from and to Python objects.
fn repack_pywrapper(
fn repack_py(
py: Python,
path: &PyPath,
repacker: impl FnOnce(PathBuf) -> Result<PathBuf>,
) -> PyResult<PyPathBuf> {
repacker(path.to_path_buf())
.and_then(|p| p.try_into())
.map_pyerr(py)
}
packpath: &PyPath,
stores: Option<(contentstore, metadatastore)>,
full: bool,
shared: bool,
) -> PyResult<PyNone> {
let stores = stores.map(|(content, metadata)| (content.to_inner(py), metadata.to_inner(py)));
/// Merge all the datapacks into one big datapack. Returns the fullpath of the resulting datapack.
fn repackdata(py: Python, packpath: &PyPath) -> PyResult<PyPathBuf> {
repack_pywrapper(py, packpath, |dir| {
repack_datapacks(list_packs(&dir, "datapack")?.iter(), &dir)
})
}
let kind = if full {
RepackKind::Full
} else {
RepackKind::Incremental
};
/// Merge all the history packs into one big historypack. Returns the fullpath of the resulting
/// histpack.
fn repackhist(py: Python, packpath: &PyPath) -> PyResult<PyPathBuf> {
repack_pywrapper(py, packpath, |dir| {
repack_historypacks(list_packs(&dir, "histpack")?.iter(), &dir)
})
}
let location = if shared {
RepackLocation::Shared
} else {
RepackLocation::Local
};
/// Perform an incremental repack of data packs.
fn incremental_repackdata(py: Python, packpath: &PyPath) -> PyResult<PyPathBuf> {
repack_pywrapper(py, packpath, |dir| {
repack_datapacks(
filter_incrementalpacks(list_packs(&dir, "datapack")?, "datapack")?.iter(),
&dir,
)
})
}
repack(packpath.to_path_buf(), stores, kind, location).map_pyerr(py)?;
/// Perform an incremental repack of history packs.
fn incremental_repackhist(py: Python, packpath: &PyPath) -> PyResult<PyPathBuf> {
repack_pywrapper(py, packpath, |dir| {
repack_historypacks(
filter_incrementalpacks(list_packs(&dir, "histpack")?, "histpack")?.iter(),
&dir,
)
})
Ok(PyNone)
}
py_class!(class datapack |py| {
@ -886,7 +858,7 @@ impl contentstore {
}
py_class!(class metadatastore |py| {
data store: MetadataStore;
data store: Arc<MetadataStore>;
def __new__(_cls, path: Option<PyPathBuf>, config: config, remote: pyremotestore, memcache: Option<memcachestore>) -> PyResult<metadatastore> {
let remotestore = remote.into_inner(py);
@ -906,7 +878,7 @@ py_class!(class metadatastore |py| {
builder
};
let metadatastore = builder.build().map_pyerr(py)?;
let metadatastore = Arc::new(builder.build().map_pyerr(py)?);
metadatastore::create_instance(py, metadatastore)
}
@ -934,6 +906,12 @@ py_class!(class metadatastore |py| {
}
});
impl metadatastore {
pub fn to_inner(&self, py: Python) -> Arc<MetadataStore> {
self.store(py).clone()
}
}
py_class!(pub class memcachestore |py| {
data memcache: MemcacheStore;

View File

@ -31,6 +31,7 @@ use crate::{
multiplexstore::MultiplexDeltaStore,
packstore::{CorruptionPolicy, MutableDataPackStore},
remotestore::HgIdRemoteStore,
repack::RepackLocation,
types::StoreKey,
uniondatastore::{UnionContentDataStore, UnionHgIdDataStore},
util::{
@ -75,6 +76,37 @@ impl ContentStore {
}
}
// Repack specific methods, not to be used directly but by the repack code.
impl ContentStore {
pub(crate) fn add_pending(
&self,
key: &Key,
data: Bytes,
meta: Metadata,
location: RepackLocation,
) -> Result<()> {
let delta = Delta {
data,
base: None,
key: key.clone(),
};
match location {
RepackLocation::Local => self.add(&delta, &meta),
RepackLocation::Shared => self.shared_mutabledatastore.add(&delta, &meta),
}
}
pub(crate) fn commit_pending(&self, location: RepackLocation) -> Result<()> {
match location {
RepackLocation::Local => self.flush()?,
RepackLocation::Shared => self.shared_mutabledatastore.flush()?,
};
Ok(())
}
}
impl HgIdDataStore for ContentStore {
fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
self.datastore.get(key)

View File

@ -21,6 +21,7 @@ mod lfs;
mod memcache;
mod metadatastore;
mod remotestore;
mod repack;
mod sliceext;
mod types;
mod unionstore;
@ -40,7 +41,6 @@ pub mod mutablehistorypack;
pub mod mutablepack;
pub mod packstore;
pub mod packwriter;
pub mod repack;
pub mod uniondatastore;
pub mod unionhistorystore;
@ -66,7 +66,7 @@ pub use crate::packstore::{
MutableHistoryPackStore,
};
pub use crate::remotestore::HgIdRemoteStore;
pub use crate::repack::ToKeys;
pub use crate::repack::{repack, RepackKind, RepackLocation, Repackable, ToKeys};
pub use crate::types::{ContentHash, StoreKey};
pub use crate::uniondatastore::UnionHgIdDataStore;
pub use crate::util::Error;

View File

@ -23,6 +23,7 @@ use crate::{
multiplexstore::MultiplexHgIdHistoryStore,
packstore::{CorruptionPolicy, MutableHistoryPackStore},
remotestore::HgIdRemoteStore,
repack::RepackLocation,
types::StoreKey,
unionhistorystore::UnionHgIdHistoryStore,
util::{
@ -50,6 +51,30 @@ impl MetadataStore {
}
}
// Repack specific methods, not to be used directly but by the repack code.
impl MetadataStore {
pub(crate) fn add_pending(
&self,
key: &Key,
info: NodeInfo,
location: RepackLocation,
) -> Result<()> {
match location {
RepackLocation::Local => self.add(&key, &info),
RepackLocation::Shared => self.shared_mutablehistorystore.add(&key, &info),
}
}
pub(crate) fn commit_pending(&self, location: RepackLocation) -> Result<()> {
match location {
RepackLocation::Local => self.flush()?,
RepackLocation::Shared => self.shared_mutablehistorystore.flush()?,
};
Ok(())
}
}
impl HgIdHistoryStore for MetadataStore {
fn get_node_info(&self, key: &Key) -> Result<Option<NodeInfo>> {
self.historystore.get_node_info(key)

View File

@ -8,25 +8,41 @@
use std::{
fs,
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::{format_err, Error, Result};
use bytes::Bytes;
use thiserror::Error;
use types::Key;
use crate::{
contentstore::ContentStore,
datapack::{DataPack, DataPackVersion},
datastore::{HgIdDataStore, HgIdMutableDeltaStore},
historypack::{HistoryPack, HistoryPackVersion},
historystore::{HgIdHistoryStore, HgIdMutableHistoryStore},
localstore::LocalStore,
metadatastore::MetadataStore,
mutabledatapack::MutableDataPack,
mutablehistorypack::MutableHistoryPack,
mutablepack::MutablePack,
types::StoreKey,
};
#[derive(Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
pub enum RepackLocation {
Local,
Shared,
}
#[derive(Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
pub enum RepackKind {
Incremental,
Full,
}
pub trait ToKeys {
fn to_keys(&self) -> Vec<Result<Key>>;
}
@ -60,20 +76,20 @@ enum RepackFailure {
#[error("Repack failure: {0:?}")]
Total(Vec<(PathBuf, Error)>),
#[error("Repack successful but with errors: {1:?}")]
Partial(PathBuf, Vec<(PathBuf, Error)>),
#[error("Repack successful but with errors: {0:?}")]
Partial(Vec<(PathBuf, Error)>),
}
/// Repack all pack files in the paths iterator. Once repacked, the repacked packs will be removed
/// from the filesystem.
fn repack_packs<'a, T: MutablePack, U: LocalStore + Repackable + ToKeys>(
paths: impl IntoIterator<Item = &'a PathBuf> + Clone,
fn repack_packs<T: MutablePack, U: LocalStore + Repackable + ToKeys>(
paths: impl IntoIterator<Item = PathBuf> + Clone,
mut mut_pack: T,
repack_pack: impl Fn(&U, &mut T) -> Result<()>,
) -> Result<PathBuf> {
if paths.clone().into_iter().count() <= 1 {
if let Some(path) = paths.into_iter().next() {
return Ok(path.to_path_buf());
return Ok(path);
} else {
return Ok(PathBuf::new());
}
@ -137,14 +153,14 @@ fn repack_packs<'a, T: MutablePack, U: LocalStore + Repackable + ToKeys>(
if successfully_repacked == 0 {
Err(RepackFailure::Total(errors).into())
} else if !errors.is_empty() {
Err(RepackFailure::Partial(new_pack_path, errors).into())
Err(RepackFailure::Partial(errors).into())
} else {
Ok(new_pack_path)
}
}
pub fn repack_datapacks<'a>(
paths: impl IntoIterator<Item = &'a PathBuf> + Clone,
fn repack_datapacks(
paths: impl IntoIterator<Item = PathBuf> + Clone,
outdir: &Path,
) -> Result<PathBuf> {
let mut_pack = MutableDataPack::new(outdir, DataPackVersion::One)?;
@ -163,8 +179,8 @@ fn repack_historypack(history_pack: &HistoryPack, mut_pack: &mut MutableHistoryP
Ok(())
}
pub fn repack_historypacks<'a>(
paths: impl IntoIterator<Item = &'a PathBuf> + Clone,
fn repack_historypacks(
paths: impl IntoIterator<Item = PathBuf> + Clone,
outdir: &Path,
) -> Result<PathBuf> {
let mut_pack = MutableHistoryPack::new(outdir, HistoryPackVersion::One)?;
@ -173,7 +189,7 @@ pub fn repack_historypacks<'a>(
}
/// List all the pack files in the directory `dir` that ends with `extension`.
pub fn list_packs(dir: &Path, extension: &str) -> Result<Vec<PathBuf>> {
fn list_packs(dir: &Path, extension: &str) -> Result<Vec<PathBuf>> {
let mut dirents = fs::read_dir(dir)?
.filter_map(|e| match e {
Err(_) => None,
@ -194,7 +210,7 @@ pub fn list_packs(dir: &Path, extension: &str) -> Result<Vec<PathBuf>> {
/// Select all the packs from `packs` that needs to be repacked during an incremental repack.
///
/// The filtering is fairly basic and is intended to reduce the fragmentation of pack files.
pub fn filter_incrementalpacks(packs: Vec<PathBuf>, extension: &str) -> Result<Vec<PathBuf>> {
fn filter_incrementalpacks(packs: Vec<PathBuf>, extension: &str) -> Result<Vec<PathBuf>> {
// XXX: Read these from the configuration.
let repackmaxpacksize = if extension == "histpack" {
// Per 100MB of histpack size, the memory consumption is over 1GB, thus repacking 4GB
@ -243,6 +259,177 @@ pub fn filter_incrementalpacks(packs: Vec<PathBuf>, extension: &str) -> Result<V
.collect())
}
/// Fallback for `repack` for when no `ContentStore`/`MetadataStore` were passed in. Will simply
/// use the legacy code path to write the content of the packfiles to a packfile.
fn repack_no_store(path: PathBuf, kind: RepackKind) -> Result<()> {
let mut datapacks = list_packs(&path, "datapack")?;
let mut histpacks = list_packs(&path, "histpack")?;
if kind == RepackKind::Incremental {
datapacks = filter_incrementalpacks(datapacks, "datapack")?;
histpacks = filter_incrementalpacks(histpacks, "histpack")?;
}
let datapack_res = repack_datapacks(datapacks, &path).map(|_| ());
let histpack_res = repack_historypacks(histpacks, &path).map(|_| ());
datapack_res.and(histpack_res)
}
fn repack_datapack_to_contentstore(
paths: Vec<PathBuf>,
store: &ContentStore,
location: RepackLocation,
) -> Result<()> {
let mut repacked = Vec::with_capacity(paths.len());
let mut errors = vec![];
for path in paths {
let pack = match DataPack::new(&path) {
Ok(pack) => pack,
Err(_) => continue,
};
let res = (|| -> Result<()> {
for key in pack.to_keys() {
let key = key?;
if let Some(content) = store.get(&key)? {
// If we managed to get a delta, the metadata must be present.
let meta = store.get_meta(&key)?.unwrap();
store.add_pending(&key, Bytes::from(content), meta, location)?;
}
}
Ok(())
})();
match res {
Ok(_) => repacked.push(path),
Err(e) => errors.push((path, e)),
}
}
if repacked.is_empty() {
return Err(RepackFailure::Total(errors).into());
}
store.commit_pending(location)?;
for path in repacked {
match DataPack::new(&path) {
Ok(pack) => pack.delete()?,
Err(_) => continue,
}
}
if !errors.is_empty() {
Err(RepackFailure::Partial(errors).into())
} else {
Ok(())
}
}
fn repack_histpack_to_metadatastore(
paths: Vec<PathBuf>,
store: &MetadataStore,
location: RepackLocation,
) -> Result<()> {
let mut repacked = Vec::with_capacity(paths.len());
let mut errors = vec![];
for path in paths {
let pack = match HistoryPack::new(&path) {
Ok(pack) => pack,
Err(_) => continue,
};
let res = (|| -> Result<()> {
for key in pack.to_keys() {
let key = key?;
if let Some(nodeinfo) = store.get_node_info(&key)? {
store.add_pending(&key, nodeinfo, location)?;
}
}
Ok(())
})();
match res {
Ok(_) => repacked.push(path),
Err(e) => errors.push((path, e)),
}
}
if repacked.is_empty() {
return Err(RepackFailure::Total(errors).into());
}
store.commit_pending(location)?;
for path in repacked {
match HistoryPack::new(&path) {
Ok(pack) => pack.delete()?,
Err(_) => continue,
}
}
if !errors.is_empty() {
Err(RepackFailure::Partial(errors).into())
} else {
Ok(())
}
}
/// Read blobs and metadata contained in the packfiles from `path` and write them back to the
/// `stores`.
///
/// The primary goal of `repack` is to reduce the performance effect of having many packfiles on
/// disk. This is done by writing all the data (and metadata) of the several packfiles onto one.
///
/// The secondary goal of `repack` is to ease file format changes: packfiles may, for instance, hold
/// LFS pointers, and by virtue of writing these pointers to a `ContentStore`, they will be
/// written to the `LfsStore` instead of to a packfile.
///
/// When `RepackKind::Incremental` is passed in, only a subset of the packfiles will be repacked in
/// order to minimize CPU cost.
///
/// When `stores` is None, a much dumber repack operation is performed, where only the primary goal
/// is fulfilled.
pub fn repack(
path: PathBuf,
stores: Option<(Arc<ContentStore>, Arc<MetadataStore>)>,
kind: RepackKind,
location: RepackLocation,
) -> Result<()> {
let (content, metadata) = match stores {
Some((content, metadata)) => (content, metadata),
None => return repack_no_store(path, kind),
};
let mut datapacks = list_packs(&path, "datapack")?;
let mut histpacks = list_packs(&path, "histpack")?;
if kind == RepackKind::Incremental {
// We may be filtering out packfiles that contain LFS pointers, reducing the effectiveness
// of the secondary goal of repack. To fully perform this secondary goal, a full repack
// will be necessary, to keep incremental repacks simple.
datapacks = filter_incrementalpacks(datapacks, "datapack")?;
histpacks = filter_incrementalpacks(histpacks, "histpack")?;
}
if datapacks.len() > 1 {
repack_datapack_to_contentstore(datapacks, &content, location)?;
}
if histpacks.len() > 1 {
repack_histpack_to_metadatastore(histpacks, &metadata, location)?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@ -268,7 +455,7 @@ mod tests {
fn test_repack_no_datapack() {
let tempdir = TempDir::new().unwrap();
let newpath = repack_datapacks(vec![].iter(), tempdir.path());
let newpath = repack_datapacks(vec![].into_iter(), tempdir.path());
assert!(newpath.is_ok());
let newpath = newpath.unwrap();
assert_eq!(newpath.to_str(), Some(""));
@ -288,7 +475,10 @@ mod tests {
)];
let pack = make_datapack(&tempdir, &revisions);
let newpath = repack_datapacks(vec![pack.base_path().to_path_buf()].iter(), tempdir.path());
let newpath = repack_datapacks(
vec![pack.base_path().to_path_buf()].into_iter(),
tempdir.path(),
);
assert!(newpath.is_ok());
let newpath2 = newpath.unwrap();
assert_eq!(newpath2.with_extension("datapack"), pack.pack_path());
@ -340,7 +530,7 @@ mod tests {
paths.push(path);
}
let newpath = repack_datapacks(paths.iter(), tempdir.path());
let newpath = repack_datapacks(paths.into_iter(), tempdir.path());
assert!(newpath.is_ok());
let newpack = DataPack::new(&newpath.unwrap()).unwrap();
assert_eq!(
@ -362,13 +552,18 @@ mod tests {
let tempdir = TempDir::new().unwrap();
let paths = vec![PathBuf::from("foo.datapack"), PathBuf::from("bar.datapack")];
let files = paths.iter().map(|p| p).collect::<Vec<&PathBuf>>();
let res = repack_datapacks(files.clone(), tempdir.path())
let res = repack_datapacks(paths.clone().into_iter(), tempdir.path())
.err()
.unwrap();
if let Some(RepackFailure::Total(errors)) = res.downcast_ref() {
assert!(errors.iter().map(|(path, _)| path).eq(files));
if let Ok(RepackFailure::Total(errors)) = res.downcast() {
assert_eq!(
errors
.into_iter()
.map(|(path, _)| path)
.collect::<Vec<PathBuf>>(),
paths
);
} else {
assert!(false);
}
@ -418,11 +613,11 @@ mod tests {
file.write_all(b"FOOBARBAZ").unwrap();
drop(file);
let res = repack_datapacks(paths.iter(), tempdir.path())
let res = repack_datapacks(paths.into_iter(), tempdir.path())
.err()
.unwrap();
if let Some(RepackFailure::Partial(_, errors)) = res.downcast_ref() {
if let Some(RepackFailure::Partial(errors)) = res.downcast_ref() {
assert_eq!(errors.iter().count(), 1);
to_corrupt.set_extension("");
assert!(errors.iter().find(|(p, _)| p == &to_corrupt).is_some());
@ -439,8 +634,10 @@ mod tests {
let nodes = get_nodes(&mut rng);
let pack = make_historypack(&tempdir, &nodes);
let newpath =
repack_historypacks(vec![pack.base_path().to_path_buf()].iter(), tempdir.path());
let newpath = repack_historypacks(
vec![pack.base_path().to_path_buf()].into_iter(),
tempdir.path(),
);
assert!(newpath.is_ok());
let newpack = HistoryPack::new(&newpath.unwrap()).unwrap();
@ -466,7 +663,7 @@ mod tests {
paths.push(path);
}
let newpath = repack_historypacks(paths.iter(), tempdir.path());
let newpath = repack_historypacks(paths.into_iter(), tempdir.path());
assert!(newpath.is_ok());
let newpack = HistoryPack::new(&newpath.unwrap()).unwrap();

View File

@ -281,7 +281,7 @@ class datapacktestsbase(object):
# Ensures that we are not keeping everything in the cache.
DEFAULTCACHESIZE = numpacks / 2
store = testdatapackstore(uimod.ui(), packdir)
store = testdatapackstore(uimod.ui(), packdir, True)
random.shuffle(deltachains)
for randomchain in deltachains:
@ -323,7 +323,7 @@ class datapacktestsbase(object):
def getpack(self, path):
return packreader(path)
store = testdatapackstore(uimod.ui(), packdir)
store = testdatapackstore(uimod.ui(), packdir, True)
# The first refresh should populate all the packfiles.
store.refresh()
@ -370,7 +370,7 @@ class datapacktestsbase(object):
deltachains.append(chain)
ui = uimod.ui()
store = datapackstore(ui, packdir, deletecorruptpacks=True)
store = datapackstore(ui, packdir, True, deletecorruptpacks=True)
key = (deltachains[0][0][0], deltachains[0][0][1])
# Count packs
@ -388,7 +388,7 @@ class datapacktestsbase(object):
# Re-create the store. Otherwise the behavior is kind of "undefined"
# because the size of mmap-ed memory isn't truncated automatically,
# and is filled by 0.
store = datapackstore(ui, packdir, deletecorruptpacks=True)
store = datapackstore(ui, packdir, True, deletecorruptpacks=True)
# Look for key again
try:
@ -414,7 +414,7 @@ class datapacktestsbase(object):
# Load the packs
origpackcount = len(os.listdir(packdir))
ui.pushbuffer(error=True)
store = datapackstore(ui, packdir, deletecorruptpacks=True)
store = datapackstore(ui, packdir, True, deletecorruptpacks=True)
# Constructing the store doesn't load the packfiles, these are loaded
# on demand, and thus the detection of bad packfiles only happen then.
# Let's force a refresh to make sure the bad pack files are deleted.

View File

@ -0,0 +1,54 @@
#chg-compatible
$ . "$TESTDIR/library.sh"
$ newserver master
$ clone master shallow --noupdate
$ cd shallow
$ setconfig remotefilelog.useruststore=True worker.rustworkers=True remotefilelog.localdatarepack=True
$ echo x > x
$ hg commit -qAm x
$ echo y > y
$ hg commit -qAm y
$ findfilessorted .hg/store/packs
.hg/store/packs/2d66e09c3bf8a000428af1630d978127182e496e.dataidx
.hg/store/packs/2d66e09c3bf8a000428af1630d978127182e496e.datapack
.hg/store/packs/65749040bf285c8867cb0d12bdae7cbcac022a55.dataidx
.hg/store/packs/65749040bf285c8867cb0d12bdae7cbcac022a55.datapack
.hg/store/packs/c3399b56e035f73c3295276ed098235a08a0ed8c.histidx
.hg/store/packs/c3399b56e035f73c3295276ed098235a08a0ed8c.histpack
.hg/store/packs/ed1aaa9bfbf108367f595bdff7a706b587e188bc.histidx
.hg/store/packs/ed1aaa9bfbf108367f595bdff7a706b587e188bc.histpack
.hg/store/packs/manifests/1921bd3d3d8442c6f92cf8363675e538c36d062b.dataidx
.hg/store/packs/manifests/1921bd3d3d8442c6f92cf8363675e538c36d062b.datapack
.hg/store/packs/manifests/2105dd350da61d1a4f08cacbb82949d855edf5bb.histidx
.hg/store/packs/manifests/2105dd350da61d1a4f08cacbb82949d855edf5bb.histpack
.hg/store/packs/manifests/2bf8539e08195f796c4ada99d894c92b6447b73e.dataidx
.hg/store/packs/manifests/2bf8539e08195f796c4ada99d894c92b6447b73e.datapack
.hg/store/packs/manifests/a890c983659e18f095538fb20f217db4e7bb129d.histidx
.hg/store/packs/manifests/a890c983659e18f095538fb20f217db4e7bb129d.histpack
$ hg repack --debug --traceback
$ findfilessorted .hg/store/packs
.hg/store/packs/102e9c722b8edc89ad9e5a488ad8e5347bc7e213.dataidx
.hg/store/packs/102e9c722b8edc89ad9e5a488ad8e5347bc7e213.datapack
.hg/store/packs/ed6d1e892f0715dc798b5e31f8b5a546f6dc357f.histidx
.hg/store/packs/ed6d1e892f0715dc798b5e31f8b5a546f6dc357f.histpack
.hg/store/packs/manifests/7041e644145f0031dca8f552159e2bb2e30a9d62.dataidx
.hg/store/packs/manifests/7041e644145f0031dca8f552159e2bb2e30a9d62.datapack
.hg/store/packs/manifests/ab796727d5271e973a5f03cf927e0bc877a0fb53.histidx
.hg/store/packs/manifests/ab796727d5271e973a5f03cf927e0bc877a0fb53.histpack
# Verify that the data is still what we expect.
$ hg up null
0 files updated, 0 files merged, 2 files removed, 0 files unresolved
$ hg up -r tip
2 files updated, 0 files merged, 0 files removed, 0 files unresolved
$ cat x
x
$ cat y
y