mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 08:47:12 +03:00
mononoke: Add rechunking command to filestore
Summary: Once enabled, the filestore will chunk up new files before storing them in the blobstore. However, there are quite a few large files in the blobstore that should be rechunked. Add support to the filestore for rechunking files by downloading them from the blobstore and reuploading the, but chunked. Reviewed By: krallin Differential Revision: D16802702 fbshipit-source-id: 2e617ec2cd215204596769c72431fd921ff33dd8
This commit is contained in:
parent
58be561d7a
commit
d2af7b30a0
@ -284,6 +284,19 @@ impl BlobRepo {
|
||||
fetch_file_content_from_blobstore(ctx, &self.blobstore, key).boxify()
|
||||
}
|
||||
|
||||
pub fn rechunk_file_by_content_id(
|
||||
&self,
|
||||
ctx: CoreContext,
|
||||
id: ContentId,
|
||||
) -> impl Future<Item = ContentMetadata, Error = Error> {
|
||||
filestore::rechunk(
|
||||
self.blobstore.clone(),
|
||||
self.filestore_config.clone(),
|
||||
ctx,
|
||||
id,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn get_file_content_by_content_id(
|
||||
&self,
|
||||
ctx: CoreContext,
|
||||
|
@ -29,10 +29,12 @@ mod incremental_hash;
|
||||
mod metadata;
|
||||
mod multiplexer;
|
||||
mod prepare;
|
||||
mod rechunk;
|
||||
mod spawn;
|
||||
mod streamhash;
|
||||
|
||||
pub use fetch_key::{Alias, AliasBlob, FetchKey};
|
||||
pub use rechunk::rechunk;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
46
filestore/src/rechunk.rs
Normal file
46
filestore/src/rechunk.rs
Normal file
@ -0,0 +1,46 @@
|
||||
// Copyright (c) 2019-present, Facebook, Inc.
|
||||
// All Rights Reserved.
|
||||
//
|
||||
// This software may be used and distributed according to the terms of the
|
||||
// GNU General Public License version 2 or any later version.
|
||||
|
||||
use failure_ext::{Error, Fail};
|
||||
use futures::future::IntoFuture;
|
||||
use futures::Future;
|
||||
use futures_ext::FutureExt;
|
||||
|
||||
use blobstore::{Blobstore, Loadable};
|
||||
use context::CoreContext;
|
||||
use mononoke_types::{ContentId, ContentMetadata};
|
||||
|
||||
use crate::fetch::stream_file_bytes;
|
||||
use crate::{store, FilestoreConfig, StoreRequest};
|
||||
|
||||
#[derive(Debug, Fail)]
|
||||
pub enum ErrorKind {
|
||||
#[fail(display = "Content not found: {:?}", _0)]
|
||||
ContentNotFound(ContentId),
|
||||
}
|
||||
|
||||
/// Fetch a file from the blobstore and reupload it in a chunked form.
|
||||
/// NOTE: This could actually unchunk a file if the chunk size threshold
|
||||
/// is increased after the file is written.
|
||||
pub fn rechunk<B: Blobstore + Clone>(
|
||||
blobstore: B,
|
||||
config: FilestoreConfig,
|
||||
ctx: CoreContext,
|
||||
content_id: ContentId,
|
||||
) -> impl Future<Item = ContentMetadata, Error = Error> {
|
||||
content_id
|
||||
.load(ctx.clone(), &blobstore)
|
||||
.and_then(move |maybe_file_contents| match maybe_file_contents {
|
||||
Some(file_contents) => {
|
||||
let req = StoreRequest::with_canonical(file_contents.size(), content_id);
|
||||
let file_stream = stream_file_bytes(blobstore.clone(), ctx.clone(), file_contents);
|
||||
store(&blobstore, &config, ctx, &req, file_stream).left_future()
|
||||
}
|
||||
None => Err(ErrorKind::ContentNotFound(content_id).into())
|
||||
.into_future()
|
||||
.right_future(),
|
||||
})
|
||||
}
|
@ -870,3 +870,220 @@ fn filestore_store_error() -> Result<()> {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn filestore_test_rechunk() -> Result<()> {
|
||||
let mut rt = tokio::runtime::Runtime::new()?;
|
||||
|
||||
let blob = memblob::LazyMemblob::new();
|
||||
|
||||
let small = FilestoreConfig {
|
||||
chunk_size: Some(1),
|
||||
concurrency: 5,
|
||||
};
|
||||
let large = FilestoreConfig {
|
||||
chunk_size: Some(3),
|
||||
concurrency: 5,
|
||||
};
|
||||
let ctx = CoreContext::test_mock();
|
||||
|
||||
let full_data = &b"foobar"[..];
|
||||
let full_key = request(full_data);
|
||||
let full_id = canonical(full_data);
|
||||
|
||||
// Store in 3-byte chunks
|
||||
rt.block_on(filestore::store(
|
||||
&blob,
|
||||
&large,
|
||||
ctx.clone(),
|
||||
&full_key,
|
||||
stream::once(Ok(Bytes::from(full_data))),
|
||||
))?;
|
||||
|
||||
// Verify that the chunks are 3 bytes in size.
|
||||
let res = rt.block_on(
|
||||
filestore::fetch(&blob, ctx.clone(), &FetchKey::Canonical(full_id))
|
||||
.map(|maybe_str| maybe_str.map(|s| s.collect()))
|
||||
.flatten(),
|
||||
);
|
||||
|
||||
let expected: Vec<_> = vec!["foo", "bar"].into_iter().map(Bytes::from).collect();
|
||||
|
||||
assert_eq!(res?, Some(expected));
|
||||
|
||||
// Rechunk the file into 1 byte sections
|
||||
rt.block_on(filestore::rechunk::rechunk(
|
||||
blob.clone(),
|
||||
small,
|
||||
ctx.clone(),
|
||||
full_id,
|
||||
))?;
|
||||
|
||||
// Verify that we can still read the full thing.
|
||||
let res = rt.block_on(
|
||||
filestore::fetch(&blob, ctx, &FetchKey::Canonical(full_id))
|
||||
.map(|maybe_str| maybe_str.map(|s| s.collect()))
|
||||
.flatten(),
|
||||
);
|
||||
|
||||
let expected: Vec<_> = vec!["f", "o", "o", "b", "a", "r"]
|
||||
.into_iter()
|
||||
.map(Bytes::from)
|
||||
.collect();
|
||||
|
||||
println!("res = {:#?}", res);
|
||||
assert_eq!(res?, Some(expected));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn filestore_test_rechunk_larger() -> Result<()> {
|
||||
let mut rt = tokio::runtime::Runtime::new()?;
|
||||
|
||||
let blob = memblob::LazyMemblob::new();
|
||||
|
||||
let small = FilestoreConfig {
|
||||
chunk_size: Some(1),
|
||||
concurrency: 5,
|
||||
};
|
||||
let large = FilestoreConfig {
|
||||
chunk_size: Some(3),
|
||||
concurrency: 5,
|
||||
};
|
||||
let ctx = CoreContext::test_mock();
|
||||
|
||||
let full_data = &b"foobar"[..];
|
||||
let full_key = request(full_data);
|
||||
let full_id = canonical(full_data);
|
||||
|
||||
// Store in 1 byte chunks
|
||||
rt.block_on(filestore::store(
|
||||
&blob,
|
||||
&small,
|
||||
ctx.clone(),
|
||||
&full_key,
|
||||
stream::once(Ok(Bytes::from(full_data))),
|
||||
))?;
|
||||
|
||||
// Verify that the chunks are 1 byte in size.
|
||||
let res = rt.block_on(
|
||||
filestore::fetch(&blob, ctx.clone(), &FetchKey::Canonical(full_id))
|
||||
.map(|maybe_str| maybe_str.map(|s| s.collect()))
|
||||
.flatten(),
|
||||
);
|
||||
|
||||
let expected: Vec<_> = vec!["f", "o", "o", "b", "a", "r"]
|
||||
.into_iter()
|
||||
.map(Bytes::from)
|
||||
.collect();
|
||||
|
||||
assert_eq!(res?, Some(expected));
|
||||
|
||||
// Rechunk the file into 3 byte sections
|
||||
rt.block_on(filestore::rechunk::rechunk(
|
||||
blob.clone(),
|
||||
large,
|
||||
ctx.clone(),
|
||||
full_id,
|
||||
))?;
|
||||
|
||||
// Verify that we can still read the full thing.
|
||||
let res = rt.block_on(
|
||||
filestore::fetch(&blob, ctx, &FetchKey::Canonical(full_id))
|
||||
.map(|maybe_str| maybe_str.map(|s| s.collect()))
|
||||
.flatten(),
|
||||
);
|
||||
|
||||
let expected: Vec<_> = vec!["foo", "bar"].into_iter().map(Bytes::from).collect();
|
||||
|
||||
println!("res = {:#?}", res);
|
||||
assert_eq!(res?, Some(expected));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn filestore_test_rechunk_unchunked() -> Result<()> {
|
||||
let mut rt = tokio::runtime::Runtime::new()?;
|
||||
|
||||
let blob = memblob::LazyMemblob::new();
|
||||
|
||||
let small = FilestoreConfig {
|
||||
chunk_size: Some(1),
|
||||
concurrency: 5,
|
||||
};
|
||||
// This is large enough that the data we upload won't be chunked.
|
||||
let large = FilestoreConfig {
|
||||
chunk_size: Some(100),
|
||||
concurrency: 5,
|
||||
};
|
||||
let ctx = CoreContext::test_mock();
|
||||
|
||||
let full_data = &b"foobar"[..];
|
||||
let full_key = request(full_data);
|
||||
let full_id = canonical(full_data);
|
||||
|
||||
// Don't chunk
|
||||
rt.block_on(filestore::store(
|
||||
&blob,
|
||||
&large,
|
||||
ctx.clone(),
|
||||
&full_key,
|
||||
stream::once(Ok(Bytes::from(full_data))),
|
||||
))?;
|
||||
|
||||
// Rechunk the file into 1 byte sections
|
||||
rt.block_on(filestore::rechunk::rechunk(
|
||||
blob.clone(),
|
||||
small,
|
||||
ctx.clone(),
|
||||
full_id,
|
||||
))?;
|
||||
|
||||
// Verify that we can still read the full thing.
|
||||
let res = rt.block_on(
|
||||
filestore::fetch(&blob, ctx, &FetchKey::Canonical(full_id))
|
||||
.map(|maybe_str| maybe_str.map(|s| s.collect()))
|
||||
.flatten(),
|
||||
);
|
||||
|
||||
let expected: Vec<_> = vec!["f", "o", "o", "b", "a", "r"]
|
||||
.into_iter()
|
||||
.map(Bytes::from)
|
||||
.collect();
|
||||
|
||||
println!("res = {:#?}", res);
|
||||
assert_eq!(res?, Some(expected));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn filestore_test_rechunk_missing_content() -> Result<()> {
|
||||
let mut rt = tokio::runtime::Runtime::new()?;
|
||||
|
||||
let blob = memblob::LazyMemblob::new();
|
||||
|
||||
let conf = FilestoreConfig {
|
||||
chunk_size: Some(1),
|
||||
concurrency: 5,
|
||||
};
|
||||
let ctx = CoreContext::test_mock();
|
||||
|
||||
let full_data = &b"foobar"[..];
|
||||
let full_id = canonical(full_data);
|
||||
|
||||
// Attempt to rechunk the file into 1 byte sections
|
||||
let res = rt.block_on(filestore::rechunk::rechunk(
|
||||
blob.clone(),
|
||||
conf,
|
||||
ctx.clone(),
|
||||
full_id,
|
||||
));
|
||||
|
||||
println!("res = {:#?}", res);
|
||||
assert_matches!(
|
||||
res.unwrap_err().downcast::<filestore::rechunk::ErrorKind>(),
|
||||
Ok(filestore::rechunk::ErrorKind::ContentNotFound(..))
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user