1
1
mirror of https://github.com/wez/wezterm.git synced 2024-11-22 22:42:48 +03:00

simplify promise crate

Implement Future and Promise in terms of a bounded channel of size 1.
This commit is contained in:
Wez Furlong 2020-10-05 08:42:56 -07:00
parent 5eb4d32004
commit aad493ab2a
5 changed files with 28 additions and 330 deletions

1
Cargo.lock generated
View File

@ -2670,6 +2670,7 @@ dependencies = [
"async-task 4.0.2",
"crossbeam",
"lazy_static",
"smol",
"thiserror",
]

View File

@ -5,6 +5,7 @@ version = "0.2.0"
edition = "2018"
[dependencies]
smol = "1.2"
async-task = "4.0"
async-std = "1.4"
crossbeam = "0.7"

View File

@ -1,47 +1,23 @@
use anyhow::{bail, Error, Result as Fallible};
use anyhow::Error;
use smol::channel::{bounded, Receiver, Sender};
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll};
use thiserror::*;
pub mod spawn;
type NextFunc<T> = Box<dyn FnOnce(Fallible<T>) + Send>;
pub type SpawnFunc = Box<dyn FnOnce() + Send>;
#[derive(Debug, Error)]
#[error("Promise was dropped before completion")]
pub struct BrokenPromise {}
enum PromiseState<T> {
Waiting(Arc<Core<T>>),
Fulfilled,
}
enum FutureState<T> {
Waiting(Arc<Core<T>>),
Ready(Result<T, Error>),
Resolved,
}
struct CoreData<T> {
result: Option<Result<T, Error>>,
propagate: Option<NextFunc<T>>,
waker: Option<std::task::Waker>,
}
struct Core<T> {
data: Mutex<CoreData<T>>,
cond: Condvar,
}
pub struct Promise<T> {
state: PromiseState<T>,
future: Option<Future<T>>,
tx: Option<Sender<anyhow::Result<T>>>,
rx: Option<Receiver<anyhow::Result<T>>>,
}
pub struct Future<T> {
state: FutureState<T>,
rx: Receiver<anyhow::Result<T>>,
}
impl<T> Default for Promise<T> {
@ -50,45 +26,17 @@ impl<T> Default for Promise<T> {
}
}
impl<T> Drop for Promise<T> {
fn drop(&mut self) {
if let PromiseState::Waiting(core) = &mut self.state {
let err = Err(BrokenPromise {}.into());
let mut locked = core.data.lock().unwrap();
if let Some(func) = locked.propagate.take() {
func(err);
} else {
locked.result = Some(err);
}
if let Some(waker) = locked.waker.take() {
waker.wake();
}
core.cond.notify_one();
}
}
}
impl<T> Promise<T> {
pub fn new() -> Self {
let core = Arc::new(Core {
data: Mutex::new(CoreData {
result: None,
propagate: None,
waker: None,
}),
cond: Condvar::new(),
});
let (tx, rx) = bounded(1);
Self {
state: PromiseState::Waiting(Arc::clone(&core)),
future: Some(Future {
state: FutureState::Waiting(core),
}),
tx: Some(tx),
rx: Some(rx),
}
}
pub fn get_future(&mut self) -> Option<Future<T>> {
self.future.take()
self.rx.take().map(|rx| Future { rx })
}
pub fn ok(&mut self, value: T) {
@ -100,29 +48,11 @@ impl<T> Promise<T> {
}
pub fn result(&mut self, result: Result<T, Error>) {
match std::mem::replace(&mut self.state, PromiseState::Fulfilled) {
PromiseState::Waiting(core) => {
let mut locked = core.data.lock().unwrap();
match locked.propagate.take() {
Some(func) => func(result),
None => {
locked.result = Some(result);
}
}
if let Some(waker) = locked.waker.take() {
waker.wake();
}
core.cond.notify_one();
}
PromiseState::Fulfilled => panic!("Promise already fulfilled"),
}
}
}
impl<T: Send + 'static> std::convert::From<Result<T, Error>> for Future<T> {
fn from(result: Result<T, Error>) -> Future<T> {
Future::result(result)
self.tx
.take()
.expect("Promise already fulfilled")
.try_send(result)
.ok();
}
}
@ -142,127 +72,9 @@ impl<T: Send + 'static> Future<T> {
/// Create a leaf future which is immediately ready with
/// the provided result
pub fn result(result: Result<T, Error>) -> Self {
Self {
state: FutureState::Ready(result),
}
}
fn chain(self, f: NextFunc<T>) {
match self.state {
FutureState::Ready(result) => {
f(result);
}
FutureState::Waiting(core) => {
let mut locked = core.data.lock().unwrap();
if let Some(result) = locked.result.take() {
f(result);
} else {
locked.propagate = Some(f);
}
}
FutureState::Resolved => panic!("cannot chain a Resolved future"),
}
}
/// Blocks until the associated promise is fulfilled
pub fn wait(self) -> Result<T, Error> {
match self.state {
FutureState::Waiting(core) => {
let mut locked = core.data.lock().unwrap();
loop {
if let Some(result) = locked.result.take() {
return result;
}
locked = core.cond.wait(locked).unwrap();
}
}
FutureState::Ready(result) => result,
FutureState::Resolved => bail!("Future is already Resolved"),
}
}
pub fn is_ready(&self) -> bool {
match &self.state {
FutureState::Waiting(core) => {
let locked = core.data.lock().unwrap();
locked.result.is_some()
}
FutureState::Ready(_) | FutureState::Resolved => true,
}
}
/// When this future resolves, then map the result via the
/// supplied lambda, which returns something that is convertible
/// to a Future.
pub fn then<U, F, IF>(self, f: F) -> Future<U>
where
F: FnOnce(Result<T, Error>) -> IF + Send + 'static,
IF: Into<Future<U>> + 'static,
U: Send + 'static,
{
let mut promise = Promise::new();
let future = promise.get_future().unwrap();
let func = Box::new(f);
let promise_chain = Box::new(move |result| promise.result(result));
self.chain(Box::new(move |result| {
let future = func(result).into();
future.chain(promise_chain);
}));
future
}
/// When this future resolves successfully, map the result via
/// the supplied lambda, which returns something that is convertible
/// to a Future.
/// When this future resolves with an error, the error is propagated
/// along as the error value of the returned future.
pub fn map<U, F, IF>(self, f: F) -> Future<U>
where
F: FnOnce(T) -> IF + Send + 'static,
IF: Into<Future<U>> + 'static,
U: Send + 'static,
{
let mut promise = Promise::new();
let future = promise.get_future().unwrap();
let func = Box::new(f);
let promise_chain = Box::new(move |result| promise.result(result));
self.chain(Box::new(move |result| {
let future = match result {
Ok(value) => func(value).into(),
Err(err) => Err(err).into(),
};
future.chain(promise_chain);
}));
future
}
/// When this future resolves with an error, map the error result
/// via the supplied lambda, with returns something that is convertible
/// to a Future.
/// When this future resolves successfully, the value is propagated
/// along as the Ok value of the returned future.
pub fn map_err<F, IF>(self, f: F) -> Future<T>
where
F: FnOnce(Error) -> IF + Send + 'static,
IF: Into<Future<T>> + 'static,
{
let mut promise = Promise::new();
let future = promise.get_future().unwrap();
let func = Box::new(f);
let promise_chain = Box::new(move |result| promise.result(result));
self.chain(Box::new(move |result| {
let future = match result {
Ok(value) => Ok(value).into(),
Err(err) => func(err).into(),
};
future.chain(promise_chain);
}));
promise.result(result);
future
}
}
@ -271,122 +83,13 @@ impl<T: Send + 'static> std::future::Future for Future<T> {
type Output = Result<T, Error>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
// This should be safe because we're not moving the Future,
// but instead replacing a field, and since no one is able to
// reference the state field, we should be ok with moving that.
let myself = unsafe { Pin::get_unchecked_mut(self) };
let state = std::mem::replace(&mut myself.state, FutureState::Resolved);
match state {
FutureState::Waiting(core) => {
let mut locked = core.data.lock().unwrap();
if let Some(result) = locked.result.take() {
return Poll::Ready(result);
} else {
locked.waker = Some(ctx.waker().clone());
}
drop(locked);
myself.state = FutureState::Waiting(core);
Poll::Pending
}
FutureState::Ready(result) => Poll::Ready(result),
FutureState::Resolved => panic!("polling a Resolved Future"),
let rx = unsafe { &mut self.get_unchecked_mut().rx };
let f = rx.recv();
smol::pin!(f);
match f.poll(ctx) {
Poll::Ready(Ok(res)) => Poll::Ready(res),
Poll::Ready(Err(_)) => Poll::Ready(Err(BrokenPromise {}.into())),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
mod test {
use super::*;
use anyhow::anyhow;
#[test]
fn basic_promise() {
let mut p = Promise::new();
p.ok(true);
assert_eq!(p.get_future().unwrap().wait().unwrap(), true);
}
#[test]
fn basic_promise_future_first() {
let mut p = Promise::new();
let f = p.get_future().unwrap();
p.ok(true);
assert_eq!(f.wait().unwrap(), true);
}
#[test]
fn promise_chain() {
let mut p = Promise::new();
let f = p
.get_future()
.unwrap()
.then(|result| Ok(result.unwrap() + 1))
.then(|result| Ok(result.unwrap() + 3));
p.ok(1);
assert_eq!(f.wait().unwrap(), 5);
}
#[test]
fn promise_map() {
let mut p = Promise::new();
let f = p.get_future().unwrap().map(|value| Ok(value + 1));
p.ok(1);
assert_eq!(f.wait().unwrap(), 2);
}
#[test]
fn promise_map_err() {
let mut p = Promise::new();
let f: Future<usize> = p
.get_future()
.unwrap()
.map(|_value| Err(anyhow!("boo")))
.map_err(|err| Err(anyhow!("whoops: {}", err)));
p.ok(1);
assert_eq!(format!("{}", f.wait().unwrap_err()), "whoops: boo");
}
#[test]
fn promise_chain_future() {
let mut p = Promise::new();
let f = p
.get_future()
.unwrap()
.then(|result| Future::ok(result.unwrap() + 1))
.then(|result| Ok(result.unwrap() + 3));
p.ok(1);
assert_eq!(f.wait().unwrap(), 5);
}
#[test]
fn promise_thread() {
let mut p = Promise::new();
let f = p.get_future().unwrap();
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::new(0, 500));
p.ok(123);
});
let f2 = f.then(move |result| Ok(result.unwrap() * 2));
assert_eq!(f2.wait().unwrap(), 246);
}
#[test]
fn promise_thread_slow_chain() {
let mut p = Promise::new();
let f = p.get_future().unwrap();
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::new(0, 500));
p.ok(123);
});
std::thread::sleep(std::time::Duration::new(1, 0));
let f2 = f.then(move |result| Ok(result.unwrap() * 2));
assert_eq!(f2.wait().unwrap(), 246);
}
}

