diff --git a/asyncmemo/src/lib.rs b/asyncmemo/src/lib.rs index 9052559f6b..69f7b8ee25 100644 --- a/asyncmemo/src/lib.rs +++ b/asyncmemo/src/lib.rs @@ -128,9 +128,9 @@ enum Slot where F: Future, { - Waiting(F), // waiting for entry to become available - Polling(Vec), // Future currently being polled, with waiting Tasks - Complete(F::Item), // got value + Waiting(F, Vec), // waiting for entry to become available + Polling(Vec), // Future currently being polled, with waiting Tasks + Complete(F::Item), // got value } impl Slot @@ -139,7 +139,7 @@ where { fn is_waiting(&self) -> bool { match self { - &Slot::Waiting(_) => true, + &Slot::Waiting(..) => true, _ => false, } } @@ -152,7 +152,7 @@ where { fn get_weight(&self) -> usize { match self { - &Slot::Polling(_) | &Slot::Waiting(_) => 0, + &Slot::Polling(_) | &Slot::Waiting(..) => 0, &Slot::Complete(ref v) => v.get_weight(), } } @@ -264,17 +264,20 @@ where None => (), // nothing there for this key Some(Slot::Complete(v)) => return Ok(Async::Ready(v)), Some(Slot::Polling(_)) => return Ok(Async::NotReady), - Some(Slot::Waiting(mut fut)) => match fut.poll() { + Some(Slot::Waiting(mut fut, mut tasks)) => match fut.poll() { Err(err) => { self.slot_remove(); + wake_tasks(tasks); return Err(err); } Ok(Async::NotReady) => { - self.slot_insert(Slot::Waiting(fut)); + tasks.push(task::current()); + self.slot_insert(Slot::Waiting(fut, tasks)); return Ok(Async::NotReady); } Ok(Async::Ready(val)) => { self.slot_insert(Slot::Complete(val.clone())); + wake_tasks(tasks); return Ok(Async::Ready(val)); } }, @@ -296,7 +299,7 @@ where return Err(err); } Ok(Async::NotReady) => { - self.slot_insert(Slot::Waiting(filler)); + self.slot_insert(Slot::Waiting(filler, vec![task::current()])); return Ok(Async::NotReady); } Ok(Async::Ready(val)) => { diff --git a/asyncmemo/src/test.rs b/asyncmemo/src/test.rs index f46525a65c..fdf3bfa984 100644 --- a/asyncmemo/src/test.rs +++ b/asyncmemo/src/test.rs @@ -6,6 +6,7 @@ use super::*; use futures::executor::{spawn, Notify, NotifyHandle}; +use std::sync::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; use std::usize; @@ -162,7 +163,7 @@ impl Future for Delay { } #[derive(Debug)] -struct Delayed<'a>(&'a AtomicUsize); +struct Delayed<'a>(&'a AtomicUsize, usize); impl<'a> Filler for Delayed<'a> { type Key = String; @@ -171,7 +172,7 @@ impl<'a> Filler for Delayed<'a> { fn fill(&self, _cache: &Asyncmemo, key: &Self::Key) -> Self::Value { self.0.fetch_add(1, Ordering::Relaxed); Delay { - remains: 5, + remains: self.1, v: Some(Ok(key.to_uppercase())), } } @@ -186,7 +187,7 @@ impl Notify for DummyNotify { #[test] fn delayed() { let count = AtomicUsize::new(0); - let c = Asyncmemo::new_unbounded(Delayed(&count)); + let c = Asyncmemo::new_unbounded(Delayed(&count, 5)); let notify_handle = NotifyHandle::from(Arc::new(DummyNotify {})); let dummy_id = 0; @@ -275,11 +276,9 @@ impl<'a> Filler for Fib<'a> { }; Box::new(f) as Box + 'a> } else { - let f = cache.get(key - 1).and_then(move |f| { - Delay { - remains: 1, - v: Some(Ok(key + f)), - } + let f = cache.get(key - 1).and_then(move |f| Delay { + remains: 1, + v: Some(Ok(key + f)), }); Box::new(f) as Box + 'a> } @@ -456,7 +455,7 @@ fn failing() { #[test] fn multiwait() { let count = AtomicUsize::new(0); - let c = Asyncmemo::new_unbounded(Delayed(&count)); + let c = Asyncmemo::new_unbounded(Delayed(&count, 5)); let notify_handle = NotifyHandle::from(Arc::new(DummyNotify {})); let dummy_id = 0; @@ -527,3 +526,51 @@ fn multiwait() { ); assert_eq!(count.load(Ordering::Relaxed), 1); } + +struct SimpleNotify { + pub was_notified: Mutex, +} + +impl SimpleNotify { + fn new() -> Self { + SimpleNotify { + was_notified: Mutex::new(false), + } + } +} + +impl Notify for SimpleNotify { + fn notify(&self, _id: usize) { + *self.was_notified.lock().unwrap() = true; + } +} + +#[test] +fn timer_multiwait() { + let count = AtomicUsize::new(0); + let c = Asyncmemo::new_unbounded(Delayed(&count, 2)); + let mut v1 = spawn(c.get("foo")); + let mut v2 = spawn(c.get("foo")); + + let simple_notify_1 = Arc::new(SimpleNotify::new()); + let notify_handle_1 = NotifyHandle::from(simple_notify_1.clone()); + let dummy_id = 0; + assert_eq!( + v1.poll_future_notify(¬ify_handle_1, dummy_id), + Ok(Async::NotReady) + ); + + let simple_notify_2 = Arc::new(SimpleNotify::new()); + let notify_handle_2 = NotifyHandle::from(simple_notify_2.clone()); + assert_eq!( + v2.poll_future_notify(¬ify_handle_2, dummy_id), + Ok(Async::NotReady) + ); + + assert_eq!( + v2.poll_future_notify(¬ify_handle_2, dummy_id), + Ok(Async::Ready(String::from("FOO"))) + ); + + assert!(*simple_notify_1.was_notified.lock().unwrap()); +}