1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-24 13:52:55 +03:00

mux: re-jigger pty output processing

This removes the ratelimiter from the mux pty output reader.
Instead, we now have two reader threads:

* One to perform blocking reads from the pty output and send them
  to the other thread
* The other thread waits for data from the first, then tries to find
  a newline character so that it can send 1+ lines of data to the
  terminal parser.  If it doesn't find any lines, it waits ~50ms for
  additional data from the first thread to bundle together eg:
  really long lines, or image protocol data.  It will keep doing this
  until no more data arrives within 50ms or until it finds a newline.
  Once no more data arrives within 50ms, it sends whatever it has
  accumulated and then blocks waiting for the next chunk

I tried a quick ctrl-c test with this; running `find /` and seeing
how easily interruptible it is, and it seems OK on my M1 mac.
I don't think we need the output rate limiter any more, but I'll
try this out on my bigger linux machine as well to see if that
feels as good.

With this change, `cat test-data/emoji-test.txt` no longer has wonky
spacing when it gets to the England flag at the bottom.

refs: https://github.com/wez/wezterm/issues/339
This commit is contained in:
Wez Furlong 2020-12-10 23:41:52 -08:00
parent 77d560029c
commit ab342d9c46

View File

@ -5,12 +5,12 @@ use anyhow::{anyhow, Error};
use domain::{Domain, DomainId};
use log::error;
use portable_pty::ExitStatus;
use ratelim::RateLimiter;
use std::cell::{Ref, RefCell, RefMut};
use std::collections::HashMap;
use std::io::Read;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use thiserror::*;
@ -47,36 +47,7 @@ pub struct Mux {
subscribers: RefCell<HashMap<usize, Box<dyn Fn(MuxNotification) -> bool>>>,
}
fn read_from_pane_pty(pane_id: PaneId, mut reader: Box<dyn std::io::Read>) {
const BUFSIZE: usize = 32 * 1024;
let mut buf = [0; BUFSIZE];
let mut lim = RateLimiter::new(|config| config.ratelimit_output_bytes_per_second);
let dead = Arc::new(AtomicBool::new(false));
'outer: while !dead.load(Ordering::Relaxed) {
match reader.read(&mut buf) {
Ok(size) if size == 0 => {
error!("read_pty EOF: pane_id {}", pane_id);
break;
}
Err(err) => {
error!("read_pty failed: pane {} {:?}", pane_id, err);
break;
}
Ok(size) => {
let buf = &buf[..size];
let mut pos = 0;
while pos < size {
if dead.load(Ordering::Relaxed) {
break 'outer;
}
match lim.admit_check((size - pos) as u32) {
Ok(len) => {
let len = len as usize;
let data = buf[pos..pos + len].to_vec();
pos += len;
fn send_to_mux(pane_id: PaneId, dead: &Arc<AtomicBool>, data: Vec<u8>) {
promise::spawn::spawn_into_main_thread_with_low_priority({
let dead = Arc::clone(&dead);
async move {
@ -94,13 +65,39 @@ fn read_from_pane_pty(pane_id: PaneId, mut reader: Box<dyn std::io::Read>) {
})
.detach();
}
Err(delay) => {
log::trace!("RateLimiter: sleep for {:?}", delay);
std::thread::sleep(delay);
}
}
fn accumulator(pane_id: PaneId, dead: &Arc<AtomicBool>, rx: Receiver<Vec<u8>>) {
let mut buf = vec![];
'outer: while let Ok(mut data) = rx.recv() {
buf.append(&mut data);
while !buf.is_empty() {
if let Some(idx) = buf.iter().rposition(|&b| b == b'\n') {
let mut split = buf.split_off(idx + 1);
std::mem::swap(&mut split, &mut buf);
send_to_mux(pane_id, &dead, split);
}
match rx.recv_timeout(std::time::Duration::from_millis(50)) {
Ok(mut extra) => {
buf.append(&mut extra);
}
Err(RecvTimeoutError::Timeout) => {
// No more data to read right now, so pass whatever
// we have pending on to the mux thread and then block
// waiting for the next.
let mut to_send = vec![];
std::mem::swap(&mut to_send, &mut buf);
send_to_mux(pane_id, &dead, to_send);
break;
}
Err(RecvTimeoutError::Disconnected) => break 'outer,
}
}
if dead.load(Ordering::Relaxed) {
break;
}
}
promise::spawn::spawn_into_main_thread(async move {
@ -110,6 +107,41 @@ fn read_from_pane_pty(pane_id: PaneId, mut reader: Box<dyn std::io::Read>) {
.detach();
}
fn read_from_pane_pty(pane_id: PaneId, mut reader: Box<dyn std::io::Read>) {
const BUFSIZE: usize = 32 * 1024;
let mut buf = [0; BUFSIZE];
let dead = Arc::new(AtomicBool::new(false));
let (tx, rx) = channel();
std::thread::spawn({
let dead = Arc::clone(&dead);
move || {
accumulator(pane_id, &dead, rx);
}
});
while !dead.load(Ordering::Relaxed) {
match reader.read(&mut buf) {
Ok(size) if size == 0 => {
error!("read_pty EOF: pane_id {}", pane_id);
break;
}
Err(err) => {
error!("read_pty failed: pane {} {:?}", pane_id, err);
break;
}
Ok(size) => {
let buf = &buf[..size];
if tx.send(buf.to_vec()).is_err() {
break;
}
}
}
}
dead.store(true, Ordering::Relaxed);
}
thread_local! {
static MUX: RefCell<Option<Rc<Mux>>> = RefCell::new(None);
}