diff --git a/futures-ext/src/lib.rs b/futures-ext/src/lib.rs index 144181d144..d6789c7f31 100644 --- a/futures-ext/src/lib.rs +++ b/futures-ext/src/lib.rs @@ -476,39 +476,61 @@ macro_rules! ensure_boxstream { }; } +/// Take a future, and run it on its own task, returning the result to the caller. This permits +/// Rust to run the spawned future on a different thread to the task that spawned it, thus adding +/// parallelism if used sensibly. +/// Note that the spawning here is lazy - the new task will not be spawned if the returned future +/// is dropped before it's polled. +pub fn spawn_future(f: IntoFut) -> impl Future +where + IntoFut: IntoFuture, + Fut: Future + Send + 'static, + T: Send + 'static, + E: From + Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + + let fut = f.into_future().then(|res| { + let _ = tx.send(res); + Ok(()) + }); + + future::lazy(move || { + let _ = tokio::spawn(fut); + rx.from_err().and_then(|v| v) + }) +} + +/// Given an `FnMut` closure, create a `Future` that will eventually execute the closure using +/// Tokio's `blocking` mechanism, so that it is safe to call blocking code inside the closure +/// without preventing other tasks from making progress. +/// This returns a lazy future - it will not even attempt to run the blocking code until you poll +/// the future. +/// Note that this does not spawn the future onto its own task - use `asynchronize` below if you +/// need to run the blocking code on its own thread, rather than letting it block this task. +pub fn closure_to_blocking_future(f: Func) -> impl Future +where + Func: FnMut() -> Result, + E: From, +{ + let mut func = f; + future::lazy(|| future::poll_fn(move || blocking(&mut func))) + .map_err(E::from) + .and_then(|res| res) // flatten Ok(res) => res +} + /// This method allows us to take synchronous code, schedule it on the default tokio thread pool -/// and convert it to the future. Func can return anything that is convertable to a future, for -/// example, Result -/// -/// ``` -/// use std::{thread, time}; -/// -/// asynchronize(move || { -/// thread::sleep(time::Duration::from_secs(5)); -/// Ok(()) -/// }) -/// ``` +/// and convert it to the future. It's the combination of `spawn_future` (which runs a future on +/// another thread) and `closure_to_blocking_future` (which turns a closure into a future). pub fn asynchronize(f: Func) -> impl Future where Func: FnMut() -> Result + Send + 'static, T: Send + 'static, E: From + From + Send + 'static, { - let (tx, rx) = oneshot::channel(); + let fut = closure_to_blocking_future(f); - let mut func = f; - let fut = future::lazy(|| future::poll_fn(move || blocking(&mut func))) - .map_err(E::from) - .and_then(|res| res) // flatten Ok(res) => res - .then(|res| { - let _ = tx.send(res); - Ok(()) - }); - - future::lazy(move || { - let _ = tokio::spawn(fut); - rx.from_err().and_then(|v| v) - }) + spawn_future(fut) } /// Simple adapter from `Sink` interface to `AsyncWrite` interface.