moved lib.rs to stable.rs in wasm-bindgen-futures, updated during review

This commit is contained in:
ibaryshnikov 2019-06-27 00:06:43 +03:00
parent 221dc732af
commit 6ab1a49a41
6 changed files with 350 additions and 385 deletions

View File

@ -11,6 +11,7 @@ version = "0.3.25"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
cfg-if = "0.1.9"
futures = "0.1.20" futures = "0.1.20"
js-sys = { path = "../js-sys", version = '0.3.25' } js-sys = { path = "../js-sys", version = '0.3.25' }
wasm-bindgen = { path = "../..", version = '0.2.48' } wasm-bindgen = { path = "../..", version = '0.2.48' }
@ -20,7 +21,7 @@ lazy_static = { version = "1.3.0", optional = true }
[target.'cfg(target_feature = "atomics")'.dependencies.web-sys] [target.'cfg(target_feature = "atomics")'.dependencies.web-sys]
path = "../web-sys" path = "../web-sys"
version = "0.3.23" version = "0.3.24"
features = [ features = [
"MessageEvent", "MessageEvent",
"Worker", "Worker",

View File

@ -10,16 +10,6 @@ use futures::sync::oneshot;
use js_sys::{Function, Promise}; use js_sys::{Function, Promise};
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
macro_rules! console_log {
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
}
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = console)]
fn log(s: &str);
}
/// A Rust `Future` backed by a JavaScript `Promise`. /// A Rust `Future` backed by a JavaScript `Promise`.
/// ///
/// This type is constructed with a JavaScript `Promise` object and translates /// This type is constructed with a JavaScript `Promise` object and translates
@ -210,13 +200,11 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
impl Notify for Waker { impl Notify for Waker {
fn notify(&self, _id: usize) { fn notify(&self, _id: usize) {
console_log!("Waker notify");
if !self.notified.swap(true, Ordering::SeqCst) { if !self.notified.swap(true, Ordering::SeqCst) {
console_log!("Waker, inside if");
let _ = unsafe { let _ = unsafe {
core::arch::wasm32::atomic_notify( core::arch::wasm32::atomic_notify(
&self.value as *const AtomicI32 as *mut i32, &self.value as *const AtomicI32 as *mut i32,
0, std::u32::MAX, // number of threads to notify
) )
}; };
} }
@ -224,11 +212,9 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
} }
fn poll_again(package: Arc<Package>) { fn poll_again(package: Arc<Package>) {
console_log!("poll_again called");
let me = match package.notified.replace(State::Notified) { let me = match package.notified.replace(State::Notified) {
// we need to schedule polling to resume, so keep going // we need to schedule polling to resume, so keep going
State::Waiting(me) => { State::Waiting(me) => {
console_log!("poll_again Waiting");
me me
} }
@ -236,7 +222,6 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
// having now coalesced the notifications we return as it's // having now coalesced the notifications we return as it's
// still someone else's job to process this // still someone else's job to process this
State::Notified => { State::Notified => {
console_log!("poll_again Notified");
return; return;
} }
@ -248,20 +233,17 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
// so we bail out. // so we bail out.
// later see // later see
State::Polling => { State::Polling => {
console_log!("poll_again Polling");
return; return;
} }
}; };
let value_location = &package.waker.value as *const AtomicI32 as u32 / 4;
// Use `Promise.then` on a resolved promise to place our execution // Use `Promise.then` on a resolved promise to place our execution
// onto the next turn of the microtask queue, enqueueing our poll // onto the next turn of the microtask queue, enqueueing our poll
// operation. We don't currently poll immediately as it turns out // operation. We don't currently poll immediately as it turns out
// `futures` crate adapters aren't compatible with it and it also // `futures` crate adapters aren't compatible with it and it also
// helps avoid blowing the stack by accident. // helps avoid blowing the stack by accident.
let promise = let promise =
crate::polyfill::wait_async(value_location, 0).expect("Should create a Promise"); crate::polyfill::wait_async(&package.waker.value).expect("Should create a Promise");
let closure = Closure::once(Box::new(move |_| { let closure = Closure::once(Box::new(move |_| {
Package::poll(&me); Package::poll(&me);
}) as Box<dyn FnMut(JsValue)>); }) as Box<dyn FnMut(JsValue)>);
@ -284,9 +266,7 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
match me.notified.replace(State::Polling) { match me.notified.replace(State::Polling) {
// We received a notification while previously polling, or // We received a notification while previously polling, or
// this is the initial poll. We've got work to do below! // this is the initial poll. We've got work to do below!
State::Notified => { State::Notified => {}
console_log!("Package::poll Notified");
}
// We've gone through this loop once and no notification was // We've gone through this loop once and no notification was
// received while we were executing work. That means we got // received while we were executing work. That means we got
@ -296,8 +276,6 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
// When the notification comes in it'll notify our task, see // When the notification comes in it'll notify our task, see
// our `Waiting` state, and resume the polling process // our `Waiting` state, and resume the polling process
State::Polling => { State::Polling => {
console_log!("Package::poll Polling");
me.notified.set(State::Waiting(me.clone())); me.notified.set(State::Waiting(me.clone()));
poll_again(me.clone()); poll_again(me.clone());
@ -306,8 +284,6 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
} }
State::Waiting(_) => { State::Waiting(_) => {
console_log!("Package::poll Waiting");
panic!("shouldn't see waiting state!") panic!("shouldn't see waiting state!")
} }
} }

View File

@ -101,326 +101,28 @@
//! } //! }
//! ``` //! ```
#![feature(stdsimd)] #![cfg_attr(target_feature = "atomics", feature(stdsimd))]
#![deny(missing_docs)] #![deny(missing_docs)]
#[cfg(feature = "futures_0_3")] use cfg_if::cfg_if;
/// Contains a Futures 0.3 implementation of this crate.
pub mod futures_0_3;
#[cfg(target_feature = "atomics")] cfg_if! {
/// Contains a thread-safe version of this crate, with Futures 0.1 if #[cfg(target_feature = "atomics")] {
pub mod atomics; /// Contains a thread-safe version of this crate, with Futures 0.1
pub mod atomics;
#[cfg(target_feature = "atomics")] /// Polyfill for `Atomics.waitAsync` function
/// Polyfill for `Atomics.waitAsync` function mod polyfill;
mod polyfill;
use std::cell::{Cell, RefCell}; pub use atomics::*;
use std::fmt; } else if #[cfg(feature = "futures_0_3")] {
use std::rc::Rc; /// Contains a Futures 0.3 implementation of this crate.
use std::sync::Arc; pub mod futures_0_3;
use futures::executor::{self, Notify, Spawn}; pub mod stable;
use futures::future; pub use stable::*;
use futures::prelude::*; } else {
use futures::sync::oneshot; pub mod stable;
use js_sys::{Function, Promise}; pub use stable::*;
use wasm_bindgen::prelude::*; }
/// A Rust `Future` backed by a JavaScript `Promise`.
///
/// This type is constructed with a JavaScript `Promise` object and translates
/// it to a Rust `Future`. This type implements the `Future` trait from the
/// `futures` crate and will either succeed or fail depending on what happens
/// with the JavaScript `Promise`.
///
/// Currently this type is constructed with `JsFuture::from`.
pub struct JsFuture {
rx: oneshot::Receiver<Result<JsValue, JsValue>>,
}
impl fmt::Debug for JsFuture {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "JsFuture {{ ... }}")
}
}
impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture {
// Use the `then` method to schedule two callbacks, one for the
// resolved value and one for the rejected value. We're currently
// assuming that JS engines will unconditionally invoke precisely one of
// these callbacks, no matter what.
//
// Ideally we'd have a way to cancel the callbacks getting invoked and
// free up state ourselves when this `JsFuture` is dropped. We don't
// have that, though, and one of the callbacks is likely always going to
// be invoked.
//
// As a result we need to make sure that no matter when the callbacks
// are invoked they are valid to be called at any time, which means they
// have to be self-contained. Through the `Closure::once` and some
// `Rc`-trickery we can arrange for both instances of `Closure`, and the
// `Rc`, to all be destroyed once the first one is called.
let (tx, rx) = oneshot::channel();
let state = Rc::new(RefCell::new(None));
let state2 = state.clone();
let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
let state2 = state.clone();
let reject = Closure::once(move |val| finish(&state2, Err(val)));
js.then2(&resolve, &reject);
*state.borrow_mut() = Some((tx, resolve, reject));
return JsFuture { rx };
fn finish(
state: &RefCell<
Option<(
oneshot::Sender<Result<JsValue, JsValue>>,
Closure<dyn FnMut(JsValue)>,
Closure<dyn FnMut(JsValue)>,
)>,
>,
val: Result<JsValue, JsValue>,
) {
match state.borrow_mut().take() {
// We don't have any guarantee that anyone's still listening at this
// point (the Rust `JsFuture` could have been dropped) so simply
// ignore any errors here.
Some((tx, _, _)) => drop(tx.send(val)),
None => wasm_bindgen::throw_str("cannot finish twice"),
}
}
}
}
impl Future for JsFuture {
type Item = JsValue;
type Error = JsValue;
fn poll(&mut self) -> Poll<JsValue, JsValue> {
match self.rx.poll() {
Ok(Async::Ready(val)) => val.map(Async::Ready),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => wasm_bindgen::throw_str("cannot cancel"),
}
}
}
/// Converts a Rust `Future` into a JavaScript `Promise`.
///
/// This function will take any future in Rust and schedule it to be executed,
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
/// to get plumbed into the rest of a system.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references. The
/// returned `Promise` will be resolved or rejected when the future completes,
/// depending on whether it finishes with `Ok` or `Err`.
///
/// # Panics
///
/// Note that in wasm panics are currently translated to aborts, but "abort" in
/// this case means that a JavaScript exception is thrown. The wasm module is
/// still usable (likely erroneously) after Rust panics.
///
/// If the `future` provided panics then the returned `Promise` **will not
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
/// limitation of wasm currently that's hoped to be fixed one day!
pub fn future_to_promise<F>(future: F) -> Promise
where
F: Future<Item = JsValue, Error = JsValue> + 'static,
{
_future_to_promise(Box::new(future))
}
// Implementation of actually transforming a future into a JavaScript `Promise`.
//
// The only primitive we have to work with here is `Promise::new`, which gives
// us two callbacks that we can use to either reject or resolve the promise.
// It's our job to ensure that one of those callbacks is called at the
// appropriate time.
//
// Now we know that JavaScript (in general) can't block and is largely
// notification/callback driven. That means that our future must either have
// synchronous computational work to do, or it's "scheduled a notification" to
// happen. These notifications are likely callbacks to get executed when things
// finish (like a different promise or something like `setTimeout`). The general
// idea here is thus to do as much synchronous work as we can and then otherwise
// translate notifications of a future's task into "let's poll the future!"
//
// This isn't necessarily the greatest future executor in the world, but it
// should get the job done for now hopefully.
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> Promise {
let mut future = Some(executor::spawn(future));
return Promise::new(&mut |resolve, reject| {
Package::poll(&Arc::new(Package {
spawn: RefCell::new(future.take().unwrap()),
resolve,
reject,
notified: Cell::new(State::Notified),
}));
});
struct Package {
// Our "spawned future". This'll have everything we need to poll the
// future and continue to move it forward.
spawn: RefCell<Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>>,
// The current state of this future, expressed in an enum below. This
// indicates whether we're currently polling the future, received a
// notification and need to keep polling, or if we're waiting for a
// notification to come in (and no one is polling).
notified: Cell<State>,
// Our two callbacks connected to the `Promise` that we returned to
// JavaScript. We'll be invoking one of these at the end.
resolve: Function,
reject: Function,
}
// The possible states our `Package` (future) can be in, tracked internally
// and used to guide what happens when polling a future.
enum State {
// This future is currently and actively being polled. Attempting to
// access the future will result in a runtime panic and is considered a
// bug.
Polling,
// This future has been notified, while it was being polled. This marker
// is used in the `Notify` implementation below, and indicates that a
// notification was received that the future is ready to make progress.
// If seen, however, it probably means that the future is also currently
// being polled.
Notified,
// The future is blocked, waiting for something to happen. Stored here
// is a self-reference to the future itself so we can pull it out in
// `Notify` and continue polling.
//
// Note that the self-reference here is an Arc-cycle that will leak
// memory unless the future completes, but currently that should be ok
// as we'll have to stick around anyway while the future is executing!
//
// This state is removed as soon as a notification comes in, so the leak
// should only be "temporary"
Waiting(Arc<Package>),
}
// No shared memory right now, wasm is single threaded, no need to worry
// about this!
unsafe impl Send for Package {}
unsafe impl Sync for Package {}
impl Package {
// Move the future contained in `me` as far forward as we can. This will
// do as much synchronous work as possible to complete the future,
// ensuring that when it blocks we're scheduled to get notified via some
// callback somewhere at some point (vague, right?)
//
// TODO: this probably shouldn't do as much synchronous work as possible
// as it can starve other computations. Rather it should instead
// yield every so often with something like `setTimeout` with the
// timeout set to zero.
fn poll(me: &Arc<Package>) {
loop {
match me.notified.replace(State::Polling) {
// We received a notification while previously polling, or
// this is the initial poll. We've got work to do below!
State::Notified => {}
// We've gone through this loop once and no notification was
// received while we were executing work. That means we got
// `NotReady` below and we're scheduled to receive a
// notification. Block ourselves and wait for later.
//
// When the notification comes in it'll notify our task, see
// our `Waiting` state, and resume the polling process
State::Polling => {
me.notified.set(State::Waiting(me.clone()));
break;
}
State::Waiting(_) => panic!("shouldn't see waiting state!"),
}
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
// If the future is ready, immediately call the
// resolve/reject callback and then return as we're done.
Ok(Async::Ready(value)) => (value, &me.resolve),
Err(value) => (value, &me.reject),
// Otherwise keep going in our loop, if we weren't notified
// we'll break out and start waiting.
Ok(Async::NotReady) => continue,
};
drop(f.call1(&JsValue::undefined(), &val));
break;
}
}
}
impl Notify for Package {
fn notify(&self, _id: usize) {
let me = match self.notified.replace(State::Notified) {
// we need to schedule polling to resume, so keep going
State::Waiting(me) => me,
// we were already notified, and were just notified again;
// having now coalesced the notifications we return as it's
// still someone else's job to process this
State::Notified => return,
// the future was previously being polled, and we've just
// switched it to the "you're notified" state. We don't have
// access to the future as it's being polled, so the future
// polling process later sees this notification and will
// continue polling. For us, though, there's nothing else to do,
// so we bail out.
// later see
State::Polling => return,
};
// Use `Promise.then` on a resolved promise to place our execution
// onto the next turn of the microtask queue, enqueueing our poll
// operation. We don't currently poll immediately as it turns out
// `futures` crate adapters aren't compatible with it and it also
// helps avoid blowing the stack by accident.
//
// Note that the `Rc`/`RefCell` trick here is basically to just
// ensure that our `Closure` gets cleaned up appropriately.
let promise = Promise::resolve(&JsValue::undefined());
let slot = Rc::new(RefCell::new(None));
let slot2 = slot.clone();
let closure = Closure::wrap(Box::new(move |_| {
let myself = slot2.borrow_mut().take();
debug_assert!(myself.is_some());
Package::poll(&me);
}) as Box<dyn FnMut(JsValue)>);
promise.then(&closure);
*slot.borrow_mut() = Some(closure);
}
}
}
/// Converts a Rust `Future` on a local task queue.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references.
///
/// # Panics
///
/// This function has the same panic behavior as `future_to_promise`.
pub fn spawn_local<F>(future: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
future_to_promise(
future
.map(|()| JsValue::undefined())
.or_else(|()| future::ok::<JsValue, JsValue>(JsValue::undefined())),
);
} }

