mononoke/filestore: convert prepare to futures 0.3 / tokio 0.2

Summary:
Like it says in the title. This also lets us remove the spawn module entirely.
Note that there is one little annoyance here: I ran into the good old "not
general enough" compiler issue, so I had to add a bit more boxing to make this
go away :(
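For reference, the workaround boils down to boxing the stream instead of stack-pinning it before handing it to send_all (see the multiplexer change below). A minimal sketch of the pattern, using a toy channel and u64 items rather than the actual filestore types:

use anyhow::Error;
use futures::{channel::mpsc, stream, SinkExt, StreamExt};

async fn drain_into_sink() -> Result<(), Error> {
    // Buffer is large enough that send_all completes without a reader.
    let (tx, rx) = mpsc::channel::<u64>(8);
    let mut sink = tx.sink_map_err(Error::from);

    // In the multiplexer's generic drain(), futures::pin_mut! tripped the
    // "implementation is not general enough" inference bug
    // (rust-lang/rust#64552); boxing the stream erases its concrete type and
    // sidesteps that, at the cost of an allocation.
    let mut items = stream::iter(vec![1u64, 2, 3]).map(Ok::<_, Error>).boxed();
    sink.send_all(&mut items).await?;
    drop(sink);

    assert_eq!(rx.collect::<Vec<_>>().await, vec![1, 2, 3]);
    Ok(())
}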

Reviewed By: markbt

Differential Revision: D24727253

fbshipit-source-id: 73435305d39cade2f32b151734adf0969311c243
Thomas Orozco 2020-11-06 07:00:47 -08:00 committed by Facebook GitHub Bot
parent 565eff1f9d
commit 5adbccff53
5 changed files with 124 additions and 207 deletions


@@ -24,7 +24,6 @@ sha2 = "0.8"
slog = { version = "2.5", features = ["max_level_debug"] }
thiserror = "1.0"
tokio = { version = "=0.2.13", features = ["full"] }
tokio-old = { package = "tokio", version = "0.1" }
[dev-dependencies]
memblob = { path = "../blobstore/memblob" }


@@ -14,8 +14,12 @@ use std::convert::TryInto;
use anyhow::Error;
use cloned::cloned;
use futures::{compat::Stream01CompatExt, future::TryFutureExt, stream::TryStreamExt};
use futures_ext::FutureExt;
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
future::{FutureExt, TryFutureExt},
stream::TryStreamExt,
};
use futures_ext::FutureExt as OldFutureExt;
use futures_old::{stream, Future, IntoFuture, Stream};
use blobstore::{Blobstore, Loadable, LoadableError};
@@ -34,7 +38,6 @@ mod metadata;
mod multiplexer;
mod prepare;
mod rechunk;
mod spawn;
mod streamhash;
pub use fetch_key::{Alias, AliasBlob, FetchKey};
@@ -401,22 +404,30 @@ pub fn store<B: Blobstore + Clone>(
) -> impl Future<Item = ContentMetadata, Error = Error> {
use chunk::Chunks;
let prepared = match chunk::make_chunks(data.compat(), req.expected_size, config.chunk_size) {
Chunks::Inline(fut) => prepare::prepare_inline(fut.compat()).left_future(),
Chunks::Chunked(expected_size, chunks) => prepare::prepare_chunked(
ctx.clone(),
blobstore.clone(),
expected_size,
chunks.compat(),
config.concurrency,
)
.right_future(),
};
cloned!(req); // TODO: Just take this by value.
prepared.and_then({
cloned!(blobstore, ctx, req);
move |prepared| finalize::finalize(blobstore, ctx, Some(&req), prepared)
})
async move {
let prepared = match chunk::make_chunks(data.compat(), req.expected_size, config.chunk_size)
{
Chunks::Inline(fut) => prepare::prepare_bytes(fut.await?),
Chunks::Chunked(expected_size, chunks) => {
prepare::prepare_chunked(
ctx.clone(),
blobstore.clone(),
expected_size,
chunks,
config.concurrency,
)
.await?
}
};
finalize::finalize(blobstore, ctx, Some(&req), prepared)
.compat()
.await
}
.boxed()
.compat()
}
/// Store a set of bytes, and immediately return their ContentId and size. This function is

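The shape of the store() conversion above is the usual bridging pattern: write the body as an async block, then .boxed() (giving an Unpin future) and .compat() so existing callers still see a futures 0.1 future. A minimal sketch with a made-up function, assuming the futures 0.3 "compat" feature and the 0.1 crate aliased as futures_old, as in this crate:

use anyhow::Error;
use futures::future::{FutureExt, TryFutureExt};
use futures_old::Future;

// Hypothetical example, not part of the filestore API.
fn doubled(x: u64) -> impl Future<Item = u64, Error = Error> {
    async move { Result::<_, Error>::Ok(x * 2) }
        .boxed() // BoxFuture is Unpin, which compat() requires
        .compat() // exposes the async block as a futures 0.1 Future
}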

@@ -10,8 +10,7 @@ use futures::{
channel::mpsc,
future::{self, Future, TryFutureExt},
sink::{Sink, SinkExt},
stream::Stream,
stream::TryStreamExt,
stream::{Stream, StreamExt, TryStreamExt},
};
use tokio::task::JoinHandle;
@@ -36,6 +35,7 @@ pub struct Multiplexer<T: Send + Sync> {
pub enum MultiplexerError<E> {
/// InputError indicates that the input stream hit an error E.
InputError(E),
/// Cancelled indicates that one of the consumers add()'ed to this Multiplexer stopped
/// accepting input (likely because it hit an error).
Cancelled,
@@ -89,9 +89,9 @@ impl<T: Send + Sync + Clone + 'static> Multiplexer<T> {
}
/// Drain a Stream into the multiplexer.
pub async fn drain<S, E>(self, stream: S) -> Result<(), MultiplexerError<E>>
pub async fn drain<'a, S, E>(self, stream: S) -> Result<(), MultiplexerError<E>>
where
S: Stream<Item = Result<T, E>>,
S: Stream<Item = Result<T, E>> + Send + 'static,
{
let Self { sink, .. } = self;
@@ -100,8 +100,9 @@ impl<T: Send + Sync + Clone + 'static> Multiplexer<T> {
// errors originating from the Stream and errors originating from the Sink.
Some(sink) => {
let mut sink = sink.sink_map_err(|_| MultiplexerError::Cancelled);
let stream = stream.map_err(MultiplexerError::InputError);
futures::pin_mut!(stream);
// NOTE: I'd like to use futures::pin_mut! here, but:
// https://github.com/rust-lang/rust/issues/64552
let mut stream = stream.map_err(MultiplexerError::InputError).boxed();
sink.send_all(&mut stream).await
}
// If we have no Sink, then consume the Stream regardless.


@@ -11,14 +11,10 @@ use bytes::Bytes;
use cloned::cloned;
use context::CoreContext;
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
future::{FutureExt, TryFutureExt},
stream::{StreamExt, TryStreamExt},
};
use futures_ext::FutureExt as _;
use futures_old::{
future::{lazy, IntoFuture},
Future, Stream,
compat::Future01CompatExt,
future::{self, FutureExt, TryFutureExt},
stream::{Stream, StreamExt, TryStreamExt},
task::Poll,
};
use mononoke_types::{
content_chunk::new_blob_and_pointer, hash, ChunkedFileContents, FileContents, MononokeId,
@@ -31,7 +27,6 @@ use crate::incremental_hash::{
Sha256IncrementalHasher,
};
use crate::multiplexer::{Multiplexer, MultiplexerError};
use crate::spawn::{self};
use crate::streamhash::hash_stream;
#[derive(Debug, Clone)]
@@ -57,133 +52,107 @@ pub fn prepare_bytes(bytes: Bytes) -> Prepared {
}
}
/// Prepare a set of Bytes for upload. The size hint isn't actually used here, it's just passed
/// through.
pub fn prepare_inline<F>(chunk: F) -> impl Future<Item = Prepared, Error = Error>
where
F: Future<Item = Bytes, Error = Error>,
{
chunk.map(prepare_bytes)
}
/// Prepare a stream of bytes for upload. This will return a Prepared struct that can be used to
/// finalize the upload. The hashes we compute may depend on the size hint.
pub fn prepare_chunked<B: Blobstore + Clone, S>(
pub async fn prepare_chunked<B: Blobstore + Clone, S>(
ctx: CoreContext,
blobstore: B,
expected_size: ExpectedSize,
chunks: S,
concurrency: usize,
) -> impl Future<Item = Prepared, Error = Error>
) -> Result<Prepared, Error>
where
S: Stream<Item = Bytes, Error = Error> + Send + 'static,
S: Stream<Item = Result<Bytes, Error>> + Send + 'static,
{
lazy(move || {
// NOTE: The Multiplexer makes clones of the Bytes we pass in. It's worth noting that Bytes
// actually behaves like an Arc with an inner reference-counted handle to data, so those
// clones are actually fairly cheap
let mut multiplexer = Multiplexer::<Bytes>::new();
// NOTE: The Multiplexer makes clones of the Bytes we pass in. It's worth noting that Bytes
// actually behaves like an Arc with an inner reference-counted handle to data, so those
// clones are actually fairly cheap
let mut multiplexer = Multiplexer::<Bytes>::new();
// Spawn a stream for each hash we need to produce.
let content_id = multiplexer
.add(|stream| hash_stream(ContentIdIncrementalHasher::new(), stream))
.boxed()
.compat();
let aliases = add_aliases_to_multiplexer(&mut multiplexer, expected_size);
// Spawn a stream for each hash we need to produce.
let content_id =
multiplexer.add(|stream| hash_stream(ContentIdIncrementalHasher::new(), stream));
// For the file's contents, spawn new tasks for each individual chunk. This ensures that
// each chunk is hashed and uploaded separately, and potentially on a different CPU core.
// We allow up to concurrency uploads to progress at the same time, which creates
// backpressure into the chunks Stream.
let contents = multiplexer
.add(move |stream| {
stream
.map(|bytes| Result::<_, Error>::Ok(bytes))
.compat()
.map(move |bytes| {
// NOTE: This is lazy to allow the hash computation for this chunk's ID to
// happen on a separate core.
let fut = lazy({
cloned!(blobstore, ctx);
move || {
let (blob, pointer) = new_blob_and_pointer(bytes);
let aliases = add_aliases_to_multiplexer(&mut multiplexer, expected_size).compat();
// TODO: Convert this along with other store calls to impl Storable for
// MononokeId.
blobstore
.put(ctx, blob.id().blobstore_key(), blob.into())
.compat()
.map(move |_| pointer)
}
});
// For the file's contents, spawn new tasks for each individual chunk. This ensures that
// each chunk is hashed and uploaded separately, and potentially on a different CPU core.
// We allow up to concurrency uploads to progress at the same time, which creates
// backpressure into the chunks Stream.
let contents = multiplexer
.add(move |stream| {
stream
.map(move |bytes| {
// NOTE: This is lazy to allow the hash computation for this chunk's ID to
// happen on a separate core.
let fut = {
cloned!(blobstore, ctx);
async move {
let (blob, pointer) = new_blob_and_pointer(bytes);
spawn::spawn_and_start(fut).map_err(|e| e.into())
})
.buffered(concurrency)
.fold(vec![], |mut chunks, chunk| {
chunks.push(chunk);
let res: Result<_> = Ok(chunks);
res
})
.compat()
})
.map(|res| match res {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) => Err(e),
Err(e) => Err(e.into()),
})
.compat();
// TODO: Convert this along with other store calls to impl Storable for
// MononokeId.
blobstore
.put(ctx, blob.id().blobstore_key(), blob.into())
.await?;
multiplexer
.drain(chunks.compat())
.boxed()
.compat()
.then(|res| {
// Coerce the Error value for all our futures to Error. Note that the content_id and
// alias ones actually cannot fail.
let content_id = content_id.map_err(|e| e.into());
let aliases = aliases.map_err(|e| e.into());
let contents = contents.map_err(|e| e.into());
Result::<_, Error>::Ok(pointer)
}
};
// Mutable so we can poll later in the error case.
let mut futs = (content_id, aliases, contents).into_future();
async move { tokio::task::spawn(fut).await? }
})
.buffered(concurrency)
.try_fold(vec![], |mut chunks, chunk| async move {
chunks.push(chunk);
Result::<_, Error>::Ok(chunks)
})
})
.map(|res| match res {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) => Err(e),
Err(e) => Err(e.into()),
});
match res {
// All is well - get the results when our futures complete.
Ok(_) => futs
.and_then(|(content_id, aliases, chunks)| {
let contents =
FileContents::Chunked(ChunkedFileContents::new(content_id, chunks));
let res = multiplexer.drain(chunks).await;
let (sha1, sha256, git_sha1) = aliases.redeem(contents.size())?;
// Coerce the Error value for all our futures to Error.
let content_id = content_id.map_err(Error::from);
let aliases = aliases.map_err(Error::from);
let contents = contents.map_err(Error::from);
let prepared = Prepared {
sha1,
sha256,
git_sha1,
contents,
};
let futs = future::try_join3(content_id, aliases, contents);
Ok(prepared)
})
.left_future(),
// If the Multiplexer hit an error, then it's worth handling the Cancelled case
// separately: Cancelled means our Multiplexer noted that one of its readers
// stopped reading. Usually, this will be because one of the readers failed. So,
// let's just poll the readers once to see if they have an error value ready, and
// if so, let's return that Error (because it'll be a more usable one). If not,
// we'll passthrough the cancellation (but, we do have a unit test to make sure we
// hit the happy path that prettifies the error).
Err(e) => Err(match e {
e @ MultiplexerError::Cancelled => match futs.poll() {
Ok(_) => e.into(),
Err(e) => e,
},
e @ MultiplexerError::InputError(_) => e.into(),
})
.into_future()
.right_future(),
}
})
})
match res {
// All is well - get the results when our futures complete.
Ok(_) => {
let (content_id, aliases, chunks) = futs.await?;
let contents = FileContents::Chunked(ChunkedFileContents::new(content_id, chunks));
let (sha1, sha256, git_sha1) = aliases.redeem(contents.size())?;
let prepared = Prepared {
sha1,
sha256,
git_sha1,
contents,
};
Ok(prepared)
}
// If the Multiplexer hit an error, then it's worth handling the Cancelled case
// separately: Cancelled means our Multiplexer noted that one of its readers
// stopped reading. Usually, this will be because one of the readers failed. So,
// let's just poll the readers once to see if they have an error value ready, and
// if so, let's return that Error (because it'll be a more usable one). If not,
// we'll passthrough the cancellation (but, we do have a unit test to make sure we
// hit the happy path that prettifies the error).
Err(m @ MultiplexerError::Cancelled) => match futures::poll!(futs) {
Poll::Ready(Err(e)) => Err(e),
_ => Err(m.into()),
},
Err(m @ MultiplexerError::InputError(..)) => Err(m.into()),
}
}
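The core of the new prepare_chunked above is the per-chunk task pattern: each chunk's work is wrapped in a lazy future that spawns a task only when polled, and buffered(concurrency) caps how many are in flight, which back-pressures the incoming chunk stream. A stripped-down sketch of that shape, with a dummy upload standing in for new_blob_and_pointer plus the blobstore put (the input here is already a TryStream, so try_buffered does the bounding that buffered does in the real code):

use anyhow::Error;
use futures::{Stream, TryStreamExt};

// Illustrative only: returns the sizes of the "uploaded" chunks, in order.
async fn upload_chunks<S>(chunks: S, concurrency: usize) -> Result<Vec<usize>, Error>
where
    S: Stream<Item = Result<Vec<u8>, Error>>,
{
    chunks
        .map_ok(|bytes| async move {
            // The spawn only happens once try_buffered polls this future, so
            // at most `concurrency` chunk tasks are in flight; each spawned
            // task can hash/upload its chunk on another core. Awaiting the
            // JoinHandle surfaces panics as well as the task's own Result.
            tokio::task::spawn(async move {
                // Stand-in for new_blob_and_pointer + blobstore.put.
                Result::<_, Error>::Ok(bytes.len())
            })
            .await?
        })
        .try_buffered(concurrency)
        .try_fold(Vec::new(), |mut acc, len| async move {
            acc.push(len);
            Result::<_, Error>::Ok(acc)
        })
        .await
}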


@@ -1,63 +0,0 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::{format_err, Error};
use futures_ext::FutureExt;
use futures_old::{sync::oneshot, Future, IntoFuture};
// Spawn provides a helper to dispatch futures to the Tokio executor yet retain a handle to their
// results in the form of a Future. We also provide methods to cast SpawnError into an Error for
// convenience.
#[derive(Debug)]
pub enum SpawnError<E> {
Error(E),
Cancelled,
}
impl Into<Error> for SpawnError<Error> {
fn into(self) -> Error {
use SpawnError::*;
match self {
Error(e) => e,
e @ Cancelled => format_err!("SpawnError: {:?}", e),
}
}
}
impl Into<Error> for SpawnError<!> {
fn into(self) -> Error {
use SpawnError::*;
match self {
Error(e) => e,
e @ Cancelled => format_err!("SpawnError: {:?}", e),
}
}
}
// NOTE: We don't use FutureExt's spawn_future here because we need the Future to start doing work
// immediately. That's because we used these spawned futures with the Multiplexer, which requires
// those futures to do progress for draining to complete.
pub fn spawn_and_start<F, I, E>(fut: F) -> impl Future<Item = I, Error = SpawnError<E>>
where
F: Future<Item = I, Error = E> + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
let (sender, receiver) = oneshot::channel::<Result<I, E>>();
let fut = fut.then(|res| sender.send(res)).discard();
tokio_old::spawn(fut);
receiver.into_future().then(|res| match res {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) => Err(SpawnError::Error(e)),
Err(_) => Err(SpawnError::Cancelled),
})
}
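For context on why this module can simply be deleted: with tokio 0.2, tokio::task::spawn already provides both things spawn_and_start existed for. The spawned task starts running immediately (which the Multiplexer needs in order to drain), and the returned JoinHandle is itself a future over the result, with cancellation and panics surfaced as a JoinError. An illustrative sketch, not code from this diff, assuming a tokio 0.2+ runtime and anyhow::Error:

use anyhow::Error;

async fn spawned_result() -> Result<u64, Error> {
    let handle = tokio::task::spawn(async { Result::<_, Error>::Ok(21u64 * 2) });
    // The first ? covers the JoinError case (the old SpawnError::Cancelled),
    // the second the task's own failure (the old SpawnError::Error).
    let value = handle.await??;
    Ok(value)
}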