From d2af7b30a06ade6267aafd66293fc09490267018 Mon Sep 17 00:00:00 2001
From: Harvey Hunt
Date: Fri, 16 Aug 2019 09:07:36 -0700
Subject: [PATCH] mononoke: Add rechunking command to filestore

Summary:
Once enabled, the filestore will chunk up new files before storing them
in the blobstore. However, there are quite a few large files in the
blobstore that should be rechunked.

Add support to the filestore for rechunking files by downloading them
from the blobstore and re-uploading them in chunked form.

Reviewed By: krallin

Differential Revision: D16802702

fbshipit-source-id: 2e617ec2cd215204596769c72431fd921ff33dd8
---
 blobrepo/src/repo.rs           |  13 ++
 filestore/src/lib.rs           |   2 +
 filestore/src/rechunk.rs       |  46 +++++++
 filestore/src/test/test_api.rs | 217 +++++++++++++++++++++++++++++++++
 4 files changed, 278 insertions(+)
 create mode 100644 filestore/src/rechunk.rs

diff --git a/blobrepo/src/repo.rs b/blobrepo/src/repo.rs
index 3bc4810811..af495eacaf 100644
--- a/blobrepo/src/repo.rs
+++ b/blobrepo/src/repo.rs
@@ -284,6 +284,19 @@ impl BlobRepo {
         fetch_file_content_from_blobstore(ctx, &self.blobstore, key).boxify()
     }
 
+    pub fn rechunk_file_by_content_id(
+        &self,
+        ctx: CoreContext,
+        id: ContentId,
+    ) -> impl Future<Item = ContentMetadata, Error = Error> {
+        filestore::rechunk(
+            self.blobstore.clone(),
+            self.filestore_config.clone(),
+            ctx,
+            id,
+        )
+    }
+
     pub fn get_file_content_by_content_id(
         &self,
         ctx: CoreContext,
diff --git a/filestore/src/lib.rs b/filestore/src/lib.rs
index 37d599e257..841a1339b5 100644
--- a/filestore/src/lib.rs
+++ b/filestore/src/lib.rs
@@ -29,10 +29,12 @@ mod incremental_hash;
 mod metadata;
 mod multiplexer;
 mod prepare;
+mod rechunk;
 mod spawn;
 mod streamhash;
 
 pub use fetch_key::{Alias, AliasBlob, FetchKey};
+pub use rechunk::rechunk;
 
 #[cfg(test)]
 mod test;
diff --git a/filestore/src/rechunk.rs b/filestore/src/rechunk.rs
new file mode 100644
index 0000000000..219329b9f8
--- /dev/null
+++ b/filestore/src/rechunk.rs
@@ -0,0 +1,46 @@
+// Copyright (c) 2019-present, Facebook, Inc.
+// All Rights Reserved.
+//
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2 or any later version.
+
+use failure_ext::{Error, Fail};
+use futures::future::IntoFuture;
+use futures::Future;
+use futures_ext::FutureExt;
+
+use blobstore::{Blobstore, Loadable};
+use context::CoreContext;
+use mononoke_types::{ContentId, ContentMetadata};
+
+use crate::fetch::stream_file_bytes;
+use crate::{store, FilestoreConfig, StoreRequest};
+
+#[derive(Debug, Fail)]
+pub enum ErrorKind {
+    #[fail(display = "Content not found: {:?}", _0)]
+    ContentNotFound(ContentId),
+}
+
+/// Fetch a file from the blobstore and re-upload it in chunked form.
+/// NOTE: This could actually unchunk a file if the chunk size threshold
+/// is increased after the file is written.
+pub fn rechunk<B: Blobstore + Clone>(
+    blobstore: B,
+    config: FilestoreConfig,
+    ctx: CoreContext,
+    content_id: ContentId,
+) -> impl Future<Item = ContentMetadata, Error = Error> {
+    content_id
+        .load(ctx.clone(), &blobstore)
+        .and_then(move |maybe_file_contents| match maybe_file_contents {
+            Some(file_contents) => {
+                let req = StoreRequest::with_canonical(file_contents.size(), content_id);
+                let file_stream = stream_file_bytes(blobstore.clone(), ctx.clone(), file_contents);
+                store(&blobstore, &config, ctx, &req, file_stream).left_future()
+            }
+            None => Err(ErrorKind::ContentNotFound(content_id).into())
+                .into_future()
+                .right_future(),
+        })
+}
diff --git a/filestore/src/test/test_api.rs b/filestore/src/test/test_api.rs
index 84a788e2c2..24b4480972 100644
--- a/filestore/src/test/test_api.rs
+++ b/filestore/src/test/test_api.rs
@@ -870,3 +870,220 @@ fn filestore_store_error() -> Result<()> {
     );
     Ok(())
 }
+
+#[test]
+fn filestore_test_rechunk() -> Result<()> {
+    let mut rt = tokio::runtime::Runtime::new()?;
+
+    let blob = memblob::LazyMemblob::new();
+
+    let small = FilestoreConfig {
+        chunk_size: Some(1),
+        concurrency: 5,
+    };
+    let large = FilestoreConfig {
+        chunk_size: Some(3),
+        concurrency: 5,
+    };
+    let ctx = CoreContext::test_mock();
+
+    let full_data = &b"foobar"[..];
+    let full_key = request(full_data);
+    let full_id = canonical(full_data);
+
+    // Store in 3-byte chunks
+    rt.block_on(filestore::store(
+        &blob,
+        &large,
+        ctx.clone(),
+        &full_key,
+        stream::once(Ok(Bytes::from(full_data))),
+    ))?;
+
+    // Verify that the chunks are 3 bytes in size.
+    let res = rt.block_on(
+        filestore::fetch(&blob, ctx.clone(), &FetchKey::Canonical(full_id))
+            .map(|maybe_str| maybe_str.map(|s| s.collect()))
+            .flatten(),
+    );
+
+    let expected: Vec<_> = vec!["foo", "bar"].into_iter().map(Bytes::from).collect();
+
+    assert_eq!(res?, Some(expected));
+
+    // Rechunk the file into 1-byte sections
+    rt.block_on(filestore::rechunk::rechunk(
+        blob.clone(),
+        small,
+        ctx.clone(),
+        full_id,
+    ))?;
+
+    // Verify that we can still read the full thing.
+    let res = rt.block_on(
+        filestore::fetch(&blob, ctx, &FetchKey::Canonical(full_id))
+            .map(|maybe_str| maybe_str.map(|s| s.collect()))
+            .flatten(),
+    );
+
+    let expected: Vec<_> = vec!["f", "o", "o", "b", "a", "r"]
+        .into_iter()
+        .map(Bytes::from)
+        .collect();
+
+    println!("res = {:#?}", res);
+    assert_eq!(res?, Some(expected));
+    Ok(())
+}
+
+#[test]
+fn filestore_test_rechunk_larger() -> Result<()> {
+    let mut rt = tokio::runtime::Runtime::new()?;
+
+    let blob = memblob::LazyMemblob::new();
+
+    let small = FilestoreConfig {
+        chunk_size: Some(1),
+        concurrency: 5,
+    };
+    let large = FilestoreConfig {
+        chunk_size: Some(3),
+        concurrency: 5,
+    };
+    let ctx = CoreContext::test_mock();
+
+    let full_data = &b"foobar"[..];
+    let full_key = request(full_data);
+    let full_id = canonical(full_data);
+
+    // Store in 1-byte chunks
+    rt.block_on(filestore::store(
+        &blob,
+        &small,
+        ctx.clone(),
+        &full_key,
+        stream::once(Ok(Bytes::from(full_data))),
+    ))?;
+
+    // Verify that the chunks are 1 byte in size.
+    let res = rt.block_on(
+        filestore::fetch(&blob, ctx.clone(), &FetchKey::Canonical(full_id))
+            .map(|maybe_str| maybe_str.map(|s| s.collect()))
+            .flatten(),
+    );
+
+    let expected: Vec<_> = vec!["f", "o", "o", "b", "a", "r"]
+        .into_iter()
+        .map(Bytes::from)
+        .collect();
+
+    assert_eq!(res?, Some(expected));
+
+    // Rechunk the file into 3-byte sections
+    rt.block_on(filestore::rechunk::rechunk(
+        blob.clone(),
+        large,
+        ctx.clone(),
+        full_id,
+    ))?;
+
+    // Verify that we can still read the full thing.
+    let res = rt.block_on(
+        filestore::fetch(&blob, ctx, &FetchKey::Canonical(full_id))
+            .map(|maybe_str| maybe_str.map(|s| s.collect()))
+            .flatten(),
+    );
+
+    let expected: Vec<_> = vec!["foo", "bar"].into_iter().map(Bytes::from).collect();
+
+    println!("res = {:#?}", res);
+    assert_eq!(res?, Some(expected));
+    Ok(())
+}
+
+#[test]
+fn filestore_test_rechunk_unchunked() -> Result<()> {
+    let mut rt = tokio::runtime::Runtime::new()?;
+
+    let blob = memblob::LazyMemblob::new();
+
+    let small = FilestoreConfig {
+        chunk_size: Some(1),
+        concurrency: 5,
+    };
+    // This is large enough that the data we upload won't be chunked.
+    let large = FilestoreConfig {
+        chunk_size: Some(100),
+        concurrency: 5,
+    };
+    let ctx = CoreContext::test_mock();
+
+    let full_data = &b"foobar"[..];
+    let full_key = request(full_data);
+    let full_id = canonical(full_data);
+
+    // Don't chunk
+    rt.block_on(filestore::store(
+        &blob,
+        &large,
+        ctx.clone(),
+        &full_key,
+        stream::once(Ok(Bytes::from(full_data))),
+    ))?;
+
+    // Rechunk the file into 1-byte sections
+    rt.block_on(filestore::rechunk::rechunk(
+        blob.clone(),
+        small,
+        ctx.clone(),
+        full_id,
+    ))?;
+
+    // Verify that we can still read the full thing.
+    let res = rt.block_on(
+        filestore::fetch(&blob, ctx, &FetchKey::Canonical(full_id))
+            .map(|maybe_str| maybe_str.map(|s| s.collect()))
+            .flatten(),
+    );
+
+    let expected: Vec<_> = vec!["f", "o", "o", "b", "a", "r"]
+        .into_iter()
+        .map(Bytes::from)
+        .collect();
+
+    println!("res = {:#?}", res);
+    assert_eq!(res?, Some(expected));
+    Ok(())
+}
+
+#[test]
+fn filestore_test_rechunk_missing_content() -> Result<()> {
+    let mut rt = tokio::runtime::Runtime::new()?;
+
+    let blob = memblob::LazyMemblob::new();
+
+    let conf = FilestoreConfig {
+        chunk_size: Some(1),
+        concurrency: 5,
+    };
+    let ctx = CoreContext::test_mock();
+
+    let full_data = &b"foobar"[..];
+    let full_id = canonical(full_data);
+
+    // Attempt to rechunk the file into 1-byte sections
+    let res = rt.block_on(filestore::rechunk::rechunk(
+        blob.clone(),
+        conf,
+        ctx.clone(),
+        full_id,
+    ));
+
+    println!("res = {:#?}", res);
+    assert_matches!(
+        res.unwrap_err().downcast::<filestore::rechunk::ErrorKind>(),
+        Ok(filestore::rechunk::ErrorKind::ContentNotFound(..))
+    );
+
+    Ok(())
+}
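
Usage sketch: to drive the rechunker end to end, a caller can block on the new
BlobRepo::rechunk_file_by_content_id from a tokio runtime, much as the tests
above call filestore::rechunk::rechunk directly. The snippet below is a
minimal, illustrative sketch only: the `repo`, `ctx`, and `content_id`
bindings are assumed to already exist in the caller's scope, and reading
`content_id` and `total_size` off the returned ContentMetadata is an
assumption about its fields, not something this patch establishes.

    // Hypothetical call site (not part of the patch): rechunk a single file
    // and wait for the resulting metadata on a futures-0.1 tokio runtime.
    let mut rt = tokio::runtime::Runtime::new()?;
    let metadata = rt.block_on(repo.rechunk_file_by_content_id(ctx, content_id))?;
    // `metadata` describes the same content, now stored in chunked form.
    println!("rechunked {:?}: {} bytes", metadata.content_id, metadata.total_size);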