bounded_traversal: use standard futures types instead of custom Job

Summary:
Replace `common::Job` by using `futures::Join` and `futures::Ready`.

We still need a heterogeneous variant of `Either`, where the output types of the
two futures differ, so extract this from `Job` as `common::Either2`, which
returns `either::Either<LeftFuture::Out, RightFuture::Out>`.

Reviewed By: ahornby

Differential Revision: D25867668

fbshipit-source-id: 13c90b212c64ca5eae67217a1cecd9aee5e40a38
This commit is contained in:
Mark Juggurnauth-Thomas 2021-01-29 03:12:52 -08:00 committed by Facebook GitHub Bot
parent a3243f4b5d
commit 1cd098181c
4 changed files with 73 additions and 73 deletions

View File

@ -7,6 +7,7 @@ license = "GPLv2+"
include = ["src/**/*.rs"]
[dependencies]
either = "1.5"
futures = { version = "0.3.5", features = ["async-await", "compat"] }
[dev-dependencies]

View File

@ -5,12 +5,12 @@
* GNU General Public License version 2.
*/
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use either::Either;
use futures::ready;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
#[derive(Clone, Copy)]
pub(crate) struct NodeLocation<Index> {
@ -18,39 +18,29 @@ pub(crate) struct NodeLocation<Index> {
pub child_index: usize, // index inside parents children list
}
// This is essentially just a `.map` over futures `{FFut|UFut}`, this only exisists
// so it would be possible to name `FuturesUnoredered` type parameter.
/// Equivalent of `futures::future::Either` but with heterogeneous output
/// types using `either::Either`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub(crate) enum Job<In, UFut, FFut> {
Unfold { value: In, future: UFut },
Fold { value: In, future: FFut },
pub(crate) enum Either2<A, B> {
Left(A),
Right(B),
}
pub(crate) enum JobResult<In, UFutResult, FFutResult> {
Unfold { value: In, result: UFutResult },
Fold { value: In, result: FFutResult },
}
impl<In, UFut, FFut> Future for Job<In, UFut, FFut>
impl<A, B> Future for Either2<A, B>
where
In: Clone,
UFut: Future,
FFut: Future,
A: Future,
B: Future,
{
type Output = JobResult<In, UFut::Output, FFut::Output>;
type Output = Either<A::Output, B::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// see `impl<A, B> Future for Either<A, B>`
// see `impl<Left, Right> Future for Either<Left, Right>`
unsafe {
let result = match self.get_unchecked_mut() {
Job::Fold { value, future } => JobResult::Fold {
value: value.clone(),
result: ready!(Pin::new_unchecked(future).poll(cx)),
},
Job::Unfold { value, future } => JobResult::Unfold {
value: value.clone(),
result: ready!(Pin::new_unchecked(future).poll(cx)),
},
Either2::Left(future) => Either::Left(ready!(Pin::new_unchecked(future).poll(cx))),
Either2::Right(future) => {
Either::Right(ready!(Pin::new_unchecked(future).poll(cx)))
}
};
Poll::Ready(result)
}

View File

@ -5,11 +5,13 @@
* GNU General Public License version 2.
*/
use super::{
common::{Job, JobResult, NodeLocation},
Iter,
};
use futures::{ready, stream::FuturesUnordered, StreamExt};
use super::common::{Either2, NodeLocation};
use super::Iter;
use either::Either;
use futures::future::{join, ready, Join, Ready};
use futures::ready;
use futures::stream::{FuturesUnordered, StreamExt};
use std::{
collections::{HashMap, VecDeque},
future::Future,
@ -85,13 +87,17 @@ enum Node<In, Out, OutCtx> {
}
#[must_use = "futures do nothing unless polled"]
struct BoundedTraversalDAG<In, Out, OutCtx, Unfold, UFut, Fold, FFut> {
struct BoundedTraversalDAG<In, Out, OutCtx, Unfold, UFut, Fold, FFut>
where
UFut: Future,
FFut: Future,
{
init: In,
unfold: Unfold,
fold: Fold,
scheduled_max: usize,
scheduled: FuturesUnordered<Job<In, UFut, FFut>>, // jobs being executed
unscheduled: VecDeque<Job<In, UFut, FFut>>, // as of yet unscheduled jobs
scheduled: FuturesUnordered<Join<Ready<In>, Either2<UFut, FFut>>>, // jobs being executed
unscheduled: VecDeque<Join<Ready<In>, Either2<UFut, FFut>>>, // as of yet unscheduled jobs
execution_tree: HashMap<In, Node<In, Out, OutCtx>>, // tree tracking execution process
}
@ -139,10 +145,10 @@ where
children: None,
},
);
self.unscheduled.push_front(Job::Unfold {
value: value.clone(),
future: (self.unfold)(value),
});
self.unscheduled.push_front(join(
ready(value.clone()),
Either2::Left((self.unfold)(value)),
));
None
}
Some(Node::Pending { parents, .. }) => {
@ -156,10 +162,10 @@ where
}
fn enqueue_fold(&mut self, value: In, context: OutCtx, children: Iter<Out>) {
self.unscheduled.push_front(Job::Fold {
value,
future: (self.fold)(context, children),
});
self.unscheduled.push_front(join(
ready(value),
Either2::Right((self.fold)(context, children)),
));
}
fn process_unfold(&mut self, value: In, (context, children): (OutCtx, Ins)) {
@ -290,8 +296,8 @@ where
// execute scheduled until it is blocked or done
if let Some(job_result) = ready!(this.scheduled.poll_next_unpin(cx)) {
match job_result {
JobResult::Unfold { value, result } => this.process_unfold(value, result?),
JobResult::Fold { value, result } => {
(value, Either::Left(result)) => this.process_unfold(value, result?),
(value, Either::Right(result)) => {
// we have computed value associated with `init` node
if value == this.init {
// all jobs have to be completed and execution_tree empty

View File

@ -5,17 +5,17 @@
* GNU General Public License version 2.
*/
use super::{
common::{Job, JobResult},
Iter,
};
use futures::{ready, stream::FuturesUnordered, StreamExt};
use std::{
collections::{HashMap, VecDeque},
future::Future,
pin::Pin,
task::{Context, Poll},
};
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use either::Either;
use futures::future::{join, ready, Join, Ready};
use futures::ready;
use futures::stream::{FuturesUnordered, StreamExt};
use super::{common::Either2, Iter};
/// `bounded_traversal` traverses implicit asynchronous tree specified by `init`
/// and `unfold` arguments, and it also does backward pass with `fold` operation.
@ -67,14 +67,18 @@ struct NodeIndex(usize);
type NodeLocation = super::common::NodeLocation<NodeIndex>;
#[must_use = "futures do nothing unless polled"]
struct BoundedTraversal<Out, OutCtx, Unfold, UFut, Fold, FFut> {
struct BoundedTraversal<Out, OutCtx, Unfold, UFut, Fold, FFut>
where
UFut: Future,
FFut: Future,
{
unfold: Unfold,
fold: Fold,
scheduled_max: usize,
scheduled: FuturesUnordered<Job<NodeLocation, UFut, FFut>>, // jobs being executed
unscheduled: VecDeque<Job<NodeLocation, UFut, FFut>>, // as of yet unscheduled jobs
execution_tree: HashMap<NodeIndex, Node<Out, OutCtx>>, // tree tracking execution process
execution_tree_index: NodeIndex, // last allocated node index
scheduled: FuturesUnordered<Join<Ready<NodeLocation>, Either2<UFut, FFut>>>, // jobs being executed
unscheduled: VecDeque<Join<Ready<NodeLocation>, Either2<UFut, FFut>>>, // as of yet unscheduled jobs
execution_tree: HashMap<NodeIndex, Node<Out, OutCtx>>, // tree tracking execution process
execution_tree_index: NodeIndex, // last allocated node index
}
impl<Err, In, Ins, Out, OutCtx, Unfold, UFut, Fold, FFut>
@ -107,17 +111,16 @@ where
}
fn enqueue_unfold(&mut self, parent: NodeLocation, value: In) {
self.unscheduled.push_front(Job::Unfold {
value: parent,
future: (self.unfold)(value),
});
let fut = join(ready(parent), Either2::Left((self.unfold)(value)));
self.unscheduled.push_front(fut);
}
fn enqueue_fold(&mut self, parent: NodeLocation, context: OutCtx, children: Iter<Out>) {
self.unscheduled.push_front(Job::Fold {
value: parent,
future: (self.fold)(context, children),
});
let fut = join(
ready(parent),
Either2::Right((self.fold)(context, children)),
);
self.unscheduled.push_front(fut);
}
fn process_unfold(&mut self, parent: NodeLocation, (context, children): (OutCtx, Ins)) {
@ -209,8 +212,8 @@ where
// execute scheduled until it is blocked or done
if let Some(job_result) = ready!(this.scheduled.poll_next_unpin(cx)) {
match job_result {
JobResult::Unfold { value, result } => this.process_unfold(value, result?),
JobResult::Fold { value, result } => {
(value, Either::Left(result)) => this.process_unfold(value, result?),
(value, Either::Right(result)) => {
// `0` is special index which means whole tree have been executed
if value.node_index == NodeIndex(0) {
// all jobs have to be completed and execution_tree empty