1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-23 13:21:38 +03:00

ssh: avoid busy loop when all channels are closed on the session

We need to notice when all of the streams associated with a channel are
closed and remove the channel from the set that we're polling in the
main loop, to avoid continually polling the closed descriptors.

Additionally, if the Session has been dropped, we know that we cannot
be asked to create any new channels, so if there are no more channels
then we can and should exit that dispatch loop and allow the resources
to be cleaned up.

refs: https://github.com/wez/wezterm/issues/1993#issuecomment-1130539934
This commit is contained in:
Wez Furlong 2022-05-19 20:57:45 -07:00
parent bc7c838cb7
commit c8c3b8378a
3 changed files with 36 additions and 4 deletions

View File

@ -596,7 +596,7 @@ impl Reconnectable {
let mut child = exec.child; let mut child = exec.child;
std::thread::spawn(move || match child.wait() { std::thread::spawn(move || match child.wait() {
Err(err) => log::error!("waiting on {} failed: {:#}", cmd, err), Err(err) => log::error!("waiting on {} failed: {:#}", cmd, err),
Ok(status) if !status.success() => log::error!("{} failed", cmd), Ok(status) if !status.success() => log::error!("{}: {}", cmd, status),
_ => {} _ => {}
}); });

View File

@ -52,6 +52,7 @@ pub(crate) enum SessionRequest {
Exec(Exec, Sender<anyhow::Result<ExecResult>>), Exec(Exec, Sender<anyhow::Result<ExecResult>>),
Sftp(SftpRequest), Sftp(SftpRequest),
SignalChannel(SignalChannel), SignalChannel(SignalChannel),
SessionDropped,
} }
#[derive(Debug)] #[derive(Debug)]
@ -73,6 +74,7 @@ pub struct Session {
impl Drop for Session { impl Drop for Session {
fn drop(&mut self) { fn drop(&mut self) {
self.tx.try_send(SessionRequest::SessionDropped).ok();
log::trace!("Drop Session"); log::trace!("Drop Session");
} }
} }
@ -100,6 +102,7 @@ impl Session {
next_channel_id: 1, next_channel_id: 1,
next_file_id: 1, next_file_id: 1,
sender_read, sender_read,
session_was_dropped: false,
}; };
std::thread::spawn(move || inner.run()); std::thread::spawn(move || inner.run());
Ok((Self { tx: session_sender }, rx_event)) Ok((Self { tx: session_sender }, rx_event))

View File

@ -45,6 +45,7 @@ pub(crate) struct SessionInner {
pub next_channel_id: ChannelId, pub next_channel_id: ChannelId,
pub next_file_id: FileId, pub next_file_id: FileId,
pub sender_read: FileDescriptor, pub sender_read: FileDescriptor,
pub session_was_dropped: bool,
} }
impl Drop for SessionInner { impl Drop for SessionInner {
@ -353,6 +354,13 @@ impl SessionInner {
self.drain_request_pipe(); self.drain_request_pipe();
self.dispatch_pending_requests(sess)?; self.dispatch_pending_requests(sess)?;
if self.channels.is_empty() && self.session_was_dropped {
log::trace!(
"Stopping session loop as there are no more channels and Session was dropped"
);
return Ok(());
}
let mut poll_array = vec![ let mut poll_array = vec![
pollfd { pollfd {
fd: self.sender_read.as_socket_descriptor(), fd: self.sender_read.as_socket_descriptor(),
@ -436,7 +444,8 @@ impl SessionInner {
/// Goal: if we have data to write to channels, try to send it. /// Goal: if we have data to write to channels, try to send it.
/// If we have room in our channel fd write buffers, try to fill it /// If we have room in our channel fd write buffers, try to fill it
fn tick_io(&mut self) -> anyhow::Result<()> { fn tick_io(&mut self) -> anyhow::Result<()> {
for chan in self.channels.values_mut() { let mut dead = vec![];
for (id, chan) in self.channels.iter_mut() {
if chan.exit.is_some() { if chan.exit.is_some() {
if let Some(status) = chan.channel.exit_status() { if let Some(status) = chan.channel.exit_status() {
let exit = chan.exit.take().unwrap(); let exit = chan.exit.take().unwrap();
@ -470,20 +479,36 @@ impl SessionInner {
Err(err) => { Err(err) => {
if out.buf.is_empty() { if out.buf.is_empty() {
log::trace!( log::trace!(
"Failed to read data from channel: {:#}, closing pipe", "Failed to read data from channel {} stream {}: {:#}, closing pipe",
id,
idx,
err err
); );
out.fd.take(); out.fd.take();
} else { } else {
log::trace!( log::trace!(
"Failed to read data from channel: {:#}, but \ "Failed to read data from channel {} stream {}: {:#}, but \
still have some buffer to drain", still have some buffer to drain",
id,
idx,
err err
); );
} }
} }
} }
} }
if chan
.descriptors
.iter()
.all(|descriptor| descriptor.fd.is_none())
{
log::trace!("all descriptors on channel {} are closed", id);
dead.push(*id);
}
}
for id in dead {
self.channels.remove(&id);
} }
Ok(()) Ok(())
} }
@ -505,6 +530,10 @@ impl SessionInner {
Ok(req) => { Ok(req) => {
sess.set_blocking(true); sess.set_blocking(true);
let res = match req { let res = match req {
SessionRequest::SessionDropped => {
self.session_was_dropped = true;
Ok(true)
}
SessionRequest::NewPty(newpty, reply) => { SessionRequest::NewPty(newpty, reply) => {
dispatch(reply, || self.new_pty(sess, newpty), "NewPty") dispatch(reply, || self.new_pty(sess, newpty), "NewPty")
} }