View File

@ -158,14 +158,6 @@ where
/// Block the current thread until the passed future completes.
pub use async_std::task::block_on;
/*
pub async fn join_handle_result<T>(handle: JoinHandle<anyhow::Result<T>, ()>) -> anyhow::Result<T> {
handle
.await
.ok_or_else(|| anyhow::anyhow!("task was cancelled or panicked"))?
}
*/
pub struct SimpleExecutor {
rx: crossbeam::channel::Receiver<SpawnFunc>,
}

View File

@ -1,6 +1,7 @@
use crate::termwiztermtab;
use anyhow::{anyhow, bail, Context as _};
use crossbeam::channel::{bounded, Receiver, Sender};
use promise::spawn::block_on;
use promise::Promise;
use std::sync::Mutex;
use std::time::{Duration, Instant};
@ -333,7 +334,7 @@ impl ConnectionUI {
})
.context("send to ConnectionUI failed")?;
future.wait()
block_on(future)
}
/// Crack a multi-line prompt into an optional preamble and the prompt
@ -368,7 +369,7 @@ impl ConnectionUI {
})
.context("send to ConnectionUI failed")?;
future.wait()
block_on(future)
}
pub fn password(&self, prompt: &str) -> anyhow::Result<String> {
@ -388,7 +389,7 @@ impl ConnectionUI {
})
.context("send to ConnectionUI failed")?;
future.wait()
block_on(future)
}
pub fn close(&self) {