Adding in Futures 0.3 support

This commit is contained in:
Pauan 2019-05-02 13:48:44 +02:00
parent 578d59ebc0
commit 69d7dc24b1
4 changed files with 563 additions and 294 deletions

View File

@ -14,6 +14,12 @@ edition = "2018"
futures = "0.1.20"
js-sys = { path = "../js-sys", version = '0.3.20' }
wasm-bindgen = { path = "../..", version = '0.2.43' }
futures-util-preview = { version = "0.3.0-alpha.15", optional = true }
futures-channel-preview = { version = "0.3.0-alpha.15", optional = true }
lazy_static = { version = "1.3.0", optional = true }
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = { path = '../test', version = '0.2.43' }
[features]
nightly = ["futures-util-preview", "futures-channel-preview", "lazy_static"]

View File

@ -103,300 +103,14 @@
#![deny(missing_docs)]
use std::cell::{Cell, RefCell};
use std::fmt;
use std::rc::Rc;
use std::sync::Arc;
#[cfg(feature = "nightly")]
mod nightly;
use futures::executor::{self, Notify, Spawn};
use futures::future;
use futures::prelude::*;
use futures::sync::oneshot;
use js_sys::{Function, Promise};
use wasm_bindgen::prelude::*;
#[cfg(feature = "nightly")]
pub use nightly::*;
/// A Rust `Future` backed by a JavaScript `Promise`.
///
/// This type is constructed with a JavaScript `Promise` object and translates
/// it to a Rust `Future`. This type implements the `Future` trait from the
/// `futures` crate and will either succeed or fail depending on what happens
/// with the JavaScript `Promise`.
///
/// Currently this type is constructed with `JsFuture::from`.
pub struct JsFuture {
resolved: oneshot::Receiver<JsValue>,
rejected: oneshot::Receiver<JsValue>,
callbacks: Option<(Closure<FnMut(JsValue)>, Closure<FnMut(JsValue)>)>,
}
#[cfg(not(feature = "nightly"))]
mod stable;
impl fmt::Debug for JsFuture {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "JsFuture {{ ... }}")
}
}
impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture {
// Use the `then` method to schedule two callbacks, one for the
// resolved value and one for the rejected value. These two callbacks
// will be connected to oneshot channels which feed back into our
// future.
//
// This may not be the speediest option today but it should work!
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut tx1 = Some(tx1);
let resolve = Closure::wrap(Box::new(move |val| {
drop(tx1.take().unwrap().send(val));
}) as Box<FnMut(_)>);
let mut tx2 = Some(tx2);
let reject = Closure::wrap(Box::new(move |val| {
drop(tx2.take().unwrap().send(val));
}) as Box<FnMut(_)>);
js.then2(&resolve, &reject);
JsFuture {
resolved: rx1,
rejected: rx2,
callbacks: Some((resolve, reject)),
}
}
}
impl Future for JsFuture {
type Item = JsValue;
type Error = JsValue;
fn poll(&mut self) -> Poll<JsValue, JsValue> {
// Test if either our resolved or rejected side is finished yet. Note
// that they will return errors if they're disconnected which can't
// happen until we drop the `callbacks` field, which doesn't happen
// till we're done, so we dont need to handle that.
if let Ok(Async::Ready(val)) = self.resolved.poll() {
drop(self.callbacks.take());
return Ok(val.into());
}
if let Ok(Async::Ready(val)) = self.rejected.poll() {
drop(self.callbacks.take());
return Err(val);
}
Ok(Async::NotReady)
}
}
/// Converts a Rust `Future` into a JavaScript `Promise`.
///
/// This function will take any future in Rust and schedule it to be executed,
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
/// to get plumbed into the rest of a system.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references. The
/// returned `Promise` will be resolved or rejected when the future completes,
/// depending on whether it finishes with `Ok` or `Err`.
///
/// # Panics
///
/// Note that in wasm panics are currently translated to aborts, but "abort" in
/// this case means that a JavaScript exception is thrown. The wasm module is
/// still usable (likely erroneously) after Rust panics.
///
/// If the `future` provided panics then the returned `Promise` **will not
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
/// limitation of wasm currently that's hoped to be fixed one day!
pub fn future_to_promise<F>(future: F) -> Promise
where
F: Future<Item = JsValue, Error = JsValue> + 'static,
{
_future_to_promise(Box::new(future))
}
// Implementation of actually transforming a future into a JavaScript `Promise`.
//
// The only primitive we have to work with here is `Promise::new`, which gives
// us two callbacks that we can use to either reject or resolve the promise.
// It's our job to ensure that one of those callbacks is called at the
// appropriate time.
//
// Now we know that JavaScript (in general) can't block and is largely
// notification/callback driven. That means that our future must either have
// synchronous computational work to do, or it's "scheduled a notification" to
// happen. These notifications are likely callbacks to get executed when things
// finish (like a different promise or something like `setTimeout`). The general
// idea here is thus to do as much synchronous work as we can and then otherwise
// translate notifications of a future's task into "let's poll the future!"
//
// This isn't necessarily the greatest future executor in the world, but it
// should get the job done for now hopefully.
fn _future_to_promise(future: Box<Future<Item = JsValue, Error = JsValue>>) -> Promise {
let mut future = Some(executor::spawn(future));
return Promise::new(&mut |resolve, reject| {
Package::poll(&Arc::new(Package {
spawn: RefCell::new(future.take().unwrap()),
resolve,
reject,
notified: Cell::new(State::Notified),
}));
});
struct Package {
// Our "spawned future". This'll have everything we need to poll the
// future and continue to move it forward.
spawn: RefCell<Spawn<Box<Future<Item = JsValue, Error = JsValue>>>>,
// The current state of this future, expressed in an enum below. This
// indicates whether we're currently polling the future, received a
// notification and need to keep polling, or if we're waiting for a
// notification to come in (and no one is polling).
notified: Cell<State>,
// Our two callbacks connected to the `Promise` that we returned to
// JavaScript. We'll be invoking one of these at the end.
resolve: Function,
reject: Function,
}
// The possible states our `Package` (future) can be in, tracked internally
// and used to guide what happens when polling a future.
enum State {
// This future is currently and actively being polled. Attempting to
// access the future will result in a runtime panic and is considered a
// bug.
Polling,
// This future has been notified, while it was being polled. This marker
// is used in the `Notify` implementation below, and indicates that a
// notification was received that the future is ready to make progress.
// If seen, however, it probably means that the future is also currently
// being polled.
Notified,
// The future is blocked, waiting for something to happen. Stored here
// is a self-reference to the future itself so we can pull it out in
// `Notify` and continue polling.
//
// Note that the self-reference here is an Arc-cycle that will leak
// memory unless the future completes, but currently that should be ok
// as we'll have to stick around anyway while the future is executing!
//
// This state is removed as soon as a notification comes in, so the leak
// should only be "temporary"
Waiting(Arc<Package>),
}
// No shared memory right now, wasm is single threaded, no need to worry
// about this!
unsafe impl Send for Package {}
unsafe impl Sync for Package {}
impl Package {
// Move the future contained in `me` as far forward as we can. This will
// do as much synchronous work as possible to complete the future,
// ensuring that when it blocks we're scheduled to get notified via some
// callback somewhere at some point (vague, right?)
//
// TODO: this probably shouldn't do as much synchronous work as possible
// as it can starve other computations. Rather it should instead
// yield every so often with something like `setTimeout` with the
// timeout set to zero.
fn poll(me: &Arc<Package>) {
loop {
match me.notified.replace(State::Polling) {
// We received a notification while previously polling, or
// this is the initial poll. We've got work to do below!
State::Notified => {}
// We've gone through this loop once and no notification was
// received while we were executing work. That means we got
// `NotReady` below and we're scheduled to receive a
// notification. Block ourselves and wait for later.
//
// When the notification comes in it'll notify our task, see
// our `Waiting` state, and resume the polling process
State::Polling => {
me.notified.set(State::Waiting(me.clone()));
break;
}
State::Waiting(_) => panic!("shouldn't see waiting state!"),
}
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
// If the future is ready, immediately call the
// resolve/reject callback and then return as we're done.
Ok(Async::Ready(value)) => (value, &me.resolve),
Err(value) => (value, &me.reject),
// Otherwise keep going in our loop, if we weren't notified
// we'll break out and start waiting.
Ok(Async::NotReady) => continue,
};
drop(f.call1(&JsValue::undefined(), &val));
break;
}
}
}
impl Notify for Package {
fn notify(&self, _id: usize) {
let me = match self.notified.replace(State::Notified) {
// we need to schedule polling to resume, so keep going
State::Waiting(me) => me,
// we were already notified, and were just notified again;
// having now coalesced the notifications we return as it's
// still someone else's job to process this
State::Notified => return,
// the future was previously being polled, and we've just
// switched it to the "you're notified" state. We don't have
// access to the future as it's being polled, so the future
// polling process later sees this notification and will
// continue polling. For us, though, there's nothing else to do,
// so we bail out.
// later see
State::Polling => return,
};
// Use `Promise.then` on a resolved promise to place our execution
// onto the next turn of the microtask queue, enqueueing our poll
// operation. We don't currently poll immediately as it turns out
// `futures` crate adapters aren't compatible with it and it also
// helps avoid blowing the stack by accident.
//
// Note that the `Rc`/`RefCell` trick here is basically to just
// ensure that our `Closure` gets cleaned up appropriately.
let promise = Promise::resolve(&JsValue::undefined());
let slot = Rc::new(RefCell::new(None));
let slot2 = slot.clone();
let closure = Closure::wrap(Box::new(move |_| {
let myself = slot2.borrow_mut().take();
debug_assert!(myself.is_some());
Package::poll(&me);
}) as Box<FnMut(JsValue)>);
promise.then(&closure);
*slot.borrow_mut() = Some(closure);
}
}
}
/// Converts a Rust `Future` on a local task queue.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references.
///
/// # Panics
///
/// This function has the same panic behavior as `future_to_promise`.
pub fn spawn_local<F>(future: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
future_to_promise(
future
.map(|()| JsValue::undefined())
.or_else(|()| future::ok::<JsValue, JsValue>(JsValue::undefined())),
);
}
#[cfg(not(feature = "nightly"))]
pub use stable::*;

