From 9a4ae0a06b135712d38987a09cdede69df94d66e Mon Sep 17 00:00:00 2001 From: Wez Furlong Date: Fri, 21 Jun 2019 07:16:37 -0700 Subject: [PATCH] ratelimit child process output In the early days we relied upon the bounded length of a sync channel to put back pressure on the output from a child command. We're no longer using that kind of channel, so here's a more deliberate and measurable rate limiting implementation. The `ratelimit_output_bytes_per_second` configuration setting defaults to 2MB/s and constrains the amount of text we send to the escape sequence parser. This value was selected based on it being a combination of responsive to ctrl-c while outputing a lot of data and still generating sleeps to remain within the constraints. This does mean that terminal benchmarks that test how quickly you can dump text to the terminal will hit this artifical upper limit and are thus not going to be a true measure of performance. --- Cargo.toml | 1 + src/config.rs | 9 +++++++++ src/main.rs | 1 + src/mux/mod.rs | 14 ++++++++++++-- src/ratelim.rs | 31 +++++++++++++++++++++++++++++++ 5 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 src/ratelim.rs diff --git a/Cargo.toml b/Cargo.toml index 5a3401d54..a16a731b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ native-tls = "0.2" palette = "0.4" portable-pty = { path = "pty", features = ["serde_support"]} promise = { path = "promise" } +ratelimit_meter = "4.1" rayon = "1.0" serde = {version="1.0", features = ["rc"]} serde_derive = "1.0" diff --git a/src/config.rs b/src/config.rs index 91816b9fd..bba36c55c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -131,6 +131,14 @@ pub struct Config { /// not normally need to override this value. pub mux_client_expected_cn: Option, + /// Constrains the rate at which output from a child command is + /// processed and applied to the terminal model. + /// This acts as a brake in the case of a command spewing a + /// ton of output and allows for the UI to remain responsive + /// so that you can hit CTRL-C to interrupt it if desired. + /// The default value is 2MB/s. + pub ratelimit_output_bytes_per_second: Option, + #[serde(default)] pub keys: Vec, } @@ -394,6 +402,7 @@ impl Default for Config { mux_client_pem_ca: None, mux_client_accept_invalid_hostnames: None, mux_client_expected_cn: None, + ratelimit_output_bytes_per_second: None, mux_pem_root_certs: None, keys: vec![], } diff --git a/src/main.rs b/src/main.rs index 618f13406..3b7ef6d3f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ mod config; mod frontend; mod mux; mod opengl; +mod ratelim; mod server; use crate::frontend::FrontEndSelection; use crate::mux::domain::{Domain, LocalDomain}; diff --git a/src/mux/mod.rs b/src/mux/mod.rs index f8ecdaa16..a01528fc4 100644 --- a/src/mux/mod.rs +++ b/src/mux/mod.rs @@ -2,6 +2,7 @@ use crate::config::Config; use crate::frontend::gui_executor; use crate::mux::tab::{Tab, TabId}; use crate::mux::window::{Window, WindowId}; +use crate::ratelim::RateLimiter; use crate::server::pollable::{pollable_channel, PollableReceiver, PollableSender}; use domain::{Domain, DomainId}; use failure::{bail, format_err, Error, Fallible}; @@ -43,10 +44,17 @@ pub struct Mux { subscribers: RefCell>>, } -fn read_from_tab_pty(tab_id: TabId, mut reader: Box) { +fn read_from_tab_pty(config: Arc, tab_id: TabId, mut reader: Box) { let executor = gui_executor().expect("gui_executor was not registered yet!?"); const BUFSIZE: usize = 32 * 1024; let mut buf = [0; BUFSIZE]; + + let mut lim = RateLimiter::new( + config + .ratelimit_output_bytes_per_second + .unwrap_or(2 * 1024 * 1024), + ); + loop { match reader.read(&mut buf) { Ok(size) if size == 0 => { @@ -58,6 +66,7 @@ fn read_from_tab_pty(tab_id: TabId, mut reader: Box) { break; } Ok(size) => { + lim.blocking_admittance_check(size as u32); let data = buf[0..size].to_vec(); Future::with_executor(executor.clone_executor(), move || { let mux = Mux::get().unwrap(); @@ -184,7 +193,8 @@ impl Mux { let reader = tab.reader()?; let tab_id = tab.tab_id(); - thread::spawn(move || read_from_tab_pty(tab_id, reader)); + let config = Arc::clone(&self.config); + thread::spawn(move || read_from_tab_pty(config, tab_id, reader)); Ok(()) } diff --git a/src/ratelim.rs b/src/ratelim.rs new file mode 100644 index 000000000..5059d7c75 --- /dev/null +++ b/src/ratelim.rs @@ -0,0 +1,31 @@ +use ratelimit_meter::algorithms::NonConformanceExt; +use ratelimit_meter::{DirectRateLimiter, LeakyBucket, NegativeMultiDecision}; + +pub struct RateLimiter { + lim: DirectRateLimiter, +} + +impl RateLimiter { + pub fn new(capacity_per_second: u32) -> Self { + Self { + lim: DirectRateLimiter::::per_second( + std::num::NonZeroU32::new(capacity_per_second) + .expect("RateLimiter capacity to be non-zero"), + ), + } + } + + pub fn blocking_admittance_check(&mut self, amount: u32) { + loop { + match self.lim.check_n(amount) { + Ok(_) => return, + Err(NegativeMultiDecision::BatchNonConforming(_, over)) => { + let duration = over.wait_time(); + log::trace!("RateLimiter: sleep for {:?}", duration); + std::thread::sleep(duration); + } + Err(err) => panic!("{}", err), + } + } + } +}