mirror of
https://github.com/rustwasm/wasm-bindgen.git
synced 2024-12-15 04:23:12 +03:00
moved threadsafe futures implementation to a separate file, made updates after review
This commit is contained in:
parent
e466e1a6f1
commit
16c6bdc966
351
crates/futures/src/atomics.rs
Normal file
351
crates/futures/src/atomics.rs
Normal file
@ -0,0 +1,351 @@
|
|||||||
|
use std::cell::{Cell, RefCell};
|
||||||
|
use std::fmt;
|
||||||
|
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use futures::executor::{self, Notify, Spawn};
|
||||||
|
use futures::future;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use futures::sync::oneshot;
|
||||||
|
use js_sys::{Atomics, Int32Array, WebAssembly, Function, Promise};
|
||||||
|
use wasm_bindgen::prelude::*;
|
||||||
|
use wasm_bindgen::JsCast;
|
||||||
|
|
||||||
|
macro_rules! console_log {
|
||||||
|
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[wasm_bindgen]
|
||||||
|
extern "C" {
|
||||||
|
#[wasm_bindgen(js_namespace = console)]
|
||||||
|
fn log(s: &str);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A Rust `Future` backed by a JavaScript `Promise`.
|
||||||
|
///
|
||||||
|
/// This type is constructed with a JavaScript `Promise` object and translates
|
||||||
|
/// it to a Rust `Future`. This type implements the `Future` trait from the
|
||||||
|
/// `futures` crate and will either succeed or fail depending on what happens
|
||||||
|
/// with the JavaScript `Promise`.
|
||||||
|
///
|
||||||
|
/// Currently this type is constructed with `JsFuture::from`.
|
||||||
|
pub struct JsFuture {
|
||||||
|
resolved: oneshot::Receiver<JsValue>,
|
||||||
|
rejected: oneshot::Receiver<JsValue>,
|
||||||
|
callbacks: Option<(Closure<dyn FnMut(JsValue)>, Closure<dyn FnMut(JsValue)>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for JsFuture {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "JsFuture {{ ... }}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Promise> for JsFuture {
|
||||||
|
fn from(js: Promise) -> JsFuture {
|
||||||
|
// Use the `then` method to schedule two callbacks, one for the
|
||||||
|
// resolved value and one for the rejected value. These two callbacks
|
||||||
|
// will be connected to oneshot channels which feed back into our
|
||||||
|
// future.
|
||||||
|
//
|
||||||
|
// This may not be the speediest option today but it should work!
|
||||||
|
let (tx1, rx1) = oneshot::channel();
|
||||||
|
let (tx2, rx2) = oneshot::channel();
|
||||||
|
let mut tx1 = Some(tx1);
|
||||||
|
let resolve = Closure::wrap(Box::new(move |val| {
|
||||||
|
drop(tx1.take().unwrap().send(val));
|
||||||
|
}) as Box<dyn FnMut(_)>);
|
||||||
|
let mut tx2 = Some(tx2);
|
||||||
|
let reject = Closure::wrap(Box::new(move |val| {
|
||||||
|
drop(tx2.take().unwrap().send(val));
|
||||||
|
}) as Box<dyn FnMut(_)>);
|
||||||
|
|
||||||
|
js.then2(&resolve, &reject);
|
||||||
|
|
||||||
|
JsFuture {
|
||||||
|
resolved: rx1,
|
||||||
|
rejected: rx2,
|
||||||
|
callbacks: Some((resolve, reject)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for JsFuture {
|
||||||
|
type Item = JsValue;
|
||||||
|
type Error = JsValue;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<JsValue, JsValue> {
|
||||||
|
// Test if either our resolved or rejected side is finished yet. Note
|
||||||
|
// that they will return errors if they're disconnected which can't
|
||||||
|
// happen until we drop the `callbacks` field, which doesn't happen
|
||||||
|
// till we're done, so we dont need to handle that.
|
||||||
|
if let Ok(Async::Ready(val)) = self.resolved.poll() {
|
||||||
|
drop(self.callbacks.take());
|
||||||
|
return Ok(val.into());
|
||||||
|
}
|
||||||
|
if let Ok(Async::Ready(val)) = self.rejected.poll() {
|
||||||
|
drop(self.callbacks.take());
|
||||||
|
return Err(val);
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Converts a Rust `Future` into a JavaScript `Promise`.
|
||||||
|
///
|
||||||
|
/// This function will take any future in Rust and schedule it to be executed,
|
||||||
|
/// returning a JavaScript `Promise` which can then be passed back to JavaScript
|
||||||
|
/// to get plumbed into the rest of a system.
|
||||||
|
///
|
||||||
|
/// The `future` provided must adhere to `'static` because it'll be scheduled
|
||||||
|
/// to run in the background and cannot contain any stack references. The
|
||||||
|
/// returned `Promise` will be resolved or rejected when the future completes,
|
||||||
|
/// depending on whether it finishes with `Ok` or `Err`.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Note that in wasm panics are currently translated to aborts, but "abort" in
|
||||||
|
/// this case means that a JavaScript exception is thrown. The wasm module is
|
||||||
|
/// still usable (likely erroneously) after Rust panics.
|
||||||
|
///
|
||||||
|
/// If the `future` provided panics then the returned `Promise` **will not
|
||||||
|
/// resolve**. Instead it will be a leaked promise. This is an unfortunate
|
||||||
|
/// limitation of wasm currently that's hoped to be fixed one day!
|
||||||
|
pub fn future_to_promise<F>(future: F) -> Promise
|
||||||
|
where
|
||||||
|
F: Future<Item = JsValue, Error = JsValue> + 'static,
|
||||||
|
{
|
||||||
|
_future_to_promise(Box::new(future))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implementation of actually transforming a future into a JavaScript `Promise`.
|
||||||
|
//
|
||||||
|
// The only primitive we have to work with here is `Promise::new`, which gives
|
||||||
|
// us two callbacks that we can use to either reject or resolve the promise.
|
||||||
|
// It's our job to ensure that one of those callbacks is called at the
|
||||||
|
// appropriate time.
|
||||||
|
//
|
||||||
|
// Now we know that JavaScript (in general) can't block and is largely
|
||||||
|
// notification/callback driven. That means that our future must either have
|
||||||
|
// synchronous computational work to do, or it's "scheduled a notification" to
|
||||||
|
// happen. These notifications are likely callbacks to get executed when things
|
||||||
|
// finish (like a different promise or something like `setTimeout`). The general
|
||||||
|
// idea here is thus to do as much synchronous work as we can and then otherwise
|
||||||
|
// translate notifications of a future's task into "let's poll the future!"
|
||||||
|
//
|
||||||
|
// This isn't necessarily the greatest future executor in the world, but it
|
||||||
|
// should get the job done for now hopefully.
|
||||||
|
fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>) -> Promise {
|
||||||
|
let mut future = Some(executor::spawn(future));
|
||||||
|
return Promise::new(&mut |resolve, reject| {
|
||||||
|
Package::poll(&Arc::new(Package {
|
||||||
|
spawn: RefCell::new(future.take().unwrap()),
|
||||||
|
resolve,
|
||||||
|
reject,
|
||||||
|
notified: Cell::new(State::Notified),
|
||||||
|
waker: Arc::new(Waker::default()),
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
struct Package {
|
||||||
|
// Our "spawned future". This'll have everything we need to poll the
|
||||||
|
// future and continue to move it forward.
|
||||||
|
spawn: RefCell<Spawn<Box<dyn Future<Item = JsValue, Error = JsValue>>>>,
|
||||||
|
|
||||||
|
// The current state of this future, expressed in an enum below. This
|
||||||
|
// indicates whether we're currently polling the future, received a
|
||||||
|
// notification and need to keep polling, or if we're waiting for a
|
||||||
|
// notification to come in (and no one is polling).
|
||||||
|
notified: Cell<State>,
|
||||||
|
|
||||||
|
// Our two callbacks connected to the `Promise` that we returned to
|
||||||
|
// JavaScript. We'll be invoking one of these at the end.
|
||||||
|
resolve: Function,
|
||||||
|
reject: Function,
|
||||||
|
|
||||||
|
// Struct to wake a future
|
||||||
|
waker: Arc<Waker>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// The possible states our `Package` (future) can be in, tracked internally
|
||||||
|
// and used to guide what happens when polling a future.
|
||||||
|
enum State {
|
||||||
|
// This future is currently and actively being polled. Attempting to
|
||||||
|
// access the future will result in a runtime panic and is considered a
|
||||||
|
// bug.
|
||||||
|
Polling,
|
||||||
|
|
||||||
|
// This future has been notified, while it was being polled. This marker
|
||||||
|
// is used in the `Notify` implementation below, and indicates that a
|
||||||
|
// notification was received that the future is ready to make progress.
|
||||||
|
// If seen, however, it probably means that the future is also currently
|
||||||
|
// being polled.
|
||||||
|
Notified,
|
||||||
|
|
||||||
|
// The future is blocked, waiting for something to happen. Stored here
|
||||||
|
// is a self-reference to the future itself so we can pull it out in
|
||||||
|
// `Notify` and continue polling.
|
||||||
|
//
|
||||||
|
// Note that the self-reference here is an Arc-cycle that will leak
|
||||||
|
// memory unless the future completes, but currently that should be ok
|
||||||
|
// as we'll have to stick around anyway while the future is executing!
|
||||||
|
//
|
||||||
|
// This state is removed as soon as a notification comes in, so the leak
|
||||||
|
// should only be "temporary"
|
||||||
|
Waiting(Arc<Package>),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Waker {
|
||||||
|
value: AtomicI32,
|
||||||
|
notified: AtomicBool,
|
||||||
|
};
|
||||||
|
|
||||||
|
impl Default for Waker {
|
||||||
|
fn default() -> Self {
|
||||||
|
Waker {
|
||||||
|
value: AtomicI32::new(0),
|
||||||
|
notified: AtomicBool::new(false),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Notify for Waker {
|
||||||
|
fn notify(&self, id: usize) {
|
||||||
|
console_log!("Waker notify");
|
||||||
|
if !self.notified.swap(true, Ordering::SeqCst) {
|
||||||
|
console_log!("Waker, inside if");
|
||||||
|
let _ = unsafe { core::arch::wasm32::atomic_notify(&self.value as *const AtomicI32 as *mut i32, 0) };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_again(package: Arc<Package>) {
|
||||||
|
console_log!("poll_again called");
|
||||||
|
let me = match package.notified.replace(State::Notified) {
|
||||||
|
// we need to schedule polling to resume, so keep going
|
||||||
|
State::Waiting(me) => {
|
||||||
|
console_log!("poll_again Waiting");
|
||||||
|
me
|
||||||
|
}
|
||||||
|
|
||||||
|
// we were already notified, and were just notified again;
|
||||||
|
// having now coalesced the notifications we return as it's
|
||||||
|
// still someone else's job to process this
|
||||||
|
State::Notified => {
|
||||||
|
console_log!("poll_again Notified");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// the future was previously being polled, and we've just
|
||||||
|
// switched it to the "you're notified" state. We don't have
|
||||||
|
// access to the future as it's being polled, so the future
|
||||||
|
// polling process later sees this notification and will
|
||||||
|
// continue polling. For us, though, there's nothing else to do,
|
||||||
|
// so we bail out.
|
||||||
|
// later see
|
||||||
|
State::Polling => {
|
||||||
|
console_log!("poll_again Polling");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let memory_buffer = wasm_bindgen::memory()
|
||||||
|
.dyn_into::<WebAssembly::Memory>()
|
||||||
|
.expect("Should cast a memory to WebAssembly::Memory")
|
||||||
|
.buffer();
|
||||||
|
|
||||||
|
let value_location = &package.waker.value as *const AtomicI32 as u32 / 4;
|
||||||
|
let array = Int32Array::new(&memory_buffer);
|
||||||
|
|
||||||
|
// Use `Promise.then` on a resolved promise to place our execution
|
||||||
|
// onto the next turn of the microtask queue, enqueueing our poll
|
||||||
|
// operation. We don't currently poll immediately as it turns out
|
||||||
|
// `futures` crate adapters aren't compatible with it and it also
|
||||||
|
// helps avoid blowing the stack by accident.
|
||||||
|
let promise = crate::polyfill::wait_async(array, value_location, 0).expect("Should create a Promise");
|
||||||
|
let closure = Closure::once(Box::new(move |_| {
|
||||||
|
Package::poll(&me);
|
||||||
|
}) as Box<dyn FnMut(JsValue)>);
|
||||||
|
promise.then(&closure);
|
||||||
|
closure.forget();
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Package {
|
||||||
|
// Move the future contained in `me` as far forward as we can. This will
|
||||||
|
// do as much synchronous work as possible to complete the future,
|
||||||
|
// ensuring that when it blocks we're scheduled to get notified via some
|
||||||
|
// callback somewhere at some point (vague, right?)
|
||||||
|
//
|
||||||
|
// TODO: this probably shouldn't do as much synchronous work as possible
|
||||||
|
// as it can starve other computations. Rather it should instead
|
||||||
|
// yield every so often with something like `setTimeout` with the
|
||||||
|
// timeout set to zero.
|
||||||
|
fn poll(me: &Arc<Package>) {
|
||||||
|
loop {
|
||||||
|
match me.notified.replace(State::Polling) {
|
||||||
|
// We received a notification while previously polling, or
|
||||||
|
// this is the initial poll. We've got work to do below!
|
||||||
|
State::Notified => {
|
||||||
|
console_log!("Package::poll Notified");
|
||||||
|
}
|
||||||
|
|
||||||
|
// We've gone through this loop once and no notification was
|
||||||
|
// received while we were executing work. That means we got
|
||||||
|
// `NotReady` below and we're scheduled to receive a
|
||||||
|
// notification. Block ourselves and wait for later.
|
||||||
|
//
|
||||||
|
// When the notification comes in it'll notify our task, see
|
||||||
|
// our `Waiting` state, and resume the polling process
|
||||||
|
State::Polling => {
|
||||||
|
console_log!("Package::poll Polling");
|
||||||
|
|
||||||
|
me.notified.set(State::Waiting(me.clone()));
|
||||||
|
|
||||||
|
poll_again(me.clone());
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
State::Waiting(_) => {
|
||||||
|
console_log!("Package::poll Waiting");
|
||||||
|
|
||||||
|
panic!("shouldn't see waiting state!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) {
|
||||||
|
// If the future is ready, immediately call the
|
||||||
|
// resolve/reject callback and then return as we're done.
|
||||||
|
Ok(Async::Ready(value)) => (value, &me.resolve),
|
||||||
|
Err(value) => (value, &me.reject),
|
||||||
|
|
||||||
|
// Otherwise keep going in our loop, if we weren't notified
|
||||||
|
// we'll break out and start waiting.
|
||||||
|
Ok(Async::NotReady) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
drop(f.call1(&JsValue::undefined(), &val));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Converts a Rust `Future` on a local task queue.
|
||||||
|
///
|
||||||
|
/// The `future` provided must adhere to `'static` because it'll be scheduled
|
||||||
|
/// to run in the background and cannot contain any stack references.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// This function has the same panic behavior as `future_to_promise`.
|
||||||
|
pub fn spawn_local<F>(future: F)
|
||||||
|
where
|
||||||
|
F: Future<Item = (), Error = ()> + 'static,
|
||||||
|
{
|
||||||
|
future_to_promise(
|
||||||
|
future
|
||||||
|
.map(|()| JsValue::undefined())
|
||||||
|
.or_else(|()| future::ok::<JsValue, JsValue>(JsValue::undefined())),
|
||||||
|
);
|
||||||
|
}
|
@ -101,44 +101,33 @@
|
|||||||
//! }
|
//! }
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
|
#![feature(stdsimd)]
|
||||||
|
|
||||||
#![deny(missing_docs)]
|
#![deny(missing_docs)]
|
||||||
|
|
||||||
#[cfg(feature = "futures_0_3")]
|
#[cfg(feature = "futures_0_3")]
|
||||||
/// Contains a Futures 0.3 implementation of this crate.
|
/// Contains a Futures 0.3 implementation of this crate.
|
||||||
pub mod futures_0_3;
|
pub mod futures_0_3;
|
||||||
|
|
||||||
|
#[cfg(target_feature = "atomics")]
|
||||||
|
/// Contains a thread-safe version of this crate, with Futures 0.1
|
||||||
|
pub mod atomics;
|
||||||
|
|
||||||
|
#[cfg(target_feature = "atomics")]
|
||||||
|
/// Polyfill for `Atomics.waitAsync` function
|
||||||
|
mod polyfill;
|
||||||
|
|
||||||
use std::cell::{Cell, RefCell};
|
use std::cell::{Cell, RefCell};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
use std::sync::Mutex;
|
|
||||||
|
|
||||||
use futures::executor::{self, Notify, Spawn};
|
use futures::executor::{self, Notify, Spawn};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
use js_sys::{Function, Promise};
|
use js_sys::{Function, Promise};
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
use js_sys::{Atomics, Int32Array, SharedArrayBuffer, WebAssembly};
|
|
||||||
use wasm_bindgen::prelude::*;
|
use wasm_bindgen::prelude::*;
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
use wasm_bindgen::JsCast;
|
|
||||||
|
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
mod polyfill;
|
|
||||||
|
|
||||||
macro_rules! console_log {
|
|
||||||
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[wasm_bindgen]
|
|
||||||
extern "C" {
|
|
||||||
#[wasm_bindgen(js_namespace = console)]
|
|
||||||
fn log(s: &str);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A Rust `Future` backed by a JavaScript `Promise`.
|
/// A Rust `Future` backed by a JavaScript `Promise`.
|
||||||
///
|
///
|
||||||
@ -273,8 +262,6 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
|
|||||||
resolve,
|
resolve,
|
||||||
reject,
|
reject,
|
||||||
notified: Cell::new(State::Notified),
|
notified: Cell::new(State::Notified),
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
waker: Arc::new(Waker::new(vec![0; 4], false)),
|
|
||||||
}));
|
}));
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -293,10 +280,6 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
|
|||||||
// JavaScript. We'll be invoking one of these at the end.
|
// JavaScript. We'll be invoking one of these at the end.
|
||||||
resolve: Function,
|
resolve: Function,
|
||||||
reject: Function,
|
reject: Function,
|
||||||
|
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
// Struct to wake a future
|
|
||||||
waker: Arc<Waker>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// The possible states our `Package` (future) can be in, tracked internally
|
// The possible states our `Package` (future) can be in, tracked internally
|
||||||
@ -327,108 +310,9 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
|
|||||||
Waiting(Arc<Package>),
|
Waiting(Arc<Package>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
struct Waker {
|
|
||||||
array: Vec<i32>,
|
|
||||||
notified: AtomicBool,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
impl Waker {
|
|
||||||
fn new(array: Vec<i32>, notified: bool) -> Self {
|
|
||||||
Waker {
|
|
||||||
array,
|
|
||||||
notified: AtomicBool::new(notified),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
impl Notify for Waker {
|
|
||||||
fn notify(&self, id: usize) {
|
|
||||||
console_log!("Waker notify");
|
|
||||||
if !self.notified.swap(true, Ordering::SeqCst) {
|
|
||||||
console_log!("Waker, inside if");
|
|
||||||
let memory_buffer = wasm_bindgen::memory()
|
|
||||||
.dyn_into::<WebAssembly::Memory>()
|
|
||||||
.expect("Should cast a memory to WebAssembly::Memory")
|
|
||||||
.buffer();
|
|
||||||
|
|
||||||
let array_location = self.array.as_ptr() as u32 / 4;
|
|
||||||
let array = Int32Array::new(&memory_buffer)
|
|
||||||
.subarray(array_location, array_location + self.array.len() as u32);
|
|
||||||
|
|
||||||
let _ = Atomics::notify(&array, id as u32);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
fn poll_again(package: Arc<Package>, id: usize) {
|
|
||||||
console_log!("poll_again called");
|
|
||||||
let me = match package.notified.replace(State::Notified) {
|
|
||||||
// we need to schedule polling to resume, so keep going
|
|
||||||
State::Waiting(me) => {
|
|
||||||
console_log!("poll_again Waiting");
|
|
||||||
me
|
|
||||||
},
|
|
||||||
|
|
||||||
// we were already notified, and were just notified again;
|
|
||||||
// having now coalesced the notifications we return as it's
|
|
||||||
// still someone else's job to process this
|
|
||||||
State::Notified => {
|
|
||||||
console_log!("poll_again Notified");
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
|
|
||||||
// the future was previously being polled, and we've just
|
|
||||||
// switched it to the "you're notified" state. We don't have
|
|
||||||
// access to the future as it's being polled, so the future
|
|
||||||
// polling process later sees this notification and will
|
|
||||||
// continue polling. For us, though, there's nothing else to do,
|
|
||||||
// so we bail out.
|
|
||||||
// later see
|
|
||||||
State::Polling => {
|
|
||||||
console_log!("poll_again Polling");
|
|
||||||
return;
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
let memory_buffer = wasm_bindgen::memory()
|
|
||||||
.dyn_into::<WebAssembly::Memory>()
|
|
||||||
.expect("Should cast a memory to WebAssembly::Memory")
|
|
||||||
.buffer();
|
|
||||||
|
|
||||||
let array_location = package.waker.array.as_ptr() as u32 / 4;
|
|
||||||
let array = Int32Array::new(&memory_buffer)
|
|
||||||
.subarray(array_location, array_location + package.waker.array.len() as u32);
|
|
||||||
|
|
||||||
// Use `Promise.then` on a resolved promise to place our execution
|
|
||||||
// onto the next turn of the microtask queue, enqueueing our poll
|
|
||||||
// operation. We don't currently poll immediately as it turns out
|
|
||||||
// `futures` crate adapters aren't compatible with it and it also
|
|
||||||
// helps avoid blowing the stack by accident.
|
|
||||||
//
|
|
||||||
// Note that the `Rc`/`RefCell` trick here is basically to just
|
|
||||||
// ensure that our `Closure` gets cleaned up appropriately.
|
|
||||||
let promise = polyfill::wait_async(array, id as u32, 0)
|
|
||||||
.expect("Should create a Promise");
|
|
||||||
let slot = Rc::new(RefCell::new(None));
|
|
||||||
let slot2 = slot.clone();
|
|
||||||
let closure = Closure::wrap(Box::new(move |_| {
|
|
||||||
let myself = slot2.borrow_mut().take();
|
|
||||||
debug_assert!(myself.is_some());
|
|
||||||
Package::poll(&me);
|
|
||||||
}) as Box<dyn FnMut(JsValue)>);
|
|
||||||
promise.then(&closure);
|
|
||||||
*slot.borrow_mut() = Some(closure);
|
|
||||||
}
|
|
||||||
|
|
||||||
// No shared memory right now, wasm is single threaded, no need to worry
|
// No shared memory right now, wasm is single threaded, no need to worry
|
||||||
// about this!
|
// about this!
|
||||||
#[cfg(not(target_feature = "atomics"))]
|
|
||||||
unsafe impl Send for Package {}
|
unsafe impl Send for Package {}
|
||||||
#[cfg(not(target_feature = "atomics"))]
|
|
||||||
unsafe impl Sync for Package {}
|
unsafe impl Sync for Package {}
|
||||||
|
|
||||||
impl Package {
|
impl Package {
|
||||||
@ -446,9 +330,7 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
|
|||||||
match me.notified.replace(State::Polling) {
|
match me.notified.replace(State::Polling) {
|
||||||
// We received a notification while previously polling, or
|
// We received a notification while previously polling, or
|
||||||
// this is the initial poll. We've got work to do below!
|
// this is the initial poll. We've got work to do below!
|
||||||
State::Notified => {
|
State::Notified => {}
|
||||||
console_log!("Package::poll Notified");
|
|
||||||
}
|
|
||||||
|
|
||||||
// We've gone through this loop once and no notification was
|
// We've gone through this loop once and no notification was
|
||||||
// received while we were executing work. That means we got
|
// received while we were executing work. That means we got
|
||||||
@ -458,31 +340,17 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
|
|||||||
// When the notification comes in it'll notify our task, see
|
// When the notification comes in it'll notify our task, see
|
||||||
// our `Waiting` state, and resume the polling process
|
// our `Waiting` state, and resume the polling process
|
||||||
State::Polling => {
|
State::Polling => {
|
||||||
console_log!("Package::poll Polling");
|
|
||||||
|
|
||||||
me.notified.set(State::Waiting(me.clone()));
|
me.notified.set(State::Waiting(me.clone()));
|
||||||
|
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
poll_again(me.clone(), 0);
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
State::Waiting(_) => {
|
State::Waiting(_) => {
|
||||||
console_log!("Package::poll Waiting");
|
|
||||||
|
|
||||||
panic!("shouldn't see waiting state!")
|
panic!("shouldn't see waiting state!")
|
||||||
},
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
|
||||||
#[cfg(target_feature = "atomics")]
|
|
||||||
let waker = &me.waker;
|
|
||||||
|
|
||||||
#[cfg(not(target_feature = "atomics"))]
|
|
||||||
let waker = me;
|
|
||||||
|
|
||||||
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(waker, 0) {
|
|
||||||
// If the future is ready, immediately call the
|
// If the future is ready, immediately call the
|
||||||
// resolve/reject callback and then return as we're done.
|
// resolve/reject callback and then return as we're done.
|
||||||
Ok(Async::Ready(value)) => (value, &me.resolve),
|
Ok(Async::Ready(value)) => (value, &me.resolve),
|
||||||
@ -499,10 +367,8 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(target_feature = "atomics"))]
|
|
||||||
impl Notify for Package {
|
impl Notify for Package {
|
||||||
fn notify(&self, _id: usize) {
|
fn notify(&self, _id: usize) {
|
||||||
console_log!("Package::notify Waiting");
|
|
||||||
let me = match self.notified.replace(State::Notified) {
|
let me = match self.notified.replace(State::Notified) {
|
||||||
// we need to schedule polling to resume, so keep going
|
// we need to schedule polling to resume, so keep going
|
||||||
State::Waiting(me) => me,
|
State::Waiting(me) => me,
|
||||||
|
@ -1,130 +0,0 @@
|
|||||||
/*
|
|
||||||
* The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async
|
|
||||||
*/
|
|
||||||
|
|
||||||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
||||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
||||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
||||||
*
|
|
||||||
* Author: Lars T Hansen, lhansen@mozilla.com
|
|
||||||
*/
|
|
||||||
|
|
||||||
/* Polyfill for Atomics.waitAsync() for web browsers.
|
|
||||||
*
|
|
||||||
* Any kind of agent that is able to create a new Worker can use this polyfill.
|
|
||||||
*
|
|
||||||
* Load this file in all agents that will use Atomics.waitAsync.
|
|
||||||
*
|
|
||||||
* Agents that don't call Atomics.waitAsync need do nothing special.
|
|
||||||
*
|
|
||||||
* Any kind of agent can wake another agent that is sleeping in
|
|
||||||
* Atomics.waitAsync by just calling Atomics.wake for the location being slept
|
|
||||||
* on, as normal.
|
|
||||||
*
|
|
||||||
* The implementation is not completely faithful to the proposed semantics: in
|
|
||||||
* the case where an agent first asyncWaits and then waits on the same location:
|
|
||||||
* when it is woken, the two waits will be woken in order, while in the real
|
|
||||||
* semantics, the sync wait will be woken first.
|
|
||||||
*
|
|
||||||
* In this polyfill Atomics.waitAsync is not very fast.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/* Implementation:
|
|
||||||
*
|
|
||||||
* For every wait we fork off a Worker to perform the wait. Workers are reused
|
|
||||||
* when possible. The worker communicates with its parent using postMessage.
|
|
||||||
*/
|
|
||||||
|
|
||||||
const helperCode = `
|
|
||||||
onmessage = function (ev) {
|
|
||||||
try {
|
|
||||||
switch (ev.data[0]) {
|
|
||||||
case 'wait': {
|
|
||||||
let [_, ia, index, value, timeout] = ev.data;
|
|
||||||
let result = Atomics.wait(ia, index, value, timeout)
|
|
||||||
postMessage(['ok', result]);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default: {
|
|
||||||
throw new Error("Wrong message sent to wait helper: " + ev.data.join(','));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
console.log("Exception in wait helper");
|
|
||||||
postMessage(['error', 'Exception']);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
`;
|
|
||||||
|
|
||||||
const helpers = [];
|
|
||||||
|
|
||||||
function allocHelper() {
|
|
||||||
if (helpers.length > 0) {
|
|
||||||
return helpers.pop();
|
|
||||||
}
|
|
||||||
return new Worker("data:application/javascript," + encodeURIComponent(helperCode));
|
|
||||||
}
|
|
||||||
|
|
||||||
function freeHelper(h) {
|
|
||||||
helpers.push(h);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Atomics.waitAsync always returns a promise. Throws standard errors
|
|
||||||
// for parameter validation. The promise is resolved with a string as from
|
|
||||||
// Atomics.wait, or, in the case something went completely wrong, it is
|
|
||||||
// rejected with an error string.
|
|
||||||
export function waitAsync(ia, index, value, timeout = Infinity) {
|
|
||||||
if (typeof ia != "object"
|
|
||||||
|| !(ia instanceof Int32Array)
|
|
||||||
|| !(ia.buffer instanceof SharedArrayBuffer)
|
|
||||||
) {
|
|
||||||
throw new TypeError("Expected shared memory");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Range checking for the index.
|
|
||||||
|
|
||||||
ia[index];
|
|
||||||
|
|
||||||
// Optimization, avoid the helper thread in this common case.
|
|
||||||
|
|
||||||
if (Atomics.load(ia, index) !== value) {
|
|
||||||
return Promise.resolve("not-equal");
|
|
||||||
}
|
|
||||||
|
|
||||||
// General case, we must wait.
|
|
||||||
|
|
||||||
return new Promise(function (resolve, reject) {
|
|
||||||
const h = allocHelper();
|
|
||||||
h.onmessage = function (ev) {
|
|
||||||
// Free the helper early so that it can be reused if the resolution
|
|
||||||
// needs a helper.
|
|
||||||
freeHelper(h);
|
|
||||||
switch (ev.data[0]) {
|
|
||||||
case 'ok':
|
|
||||||
resolve(ev.data[1]);
|
|
||||||
break;
|
|
||||||
case 'error':
|
|
||||||
// Note, rejection is not in the spec, it is an artifact of the polyfill.
|
|
||||||
// The helper already printed an error to the console.
|
|
||||||
reject(ev.data[1]);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// It's possible to do better here if the ia is already known to the
|
|
||||||
// helper. In that case we can communicate the other data through
|
|
||||||
// shared memory and wake the agent. And it is possible to make ia
|
|
||||||
// known to the helper by waking it with a special value so that it
|
|
||||||
// checks its messages, and then posting the ia to the helper. Some
|
|
||||||
// caching / decay scheme is useful no doubt, to improve performance
|
|
||||||
// and avoid leaks.
|
|
||||||
//
|
|
||||||
// In the event we wake the helper directly, we can micro-wait here
|
|
||||||
// for a quick result. We'll need to restructure some code to make
|
|
||||||
// that work out properly, and some synchronization is necessary for
|
|
||||||
// the helper to know that we've picked up the result and no
|
|
||||||
// postMessage is necessary.
|
|
||||||
|
|
||||||
h.postMessage(['wait', ia, index, value, timeout]);
|
|
||||||
})
|
|
||||||
}
|
|
@ -80,10 +80,10 @@ onmessage = function (ev) {
|
|||||||
";
|
";
|
||||||
|
|
||||||
thread_local! {
|
thread_local! {
|
||||||
static HELPERS: RefCell<Vec<Rc<RefCell<Worker>>>> = RefCell::new(vec![]);
|
static HELPERS: RefCell<Vec<Rc<Worker>>> = RefCell::new(vec![]);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn alloc_helper() -> Rc<RefCell<Worker>> {
|
fn alloc_helper() -> Rc<Worker> {
|
||||||
HELPERS.with(|helpers| {
|
HELPERS.with(|helpers| {
|
||||||
if let Some(helper) = helpers.borrow_mut().pop() {
|
if let Some(helper) = helpers.borrow_mut().pop() {
|
||||||
return helper;
|
return helper;
|
||||||
@ -93,20 +93,18 @@ fn alloc_helper() -> Rc<RefCell<Worker>> {
|
|||||||
let encoded: String = encode_uri_component(HELPER_CODE).into();
|
let encoded: String = encode_uri_component(HELPER_CODE).into();
|
||||||
initialization_string.push_str(&encoded);
|
initialization_string.push_str(&encoded);
|
||||||
|
|
||||||
return Rc::new(RefCell::new(
|
return Rc::new(Worker::new(&initialization_string).expect("Should create a Worker"));
|
||||||
Worker::new(&initialization_string).expect("Should create a Worker"),
|
|
||||||
));
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn free_helper(helper: &Rc<RefCell<Worker>>) {
|
fn free_helper(helper: &Rc<Worker>) {
|
||||||
HELPERS.with(move |helpers| {
|
HELPERS.with(move |helpers| {
|
||||||
helpers.borrow_mut().push(helper.clone());
|
helpers.borrow_mut().push(helper.clone());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn wait_async(indexed_array: Int32Array, index: u32, value: i32) -> Result<Promise, JsValue> {
|
pub fn wait_async(indexed_array: Int32Array, index: u32, value: i32) -> Result<Promise, JsValue> {
|
||||||
let timeout = 0.0;
|
let timeout = 0.1;
|
||||||
wait_async_with_timeout(indexed_array, index, value, timeout)
|
wait_async_with_timeout(indexed_array, index, value, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,11 +139,11 @@ pub fn wait_async_with_timeout(
|
|||||||
console_log!("polyfill, general case");
|
console_log!("polyfill, general case");
|
||||||
|
|
||||||
Ok(Promise::new(
|
Ok(Promise::new(
|
||||||
&mut Box::new(move |resolve: Function, reject: Function| {
|
&mut move |resolve: Function, reject: Function| {
|
||||||
let helper = alloc_helper();
|
let helper = alloc_helper();
|
||||||
let helper_ref = helper.clone();
|
let helper_ref = helper.clone();
|
||||||
|
|
||||||
let onmessage_callback = Closure::wrap(Box::new(move |e: MessageEvent| {
|
let onmessage_callback = Closure::once_into_js(Box::new(move |e: MessageEvent| {
|
||||||
// Free the helper early so that it can be reused if the resolution
|
// Free the helper early so that it can be reused if the resolution
|
||||||
// needs a helper.
|
// needs a helper.
|
||||||
free_helper(&helper_ref);
|
free_helper(&helper_ref);
|
||||||
@ -171,12 +169,11 @@ pub fn wait_async_with_timeout(
|
|||||||
// it's not specified in the proposal yet
|
// it's not specified in the proposal yet
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
}) as Box<dyn FnMut(MessageEvent)>);
|
})
|
||||||
helper
|
as Box<dyn FnMut(MessageEvent)>);
|
||||||
.borrow()
|
helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
|
||||||
.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
|
|
||||||
|
|
||||||
onmessage_callback.forget();
|
// onmessage_callback.forget();
|
||||||
|
|
||||||
// It's possible to do better here if the ia is already known to the
|
// It's possible to do better here if the ia is already known to the
|
||||||
// helper. In that case we can communicate the other data through
|
// helper. In that case we can communicate the other data through
|
||||||
@ -201,9 +198,8 @@ pub fn wait_async_with_timeout(
|
|||||||
);
|
);
|
||||||
|
|
||||||
helper
|
helper
|
||||||
.borrow()
|
|
||||||
.post_message(&data)
|
.post_message(&data)
|
||||||
.expect("Should successfully post data to a Worker");
|
.expect("Should successfully post data to a Worker");
|
||||||
}) as &mut dyn FnMut(Function, Function),
|
},
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
@ -495,9 +495,6 @@ extern "C" {
|
|||||||
pub fn slice_with_end(this: &SharedArrayBuffer, begin: u32, end: u32) -> SharedArrayBuffer;
|
pub fn slice_with_end(this: &SharedArrayBuffer, begin: u32, end: u32) -> SharedArrayBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for SharedArrayBuffer {}
|
|
||||||
unsafe impl Sync for SharedArrayBuffer {}
|
|
||||||
|
|
||||||
// Array Iterator
|
// Array Iterator
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -18,7 +18,7 @@ wasm-bindgen = { version = "0.2.48", features = ['serde-serialize'] }
|
|||||||
wasm-bindgen-futures = "0.3.25"
|
wasm-bindgen-futures = "0.3.25"
|
||||||
|
|
||||||
[dependencies.web-sys]
|
[dependencies.web-sys]
|
||||||
version = "0.3.4"
|
version = "0.3.23"
|
||||||
features = [
|
features = [
|
||||||
'CanvasRenderingContext2d',
|
'CanvasRenderingContext2d',
|
||||||
'ErrorEvent',
|
'ErrorEvent',
|
||||||
|
@ -92,7 +92,7 @@ impl Scene {
|
|||||||
.map(move |_data| image_data(base, len, width, height).into());
|
.map(move |_data| image_data(base, len, width, height).into());
|
||||||
|
|
||||||
Ok(RenderingScene {
|
Ok(RenderingScene {
|
||||||
promise: wasm_bindgen_futures::future_to_promise(done),
|
promise: wasm_bindgen_futures::atomics::future_to_promise(done),
|
||||||
base,
|
base,
|
||||||
len,
|
len,
|
||||||
height,
|
height,
|
||||||
|
Loading…
Reference in New Issue
Block a user