mononoke/filestore: expose Range as an input in range fetches

Summary:
I'd like to add support for Range queries in LFS. In HTTP, the range header
isn't start + size, it's start / end, so I'll need a slightly lower API. This
makes that possible.

Note: Range should have a private constructor, because otherwise we'll get
errors if start > end. Hence the `Range` / `RangeInner` here.

Reviewed By: StanislavGlebik

Differential Revision: D27187817

fbshipit-source-id: 20183d8b200fef6c16e45a66886ce8886d92e4f6
This commit is contained in:
Thomas Orozco 2021-03-23 03:18:35 -07:00 committed by Facebook GitHub Bot
parent f8247dbc53
commit 7644185846
6 changed files with 59 additions and 43 deletions

View File

@ -34,8 +34,24 @@ pub enum ErrorKind {
ChunkNotFound(ContentChunkId),
}
#[derive(Debug)]
pub enum Range {
#[derive(Debug, Copy, Clone)]
pub struct Range(RangeInner);
impl Range {
pub fn all() -> Self {
Self(RangeInner::All)
}
pub fn sized(start: u64, size: u64) -> Self {
Self(RangeInner::Span {
start,
end: start.saturating_add(size),
})
}
}
#[derive(Debug, Copy, Clone)]
enum RangeInner {
All,
Span { start: u64, end: u64 },
}
@ -46,12 +62,14 @@ pub fn stream_file_bytes<'a, B: Blobstore + Clone + 'a>(
file_contents: FileContents,
range: Range,
) -> impl Stream<Item = Result<Bytes, Error>> + 'a {
let range = range.0;
match file_contents {
FileContents::Bytes(bytes) => {
// File is just a single chunk of bytes. Return the correct
// slice, based on the requested range.
let bytes = match range {
Range::Span {
RangeInner::Span {
start: range_start,
end: range_end,
} => {
@ -60,7 +78,7 @@ pub fn stream_file_bytes<'a, B: Blobstore + Clone + 'a>(
let slice_end = min(range_end, len);
bytes.slice((slice_start as usize)..(slice_end as usize))
}
Range::All => bytes,
RangeInner::All => bytes,
};
stream::once(future::ready(Ok(bytes))).left_stream()
}
@ -82,7 +100,7 @@ pub fn stream_file_bytes<'a, B: Blobstore + Clone + 'a>(
let buffer_size = buffer_size.try_into().unwrap();
let chunk_iter = match range {
Range::Span {
RangeInner::Span {
start: range_start,
end: range_end,
} => {
@ -111,10 +129,10 @@ pub fn stream_file_bytes<'a, B: Blobstore + Clone + 'a>(
let slice_start = max(chunk_start, range_start);
let slice_end = min(chunk_end, range_end);
if (slice_start, slice_end) == (chunk_start, chunk_end) {
(Range::All, chunk)
(RangeInner::All, chunk)
} else {
(
Range::Span {
RangeInner::Span {
start: slice_start - chunk_start,
end: slice_end - chunk_start,
},
@ -124,8 +142,8 @@ pub fn stream_file_bytes<'a, B: Blobstore + Clone + 'a>(
});
Either::Left(iter)
}
Range::All => {
let iter = chunks.into_iter().map(|chunk| (Range::All, chunk));
RangeInner::All => {
let iter = chunks.into_iter().map(|chunk| (RangeInner::All, chunk));
Either::Right(iter)
}
};
@ -149,8 +167,10 @@ pub fn stream_file_bytes<'a, B: Blobstore + Clone + 'a>(
.map(ContentChunk::into_bytes)?;
let bytes = match chunk_range {
Range::Span { start, end } => bytes.slice((start as usize)..(end as usize)),
Range::All => bytes,
RangeInner::Span { start, end } => {
bytes.slice((start as usize)..(end as usize))
}
RangeInner::All => bytes,
};
Ok(bytes)

View File

@ -36,6 +36,7 @@ mod prepare;
mod rechunk;
mod streamhash;
pub use fetch::Range;
pub use fetch_key::{Alias, AliasBlob, FetchKey};
pub use rechunk::{force_rechunk, rechunk};
@ -238,7 +239,7 @@ pub async fn fetch_with_size<'a, B: Blobstore + Clone + 'a>(
match content_id {
Some(content_id) => {
fetch::fetch_with_size(blobstore, ctx, content_id, fetch::Range::All).await
fetch::fetch_with_size(blobstore, ctx, content_id, fetch::Range::all()).await
}
None => Ok(None),
}
@ -253,8 +254,7 @@ pub async fn fetch_range_with_size<'a, B: Blobstore>(
blobstore: &'a B,
ctx: &'a CoreContext,
key: &FetchKey,
start: u64,
size: u64,
range: Range,
) -> Result<Option<(impl Stream<Item = Result<Bytes, Error>> + 'a, u64)>, Error> {
let content_id = key
.load(ctx, blobstore)
@ -266,18 +266,7 @@ pub async fn fetch_range_with_size<'a, B: Blobstore>(
})?;
match content_id {
Some(content_id) => {
fetch::fetch_with_size(
blobstore,
ctx,
content_id,
fetch::Range::Span {
start,
end: start.saturating_add(size),
},
)
.await
}
Some(content_id) => fetch::fetch_with_size(blobstore, ctx, content_id, range).await,
None => Ok(None),
}
}
@ -358,10 +347,9 @@ pub async fn fetch_range<'a, B: Blobstore>(
blobstore: &'a B,
ctx: &'a CoreContext,
key: &FetchKey,
start: u64,
size: u64,
range: Range,
) -> Result<Option<impl Stream<Item = Result<Bytes, Error>> + 'a>, Error> {
let res = fetch_range_with_size(blobstore, ctx, key, start, size).await?;
let res = fetch_range_with_size(blobstore, ctx, key, range).await?;
Ok(res.map(|(stream, _len)| stream))
}

View File

@ -98,7 +98,8 @@ async fn rebuild_metadata<B: Blobstore>(
// NOTE: We implicitly trust data from the Filestore here. We do not validate
// the size, nor the ContentId.
let total_size = file_contents.size();
let content_stream = fetch::stream_file_bytes(blobstore, ctx, file_contents, fetch::Range::All);
let content_stream =
fetch::stream_file_bytes(blobstore, ctx, file_contents, fetch::Range::all());
let redeemable = alias_stream(ExpectedSize::new(total_size), content_stream)
.await

View File

@ -178,7 +178,7 @@ async fn do_rechunk_file_contents<B: Blobstore + Clone + 'static>(
content_id: ContentId,
) -> Result<ContentMetadata, Error> {
let req = StoreRequest::with_canonical(file_contents.size(), content_id);
let file_stream = fetch::stream_file_bytes(blobstore, ctx, file_contents, fetch::Range::All);
let file_stream = fetch::stream_file_bytes(blobstore, ctx, file_contents, fetch::Range::all());
store(blobstore, filestore_config, ctx, &req, file_stream).await
}

View File

@ -569,11 +569,15 @@ async fn filestore_get_range(fb: FacebookInit) -> Result<()> {
.await?;
let res = async {
let stream =
filestore::fetch_range_with_size(&blob, ctx, &FetchKey::Canonical(content_id), 7, 5)
.await?
.ok_or_else(|| Error::msg("Object does not exist"))?
.0;
let stream = filestore::fetch_range_with_size(
&blob,
ctx,
&FetchKey::Canonical(content_id),
filestore::Range::sized(7, 5),
)
.await?
.ok_or_else(|| Error::msg("Object does not exist"))?
.0;
let bytes = stream
.try_fold(BytesMut::new(), |mut buff, chunk| async move {
@ -620,11 +624,15 @@ async fn filestore_get_chunked_range(fb: FacebookInit) -> Result<()> {
.await?;
let res = async {
let stream =
filestore::fetch_range_with_size(blob, ctx, &FetchKey::Canonical(full_id), 4, 6)
.await?
.ok_or_else(|| Error::msg("Object does not exist"))?
.0;
let stream = filestore::fetch_range_with_size(
blob,
ctx,
&FetchKey::Canonical(full_id),
filestore::Range::sized(4, 6),
)
.await?
.ok_or_else(|| Error::msg("Object does not exist"))?
.0;
let bytes = stream
.try_fold(BytesMut::new(), |mut buff, chunk| async move {

View File

@ -157,8 +157,7 @@ impl FileContext {
self.repo().blob_repo().blobstore(),
self.ctx(),
&self.fetch_key,
start,
size,
filestore::Range::sized(start, size),
)
.await;