diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 6eb5a8f616..dcdd325eae 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -3392,12 +3392,10 @@ impl ModelHandle { #[cfg(any(test, feature = "test-support"))] pub fn next_notification(&self, cx: &TestAppContext) -> impl Future { - use postage::prelude::{Sink as _, Stream as _}; - - let (mut tx, mut rx) = postage::mpsc::channel(1); + let (tx, mut rx) = futures::channel::mpsc::unbounded(); let mut cx = cx.cx.borrow_mut(); let subscription = cx.observe(self, move |_, _| { - tx.try_send(()).ok(); + tx.unbounded_send(()).ok(); }); let duration = if std::env::var("CI").is_ok() { @@ -3407,7 +3405,7 @@ impl ModelHandle { }; async move { - let notification = crate::util::timeout(duration, rx.recv()) + let notification = crate::util::timeout(duration, rx.next()) .await .expect("next notification timed out"); drop(subscription); @@ -3420,12 +3418,10 @@ impl ModelHandle { where T::Event: Clone, { - use postage::prelude::{Sink as _, Stream as _}; - - let (mut tx, mut rx) = postage::mpsc::channel(1); + let (tx, mut rx) = futures::channel::mpsc::unbounded(); let mut cx = cx.cx.borrow_mut(); let subscription = cx.subscribe(self, move |_, event, _| { - tx.blocking_send(event.clone()).ok(); + tx.unbounded_send(event.clone()).ok(); }); let duration = if std::env::var("CI").is_ok() { @@ -3434,8 +3430,9 @@ impl ModelHandle { Duration::from_secs(1) }; + cx.foreground.start_waiting(); async move { - let event = crate::util::timeout(duration, rx.recv()) + let event = crate::util::timeout(duration, rx.next()) .await .expect("next event timed out"); drop(subscription); @@ -3449,22 +3446,20 @@ impl ModelHandle { cx: &TestAppContext, mut predicate: impl FnMut(&T, &AppContext) -> bool, ) -> impl Future { - use postage::prelude::{Sink as _, Stream as _}; - - let (tx, mut rx) = postage::mpsc::channel(1024); + let (tx, mut rx) = futures::channel::mpsc::unbounded(); let mut cx = cx.cx.borrow_mut(); let subscriptions = ( cx.observe(self, { - let mut tx = tx.clone(); + let tx = tx.clone(); move |_, _| { - tx.blocking_send(()).ok(); + tx.unbounded_send(()).ok(); } }), cx.subscribe(self, { - let mut tx = tx.clone(); + let tx = tx.clone(); move |_, _, _| { - tx.blocking_send(()).ok(); + tx.unbounded_send(()).ok(); } }), ); @@ -3495,7 +3490,7 @@ impl ModelHandle { } cx.borrow().foreground().start_waiting(); - rx.recv() + rx.next() .await .expect("model dropped with pending condition"); cx.borrow().foreground().finish_waiting();