moved threadsafe futures behind a flag

This commit is contained in:
ibaryshnikov 2019-06-10 21:10:06 +03:00
parent 2fdfe79574
commit e466e1a6f1
3 changed files with 156 additions and 15 deletions

View File

@ -110,18 +110,36 @@ pub mod futures_0_3;
use std::cell::{Cell, RefCell};
use std::fmt;
use std::rc::Rc;
#[cfg(target_feature = "atomics")]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
#[cfg(target_feature = "atomics")]
use std::sync::Mutex;
use futures::executor::{self, Notify, Spawn};
use futures::future;
use futures::prelude::*;
use futures::sync::oneshot;
use js_sys::{Atomics, Function, Int32Array, Promise, SharedArrayBuffer};
use js_sys::{Function, Promise};
#[cfg(target_feature = "atomics")]
use js_sys::{Atomics, Int32Array, SharedArrayBuffer, WebAssembly};
use wasm_bindgen::prelude::*;
#[cfg(target_feature = "atomics")]
use wasm_bindgen::JsCast;
#[cfg(target_feature = "atomics")]
mod polyfill;
macro_rules! console_log {
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
}
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = console)]
fn log(s: &str);
}
/// A Rust `Future` backed by a JavaScript `Promise`.
///
/// This type is constructed with a JavaScript `Promise` object and translates
@ -255,7 +273,8 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
resolve,
reject,
notified: Cell::new(State::Notified),
waker: Arc::new(Waker::new(SharedArrayBuffer::new(4), false)),
#[cfg(target_feature = "atomics")]
waker: Arc::new(Waker::new(vec![0; 4], false)),
}));
});
@ -275,6 +294,7 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
resolve: Function,
reject: Function,
#[cfg(target_feature = "atomics")]
// Struct to wake a future
waker: Arc<Waker>,
}
@ -307,38 +327,59 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
Waiting(Arc<Package>),
}
#[cfg(target_feature = "atomics")]
struct Waker {
buffer: SharedArrayBuffer,
array: Vec<i32>,
notified: AtomicBool,
};
#[cfg(target_feature = "atomics")]
impl Waker {
fn new(buffer: SharedArrayBuffer, notified: bool) -> Self {
Self {
buffer,
fn new(array: Vec<i32>, notified: bool) -> Self {
Waker {
array,
notified: AtomicBool::new(notified),
}
}
}
#[cfg(target_feature = "atomics")]
impl Notify for Waker {
fn notify(&self, id: usize) {
console_log!("Waker notify");
if !self.notified.swap(true, Ordering::SeqCst) {
let array = Int32Array::new(&self.buffer);
console_log!("Waker, inside if");
let memory_buffer = wasm_bindgen::memory()
.dyn_into::<WebAssembly::Memory>()
.expect("Should cast a memory to WebAssembly::Memory")
.buffer();
let array_location = self.array.as_ptr() as u32 / 4;
let array = Int32Array::new(&memory_buffer)
.subarray(array_location, array_location + self.array.len() as u32);
let _ = Atomics::notify(&array, id as u32);
}
}
}
#[cfg(target_feature = "atomics")]
fn poll_again(package: Arc<Package>, id: usize) {
console_log!("poll_again called");
let me = match package.notified.replace(State::Notified) {
// we need to schedule polling to resume, so keep going
State::Waiting(me) => me,
State::Waiting(me) => {
console_log!("poll_again Waiting");
me
},
// we were already notified, and were just notified again;
// having now coalesced the notifications we return as it's
// still someone else's job to process this
State::Notified => return,
State::Notified => {
console_log!("poll_again Notified");
return;
},
// the future was previously being polled, and we've just
// switched it to the "you're notified" state. We don't have
@ -347,9 +388,21 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
// continue polling. For us, though, there's nothing else to do,
// so we bail out.
// later see
State::Polling => return,
State::Polling => {
console_log!("poll_again Polling");
return;
},
};
let memory_buffer = wasm_bindgen::memory()
.dyn_into::<WebAssembly::Memory>()
.expect("Should cast a memory to WebAssembly::Memory")
.buffer();
let array_location = package.waker.array.as_ptr() as u32 / 4;
let array = Int32Array::new(&memory_buffer)
.subarray(array_location, array_location + package.waker.array.len() as u32);
// Use `Promise.then` on a resolved promise to place our execution
// onto the next turn of the microtask queue, enqueueing our poll
// operation. We don't currently poll immediately as it turns out
@ -358,7 +411,7 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
//
// Note that the `Rc`/`RefCell` trick here is basically to just
// ensure that our `Closure` gets cleaned up appropriately.
let promise = polyfill::wait_async(Int32Array::new(&package.waker.buffer), id as u32, 0)
let promise = polyfill::wait_async(array, id as u32, 0)
.expect("Should create a Promise");
let slot = Rc::new(RefCell::new(None));
let slot2 = slot.clone();
@ -366,11 +419,18 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
let myself = slot2.borrow_mut().take();
debug_assert!(myself.is_some());
Package::poll(&me);
}) as Box<FnMut(JsValue)>);
}) as Box<dyn FnMut(JsValue)>);
promise.then(&closure);
*slot.borrow_mut() = Some(closure);
}
// No shared memory right now, wasm is single threaded, no need to worry
// about this!
#[cfg(not(target_feature = "atomics"))]
unsafe impl Send for Package {}
#[cfg(not(target_feature = "atomics"))]
unsafe impl Sync for Package {}
impl Package {
// Move the future contained in `me` as far forward as we can. This will
// do as much synchronous work as possible to complete the future,
@ -386,7 +446,9 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
match me.notified.replace(State::Polling) {
// We received a notification while previously polling, or
// this is the initial poll. We've got work to do below!
State::Notified => {}
State::Notified => {
console_log!("Package::poll Notified");
}
// We've gone through this loop once and no notification was
// received while we were executing work. That means we got
@ -396,15 +458,31 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
// When the notification comes in it'll notify our task, see
// our `Waiting` state, and resume the polling process
State::Polling => {
console_log!("Package::poll Polling");
me.notified.set(State::Waiting(me.clone()));
#[cfg(target_feature = "atomics")]
poll_again(me.clone(), 0);
break;
}
State::Waiting(_) => panic!("shouldn't see waiting state!"),
State::Waiting(_) => {
console_log!("Package::poll Waiting");
panic!("shouldn't see waiting state!")
},
}
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) {
#[cfg(target_feature = "atomics")]
let waker = &me.waker;
#[cfg(not(target_feature = "atomics"))]
let waker = me;
let (val, f) = match me.spawn.borrow_mut().poll_future_notify(waker, 0) {
// If the future is ready, immediately call the
// resolve/reject callback and then return as we're done.
Ok(Async::Ready(value)) => (value, &me.resolve),
@ -420,6 +498,50 @@ fn _future_to_promise(future: Box<dyn Future<Item = JsValue, Error = JsValue>>)
}
}
}
#[cfg(not(target_feature = "atomics"))]
impl Notify for Package {
fn notify(&self, _id: usize) {
console_log!("Package::notify Waiting");
let me = match self.notified.replace(State::Notified) {
// we need to schedule polling to resume, so keep going
State::Waiting(me) => me,
// we were already notified, and were just notified again;
// having now coalesced the notifications we return as it's
// still someone else's job to process this
State::Notified => return,
// the future was previously being polled, and we've just
// switched it to the "you're notified" state. We don't have
// access to the future as it's being polled, so the future
// polling process later sees this notification and will
// continue polling. For us, though, there's nothing else to do,
// so we bail out.
// later see
State::Polling => return,
};
// Use `Promise.then` on a resolved promise to place our execution
// onto the next turn of the microtask queue, enqueueing our poll
// operation. We don't currently poll immediately as it turns out
// `futures` crate adapters aren't compatible with it and it also
// helps avoid blowing the stack by accident.
//
// Note that the `Rc`/`RefCell` trick here is basically to just
// ensure that our `Closure` gets cleaned up appropriately.
let promise = Promise::resolve(&JsValue::undefined());
let slot = Rc::new(RefCell::new(None));
let slot2 = slot.clone();
let closure = Closure::wrap(Box::new(move |_| {
let myself = slot2.borrow_mut().take();
debug_assert!(myself.is_some());
Package::poll(&me);
}) as Box<dyn FnMut(JsValue)>);
promise.then(&closure);
*slot.borrow_mut() = Some(closure);
}
}
}
/// Converts a Rust `Future` on a local task queue.

