make test more stable

This commit is contained in:
Stephan Dilly 2021-09-01 23:02:08 +02:00
parent 777d362dfc
commit 8905fd22ea

View File

@ -63,7 +63,9 @@ impl<J: 'static + AsyncJob, T: Copy + Send + 'static>
} }
} }
/// spawns `task` if nothing is running currently, otherwise schedules as `next` overwriting if `next` was set before /// spawns `task` if nothing is running currently,
/// otherwise schedules as `next` overwriting if `next` was set before.
/// return `true` if the new task gets started right away.
pub fn spawn(&mut self, task: J) -> bool { pub fn spawn(&mut self, task: J) -> bool {
self.schedule_next(task); self.schedule_next(task);
self.check_for_job() self.check_for_job()
@ -129,23 +131,35 @@ mod test {
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use std::{ use std::{
sync::atomic::AtomicU32, thread::sleep, time::Duration, sync::atomic::{AtomicBool, AtomicU32, Ordering},
thread,
time::Duration,
}; };
#[derive(Clone)] #[derive(Clone)]
struct TestJob { struct TestJob {
v: Arc<AtomicU32>, v: Arc<AtomicU32>,
finish: Arc<AtomicBool>,
value_to_add: u32, value_to_add: u32,
} }
impl AsyncJob for TestJob { impl AsyncJob for TestJob {
fn run(&mut self) { fn run(&mut self) {
sleep(Duration::from_millis(100)); self.finish.store(false, Ordering::Relaxed);
self.v.fetch_add( println!("[job] wait");
self.value_to_add,
std::sync::atomic::Ordering::Relaxed, while self.finish.load(Ordering::Relaxed) {
); std::thread::yield_now();
}
println!("[job] sleep");
thread::sleep(Duration::from_millis(100));
println!("[job] done sleeping");
self.v.fetch_add(self.value_to_add, Ordering::Relaxed);
} }
} }
@ -160,15 +174,20 @@ mod test {
let task = TestJob { let task = TestJob {
v: Arc::new(AtomicU32::new(1)), v: Arc::new(AtomicU32::new(1)),
finish: Arc::new(AtomicBool::new(false)),
value_to_add: 1, value_to_add: 1,
}; };
assert!(job.spawn(task.clone())); assert!(job.spawn(task.clone()));
sleep(Duration::from_millis(1)); thread::sleep(Duration::from_millis(10));
for _ in 0..5 { for _ in 0..5 {
println!("spawn");
assert!(!job.spawn(task.clone())); assert!(!job.spawn(task.clone()));
} }
task.finish.store(true, Ordering::Relaxed);
let _foo = receiver.recv().unwrap(); let _foo = receiver.recv().unwrap();
let _foo = receiver.recv().unwrap(); let _foo = receiver.recv().unwrap();
assert!(receiver.is_empty()); assert!(receiver.is_empty());
@ -179,6 +198,12 @@ mod test {
); );
} }
fn wait_for_job(job: &AsyncSingleJob<TestJob, Notificaton>) {
while job.is_pending() {
thread::sleep(Duration::from_millis(10));
}
}
#[test] #[test]
fn test_cancel() { fn test_cancel() {
let (sender, receiver) = unbounded(); let (sender, receiver) = unbounded();
@ -188,17 +213,26 @@ mod test {
let task = TestJob { let task = TestJob {
v: Arc::new(AtomicU32::new(1)), v: Arc::new(AtomicU32::new(1)),
finish: Arc::new(AtomicBool::new(false)),
value_to_add: 1, value_to_add: 1,
}; };
assert!(job.spawn(task.clone())); assert!(job.spawn(task.clone()));
sleep(Duration::from_millis(1)); task.finish.store(true, Ordering::Relaxed);
thread::sleep(Duration::from_millis(10));
for _ in 0..5 { for _ in 0..5 {
println!("spawn");
assert!(!job.spawn(task.clone())); assert!(!job.spawn(task.clone()));
} }
println!("cancel");
assert!(job.cancel()); assert!(job.cancel());
task.finish.store(true, Ordering::Relaxed);
wait_for_job(&job);
let _foo = receiver.recv().unwrap(); let _foo = receiver.recv().unwrap();
assert_eq!( assert_eq!(