View File

@ -0,0 +1,252 @@
use std::fmt;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::future::Future;
use std::task::{Poll, Context};
use std::collections::VecDeque;
use futures_util::task::ArcWake;
use futures_util::future::FutureExt;
use futures_channel::oneshot;
use lazy_static::lazy_static;
use js_sys::Promise;
use wasm_bindgen::prelude::*;
/// A Rust `Future` backed by a JavaScript `Promise`.
///
/// This type is constructed with a JavaScript `Promise` object and translates
/// it to a Rust `Future`. This type implements the `Future` trait from the
/// `futures` crate and will either succeed or fail depending on what happens
/// with the JavaScript `Promise`.
///
/// Currently this type is constructed with `JsFuture::from`.
pub struct JsFuture {
resolved: oneshot::Receiver<JsValue>,
rejected: oneshot::Receiver<JsValue>,
_cb_resolve: Closure<FnMut(JsValue)>,
_cb_reject: Closure<FnMut(JsValue)>,
}
impl fmt::Debug for JsFuture {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "JsFuture {{ ... }}")
}
}
impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture {
// Use the `then` method to schedule two callbacks, one for the
// resolved value and one for the rejected value. These two callbacks
// will be connected to oneshot channels which feed back into our
// future.
//
// This may not be the speediest option today but it should work!
let (tx1, rx1) = oneshot::channel();
let cb_resolve = Closure::once(move |val| {
tx1.send(val).unwrap_throw();
});
let (tx2, rx2) = oneshot::channel();
let cb_reject = Closure::once(move |val| {
tx2.send(val).unwrap_throw();
});
js.then2(&cb_resolve, &cb_reject);
JsFuture {
resolved: rx1,
rejected: rx2,
_cb_resolve: cb_resolve,
_cb_reject: cb_reject,
}
}
}
impl Future for JsFuture {
type Output = Result<JsValue, JsValue>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// Test if either our resolved or rejected side is finished yet.
if let Poll::Ready(val) = self.resolved.poll_unpin(cx) {
return Poll::Ready(Ok(val.unwrap_throw()));
}
if let Poll::Ready(val) = self.rejected.poll_unpin(cx) {
return Poll::Ready(Err(val.unwrap_throw()));
}
Poll::Pending
}
}
/// Converts a Rust `Future` into a JavaScript `Promise`.
///
/// This function will take any future in Rust and schedule it to be executed,
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
/// to get plumbed into the rest of a system.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references. The
/// returned `Promise` will be resolved or rejected when the future completes,
/// depending on whether it finishes with `Ok` or `Err`.
///
/// # Panics
///
/// Note that in wasm panics are currently translated to aborts, but "abort" in
/// this case means that a JavaScript exception is thrown. The wasm module is
/// still usable (likely erroneously) after Rust panics.
///
/// If the `future` provided panics then the returned `Promise` **will not
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
/// limitation of wasm currently that's hoped to be fixed one day!
pub fn future_to_promise<F>(future: F) -> Promise
where
F: Future<Output = Result<JsValue, JsValue>> + 'static,
{
let mut future = Some(future);
Promise::new(&mut |resolve, reject| {
// TODO change Promise::new to be FnOnce
spawn_local(future.take().unwrap_throw().map(move |val| {
match val {
Ok(val) => {
resolve.call1(&JsValue::undefined(), &val).unwrap_throw();
},
Err(val) => {
reject.call1(&JsValue::undefined(), &val).unwrap_throw();
},
}
}));
})
}
/// Runs a Rust `Future` on a local task queue.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references.
///
/// # Panics
///
/// This function has the same panic behavior as `future_to_promise`.
pub fn spawn_local<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
struct Task {
future: Mutex<Option<Pin<Box<dyn Future<Output = ()> + 'static>>>>,
is_queued: AtomicBool,
}
impl Task {
#[inline]
fn new<F>(future: F) -> Arc<Self> where F: Future<Output = ()> + 'static {
Arc::new(Self {
future: Mutex::new(Some(Box::pin(future))),
is_queued: AtomicBool::new(false),
})
}
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// TODO can this be more relaxed ?
if !arc_self.is_queued.swap(true, Ordering::SeqCst) {
let mut lock = EXECUTOR.tasks.lock().unwrap_throw();
lock.push_back(arc_self.clone());
EXECUTOR.next_tick.schedule();
}
}
}
struct NextTick {
is_spinning: AtomicBool,
promise: Promise,
closure: Closure<dyn FnMut(JsValue)>,
}
impl NextTick {
fn new<F>(mut f: F) -> Self where F: FnMut() + 'static {
Self {
is_spinning: AtomicBool::new(false),
promise: Promise::resolve(&JsValue::null()),
closure: Closure::wrap(Box::new(move |_| {
f();
})),
}
}
fn schedule(&self) {
// TODO can this be more relaxed ?
if !self.is_spinning.swap(true, Ordering::SeqCst) {
// TODO avoid creating a new Promise
self.promise.then(&self.closure);
}
}
fn done(&self) {
// TODO can this be more relaxed ?
self.is_spinning.store(false, Ordering::SeqCst);
}
}
struct Executor {
tasks: Mutex<VecDeque<Arc<Task>>>,
next_tick: NextTick,
}
// This is only safe because JS is currently single-threaded
unsafe impl Send for Executor {}
unsafe impl Sync for Executor {}
lazy_static! {
static ref EXECUTOR: Executor = Executor {
tasks: Mutex::new(VecDeque::new()),
next_tick: NextTick::new(|| {
let tasks = &EXECUTOR.tasks;
loop {
let mut lock = tasks.lock().unwrap_throw();
match lock.pop_front() {
Some(task) => {
// This is necessary because the polled task might queue more tasks
drop(lock);
let mut future = task.future.lock().unwrap_throw();
let poll = future.as_mut().map(|mut future| {
// Clear `is_queued` flag so that it will re-queue if poll calls waker.wake()
task.is_queued.store(false, Ordering::SeqCst);
// TODO is there some way of saving these so they don't need to be recreated all the time ?
let waker = ArcWake::into_waker(task.clone());
let cx = &mut Context::from_waker(&waker);
Pin::new(&mut future).poll(cx)
});
if let Some(Poll::Ready(_)) = poll {
*future = None;
}
},
None => {
EXECUTOR.next_tick.done();
break;
},
}
}
}),
};
}
ArcWake::wake_by_ref(&Task::new(future));
}

