1
1
mirror of https://github.com/wez/wezterm.git synced 2024-11-22 22:42:48 +03:00

move spawn_task into a new promise::spawn module

This commit is contained in:
Wez Furlong 2020-01-16 03:39:44 -08:00
parent ac3ccab1c5
commit 75eb16bec4
16 changed files with 190 additions and 85 deletions

2
Cargo.lock generated
View File

@ -2020,6 +2020,8 @@ name = "promise"
version = "0.2.0"
dependencies = [
"anyhow",
"async-task",
"lazy_static",
"thiserror",
]

View File

@ -5,5 +5,7 @@ version = "0.2.0"
edition = "2018"
[dependencies]
async-task = "1.2"
anyhow = "1.0"
thiserror = "1.0"
lazy_static = "1.3"

View File

@ -4,6 +4,8 @@ use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll};
use thiserror::*;
pub mod spawn;
type NextFunc<T> = Box<dyn FnOnce(Fallible<T>) + Send>;
pub type SpawnFunc = Box<dyn FnOnce() + Send>;

150
promise/src/spawn.rs Normal file
View File

@ -0,0 +1,150 @@
use anyhow::{anyhow, Result};
use async_task::{JoinHandle, Task};
use std::future::Future;
use std::sync::mpsc::{sync_channel, Receiver, TryRecvError};
use std::sync::{Arc, Mutex};
use std::task::{Poll, Waker};
pub type ScheduleFunc = Box<dyn Fn(Task<()>) + Send + Sync + 'static>;
fn no_schedule_configured(_: Task<()>) {
panic!("no scheduler has been configured");
}
lazy_static::lazy_static! {
static ref ON_MAIN_THREAD: Mutex<ScheduleFunc> = Mutex::new(Box::new(no_schedule_configured));
static ref ON_MAIN_THREAD_LOW_PRI: Mutex<ScheduleFunc> = Mutex::new(Box::new(no_schedule_configured));
}
pub fn set_schedulers(main: ScheduleFunc, low_pri: ScheduleFunc) {
*ON_MAIN_THREAD.lock().unwrap() = Box::new(main);
*ON_MAIN_THREAD_LOW_PRI.lock().unwrap() = Box::new(low_pri);
}
/// Spawn a new thread to execute the provided function.
/// Returns a JoinHandle that implements the Future trait
/// and that can be used to await and yield the return value
/// from the thread.
/// Can be called from any thread.
pub fn spawn_into_new_thread<F, T>(f: F) -> JoinHandle<Result<T>, ()>
where
F: FnOnce() -> Result<T>,
F: Send + 'static,
T: Send + 'static,
{
let (tx, rx) = sync_channel(1);
// Holds the waker that may later observe
// during the Future::poll call.
struct WakerHolder {
waker: Mutex<Option<Waker>>,
}
let holder = Arc::new(WakerHolder {
waker: Mutex::new(None),
});
let thread_waker = Arc::clone(&holder);
std::thread::spawn(move || {
// Run the thread
let res = f();
// Pass the result back
tx.send(res).unwrap();
// If someone polled the thread before we got here,
// they will have populated the waker; extract it
// and wake up the scheduler so that it will poll
// the result again.
let mut waker = thread_waker.waker.lock().unwrap();
if let Some(waker) = waker.take() {
waker.wake();
}
});
struct PendingResult<T> {
rx: Receiver<Result<T>>,
holder: Arc<WakerHolder>,
}
impl<T> std::future::Future for PendingResult<T> {
type Output = Result<T>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
match self.rx.try_recv() {
Ok(res) => Poll::Ready(res),
Err(TryRecvError::Empty) => {
let mut waker = self.holder.waker.lock().unwrap();
waker.replace(cx.waker().clone());
Poll::Pending
}
Err(TryRecvError::Disconnected) => {
Poll::Ready(Err(anyhow!("thread terminated without providing a result")))
}
}
}
}
spawn_into_main_thread(PendingResult { rx, holder })
}
/// Spawn a future into the main thread; it will be polled in the
/// main thread.
/// This function can be called from any thread.
/// If you are on the main thread already, consider using
/// spawn() instead to lift the `Send` requirement.
pub fn spawn_into_main_thread<F, R>(future: F) -> JoinHandle<R, ()>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = async_task::spawn(future, |task| ON_MAIN_THREAD.lock().unwrap()(task), ());
task.schedule();
handle
}
/// Spawn a future into the main thread; it will be polled in
/// the main thread in the low priority queue--all other normal
/// priority items will be drained before considering low priority
/// spawns.
/// If you are on the main thread already, consider using `spawn_with_low_priority`
/// instead to lift the `Send` requirement.
pub fn spawn_into_main_thread_with_low_priority<F, R>(future: F) -> JoinHandle<R, ()>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = async_task::spawn(
future,
|task| ON_MAIN_THREAD_LOW_PRI.lock().unwrap()(task),
(),
);
task.schedule();
handle
}
/// Spawn a future with normal priority.
pub fn spawn<F, R>(future: F) -> JoinHandle<R, ()>
where
F: Future<Output = R> + 'static,
R: 'static,
{
let (task, handle) =
async_task::spawn_local(future, |task| ON_MAIN_THREAD.lock().unwrap()(task), ());
task.schedule();
handle
}
/// Spawn a future with low priority; it will be polled only after
/// all other normal priority items are processed.
pub fn spawn_with_low_priority<F, R>(future: F) -> JoinHandle<R, ()>
where
F: Future<Output = R> + 'static,
R: 'static,
{
let (task, handle) = async_task::spawn_local(
future,
|task| ON_MAIN_THREAD_LOW_PRI.lock().unwrap()(task),
(),
);
task.schedule();
handle
}

