1
1
mirror of https://github.com/wez/wezterm.git synced 2024-11-22 22:42:48 +03:00

remove futurecore

This commit is contained in:
Wez Furlong 2019-03-04 22:32:29 +00:00
parent 12e93fce57
commit 4b4be6ac21
9 changed files with 41 additions and 364 deletions

View File

@ -9,7 +9,6 @@ bitflags = "~1.0"
euclid = "~0.19"
failure = "~0.1"
failure_derive = "~0.1"
futures = "~0.1"
gl = "~0.11"
libc = "~0.2"
palette = "~0.4"

View File

@ -5,10 +5,16 @@ use std::sync::{Arc, Condvar, Mutex};
type NextFunc<T> = SendBoxFnOnce<'static, (Result<T, Error>,)>;
pub type SpawnFunc = SendBoxFnOnce<'static, ()>;
pub trait Executor {
pub trait Executor: Sync + Send {
fn execute(&self, f: SpawnFunc);
}
impl Executor for Arc<Executor> {
fn execute(&self, f: SpawnFunc) {
Executor::execute(&**self, f)
}
}
/// An executor for spawning futures into the rayon global
/// thread pool
pub struct RayonExecutor {}

View File

@ -1,203 +0,0 @@
//! A fairly simple executor for futures that run on the GUI thread.
//! Ideally we'd use something like a tokio core for this, but that
//! would only work on X11 systems and we'd need to subvert more
//! of the winit event loop driving.
//! Instead we use winit Awakened events to fall out of the blocking
//! gui loop and tend to the queued futures.
//! The core dispatching portion of this code is derived from one
//! of the test cases in the futures-rs library which is licensed
//! under the MIT license and has this copyright:
//! Copyright (c) 2016 Alex Crichton
//! Copyright (c) 2017 The Tokio Authors
use failure::Error;
use futures::executor::{self, Notify, Spawn};
use futures::future::{ExecuteError, Executor};
use futures::{Async, Future};
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
const EXTERNAL_SPAWN: usize = !0usize;
pub trait CoreSender: Send {
fn send(&self, idx: usize) -> Result<(), Error>;
fn clone_sender(&self) -> Box<CoreSender>;
}
pub trait CoreReceiver {
fn try_recv(&self) -> Result<usize, mpsc::TryRecvError>;
}
pub struct Core {
tx: Box<CoreSender>,
rx: Box<CoreReceiver>,
notify: Arc<Notifier>,
// Slab of running futures used to track what's running and what slots are
// empty. Slot indexes are then sent along tx/rx above to indicate which
// future is ready to get polled.
tasks: RefCell<Vec<Slot>>,
next_vacant: Cell<usize>,
external_spawn: Arc<SpawningFromAnotherThread>,
}
#[derive(Default)]
struct SpawningFromAnotherThread {
futures: Mutex<VecDeque<Box<Future<Item = (), Error = ()> + Send>>>,
}
pub struct Spawner {
tx: Box<CoreSender>,
external_spawn: Arc<SpawningFromAnotherThread>,
}
impl Spawner {
pub fn spawn(&self, future: Box<dyn Future<Item = (), Error = ()> + Send>) {
let mut futures = self.external_spawn.futures.lock().unwrap();
futures.push_back(future);
self.tx.send(EXTERNAL_SPAWN).unwrap();
}
}
impl Clone for Spawner {
fn clone(&self) -> Spawner {
Spawner {
tx: self.tx.clone_sender(),
external_spawn: Arc::clone(&self.external_spawn),
}
}
}
enum Slot {
Vacant { next_vacant: usize },
RunningFuture(Option<Spawn<Box<Future<Item = (), Error = ()>>>>),
}
impl Core {
pub fn new(tx: Box<CoreSender>, rx: Box<CoreReceiver>) -> Self {
let tx2 = tx.clone_sender();
Self {
notify: Arc::new(Notifier {
tx: Mutex::new(tx2),
}),
tx,
rx,
next_vacant: Cell::new(0),
tasks: RefCell::new(Vec::new()),
external_spawn: Arc::new(SpawningFromAnotherThread::default()),
}
}
pub fn get_spawner(&self) -> Spawner {
Spawner {
tx: self.tx.clone_sender(),
external_spawn: Arc::clone(&self.external_spawn),
}
}
/// Spawn a future to be executed by a future call to `turn`.
/// The future `f` provided will not be executed until the
/// `turn` method is called.
pub fn spawn<F>(&self, f: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
self.spawn_impl(Box::new(f), true);
}
fn spawn_impl(&self, f: Box<Future<Item = (), Error = ()> + 'static>, do_tx: bool) -> usize {
let idx = self.next_vacant.get();
let mut tasks = self.tasks.borrow_mut();
match tasks.get_mut(idx) {
Some(&mut Slot::Vacant { next_vacant }) => {
self.next_vacant.set(next_vacant);
}
Some(&mut Slot::RunningFuture(_)) => panic!("vacant points to running future"),
None => {
assert_eq!(idx, tasks.len());
tasks.push(Slot::Vacant { next_vacant: 0 });
self.next_vacant.set(idx + 1);
}
}
tasks[idx] = Slot::RunningFuture(Some(executor::spawn(Box::new(f))));
if do_tx {
self.tx.send(idx).unwrap();
}
idx
}
fn spawn_external(&self) -> Option<usize> {
let mut futures = self.external_spawn.futures.lock().unwrap();
if let Some(future) = futures.pop_front() {
Some(self.spawn_impl(future, false))
} else {
None
}
}
/// "Turns" this event loop one tick.
/// Does not block.
/// Returns `false` if there were no futures in a known-ready state.
pub fn turn(&self) -> bool {
let task_id = match self.rx.try_recv() {
Ok(task_id) if task_id == EXTERNAL_SPAWN => match self.spawn_external() {
Some(task_id) => task_id,
_ => return false,
},
Ok(task_id) => task_id,
Err(mpsc::TryRecvError::Empty) => return false,
Err(mpsc::TryRecvError::Disconnected) => panic!("futurecore rx Disconnected"),
};
// This may be a spurious wakeup so we're not guaranteed to have a
// future associated with `task_id`, so do a fallible lookup.
//
// Note that we don't want to borrow `self.tasks` for too long so we
// try to extract the future here and leave behind a tombstone future
// which'll get replaced or removed later. This is how we support
// spawn-in-run.
let mut future = match self.tasks.borrow_mut().get_mut(task_id) {
Some(&mut Slot::RunningFuture(ref mut future)) => future.take().unwrap(),
Some(&mut Slot::Vacant { .. }) | None => return false,
};
// Drive this future forward. If it's done we remove it and if it's not
// done then we put it back in the tasks array.
let done = match future.poll_future_notify(&self.notify, task_id) {
Ok(Async::Ready(())) | Err(()) => true,
Ok(Async::NotReady) => false,
};
let mut tasks = self.tasks.borrow_mut();
if done {
tasks[task_id] = Slot::Vacant {
next_vacant: self.next_vacant.get(),
};
self.next_vacant.set(task_id);
} else {
tasks[task_id] = Slot::RunningFuture(Some(future));
}
true
}
}
impl<F> Executor<F> for Core
where
F: Future<Item = (), Error = ()> + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.spawn(future);
Ok(())
}
}
struct Notifier {
tx: Mutex<Box<CoreSender>>,
}
impl Notify for Notifier {
fn notify(&self, id: usize) {
self.tx.lock().unwrap().send(id).ok();
}
}

