1
1
mirror of https://github.com/wez/wezterm.git synced 2024-11-23 15:04:36 +03:00

fix an issue with rate limiting data from the child pty

We need to chunk the data that we read from the child otherwise
we may potentially try to admit more data in a single action
than the ratelimiter will ever allow (eg: if we read 4k of data
and the limit is 100 bytes per second, we can never send that
4k of data in a single write).

Our handling of that situation was not good: we'd panic and kill
the background thread that was reading the data, but the rest
of the app was still running.

This commit upgrades to the most recent rate limiter crate
and performs explicit chunking of the output so that we
behave more sanely.

Refs: https://github.com/wez/wezterm/issues/65
This commit is contained in:
Wez Furlong 2019-11-21 08:36:16 -08:00
parent 371e07838d
commit ac028da1b6
3 changed files with 43 additions and 28 deletions

View File

@ -36,7 +36,7 @@ native-tls = "0.2"
palette = "0.4"
portable-pty = { path = "pty", features = ["serde_support", "ssh"]}
promise = { path = "promise" }
ratelimit_meter = "4.1"
ratelimit_meter = "5.0"
rayon = "1.0"
serde = {version="1.0", features = ["rc"]}
serde_derive = "1.0"

View File

@ -66,32 +66,34 @@ fn read_from_tab_pty(config: Arc<Config>, tab_id: TabId, mut reader: Box<dyn std
break;
}
Ok(size) => {
lim.blocking_admittance_check(size as u32);
let data = buf[0..size].to_vec();
/*
match std::str::from_utf8(&data) {
Ok(s) => {
let chars: Vec<u32> = s.chars().map(|c| c as u32).collect();
error!("read chars: {:x?}", chars);
}
Err(e) => {
error!("couldn't convert to string: {:?}", e);
for chunk in buf[..size].chunks(lim.capacity_per_second()) {
lim.blocking_admittance_check(chunk.len() as u32);
let data = chunk.to_vec();
/*
match std::str::from_utf8(&data) {
Ok(s) => {
let chars: Vec<u32> = s.chars().map(|c| c as u32).collect();
error!("read chars: {:x?}", chars);
}
Err(e) => {
error!("couldn't convert to string: {:?}", e);
}
}
*/
Future::with_executor(executor(), move || {
let mux = Mux::get().unwrap();
if let Some(tab) = mux.get_tab(tab_id) {
tab.advance_bytes(
&data,
&mut Host {
writer: &mut *tab.writer(),
},
);
mux.notify(MuxNotification::TabOutput(tab_id));
}
Ok(())
});
}
*/
Future::with_executor(executor(), move || {
let mux = Mux::get().unwrap();
if let Some(tab) = mux.get_tab(tab_id) {
tab.advance_bytes(
&data,
&mut Host {
writer: &mut *tab.writer(),
},
);
mux.notify(MuxNotification::TabOutput(tab_id));
}
Ok(())
});
}
}
}

View File

@ -1,8 +1,9 @@
use ratelimit_meter::algorithms::NonConformanceExt;
use ratelimit_meter::algorithms::NonConformance;
use ratelimit_meter::{DirectRateLimiter, LeakyBucket, NegativeMultiDecision};
pub struct RateLimiter {
lim: DirectRateLimiter<LeakyBucket>,
capacity_per_second: u32,
}
impl RateLimiter {
@ -12,6 +13,7 @@ impl RateLimiter {
std::num::NonZeroU32::new(capacity_per_second)
.expect("RateLimiter capacity to be non-zero"),
),
capacity_per_second,
}
}
@ -19,16 +21,27 @@ impl RateLimiter {
self.lim.check_n(amount).is_ok()
}
pub fn capacity_per_second(&self) -> usize {
self.capacity_per_second as usize
}
pub fn blocking_admittance_check(&mut self, amount: u32) {
loop {
match self.lim.check_n(amount) {
Ok(_) => return,
Err(NegativeMultiDecision::BatchNonConforming(_, over)) => {
let duration = over.wait_time();
let duration = over.wait_time_from(std::time::Instant::now());
log::trace!("RateLimiter: sleep for {:?}", duration);
std::thread::sleep(duration);
}
Err(err) => panic!("{}", err),
Err(NegativeMultiDecision::InsufficientCapacity(n)) => {
panic!(
"Programmer Error: you need to chunk the input \
because you're trying to admit {} items at once \
and this exceeds the maximum limit of {} per second",
n, self.capacity_per_second
);
}
}
}
}