diff --git a/Cargo.lock b/Cargo.lock index f666c091d..af4c3898a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -687,7 +687,10 @@ name = "config" version = "0.1.0" dependencies = [ "anyhow", + "bstr 0.2.13", "dirs 2.0.2", + "filenamegen", + "hostname", "lazy_static", "libc", "log", @@ -2736,6 +2739,7 @@ dependencies = [ "anyhow", "async-std", "async-task 1.3.1", + "crossbeam", "lazy_static", "thiserror", "tokio", @@ -4129,7 +4133,6 @@ dependencies = [ "async-trait", "base64 0.10.1", "bitflags 1.2.1", - "bstr 0.2.13", "cc", "cocoa", "codec", @@ -4145,14 +4148,12 @@ dependencies = [ "embed-resource", "euclid", "filedescriptor", - "filenamegen", "font-kit", "font-loader", "fontconfig", "freetype 0.1.0", "harfbuzz", "hdrhistogram", - "hostname", "http_req", "image", "lazy_static", @@ -4209,10 +4210,10 @@ dependencies = [ "anyhow", "codec", "config", - "crossbeam", - "daemonize", "filedescriptor", + "foreign-types-shared", "hostname", + "libc", "log", "mux", "openssl", diff --git a/codec/src/lib.rs b/codec/src/lib.rs index ca24bcad7..10e77d5ca 100644 --- a/codec/src/lib.rs +++ b/codec/src/lib.rs @@ -158,7 +158,20 @@ async fn decode_raw_async(r: &mut R) -> anyhow::Result< }; let serial = read_u64_async(r).await.context("reading PDU serial")?; let ident = read_u64_async(r).await.context("reading PDU ident")?; - let data_len = len as usize - (encoded_length(ident) + encoded_length(serial)); + let data_len = + match (len as usize).overflowing_sub(encoded_length(ident) + encoded_length(serial)) { + (_, true) => { + anyhow::bail!( + "sizes don't make sense: len:{} serial:{} (enc={}) ident:{} (enc={})", + len, + serial, + encoded_length(serial), + ident, + encoded_length(ident) + ); + } + (data_len, false) => data_len, + }; if is_compressed { metrics::value!("pdu.decode.compressed.size", data_len as u64); @@ -192,7 +205,20 @@ fn decode_raw(mut r: R) -> anyhow::Result { }; let serial = read_u64(r.by_ref()).context("reading PDU serial")?; let ident = read_u64(r.by_ref()).context("reading PDU ident")?; - let data_len = len as usize - (encoded_length(ident) + encoded_length(serial)); + let data_len = + match (len as usize).overflowing_sub(encoded_length(ident) + encoded_length(serial)) { + (_, true) => { + anyhow::bail!( + "sizes don't make sense: len:{} serial:{} (enc={}) ident:{} (enc={})", + len, + serial, + encoded_length(serial), + ident, + encoded_length(ident) + ); + } + (data_len, false) => data_len, + }; if is_compressed { metrics::value!("pdu.decode.compressed.size", data_len as u64); diff --git a/config/Cargo.toml b/config/Cargo.toml index dea528ac6..426ef0c0e 100644 --- a/config/Cargo.toml +++ b/config/Cargo.toml @@ -12,7 +12,10 @@ vergen = "3" [dependencies] anyhow = "1.0" +bstr = "0.2" dirs = "2.0" +filenamegen = "0.2" +hostname = "0.3" lazy_static = "1.4" libc = "0.2" luahelper = { path = "../luahelper" } diff --git a/config/src/lib.rs b/config/src/lib.rs index 579b9f3bc..761eb725e 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -29,6 +29,7 @@ mod font; mod frontend; pub mod keyassignment; mod keys; +pub mod lua; mod ssh; mod terminal; mod tls; @@ -54,8 +55,9 @@ lazy_static! { pub static ref CONFIG_DIR: PathBuf = xdg_config_home(); pub static ref RUNTIME_DIR: PathBuf = compute_runtime_dir().unwrap(); static ref CONFIG: Configuration = Configuration::new(); - static ref MAKE_LUA: Mutex> = Mutex::new(None); - static ref SHOW_ERROR: Mutex> = Mutex::new(None); + static ref MAKE_LUA: Mutex> = Mutex::new(Some(lua::make_lua_context)); + static ref SHOW_ERROR: Mutex> = + Mutex::new(Some(|e| log::error!("{}", e))); } pub fn assign_lua_factory(make_lua_context: LuaFactory) { diff --git a/wezterm/src/scripting/mod.rs b/config/src/lua.rs similarity index 95% rename from wezterm/src/scripting/mod.rs rename to config/src/lua.rs index 1b343342a..ada1e5ea6 100644 --- a/wezterm/src/scripting/mod.rs +++ b/config/src/lua.rs @@ -1,6 +1,6 @@ +use crate::{FontAttributes, TextStyle}; use anyhow::anyhow; use bstr::BString; -use config::{FontAttributes, TextStyle}; pub use luahelper::*; use mlua::{Lua, Table, Value}; use serde::*; @@ -48,8 +48,8 @@ pub fn make_lua_context(config_dir: &Path) -> anyhow::Result { array.insert(1, format!("{}/?/init.lua", path.display())); } - prefix_path(&mut path_array, &config::HOME_DIR.join(".wezterm")); - prefix_path(&mut path_array, &config::CONFIG_DIR); + prefix_path(&mut path_array, &crate::HOME_DIR.join(".wezterm")); + prefix_path(&mut path_array, &crate::CONFIG_DIR); if let Ok(exe) = std::env::current_exe() { if let Some(path) = exe.parent() { wezterm_mod.set( @@ -71,12 +71,12 @@ pub fn make_lua_context(config_dir: &Path) -> anyhow::Result { .ok_or_else(|| anyhow!("config dir path is not UTF-8"))?, )?; - wezterm_mod.set("target_triple", config::wezterm_target_triple())?; - wezterm_mod.set("version", config::wezterm_version())?; - wezterm_mod.set("home_dir", config::HOME_DIR.to_str())?; + wezterm_mod.set("target_triple", crate::wezterm_target_triple())?; + wezterm_mod.set("version", crate::wezterm_version())?; + wezterm_mod.set("home_dir", crate::HOME_DIR.to_str())?; wezterm_mod.set( "running_under_wsl", - lua.create_function(|_, ()| Ok(config::running_under_wsl()))?, + lua.create_function(|_, ()| Ok(crate::running_under_wsl()))?, )?; wezterm_mod.set( @@ -207,7 +207,7 @@ fn font_with_fallback<'lua>( fn action<'lua>( _lua: &'lua Lua, action: Table<'lua>, -) -> mlua::Result { +) -> mlua::Result { Ok(from_lua_value(Value::Table(action))?) } diff --git a/promise/Cargo.toml b/promise/Cargo.toml index 9a712ffee..49aa49bfc 100644 --- a/promise/Cargo.toml +++ b/promise/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] async-task = "1.2" async-std = "1.4" +crossbeam = "0.7" tokio = {version="0.2", features=["full"]} anyhow = "1.0" thiserror = "1.0" diff --git a/promise/src/spawn.rs b/promise/src/spawn.rs index ca2b98075..832e5ebec 100644 --- a/promise/src/spawn.rs +++ b/promise/src/spawn.rs @@ -1,3 +1,4 @@ +use crate::SpawnFunc; use anyhow::{anyhow, Result}; use async_task::{JoinHandle, Task}; use std::future::Future; @@ -212,3 +213,35 @@ pub async fn join_handle_result(handle: JoinHandle, ()>) -> .await .ok_or_else(|| anyhow::anyhow!("task was cancelled or panicked"))? } + +pub struct SimpleExecutor { + rx: crossbeam::channel::Receiver, +} + +impl SimpleExecutor { + pub fn new() -> Self { + let (tx, rx) = crossbeam::channel::unbounded(); + + let tx_main = tx.clone(); + let tx_low = tx.clone(); + let queue_func = move |f: SpawnFunc| { + tx_main.send(f).ok(); + }; + let queue_func_low = move |f: SpawnFunc| { + tx_low.send(f).ok(); + }; + set_schedulers( + Box::new(move |task| queue_func(Box::new(move || task.run()))), + Box::new(move |task| queue_func_low(Box::new(move || task.run()))), + ); + Self { rx } + } + + pub fn tick(&self) -> anyhow::Result<()> { + match self.rx.recv() { + Ok(func) => func(), + Err(err) => anyhow::bail!("while waiting for events: {:?}", err), + }; + Ok(()) + } +} diff --git a/wezterm-mux-server/Cargo.toml b/wezterm-mux-server/Cargo.toml index 0d1c81a8e..23e955ced 100644 --- a/wezterm-mux-server/Cargo.toml +++ b/wezterm-mux-server/Cargo.toml @@ -10,9 +10,10 @@ edition = "2018" anyhow = "1.0" codec = { path = "../codec" } config = { path = "../config" } -crossbeam = "0.7" filedescriptor = { version="0.7", path = "../filedescriptor" } +foreign-types-shared = "0.1.1" # to peek into SslStream's SSL hostname = "0.3" +libc = "0.2" log = "0.4" mux = { path = "../mux" } openssl = "0.10" @@ -27,9 +28,6 @@ umask = { path = "../umask" } url = "2" wezterm-term = { path = "../term", features=["use_serde"] } -[target.'cfg(unix)'.dependencies] -daemonize = { git = "https://github.com/wez/daemonize" } - [features] default = ["vendor_openssl"] # FIXME: find a way to magically disable vendor_openssl only on linux! diff --git a/wezterm-mux-server/src/clientsession.rs b/wezterm-mux-server/src/clientsession.rs deleted file mode 100644 index ff6de1d90..000000000 --- a/wezterm-mux-server/src/clientsession.rs +++ /dev/null @@ -1,102 +0,0 @@ -use crate::pollable::*; -use crate::sessionhandler::{PduSender, SessionHandler}; -use anyhow::{bail, Context, Error}; -use codec::*; -use crossbeam::channel::TryRecvError; -use filedescriptor::poll; -use log::error; -use mux::{Mux, MuxNotification}; -use std::collections::HashSet; - -pub struct ClientSession { - stream: S, - to_write_rx: PollableReceiver, - mux_rx: PollableReceiver, - handler: SessionHandler, -} - -impl ClientSession { - pub fn new(stream: S) -> Self { - let (to_write_tx, to_write_rx) = - pollable_channel().expect("failed to create pollable_channel"); - let mux = Mux::get().expect("to be running on gui thread"); - let (mux_tx, mux_rx) = pollable_channel().expect("failed to create pollable_channel"); - mux.subscribe(move |n| mux_tx.send(n).is_ok()); - let to_write_tx = PduSender::with_pollable(to_write_tx); - let handler = SessionHandler::new(to_write_tx); - Self { - stream, - to_write_rx, - mux_rx, - handler, - } - } - - pub fn run(&mut self) { - if let Err(e) = self.process() { - error!("While processing session loop: {}", e); - } - } - - fn process(&mut self) -> Result<(), Error> { - let mut read_buffer = Vec::with_capacity(1024); - let mut panes_to_output = HashSet::new(); - - loop { - loop { - match self.to_write_rx.try_recv() { - Ok(decoded) => { - log::trace!("writing pdu with serial {}", decoded.serial); - decoded.pdu.encode(&mut self.stream, decoded.serial)?; - self.stream.flush().context("while flushing stream")?; - } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => bail!("ClientSession was destroyed"), - }; - } - loop { - match self.mux_rx.try_recv() { - Ok(notif) => match notif { - // Coalesce multiple TabOutputs for the same tab - MuxNotification::PaneOutput(pane_id) => { - panes_to_output.insert(pane_id); - } - MuxNotification::WindowCreated(_window_id) => {} - }, - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => bail!("mux_rx is Disconnected"), - }; - } - - for pane_id in panes_to_output.drain() { - self.handler.schedule_pane_push(pane_id); - } - - let mut poll_array = [ - self.to_write_rx.as_poll_fd(), - self.stream.as_poll_fd(), - self.mux_rx.as_poll_fd(), - ]; - poll( - &mut poll_array, - Some(std::time::Duration::from_millis(1000)), - )?; - - if poll_array[1].revents != 0 || self.stream.has_read_buffered() { - loop { - self.stream.set_non_blocking(true)?; - let res = Pdu::try_read_and_decode(&mut self.stream, &mut read_buffer); - self.stream.set_non_blocking(false)?; - match res { - Ok(Some(decoded)) => self.handler.process_one(decoded), - Ok(None) => break, - Err(err) => { - log::error!("Error decoding: {}", err); - return Err(err); - } - } - } - } - } - } -} diff --git a/wezterm-mux-server/src/daemonize.rs b/wezterm-mux-server/src/daemonize.rs new file mode 100644 index 000000000..164689616 --- /dev/null +++ b/wezterm-mux-server/src/daemonize.rs @@ -0,0 +1,101 @@ +#![cfg(unix)] +use anyhow::Context; +use libc::pid_t; +use std::io::Write; +use std::os::unix::io::AsRawFd; + +enum Fork { + Child(pid_t), + Parent(pid_t), +} + +fn fork() -> anyhow::Result { + let pid = unsafe { libc::fork() }; + + if pid == 0 { + // We are the child + let pid = unsafe { libc::getpid() }; + Ok(Fork::Child(pid)) + } else if pid < 0 { + let err: anyhow::Error = std::io::Error::last_os_error().into(); + Err(err.context("fork")) + } else { + // We are the parent + Ok(Fork::Parent(pid)) + } +} + +fn setsid() -> anyhow::Result<()> { + let pid = unsafe { libc::setsid() }; + if pid == -1 { + let err: anyhow::Error = std::io::Error::last_os_error().into(); + Err(err.context("setsid")) + } else { + Ok(()) + } +} + +fn lock_pid_file(config: &config::ConfigHandle) -> anyhow::Result { + let pid_file = config.daemon_options.pid_file(); + let file = std::fs::OpenOptions::new() + .create(true) + .write(true) + .open(&pid_file) + .with_context(|| format!("opening pid file {}", pid_file.display()))?; + let res = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) }; + if res != 0 { + let err = std::io::Error::last_os_error(); + anyhow::bail!("unable to lock pid file {}: {}", pid_file.display(), err); + } + + unsafe { libc::ftruncate(file.as_raw_fd(), 0) }; + + Ok(file) +} + +pub fn daemonize(config: &config::ConfigHandle) -> anyhow::Result<()> { + let pid_file = if !config::running_under_wsl() { + // pid file locking is only partly functional when running under + // WSL 1; it is possible for the pid file to exist after a reboot + // and for attempts to open and lock it to fail when there are no + // other processes that might possibly hold a lock on it. + // So, we only use a pid file when not under WSL. + + Some(lock_pid_file(config)?) + } else { + None + }; + let stdout = config.daemon_options.open_stdout()?; + let stderr = config.daemon_options.open_stderr()?; + let devnull = std::fs::File::open("/dev/null").context("opening /dev/null for read")?; + + match fork()? { + Fork::Parent(pid) => { + let mut status = 0; + unsafe { libc::waitpid(pid, &mut status, 0) }; + std::process::exit(0); + } + Fork::Child(_) => {} + } + + setsid()?; + match fork()? { + Fork::Parent(_) => { + std::process::exit(0); + } + Fork::Child(_) => {} + } + + if let Some(mut pid_file) = pid_file { + writeln!(pid_file, "{}", unsafe { libc::getpid() }).ok(); + // Leak it so that the descriptor remains open for the duration + // of the process runtime + std::mem::forget(pid_file); + } + + unsafe { libc::dup2(devnull.as_raw_fd(), libc::STDIN_FILENO) }; + unsafe { libc::dup2(stdout.as_raw_fd(), libc::STDOUT_FILENO) }; + unsafe { libc::dup2(stderr.as_raw_fd(), libc::STDERR_FILENO) }; + + Ok(()) +} diff --git a/wezterm-mux-server/src/dispatch.rs b/wezterm-mux-server/src/dispatch.rs new file mode 100644 index 000000000..ec017a1c5 --- /dev/null +++ b/wezterm-mux-server/src/dispatch.rs @@ -0,0 +1,160 @@ +use crate::sessionhandler::{PduSender, SessionHandler}; +use crate::UnixStream; +use codec::{DecodedPdu, Pdu}; +use mux::{Mux, MuxNotification}; +use promise::spawn::spawn; +use smol::io::{AsyncRead, AsyncWrite}; +use std::marker::Unpin; + +#[cfg(unix)] +pub trait AsRawDesc: std::os::unix::io::AsRawFd {} +#[cfg(windows)] +pub trait AsRawDesc: std::os::windows::io::AsRawSocket {} + +pub trait TryClone { + fn try_to_clone(&self) -> anyhow::Result + where + Self: std::marker::Sized; +} + +impl AsRawDesc for UnixStream {} +impl TryClone for UnixStream { + fn try_to_clone(&self) -> anyhow::Result + where + Self: std::marker::Sized, + { + Ok(self.try_clone()?) + } +} + +#[derive(Debug)] +enum Item { + Notif(MuxNotification), + Pdu(DecodedPdu), + Done(String), +} + +async fn catch( + f: impl smol::future::Future>, + tx: smol::channel::Sender, +) -> anyhow::Result<()> { + if let Err(err) = f.await { + tx.try_send(Item::Done(err.to_string())).ok(); + } + Ok(()) +} + +async fn process_write_queue( + mut write_stream: T, + write_rx: smol::channel::Receiver, +) -> anyhow::Result<()> +where + T: AsyncWrite + Unpin, +{ + loop { + let decoded = write_rx.recv().await?; + + log::trace!("writing pdu with serial {}", decoded.serial); + decoded + .pdu + .encode_async(&mut write_stream, decoded.serial) + .await?; + } +} + +async fn read_pdus( + mut read_stream: T, + item_tx: smol::channel::Sender, +) -> anyhow::Result<()> +where + T: AsyncRead + Unpin, +{ + loop { + let decoded = Pdu::decode_async(&mut read_stream).await?; + item_tx.send(Item::Pdu(decoded)).await?; + } +} + +async fn multiplex( + write_tx: smol::channel::Sender, + item_rx: smol::channel::Receiver, +) -> anyhow::Result<()> { + let pdu_sender = PduSender::with_smol(write_tx); + let mut handler = SessionHandler::new(pdu_sender); + + loop { + let item = match item_rx.recv().await { + Ok(item) => item, + Err(err) => { + log::error!("{}", err); + return Ok(()); + } + }; + match item { + Item::Notif(MuxNotification::PaneOutput(pane_id)) => { + handler.schedule_pane_push(pane_id); + } + Item::Notif(MuxNotification::WindowCreated(_window_id)) => {} + Item::Pdu(decoded) => { + handler.process_one(decoded); + } + Item::Done(e) => { + log::error!("{}", e); + return Ok(()); + } + } + } +} + +pub async fn process(stream: T) -> anyhow::Result<()> +where + T: 'static, + T: TryClone, + T: std::io::Read, + T: std::io::Write, + T: AsRawDesc, +{ + let write_stream = smol::Async::new(stream.try_to_clone()?)?; + let read_stream = smol::Async::new(stream)?; + + process_async(write_stream, read_stream).await +} + +pub async fn process_async(write_stream: T, read_stream: T) -> anyhow::Result<()> +where + T: 'static, + T: AsyncRead, + T: AsyncWrite, + T: Unpin, +{ + let (item_tx, item_rx) = smol::channel::unbounded::(); + let (write_tx, write_rx) = smol::channel::unbounded::(); + + // Process the PDU write queue to send to the peer + spawn({ + let item_tx = item_tx.clone(); + async move { + catch( + async move { process_write_queue(write_stream, write_rx).await }, + item_tx, + ) + .await + } + }); + + { + let mux = Mux::get().expect("to be running on gui thread"); + let tx = item_tx.clone(); + mux.subscribe(move |n| tx.try_send(Item::Notif(n)).is_ok()); + } + + { + let item_tx = item_tx.clone(); + spawn(async move { + let tx = item_tx.clone(); + catch(async move { read_pdus(read_stream, item_tx).await }, tx).await + }); + } + + catch(async move { multiplex(write_tx, item_rx).await }, item_tx).await +} diff --git a/wezterm-mux-server/src/local.rs b/wezterm-mux-server/src/local.rs index e55c5bad6..409ea6df3 100644 --- a/wezterm-mux-server/src/local.rs +++ b/wezterm-mux-server/src/local.rs @@ -1,10 +1,7 @@ -use crate::sessionhandler::{PduSender, SessionHandler}; -use crate::{UnixListener, UnixStream}; +use crate::UnixListener; use anyhow::{anyhow, Context as _}; -use codec::{DecodedPdu, Pdu}; use config::{create_user_owned_dirs, UnixDomain}; -use mux::{Mux, MuxNotification}; -use promise::spawn::{spawn, spawn_into_main_thread}; +use promise::spawn::spawn_into_main_thread; pub struct LocalListener { listener: UnixListener, @@ -24,7 +21,12 @@ impl LocalListener { for stream in self.listener.incoming() { match stream { Ok(stream) => { - spawn_into_main_thread(async move { Self::process(stream) }); + spawn_into_main_thread(async move { + crate::dispatch::process(stream).await.map_err(|e| { + log::error!("welp: {:?}", e); + e + }) + }); } Err(err) => { log::error!("accept failed: {}", err); @@ -33,60 +35,6 @@ impl LocalListener { } } } - - async fn process(stream: UnixStream) -> anyhow::Result<()> { - let mut write_stream = smol::Async::new(stream.try_clone()?)?; - let mut read_stream = smol::Async::new(stream)?; - let (write_tx, write_rx) = smol::channel::unbounded::(); - - // Process the PDU write queue to send to the peer - spawn(async move { - while let Ok(decoded) = write_rx.recv().await { - log::trace!("writing pdu with serial {}", decoded.serial); - decoded - .pdu - .encode_async(&mut write_stream, decoded.serial) - .await?; - } - Ok::<(), anyhow::Error>(()) - }); - - let pdu_sender = PduSender::with_smol(write_tx); - let mut handler = SessionHandler::new(pdu_sender); - - enum Item { - Notif(MuxNotification), - Pdu(DecodedPdu), - } - - let (item_tx, item_rx) = smol::channel::unbounded::(); - { - let mux = Mux::get().expect("to be running on gui thread"); - let tx = item_tx.clone(); - mux.subscribe(move |n| tx.try_send(Item::Notif(n)).is_ok()); - } - - spawn(async move { - while let Ok(decoded) = Pdu::decode_async(&mut read_stream).await { - item_tx.send(Item::Pdu(decoded)).await?; - } - Ok::<(), anyhow::Error>(()) - }); - - while let Ok(notif) = item_rx.recv().await { - match notif { - Item::Notif(MuxNotification::PaneOutput(pane_id)) => { - handler.schedule_pane_push(pane_id); - } - Item::Notif(MuxNotification::WindowCreated(_window_id)) => {} - Item::Pdu(decoded) => { - handler.process_one(decoded); - } - } - } - - Ok(()) - } } /// Take care when setting up the listener socket; @@ -95,7 +43,7 @@ impl LocalListener { /// that prevent other users from manipulating its contents. fn safely_create_sock_path(unix_dom: &UnixDomain) -> anyhow::Result { let sock_path = &unix_dom.socket_path(); - log::debug!("setting up {}", sock_path.display()); + log::error!("setting up {}", sock_path.display()); let sock_dir = sock_path .parent() diff --git a/wezterm-mux-server/src/main.rs b/wezterm-mux-server/src/main.rs index 5f5c85298..e9b916e83 100644 --- a/wezterm-mux-server/src/main.rs +++ b/wezterm-mux-server/src/main.rs @@ -1,21 +1,16 @@ -use anyhow::{anyhow, bail, Context, Error}; -use config::{configuration, TlsDomainServer}; -use crossbeam::channel::unbounded as channel; -use log::error; +use config::configuration; use mux::activity::Activity; use mux::domain::{Domain, LocalDomain}; use mux::Mux; use portable_pty::cmdbuilder::CommandBuilder; -use promise::spawn::spawn_into_main_thread; -use promise::*; use std::ffi::OsString; -use std::net::TcpListener; -use std::path::Path; use std::rc::Rc; use std::sync::Arc; use std::thread; use structopt::*; +mod daemonize; + #[cfg(unix)] use std::os::unix::net::{UnixListener, UnixStream}; #[cfg(windows)] @@ -48,8 +43,16 @@ struct Opt { prog: Vec, } -fn main() -> anyhow::Result<()> { +fn main() { pretty_env_logger::init_timed(); + if let Err(err) = run() { + eprintln!("boo {}", err); + log::error!("{}", err); + std::process::exit(1); + } +} + +fn run() -> anyhow::Result<()> { //stats::Stats::init()?; let _saver = umask::UmaskSaver::new(); @@ -62,56 +65,27 @@ fn main() -> anyhow::Result<()> { #[cfg(unix)] { if opts.daemonize { - let stdout = config.daemon_options.open_stdout()?; - let stderr = config.daemon_options.open_stderr()?; - let mut daemonize = daemonize::Daemonize::new() - .stdout(stdout) - .stderr(stderr) - .working_directory(config::HOME_DIR.clone()); - - if !config::running_under_wsl() { - // pid file locking is only partly function when running under - // WSL 1; it is possible for the pid file to exist after a reboot - // and for attempts to open and lock it to fail when there are no - // other processes that might possibly hold a lock on it. - // So, we only use a pid file when not under WSL. - daemonize = daemonize.pid_file(config.daemon_options.pid_file()); - } - if let Err(err) = daemonize.start() { - use daemonize::DaemonizeError; - match err { - DaemonizeError::OpenPidfile - | DaemonizeError::LockPidfile(_) - | DaemonizeError::ChownPidfile(_) - | DaemonizeError::WritePid => { - bail!("{} {}", err, config.daemon_options.pid_file().display()); - } - DaemonizeError::ChangeDirectory => { - bail!("{} {}", err, config::HOME_DIR.display()); - } - _ => return Err(err.into()), - } - } - - // Remove some environment variables that aren't super helpful or - // that are potentially misleading when we're starting up the - // server. - // We may potentially want to look into starting/registering - // a session of some kind here as well in the future. - for name in &[ - "OLDPWD", - "PWD", - "SHLVL", - "SSH_AUTH_SOCK", - "SSH_CLIENT", - "SSH_CONNECTION", - "_", - ] { - std::env::remove_var(name); - } + daemonize::daemonize(&config)?; } } + // Remove some environment variables that aren't super helpful or + // that are potentially misleading when we're starting up the + // server. + // We may potentially want to look into starting/registering + // a session of some kind here as well in the future. + for name in &[ + "OLDPWD", + "PWD", + "SHLVL", + "SSH_AUTH_SOCK", + "SSH_CLIENT", + "SSH_CONNECTION", + "_", + ] { + std::env::remove_var(name); + } + let need_builder = !opts.prog.is_empty() || opts.cwd.is_some(); let cmd = if need_builder { @@ -132,22 +106,12 @@ fn main() -> anyhow::Result<()> { let mux = Rc::new(mux::Mux::new(Some(domain.clone()))); Mux::set_mux(&mux); - let (tx, rx) = channel(); + let executor = promise::spawn::SimpleExecutor::new(); - let tx_main = tx.clone(); - let tx_low = tx.clone(); - let queue_func = move |f: SpawnFunc| { - tx_main.send(f).ok(); - }; - let queue_func_low = move |f: SpawnFunc| { - tx_low.send(f).ok(); - }; - promise::spawn::set_schedulers( - Box::new(move |task| queue_func(Box::new(move || task.run()))), - Box::new(move |task| queue_func_low(Box::new(move || task.run()))), - ); - - spawn_listener()?; + spawn_listener().map_err(|e| { + log::error!("problem spawning listeners: {:?}", e); + e + })?; let activity = Activity::new(); @@ -159,13 +123,10 @@ fn main() -> anyhow::Result<()> { }); loop { - match rx.recv() { - Ok(func) => func(), - Err(err) => bail!("while waiting for events: {:?}", err), - } + executor.tick()?; if Mux::get().unwrap().is_empty() && mux::activity::Activity::count() == 0 { - log::info!("No more tabs; all done!"); + log::error!("No more tabs; all done!"); return Ok(()); } } @@ -191,11 +152,10 @@ fn terminate_with_error(err: anyhow::Error) -> ! { std::process::exit(1); } -mod clientsession; +mod dispatch; mod local; mod ossl; mod pki; -mod pollable; mod sessionhandler; lazy_static::lazy_static! { @@ -214,5 +174,6 @@ pub fn spawn_listener() -> anyhow::Result<()> { for tls_server in &config.tls_servers { ossl::spawn_tls_listener(tls_server)?; } + Ok(()) } diff --git a/wezterm-mux-server/src/ossl.rs b/wezterm-mux-server/src/ossl.rs index 864e73655..d690a4c7e 100644 --- a/wezterm-mux-server/src/ossl.rs +++ b/wezterm-mux-server/src/ossl.rs @@ -1,12 +1,69 @@ -use super::*; +use crate::PKI; +use anyhow::{anyhow, Context, Error}; +use config::TlsDomainServer; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod, SslStream, SslVerifyMode}; use openssl::x509::X509; +use promise::spawn::spawn_into_main_thread; +use std::net::TcpListener; +use std::net::TcpStream; +use std::path::Path; +use std::sync::Arc; struct OpenSSLNetListener { acceptor: Arc, listener: TcpListener, } +struct AsyncSslStream { + s: SslStream, +} + +impl AsyncSslStream { + pub fn new(s: SslStream) -> Self { + Self { s } + } +} + +impl crate::dispatch::TryClone for AsyncSslStream { + fn try_to_clone(&self) -> anyhow::Result { + use foreign_types_shared::ForeignTypeRef; + let stream = self.s.get_ref().try_clone()?; + let s = unsafe { SslStream::from_raw_parts(self.s.ssl().as_ptr(), stream) }; + Ok(Self { s }) + } +} + +#[cfg(unix)] +impl std::os::unix::io::AsRawFd for AsyncSslStream { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.s.get_ref().as_raw_fd() + } +} + +#[cfg(windows)] +impl std::os::windows::io::AsRawSocket for AsyncSslStream { + fn as_raw_socket(&self) -> std::os::windows::io::RawSocket { + self.s.get_ref().as_raw_socket() + } +} + +impl crate::dispatch::AsRawDesc for AsyncSslStream {} + +impl std::io::Read for AsyncSslStream { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.s.read(buf) + } +} + +impl std::io::Write for AsyncSslStream { + fn write(&mut self, buf: &[u8]) -> Result { + self.s.write(buf) + } + fn flush(&mut self) -> Result<(), std::io::Error> { + self.s.flush() + } +} + impl OpenSSLNetListener { pub fn new(listener: TcpListener, acceptor: SslAcceptor) -> Self { Self { @@ -57,7 +114,7 @@ impl OpenSSLNetListener { ); Ok(()) } else { - bail!("CN `{}` did not match $USER `{}`", cn_str, wanted_unix_name); + anyhow::bail!("CN `{}` did not match $USER `{}`", cn_str, wanted_unix_name); } } } @@ -72,22 +129,20 @@ impl OpenSSLNetListener { match acceptor.accept(stream) { Ok(stream) => { if let Err(err) = Self::verify_peer_cert(&stream) { - error!("problem with peer cert: {}", err); + log::error!("problem with peer cert: {}", err); break; } - spawn_into_main_thread(async move { - let mut session = clientsession::ClientSession::new(stream); - thread::spawn(move || session.run()); + crate::dispatch::process(AsyncSslStream::new(stream)).await }); } Err(e) => { - error!("failed TlsAcceptor: {}", e); + log::error!("failed TlsAcceptor: {}", e); } } } Err(err) => { - error!("accept failed: {}", err); + log::error!("accept failed: {}", err); return; } } @@ -156,6 +211,8 @@ pub fn spawn_tls_listener(tls_server: &TlsDomainServer) -> Result<(), Error> { let acceptor = acceptor.build(); + log::error!("listening with TLS on {:?}", tls_server.bind_address); + let mut net_listener = OpenSSLNetListener::new( TcpListener::bind(&tls_server.bind_address).with_context(|| { format!( @@ -165,7 +222,7 @@ pub fn spawn_tls_listener(tls_server: &TlsDomainServer) -> Result<(), Error> { })?, acceptor, ); - thread::spawn(move || { + std::thread::spawn(move || { net_listener.run(); }); Ok(()) diff --git a/wezterm-mux-server/src/pollable.rs b/wezterm-mux-server/src/pollable.rs deleted file mode 100644 index be5cac37e..000000000 --- a/wezterm-mux-server/src/pollable.rs +++ /dev/null @@ -1,131 +0,0 @@ -use crate::UnixStream; -use anyhow::Error; -use crossbeam::channel::{unbounded as channel, Receiver, Sender, TryRecvError}; -use filedescriptor::*; -use std::cell::RefCell; -use std::io::{Read, Write}; -use std::net::TcpStream; -use std::sync::{Arc, Mutex}; - -pub trait ReadAndWrite: std::io::Read + std::io::Write + Send + AsPollFd { - fn set_non_blocking(&self, non_blocking: bool) -> anyhow::Result<()>; - fn has_read_buffered(&self) -> bool; -} -impl ReadAndWrite for UnixStream { - fn set_non_blocking(&self, non_blocking: bool) -> anyhow::Result<()> { - self.set_nonblocking(non_blocking)?; - Ok(()) - } - fn has_read_buffered(&self) -> bool { - false - } -} - -impl ReadAndWrite for openssl::ssl::SslStream { - fn set_non_blocking(&self, non_blocking: bool) -> anyhow::Result<()> { - self.get_ref().set_nonblocking(non_blocking)?; - Ok(()) - } - fn has_read_buffered(&self) -> bool { - self.ssl().pending() != 0 - } -} - -pub struct PollableSender { - sender: Sender, - write: Arc>, -} - -impl PollableSender { - pub fn send(&self, item: T) -> anyhow::Result<()> { - // Attempt to write to the pipe; if it fails due to - // being full, that's fine: it means that the other end - // is going to be signalled already so they won't miss - // anything if our write doesn't happen. - self.write.lock().unwrap().write_all(b"x").ok(); - self.sender.send(item).map_err(Error::msg)?; - Ok(()) - } -} - -impl Clone for PollableSender { - fn clone(&self) -> Self { - Self { - sender: self.sender.clone(), - write: self.write.clone(), - } - } -} - -pub struct PollableReceiver { - receiver: Receiver, - read: RefCell, -} - -impl PollableReceiver { - pub fn try_recv(&self) -> Result { - // try to drain the pipe. - // We do this regardless of whether we popped an item - // so that we avoid being in a perpetually signalled state. - let mut byte = [0u8; 64]; - self.read.borrow_mut().read(&mut byte).ok(); - - Ok(self.receiver.try_recv()?) - } -} - -impl AsPollFd for PollableReceiver { - fn as_poll_fd(&self) -> pollfd { - self.read.borrow().as_socket_descriptor().as_poll_fd() - } -} - -/// A channel that can be polled together with a socket. -/// This uses the self-pipe trick but with a unix domain -/// socketpair. -/// In theory this should also work on windows, but will require -/// windows 10 w/unix domain socket support. -pub fn pollable_channel() -> anyhow::Result<(PollableSender, PollableReceiver)> { - let (sender, receiver) = channel(); - let (mut write, mut read) = socketpair()?; - - write.set_non_blocking(true)?; - read.set_non_blocking(true)?; - - Ok(( - PollableSender { - sender, - write: Arc::new(Mutex::new(FileDescriptor::new(write))), - }, - PollableReceiver { - receiver, - read: RefCell::new(FileDescriptor::new(read)), - }, - )) -} - -pub trait AsPollFd { - fn as_poll_fd(&self) -> pollfd; -} - -impl AsPollFd for SocketDescriptor { - fn as_poll_fd(&self) -> pollfd { - pollfd { - fd: *self, - events: POLLIN, - revents: 0, - } - } -} - -impl AsPollFd for openssl::ssl::SslStream { - fn as_poll_fd(&self) -> pollfd { - self.get_ref().as_socket_descriptor().as_poll_fd() - } -} - -impl AsPollFd for UnixStream { - fn as_poll_fd(&self) -> pollfd { - self.as_socket_descriptor().as_poll_fd() - } -} diff --git a/wezterm-mux-server/src/sessionhandler.rs b/wezterm-mux-server/src/sessionhandler.rs index 8a34aacff..6d8a3cea9 100644 --- a/wezterm-mux-server/src/sessionhandler.rs +++ b/wezterm-mux-server/src/sessionhandler.rs @@ -1,4 +1,3 @@ -use crate::pollable::*; use crate::PKI; use anyhow::anyhow; use codec::*; @@ -28,12 +27,6 @@ impl PduSender { (self.func)(pdu) } - pub fn with_pollable(p: PollableSender) -> Self { - Self { - func: Arc::new(move |pdu| p.send(pdu)), - } - } - pub fn with_smol(p: smol::channel::Sender) -> Self { Self { func: Arc::new(move |pdu| p.try_send(pdu).map_err(|e| anyhow!("{:?}", e))), diff --git a/wezterm/Cargo.toml b/wezterm/Cargo.toml index 9754b0a27..26377cb50 100644 --- a/wezterm/Cargo.toml +++ b/wezterm/Cargo.toml @@ -23,7 +23,6 @@ thiserror = "1.0" base64 = "0.10" rangeset = { path = "../rangeset" } bitflags = "1.0" -bstr = "0.2" codec = { path = "../codec" } config = { path = "../config" } crossbeam = "0.7" @@ -31,12 +30,10 @@ dirs = "2.0" downcast-rs = "1.0" euclid = "0.20" filedescriptor = { version="0.7", path = "../filedescriptor" } -filenamegen = "0.2" pretty_env_logger = "0.4" freetype = { path = "../deps/freetype" } image = "0.23" harfbuzz = { path = "../deps/harfbuzz" } -hostname = "0.3" lazy_static = "1.4" libc = "0.2" log = "0.4" diff --git a/wezterm/src/main.rs b/wezterm/src/main.rs index 3030bde9c..4385efc6a 100644 --- a/wezterm/src/main.rs +++ b/wezterm/src/main.rs @@ -16,9 +16,6 @@ use std::sync::Arc; use structopt::StructOpt; use tabout::{tabulate_output, Alignment, Column}; -// This module defines a macro, so it must be referenced before any other mods -mod scripting; - mod connui; mod gui; use config::keyassignment; @@ -650,7 +647,6 @@ fn terminate_with_error(err: anyhow::Error) -> ! { } fn main() { - config::assign_lua_factory(scripting::make_lua_context); config::assign_error_callback(crate::connui::show_configuration_error_message); notify_on_panic(); if let Err(e) = run() { @@ -834,147 +830,165 @@ fn run() -> anyhow::Result<()> { SubCommand::Serial(serial) => run_serial(config, &serial), SubCommand::Connect(connect) => run_mux_client(config, &connect), SubCommand::ImageCat(cmd) => cmd.run(), - SubCommand::Cli(cli) => { - // Start a front end so that the futures executor is running - let initial = true; - let mut ui = crate::connui::ConnectionUI::new_headless(); - let client = Client::new_default_unix_domain(initial, &mut ui)?; - match cli.sub { - CliSubCommand::List => { - let cols = vec![ - Column { - name: "WINID".to_string(), - alignment: Alignment::Right, - }, - Column { - name: "TABID".to_string(), - alignment: Alignment::Right, - }, - Column { - name: "PANEID".to_string(), - alignment: Alignment::Right, - }, - Column { - name: "SIZE".to_string(), - alignment: Alignment::Left, - }, - Column { - name: "TITLE".to_string(), - alignment: Alignment::Left, - }, - Column { - name: "CWD".to_string(), - alignment: Alignment::Left, - }, - ]; - let mut data = vec![]; - let panes = block_on(client.list_panes())?; // FIXME: blocking + SubCommand::Cli(cli) => run_cli(config, cli), + } +} - for tabroot in panes.tabs { - let mut cursor = tabroot.into_tree().cursor(); +async fn run_cli_async(config: config::ConfigHandle, cli: CliCommand) -> anyhow::Result<()> { + let initial = true; + let mut ui = crate::connui::ConnectionUI::new_headless(); + let client = Client::new_default_unix_domain(initial, &mut ui)?; + match cli.sub { + CliSubCommand::List => { + let cols = vec![ + Column { + name: "WINID".to_string(), + alignment: Alignment::Right, + }, + Column { + name: "TABID".to_string(), + alignment: Alignment::Right, + }, + Column { + name: "PANEID".to_string(), + alignment: Alignment::Right, + }, + Column { + name: "SIZE".to_string(), + alignment: Alignment::Left, + }, + Column { + name: "TITLE".to_string(), + alignment: Alignment::Left, + }, + Column { + name: "CWD".to_string(), + alignment: Alignment::Left, + }, + ]; + let mut data = vec![]; + let panes = client.list_panes().await?; - loop { - if let Some(entry) = cursor.leaf_mut() { - data.push(vec![ - entry.window_id.to_string(), - entry.tab_id.to_string(), - entry.pane_id.to_string(), - format!("{}x{}", entry.size.cols, entry.size.rows), - entry.title.clone(), - entry - .working_dir - .as_ref() - .map(|url| url.url.as_str()) - .unwrap_or("") - .to_string(), - ]); - } - match cursor.preorder_next() { - Ok(c) => cursor = c, - Err(_) => break, - } - } + for tabroot in panes.tabs { + let mut cursor = tabroot.into_tree().cursor(); + + loop { + if let Some(entry) = cursor.leaf_mut() { + data.push(vec![ + entry.window_id.to_string(), + entry.tab_id.to_string(), + entry.pane_id.to_string(), + format!("{}x{}", entry.size.cols, entry.size.rows), + entry.title.clone(), + entry + .working_dir + .as_ref() + .map(|url| url.url.as_str()) + .unwrap_or("") + .to_string(), + ]); + } + match cursor.preorder_next() { + Ok(c) => cursor = c, + Err(_) => break, } - - tabulate_output(&cols, &data, &mut std::io::stdout().lock())?; - } - CliSubCommand::SplitPane { - pane_id, - cwd, - prog, - horizontal, - } => { - let pane_id: PaneId = match pane_id { - Some(p) => p, - None => std::env::var("WEZTERM_PANE") - .map_err(|_| { - anyhow!( - "--pane-id was not specified and $WEZTERM_PANE - is not set in the environment" - ) - })? - .parse()?, - }; - - let spawned = block_on(client.split_pane(codec::SplitPane { - pane_id, - direction: if horizontal { - SplitDirection::Horizontal - } else { - SplitDirection::Vertical - }, - domain: keyassignment::SpawnTabDomain::CurrentPaneDomain, - command: if prog.is_empty() { - None - } else { - let builder = CommandBuilder::from_argv(prog); - Some(builder) - }, - command_dir: cwd.and_then(|c| c.to_str().map(|s| s.to_string())), - }))?; - - log::debug!("{:?}", spawned); - println!("{}", spawned.pane_id); - } - CliSubCommand::Proxy => { - // The client object we created above will have spawned - // the server if needed, so now all we need to do is turn - // ourselves into basically netcat. - drop(client); - - crate::stats::disable_stats_printing(); - - let mux = Rc::new(mux::Mux::new(None)); - Mux::set_mux(&mux); - let unix_dom = config.unix_domains.first().unwrap(); - let sock_path = unix_dom.socket_path(); - let stream = unix_connect_with_retry(&sock_path)?; - - // Keep the threads below alive forever; they'll - // exit the process when they're done. - let _activity = Activity::new(); - - // Spawn a thread to pull data from the socket and write - // it to stdout - let duped = stream.try_clone()?; - std::thread::spawn(move || { - let stdout = std::io::stdout(); - consume_stream_then_exit_process(duped, stdout.lock()); - }); - - // and pull data from stdin and write it to the socket - std::thread::spawn(move || { - let stdin = std::io::stdin(); - consume_stream_then_exit_process(stdin.lock(), stream); - }); - } - CliSubCommand::TlsCreds => { - let creds = block_on(client.get_tls_creds())?; - codec::Pdu::GetTlsCredsResponse(creds).encode(std::io::stdout().lock(), 0)?; } } - Ok(()) + + tabulate_output(&cols, &data, &mut std::io::stdout().lock())?; } + CliSubCommand::SplitPane { + pane_id, + cwd, + prog, + horizontal, + } => { + let pane_id: PaneId = match pane_id { + Some(p) => p, + None => std::env::var("WEZTERM_PANE") + .map_err(|_| { + anyhow!( + "--pane-id was not specified and $WEZTERM_PANE + is not set in the environment" + ) + })? + .parse()?, + }; + + let spawned = client + .split_pane(codec::SplitPane { + pane_id, + direction: if horizontal { + SplitDirection::Horizontal + } else { + SplitDirection::Vertical + }, + domain: keyassignment::SpawnTabDomain::CurrentPaneDomain, + command: if prog.is_empty() { + None + } else { + let builder = CommandBuilder::from_argv(prog); + Some(builder) + }, + command_dir: cwd.and_then(|c| c.to_str().map(|s| s.to_string())), + }) + .await?; + + log::debug!("{:?}", spawned); + println!("{}", spawned.pane_id); + } + CliSubCommand::Proxy => { + // The client object we created above will have spawned + // the server if needed, so now all we need to do is turn + // ourselves into basically netcat. + drop(client); + + crate::stats::disable_stats_printing(); + + let mux = Rc::new(mux::Mux::new(None)); + Mux::set_mux(&mux); + let unix_dom = config.unix_domains.first().unwrap(); + let sock_path = unix_dom.socket_path(); + let stream = unix_connect_with_retry(&sock_path, false)?; + + // Keep the threads below alive forever; they'll + // exit the process when they're done. + let _activity = Activity::new(); + + // Spawn a thread to pull data from the socket and write + // it to stdout + let duped = stream.try_clone()?; + std::thread::spawn(move || { + let stdout = std::io::stdout(); + consume_stream_then_exit_process(duped, stdout.lock()); + }); + + // and pull data from stdin and write it to the socket + std::thread::spawn(move || { + let stdin = std::io::stdin(); + consume_stream_then_exit_process(stdin.lock(), stream); + }); + } + CliSubCommand::TlsCreds => { + let creds = client.get_tls_creds().await?; + codec::Pdu::GetTlsCredsResponse(creds).encode(std::io::stdout().lock(), 0)?; + } + } + Ok(()) +} + +fn run_cli(config: config::ConfigHandle, cli: CliCommand) -> anyhow::Result<()> { + let executor = promise::spawn::SimpleExecutor::new(); + promise::spawn::spawn(async move { + match run_cli_async(config, cli).await { + Ok(_) => std::process::exit(0), + Err(err) => { + terminate_with_error(err); + } + } + }); + loop { + executor.tick()?; } } diff --git a/wezterm/src/server/client.rs b/wezterm/src/server/client.rs index 60a7d223a..4af337e62 100644 --- a/wezterm/src/server/client.rs +++ b/wezterm/src/server/client.rs @@ -100,7 +100,14 @@ async fn process_unilateral_inner_async( local_domain_id: DomainId, decoded: DecodedPdu, ) -> anyhow::Result<()> { - let mux = Mux::get().unwrap(); + let mux = match Mux::get() { + Some(mux) => mux, + None => { + // This can happen for some client scenarios; it is ok to ignore it. + return Ok(()); + } + }; + let client_domain = mux .get_domain(local_domain_id) .ok_or_else(|| anyhow!("no such domain {}", local_domain_id))?; @@ -260,9 +267,16 @@ fn client_thread( } } -pub fn unix_connect_with_retry(path: &Path) -> Result { +pub fn unix_connect_with_retry( + path: &Path, + just_spawned: bool, +) -> Result { let mut error = std::io::Error::last_os_error(); + if just_spawned { + std::thread::sleep(std::time::Duration::from_millis(200)); + } + for iter in 0..10 { if iter > 0 { std::thread::sleep(std::time::Duration::from_millis(iter * 10)); @@ -458,7 +472,7 @@ impl Reconnectable { ui.output_str(&format!("Connect to {}\n", sock_path.display())); info!("connect to {}", sock_path.display()); - let stream = match unix_connect_with_retry(&sock_path) { + let stream = match unix_connect_with_retry(&sock_path, false) { Ok(stream) => stream, Err(e) => { if unix_dom.no_serve_automatically || !initial { @@ -483,15 +497,26 @@ impl Reconnectable { let mut cmd = CommandBuilder::new(&argv[0]); cmd.args(&argv[1..]); let mut child = pair.slave.spawn_command(cmd)?; + drop(pair.slave); + let mut reader = pair.master.try_clone_reader()?; + drop(pair.master); + let mut s = String::new(); + reader.read_to_string(&mut s).ok(); + log::error!("server output: {}", s); + let status = child.wait()?; if !status.success() { log::error!("{:?} failed with status {:?}", argv, status); } + log::error!("{:?} completed with status {:?}", argv, status); drop(child); - drop(pair.slave); - unix_connect_with_retry(&sock_path) - .with_context(|| format!("failed to connect to {}", sock_path.display()))? + unix_connect_with_retry(&sock_path, true).with_context(|| { + format!( + "(after spawning server) failed to connect to {}", + sock_path.display() + ) + })? } }; @@ -781,17 +806,18 @@ impl Client { } async fn detach(local_domain_id: DomainId) -> anyhow::Result<()> { - let mux = Mux::get().unwrap(); - let client_domain = mux - .get_domain(local_domain_id) - .ok_or_else(|| anyhow!("no such domain {}", local_domain_id))?; - let client_domain = - client_domain - .downcast_ref::() - .ok_or_else(|| { - anyhow!("domain {} is not a ClientDomain instance", local_domain_id) - })?; - client_domain.perform_detach(); + if let Some(mux) = Mux::get() { + let client_domain = mux + .get_domain(local_domain_id) + .ok_or_else(|| anyhow!("no such domain {}", local_domain_id))?; + let client_domain = + client_domain + .downcast_ref::() + .ok_or_else(|| { + anyhow!("domain {} is not a ClientDomain instance", local_domain_id) + })?; + client_domain.perform_detach(); + } Ok(()) } promise::spawn::spawn_into_main_thread(async move {