View File

@ -37,27 +37,17 @@
*/ */
use std::cell::RefCell; use std::cell::RefCell;
use std::rc::Rc; use std::sync::atomic::{AtomicI32, Ordering};
use js_sys::{ use js_sys::{
encode_uri_component, Array, Atomics, Error, Function, Int32Array, JsString, Promise, Reflect, encode_uri_component, Array, Function, Int32Array, JsString, Promise, Reflect,
SharedArrayBuffer, WebAssembly, WebAssembly,
}; };
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast; use wasm_bindgen::JsCast;
use web_sys::{MessageEvent, Worker}; use web_sys::{MessageEvent, Worker};
macro_rules! console_log { const DEFAULT_TIMEOUT: f64 = std::f64::INFINITY;
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
}
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = console)]
fn log(s: &str);
}
const DEFAULT_TIMEOUT: f64 = 10.0;
const HELPER_CODE: &'static str = " const HELPER_CODE: &'static str = "
onmessage = function (ev) { onmessage = function (ev) {
@ -65,7 +55,6 @@ onmessage = function (ev) {
switch (ev.data[0]) { switch (ev.data[0]) {
case 'wait': { case 'wait': {
let [_, ia, index, value, timeout] = ev.data; let [_, ia, index, value, timeout] = ev.data;
console.log('wait event inside a worker');
let result = Atomics.wait(ia, index, value, timeout); let result = Atomics.wait(ia, index, value, timeout);
postMessage(['ok', result]); postMessage(['ok', result]);
break; break;
@ -75,17 +64,17 @@ onmessage = function (ev) {
} }
} }
} catch (e) { } catch (e) {
console.log('Exception in wait helper'); console.log('Exception in wait helper', e);
postMessage(['error', 'Exception']); postMessage(['error', 'Exception']);
} }
} }
"; ";
thread_local! { thread_local! {
static HELPERS: RefCell<Vec<Rc<Worker>>> = RefCell::new(vec![]); static HELPERS: RefCell<Vec<Worker>> = RefCell::new(vec![]);
} }
fn alloc_helper() -> Rc<Worker> { fn alloc_helper() -> Worker {
HELPERS.with(|helpers| { HELPERS.with(|helpers| {
if let Some(helper) = helpers.borrow_mut().pop() { if let Some(helper) = helpers.borrow_mut().pop() {
return helper; return helper;
@ -95,18 +84,18 @@ fn alloc_helper() -> Rc<Worker> {
let encoded: String = encode_uri_component(HELPER_CODE).into(); let encoded: String = encode_uri_component(HELPER_CODE).into();
initialization_string.push_str(&encoded); initialization_string.push_str(&encoded);
return Rc::new(Worker::new(&initialization_string).expect("Should create a Worker")); Worker::new(&initialization_string).expect("Should create a Worker")
}) })
} }
fn free_helper(helper: &Rc<Worker>) { fn free_helper(helper: Worker) {
HELPERS.with(move |helpers| { HELPERS.with(move |helpers| {
helpers.borrow_mut().push(helper.clone()); helpers.borrow_mut().push(helper.clone());
}); });
} }
pub fn wait_async(index: u32, value: i32) -> Result<Promise, JsValue> { pub fn wait_async(value: &AtomicI32) -> Result<Promise, JsValue> {
wait_async_with_timeout(index, value, DEFAULT_TIMEOUT) wait_async_with_timeout(value, DEFAULT_TIMEOUT)
} }
fn get_array_item(array: &JsValue, index: u32) -> JsValue { fn get_array_item(array: &JsValue, index: u32) -> JsValue {
@ -118,7 +107,7 @@ fn get_array_item(array: &JsValue, index: u32) -> JsValue {
// for parameter validation. The promise is resolved with a string as from // for parameter validation. The promise is resolved with a string as from
// Atomics.wait, or, in the case something went completely wrong, it is // Atomics.wait, or, in the case something went completely wrong, it is
// rejected with an error string. // rejected with an error string.
pub fn wait_async_with_timeout(index: u32, value: i32, timeout: f64) -> Result<Promise, JsValue> { pub fn wait_async_with_timeout(value: &AtomicI32, timeout: f64) -> Result<Promise, JsValue> {
let memory_buffer = wasm_bindgen::memory() let memory_buffer = wasm_bindgen::memory()
.dyn_into::<WebAssembly::Memory>() .dyn_into::<WebAssembly::Memory>()
.expect("Should cast a memory to WebAssembly::Memory") .expect("Should cast a memory to WebAssembly::Memory")
@ -126,30 +115,20 @@ pub fn wait_async_with_timeout(index: u32, value: i32, timeout: f64) -> Result<P
let indexed_array = Int32Array::new(&memory_buffer); let indexed_array = Int32Array::new(&memory_buffer);
if !indexed_array.buffer().has_type::<SharedArrayBuffer>() { let index = value as *const AtomicI32 as u32 / 4;
console_log!("polyfill, not a SharedArrayBuffer"); let value_i32 = value.load(Ordering::SeqCst);
return Err(Error::new("Indexed array must be created from SharedArrayBuffer").into());
}
// Optimization, avoid the helper thread in this common case.
if Atomics::load(&indexed_array, index)? != value {
console_log!("polyfill, not-equal");
return Ok(Promise::resolve(&JsString::from("not-equal")));
}
// General case, we must wait. // General case, we must wait.
console_log!("polyfill, general case");
Ok(Promise::new( Ok(Promise::new(
&mut move |resolve: Function, reject: Function| { &mut move |resolve: Function, reject: Function| {
let helper = alloc_helper(); let helper = alloc_helper();
let helper_ref = helper.clone(); let helper_ref = helper.clone();
let onmessage_callback = Closure::once_into_js(Box::new(move |e: MessageEvent| { let onmessage_callback = Closure::once_into_js(move |e: MessageEvent| {
// Free the helper early so that it can be reused if the resolution // Free the helper early so that it can be reused if the resolution
// needs a helper. // needs a helper.
free_helper(&helper_ref); free_helper(helper_ref);
match String::from( match String::from(
get_array_item(&e.data(), 0) get_array_item(&e.data(), 0)
.as_string() .as_string()
@ -172,8 +151,7 @@ pub fn wait_async_with_timeout(index: u32, value: i32, timeout: f64) -> Result<P
// it's not specified in the proposal yet // it's not specified in the proposal yet
_ => (), _ => (),
} }
}) });
as Box<dyn FnMut(MessageEvent)>);
helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
// onmessage_callback.forget(); // onmessage_callback.forget();
@ -196,7 +174,7 @@ pub fn wait_async_with_timeout(index: u32, value: i32, timeout: f64) -> Result<P
&JsString::from("wait"), &JsString::from("wait"),
&indexed_array, &indexed_array,
&JsValue::from(index), &JsValue::from(index),
&JsValue::from(value), &JsValue::from(value_i32),
&JsValue::from(timeout), &JsValue::from(timeout),
); );

View File

@ -0,0 +1,308 @@
use std::cell::{Cell, RefCell};
use std::fmt;
use std::rc::Rc;
use std::sync::Arc;
use futures::executor::{self, Notify, Spawn};
use futures::future;
use futures::prelude::*;
use futures::sync::oneshot;
use js_sys::{Function, Promise};
use wasm_bindgen::prelude::*;
/// A Rust `Future` backed by a JavaScript `Promise`.
///
/// This type is constructed with a JavaScript `Promise` object and translates
/// it to a Rust `Future`. This type implements the `Future` trait from the
/// `futures` crate and will either succeed or fail depending on what happens
/// with the JavaScript `Promise`.
///
/// Currently this type is constructed with `JsFuture::from`.
pub struct JsFuture {
rx: oneshot::Receiver<Result<JsValue, JsValue>>,
}
impl fmt::Debug for JsFuture {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "JsFuture {{ ... }}")
}
}
impl From<Promise> for JsFuture {
fn from(js: Promise) -> JsFuture {
// Use the `then` method to schedule two callbacks, one for the
// resolved value and one for the rejected value. We're currently
// assuming that JS engines will unconditionally invoke precisely one of
// these callbacks, no matter what.
//
// Ideally we'd have a way to cancel the callbacks getting invoked and
// free up state ourselves when this `JsFuture` is dropped. We don't
// have that, though, and one of the callbacks is likely always going to
// be invoked.
//
// As a result we need to make sure that no matter when the callbacks
// are invoked they are valid to be called at any time, which means they
// have to be self-contained. Through the `Closure::once` and some
// `Rc`-trickery we can arrange for both instances of `Closure`, and the
// `Rc`, to all be destroyed once the first one is called.
let (tx, rx) = oneshot::channel();
let state = Rc::new(RefCell::new(None));
let state2 = state.clone();
let resolve = Closure::once(move |val| finish(&state2, Ok(val)));
let state2 = state.clone();
let reject = Closure::once(move |val| finish(&state2, Err(val)));
js.then2(&resolve, &reject);
*state.borrow_mut() = Some((tx, resolve, reject));
return JsFuture { rx };
fn finish(
state: &RefCell<
Option<(
oneshot::Sender<Result<JsValue, JsValue>>,
Closure<dyn FnMut(JsValue)>,
Closure<dyn FnMut(JsValue)>,
)>,
>,
val: Result<JsValue, JsValue>,
) {
match state.borrow_mut().take() {
// We don't have any guarantee that anyone's still listening at this
// point (the Rust `JsFuture` could have been dropped) so simply
// ignore any errors here.
Some((tx, _, _)) => drop(tx.send(val)),
None => wasm_bindgen::throw_str("cannot finish twice"),
}
}
}
}
impl Future for JsFuture {
type Item = JsValue;
type Error = JsValue;
fn poll(&mut self) -> Poll<JsValue, JsValue> {
match self.rx.poll() {
Ok(Async::Ready(val)) => val.map(Async::Ready),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => wasm_bindgen::throw_str("cannot cancel"),
}
}
}
/// Converts a Rust `Future` into a JavaScript `Promise`.
///
/// This function will take any future in Rust and schedule it to be executed,
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
/// to get plumbed into the rest of a system.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references. The
/// returned `Promise` will be resolved or rejected when the future completes,
/// depending on whether it finishes with `Ok` or `Err`.
///
/// # Panics
///
/// Note that in wasm panics are currently translated to aborts, but "abort" in
/// this case means that a JavaScript exception is thrown. The wasm module is
/// still usable (likely erroneously) after Rust panics.
///
/// If the `future` provided panics then the returned `Promise` **will not
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
/// limitation of wasm currently that's hoped to be fixed one day!
pub fn future_to_promise<F>(future: F) -> Promise
where
F: Future<Item = JsValue, Error = JsValue> + 'static,
{
_future_to_promise(Box::new(future))
}
// Implementation of actually transforming a future into a JavaScript `Promise`.
//
// The only primitive we have to work with here is `Promise::new`, which gives
// us two callbacks that we can use to either reject or resolve the promise.
// It's our job to ensure that one of those callbacks is called at the
// appropriate time.
//
// Now we know that JavaScript (in general) can't block and is largely
// notification/callback driven. That means that our future must either have
// synchronous computational work to do, or it's "scheduled a notification" to
// happen. These notifications are likely callbacks to get executed when things
// finish (like a different promise or something like `setTimeout`). The general
// idea here is thus to do as much synchronous work as we can and then otherwise
// translate notifications of a future's task into "let's poll the future!"
//
// This isn't necessarily the greatest future executor in the world, but it
// should get the job done for now hopefully.
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> Promise {
let mut future = Some(executor::spawn(future));
return Promise::new(&mut |resolve, reject| {
Package::poll(&Arc::new(Package {
spawn: RefCell::new(future.take().unwrap()),
resolve,
reject,
notified: Cell::new(State::Notified),
}));
});
struct Package {
// Our "spawned future". This'll have everything we need to poll the
// future and continue to move it forward.
spawn: RefCell<Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>>,
// The current state of this future, expressed in an enum below. This
// indicates whether we're currently polling the future, received a
// notification and need to keep polling, or if we're waiting for a
// notification to come in (and no one is polling).
notified: Cell<State>,
// Our two callbacks connected to the `Promise` that we returned to
// JavaScript. We'll be invoking one of these at the end.
resolve: Function,
reject: Function,
}
// The possible states our `Package` (future) can be in, tracked internally
// and used to guide what happens when polling a future.
enum State {
// This future is currently and actively being polled. Attempting to
// access the future will result in a runtime panic and is considered a
// bug.
Polling,
// This future has been notified, while it was being polled. This marker
// is used in the `Notify` implementation below, and indicates that a
// notification was received that the future is ready to make progress.
// If seen, however, it probably means that the future is also currently
// being polled.
Notified,
// The future is blocked, waiting for something to happen. Stored here
// is a self-reference to the future itself so we can pull it out in
// `Notify` and continue polling.
//
// Note that the self-reference here is an Arc-cycle that will leak
// memory unless the future completes, but currently that should be ok
// as we'll have to stick around anyway while the future is executing!
//
// This state is removed as soon as a notification comes in, so the leak
// should only be "temporary"
Waiting(Arc<Package>),
}
// No shared memory right now, wasm is single threaded, no need to worry
// about this!
unsafe impl Send for Package {}
unsafe impl Sync for Package {}
impl Package {
// Move the future contained in `me` as far forward as we can. This will
// do as much synchronous work as possible to complete the future,
// ensuring that when it blocks we're scheduled to get notified via some
// callback somewhere at some point (vague, right?)
//
// TODO: this probably shouldn't do as much synchronous work as possible
// as it can starve other computations. Rather it should instead
// yield every so often with something like `setTimeout` with the
// timeout set to zero.
fn poll(me: &Arc<Package>) {
loop {
match me.notified.replace(State::Polling) {
// We received a notification while previously polling, or
// this is the initial poll. We've got work to do below!
State::Notified => {}
// We've gone through this loop once and no notification was
// received while we were executing work. That means we got
// `NotReady` below and we're scheduled to receive a
// notification. Block ourselves and wait for later.
//
// When the notification comes in it'll notify our task, see
// our `Waiting` state, and resume the polling process
State::Polling => {
me.notified.set(State::Waiting(me.clone()));
break;
}
State::Waiting(_) => panic!("shouldn't see waiting state!"),
}
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
// If the future is ready, immediately call the
// resolve/reject callback and then return as we're done.
Ok(Async::Ready(value)) => (value, &me.resolve),
Err(value) => (value, &me.reject),
// Otherwise keep going in our loop, if we weren't notified
// we'll break out and start waiting.
Ok(Async::NotReady) => continue,
};
drop(f.call1(&JsValue::undefined(), &val));
break;
}
}
}
impl Notify for Package {
fn notify(&self, _id: usize) {
let me = match self.notified.replace(State::Notified) {
// we need to schedule polling to resume, so keep going
State::Waiting(me) => me,
// we were already notified, and were just notified again;
// having now coalesced the notifications we return as it's
// still someone else's job to process this
State::Notified => return,
// the future was previously being polled, and we've just
// switched it to the "you're notified" state. We don't have
// access to the future as it's being polled, so the future
// polling process later sees this notification and will
// continue polling. For us, though, there's nothing else to do,
// so we bail out.
// later see
State::Polling => return,
};
// Use `Promise.then` on a resolved promise to place our execution
// onto the next turn of the microtask queue, enqueueing our poll
// operation. We don't currently poll immediately as it turns out
// `futures` crate adapters aren't compatible with it and it also
// helps avoid blowing the stack by accident.
//
// Note that the `Rc`/`RefCell` trick here is basically to just
// ensure that our `Closure` gets cleaned up appropriately.
let promise = Promise::resolve(&JsValue::undefined());
let slot = Rc::new(RefCell::new(None));
let slot2 = slot.clone();
let closure = Closure::wrap(Box::new(move |_| {
let myself = slot2.borrow_mut().take();
debug_assert!(myself.is_some());
Package::poll(&me);
}) as Box<dyn FnMut(JsValue)>);
promise.then(&closure);
*slot.borrow_mut() = Some(closure);
}
}
}
/// Converts a Rust `Future` on a local task queue.
///
/// The `future` provided must adhere to `'static` because it'll be scheduled
/// to run in the background and cannot contain any stack references.
///
/// # Panics
///
/// This function has the same panic behavior as `future_to_promise`.
pub fn spawn_local<F>(future: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
future_to_promise(
future
.map(|()| JsValue::undefined())
.or_else(|()| future::ok::<JsValue, JsValue>(JsValue::undefined())),
);
}

View File

@ -92,7 +92,7 @@ impl Scene {
.map(move |_data| image_data(base, len, width, height).into()); .map(move |_data| image_data(base, len, width, height).into());
Ok(RenderingScene { Ok(RenderingScene {
promise: wasm_bindgen_futures::atomics::future_to_promise(done), promise: wasm_bindgen_futures::future_to_promise(done),
base, base,
len, len,
height, height,