From 5fb1414b69424d1192148e3ba0e81b4b62afc40b Mon Sep 17 00:00:00 2001 From: Wez Furlong Date: Fri, 9 Oct 2020 11:07:18 -0700 Subject: [PATCH] fix clipboard on x11 This was broken by the changes in aad493ab2a121fab7678b3a528091734bdbf69bb. The issue was that the channel send didn't wakeup the receiver. I'm not sure why, and I tried a couple of different async channel implementation. Doing the simplistic solution here works reliably. --- Cargo.lock | 35 +++++++++++++++- promise/Cargo.toml | 1 + promise/src/lib.rs | 76 ++++++++++++++++++++--------------- promise/src/spawn.rs | 45 ++++++++++++--------- wezterm/src/gui/termwindow.rs | 31 +++++++------- window/src/spawn.rs | 25 ++++++------ 6 files changed, 132 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cf20a69ad..323959961 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1123,6 +1123,18 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bad48618fdb549078c333a7a8528acb57af271d0433bdecd523eb620628364e" +[[package]] +name = "flume" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9e818efa7776f4dd7df0e542f877f7a5a87bddd6a1a10f59a7732b71ffb9d55" +dependencies = [ + "futures-core", + "futures-sink", + "rand 0.7.3", + "spinning_top", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1776,6 +1788,15 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lock_api" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28247cc5a5be2f05fbcd76dd0cf2c7d3b5400cb978a28042abcd4fa0b3f8261c" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.11" @@ -2404,7 +2425,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252" dependencies = [ - "lock_api", + "lock_api 0.3.4", "parking_lot_core 0.6.2", "rustc_version", ] @@ -2415,7 +2436,7 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" dependencies = [ - "lock_api", + "lock_api 0.3.4", "parking_lot_core 0.7.2", ] @@ -2678,6 +2699,7 @@ dependencies = [ "async-std", "async-task 4.0.2", "crossbeam", + "flume", "lazy_static", "smol", "thiserror", @@ -3295,6 +3317,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spinning_top" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e529d73e80d64b5f2631f9035113347c578a1c9c7774b83a2b880788459ab36" +dependencies = [ + "lock_api 0.4.1", +] + [[package]] name = "ssh2" version = "0.8.2" diff --git a/promise/Cargo.toml b/promise/Cargo.toml index c97fc42f9..8ae115d25 100644 --- a/promise/Cargo.toml +++ b/promise/Cargo.toml @@ -12,3 +12,4 @@ crossbeam = "0.7" anyhow = "1.0" thiserror = "1.0" lazy_static = "1.3" +flume = "0.9" diff --git a/promise/src/lib.rs b/promise/src/lib.rs index f99ccf46a..cfe0b9ce9 100644 --- a/promise/src/lib.rs +++ b/promise/src/lib.rs @@ -1,23 +1,28 @@ use anyhow::Error; -use smol::channel::{bounded, Receiver, Sender}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; use thiserror::*; pub mod spawn; -pub type SpawnFunc = Box; #[derive(Debug, Error)] #[error("Promise was dropped before completion")] pub struct BrokenPromise {} -pub struct Promise { - tx: Option>>, - rx: Option>>, +#[derive(Debug)] +struct Core { + result: Option>, + waker: Option, } +pub struct Promise { + core: Arc>>, +} + +#[derive(Debug)] pub struct Future { - rx: Receiver>, + core: Arc>>, } impl Default for Promise { @@ -28,31 +33,35 @@ impl Default for Promise { impl Promise { pub fn new() -> Self { - let (tx, rx) = bounded(1); Self { - tx: Some(tx), - rx: Some(rx), + core: Arc::new(Mutex::new(Core { + result: None, + waker: None, + })), } } pub fn get_future(&mut self) -> Option> { - self.rx.take().map(|rx| Future { rx }) + Some(Future { + core: Arc::clone(&self.core), + }) } - pub fn ok(&mut self, value: T) { - self.result(Ok(value)); + pub fn ok(&mut self, value: T) -> bool { + self.result(Ok(value)) } - pub fn err(&mut self, err: Error) { - self.result(Err(err)); + pub fn err(&mut self, err: Error) -> bool { + self.result(Err(err)) } - pub fn result(&mut self, result: Result) { - self.tx - .take() - .expect("Promise already fulfilled") - .try_send(result) - .ok(); + pub fn result(&mut self, result: Result) -> bool { + let mut core = self.core.lock().unwrap(); + core.result.replace(result); + if let Some(waker) = core.waker.take() { + waker.wake(); + } + true } } @@ -72,10 +81,12 @@ impl Future { /// Create a leaf future which is immediately ready with /// the provided result pub fn result(result: Result) -> Self { - let mut promise = Promise::new(); - let future = promise.get_future().unwrap(); - promise.result(result); - future + Self { + core: Arc::new(Mutex::new(Core { + result: Some(result), + waker: None, + })), + } } } @@ -83,13 +94,14 @@ impl std::future::Future for Future { type Output = Result; fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll { - let rx = unsafe { &mut self.get_unchecked_mut().rx }; - let f = rx.recv(); - smol::pin!(f); - match f.poll(ctx) { - Poll::Ready(Ok(res)) => Poll::Ready(res), - Poll::Ready(Err(_)) => Poll::Ready(Err(BrokenPromise {}.into())), - Poll::Pending => Poll::Pending, + let waker = ctx.waker().clone(); + + let mut core = self.core.lock().unwrap(); + if let Some(result) = core.result.take() { + Poll::Ready(result) + } else { + core.waker.replace(waker); + Poll::Pending } } } diff --git a/promise/src/spawn.rs b/promise/src/spawn.rs index 4b1c74b7f..2a94c9676 100644 --- a/promise/src/spawn.rs +++ b/promise/src/spawn.rs @@ -1,12 +1,11 @@ -use crate::SpawnFunc; use anyhow::{anyhow, Result}; -use async_task::Runnable; +use flume::{bounded, unbounded, Receiver, TryRecvError}; use std::future::Future; -use std::sync::mpsc::{sync_channel, Receiver, TryRecvError}; use std::sync::{Arc, Mutex}; use std::task::{Poll, Waker}; -pub use async_task::Task; +pub use async_task::{Runnable, Task}; +pub type SpawnFunc = Box; pub type ScheduleFunc = Box; fn no_schedule_configured(_: Runnable) { @@ -18,6 +17,16 @@ lazy_static::lazy_static! { static ref ON_MAIN_THREAD_LOW_PRI: Mutex = Mutex::new(Box::new(no_schedule_configured)); } +fn schedule_runnable(runnable: Runnable, high_pri: bool) { + let func = if high_pri { + ON_MAIN_THREAD.lock() + } else { + ON_MAIN_THREAD_LOW_PRI.lock() + } + .unwrap(); + func(runnable); +} + /// Set callbacks for scheduling normal and low priority futures. /// Why this and not "just tokio"? In a GUI application there is typically /// a special GUI processing loop that may need to run on the "main thread", @@ -41,7 +50,7 @@ where F: Send + 'static, T: Send + 'static, { - let (tx, rx) = sync_channel(1); + let (tx, rx) = bounded(1); // Holds the waker that may later observe // during the Future::poll call. @@ -105,9 +114,8 @@ where F: Future + Send + 'static, R: Send + 'static, { - let (runnable, task) = - async_task::spawn(future, |runnable| ON_MAIN_THREAD.lock().unwrap()(runnable)); - ON_MAIN_THREAD.lock().unwrap()(runnable); + let (runnable, task) = async_task::spawn(future, |runnable| schedule_runnable(runnable, true)); + runnable.schedule(); task } @@ -122,10 +130,8 @@ where F: Future + Send + 'static, R: Send + 'static, { - let (runnable, task) = async_task::spawn(future, |runnable| { - ON_MAIN_THREAD_LOW_PRI.lock().unwrap()(runnable) - }); - ON_MAIN_THREAD_LOW_PRI.lock().unwrap()(runnable); + let (runnable, task) = async_task::spawn(future, |runnable| schedule_runnable(runnable, false)); + runnable.schedule(); task } @@ -136,8 +142,8 @@ where R: 'static, { let (runnable, task) = - async_task::spawn_local(future, |runnable| ON_MAIN_THREAD.lock().unwrap()(runnable)); - ON_MAIN_THREAD.lock().unwrap()(runnable); + async_task::spawn_local(future, |runnable| schedule_runnable(runnable, true)); + runnable.schedule(); task } @@ -148,10 +154,9 @@ where F: Future + 'static, R: 'static, { - let (runnable, task) = async_task::spawn_local(future, |runnable| { - ON_MAIN_THREAD_LOW_PRI.lock().unwrap()(runnable) - }); - ON_MAIN_THREAD_LOW_PRI.lock().unwrap()(runnable); + let (runnable, task) = + async_task::spawn_local(future, |runnable| schedule_runnable(runnable, false)); + runnable.schedule(); task } @@ -159,12 +164,12 @@ where pub use async_std::task::block_on; pub struct SimpleExecutor { - rx: crossbeam::channel::Receiver, + rx: Receiver, } impl SimpleExecutor { pub fn new() -> Self { - let (tx, rx) = crossbeam::channel::unbounded(); + let (tx, rx) = unbounded(); let tx_main = tx.clone(); let tx_low = tx.clone(); diff --git a/wezterm/src/gui/termwindow.rs b/wezterm/src/gui/termwindow.rs index eb33bd462..2d35674a1 100644 --- a/wezterm/src/gui/termwindow.rs +++ b/wezterm/src/gui/termwindow.rs @@ -1768,21 +1768,24 @@ impl TermWindow { let future = window.get_clipboard(clipboard); promise::spawn::spawn(async move { if let Ok(clip) = future.await { - window.apply(move |term_window, _window| { - let clip = clip.clone(); - if let Some(term_window) = term_window.downcast_mut::() { - if let Some(pane) = - term_window.pane_state(pane_id).overlay.clone().or_else(|| { - let mux = Mux::get().unwrap(); - mux.get_pane(pane_id) - }) - { - pane.trickle_paste(clip).ok(); + window + .apply(move |term_window, _window| { + let clip = clip.clone(); + if let Some(term_window) = term_window.downcast_mut::() { + if let Some(pane) = + term_window.pane_state(pane_id).overlay.clone().or_else(|| { + let mux = Mux::get().unwrap(); + mux.get_pane(pane_id) + }) + { + pane.trickle_paste(clip).ok(); + } } - } - Ok(()) - }); - } + Ok(()) + }) + .await?; + }; + Ok::<(), anyhow::Error>(()) }) .detach(); } diff --git a/window/src/spawn.rs b/window/src/spawn.rs index bd7551684..3559edc29 100644 --- a/window/src/spawn.rs +++ b/window/src/spawn.rs @@ -2,7 +2,7 @@ use crate::os::windows::event::EventHandle; #[cfg(target_os = "macos")] use core_foundation::runloop::*; -use promise::SpawnFunc; +use promise::spawn::{Runnable, SpawnFunc}; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -36,6 +36,15 @@ pub(crate) struct SpawnQueue { read: Mutex, } +fn schedule_with_pri(runnable: Runnable, high_pri: bool) { + SPAWN_QUEUE.spawn_impl( + Box::new(move || { + runnable.run(); + }), + high_pri, + ); +} + impl SpawnQueue { pub fn new() -> anyhow::Result { Self::new_impl() @@ -44,20 +53,10 @@ impl SpawnQueue { pub fn register_promise_schedulers(&self) { promise::spawn::set_schedulers( Box::new(|runnable| { - SPAWN_QUEUE.spawn_impl( - Box::new(move || { - runnable.run(); - }), - true, - ); + schedule_with_pri(runnable, true); }), Box::new(|runnable| { - SPAWN_QUEUE.spawn_impl( - Box::new(move || { - runnable.run(); - }), - false, - ); + schedule_with_pri(runnable, false); }), ); }