diff --git a/mux/src/lib.rs b/mux/src/lib.rs index f357ee244..7506eba4e 100644 --- a/mux/src/lib.rs +++ b/mux/src/lib.rs @@ -96,37 +96,20 @@ pub struct Mux { const BUFSIZE: usize = 1024 * 1024; -/// This function bounces parsed actions over to the main thread to feed to -/// the pty in the mux. -/// It blocks until the mux has finished consuming the data, which provides -/// some back-pressure so that eg: ctrl-c can remain responsive. -fn send_actions_to_mux(pane_id: PaneId, dead: &Arc, actions: Vec) { +/// This function applies parsed actions to the pane and notifies any +/// mux subscribers about the output event +fn send_actions_to_mux(pane: &Arc, actions: Vec) { let start = Instant::now(); - promise::spawn::block_on(promise::spawn::spawn_into_main_thread({ - let dead = Arc::clone(&dead); - async move { - let mux = Mux::get().unwrap(); - if let Some(pane) = mux.get_pane(pane_id) { - let start = Instant::now(); - pane.perform_actions(actions); - histogram!( - "send_actions_to_mux.perform_actions.latency", - start.elapsed() - ); - mux.notify(MuxNotification::PaneOutput(pane_id)); - } else { - // Something else removed the pane from - // the mux, so signal that we should stop - // trying to process it in read_from_pane_pty. - dead.store(true, Ordering::Relaxed); - } - } - })); - histogram!("send_actions_to_mux.latency", start.elapsed()); + pane.perform_actions(actions); + histogram!( + "send_actions_to_mux.perform_actions.latency", + start.elapsed() + ); + Mux::notify_from_any_thread(MuxNotification::PaneOutput(pane.pane_id())); histogram!("send_actions_to_mux.rate", 1.); } -fn parse_buffered_data(pane_id: PaneId, dead: &Arc, mut rx: FileDescriptor) { +fn parse_buffered_data(pane: Arc, dead: &Arc, mut rx: FileDescriptor) { let mut buf = vec![0; configuration().mux_output_parser_buffer_size]; let mut parser = termwiz::escape::parser::Parser::new(); let mut actions = vec![]; @@ -155,7 +138,7 @@ fn parse_buffered_data(pane_id: PaneId, dead: &Arc, mut rx: FileDesc // Flush prior actions if !actions.is_empty() { - send_actions_to_mux(pane_id, dead, std::mem::take(&mut actions)); + send_actions_to_mux(&pane, std::mem::take(&mut actions)); action_size = 0; } } @@ -174,7 +157,7 @@ fn parse_buffered_data(pane_id: PaneId, dead: &Arc, mut rx: FileDesc action.append_to(&mut actions); if flush && !actions.is_empty() { - send_actions_to_mux(pane_id, dead, std::mem::take(&mut actions)); + send_actions_to_mux(&pane, std::mem::take(&mut actions)); action_size = 0; } }); @@ -202,7 +185,7 @@ fn parse_buffered_data(pane_id: PaneId, dead: &Arc, mut rx: FileDesc } } - send_actions_to_mux(pane_id, dead, std::mem::take(&mut actions)); + send_actions_to_mux(&pane, std::mem::take(&mut actions)); action_size = 0; } @@ -243,13 +226,19 @@ fn allocate_socketpair() -> anyhow::Result<(FileDescriptor, FileDescriptor)> { /// blocking reads from the pty (non-blocking reads are not portable to /// all platforms and pty/tty types), parse the escape sequences and /// relay the actions to the mux thread to apply them to the pane. -fn read_from_pane_pty(pane_id: PaneId, banner: Option, mut reader: Box) { +fn read_from_pane_pty( + pane: Arc, + banner: Option, + mut reader: Box, +) { let mut buf = vec![0; BUFSIZE]; // This is used to signal that an error occurred either in this thread, // or in the main mux thread. If `true`, this thread will terminate. let dead = Arc::new(AtomicBool::new(false)); + let pane_id = pane.pane_id(); + let (mut tx, rx) = match allocate_socketpair() { Ok(pair) => pair, Err(err) => { @@ -267,7 +256,7 @@ fn read_from_pane_pty(pane_id: PaneId, banner: Option, mut reader: Box