1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-23 13:21:38 +03:00

adopt async-task for muxserver task runner

This simplifies some of its code
This commit is contained in:
Wez Furlong 2020-01-16 01:03:32 -08:00
parent 2ce1cb018b
commit 62f0f7a273
3 changed files with 29 additions and 116 deletions

11
Cargo.lock generated
View File

@ -131,6 +131,16 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
[[package]]
name = "async-task"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9f534e76ca33eaa82bc8da5adb1b9e94a16f6fa217b78e9b400094dbbf844f9"
dependencies = [
"libc",
"winapi 0.3.8",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -3134,6 +3144,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"allsorts", "allsorts",
"anyhow", "anyhow",
"async-task",
"base64", "base64",
"base91", "base91",
"bitflags 1.2.1", "bitflags 1.2.1",

View File

@ -13,6 +13,7 @@ embed-resource = "1.3"
[dependencies] [dependencies]
allsorts = "0.1" allsorts = "0.1"
async-task = "1.2"
anyhow = "1.0" anyhow = "1.0"
thiserror = "1.0" thiserror = "1.0"
base64 = "0.10" base64 = "0.10"

View File

@ -1,109 +1,16 @@
//! Implements the multiplexer server frontend //! Implements the multiplexer server frontend
use crate::font::FontConfiguration; use crate::font::FontConfiguration;
use crate::frontend::{executor, front_end, FrontEnd}; use crate::frontend::FrontEnd;
use crate::mux::tab::Tab; use crate::mux::tab::Tab;
use crate::mux::window::WindowId; use crate::mux::window::WindowId;
use crate::mux::Mux; use crate::mux::Mux;
use crate::server::listener::spawn_listener; use crate::server::listener::spawn_listener;
use anyhow::{bail, Error}; use anyhow::{bail, Error};
use async_task::JoinHandle;
use crossbeam_channel::{unbounded as channel, Receiver, Sender};
use log::info; use log::info;
use promise::*; use promise::*;
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::rc::Rc; use std::rc::Rc;
use std::sync::mpsc::{self, Receiver, Sender};
use std::task::{Context, RawWaker, RawWakerVTable, Waker};
struct Task(pub std::pin::Pin<Box<dyn Future<Output = ()>>>);
enum Slot {
Vacant { next_vacant: usize },
Running(Option<Task>),
}
#[derive(Default)]
struct Tasks {
tasks: RefCell<Vec<Slot>>,
next_vacant: Cell<usize>,
}
impl Tasks {
pub fn add_task(&self, task: Task) -> usize {
let idx = self.next_vacant.get();
let mut tasks = self.tasks.borrow_mut();
match tasks.get_mut(idx) {
Some(&mut Slot::Vacant { next_vacant }) => {
self.next_vacant.set(next_vacant);
tasks[idx] = Slot::Running(Some(task));
}
Some(&mut Slot::Running(_)) => panic!("vacant points to running task"),
None => {
assert_eq!(idx, tasks.len());
tasks.push(Slot::Running(Some(task)));
self.next_vacant.set(idx + 1);
}
}
idx
}
pub fn poll_by_slot(&self, slot: usize) -> bool {
let mut task = match self.tasks.borrow_mut().get_mut(slot) {
Some(&mut Slot::Running(ref mut task)) => task.take().unwrap(),
Some(&mut Slot::Vacant { .. }) | None => return false,
};
let waker = TaskWaker::new_waker(slot);
let mut context = Context::from_waker(&waker);
let done = task.0.as_mut().poll(&mut context).is_ready();
let mut tasks = self.tasks.borrow_mut();
if done {
tasks[slot] = Slot::Vacant {
next_vacant: self.next_vacant.get(),
};
self.next_vacant.set(slot);
} else {
tasks[slot] = Slot::Running(Some(task));
}
true
}
}
struct TaskWaker(usize);
static VTBL: RawWakerVTable = RawWakerVTable::new(
TaskWaker::waker_clone,
TaskWaker::waker_wake,
TaskWaker::waker_wake_by_ref,
TaskWaker::waker_drop,
);
impl TaskWaker {
fn new_waker(slot: usize) -> Waker {
let raw = RawWaker::new(slot as *const (), &VTBL);
unsafe { Waker::from_raw(raw) }
}
unsafe fn waker_clone(p: *const ()) -> RawWaker {
RawWaker::new(p, &VTBL)
}
unsafe fn waker_wake(p: *const ()) {
let id: usize = std::mem::transmute(p);
wake_task_by_id(id);
}
unsafe fn waker_wake_by_ref(p: *const ()) {
let id: usize = std::mem::transmute(p);
wake_task_by_id(id);
}
unsafe fn waker_drop(_p: *const ()) {
/* no action required */
}
}
#[derive(Clone)] #[derive(Clone)]
struct MuxExecutor { struct MuxExecutor {
@ -127,32 +34,17 @@ impl Executor for MuxExecutor {
pub struct MuxServerFrontEnd { pub struct MuxServerFrontEnd {
tx: Sender<SpawnFunc>, tx: Sender<SpawnFunc>,
rx: Receiver<SpawnFunc>, rx: Receiver<SpawnFunc>,
tasks: Tasks,
}
fn wake_task_by_id(id: usize) {
executor().execute(Box::new(move || {
let frontend = front_end().unwrap();
let frontend = frontend
.downcast_ref::<MuxServerFrontEnd>()
.expect("mux server");
frontend.tasks.poll_by_slot(id);
}));
} }
impl MuxServerFrontEnd { impl MuxServerFrontEnd {
#[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))]
fn new(start_listener: bool) -> Result<Rc<dyn FrontEnd>, Error> { fn new(start_listener: bool) -> Result<Rc<dyn FrontEnd>, Error> {
let (tx, rx) = mpsc::channel(); let (tx, rx) = channel();
if start_listener { if start_listener {
spawn_listener()?; spawn_listener()?;
} }
Ok(Rc::new(Self { Ok(Rc::new(Self { tx, rx }))
tx,
rx,
tasks: Tasks::default(),
}))
} }
pub fn try_new() -> Result<Rc<dyn FrontEnd>, Error> { pub fn try_new() -> Result<Rc<dyn FrontEnd>, Error> {
@ -163,9 +55,18 @@ impl MuxServerFrontEnd {
Self::new(false) Self::new(false)
} }
pub fn spawn_task<F: std::future::Future<Output = ()> + 'static>(&self, future: F) { pub fn spawn_task<F: std::future::Future<Output = ()> + 'static>(
let id = self.tasks.add_task(Task(Box::pin(future))); &self,
wake_task_by_id(id); future: F,
) -> JoinHandle<(), ()> {
let tx = self.tx.clone();
let (task, handle) = async_task::spawn_local(
future,
move |task| tx.send(Box::new(move || task.run())).unwrap(),
(),
);
task.schedule();
handle
} }
} }