added polyfill for Atomics.waitAsync and used it to notify futures

This commit is contained in:
ibaryshnikov 2019-05-19 22:04:17 +03:00
parent 1807de74a7
commit d1d3021271
4 changed files with 231 additions and 49 deletions

View File

@ -110,15 +110,18 @@ pub mod futures_0_3;
use std::cell::{Cell, RefCell};
use std::fmt;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use futures::executor::{self, Notify, Spawn};
use futures::future;
use futures::prelude::*;
use futures::sync::oneshot;
use js_sys::{Function, Promise};
use js_sys::{Atomics, Function, Int32Array, Promise, SharedArrayBuffer};
use wasm_bindgen::prelude::*;
mod polyfill;
/// A Rust `Future` backed by a JavaScript `Promise`.
///
/// This type is constructed with a JavaScript `Promise` object and translates
@ -252,6 +255,7 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
resolve,
reject,
notified: Cell::new(State::Notified),
waker: Arc::new(Waker::new(SharedArrayBuffer::new(4), false)),
}));
});
@ -270,6 +274,9 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
// JavaScript. We'll be invoking one of these at the end.
resolve: Function,
reject: Function,
// Waker used to wake this future by notifying its shared buffer
waker: Arc<Waker>,
}
// The possible states our `Package` (future) can be in, tracked internally
@ -300,10 +307,68 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
Waiting(Arc<Package>),
}
// No shared memory right now, wasm is single threaded, no need to worry
// about this!
unsafe impl Send for Package {}
unsafe impl Sync for Package {}
struct Waker {
// Shared location that `poll_again` parks on via the `Atomics.waitAsync` polyfill.
buffer: SharedArrayBuffer,
// Set on the first `notify` so repeated wake-ups collapse into one `Atomics.notify`.
notified: AtomicBool,
}
impl Waker {
fn new(buffer: SharedArrayBuffer, notified: bool) -> Self {
Self {
buffer,
notified: AtomicBool::new(notified),
}
}
}
impl Notify for Waker {
fn notify(&self, id: usize) {
if !self.notified.swap(true, Ordering::SeqCst) {
let array = Int32Array::new(&self.buffer);
let _ = Atomics::notify(&array, id as u32);
}
}
}
fn poll_again(package: Arc<Package>, id: usize) {
let me = match package.notified.replace(State::Notified) {
// we need to schedule polling to resume, so keep going
State::Waiting(me) => me,
// we were already notified, and were just notified again;
// having now coalesced the notifications we return as it's
// still someone else's job to process this
State::Notified => return,
// the future was previously being polled, and we've just
// switched it to the "you're notified" state. We don't have
// access to the future as it's being polled, so the future
// polling process later sees this notification and will
// continue polling. For us, though, there's nothing else to do,
// so we bail out.
State::Polling => return,
};
// Instead of polling immediately, wait (via the `Atomics.waitAsync`
// polyfill) on the waker's shared buffer. The returned promise resolves
// on a later turn of the microtask queue once `Waker::notify` calls
// `Atomics.notify`, and that's when we enqueue our poll operation. We
// don't poll immediately as it turns out `futures` crate adapters aren't
// compatible with it and it also helps avoid blowing the stack by
// accident.
//
// Note that the `Rc`/`RefCell` trick here is basically to just
// ensure that our `Closure` gets cleaned up appropriately.
let promise = polyfill::wait_async(Int32Array::new(&package.waker.buffer), id as u32, 0);
let slot = Rc::new(RefCell::new(None));
let slot2 = slot.clone();
let closure = Closure::wrap(Box::new(move |_| {
let myself = slot2.borrow_mut().take();
debug_assert!(myself.is_some());
Package::poll(&me);
}) as Box<dyn FnMut(JsValue)>);
promise.then(&closure);
*slot.borrow_mut() = Some(closure);
}
impl Package {
// Move the future contained in `me` as far forward as we can. This will
@ -331,13 +396,14 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
// our `Waiting` state, and resume the polling process
State::Polling => {
me.notified.set(State::Waiting(me.clone()));
poll_again(me.clone(), 0);
break;
}
State::Waiting(_) => panic!("shouldn't see waiting state!"),
}
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) {
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) {
// If the future is ready, immediately call the
// resolve/reject callback and then return as we're done.
Ok(Async::Ready(value)) => (value, &me.resolve),
@ -353,48 +419,6 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
}
}
}
impl Notify for Package {
fn notify(&self, _id: usize) {
let me = match self.notified.replace(State::Notified) {
// we need to schedule polling to resume, so keep going
State::Waiting(me) => me,
// we were already notified, and were just notified again;
// having now coalesced the notifications we return as it's
// still someone else's job to process this
State::Notified => return,
// the future was previously being polled, and we've just
// switched it to the "you're notified" state. We don't have
// access to the future as it's being polled, so the future
// polling process later sees this notification and will
// continue polling. For us, though, there's nothing else to do,
// so we bail out.
State::Polling => return,
};
// Use `Promise.then` on a resolved promise to place our execution
// onto the next turn of the microtask queue, enqueueing our poll
// operation. We don't currently poll immediately as it turns out
// `futures` crate adapters aren't compatible with it and it also
// helps avoid blowing the stack by accident.
//
// Note that the `Rc`/`RefCell` trick here is basically to just
// ensure that our `Closure` gets cleaned up appropriately.
let promise = Promise::resolve(&JsValue::undefined());
let slot = Rc::new(RefCell::new(None));
let slot2 = slot.clone();
let closure = Closure::wrap(Box::new(move |_| {
let myself = slot2.borrow_mut().take();
debug_assert!(myself.is_some());
Package::poll(&me);
}) as Box<dyn FnMut(JsValue)>);
promise.then(&closure);
*slot.borrow_mut() = Some(closure);
}
}
}
/// Converts a Rust `Future` on a local task queue.

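For orientation, a caller-side sketch (not part of the diff): the public surface is unchanged by this commit, only the way a pending future gets re-polled moves from an always-resolved `Promise` to the `Atomics.waitAsync` polyfill. The `future_to_promise` wrapper around `_future_to_promise` and the `futures` 0.1 `oneshot` channel are assumed from the surrounding crate.

use futures::prelude::*;
use futures::sync::oneshot;
use js_sys::Promise;
use wasm_bindgen::prelude::*;

// The returned promise resolves once `tx.send` fires; with this commit the
// intermediate wake-up travels through `Waker` and `Atomics.notify` instead
// of a resolved promise, but callers observe the same behaviour.
pub fn delayed_answer() -> (oneshot::Sender<()>, Promise) {
    let (tx, rx) = oneshot::channel::<()>();
    let future = rx
        .map(|()| JsValue::from(42u32))
        .map_err(|_| JsValue::from_str("sender dropped"));
    (tx, crate::future_to_promise(future))
}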
View File

@ -0,0 +1,130 @@
/*
* The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async
*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Author: Lars T Hansen, lhansen@mozilla.com
*/
/* Polyfill for Atomics.waitAsync() for web browsers.
*
* Any kind of agent that is able to create a new Worker can use this polyfill.
*
* Load this file in all agents that will use Atomics.waitAsync.
*
* Agents that don't call Atomics.waitAsync need do nothing special.
*
* Any kind of agent can wake another agent that is sleeping in
* Atomics.waitAsync by just calling Atomics.wake for the location being slept
* on, as normal.
*
* The implementation is not completely faithful to the proposed semantics: in
* the case where an agent first asyncWaits and then waits on the same location:
* when it is woken, the two waits will be woken in order, while in the real
* semantics, the sync wait will be woken first.
*
* In this polyfill Atomics.waitAsync is not very fast.
*/
/* Implementation:
*
* For every wait we fork off a Worker to perform the wait. Workers are reused
* when possible. The worker communicates with its parent using postMessage.
*/
const helperCode = `
onmessage = function (ev) {
try {
switch (ev.data[0]) {
case 'wait': {
let [_, ia, index, value, timeout] = ev.data;
let result = Atomics.wait(ia, index, value, timeout)
postMessage(['ok', result]);
break;
}
default: {
throw new Error("Wrong message sent to wait helper: " + ev.data.join(','));
}
}
} catch (e) {
console.log("Exception in wait helper");
postMessage(['error', 'Exception']);
}
}
`;
const helpers = [];
function allocHelper() {
if (helpers.length > 0) {
return helpers.pop();
}
return new Worker("data:application/javascript," + encodeURIComponent(helperCode));
}
function freeHelper(h) {
helpers.push(h);
}
// Atomics.waitAsync always returns a promise. Throws standard errors
// for parameter validation. The promise is resolved with a string as from
// Atomics.wait, or, in the case something went completely wrong, it is
// rejected with an error string.
export function waitAsync(ia, index, value, timeout = Infinity) {
if (typeof ia != "object"
|| !(ia instanceof Int32Array)
|| !(ia.buffer instanceof SharedArrayBuffer)
) {
throw new TypeError("Expected shared memory");
}
// Range checking for the index.
ia[index];
// Optimization, avoid the helper thread in this common case.
if (Atomics.load(ia, index) !== value) {
return Promise.resolve("not-equal");
}
// General case, we must wait.
return new Promise(function (resolve, reject) {
const h = allocHelper();
h.onmessage = function (ev) {
// Free the helper early so that it can be reused if the resolution
// needs a helper.
freeHelper(h);
switch (ev.data[0]) {
case 'ok':
resolve(ev.data[1]);
break;
case 'error':
// Note, rejection is not in the spec, it is an artifact of the polyfill.
// The helper already printed an error to the console.
reject(ev.data[1]);
break;
}
};
// It's possible to do better here if the ia is already known to the
// helper. In that case we can communicate the other data through
// shared memory and wake the agent. And it is possible to make ia
// known to the helper by waking it with a special value so that it
// checks its messages, and then posting the ia to the helper. Some
// caching / decay scheme is useful no doubt, to improve performance
// and avoid leaks.
//
// In the event we wake the helper directly, we can micro-wait here
// for a quick result. We'll need to restructure some code to make
// that work out properly, and some synchronization is necessary for
// the helper to know that we've picked up the result and no
// postMessage is necessary.
h.postMessage(['wait', ia, index, value, timeout]);
})
}

View File

@ -0,0 +1,16 @@
use js_sys::{Int32Array, Promise, SharedArrayBuffer};
use wasm_bindgen::prelude::*;
#[wasm_bindgen(module = "/src/polyfill.js")]
extern "C" {
#[wasm_bindgen(js_name = waitAsync)]
pub fn wait_async(indexed_array: Int32Array, index: u32, value: i32) -> Promise;
#[wasm_bindgen(js_name = waitAsync)]
pub fn wait_async_with_timeout(
indexed_array: Int32Array,
index: u32,
value: i32,
timeout: f64,
) -> Promise;
}

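A usage sketch of these bindings (not in the commit), assuming a wasm target where `SharedArrayBuffer` is available. It mirrors how `lib.rs` drives the polyfill: one side parks on the buffer, the other wakes it later, exactly the split between `poll_again` and `Waker::notify` above; the `park`/`wake` names are illustrative.

use js_sys::{Atomics, Int32Array, SharedArrayBuffer};
use wasm_bindgen::prelude::*;

// Park: run a callback once somebody notifies index 0 of `buffer`
// (or right away with "not-equal" if the slot no longer holds 0).
fn park(buffer: &SharedArrayBuffer) {
    let promise = wait_async(Int32Array::new(buffer), 0, 0);
    let on_wake = Closure::wrap(Box::new(move |result: JsValue| {
        // `result` is the string `Atomics.wait` would return:
        // "ok", "not-equal" or "timed-out".
        let _ = result;
    }) as Box<dyn FnMut(JsValue)>);
    let _ = promise.then(&on_wake);
    // Leak the closure to keep it alive; `lib.rs` uses the `Rc`/`RefCell`
    // trick for proper cleanup instead.
    on_wake.forget();
}

// Wake: what `Waker::notify` boils down to, using the two-argument
// `Atomics::notify` binding added to `js-sys` in this commit.
fn wake(buffer: &SharedArrayBuffer) {
    let array = Int32Array::new(buffer);
    let _ = Atomics::notify(&array, 0);
}

Both halves would be handed the same `SharedArrayBuffer::new(4)`, just as `Package` and its `Waker` share one above.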
View File

@ -495,6 +495,9 @@ extern "C" {
pub fn slice_with_end(this: &SharedArrayBuffer, begin: u32, end: u32) -> SharedArrayBuffer;
}
unsafe impl Send for SharedArrayBuffer {}
unsafe impl Sync for SharedArrayBuffer {}
// Array Iterator
#[wasm_bindgen]
extern "C" {
@ -598,10 +601,19 @@ pub mod Atomics {
/// The static `Atomics.notify()` method wakes up some agents that are
/// sleeping in the wait queue.
/// Note: This operation works with a shared `Int32Array` only.
/// If `count` is not provided, all agents in the queue are notified.
///
/// [MDN documentation](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/notify)
#[wasm_bindgen(js_namespace = Atomics, catch)]
pub fn notify(typed_array: &Int32Array, index: u32, count: u32) -> Result<u32, JsValue>;
pub fn notify(typed_array: &Int32Array, index: u32) -> Result<u32, JsValue>;
/// Notifies up to `count` agents in the wait queue.
#[wasm_bindgen(js_namespace = Atomics, catch, js_name = notify)]
pub fn notify_with_count(
typed_array: &Int32Array,
index: u32,
count: u32,
) -> Result<u32, JsValue>;
/// The static `Atomics.or()` method computes a bitwise OR with a given value
/// at a given position in the array, and returns the old value at that position.
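To illustrate the `notify`/`notify_with_count` split above (hypothetical call site, not from the commit): `notify` wakes every agent waiting on the index, `notify_with_count` caps how many are woken, and both report how many agents were actually woken.

use js_sys::{Atomics, Int32Array};
use wasm_bindgen::JsValue;

fn wake_waiters(array: &Int32Array) -> Result<(), JsValue> {
    // Wake every agent currently waiting on index 0 of `array`...
    let woken_all = Atomics::notify(array, 0)?;
    // ...then wake at most one of any agents that queued up since.
    let woken_one = Atomics::notify_with_count(array, 0, 1)?;
    let _ = (woken_all, woken_one);
    Ok(())
}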