View File

@ -47,12 +47,23 @@ use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use web_sys::{MessageEvent, Worker};
macro_rules! console_log {
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
}
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = console)]
fn log(s: &str);
}
const HELPER_CODE: &'static str = "
onmessage = function (ev) {
try {
switch (ev.data[0]) {
case 'wait': {
let [_, ia, index, value, timeout] = ev.data;
console.log('wait event inside a worker');
let result = Atomics.wait(ia, index, value, timeout);
postMessage(['ok', result]);
break;
@ -115,16 +126,20 @@ pub fn wait_async_with_timeout(
timeout: f64,
) -> Result<Promise, JsValue> {
if !indexed_array.buffer().has_type::<SharedArrayBuffer>() {
console_log!("polyfill, not a SharedArrayBuffer");
return Err(Error::new("Indexed array must be created from SharedArrayBuffer").into());
}
// Optimization, avoid the helper thread in this common case.
if Atomics::load(&indexed_array, index)? != value {
console_log!("polyfill, not-equal");
return Ok(Promise::resolve(&JsString::from("not-equal")));
}
// General case, we must wait.
console_log!("polyfill, general case");
Ok(Promise::new(
&mut Box::new(move |resolve: Function, reject: Function| {
let helper = alloc_helper();
@ -161,6 +176,8 @@ pub fn wait_async_with_timeout(
.borrow()
.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
onmessage_callback.forget();
// It's possible to do better here if the ia is already known to the
// helper. In that case we can communicate the other data through
// shared memory and wake the agent. And it is possible to make ia

View File

@ -6,6 +6,8 @@ extern crate wasm_bindgen;
extern crate wasm_bindgen_futures;
extern crate wasm_bindgen_test;
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
use futures::unsync::oneshot;
use futures::Future;
use wasm_bindgen::prelude::*;