From b7314ef2aa863f5a7ab209828dc91017a3a38a50 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 24 Jan 2022 18:45:14 +0100 Subject: [PATCH] WIP: Start restructuring executor --- Cargo.lock | 1 + crates/gpui/Cargo.toml | 4 +- crates/gpui/src/executor.rs | 317 ++++++++++---------------- crates/gpui/src/test.rs | 15 +- crates/gpui_macros/src/gpui_macros.rs | 8 +- crates/server/src/rpc.rs | 2 +- 6 files changed, 136 insertions(+), 211 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 29c2228e7d..dc7b6921a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2128,6 +2128,7 @@ dependencies = [ "block", "cc", "cocoa", + "collections", "core-foundation", "core-graphics", "core-text", diff --git a/crates/gpui/Cargo.toml b/crates/gpui/Cargo.toml index 4414936c9e..197a5ca12e 100644 --- a/crates/gpui/Cargo.toml +++ b/crates/gpui/Cargo.toml @@ -8,9 +8,10 @@ version = "0.1.0" path = "src/gpui.rs" [features] -test-support = ["env_logger"] +test-support = ["env_logger", "collections/test-support"] [dependencies] +collections = { path = "../collections" } gpui_macros = { path = "../gpui_macros" } sum_tree = { path = "../sum_tree" } async-task = "4.0.3" @@ -47,6 +48,7 @@ bindgen = "0.58.1" cc = "1.0.67" [dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } env_logger = "0.8" png = "0.16" simplelog = "0.9" diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index e939008bb3..7133aa5de8 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Result}; use async_task::Runnable; use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString}; +use collections::HashMap; use parking_lot::Mutex; use postage::{barrier, prelude::Stream as _}; use rand::prelude::*; @@ -33,7 +34,10 @@ pub enum Foreground { dispatcher: Arc, _not_send_or_sync: PhantomData>, }, - Deterministic(Arc), + Deterministic { + cx_id: usize, + executor: Arc, + }, } pub enum Background { @@ -69,9 +73,8 @@ unsafe impl Send for Task {} struct DeterministicState { rng: StdRng, seed: u64, - scheduled_from_foreground: Vec<(Runnable, Backtrace)>, - scheduled_from_background: Vec<(Runnable, Backtrace)>, - spawned_from_foreground: Vec<(Runnable, Backtrace)>, + scheduled_from_foreground: HashMap>, + scheduled_from_background: Vec, forbid_parking: bool, block_on_ticks: RangeInclusive, now: Instant, @@ -79,20 +82,24 @@ struct DeterministicState { waiting_backtrace: Option, } +enum ScheduledForeground { + MainFuture, + Runnable(Runnable), +} + pub struct Deterministic { state: Arc>, parker: Mutex, } impl Deterministic { - fn new(seed: u64) -> Self { - Self { + pub fn new(seed: u64) -> Arc { + Arc::new(Self { state: Arc::new(Mutex::new(DeterministicState { rng: StdRng::seed_from_u64(seed), seed, scheduled_from_foreground: Default::default(), scheduled_from_background: Default::default(), - spawned_from_foreground: Default::default(), forbid_parking: false, block_on_ticks: 0..=1000, now: Instant::now(), @@ -100,22 +107,32 @@ impl Deterministic { waiting_backtrace: None, })), parker: Default::default(), - } + }) } - fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask { - let backtrace = Backtrace::new_unresolved(); - let scheduled_once = AtomicBool::new(false); + pub fn build_background(self: &Arc) -> Arc { + Arc::new(Background::Deterministic { + executor: self.clone(), + }) + } + + pub fn build_foreground(self: &Arc, id: usize) -> Rc { + Rc::new(Foreground::Deterministic { + cx_id: id, + executor: self.clone(), + }) + } + + fn spawn_from_foreground(&self, cx_id: usize, future: AnyLocalFuture) -> AnyLocalTask { let state = self.state.clone(); let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn_local(future, move |runnable| { let mut state = state.lock(); - let backtrace = backtrace.clone(); - if scheduled_once.fetch_or(true, SeqCst) { - state.scheduled_from_foreground.push((runnable, backtrace)); - } else { - state.spawned_from_foreground.push((runnable, backtrace)); - } + state + .scheduled_from_foreground + .entry(cx_id) + .or_default() + .push(ScheduledForeground::Runnable(runnable)); unparker.unpark(); }); runnable.schedule(); @@ -123,24 +140,21 @@ impl Deterministic { } fn spawn(&self, future: AnyFuture) -> AnyTask { - let backtrace = Backtrace::new_unresolved(); let state = self.state.clone(); let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn(future, move |runnable| { let mut state = state.lock(); - state - .scheduled_from_background - .push((runnable, backtrace.clone())); + state.scheduled_from_background.push(runnable); unparker.unpark(); }); runnable.schedule(); task } - fn run(&self, mut future: AnyLocalFuture) -> Box { + fn run(&self, cx_id: usize, mut future: AnyLocalFuture) -> Box { let woken = Arc::new(AtomicBool::new(false)); loop { - if let Some(result) = self.run_internal(woken.clone(), &mut future) { + if let Some(result) = self.run_internal(cx_id, woken.clone(), &mut future) { return result; } @@ -153,67 +167,92 @@ impl Deterministic { } } - fn run_until_parked(&self) { + fn run_until_parked(&self, cx_id: usize) { let woken = Arc::new(AtomicBool::new(false)); let mut future = any_local_future(std::future::pending::<()>()); - self.run_internal(woken, &mut future); + self.run_internal(cx_id, woken, &mut future); } fn run_internal( &self, + cx_id: usize, woken: Arc, future: &mut AnyLocalFuture, ) -> Option> { let unparker = self.parker.lock().unparker(); - let waker = waker_fn(move || { - woken.store(true, SeqCst); - unparker.unpark(); + let scheduled_main_future = Arc::new(AtomicBool::new(true)); + self.state + .lock() + .scheduled_from_foreground + .entry(cx_id) + .or_default() + .insert(0, ScheduledForeground::MainFuture); + + let waker = waker_fn({ + let state = self.state.clone(); + let scheduled_main_future = scheduled_main_future.clone(); + move || { + woken.store(true, SeqCst); + if !scheduled_main_future.load(SeqCst) { + scheduled_main_future.store(true, SeqCst); + state + .lock() + .scheduled_from_foreground + .entry(cx_id) + .or_default() + .push(ScheduledForeground::MainFuture); + } + + unparker.unpark(); + } }); let mut cx = Context::from_waker(&waker); - let mut trace = Trace::default(); loop { let mut state = self.state.lock(); - let runnable_count = state.scheduled_from_foreground.len() - + state.scheduled_from_background.len() - + state.spawned_from_foreground.len(); - let ix = state.rng.gen_range(0..=runnable_count); - if ix < state.scheduled_from_foreground.len() { - let (_, backtrace) = &state.scheduled_from_foreground[ix]; - trace.record(&state, backtrace.clone()); - let runnable = state.scheduled_from_foreground.remove(ix).0; - drop(state); - runnable.run(); - } else if ix - state.scheduled_from_foreground.len() - < state.scheduled_from_background.len() + if state.scheduled_from_foreground.is_empty() + && state.scheduled_from_background.is_empty() { - let ix = ix - state.scheduled_from_foreground.len(); - let (_, backtrace) = &state.scheduled_from_background[ix]; - trace.record(&state, backtrace.clone()); - let runnable = state.scheduled_from_background.remove(ix).0; + return None; + } + + if !state.scheduled_from_background.is_empty() && state.rng.gen() { + let background_len = state.scheduled_from_background.len(); + let ix = state.rng.gen_range(0..background_len); + let runnable = state.scheduled_from_background.remove(ix); drop(state); runnable.run(); - } else if ix < runnable_count { - let (_, backtrace) = &state.spawned_from_foreground[0]; - trace.record(&state, backtrace.clone()); - let runnable = state.spawned_from_foreground.remove(0).0; + } else if !state.scheduled_from_foreground.is_empty() { + let available_cx_ids = state + .scheduled_from_foreground + .keys() + .copied() + .collect::>(); + let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap(); + let scheduled_from_cx = state + .scheduled_from_foreground + .get_mut(&cx_id_to_run) + .unwrap(); + let runnable = scheduled_from_cx.remove(0); + if scheduled_from_cx.is_empty() { + state.scheduled_from_foreground.remove(&cx_id_to_run); + } + drop(state); - runnable.run(); + match runnable { + ScheduledForeground::MainFuture => { + scheduled_main_future.store(false, SeqCst); + if let Poll::Ready(result) = future.poll(&mut cx) { + return Some(result); + } + } + ScheduledForeground::Runnable(runnable) => { + runnable.run(); + } + } } else { - drop(state); - if let Poll::Ready(result) = future.poll(&mut cx) { - return Some(result); - } - - let state = self.state.lock(); - - if state.scheduled_from_foreground.is_empty() - && state.scheduled_from_background.is_empty() - && state.spawned_from_foreground.is_empty() - { - return None; - } + return None; } } } @@ -230,15 +269,12 @@ impl Deterministic { }; let mut cx = Context::from_waker(&waker); - let mut trace = Trace::default(); for _ in 0..max_ticks { let mut state = self.state.lock(); let runnable_count = state.scheduled_from_background.len(); let ix = state.rng.gen_range(0..=runnable_count); if ix < state.scheduled_from_background.len() { - let (_, backtrace) = &state.scheduled_from_background[ix]; - trace.record(&state, backtrace.clone()); - let runnable = state.scheduled_from_background.remove(ix).0; + let runnable = state.scheduled_from_background.remove(ix); drop(state); runnable.run(); } else { @@ -281,69 +317,13 @@ impl DeterministicState { } } -#[derive(Default)] -struct Trace { - executed: Vec, - scheduled: Vec>, - spawned_from_foreground: Vec>, -} - -impl Trace { - fn record(&mut self, state: &DeterministicState, executed: Backtrace) { - self.scheduled.push( - state - .scheduled_from_foreground - .iter() - .map(|(_, backtrace)| backtrace.clone()) - .collect(), - ); - self.spawned_from_foreground.push( - state - .spawned_from_foreground - .iter() - .map(|(_, backtrace)| backtrace.clone()) - .collect(), - ); - self.executed.push(executed); - } - - fn resolve(&mut self) { - for backtrace in &mut self.executed { - backtrace.resolve(); - } - - for backtraces in &mut self.scheduled { - for backtrace in backtraces { - backtrace.resolve(); - } - } - - for backtraces in &mut self.spawned_from_foreground { - for backtrace in backtraces { - backtrace.resolve(); - } - } - } -} - struct CwdBacktrace<'a> { backtrace: &'a Backtrace, - first_frame_only: bool, } impl<'a> CwdBacktrace<'a> { fn new(backtrace: &'a Backtrace) -> Self { - Self { - backtrace, - first_frame_only: false, - } - } - - fn first_frame(backtrace: &'a Backtrace) -> Self { - Self { - backtrace, - first_frame_only: true, - } + Self { backtrace } } } @@ -362,69 +342,12 @@ impl<'a> Debug for CwdBacktrace<'a> { .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd))) { formatted_frame.backtrace_frame(frame)?; - if self.first_frame_only { - break; - } } } fmt.finish() } } -impl Debug for Trace { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for ((backtrace, scheduled), spawned_from_foreground) in self - .executed - .iter() - .zip(&self.scheduled) - .zip(&self.spawned_from_foreground) - { - writeln!(f, "Scheduled")?; - for backtrace in scheduled { - writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?; - } - if scheduled.is_empty() { - writeln!(f, "None")?; - } - writeln!(f, "==========")?; - - writeln!(f, "Spawned from foreground")?; - for backtrace in spawned_from_foreground { - writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?; - } - if spawned_from_foreground.is_empty() { - writeln!(f, "None")?; - } - writeln!(f, "==========")?; - - writeln!(f, "Run: {:?}", CwdBacktrace::first_frame(backtrace))?; - writeln!(f, "+++++++++++++++++++")?; - } - - Ok(()) - } -} - -impl Drop for Trace { - fn drop(&mut self) { - let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") { - trace_on_panic == "1" || trace_on_panic == "true" - } else { - false - }; - let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") { - trace_always == "1" || trace_always == "true" - } else { - false - }; - - if trace_always || (trace_on_panic && thread::panicking()) { - self.resolve(); - dbg!(self); - } - } -} - impl Foreground { pub fn platform(dispatcher: Arc) -> Result { if dispatcher.is_main_thread() { @@ -440,7 +363,9 @@ impl Foreground { pub fn spawn(&self, future: impl Future + 'static) -> Task { let future = any_local_future(future); let any_task = match self { - Self::Deterministic(executor) => executor.spawn_from_foreground(future), + Self::Deterministic { cx_id, executor } => { + executor.spawn_from_foreground(*cx_id, future) + } Self::Platform { dispatcher, .. } => { fn spawn_inner( future: AnyLocalFuture, @@ -462,7 +387,7 @@ impl Foreground { pub fn run(&self, future: impl 'static + Future) -> T { let future = any_local_future(future); let any_value = match self { - Self::Deterministic(executor) => executor.run(future), + Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future), Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"), }; *any_value.downcast().unwrap() @@ -470,14 +395,14 @@ impl Foreground { pub fn parking_forbidden(&self) -> bool { match self { - Self::Deterministic(executor) => executor.state.lock().forbid_parking, + Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking, _ => panic!("this method can only be called on a deterministic executor"), } } pub fn start_waiting(&self) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved()); } _ => panic!("this method can only be called on a deterministic executor"), @@ -486,7 +411,7 @@ impl Foreground { pub fn finish_waiting(&self) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { executor.state.lock().waiting_backtrace.take(); } _ => panic!("this method can only be called on a deterministic executor"), @@ -495,7 +420,7 @@ impl Foreground { pub fn forbid_parking(&self) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { let mut state = executor.state.lock(); state.forbid_parking = true; state.rng = StdRng::seed_from_u64(state.seed); @@ -506,7 +431,7 @@ impl Foreground { pub async fn timer(&self, duration: Duration) { match self { - Self::Deterministic(executor) => { + Self::Deterministic { executor, .. } => { let (tx, mut rx) = barrier::channel(); { let mut state = executor.state.lock(); @@ -523,8 +448,8 @@ impl Foreground { pub fn advance_clock(&self, duration: Duration) { match self { - Self::Deterministic(executor) => { - executor.run_until_parked(); + Self::Deterministic { cx_id, executor } => { + executor.run_until_parked(*cx_id); let mut state = executor.state.lock(); state.now += duration; @@ -541,7 +466,7 @@ impl Foreground { pub fn set_block_on_ticks(&self, range: RangeInclusive) { match self { - Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range, + Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range, _ => panic!("this method can only be called on a deterministic executor"), } } @@ -579,7 +504,7 @@ impl Background { let future = any_future(future); let any_task = match self { Self::Production { executor, .. } => executor.spawn(future), - Self::Deterministic { executor, .. } => executor.spawn(future), + Self::Deterministic { executor } => executor.spawn(future), }; Task::send(any_task) } @@ -646,14 +571,6 @@ impl<'a> Scope<'a> { } } -pub fn deterministic(seed: u64) -> (Rc, Arc) { - let executor = Arc::new(Deterministic::new(seed)); - ( - Rc::new(Foreground::Deterministic(executor.clone())), - Arc::new(Background::Deterministic { executor }), - ) -} - impl Task { pub fn ready(value: T) -> Self { Self::Ready(Some(value)) diff --git a/crates/gpui/src/test.rs b/crates/gpui/src/test.rs index ef95ea435a..b4b4e621ac 100644 --- a/crates/gpui/src/test.rs +++ b/crates/gpui/src/test.rs @@ -28,7 +28,12 @@ pub fn run_test( mut starting_seed: u64, max_retries: usize, test_fn: &mut (dyn RefUnwindSafe - + Fn(&mut MutableAppContext, Rc, u64)), + + Fn( + &mut MutableAppContext, + Rc, + Arc, + u64, + )), ) { let is_randomized = num_iterations > 1; if is_randomized { @@ -60,16 +65,16 @@ pub fn run_test( dbg!(seed); } - let (foreground, background) = executor::deterministic(seed); + let deterministic = executor::Deterministic::new(seed); let mut cx = TestAppContext::new( foreground_platform.clone(), platform.clone(), - foreground.clone(), - background.clone(), + deterministic.build_foreground(usize::MAX), + deterministic.build_background(), font_cache.clone(), 0, ); - cx.update(|cx| test_fn(cx, foreground_platform.clone(), seed)); + cx.update(|cx| test_fn(cx, foreground_platform.clone(), deterministic, seed)); atomic_seed.fetch_add(1, SeqCst); } diff --git a/crates/gpui_macros/src/gpui_macros.rs b/crates/gpui_macros/src/gpui_macros.rs index e94318172e..21d978d9fb 100644 --- a/crates/gpui_macros/src/gpui_macros.rs +++ b/crates/gpui_macros/src/gpui_macros.rs @@ -77,8 +77,8 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { #namespace::TestAppContext::new( foreground_platform.clone(), cx.platform().clone(), - cx.foreground().clone(), - cx.background().clone(), + deterministic.build_foreground(#ix), + deterministic.build_background(), cx.font_cache().clone(), #first_entity_id, ), @@ -115,7 +115,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { #num_iterations as u64, #starting_seed as u64, #max_retries, - &mut |cx, foreground_platform, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args)) + &mut |cx, foreground_platform, deterministic, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args)) ); } } @@ -147,7 +147,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream { #num_iterations as u64, #starting_seed as u64, #max_retries, - &mut |cx, _, seed| #inner_fn_name(#inner_fn_args) + &mut |cx, _, _, seed| #inner_fn_name(#inner_fn_args) ); } } diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 6a01950c6c..1e731a9388 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1794,7 +1794,7 @@ mod tests { }); } - #[gpui::test] + #[gpui::test(iterations = 100)] async fn test_editing_while_guest_opens_buffer( mut cx_a: TestAppContext, mut cx_b: TestAppContext,