Make NextStep simpler

Summary:
By using "unboxed closures", we don't need to name the exact type of the output of a function, and here we only need it to be a future.

In this way, we can remove two of the type parameters of `NextStep` and make it simpler to use.

Reviewed By: Croohand

Differential Revision: D43503769

fbshipit-source-id: f1b66456c43e84dd4c77c500b684d5234237421c
This commit is contained in:
Yan Soares Couto 2023-02-27 05:01:44 -08:00 committed by Facebook GitHub Bot
parent 486625c515
commit a5535f73aa
2 changed files with 22 additions and 20 deletions

View File

@ -5,6 +5,8 @@
* GNU General Public License version 2.
*/
#![feature(unboxed_closures)]
use futures::future;
use futures::stream::FusedStream;
use futures::Future;
@ -17,10 +19,10 @@ mod next_step;
pub use next_step::NextStep;
pub trait AssemblyLine: Stream + Sized {
fn next_step<F, O, FOut>(self, step_fn: F) -> NextStep<Self, F, O, FOut>
fn next_step<F>(self, step_fn: F) -> NextStep<Self, F>
where
F: FnMut(Self::Item) -> FOut,
FOut: Future<Output = O>,
F: FnMut<(Self::Item,)>,
F::Output: Future,
Self: FusedStream,
{
NextStep::new(self, step_fn)
@ -32,14 +34,14 @@ impl<S: Stream> AssemblyLine for S {}
pub struct TryAssemblyLine;
impl TryAssemblyLine {
pub fn try_next_step<S, F, O, FOut>(
pub fn try_next_step<S, F, O>(
stream: S,
mut step_fn: F,
) -> impl Stream<Item = Result<O, S::Error>>
where
S: TryStream + FusedStream,
F: FnMut(S::Ok) -> FOut,
FOut: Future<Output = Result<O, S::Error>>,
F: FnMut<(S::Ok,)>,
F::Output: Future<Output = Result<O, S::Error>>,
// This is always true, not sure why I need this bound
S: Stream<Item = Result<S::Ok, S::Error>>,
{

View File

@ -15,15 +15,15 @@ use futures::Future;
use pin_project::pin_project;
#[pin_project]
pub struct NextStep<S, F, O, FOut>
pub struct NextStep<S, F>
where
S: FusedStream,
F: FnMut(S::Item) -> FOut,
FOut: Future<Output = O>,
F: FnMut<(S::Item,)>,
F::Output: Future,
{
f: F,
next_in_line: Option<S::Item>,
running: Option<Pin<Box<FOut>>>,
running: Option<Pin<Box<F::Output>>>,
#[pin]
inner: S,
}
@ -31,11 +31,11 @@ where
/// Like stream::Then, it applies a future to the output. The difference is
/// that it continues waiting for the next item in the stream WHILE the current
/// one is being processed.
impl<S, F, O, FOut> NextStep<S, F, O, FOut>
impl<S, F> NextStep<S, F>
where
S: FusedStream,
F: FnMut(S::Item) -> FOut,
FOut: Future<Output = O>,
F: FnMut<(S::Item,)>,
F::Output: Future,
{
pub fn new(inner: S, f: F) -> Self {
Self {
@ -47,13 +47,13 @@ where
}
}
impl<S, F, O, FOut> Stream for NextStep<S, F, O, FOut>
impl<S, F> Stream for NextStep<S, F>
where
S: FusedStream,
F: FnMut(S::Item) -> FOut,
FOut: Future<Output = O>,
F: FnMut<(S::Item,)>,
F::Output: Future,
{
type Item = O;
type Item = <F::Output as Future>::Output;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
@ -82,11 +82,11 @@ where
}
}
impl<S, F, O, FOut> FusedStream for NextStep<S, F, O, FOut>
impl<S, F> FusedStream for NextStep<S, F>
where
S: FusedStream,
F: FnMut(S::Item) -> FOut,
FOut: Future<Output = O>,
F: FnMut<(S::Item,)>,
F::Output: Future,
{
fn is_terminated(&self) -> bool {
self.running.is_none() && self.next_in_line.is_none() && self.inner.is_terminated()