Report backtraces of pending conditions when deterministic executor illegally parks

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>
This commit is contained in:
Nathan Sobo 2022-01-06 13:33:55 -07:00
parent 2dbee1d914
commit 943571af2a
3 changed files with 108 additions and 58 deletions

View File

@ -2660,8 +2660,6 @@ impl<T: Entity> ModelHandle<T> {
loop {
{
let cx = cx.borrow();
let executor = cx.foreground();
let cx = cx.as_ref();
if predicate(
handle
@ -2672,15 +2670,13 @@ impl<T: Entity> ModelHandle<T> {
) {
break;
}
if executor.parking_forbidden() && executor.would_park() {
panic!("parked while waiting on condition");
}
}
cx.borrow().foreground().start_waiting();
rx.recv()
.await
.expect("model dropped with pending condition");
cx.borrow().foreground().finish_waiting();
}
})
.await
@ -2920,9 +2916,11 @@ impl<T: View> ViewHandle<T> {
}
}
cx.borrow().foreground().start_waiting();
rx.recv()
.await
.expect("view dropped with pending condition");
cx.borrow().foreground().finish_waiting();
}
})
.await

View File

@ -77,6 +77,7 @@ struct DeterministicState {
block_on_ticks: RangeInclusive<usize>,
now: Instant,
pending_timers: Vec<(Instant, barrier::Sender)>,
waiting_backtrace: Option<Backtrace>,
}
pub struct Deterministic {
@ -97,6 +98,7 @@ impl Deterministic {
block_on_ticks: 0..=1000,
now: Instant::now(),
pending_timers: Default::default(),
waiting_backtrace: None,
})),
parker: Default::default(),
}
@ -143,8 +145,8 @@ impl Deterministic {
return result;
}
if !woken.load(SeqCst) && self.state.lock().forbid_parking {
panic!("deterministic executor parked after a call to forbid_parking");
if !woken.load(SeqCst) {
self.state.lock().will_park();
}
woken.store(false, SeqCst);
@ -206,7 +208,11 @@ impl Deterministic {
}
let state = self.state.lock();
if state.would_park() {
if state.scheduled_from_foreground.is_empty()
&& state.scheduled_from_background.is_empty()
&& state.spawned_from_foreground.is_empty()
{
return None;
}
}
@ -241,11 +247,9 @@ impl Deterministic {
if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
return Some(result);
}
let state = self.state.lock();
let mut state = self.state.lock();
if state.scheduled_from_background.is_empty() {
if state.forbid_parking {
panic!("deterministic executor parked after a call to forbid_parking");
}
state.will_park();
drop(state);
self.parker.lock().park();
}
@ -259,11 +263,22 @@ impl Deterministic {
}
impl DeterministicState {
fn would_park(&self) -> bool {
self.forbid_parking
&& self.scheduled_from_foreground.is_empty()
&& self.scheduled_from_background.is_empty()
&& self.spawned_from_foreground.is_empty()
fn will_park(&mut self) {
if self.forbid_parking {
let mut backtrace_message = String::new();
if let Some(backtrace) = self.waiting_backtrace.as_mut() {
backtrace.resolve();
backtrace_message = format!(
"\nbacktrace of waiting future:\n{:?}",
CwdBacktrace::new(backtrace)
);
}
panic!(
"deterministic executor parked after a call to forbid_parking{}",
backtrace_message
);
}
}
}
@ -312,32 +327,53 @@ impl Trace {
}
}
impl Debug for Trace {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
struct CwdBacktrace<'a> {
backtrace: &'a Backtrace,
first_frame_only: bool,
}
impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
let cwd = std::env::current_dir().unwrap();
let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
fmt::Display::fmt(&path, fmt)
};
let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
for frame in self.0.frames() {
let mut formatted_frame = fmt.frame();
if frame
.symbols()
.iter()
.any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
{
formatted_frame.backtrace_frame(frame)?;
break;
}
impl<'a> CwdBacktrace<'a> {
fn new(backtrace: &'a Backtrace) -> Self {
Self {
backtrace,
first_frame_only: false,
}
}
fn first_frame(backtrace: &'a Backtrace) -> Self {
Self {
backtrace,
first_frame_only: true,
}
}
}
impl<'a> Debug for CwdBacktrace<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
let cwd = std::env::current_dir().unwrap();
let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
fmt::Display::fmt(&path, fmt)
};
let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
for frame in self.backtrace.frames() {
let mut formatted_frame = fmt.frame();
if frame
.symbols()
.iter()
.any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
{
formatted_frame.backtrace_frame(frame)?;
if self.first_frame_only {
break;
}
fmt.finish()
}
}
fmt.finish()
}
}
impl Debug for Trace {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for ((backtrace, scheduled), spawned_from_foreground) in self
.executed
.iter()
@ -346,7 +382,7 @@ impl Debug for Trace {
{
writeln!(f, "Scheduled")?;
for backtrace in scheduled {
writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
}
if scheduled.is_empty() {
writeln!(f, "None")?;
@ -355,14 +391,14 @@ impl Debug for Trace {
writeln!(f, "Spawned from foreground")?;
for backtrace in spawned_from_foreground {
writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
}
if spawned_from_foreground.is_empty() {
writeln!(f, "None")?;
}
writeln!(f, "==========")?;
writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
writeln!(f, "Run: {:?}", CwdBacktrace::first_frame(backtrace))?;
writeln!(f, "+++++++++++++++++++")?;
}
@ -446,9 +482,20 @@ impl Foreground {
}
}
pub fn would_park(&self) -> bool {
pub fn start_waiting(&self) {
match self {
Self::Deterministic(executor) => executor.state.lock().would_park(),
Self::Deterministic(executor) => {
executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
}
_ => panic!("this method can only be called on a deterministic executor"),
}
}
pub fn finish_waiting(&self) {
match self {
Self::Deterministic(executor) => {
executor.state.lock().waiting_backtrace.take();
}
_ => panic!("this method can only be called on a deterministic executor"),
}
}

View File

@ -1054,7 +1054,7 @@ mod tests {
};
use ::rpc::Peer;
use async_std::task;
use gpui::{ModelHandle, TestAppContext};
use gpui::{executor, ModelHandle, TestAppContext};
use parking_lot::Mutex;
use postage::{mpsc, watch};
use rpc::PeerId;
@ -1063,6 +1063,7 @@ mod tests {
use std::{
ops::Deref,
path::Path,
rc::Rc,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc,
@ -1092,7 +1093,7 @@ mod tests {
cx_a.foreground().forbid_parking();
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
@ -1225,7 +1226,7 @@ mod tests {
cx_a.foreground().forbid_parking();
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
@ -1301,7 +1302,7 @@ mod tests {
cx_a.foreground().forbid_parking();
// Connect to a server as 3 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
let client_c = server.create_client(&mut cx_c, "user_c").await;
@ -1451,7 +1452,7 @@ mod tests {
let fs = Arc::new(FakeFs::new());
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
@ -1547,7 +1548,7 @@ mod tests {
let fs = Arc::new(FakeFs::new());
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
@ -1627,7 +1628,7 @@ mod tests {
let fs = Arc::new(FakeFs::new());
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
@ -1702,7 +1703,7 @@ mod tests {
let fs = Arc::new(FakeFs::new());
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
@ -1789,7 +1790,7 @@ mod tests {
)));
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
@ -1988,7 +1989,7 @@ mod tests {
cx_a.foreground().forbid_parking();
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
@ -2127,7 +2128,7 @@ mod tests {
async fn test_chat_message_validation(mut cx_a: TestAppContext) {
cx_a.foreground().forbid_parking();
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let db = &server.app_state.db;
@ -2188,7 +2189,7 @@ mod tests {
cx_a.foreground().forbid_parking();
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
let mut status_b = client_b.status();
@ -2406,7 +2407,7 @@ mod tests {
let fs = Arc::new(FakeFs::new());
// Connect to a server as 3 clients.
let mut server = TestServer::start().await;
let mut server = TestServer::start(cx_a.foreground()).await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
let client_c = server.create_client(&mut cx_c, "user_c").await;
@ -2539,6 +2540,7 @@ mod tests {
peer: Arc<Peer>,
app_state: Arc<AppState>,
server: Arc<Server>,
foreground: Rc<executor::Foreground>,
notifications: mpsc::Receiver<()>,
connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
forbid_connections: Arc<AtomicBool>,
@ -2546,7 +2548,7 @@ mod tests {
}
impl TestServer {
async fn start() -> Self {
async fn start(foreground: Rc<executor::Foreground>) -> Self {
let test_db = TestDb::new();
let app_state = Self::build_app_state(&test_db).await;
let peer = Peer::new();
@ -2556,6 +2558,7 @@ mod tests {
peer,
app_state,
server,
foreground,
notifications: notifications.1,
connection_killers: Default::default(),
forbid_connections: Default::default(),
@ -2671,7 +2674,9 @@ mod tests {
{
async_std::future::timeout(Duration::from_millis(500), async {
while !(predicate)(&*self.server.store.read()) {
self.foreground.start_waiting();
self.notifications.recv().await;
self.foreground.finish_waiting();
}
})
.await