asyncmemo: record all the tasks that polled the future

Summary:
Previously if we had two different tasks (or the same tasks but with different
Notify, as in
[FuturesUnordered](http://alexcrichton.com/futures-rs/futures/stream/futures_unordered/struct.FuturesUnordered.html)),
interchangebly polling the future, only one latest future completes, and all
other futures receive no `notify()` call and may never finish.
This diff fixes it by storing a list of tasks that ever polled a future.

Note that if future was polled twice by the same task, the task will be
recorded twice. This is not great, but unfortunately one can't compare Tokio's
tasks.

Reviewed By: lukaspiatkowski

Differential Revision: D6611511

fbshipit-source-id: 2742cb85b13a684699a13a874e36e17b29fb4480
This commit is contained in:
Stanislau Hlebik 2018-01-19 01:49:57 -08:00 committed by Facebook Github Bot
parent 9ba06ebc3c
commit 3e7319dcfc
2 changed files with 67 additions and 17 deletions

View File

@ -128,9 +128,9 @@ enum Slot<F>
where
F: Future,
{
Waiting(F), // waiting for entry to become available
Polling(Vec<Task>), // Future currently being polled, with waiting Tasks
Complete(F::Item), // got value
Waiting(F, Vec<Task>), // waiting for entry to become available
Polling(Vec<Task>), // Future currently being polled, with waiting Tasks
Complete(F::Item), // got value
}
impl<F> Slot<F>
@ -139,7 +139,7 @@ where
{
fn is_waiting(&self) -> bool {
match self {
&Slot::Waiting(_) => true,
&Slot::Waiting(..) => true,
_ => false,
}
}
@ -152,7 +152,7 @@ where
{
fn get_weight(&self) -> usize {
match self {
&Slot::Polling(_) | &Slot::Waiting(_) => 0,
&Slot::Polling(_) | &Slot::Waiting(..) => 0,
&Slot::Complete(ref v) => v.get_weight(),
}
}
@ -264,17 +264,20 @@ where
None => (), // nothing there for this key
Some(Slot::Complete(v)) => return Ok(Async::Ready(v)),
Some(Slot::Polling(_)) => return Ok(Async::NotReady),
Some(Slot::Waiting(mut fut)) => match fut.poll() {
Some(Slot::Waiting(mut fut, mut tasks)) => match fut.poll() {
Err(err) => {
self.slot_remove();
wake_tasks(tasks);
return Err(err);
}
Ok(Async::NotReady) => {
self.slot_insert(Slot::Waiting(fut));
tasks.push(task::current());
self.slot_insert(Slot::Waiting(fut, tasks));
return Ok(Async::NotReady);
}
Ok(Async::Ready(val)) => {
self.slot_insert(Slot::Complete(val.clone()));
wake_tasks(tasks);
return Ok(Async::Ready(val));
}
},
@ -296,7 +299,7 @@ where
return Err(err);
}
Ok(Async::NotReady) => {
self.slot_insert(Slot::Waiting(filler));
self.slot_insert(Slot::Waiting(filler, vec![task::current()]));
return Ok(Async::NotReady);
}
Ok(Async::Ready(val)) => {

View File

@ -6,6 +6,7 @@
use super::*;
use futures::executor::{spawn, Notify, NotifyHandle};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::usize;
@ -162,7 +163,7 @@ impl<V> Future for Delay<V> {
}
#[derive(Debug)]
struct Delayed<'a>(&'a AtomicUsize);
struct Delayed<'a>(&'a AtomicUsize, usize);
impl<'a> Filler for Delayed<'a> {
type Key = String;
@ -171,7 +172,7 @@ impl<'a> Filler for Delayed<'a> {
fn fill(&self, _cache: &Asyncmemo<Self>, key: &Self::Key) -> Self::Value {
self.0.fetch_add(1, Ordering::Relaxed);
Delay {
remains: 5,
remains: self.1,
v: Some(Ok(key.to_uppercase())),
}
}
@ -186,7 +187,7 @@ impl Notify for DummyNotify {
#[test]
fn delayed() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::new_unbounded(Delayed(&count));
let c = Asyncmemo::new_unbounded(Delayed(&count, 5));
let notify_handle = NotifyHandle::from(Arc::new(DummyNotify {}));
let dummy_id = 0;
@ -275,11 +276,9 @@ impl<'a> Filler for Fib<'a> {
};
Box::new(f) as Box<Future<Item = u32, Error = ()> + 'a>
} else {
let f = cache.get(key - 1).and_then(move |f| {
Delay {
remains: 1,
v: Some(Ok(key + f)),
}
let f = cache.get(key - 1).and_then(move |f| Delay {
remains: 1,
v: Some(Ok(key + f)),
});
Box::new(f) as Box<Future<Item = u32, Error = ()> + 'a>
}
@ -456,7 +455,7 @@ fn failing() {
#[test]
fn multiwait() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::new_unbounded(Delayed(&count));
let c = Asyncmemo::new_unbounded(Delayed(&count, 5));
let notify_handle = NotifyHandle::from(Arc::new(DummyNotify {}));
let dummy_id = 0;
@ -527,3 +526,51 @@ fn multiwait() {
);
assert_eq!(count.load(Ordering::Relaxed), 1);
}
struct SimpleNotify {
pub was_notified: Mutex<bool>,
}
impl SimpleNotify {
fn new() -> Self {
SimpleNotify {
was_notified: Mutex::new(false),
}
}
}
impl Notify for SimpleNotify {
fn notify(&self, _id: usize) {
*self.was_notified.lock().unwrap() = true;
}
}
#[test]
fn timer_multiwait() {
let count = AtomicUsize::new(0);
let c = Asyncmemo::new_unbounded(Delayed(&count, 2));
let mut v1 = spawn(c.get("foo"));
let mut v2 = spawn(c.get("foo"));
let simple_notify_1 = Arc::new(SimpleNotify::new());
let notify_handle_1 = NotifyHandle::from(simple_notify_1.clone());
let dummy_id = 0;
assert_eq!(
v1.poll_future_notify(&notify_handle_1, dummy_id),
Ok(Async::NotReady)
);
let simple_notify_2 = Arc::new(SimpleNotify::new());
let notify_handle_2 = NotifyHandle::from(simple_notify_2.clone());
assert_eq!(
v2.poll_future_notify(&notify_handle_2, dummy_id),
Ok(Async::NotReady)
);
assert_eq!(
v2.poll_future_notify(&notify_handle_2, dummy_id),
Ok(Async::Ready(String::from("FOO")))
);
assert!(*simple_notify_1.was_notified.lock().unwrap());
}