View File

@ -30,7 +30,6 @@ struct Host {
/// fullscreen mode.
is_fullscreen: Option<LogicalPosition>,
config: Arc<Config>,
fonts: Rc<FontConfiguration>,
}
impl HostHelper for Host {
@ -235,7 +234,6 @@ impl GliumTerminalWindow {
window_position,
is_fullscreen: None,
config: Arc::clone(config),
fonts: Rc::clone(fonts),
});
host.display.gl_window().set_cursor(MouseCursor::Text);

View File

@ -1,16 +1,14 @@
use super::GuiSystem;
use crate::config::Config;
use crate::font::{FontConfiguration, FontSystemSelection};
use crate::futurecore;
use crate::gliumwindows;
pub use crate::gliumwindows::GliumTerminalWindow;
use crate::guicommon::tabs::Tab;
use crate::guicommon::window::TerminalWindow;
use crate::guiloop::SessionTerminated;
use crate::mux::{Mux, PtyEvent, PtyEventSender};
use crate::mux::Mux;
use crate::spawn_tab;
use failure::Error;
use futures::future;
use glium;
use glium::glutin::EventsLoopProxy;
pub use glium::glutin::WindowId;
@ -40,27 +38,6 @@ impl<T: Send> GuiSender<T> {
}
}
impl futurecore::CoreSender for GuiSender<usize> {
fn send(&self, idx: usize) -> Result<(), Error> {
GuiSender::send(self, idx)
}
fn clone_sender(&self) -> Box<futurecore::CoreSender> {
Box::new(GuiSender::clone(self))
}
}
impl PtyEventSender for GuiSender<PtyEvent> {
fn send(&self, event: PtyEvent) -> Result<(), Error> {
GuiSender::send(self, event)
}
}
impl futurecore::CoreReceiver for Receiver<usize> {
fn try_recv(&self) -> Result<usize, mpsc::TryRecvError> {
Receiver::try_recv(self)
}
}
pub fn channel<T: Send>(proxy: EventsLoopProxy) -> (GuiSender<T>, Receiver<T>) {
// Set an upper bound on the number of items in the queue, so that
// we don't swamp the gui loop; this puts back pressure on the
@ -92,9 +69,6 @@ struct Windows {
pub struct GuiEventLoop {
pub event_loop: RefCell<glium::glutin::EventsLoop>,
windows: Rc<RefCell<Windows>>,
core: futurecore::Core,
poll_tx: GuiSender<PtyEvent>,
poll_rx: Receiver<PtyEvent>,
gui_tx: Arc<GuiSender<SpawnFunc>>,
gui_rx: Receiver<SpawnFunc>,
tick_rx: Receiver<()>,
@ -118,24 +92,9 @@ impl GlutinGuiSystem {
GLUTIN_EVENT_LOOP.with(|f| *f.borrow_mut() = Some(Rc::clone(&event_loop)));
Ok(Rc::new(Self { event_loop }))
}
/// Loop through the core and dispatch any tasks that have been
/// notified as ready to run. Returns once all such tasks have
/// been polled and there are no more pending task notifications.
fn process_futures(&self) {
loop {
if !self.event_loop.core.turn() {
break;
}
}
}
}
impl GuiSystem for GlutinGuiSystem {
fn pty_sender(&self) -> Box<PtyEventSender> {
Box::new(self.event_loop.poll_tx.clone())
}
fn gui_executor(&self) -> Arc<Executor + Sync + Send> {
self.event_loop.gui_executor()
}
@ -145,8 +104,6 @@ impl GuiSystem for GlutinGuiSystem {
// https://github.com/tomaka/winit/issues/413
let myself = &self.event_loop;
loop {
self.process_futures();
// Check the window count; if after processing the futures there
// are no windows left, then we are done.
{
@ -158,7 +115,6 @@ impl GuiSystem for GlutinGuiSystem {
}
myself.run_event_loop()?;
myself.process_poll()?;
myself.process_gui_exec()?;
myself.process_tick()?;
}
@ -180,11 +136,6 @@ impl GuiEventLoop {
pub fn new(mux: &Rc<Mux>) -> Result<Self, Error> {
let event_loop = glium::glutin::EventsLoop::new();
let (fut_tx, fut_rx) = channel(event_loop.create_proxy());
let core = futurecore::Core::new(Box::new(fut_tx), Box::new(fut_rx));
mux.set_spawner(core.get_spawner());
let (poll_tx, poll_rx) = channel(event_loop.create_proxy());
let (gui_tx, gui_rx) = channel(event_loop.create_proxy());
// The glutin/glium plumbing has no native tick/timer stuff, so
@ -198,9 +149,6 @@ impl GuiEventLoop {
});
Ok(Self {
core,
poll_tx,
poll_rx,
gui_rx,
gui_tx: Arc::new(gui_tx),
tick_rx,
@ -227,8 +175,7 @@ impl GuiEventLoop {
}
pub fn register_tab(&self, tab: &Rc<Tab>) -> Result<(), Error> {
self.mux
.add_tab(self.gui_executor(), Box::new(self.poll_tx.clone()), tab)
self.mux.add_tab(self.gui_executor(), tab)
}
fn do_spawn_new_window(
@ -237,8 +184,7 @@ impl GuiEventLoop {
fonts: &Rc<FontConfiguration>,
) -> Result<(), Error> {
let tab = spawn_tab(&config, None)?;
let sender = Box::new(self.poll_tx.clone());
self.mux.add_tab(self.gui_executor(), sender, &tab)?;
self.mux.add_tab(self.gui_executor(), &tab)?;
let events = Self::get().expect("to be called on gui thread");
let window = GliumTerminalWindow::new(&events, &fonts, &config, &tab)?;
@ -349,29 +295,6 @@ impl GuiEventLoop {
}
}
/// Process events on poll_rx. We may have a pty
/// event or our interval timer may have expired, indicating that
/// we need to paint.
fn process_poll(&self) -> Result<(), Error> {
let start = SystemTime::now();
loop {
match start.elapsed() {
Ok(elapsed) if elapsed > MAX_POLL_LOOP_DURATION => {
return Ok(());
}
Err(_) => {
return Ok(());
}
_ => {}
}
match self.poll_rx.try_recv() {
Ok(event) => self.mux.process_pty_event(event)?,
Err(TryRecvError::Empty) => return Ok(()),
Err(err) => bail!("poll_rx disconnected {:?}", err),
}
}
}
fn process_gui_exec(&self) -> Result<(), Error> {
let start = SystemTime::now();
loop {

View File

@ -2,7 +2,7 @@ use super::ExitStatus;
use crate::config::Config;
use crate::font::FontConfiguration;
use crate::guicommon::tabs::Tab;
use crate::mux::{Mux, PtyEventSender};
use crate::mux::Mux;
use failure::Error;
use promise::Executor;
use std::cell::RefCell;
@ -95,8 +95,6 @@ pub trait GuiSystem {
tab: &Rc<Tab>,
) -> Result<(), Error>;
fn pty_sender(&self) -> Box<PtyEventSender>;
fn gui_executor(&self) -> Arc<Executor + Sync + Send>;
}

View File

@ -156,7 +156,7 @@ impl GuiEventLoop {
}
pub fn register_tab(&self, tab: &Rc<Tab>) -> Result<(), Error> {
self.mux.add_tab(Box::new(self.pty_tx.clone()), tab)
self.mux.add_tab(self.gui_executor(), tab)
}
fn run(&self) -> Result<(), Error> {
@ -241,8 +241,7 @@ impl GuiEventLoop {
fonts: &Rc<FontConfiguration>,
) -> Result<(), Error> {
let tab = spawn_tab(&config, None)?;
let sender = Box::new(events.pty_tx.clone());
events.mux.add_tab(sender, &tab)?;
events.mux.add_tab(self.gui_executor(), &tab)?;
let window = X11TerminalWindow::new(&events, &fonts, &config, &tab)?;
events.add_window(window)

View File

@ -17,7 +17,6 @@ use std::rc::Rc;
use std::sync::Arc;
mod config;
mod futurecore;
mod gliumwindows;
mod guicommon;
mod guiloop;
@ -173,7 +172,7 @@ fn spawn_window(
fontconfig: &Rc<FontConfiguration>,
) -> Result<(), Error> {
let tab = spawn_tab(config, cmd)?;
mux.add_tab(gui.gui_executor(), gui.pty_sender(), &tab)?;
mux.add_tab(gui.gui_executor(), &tab)?;
gui.spawn_new_window(config, &fontconfig, &tab)
}

View File

@ -1,7 +1,6 @@
use crate::futurecore::Spawner;
use crate::guicommon::tabs::{Tab, TabId};
use failure::Error;
use promise::Executor;
use promise::{Executor, Future};
use std::cell::RefCell;
use std::collections::HashMap;
use std::io::Read;
@ -16,53 +15,44 @@ pub mod renderable;
#[derive(Default)]
pub struct Mux {
tabs: RefCell<HashMap<TabId, Rc<Tab>>>,
spawner: RefCell<Option<Spawner>>,
}
#[derive(Clone)]
pub enum PtyEvent {
Data { tab_id: TabId, data: Vec<u8> },
Terminated { tab_id: TabId },
}
pub trait PtyEventSender: Send {
fn send(&self, event: PtyEvent) -> Result<(), Error>;
}
fn read_from_tab_pty(
executor: Arc<Executor>,
spawner: Spawner,
sender: Box<PtyEventSender>,
tab_id: TabId,
mut reader: Box<std::io::Read>,
) {
fn read_from_tab_pty(executor: Arc<Executor>, tab_id: TabId, mut reader: Box<std::io::Read>) {
const BUFSIZE: usize = 32 * 1024;
let mut buf = [0; BUFSIZE];
loop {
match reader.read(&mut buf) {
Ok(size) if size == 0 => {
eprintln!("read_pty EOF: tab_id {}", tab_id);
sender.send(PtyEvent::Terminated { tab_id }).ok();
Future::with_executor(Arc::clone(&executor), move || {
let mux = Mux::get().unwrap();
mux.remove_tab(tab_id);
Ok(())
});
return;
}
Ok(size) => {
spawner.spawn(Box::new(futures::future::lazy(|| {
eprintln!("I was spawned from a pty thread");
futures::future::ok(())
})));
if sender
.send(PtyEvent::Data {
tab_id,
data: buf[0..size].to_vec(),
})
.is_err()
{
return;
}
let data = buf[0..size].to_vec();
Future::with_executor(Arc::clone(&executor), move || {
let mux = Mux::get().unwrap();
if let Some(tab) = mux.get_tab(tab_id) {
tab.advance_bytes(
&data,
&mut Host {
writer: &mut *tab.writer(),
},
);
}
Ok(())
});
}
Err(err) => {
eprintln!("read_pty failed: tab {} {:?}", tab_id, err);
sender.send(PtyEvent::Terminated { tab_id }).ok();
Future::with_executor(Arc::clone(&executor), move || {
let mux = Mux::get().unwrap();
mux.remove_tab(tab_id);
Ok(())
});
return;
}
}
@ -106,10 +96,6 @@ thread_local! {
}
impl Mux {
pub fn set_spawner(&self, spawner: Spawner) {
*self.spawner.borrow_mut() = Some(spawner);
}
pub fn set_mux(mux: &Rc<Mux>) {
MUX.with(|m| {
*m.borrow_mut() = Some(Rc::clone(mux));
@ -130,18 +116,12 @@ impl Mux {
self.tabs.borrow().get(&tab_id).map(Rc::clone)
}
pub fn add_tab(
&self,
executor: Arc<Executor + Send + Sync>,
sender: Box<PtyEventSender>,
tab: &Rc<Tab>,
) -> Result<(), Error> {
pub fn add_tab(&self, executor: Arc<Executor>, tab: &Rc<Tab>) -> Result<(), Error> {
self.tabs.borrow_mut().insert(tab.tab_id(), Rc::clone(tab));
let reader = tab.reader()?;
let tab_id = tab.tab_id();
let spawner = self.spawner.borrow().as_ref().unwrap().clone();
thread::spawn(move || read_from_tab_pty(executor, spawner, sender, tab_id, reader));
thread::spawn(move || read_from_tab_pty(executor, tab_id, reader));
Ok(())
}
@ -151,28 +131,6 @@ impl Mux {
self.tabs.borrow_mut().remove(&tab_id);
}
pub fn process_pty_event(&self, event: PtyEvent) -> Result<(), Error> {
match event {
PtyEvent::Data { tab_id, data } => {
if let Some(tab) = self.get_tab(tab_id) {
tab.advance_bytes(
&data,
&mut Host {
writer: &mut *tab.writer(),
},
);
}
}
PtyEvent::Terminated { tab_id } => {
// The fact that we woke up is enough to trigger each
// window to check for termination
eprintln!("tab {} terminated", tab_id);
self.remove_tab(tab_id);
}
}
Ok(())
}
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
self.tabs.borrow().is_empty()