From ab342d9c461824894aa768e30ff6d7831b0e4b04 Mon Sep 17 00:00:00 2001 From: Wez Furlong Date: Thu, 10 Dec 2020 23:41:52 -0800 Subject: [PATCH] mux: re-jigger pty output processing This removes the ratelimiter from the mux pty output reader. Instead, we now have two reader threads: * One to perform blocking reads from the pty output and send them to the other thread * The other thread waits for data from the first, then tries to find a newline character so that it can send 1+ lines of data to the terminal parser. If it doesn't find any lines, it waits ~50ms for additional data from the first thread to bundle together eg: really long lines, or image protocol data. It will keep doing this until no more data arrives within 50ms or until it finds a newline. Once no more data arrives within 50ms, it sends whatever it has accumulated and then blocks waiting for the next chunk I tried a quick ctrl-c test with this; running `find /` and seeing how easily interruptible it is, and it seems OK on my M1 mac. I don't think we need the output rate limiter any more, but I'll try this out on my bigger linux machine as well to see if that feels as good. With this change, `cat test-data/emoji-test.txt` no longer has wonky spacing when it gets to the England flag at the bottom. refs: https://github.com/wez/wezterm/issues/339 --- mux/src/lib.rs | 116 +++++++++++++++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 42 deletions(-) diff --git a/mux/src/lib.rs b/mux/src/lib.rs index 5898d0f95..93ecee97b 100644 --- a/mux/src/lib.rs +++ b/mux/src/lib.rs @@ -5,12 +5,12 @@ use anyhow::{anyhow, Error}; use domain::{Domain, DomainId}; use log::error; use portable_pty::ExitStatus; -use ratelim::RateLimiter; use std::cell::{Ref, RefCell, RefMut}; use std::collections::HashMap; use std::io::Read; use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::Arc; use std::thread; use thiserror::*; @@ -47,14 +47,80 @@ pub struct Mux { subscribers: RefCell bool>>>, } +fn send_to_mux(pane_id: PaneId, dead: &Arc, data: Vec) { + promise::spawn::spawn_into_main_thread_with_low_priority({ + let dead = Arc::clone(&dead); + async move { + let mux = Mux::get().unwrap(); + if let Some(pane) = mux.get_pane(pane_id) { + pane.advance_bytes(&data); + mux.notify(MuxNotification::PaneOutput(pane_id)); + } else { + // Something else removed the pane from + // the mux, so we should stop trying to + // process it. + dead.store(true, Ordering::Relaxed); + } + } + }) + .detach(); +} + +fn accumulator(pane_id: PaneId, dead: &Arc, rx: Receiver>) { + let mut buf = vec![]; + + 'outer: while let Ok(mut data) = rx.recv() { + buf.append(&mut data); + + while !buf.is_empty() { + if let Some(idx) = buf.iter().rposition(|&b| b == b'\n') { + let mut split = buf.split_off(idx + 1); + std::mem::swap(&mut split, &mut buf); + send_to_mux(pane_id, &dead, split); + } + + match rx.recv_timeout(std::time::Duration::from_millis(50)) { + Ok(mut extra) => { + buf.append(&mut extra); + } + Err(RecvTimeoutError::Timeout) => { + // No more data to read right now, so pass whatever + // we have pending on to the mux thread and then block + // waiting for the next. + let mut to_send = vec![]; + std::mem::swap(&mut to_send, &mut buf); + send_to_mux(pane_id, &dead, to_send); + break; + } + Err(RecvTimeoutError::Disconnected) => break 'outer, + } + } + + if dead.load(Ordering::Relaxed) { + break; + } + } + promise::spawn::spawn_into_main_thread(async move { + let mux = Mux::get().unwrap(); + mux.remove_pane(pane_id); + }) + .detach(); +} + fn read_from_pane_pty(pane_id: PaneId, mut reader: Box) { const BUFSIZE: usize = 32 * 1024; let mut buf = [0; BUFSIZE]; - - let mut lim = RateLimiter::new(|config| config.ratelimit_output_bytes_per_second); let dead = Arc::new(AtomicBool::new(false)); - 'outer: while !dead.load(Ordering::Relaxed) { + let (tx, rx) = channel(); + std::thread::spawn({ + let dead = Arc::clone(&dead); + move || { + accumulator(pane_id, &dead, rx); + } + }); + + while !dead.load(Ordering::Relaxed) { match reader.read(&mut buf) { Ok(size) if size == 0 => { error!("read_pty EOF: pane_id {}", pane_id); @@ -66,48 +132,14 @@ fn read_from_pane_pty(pane_id: PaneId, mut reader: Box) { } Ok(size) => { let buf = &buf[..size]; - let mut pos = 0; - - while pos < size { - if dead.load(Ordering::Relaxed) { - break 'outer; - } - match lim.admit_check((size - pos) as u32) { - Ok(len) => { - let len = len as usize; - let data = buf[pos..pos + len].to_vec(); - pos += len; - promise::spawn::spawn_into_main_thread_with_low_priority({ - let dead = Arc::clone(&dead); - async move { - let mux = Mux::get().unwrap(); - if let Some(pane) = mux.get_pane(pane_id) { - pane.advance_bytes(&data); - mux.notify(MuxNotification::PaneOutput(pane_id)); - } else { - // Something else removed the pane from - // the mux, so we should stop trying to - // process it. - dead.store(true, Ordering::Relaxed); - } - } - }) - .detach(); - } - Err(delay) => { - log::trace!("RateLimiter: sleep for {:?}", delay); - std::thread::sleep(delay); - } - } + if tx.send(buf.to_vec()).is_err() { + break; } } } } - promise::spawn::spawn_into_main_thread(async move { - let mux = Mux::get().unwrap(); - mux.remove_pane(pane_id); - }) - .detach(); + + dead.store(true, Ordering::Relaxed); } thread_local! {