revisionstore: add upload to RemoteDataStore

Summary: This method will be used to upload local LFS blobs to the LFS server.

Reviewed By: DurhamG

Differential Revision: D20843137

fbshipit-source-id: 33a331c42687c47442189ee329da33cb5ce4d376
This commit is contained in:
Xavier Deguillard 2020-04-07 16:50:12 -07:00 committed by Facebook GitHub Bot
parent 80b222c01b
commit 0dca734464
10 changed files with 90 additions and 18 deletions

View File

@ -13,7 +13,7 @@ use cpython::{
ToPyObject,
};
use cpython_ext::{PyPath, PyPathBuf, ResultPyErrExt};
use cpython_ext::{PyNone, PyPath, PyPathBuf, ResultPyErrExt};
use revisionstore::{
ContentDataStore, ContentHash, HgIdDataStore, HgIdMutableDeltaStore, RemoteDataStore, StoreKey,
ToKeys,
@ -57,6 +57,7 @@ pub trait HgIdMutableDeltaStorePyExt: HgIdDataStorePyExt {
pub trait RemoteDataStorePyExt: RemoteDataStore {
fn prefetch_py(&self, py: Python, keys: PyList) -> PyResult<PyObject>;
fn upload_py(&self, py: Python, keys: PyList) -> PyResult<PyNone>;
}
impl<T: HgIdDataStore + ?Sized> HgIdDataStorePyExt for T {
@ -245,4 +246,13 @@ impl<T: RemoteDataStore + ?Sized> RemoteDataStorePyExt for T {
self.prefetch(&keys).map_pyerr(py)?;
Ok(Python::None(py))
}
/// Python-facing wrapper around `RemoteDataStore::upload`.
///
/// Each element of `keys` is a Python tuple that `from_tuple_to_key`
/// converts into a `Key`, which is then wrapped into a `StoreKey` before
/// being handed to the underlying store. Conversion errors and store
/// errors are both surfaced as Python exceptions.
fn upload_py(&self, py: Python, keys: PyList) -> PyResult<PyNone> {
    let mut store_keys = Vec::with_capacity(keys.len(py));
    for tuple in keys.iter(py) {
        let key = from_tuple_to_key(py, &tuple)?;
        store_keys.push(StoreKey::from(key));
    }
    self.upload(&store_keys).map_pyerr(py)?;
    Ok(PyNone)
}
}

View File

@ -680,6 +680,10 @@ impl RemoteDataStore for PyRemoteDataStore {
// Forward the prefetch request to the wrapped Python data store.
fn prefetch(&self, keys: &[StoreKey]) -> Result<()> {
    self.0.prefetch(keys)
}

// Upload is a no-op here: the keys are silently ignored.
// NOTE(review): presumably a Python-backed remote store never receives
// local blobs — confirm this shouldn't forward to the wrapped store.
fn upload(&self, _keys: &[StoreKey]) -> Result<()> {
    Ok(())
}
}
impl HgIdDataStore for PyRemoteDataStore {
@ -844,6 +848,11 @@ py_class!(pub class contentstore |py| {
store.prefetch_py(py, keys)
}
// Python method: contentstore.upload(keys) -> None.
// Delegates to `RemoteDataStorePyExt::upload_py`, which converts the
// Python key tuples and pushes the referenced blobs to the remote store.
def upload(&self, keys: PyList) -> PyResult<PyNone> {
    let store = self.store(py);
    store.upload_py(py, keys)
}
def blob(&self, name: &PyPath, node: &PyBytes) -> PyResult<PyBytes> {
let store = self.store(py);
store.blob_py(py, name, node)

View File

@ -168,6 +168,10 @@ impl RemoteDataStore for PythonHgIdDataStore {
Ok(())
}
// Upload is a no-op for this Python-backed store; keys are ignored.
// NOTE(review): looks intentional (same pattern as the other remote-store
// stubs in this commit) — confirm upload support is not needed here.
fn upload(&self, _keys: &[StoreKey]) -> Result<()> {
    Ok(())
}
}
impl LocalStore for PythonHgIdDataStore {

View File

@ -137,6 +137,14 @@ impl RemoteDataStore for ContentStore {
Ok(())
}
}
/// Send the blobs referenced by `keys` to the remote store, when one is
/// configured. A `ContentStore` built without a remote store treats the
/// upload as a successful no-op.
fn upload(&self, keys: &[StoreKey]) -> Result<()> {
    match self.remote_store.as_ref() {
        Some(remote_store) => remote_store.upload(keys),
        None => Ok(()),
    }
}
}
impl LocalStore for ContentStore {
@ -297,7 +305,7 @@ impl<'a> ContentStoreBuilder<'a> {
shared_pack_store
};
let local_mutabledatastore: Option<Arc<dyn HgIdMutableDeltaStore>> =
let (local_mutabledatastore, local_lfs_store): (Option<Arc<dyn HgIdMutableDeltaStore>>, _) =
if let Some(unsuffixed_local_path) = self.local_path {
let local_pack_store = Arc::new(MutableDataPackStore::new(
get_packs_path(&unsuffixed_local_path, &self.suffix)?,
@ -310,7 +318,7 @@ impl<'a> ContentStoreBuilder<'a> {
let local_store: Arc<dyn HgIdMutableDeltaStore> =
if let Some(lfs_threshold) = lfs_threshold {
let local_store = Arc::new(LfsMultiplexer::new(
local_lfs_store,
local_lfs_store.clone(),
local_pack_store,
lfs_threshold.value() as usize,
));
@ -320,18 +328,18 @@ impl<'a> ContentStoreBuilder<'a> {
local_store
} else {
datastore.add(local_pack_store.clone());
datastore.add(local_lfs_store);
datastore.add(local_lfs_store.clone());
local_pack_store
};
Some(local_store)
(Some(local_store), Some(local_lfs_store))
} else {
if !self.no_local_store {
return Err(format_err!(
"a ContentStore cannot be built without a local store"
));
}
None
(None, None)
};
let remote_store: Option<Arc<dyn RemoteDataStore>> =
@ -375,8 +383,11 @@ impl<'a> ContentStoreBuilder<'a> {
// Third, the LFS remote store. The previously fetched LFS pointers will be used to
// fetch the actual blobs in this store.
if enable_lfs {
let lfs_remote_store =
Arc::new(LfsRemote::new(shared_lfs_store.clone(), self.config)?);
let lfs_remote_store = Arc::new(LfsRemote::new(
shared_lfs_store,
local_lfs_store,
self.config,
)?);
remotestores.add(lfs_remote_store.datastore(shared_store.clone()));
// Fallback store if the LFS one is dead. In `ContentStore::get_missing`, when

View File

@ -54,6 +54,9 @@ pub trait RemoteDataStore: HgIdDataStore + Send + Sync {
/// everything that was asked. On a higher level store, such as the `ContentStore`, this will
/// avoid fetching data that is already present locally.
fn prefetch(&self, keys: &[StoreKey]) -> Result<()>;
/// Send all the blobs referenced by the keys to the remote store.
fn upload(&self, keys: &[StoreKey]) -> Result<()>;
}
pub trait HgIdMutableDeltaStore: HgIdDataStore + Send + Sync {
@ -107,6 +110,10 @@ impl<T: RemoteDataStore + ?Sized, U: Deref<Target = T> + Send + Sync> RemoteData
// Blanket forwarding: any smart pointer (`Arc`, `Box`, …) that derefs to
// a `RemoteDataStore` is itself a `RemoteDataStore`, delegating straight
// to the pointee.
fn prefetch(&self, keys: &[StoreKey]) -> Result<()> {
    T::prefetch(self, keys)
}
fn upload(&self, keys: &[StoreKey]) -> Result<()> {
    T::upload(self, keys)
}
}
impl<T: HgIdMutableDeltaStore + ?Sized, U: Deref<Target = T> + Send + Sync> HgIdMutableDeltaStore

View File

@ -104,6 +104,10 @@ impl RemoteDataStore for EdenApiRemoteDataStore {
}
Ok(())
}
// Upload is a no-op for the EdenAPI store; keys are ignored.
// NOTE(review): presumably EdenAPI has no blob-upload endpoint (only the
// LFS remote implements upload in this commit) — confirm.
fn upload(&self, _keys: &[StoreKey]) -> Result<()> {
    Ok(())
}
}
impl HgIdDataStore for EdenApiRemoteDataStore {

View File

@ -91,7 +91,8 @@ enum LfsRemoteInner {
}
pub struct LfsRemote {
local: Arc<LfsStore>,
local: Option<Arc<LfsStore>>,
shared: Arc<LfsStore>,
remote: LfsRemoteInner,
}
@ -1041,7 +1042,11 @@ impl LfsRemoteInner {
}
impl LfsRemote {
pub fn new(store: Arc<LfsStore>, config: &ConfigSet) -> Result<Self> {
pub fn new(
shared: Arc<LfsStore>,
local: Option<Arc<LfsStore>>,
config: &ConfigSet,
) -> Result<Self> {
let mut url = get_str_config(config, "lfs", "url")?;
// A trailing '/' needs to be present so that `Url::join` doesn't remove the reponame
// present at the end of the config.
@ -1054,7 +1059,8 @@ impl LfsRemote {
create_dir(&path)?;
let file = LfsBlobsStore::loose(path);
Ok(Self {
local: store,
shared,
local,
remote: LfsRemoteInner::File(file),
})
} else {
@ -1073,7 +1079,8 @@ impl LfsRemote {
let rt = Arc::new(Mutex::new(Runtime::new()?));
let client = Client::new();
Ok(Self {
local: store,
shared,
local,
remote: LfsRemoteInner::Http(HttpLfsRemote {
url,
user_agent,
@ -1123,7 +1130,7 @@ impl RemoteDataStore for LfsRemoteStore {
let objs = keys
.iter()
.map(|k| {
if let Some(pointer) = self.remote.local.pointers.read().entry(k)? {
if let Some(pointer) = self.remote.shared.pointers.read().entry(k)? {
match pointer.content_hashes.get(&ContentHashType::Sha256) {
None => Ok(None),
Some(content_hash) => Ok(Some((
@ -1140,11 +1147,15 @@ impl RemoteDataStore for LfsRemoteStore {
for response in self.remote.batch(&objs)? {
let (sha256, content) = response?;
self.remote.local.blobs.add(&sha256, content)?;
self.remote.shared.blobs.add(&sha256, content)?;
}
Ok(())
}
/// Upload the LFS blobs referenced by `keys` to the LFS server.
///
/// Not implemented yet — this commit only wires the `upload` plumbing
/// through the store hierarchy; calling this panics. The parameter is
/// named `_keys` (matching the other stub implementations in this change)
/// so the unused-variable warning is not emitted.
fn upload(&self, _keys: &[StoreKey]) -> Result<()> {
    unimplemented!("uploading LFS blobs to the remote is not yet supported");
}
}
impl HgIdDataStore for LfsRemoteStore {
@ -1729,7 +1740,7 @@ mod tests {
let config = make_lfs_config(&cachedir);
let lfs = Arc::new(LfsStore::shared(&lfsdir, &config)?);
let remote = LfsRemote::new(lfs, &config)?;
let remote = LfsRemote::new(lfs, None, &config)?;
let blob = (
Sha256::from_str(
@ -1753,7 +1764,7 @@ mod tests {
let config = make_lfs_config(&cachedir);
let lfs = Arc::new(LfsStore::shared(&lfsdir, &config)?);
let remote = LfsRemote::new(lfs, &config)?;
let remote = LfsRemote::new(lfs, None, &config)?;
let blob1 = (
Sha256::from_str(
@ -1786,7 +1797,7 @@ mod tests {
let config = make_lfs_config(&cachedir);
let lfs = Arc::new(LfsStore::shared(&lfsdir, &config)?);
let remote = Arc::new(LfsRemote::new(lfs.clone(), &config)?);
let remote = Arc::new(LfsRemote::new(lfs.clone(), None, &config)?);
let key = key("a/b", "1234");
@ -1851,7 +1862,7 @@ mod tests {
let url = Url::from_file_path(&remote).unwrap();
config.set("lfs", "url", Some(url.as_str()), &Default::default());
let remote = LfsRemote::new(lfs, &config)?;
let remote = LfsRemote::new(lfs, None, &config)?;
let resp = remote
.batch(&[(blob1.0, blob1.1), (blob2.0, blob2.1)])?

View File

@ -243,6 +243,10 @@ impl RemoteDataStore for MemcacheHgIdDataStore {
Ok(())
}
// Upload is a no-op for memcache; keys are ignored.
// NOTE(review): memcache appears to act purely as a cache, so there is
// nothing authoritative to upload — confirm.
fn upload(&self, _keys: &[StoreKey]) -> Result<()> {
    Ok(())
}
}
struct MemcacheHgIdHistoryStore {

View File

@ -108,6 +108,10 @@ impl RemoteDataStore for FakeRemoteDataStore {
Ok(())
}
// Test double: upload is deliberately left unimplemented so any test
// that unexpectedly reaches it fails loudly instead of passing silently.
fn upload(&self, _keys: &[StoreKey]) -> Result<()> {
    unimplemented!()
}
}
impl HgIdDataStore for FakeRemoteDataStore {

View File

@ -117,6 +117,14 @@ impl<T: RemoteDataStore> RemoteDataStore for UnionHgIdDataStore<T> {
Ok(())
}
/// Propagate the upload to every store in the union, short-circuiting on
/// the first store that reports an error.
fn upload(&self, keys: &[StoreKey]) -> Result<()> {
    self.into_iter().try_for_each(|store| store.upload(keys))
}
}
pub type UnionContentDataStore<T> = UnionStore<T>;