From aad493ab2a121fab7678b3a528091734bdbf69bb Mon Sep 17 00:00:00 2001 From: Wez Furlong Date: Mon, 5 Oct 2020 08:42:56 -0700 Subject: [PATCH] simplify promise crate Implement Future and Promise in terms of a bounded channel of size 1. --- Cargo.lock | 1 + promise/Cargo.toml | 1 + promise/src/lib.rs | 341 +++--------------------------------------- promise/src/spawn.rs | 8 - wezterm/src/connui.rs | 7 +- 5 files changed, 28 insertions(+), 330 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c85184d0e..69560720e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2670,6 +2670,7 @@ dependencies = [ "async-task 4.0.2", "crossbeam", "lazy_static", + "smol", "thiserror", ] diff --git a/promise/Cargo.toml b/promise/Cargo.toml index fe35c4769..c97fc42f9 100644 --- a/promise/Cargo.toml +++ b/promise/Cargo.toml @@ -5,6 +5,7 @@ version = "0.2.0" edition = "2018" [dependencies] +smol = "1.2" async-task = "4.0" async-std = "1.4" crossbeam = "0.7" diff --git a/promise/src/lib.rs b/promise/src/lib.rs index f2383c3ed..f99ccf46a 100644 --- a/promise/src/lib.rs +++ b/promise/src/lib.rs @@ -1,47 +1,23 @@ -use anyhow::{bail, Error, Result as Fallible}; +use anyhow::Error; +use smol::channel::{bounded, Receiver, Sender}; use std::pin::Pin; -use std::sync::{Arc, Condvar, Mutex}; use std::task::{Context, Poll}; use thiserror::*; pub mod spawn; - -type NextFunc = Box) + Send>; pub type SpawnFunc = Box; #[derive(Debug, Error)] #[error("Promise was dropped before completion")] pub struct BrokenPromise {} -enum PromiseState { - Waiting(Arc>), - Fulfilled, -} - -enum FutureState { - Waiting(Arc>), - Ready(Result), - Resolved, -} - -struct CoreData { - result: Option>, - propagate: Option>, - waker: Option, -} - -struct Core { - data: Mutex>, - cond: Condvar, -} - pub struct Promise { - state: PromiseState, - future: Option>, + tx: Option>>, + rx: Option>>, } pub struct Future { - state: FutureState, + rx: Receiver>, } impl Default for Promise { @@ -50,45 +26,17 @@ impl Default for Promise { } } -impl Drop for Promise { - fn drop(&mut self) { - if let PromiseState::Waiting(core) = &mut self.state { - let err = Err(BrokenPromise {}.into()); - let mut locked = core.data.lock().unwrap(); - if let Some(func) = locked.propagate.take() { - func(err); - } else { - locked.result = Some(err); - } - if let Some(waker) = locked.waker.take() { - waker.wake(); - } - core.cond.notify_one(); - } - } -} - impl Promise { pub fn new() -> Self { - let core = Arc::new(Core { - data: Mutex::new(CoreData { - result: None, - propagate: None, - waker: None, - }), - cond: Condvar::new(), - }); - + let (tx, rx) = bounded(1); Self { - state: PromiseState::Waiting(Arc::clone(&core)), - future: Some(Future { - state: FutureState::Waiting(core), - }), + tx: Some(tx), + rx: Some(rx), } } pub fn get_future(&mut self) -> Option> { - self.future.take() + self.rx.take().map(|rx| Future { rx }) } pub fn ok(&mut self, value: T) { @@ -100,29 +48,11 @@ impl Promise { } pub fn result(&mut self, result: Result) { - match std::mem::replace(&mut self.state, PromiseState::Fulfilled) { - PromiseState::Waiting(core) => { - let mut locked = core.data.lock().unwrap(); - match locked.propagate.take() { - Some(func) => func(result), - None => { - locked.result = Some(result); - } - } - if let Some(waker) = locked.waker.take() { - waker.wake(); - } - - core.cond.notify_one(); - } - PromiseState::Fulfilled => panic!("Promise already fulfilled"), - } - } -} - -impl std::convert::From> for Future { - fn from(result: Result) -> Future { - Future::result(result) + self.tx + .take() + .expect("Promise already fulfilled") + .try_send(result) + .ok(); } } @@ -142,127 +72,9 @@ impl Future { /// Create a leaf future which is immediately ready with /// the provided result pub fn result(result: Result) -> Self { - Self { - state: FutureState::Ready(result), - } - } - - fn chain(self, f: NextFunc) { - match self.state { - FutureState::Ready(result) => { - f(result); - } - FutureState::Waiting(core) => { - let mut locked = core.data.lock().unwrap(); - if let Some(result) = locked.result.take() { - f(result); - } else { - locked.propagate = Some(f); - } - } - FutureState::Resolved => panic!("cannot chain a Resolved future"), - } - } - - /// Blocks until the associated promise is fulfilled - pub fn wait(self) -> Result { - match self.state { - FutureState::Waiting(core) => { - let mut locked = core.data.lock().unwrap(); - loop { - if let Some(result) = locked.result.take() { - return result; - } - locked = core.cond.wait(locked).unwrap(); - } - } - FutureState::Ready(result) => result, - FutureState::Resolved => bail!("Future is already Resolved"), - } - } - - pub fn is_ready(&self) -> bool { - match &self.state { - FutureState::Waiting(core) => { - let locked = core.data.lock().unwrap(); - locked.result.is_some() - } - FutureState::Ready(_) | FutureState::Resolved => true, - } - } - - /// When this future resolves, then map the result via the - /// supplied lambda, which returns something that is convertible - /// to a Future. - pub fn then(self, f: F) -> Future - where - F: FnOnce(Result) -> IF + Send + 'static, - IF: Into> + 'static, - U: Send + 'static, - { let mut promise = Promise::new(); let future = promise.get_future().unwrap(); - let func = Box::new(f); - - let promise_chain = Box::new(move |result| promise.result(result)); - - self.chain(Box::new(move |result| { - let future = func(result).into(); - future.chain(promise_chain); - })); - future - } - - /// When this future resolves successfully, map the result via - /// the supplied lambda, which returns something that is convertible - /// to a Future. - /// When this future resolves with an error, the error is propagated - /// along as the error value of the returned future. - pub fn map(self, f: F) -> Future - where - F: FnOnce(T) -> IF + Send + 'static, - IF: Into> + 'static, - U: Send + 'static, - { - let mut promise = Promise::new(); - let future = promise.get_future().unwrap(); - let func = Box::new(f); - - let promise_chain = Box::new(move |result| promise.result(result)); - - self.chain(Box::new(move |result| { - let future = match result { - Ok(value) => func(value).into(), - Err(err) => Err(err).into(), - }; - future.chain(promise_chain); - })); - future - } - - /// When this future resolves with an error, map the error result - /// via the supplied lambda, with returns something that is convertible - /// to a Future. - /// When this future resolves successfully, the value is propagated - /// along as the Ok value of the returned future. - pub fn map_err(self, f: F) -> Future - where - F: FnOnce(Error) -> IF + Send + 'static, - IF: Into> + 'static, - { - let mut promise = Promise::new(); - let future = promise.get_future().unwrap(); - let func = Box::new(f); - - let promise_chain = Box::new(move |result| promise.result(result)); - - self.chain(Box::new(move |result| { - let future = match result { - Ok(value) => Ok(value).into(), - Err(err) => func(err).into(), - }; - future.chain(promise_chain); - })); + promise.result(result); future } } @@ -271,122 +83,13 @@ impl std::future::Future for Future { type Output = Result; fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { - // This should be safe because we're not moving the Future, - // but instead replacing a field, and since no one is able to - // reference the state field, we should be ok with moving that. - let myself = unsafe { Pin::get_unchecked_mut(self) }; - - let state = std::mem::replace(&mut myself.state, FutureState::Resolved); - match state { - FutureState::Waiting(core) => { - let mut locked = core.data.lock().unwrap(); - if let Some(result) = locked.result.take() { - return Poll::Ready(result); - } else { - locked.waker = Some(ctx.waker().clone()); - } - drop(locked); - myself.state = FutureState::Waiting(core); - Poll::Pending - } - FutureState::Ready(result) => Poll::Ready(result), - FutureState::Resolved => panic!("polling a Resolved Future"), + let rx = unsafe { &mut self.get_unchecked_mut().rx }; + let f = rx.recv(); + smol::pin!(f); + match f.poll(ctx) { + Poll::Ready(Ok(res)) => Poll::Ready(res), + Poll::Ready(Err(_)) => Poll::Ready(Err(BrokenPromise {}.into())), + Poll::Pending => Poll::Pending, } } } - -#[cfg(test)] -mod test { - use super::*; - use anyhow::anyhow; - #[test] - fn basic_promise() { - let mut p = Promise::new(); - p.ok(true); - assert_eq!(p.get_future().unwrap().wait().unwrap(), true); - } - - #[test] - fn basic_promise_future_first() { - let mut p = Promise::new(); - let f = p.get_future().unwrap(); - p.ok(true); - assert_eq!(f.wait().unwrap(), true); - } - - #[test] - fn promise_chain() { - let mut p = Promise::new(); - let f = p - .get_future() - .unwrap() - .then(|result| Ok(result.unwrap() + 1)) - .then(|result| Ok(result.unwrap() + 3)); - p.ok(1); - assert_eq!(f.wait().unwrap(), 5); - } - - #[test] - fn promise_map() { - let mut p = Promise::new(); - let f = p.get_future().unwrap().map(|value| Ok(value + 1)); - p.ok(1); - assert_eq!(f.wait().unwrap(), 2); - } - - #[test] - fn promise_map_err() { - let mut p = Promise::new(); - let f: Future = p - .get_future() - .unwrap() - .map(|_value| Err(anyhow!("boo"))) - .map_err(|err| Err(anyhow!("whoops: {}", err))); - p.ok(1); - assert_eq!(format!("{}", f.wait().unwrap_err()), "whoops: boo"); - } - - #[test] - fn promise_chain_future() { - let mut p = Promise::new(); - let f = p - .get_future() - .unwrap() - .then(|result| Future::ok(result.unwrap() + 1)) - .then(|result| Ok(result.unwrap() + 3)); - p.ok(1); - assert_eq!(f.wait().unwrap(), 5); - } - - #[test] - fn promise_thread() { - let mut p = Promise::new(); - let f = p.get_future().unwrap(); - - std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::new(0, 500)); - p.ok(123); - }); - - let f2 = f.then(move |result| Ok(result.unwrap() * 2)); - - assert_eq!(f2.wait().unwrap(), 246); - } - - #[test] - fn promise_thread_slow_chain() { - let mut p = Promise::new(); - let f = p.get_future().unwrap(); - - std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::new(0, 500)); - p.ok(123); - }); - - std::thread::sleep(std::time::Duration::new(1, 0)); - - let f2 = f.then(move |result| Ok(result.unwrap() * 2)); - - assert_eq!(f2.wait().unwrap(), 246); - } -} diff --git a/promise/src/spawn.rs b/promise/src/spawn.rs index a3cddfb7d..4b1c74b7f 100644 --- a/promise/src/spawn.rs +++ b/promise/src/spawn.rs @@ -158,14 +158,6 @@ where /// Block the current thread until the passed future completes. pub use async_std::task::block_on; -/* -pub async fn join_handle_result(handle: JoinHandle, ()>) -> anyhow::Result { - handle - .await - .ok_or_else(|| anyhow::anyhow!("task was cancelled or panicked"))? -} -*/ - pub struct SimpleExecutor { rx: crossbeam::channel::Receiver, } diff --git a/wezterm/src/connui.rs b/wezterm/src/connui.rs index 7ebd7fbd5..922c7872a 100644 --- a/wezterm/src/connui.rs +++ b/wezterm/src/connui.rs @@ -1,6 +1,7 @@ use crate::termwiztermtab; use anyhow::{anyhow, bail, Context as _}; use crossbeam::channel::{bounded, Receiver, Sender}; +use promise::spawn::block_on; use promise::Promise; use std::sync::Mutex; use std::time::{Duration, Instant}; @@ -333,7 +334,7 @@ impl ConnectionUI { }) .context("send to ConnectionUI failed")?; - future.wait() + block_on(future) } /// Crack a multi-line prompt into an optional preamble and the prompt @@ -368,7 +369,7 @@ impl ConnectionUI { }) .context("send to ConnectionUI failed")?; - future.wait() + block_on(future) } pub fn password(&self, prompt: &str) -> anyhow::Result { @@ -388,7 +389,7 @@ impl ConnectionUI { }) .context("send to ConnectionUI failed")?; - future.wait() + block_on(future) } pub fn close(&self) {