mirror of
https://github.com/wez/wezterm.git
synced 2024-11-27 12:23:46 +03:00
client: remove race condition in cli shutdown
The client machinery would try to spawn an async attempt at detaching the unixdomain from the mux on shutdown, but since we're in the middle of tearing down the scoped executor, we could sometimes panic. The client we have in this situation isn't actually associated with a domain, so we don't need to detach in that situation. Formalize that by making it an Option, and address the fanout.
This commit is contained in:
parent
2695b30376
commit
7a824a2594
@ -9,7 +9,7 @@ use config::{configuration, SshBackend, SshDomain, TlsDomainClient, UnixDomain,
|
|||||||
use filedescriptor::FileDescriptor;
|
use filedescriptor::FileDescriptor;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use mux::connui::ConnectionUI;
|
use mux::connui::ConnectionUI;
|
||||||
use mux::domain::{alloc_domain_id, DomainId};
|
use mux::domain::DomainId;
|
||||||
use mux::pane::PaneId;
|
use mux::pane::PaneId;
|
||||||
use mux::ssh::ssh_connect_with_ui;
|
use mux::ssh::ssh_connect_with_ui;
|
||||||
use mux::Mux;
|
use mux::Mux;
|
||||||
@ -40,7 +40,7 @@ enum ReaderMessage {
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
sender: Sender<ReaderMessage>,
|
sender: Sender<ReaderMessage>,
|
||||||
local_domain_id: DomainId,
|
local_domain_id: Option<DomainId>,
|
||||||
pub is_reconnectable: bool,
|
pub is_reconnectable: bool,
|
||||||
pub is_local: bool,
|
pub is_local: bool,
|
||||||
}
|
}
|
||||||
@ -158,7 +158,16 @@ async fn process_unilateral_inner_async(
|
|||||||
client_pane.process_unilateral(decoded.pdu)
|
client_pane.process_unilateral(decoded.pdu)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_unilateral(local_domain_id: DomainId, decoded: DecodedPdu) -> anyhow::Result<()> {
|
fn process_unilateral(
|
||||||
|
local_domain_id: Option<DomainId>,
|
||||||
|
decoded: DecodedPdu,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let local_domain_id = local_domain_id.ok_or_else(|| {
|
||||||
|
anyhow::anyhow!(
|
||||||
|
"client doesn't have a real local domain, \
|
||||||
|
so unilateral message cannot be processed by it"
|
||||||
|
)
|
||||||
|
})?;
|
||||||
if let Some(pane_id) = decoded.pdu.pane_id() {
|
if let Some(pane_id) = decoded.pdu.pane_id() {
|
||||||
promise::spawn::spawn_into_main_thread(async move {
|
promise::spawn::spawn_into_main_thread(async move {
|
||||||
process_unilateral_inner(pane_id, local_domain_id, decoded)
|
process_unilateral_inner(pane_id, local_domain_id, decoded)
|
||||||
@ -178,7 +187,7 @@ enum NotReconnectableError {
|
|||||||
|
|
||||||
fn client_thread(
|
fn client_thread(
|
||||||
reconnectable: &mut Reconnectable,
|
reconnectable: &mut Reconnectable,
|
||||||
local_domain_id: DomainId,
|
local_domain_id: Option<DomainId>,
|
||||||
rx: &mut Receiver<ReaderMessage>,
|
rx: &mut Receiver<ReaderMessage>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
block_on(client_thread_async(reconnectable, local_domain_id, rx))
|
block_on(client_thread_async(reconnectable, local_domain_id, rx))
|
||||||
@ -186,7 +195,7 @@ fn client_thread(
|
|||||||
|
|
||||||
async fn client_thread_async(
|
async fn client_thread_async(
|
||||||
reconnectable: &mut Reconnectable,
|
reconnectable: &mut Reconnectable,
|
||||||
local_domain_id: DomainId,
|
local_domain_id: Option<DomainId>,
|
||||||
rx: &mut Receiver<ReaderMessage>,
|
rx: &mut Receiver<ReaderMessage>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut next_serial = 1u64;
|
let mut next_serial = 1u64;
|
||||||
@ -875,7 +884,7 @@ impl Reconnectable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
fn new(local_domain_id: DomainId, mut reconnectable: Reconnectable) -> Self {
|
fn new(local_domain_id: Option<DomainId>, mut reconnectable: Reconnectable) -> Self {
|
||||||
let is_reconnectable = reconnectable.reconnectable();
|
let is_reconnectable = reconnectable.reconnectable();
|
||||||
let is_local = reconnectable.is_local();
|
let is_local = reconnectable.is_local();
|
||||||
let (sender, mut receiver) = unbounded();
|
let (sender, mut receiver) = unbounded();
|
||||||
@ -887,11 +896,13 @@ impl Client {
|
|||||||
let mut backoff = BASE_INTERVAL;
|
let mut backoff = BASE_INTERVAL;
|
||||||
loop {
|
loop {
|
||||||
if let Err(e) = client_thread(&mut reconnectable, local_domain_id, &mut receiver) {
|
if let Err(e) = client_thread(&mut reconnectable, local_domain_id, &mut receiver) {
|
||||||
if !reconnectable.reconnectable() {
|
if !reconnectable.reconnectable() || local_domain_id.is_none() {
|
||||||
log::debug!("client thread ended: {}", e);
|
log::debug!("client thread ended: {}", e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let local_domain_id = local_domain_id.expect("checked above");
|
||||||
|
|
||||||
if let Some(ioerr) = e.root_cause().downcast_ref::<std::io::Error>() {
|
if let Some(ioerr) = e.root_cause().downcast_ref::<std::io::Error>() {
|
||||||
if let std::io::ErrorKind::UnexpectedEof = ioerr.kind() {
|
if let std::io::ErrorKind::UnexpectedEof = ioerr.kind() {
|
||||||
// Don't reconnect for a simple EOF
|
// Don't reconnect for a simple EOF
|
||||||
@ -956,10 +967,12 @@ impl Client {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
if let Some(domain_id) = local_domain_id {
|
||||||
promise::spawn::spawn_into_main_thread(async move {
|
promise::spawn::spawn_into_main_thread(async move {
|
||||||
detach(local_domain_id).await.ok();
|
detach(domain_id).await.ok();
|
||||||
})
|
})
|
||||||
.detach();
|
.detach();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
@ -1005,7 +1018,7 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn local_domain_id(&self) -> DomainId {
|
pub fn local_domain_id(&self) -> Option<DomainId> {
|
||||||
self.local_domain_id
|
self.local_domain_id
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1052,11 +1065,11 @@ impl Client {
|
|||||||
class_name: &str,
|
class_name: &str,
|
||||||
) -> anyhow::Result<Self> {
|
) -> anyhow::Result<Self> {
|
||||||
let unix_dom = Self::compute_unix_domain(prefer_mux, class_name)?;
|
let unix_dom = Self::compute_unix_domain(prefer_mux, class_name)?;
|
||||||
Self::new_unix_domain(alloc_domain_id(), &unix_dom, initial, ui, no_auto_start)
|
Self::new_unix_domain(None, &unix_dom, initial, ui, no_auto_start)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_unix_domain(
|
pub fn new_unix_domain(
|
||||||
local_domain_id: DomainId,
|
local_domain_id: Option<DomainId>,
|
||||||
unix_dom: &UnixDomain,
|
unix_dom: &UnixDomain,
|
||||||
initial: bool,
|
initial: bool,
|
||||||
ui: &mut ConnectionUI,
|
ui: &mut ConnectionUI,
|
||||||
@ -1077,7 +1090,7 @@ impl Client {
|
|||||||
Reconnectable::new(ClientDomainConfig::Tls(tls_client.clone()), None);
|
Reconnectable::new(ClientDomainConfig::Tls(tls_client.clone()), None);
|
||||||
let no_auto_start = true;
|
let no_auto_start = true;
|
||||||
reconnectable.connect(true, ui, no_auto_start)?;
|
reconnectable.connect(true, ui, no_auto_start)?;
|
||||||
Ok(Self::new(local_domain_id, reconnectable))
|
Ok(Self::new(Some(local_domain_id), reconnectable))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_ssh(
|
pub fn new_ssh(
|
||||||
@ -1088,7 +1101,7 @@ impl Client {
|
|||||||
let mut reconnectable = Reconnectable::new(ClientDomainConfig::Ssh(ssh_dom.clone()), None);
|
let mut reconnectable = Reconnectable::new(ClientDomainConfig::Ssh(ssh_dom.clone()), None);
|
||||||
let no_auto_start = true;
|
let no_auto_start = true;
|
||||||
reconnectable.connect(true, ui, no_auto_start)?;
|
reconnectable.connect(true, ui, no_auto_start)?;
|
||||||
Ok(Self::new(local_domain_id, reconnectable))
|
Ok(Self::new(Some(local_domain_id), reconnectable))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_pdu(&self, pdu: Pdu) -> anyhow::Result<Pdu> {
|
pub async fn send_pdu(&self, pdu: Pdu) -> anyhow::Result<Pdu> {
|
||||||
|
@ -486,7 +486,7 @@ impl Domain for ClientDomain {
|
|||||||
let initial = true;
|
let initial = true;
|
||||||
let no_auto_start = false;
|
let no_auto_start = false;
|
||||||
Client::new_unix_domain(
|
Client::new_unix_domain(
|
||||||
domain_id,
|
Some(domain_id),
|
||||||
unix,
|
unix,
|
||||||
initial,
|
initial,
|
||||||
&mut cloned_ui,
|
&mut cloned_ui,
|
||||||
|
@ -387,13 +387,7 @@ fn run_terminal_gui(opts: StartCommand) -> anyhow::Result<()> {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let mut ui = mux::connui::ConnectionUI::new_headless();
|
let mut ui = mux::connui::ConnectionUI::new_headless();
|
||||||
match wezterm_client::client::Client::new_unix_domain(
|
match wezterm_client::client::Client::new_unix_domain(None, &dom, false, &mut ui, true) {
|
||||||
mux::domain::alloc_domain_id(),
|
|
||||||
&dom,
|
|
||||||
false,
|
|
||||||
&mut ui,
|
|
||||||
true,
|
|
||||||
) {
|
|
||||||
Ok(client) => {
|
Ok(client) => {
|
||||||
let executor = promise::spawn::ScopedExecutor::new();
|
let executor = promise::spawn::ScopedExecutor::new();
|
||||||
let command = cmd.clone();
|
let command = cmd.clone();
|
||||||
|
Loading…
Reference in New Issue
Block a user