1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-24 22:01:47 +03:00

mux: remove parsing bottleneck

Test scenario is `cat`ing the public domain wiki text data from
http://cs.fit.edu/~mmahoney/compression/enwik8.zip in the terminal;
that file is 96M in size.

`time cat ~/Downloads/enwik8.wiki`

Prior to this change, depending on the OS, the time to cat the file
could be several minutes.

Digging in, the bottleneck appears to be that there isn't sufficient
parallelism between the reader and the parser, which means that the
rate of reading data is constrained by how long it takes to parse
and render a frame.

This commit switches away from using a synchronized vecdeque to
effectively create a non-blocking pipe, to "simply" using a socketpair
with a larger buffer size.

We now do blocking pty read -> write to socketpair in the reader
thread, and then read socketpair -> parse in the other.

The key difference between before and after being that the pty read
can continue to read and accumulate data (with an upper bound, so
that CTRL-C is still responsive) while we're parsing and rendering
a frame.

This increases the throughput for this scenario, bringing it down
from ~3:30 to ~17 seconds on this Ryzen 2700X system.

refs: https://github.com/wez/wezterm/issues/927
This commit is contained in:
Wez Furlong 2021-07-11 08:16:00 -07:00
parent be680955d7
commit edee1335c1

View File

@ -1,21 +1,21 @@
use crate::pane::{Pane, PaneId};
use crate::tab::{Tab, TabId};
use crate::window::{Window, WindowId};
use anyhow::{anyhow, Error};
use anyhow::{anyhow, Context, Error};
use config::{configuration, ExitBehavior};
use domain::{Domain, DomainId};
use filedescriptor::{socketpair, AsRawSocketDescriptor, FileDescriptor};
use log::error;
use metrics::histogram;
use portable_pty::ExitStatus;
use std::cell::{Ref, RefCell, RefMut};
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Read;
use std::io::{Read, Write};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use std::time::Instant;
use termwiz::escape::Action;
use thiserror::*;
@ -61,6 +61,8 @@ pub struct Mux {
banner: RefCell<Option<String>>,
}
const BUFSIZE: usize = 1024 * 1024;
/// This function bounces parsed actions over to the main thread to feed to
/// the pty in the mux.
/// It blocks until the mux has finished consuming the data, which provides
@ -91,62 +93,45 @@ fn send_actions_to_mux(pane_id: PaneId, dead: &Arc<AtomicBool>, actions: Vec<Act
histogram!("send_actions_to_mux.rate", 1.);
}
struct BufState {
queue: Mutex<VecDeque<u8>>,
cond: Condvar,
dead: Arc<AtomicBool>,
}
fn parse_buffered_data(pane_id: PaneId, dead: &Arc<AtomicBool>, mut rx: FileDescriptor) {
let mut buf = vec![0; BUFSIZE];
let mut parser = termwiz::escape::parser::Parser::new();
impl BufState {
fn write(&self, buf: &[u8]) {
let start = Instant::now();
let mut queue = self.queue.lock().unwrap();
queue.extend(buf);
histogram!("read_from_pane_pty.BufState.extend", start.elapsed());
self.cond.notify_one();
loop {
match rx.read(&mut buf) {
Ok(size) if size == 0 => {
dead.store(true, Ordering::Relaxed);
break;
}
Err(_) => {
dead.store(true, Ordering::Relaxed);
break;
}
Ok(size) => {
let mut actions = vec![];
parser.parse(&buf[0..size], |action| actions.push(action));
if !actions.is_empty() {
send_actions_to_mux(pane_id, dead, actions);
}
}
}
}
}
fn parse_buffered_data(pane_id: PaneId, state: &Arc<BufState>) {
let mut parser = termwiz::escape::parser::Parser::new();
let mut queue = state.queue.lock().unwrap();
loop {
if queue.is_empty() {
if state.dead.load(Ordering::Relaxed) {
return;
}
queue = state.cond.wait(queue).unwrap();
continue;
}
let mut actions = vec![];
let buf = queue.make_contiguous();
parser.parse(buf, |action| actions.push(action));
queue.truncate(0);
// Yield briefly to see if more data showed up and
// lump it together with what we've got
loop {
let wait_res = state
.cond
.wait_timeout(queue, Duration::from_millis(1))
.unwrap();
queue = wait_res.0;
if queue.is_empty() {
break;
}
let buf = queue.make_contiguous();
parser.parse(buf, |action| actions.push(action));
queue.truncate(0);
if !actions.is_empty() {
// Don't delay very long if we've got stuff to display!
break;
}
}
if !actions.is_empty() {
send_actions_to_mux(pane_id, &state.dead, actions);
fn set_socket_buffer(fd: &mut FileDescriptor, option: i32, size: usize) -> anyhow::Result<()> {
let socklen = std::mem::size_of_val(&size);
unsafe {
let res = libc::setsockopt(
fd.as_socket_descriptor(),
libc::SOL_SOCKET,
option,
&size as *const usize as *const _,
socklen as _,
);
if res == 0 {
Ok(())
} else {
Err(std::io::Error::last_os_error()).context("setsockopt")
}
}
}
@ -156,26 +141,23 @@ fn parse_buffered_data(pane_id: PaneId, state: &Arc<BufState>) {
/// all platforms and pty/tty types), parse the escape sequences and
/// relay the actions to the mux thread to apply them to the pane.
fn read_from_pane_pty(pane_id: PaneId, banner: Option<String>, mut reader: Box<dyn std::io::Read>) {
const BUFSIZE: usize = 4 * 1024;
let mut buf = [0; BUFSIZE];
let mut buf = vec![0; BUFSIZE];
// This is used to signal that an error occurred either in this thread,
// or in the main mux thread. If `true`, this thread will terminate.
let dead = Arc::new(AtomicBool::new(false));
let state = Arc::new(BufState {
queue: Mutex::new(VecDeque::new()),
cond: Condvar::new(),
dead: Arc::clone(&dead),
});
let (mut tx, mut rx) = socketpair().unwrap();
set_socket_buffer(&mut tx, libc::SO_SNDBUF, BUFSIZE).unwrap();
set_socket_buffer(&mut rx, libc::SO_RCVBUF, BUFSIZE).unwrap();
std::thread::spawn({
let state = Arc::clone(&state);
move || parse_buffered_data(pane_id, &state)
let dead = Arc::clone(&dead);
move || parse_buffered_data(pane_id, &dead, rx)
});
if let Some(banner) = banner {
state.write(banner.as_bytes());
tx.write_all(banner.as_bytes()).ok();
}
while !dead.load(Ordering::Relaxed) {
@ -190,7 +172,13 @@ fn read_from_pane_pty(pane_id: PaneId, banner: Option<String>, mut reader: Box<d
}
Ok(size) => {
histogram!("read_from_pane_pty.bytes.rate", size as f64);
state.write(&buf[..size]);
if let Err(err) = tx.write_all(&buf[..size]) {
error!(
"read_pty failed to write to parser: pane {} {:?}",
pane_id, err
);
break;
}
}
}
}