Split asynchronize into useful components

Summary:
`asynchronize` does two conceptually separate things:

1. Given a closure that can do blocking I/O or is CPU heavy, create a future
that runs that closure inside a Tokio task.
2. Given a future, run it on a new Tokio task and shuffle the result back to
the caller via a channel.

Split these two things out into their own functions - one to make the future,
one to spawn it and recover the result. For now, this is no net change - but
`spawn_future` is likely to come in useful once we need more parallelism than
we get from I/O alone, and `closure_to_blocking_future` at least signals intent
when we allow a long-running function to take over a Tokio task.

Reviewed By: jsgf

Differential Revision: D9635812

fbshipit-source-id: e15aeeb305c8499219b89a542962cb7c4b740354
This commit is contained in:
Simon Farnsworth 2018-09-05 12:10:21 -07:00 committed by Facebook Github Bot
parent 7fd5851f1e
commit 0ad8dcc0da

View File

@ -476,39 +476,61 @@ macro_rules! ensure_boxstream {
};
}
/// Take a future, and run it on its own task, returning the result to the caller. This permits
/// Rust to run the spawned future on a different thread to the task that spawned it, thus adding
/// parallelism if used sensibly.
/// Note that the spawning here is lazy - the new task will not be spawned if the returned future
/// is dropped before it's polled.
pub fn spawn_future<T, E, Fut, IntoFut>(f: IntoFut) -> impl Future<Item = T, Error = E>
where
IntoFut: IntoFuture<Item = T, Error = E, Future = Fut>,
Fut: Future<Item = T, Error = E> + Send + 'static,
T: Send + 'static,
E: From<futures::Canceled> + Send + 'static,
{
let (tx, rx) = oneshot::channel();
let fut = f.into_future().then(|res| {
let _ = tx.send(res);
Ok(())
});
future::lazy(move || {
let _ = tokio::spawn(fut);
rx.from_err().and_then(|v| v)
})
}
/// Given an `FnMut` closure, create a `Future` that will eventually execute the closure using
/// Tokio's `blocking` mechanism, so that it is safe to call blocking code inside the closure
/// without preventing other tasks from making progress.
/// This returns a lazy future - it will not even attempt to run the blocking code until you poll
/// the future.
/// Note that this does not spawn the future onto its own task - use `asynchronize` below if you
/// need to run the blocking code on its own thread, rather than letting it block this task.
pub fn closure_to_blocking_future<T, E, Func>(f: Func) -> impl Future<Item = T, Error = E>
where
Func: FnMut() -> Result<T, E>,
E: From<tokio_threadpool::BlockingError>,
{
let mut func = f;
future::lazy(|| future::poll_fn(move || blocking(&mut func)))
.map_err(E::from)
.and_then(|res| res) // flatten Ok(res) => res
}
/// This method allows us to take synchronous code, schedule it on the default tokio thread pool
/// and convert it to the future. Func can return anything that is convertable to a future, for
/// example, Result
///
/// ```
/// use std::{thread, time};
///
/// asynchronize(move || {
/// thread::sleep(time::Duration::from_secs(5));
/// Ok(())
/// })
/// ```
/// and convert it to the future. It's the combination of `spawn_future` (which runs a future on
/// another thread) and `closure_to_blocking_future` (which turns a closure into a future).
pub fn asynchronize<Func, T, E>(f: Func) -> impl Future<Item = T, Error = E>
where
Func: FnMut() -> Result<T, E> + Send + 'static,
T: Send + 'static,
E: From<tokio_threadpool::BlockingError> + From<futures::Canceled> + Send + 'static,
{
let (tx, rx) = oneshot::channel();
let fut = closure_to_blocking_future(f);
let mut func = f;
let fut = future::lazy(|| future::poll_fn(move || blocking(&mut func)))
.map_err(E::from)
.and_then(|res| res) // flatten Ok(res) => res
.then(|res| {
let _ = tx.send(res);
Ok(())
});
future::lazy(move || {
let _ = tokio::spawn(fut);
rx.from_err().and_then(|v| v)
})
spawn_future(fut)
}
/// Simple adapter from `Sink` interface to `AsyncWrite` interface.