View File

@ -8,7 +8,7 @@ use crate::font::FontConfiguration;
use crate::frontend::gui::scrollbar::*;
use crate::frontend::gui::selection::*;
use crate::frontend::gui::tabbar::{TabBarItem, TabBarState};
use crate::frontend::{executor, front_end, spawn_task};
use crate::frontend::{executor, front_end};
use crate::keyassignment::{KeyAssignment, KeyMap, SpawnTabDomain};
use crate::mux::renderable::{Renderable, RenderableDimensions, StableCursorPosition};
use crate::mux::tab::{Tab, TabId};
@ -1159,7 +1159,7 @@ impl TermWindow {
Paste => {
let tab_id = tab.tab_id();
let future = self.window.as_ref().unwrap().get_clipboard();
spawn_task(async move {
promise::spawn::spawn(async move {
if let Ok(clip) = future.await {
promise::Future::with_executor(executor(), move || {
let mux = Mux::get().unwrap();
@ -2654,7 +2654,7 @@ impl TermWindow {
) => {
let tab_id = tab.tab_id();
let future = self.window.as_ref().unwrap().get_clipboard();
spawn_task(async move {
promise::spawn::spawn(async move {
if let Ok(clip) = future.await {
promise::Future::with_executor(executor(), move || {
let mux = Mux::get().unwrap();

View File

@ -1,5 +1,4 @@
use crate::font::FontConfiguration;
use crate::frontend::muxserver::MuxServerFrontEnd;
use crate::mux::tab::Tab;
use crate::mux::window::WindowId;
use anyhow::{anyhow, Error};
@ -10,7 +9,6 @@ use serde::Deserialize;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Mutex;
use window::connection::ConnectionOps;
pub mod activity;
pub mod gui;
@ -64,17 +62,6 @@ pub fn front_end() -> Option<Rc<dyn FrontEnd>> {
res
}
pub fn spawn_task<F: std::future::Future<Output = ()> + 'static>(
future: F,
) -> async_task::JoinHandle<(), ()> {
let frontend = front_end().expect("spawn_task must be called from the gui thread");
if let Some(frontend) = frontend.downcast_ref::<MuxServerFrontEnd>() {
frontend.spawn_task(future)
} else {
window::Connection::get().unwrap().spawn_task(future)
}
}
impl FrontEndSelection {
pub fn try_new(self) -> Result<Rc<dyn FrontEnd>, Error> {
let front_end = match self {

View File

@ -6,7 +6,6 @@ use crate::mux::window::WindowId;
use crate::mux::Mux;
use crate::server::listener::spawn_listener;
use anyhow::{bail, Error};
use async_task::JoinHandle;
use crossbeam_channel::{unbounded as channel, Receiver, Sender};
use log::info;
use promise::*;
@ -41,6 +40,19 @@ impl MuxServerFrontEnd {
fn new(start_listener: bool) -> Result<Rc<dyn FrontEnd>, Error> {
let (tx, rx) = channel();
let tx_main = tx.clone();
let tx_low = tx.clone();
let queue_func = move |f: SpawnFunc| {
tx_main.send(f).unwrap();
};
let queue_func_low = move |f: SpawnFunc| {
tx_low.send(f).unwrap();
};
promise::spawn::set_schedulers(
Box::new(move |task| queue_func(Box::new(move || task.run()))),
Box::new(move |task| queue_func_low(Box::new(move || task.run()))),
);
if start_listener {
spawn_listener()?;
}
@ -54,20 +66,6 @@ impl MuxServerFrontEnd {
pub fn new_null() -> Result<Rc<dyn FrontEnd>, Error> {
Self::new(false)
}
pub fn spawn_task<F: std::future::Future<Output = ()> + 'static>(
&self,
future: F,
) -> JoinHandle<(), ()> {
let tx = self.tx.clone();
let (task, handle) = async_task::spawn_local(
future,
move |task| tx.send(Box::new(move || task.run())).unwrap(),
(),
);
task.schedule();
handle
}
}
impl FrontEnd for MuxServerFrontEnd {

View File

@ -25,7 +25,7 @@ mod stats;
mod termwiztermtab;
use crate::frontend::activity::Activity;
use crate::frontend::{front_end, spawn_task, FrontEndSelection};
use crate::frontend::{front_end, FrontEndSelection};
use crate::mux::domain::{Domain, LocalDomain};
use crate::mux::Mux;
use crate::server::client::{unix_connect_with_retry, Client};
@ -398,7 +398,7 @@ fn run_ssh(config: config::ConfigHandle, opts: SshCommand) -> anyhow::Result<()>
// Initiate an ssh connection; since that is a blocking process with
// callbacks, we have to run it in another thread
spawn_task(async {
promise::spawn::spawn(async {
if let Err(err) = async_run_ssh(opts, params).await {
terminate_with_error(err);
}
@ -478,7 +478,7 @@ fn run_mux_client(config: config::ConfigHandle, opts: &ConnectCommand) -> anyhow
};
let activity = Activity::new();
spawn_task(async {
promise::spawn::spawn(async {
if let Err(err) = spawn_tab_in_default_domain_if_mux_is_empty(cmd).await {
terminate_with_error(err);
}
@ -603,7 +603,7 @@ fn run_terminal_gui(config: config::ConfigHandle, opts: StartCommand) -> anyhow:
let do_auto_connect =
front_end_selection != FrontEndSelection::MuxServer && !opts.no_auto_connect;
spawn_task(async move {
promise::spawn::spawn(async move {
if let Err(err) = async_run_terminal_gui(cmd, do_auto_connect).await {
terminate_with_error(err);
}

View File

@ -1,4 +1,5 @@
use ::window::*;
use promise::spawn::spawn;
use std::any::Any;
struct MyWindow {
@ -90,7 +91,7 @@ async fn spawn_window() -> Result<(), Box<dyn std::error::Error>> {
fn main() -> anyhow::Result<()> {
let conn = Connection::init()?;
conn.spawn_task(async {
spawn(async {
eprintln!("running this async block");
spawn_window().await.ok();
eprintln!("end of async block");

View File

@ -21,15 +21,12 @@ pub trait ConnectionOps {
fn init() -> Fallible<Rc<Connection>> {
let conn = Rc::new(Connection::create_new()?);
CONN.with(|m| *m.borrow_mut() = Some(Rc::clone(&conn)));
crate::spawn::SPAWN_QUEUE.register_promise_schedulers();
Ok(conn)
}
fn terminate_message_loop(&self);
fn run_message_loop(&self) -> Fallible<()>;
fn spawn_task<F: std::future::Future<Output = ()> + 'static>(
&self,
future: F,
) -> async_task::JoinHandle<(), ()>;
// TODO: return a handle that can be used to cancel the timer
fn schedule_timer<F: FnMut() + 'static>(&self, interval: std::time::Duration, callback: F);

View File

@ -94,13 +94,6 @@ impl ConnectionOps for Connection {
Ok(())
}
fn spawn_task<F: std::future::Future<Output = ()> + 'static>(
&self,
future: F,
) -> async_task::JoinHandle<(), ()> {
SPAWN_QUEUE.spawn_task(future)
}
fn schedule_timer<F: FnMut() + 'static>(&self, interval: std::time::Duration, callback: F) {
let secs_f64 =
(interval.as_secs() as f64) + (f64::from(interval.subsec_nanos()) / 1_000_000_000_f64);

View File

@ -178,13 +178,6 @@ impl WaylandConnection {
}
impl ConnectionOps for WaylandConnection {
fn spawn_task<F: std::future::Future<Output = ()> + 'static>(
&self,
future: F,
) -> async_task::JoinHandle<(), ()> {
SPAWN_QUEUE.spawn_task(future)
}
fn terminate_message_loop(&self) {
*self.should_terminate.borrow_mut() = true;
}

View File

@ -55,10 +55,6 @@ impl ConnectionOps for Connection {
}
}
fn spawn_task<F: std::future::Future<Output = ()> + 'static>(&self, future: F) {
SPAWN_QUEUE.spawn_task(future);
}
fn schedule_timer<F: FnMut() + 'static>(&self, interval: std::time::Duration, callback: F) {
let millis = interval
.as_millis()

View File

@ -166,13 +166,6 @@ fn server_supports_shm() -> bool {
}
impl ConnectionOps for XConnection {
fn spawn_task<F: std::future::Future<Output = ()> + 'static>(
&self,
future: F,
) -> async_task::JoinHandle<(), ()> {
SPAWN_QUEUE.spawn_task(future)
}
fn terminate_message_loop(&self) {
*self.should_terminate.borrow_mut() = true;
}

View File

@ -101,13 +101,6 @@ impl Connection {
}
impl ConnectionOps for Connection {
fn spawn_task<F: std::future::Future<Output = ()> + 'static>(
&self,
future: F,
) -> async_task::JoinHandle<(), ()> {
SPAWN_QUEUE.spawn_task(future)
}
fn terminate_message_loop(&self) {
match self {
Self::X11(x) => x.terminate_message_loop(),

View File

@ -41,6 +41,17 @@ impl SpawnQueue {
Self::new_impl()
}
pub fn register_promise_schedulers(&self) {
promise::spawn::set_schedulers(
Box::new(|task| {
SPAWN_QUEUE.queue_func(Box::new(move || task.run()), true);
}),
Box::new(|low_pri_task| {
SPAWN_QUEUE.queue_func(Box::new(move || low_pri_task.run()), false);
}),
);
}
pub fn spawn(&self, f: SpawnFunc) {
self.spawn_impl(f, true)
}
@ -64,19 +75,6 @@ impl SpawnQueue {
}
}
pub fn spawn_task<F: std::future::Future<Output = ()> + 'static>(
&self,
future: F,
) -> async_task::JoinHandle<(), ()> {
let (task, handle) = async_task::spawn_local(
future,
move |task| SPAWN_QUEUE.spawn(Box::new(move || task.run())),
(),
);
task.schedule();
handle
}
fn queue_func(&self, f: SpawnFunc, high_pri: bool) {
let f = InstrumentedSpawnFunc {
func: f,