View File

@ -0,0 +1,297 @@
use std::cell::{Cell, RefCell};
use std::fmt;
use std::rc::Rc;
use std::sync::Arc;
use futures::executor::{self, Notify, Spawn};
use futures::future;
use futures::prelude::*;
use futures::sync::oneshot;
use js_sys::{Function, Promise};
use wasm_bindgen::prelude::*;
/// A Rust `Future` backed by a JavaScript `Promise`.
///
/// This type is constructed with a JavaScript `Promise` object and translates
/// it to a Rust `Future`. This type implements the `Future` trait from the
/// `futures` crate and will either succeed or fail depending on what happens
/// with the JavaScript `Promise`.
///
/// Currently this type is constructed with `JsFuture::from`.
pub struct JsFuture {
resolved: oneshot::Receiver<JsValue>,
rejected: oneshot::Receiver<JsValue>,
callbacks: Option<(Closure<FnMut(JsValue)>, Closure<FnMut(JsValue)>)>,
}
impl fmt::Debug for JsFuture {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "JsFuture {{ ... }}")
}
}
impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture {
// Use the `then` method to schedule two callbacks, one for the
// resolved value and one for the rejected value. These two callbacks
// will be connected to oneshot channels which feed back into our
// future.
//
// This may not be the speediest option today but it should work!
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut tx1 = Some(tx1);
let resolve = Closure::wrap(Box::new(move |val| {
drop(tx1.take().unwrap().send(val));
}) as Box<FnMut(_)>);
let mut tx2 = Some(tx2);
let reject = Closure::wrap(Box::new(move |val| {
drop(tx2.take().unwrap().send(val));
}) as Box<FnMut(_)>);
js.then2(&resolve, &reject);
JsFuture {
resolved: rx1,
rejected: rx2,
callbacks: Some((resolve, reject)),
}
}
}
impl Future for JsFuture {
type Item = JsValue;
type Error = JsValue;
fn poll(&mut self) -> Poll<JsValue, JsValue> {
// Test if either our resolved or rejected side is finished yet. Note
// that they will return errors if they're disconnected which can't
// happen until we drop the `callbacks` field, which doesn't happen
// till we're done, so we dont need to handle that.
if let Ok(Async::Ready(val)) = self.resolved.poll() {
drop(self.callbacks.take());
return Ok(val.into());
}
if let Ok(Async::Ready(val)) = self.rejected.poll() {
drop(self.callbacks.take());
return Err(val);
}
Ok(Async::NotReady)
}
}
/// Converts a Rust `Future` into a JavaScript `Promise`.
///
/// This function will take any future in Rust and schedule it to be executed,
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
/// to get plumbed into the rest of a system.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references. The
/// returned `Promise` will be resolved or rejected when the future completes,
/// depending on whether it finishes with `Ok` or `Err`.
///
/// # Panics
///
/// Note that in wasm panics are currently translated to aborts, but "abort" in
/// this case means that a JavaScript exception is thrown. The wasm module is
/// still usable (likely erroneously) after Rust panics.
///
/// If the `future` provided panics then the returned `Promise` **will not
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
/// limitation of wasm currently that's hoped to be fixed one day!
pub fn future_to_promise<F>(future: F) -> Promise
where
F: Future<Item = JsValue, Error = JsValue> + 'static,
{
_future_to_promise(Box::new(future))
}
// Implementation of actually transforming a future into a JavaScript `Promise`.
//
// The only primitive we have to work with here is `Promise::new`, which gives
// us two callbacks that we can use to either reject or resolve the promise.
// It's our job to ensure that one of those callbacks is called at the
// appropriate time.
//
// Now we know that JavaScript (in general) can't block and is largely
// notification/callback driven. That means that our future must either have
// synchronous computational work to do, or it's "scheduled a notification" to
// happen. These notifications are likely callbacks to get executed when things
// finish (like a different promise or something like `setTimeout`). The general
// idea here is thus to do as much synchronous work as we can and then otherwise
// translate notifications of a future's task into "let's poll the future!"
//
// This isn't necessarily the greatest future executor in the world, but it
// should get the job done for now hopefully.
fn _future_to_promise(future: Box<Future<Item = JsValue, Error = JsValue>>) -> Promise {
let mut future = Some(executor::spawn(future));
return Promise::new(&mut |resolve, reject| {
Package::poll(&Arc::new(Package {
spawn: RefCell::new(future.take().unwrap()),
resolve,
reject,
notified: Cell::new(State::Notified),
}));
});
struct Package {
// Our "spawned future". This'll have everything we need to poll the
// future and continue to move it forward.
spawn: RefCell<Spawn<Box<Future<Item = JsValue, Error = JsValue>>>>,
// The current state of this future, expressed in an enum below. This
// indicates whether we're currently polling the future, received a
// notification and need to keep polling, or if we're waiting for a
// notification to come in (and no one is polling).
notified: Cell<State>,
// Our two callbacks connected to the `Promise` that we returned to
// JavaScript. We'll be invoking one of these at the end.
resolve: Function,
reject: Function,
}
// The possible states our `Package` (future) can be in, tracked internally
// and used to guide what happens when polling a future.
enum State {
// This future is currently and actively being polled. Attempting to
// access the future will result in a runtime panic and is considered a
// bug.
Polling,
// This future has been notified, while it was being polled. This marker
// is used in the `Notify` implementation below, and indicates that a
// notification was received that the future is ready to make progress.
// If seen, however, it probably means that the future is also currently
// being polled.
Notified,
// The future is blocked, waiting for something to happen. Stored here
// is a self-reference to the future itself so we can pull it out in
// `Notify` and continue polling.
//
// Note that the self-reference here is an Arc-cycle that will leak
// memory unless the future completes, but currently that should be ok
// as we'll have to stick around anyway while the future is executing!
//
// This state is removed as soon as a notification comes in, so the leak
// should only be "temporary"
Waiting(Arc<Package>),
}
// No shared memory right now, wasm is single threaded, no need to worry
// about this!
unsafe impl Send for Package {}
unsafe impl Sync for Package {}
impl Package {
// Move the future contained in `me` as far forward as we can. This will
// do as much synchronous work as possible to complete the future,
// ensuring that when it blocks we're scheduled to get notified via some
// callback somewhere at some point (vague, right?)
//
// TODO: this probably shouldn't do as much synchronous work as possible
// as it can starve other computations. Rather it should instead
// yield every so often with something like `setTimeout` with the
// timeout set to zero.
fn poll(me: &Arc<Package>) {
loop {
match me.notified.replace(State::Polling) {
// We received a notification while previously polling, or
// this is the initial poll. We've got work to do below!
State::Notified => {}
// We've gone through this loop once and no notification was
// received while we were executing work. That means we got
// `NotReady` below and we're scheduled to receive a
// notification. Block ourselves and wait for later.
//
// When the notification comes in it'll notify our task, see
// our `Waiting` state, and resume the polling process
State::Polling => {
me.notified.set(State::Waiting(me.clone()));
break;
}
State::Waiting(_) => panic!("shouldn't see waiting state!"),
}
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
// If the future is ready, immediately call the
// resolve/reject callback and then return as we're done.
Ok(Async::Ready(value)) => (value, &me.resolve),
Err(value) => (value, &me.reject),
// Otherwise keep going in our loop, if we weren't notified
// we'll break out and start waiting.
Ok(Async::NotReady) => continue,
};
drop(f.call1(&JsValue::undefined(), &val));
break;
}
}
}
impl Notify for Package {
fn notify(&self, _id: usize) {
let me = match self.notified.replace(State::Notified) {
// we need to schedule polling to resume, so keep going
State::Waiting(me) => me,
// we were already notified, and were just notified again;
// having now coalesced the notifications we return as it's
// still someone else's job to process this
State::Notified => return,
// the future was previously being polled, and we've just
// switched it to the "you're notified" state. We don't have
// access to the future as it's being polled, so the future
// polling process later sees this notification and will
// continue polling. For us, though, there's nothing else to do,
// so we bail out.
// later see
State::Polling => return,
};
// Use `Promise.then` on a resolved promise to place our execution
// onto the next turn of the microtask queue, enqueueing our poll
// operation. We don't currently poll immediately as it turns out
// `futures` crate adapters aren't compatible with it and it also
// helps avoid blowing the stack by accident.
//
// Note that the `Rc`/`RefCell` trick here is basically to just
// ensure that our `Closure` gets cleaned up appropriately.
let promise = Promise::resolve(&JsValue::undefined());
let slot = Rc::new(RefCell::new(None));
let slot2 = slot.clone();
let closure = Closure::wrap(Box::new(move |_| {
let myself = slot2.borrow_mut().take();
debug_assert!(myself.is_some());
Package::poll(&me);
}) as Box<FnMut(JsValue)>);
promise.then(&closure);
*slot.borrow_mut() = Some(closure);
}
}
}
/// Converts a Rust `Future` on a local task queue.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references.
///
/// # Panics
///
/// This function has the same panic behavior as `future_to_promise`.
pub fn spawn_local<F>(future: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
future_to_promise(
future
.map(|()| JsValue::undefined())
.or_else(|()| future::ok::<JsValue, JsValue>(JsValue::undefined())),
);
}