1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-23 05:12:40 +03:00

clean up mux server startup

kindof a lot going on in this commit, unintentionally:

* Need the lua context set to be moved into the config crate
  otherwise configs cannot be parsed by the server and we end
  up with the default configs
* Make the server use smol for async io
* Drop the use of the daemonize crate, which I had forked anyway.
  Just inline our own tighter daemonize module
* Improve daemon spawning synchronization, however, it still needs
  work for windows to avoid blocking forever where we don't do
  daemonizing.
This commit is contained in:
Wez Furlong 2020-10-04 09:39:28 -07:00
parent a4731fd5ab
commit e8be716cb3
19 changed files with 656 additions and 568 deletions

11
Cargo.lock generated
View File

@ -687,7 +687,10 @@ name = "config"
version = "0.1.0"
dependencies = [
"anyhow",
"bstr 0.2.13",
"dirs 2.0.2",
"filenamegen",
"hostname",
"lazy_static",
"libc",
"log",
@ -2736,6 +2739,7 @@ dependencies = [
"anyhow",
"async-std",
"async-task 1.3.1",
"crossbeam",
"lazy_static",
"thiserror",
"tokio",
@ -4129,7 +4133,6 @@ dependencies = [
"async-trait",
"base64 0.10.1",
"bitflags 1.2.1",
"bstr 0.2.13",
"cc",
"cocoa",
"codec",
@ -4145,14 +4148,12 @@ dependencies = [
"embed-resource",
"euclid",
"filedescriptor",
"filenamegen",
"font-kit",
"font-loader",
"fontconfig",
"freetype 0.1.0",
"harfbuzz",
"hdrhistogram",
"hostname",
"http_req",
"image",
"lazy_static",
@ -4209,10 +4210,10 @@ dependencies = [
"anyhow",
"codec",
"config",
"crossbeam",
"daemonize",
"filedescriptor",
"foreign-types-shared",
"hostname",
"libc",
"log",
"mux",
"openssl",

View File

@ -158,7 +158,20 @@ async fn decode_raw_async<R: Unpin + AsyncReadExt>(r: &mut R) -> anyhow::Result<
};
let serial = read_u64_async(r).await.context("reading PDU serial")?;
let ident = read_u64_async(r).await.context("reading PDU ident")?;
let data_len = len as usize - (encoded_length(ident) + encoded_length(serial));
let data_len =
match (len as usize).overflowing_sub(encoded_length(ident) + encoded_length(serial)) {
(_, true) => {
anyhow::bail!(
"sizes don't make sense: len:{} serial:{} (enc={}) ident:{} (enc={})",
len,
serial,
encoded_length(serial),
ident,
encoded_length(ident)
);
}
(data_len, false) => data_len,
};
if is_compressed {
metrics::value!("pdu.decode.compressed.size", data_len as u64);
@ -192,7 +205,20 @@ fn decode_raw<R: std::io::Read>(mut r: R) -> anyhow::Result<Decoded> {
};
let serial = read_u64(r.by_ref()).context("reading PDU serial")?;
let ident = read_u64(r.by_ref()).context("reading PDU ident")?;
let data_len = len as usize - (encoded_length(ident) + encoded_length(serial));
let data_len =
match (len as usize).overflowing_sub(encoded_length(ident) + encoded_length(serial)) {
(_, true) => {
anyhow::bail!(
"sizes don't make sense: len:{} serial:{} (enc={}) ident:{} (enc={})",
len,
serial,
encoded_length(serial),
ident,
encoded_length(ident)
);
}
(data_len, false) => data_len,
};
if is_compressed {
metrics::value!("pdu.decode.compressed.size", data_len as u64);

View File

@ -12,7 +12,10 @@ vergen = "3"
[dependencies]
anyhow = "1.0"
bstr = "0.2"
dirs = "2.0"
filenamegen = "0.2"
hostname = "0.3"
lazy_static = "1.4"
libc = "0.2"
luahelper = { path = "../luahelper" }

View File

@ -29,6 +29,7 @@ mod font;
mod frontend;
pub mod keyassignment;
mod keys;
pub mod lua;
mod ssh;
mod terminal;
mod tls;
@ -54,8 +55,9 @@ lazy_static! {
pub static ref CONFIG_DIR: PathBuf = xdg_config_home();
pub static ref RUNTIME_DIR: PathBuf = compute_runtime_dir().unwrap();
static ref CONFIG: Configuration = Configuration::new();
static ref MAKE_LUA: Mutex<Option<LuaFactory>> = Mutex::new(None);
static ref SHOW_ERROR: Mutex<Option<ErrorCallback>> = Mutex::new(None);
static ref MAKE_LUA: Mutex<Option<LuaFactory>> = Mutex::new(Some(lua::make_lua_context));
static ref SHOW_ERROR: Mutex<Option<ErrorCallback>> =
Mutex::new(Some(|e| log::error!("{}", e)));
}
pub fn assign_lua_factory(make_lua_context: LuaFactory) {

View File

@ -1,6 +1,6 @@
use crate::{FontAttributes, TextStyle};
use anyhow::anyhow;
use bstr::BString;
use config::{FontAttributes, TextStyle};
pub use luahelper::*;
use mlua::{Lua, Table, Value};
use serde::*;
@ -48,8 +48,8 @@ pub fn make_lua_context(config_dir: &Path) -> anyhow::Result<Lua> {
array.insert(1, format!("{}/?/init.lua", path.display()));
}
prefix_path(&mut path_array, &config::HOME_DIR.join(".wezterm"));
prefix_path(&mut path_array, &config::CONFIG_DIR);
prefix_path(&mut path_array, &crate::HOME_DIR.join(".wezterm"));
prefix_path(&mut path_array, &crate::CONFIG_DIR);
if let Ok(exe) = std::env::current_exe() {
if let Some(path) = exe.parent() {
wezterm_mod.set(
@ -71,12 +71,12 @@ pub fn make_lua_context(config_dir: &Path) -> anyhow::Result<Lua> {
.ok_or_else(|| anyhow!("config dir path is not UTF-8"))?,
)?;
wezterm_mod.set("target_triple", config::wezterm_target_triple())?;
wezterm_mod.set("version", config::wezterm_version())?;
wezterm_mod.set("home_dir", config::HOME_DIR.to_str())?;
wezterm_mod.set("target_triple", crate::wezterm_target_triple())?;
wezterm_mod.set("version", crate::wezterm_version())?;
wezterm_mod.set("home_dir", crate::HOME_DIR.to_str())?;
wezterm_mod.set(
"running_under_wsl",
lua.create_function(|_, ()| Ok(config::running_under_wsl()))?,
lua.create_function(|_, ()| Ok(crate::running_under_wsl()))?,
)?;
wezterm_mod.set(
@ -207,7 +207,7 @@ fn font_with_fallback<'lua>(
fn action<'lua>(
_lua: &'lua Lua,
action: Table<'lua>,
) -> mlua::Result<config::keyassignment::KeyAssignment> {
) -> mlua::Result<crate::keyassignment::KeyAssignment> {
Ok(from_lua_value(Value::Table(action))?)
}

View File

@ -7,6 +7,7 @@ edition = "2018"
[dependencies]
async-task = "1.2"
async-std = "1.4"
crossbeam = "0.7"
tokio = {version="0.2", features=["full"]}
anyhow = "1.0"
thiserror = "1.0"

View File

@ -1,3 +1,4 @@
use crate::SpawnFunc;
use anyhow::{anyhow, Result};
use async_task::{JoinHandle, Task};
use std::future::Future;
@ -212,3 +213,35 @@ pub async fn join_handle_result<T>(handle: JoinHandle<anyhow::Result<T>, ()>) ->
.await
.ok_or_else(|| anyhow::anyhow!("task was cancelled or panicked"))?
}
pub struct SimpleExecutor {
rx: crossbeam::channel::Receiver<SpawnFunc>,
}
impl SimpleExecutor {
pub fn new() -> Self {
let (tx, rx) = crossbeam::channel::unbounded();
let tx_main = tx.clone();
let tx_low = tx.clone();
let queue_func = move |f: SpawnFunc| {
tx_main.send(f).ok();
};
let queue_func_low = move |f: SpawnFunc| {
tx_low.send(f).ok();
};
set_schedulers(
Box::new(move |task| queue_func(Box::new(move || task.run()))),
Box::new(move |task| queue_func_low(Box::new(move || task.run()))),
);
Self { rx }
}
pub fn tick(&self) -> anyhow::Result<()> {
match self.rx.recv() {
Ok(func) => func(),
Err(err) => anyhow::bail!("while waiting for events: {:?}", err),
};
Ok(())
}
}

View File

@ -10,9 +10,10 @@ edition = "2018"
anyhow = "1.0"
codec = { path = "../codec" }
config = { path = "../config" }
crossbeam = "0.7"
filedescriptor = { version="0.7", path = "../filedescriptor" }
foreign-types-shared = "0.1.1" # to peek into SslStream's SSL
hostname = "0.3"
libc = "0.2"
log = "0.4"
mux = { path = "../mux" }
openssl = "0.10"
@ -27,9 +28,6 @@ umask = { path = "../umask" }
url = "2"
wezterm-term = { path = "../term", features=["use_serde"] }
[target.'cfg(unix)'.dependencies]
daemonize = { git = "https://github.com/wez/daemonize" }
[features]
default = ["vendor_openssl"]
# FIXME: find a way to magically disable vendor_openssl only on linux!

View File

@ -1,102 +0,0 @@
use crate::pollable::*;
use crate::sessionhandler::{PduSender, SessionHandler};
use anyhow::{bail, Context, Error};
use codec::*;
use crossbeam::channel::TryRecvError;
use filedescriptor::poll;
use log::error;
use mux::{Mux, MuxNotification};
use std::collections::HashSet;
pub struct ClientSession<S: ReadAndWrite> {
stream: S,
to_write_rx: PollableReceiver<DecodedPdu>,
mux_rx: PollableReceiver<MuxNotification>,
handler: SessionHandler,
}
impl<S: ReadAndWrite> ClientSession<S> {
pub fn new(stream: S) -> Self {
let (to_write_tx, to_write_rx) =
pollable_channel().expect("failed to create pollable_channel");
let mux = Mux::get().expect("to be running on gui thread");
let (mux_tx, mux_rx) = pollable_channel().expect("failed to create pollable_channel");
mux.subscribe(move |n| mux_tx.send(n).is_ok());
let to_write_tx = PduSender::with_pollable(to_write_tx);
let handler = SessionHandler::new(to_write_tx);
Self {
stream,
to_write_rx,
mux_rx,
handler,
}
}
pub fn run(&mut self) {
if let Err(e) = self.process() {
error!("While processing session loop: {}", e);
}
}
fn process(&mut self) -> Result<(), Error> {
let mut read_buffer = Vec::with_capacity(1024);
let mut panes_to_output = HashSet::new();
loop {
loop {
match self.to_write_rx.try_recv() {
Ok(decoded) => {
log::trace!("writing pdu with serial {}", decoded.serial);
decoded.pdu.encode(&mut self.stream, decoded.serial)?;
self.stream.flush().context("while flushing stream")?;
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => bail!("ClientSession was destroyed"),
};
}
loop {
match self.mux_rx.try_recv() {
Ok(notif) => match notif {
// Coalesce multiple TabOutputs for the same tab
MuxNotification::PaneOutput(pane_id) => {
panes_to_output.insert(pane_id);
}
MuxNotification::WindowCreated(_window_id) => {}
},
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => bail!("mux_rx is Disconnected"),
};
}
for pane_id in panes_to_output.drain() {
self.handler.schedule_pane_push(pane_id);
}
let mut poll_array = [
self.to_write_rx.as_poll_fd(),
self.stream.as_poll_fd(),
self.mux_rx.as_poll_fd(),
];
poll(
&mut poll_array,
Some(std::time::Duration::from_millis(1000)),
)?;
if poll_array[1].revents != 0 || self.stream.has_read_buffered() {
loop {
self.stream.set_non_blocking(true)?;
let res = Pdu::try_read_and_decode(&mut self.stream, &mut read_buffer);
self.stream.set_non_blocking(false)?;
match res {
Ok(Some(decoded)) => self.handler.process_one(decoded),
Ok(None) => break,
Err(err) => {
log::error!("Error decoding: {}", err);
return Err(err);
}
}
}
}
}
}
}

View File

@ -0,0 +1,101 @@
#![cfg(unix)]
use anyhow::Context;
use libc::pid_t;
use std::io::Write;
use std::os::unix::io::AsRawFd;
enum Fork {
Child(pid_t),
Parent(pid_t),
}
fn fork() -> anyhow::Result<Fork> {
let pid = unsafe { libc::fork() };
if pid == 0 {
// We are the child
let pid = unsafe { libc::getpid() };
Ok(Fork::Child(pid))
} else if pid < 0 {
let err: anyhow::Error = std::io::Error::last_os_error().into();
Err(err.context("fork"))
} else {
// We are the parent
Ok(Fork::Parent(pid))
}
}
fn setsid() -> anyhow::Result<()> {
let pid = unsafe { libc::setsid() };
if pid == -1 {
let err: anyhow::Error = std::io::Error::last_os_error().into();
Err(err.context("setsid"))
} else {
Ok(())
}
}
fn lock_pid_file(config: &config::ConfigHandle) -> anyhow::Result<std::fs::File> {
let pid_file = config.daemon_options.pid_file();
let file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.open(&pid_file)
.with_context(|| format!("opening pid file {}", pid_file.display()))?;
let res = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
if res != 0 {
let err = std::io::Error::last_os_error();
anyhow::bail!("unable to lock pid file {}: {}", pid_file.display(), err);
}
unsafe { libc::ftruncate(file.as_raw_fd(), 0) };
Ok(file)
}
pub fn daemonize(config: &config::ConfigHandle) -> anyhow::Result<()> {
let pid_file = if !config::running_under_wsl() {
// pid file locking is only partly functional when running under
// WSL 1; it is possible for the pid file to exist after a reboot
// and for attempts to open and lock it to fail when there are no
// other processes that might possibly hold a lock on it.
// So, we only use a pid file when not under WSL.
Some(lock_pid_file(config)?)
} else {
None
};
let stdout = config.daemon_options.open_stdout()?;
let stderr = config.daemon_options.open_stderr()?;
let devnull = std::fs::File::open("/dev/null").context("opening /dev/null for read")?;
match fork()? {
Fork::Parent(pid) => {
let mut status = 0;
unsafe { libc::waitpid(pid, &mut status, 0) };
std::process::exit(0);
}
Fork::Child(_) => {}
}
setsid()?;
match fork()? {
Fork::Parent(_) => {
std::process::exit(0);
}
Fork::Child(_) => {}
}
if let Some(mut pid_file) = pid_file {
writeln!(pid_file, "{}", unsafe { libc::getpid() }).ok();
// Leak it so that the descriptor remains open for the duration
// of the process runtime
std::mem::forget(pid_file);
}
unsafe { libc::dup2(devnull.as_raw_fd(), libc::STDIN_FILENO) };
unsafe { libc::dup2(stdout.as_raw_fd(), libc::STDOUT_FILENO) };
unsafe { libc::dup2(stderr.as_raw_fd(), libc::STDERR_FILENO) };
Ok(())
}

View File

@ -0,0 +1,160 @@
use crate::sessionhandler::{PduSender, SessionHandler};
use crate::UnixStream;
use codec::{DecodedPdu, Pdu};
use mux::{Mux, MuxNotification};
use promise::spawn::spawn;
use smol::io::{AsyncRead, AsyncWrite};
use std::marker::Unpin;
#[cfg(unix)]
pub trait AsRawDesc: std::os::unix::io::AsRawFd {}
#[cfg(windows)]
pub trait AsRawDesc: std::os::windows::io::AsRawSocket {}
pub trait TryClone {
fn try_to_clone(&self) -> anyhow::Result<Self>
where
Self: std::marker::Sized;
}
impl AsRawDesc for UnixStream {}
impl TryClone for UnixStream {
fn try_to_clone(&self) -> anyhow::Result<Self>
where
Self: std::marker::Sized,
{
Ok(self.try_clone()?)
}
}
#[derive(Debug)]
enum Item {
Notif(MuxNotification),
Pdu(DecodedPdu),
Done(String),
}
async fn catch(
f: impl smol::future::Future<Output = anyhow::Result<()>>,
tx: smol::channel::Sender<Item>,
) -> anyhow::Result<()> {
if let Err(err) = f.await {
tx.try_send(Item::Done(err.to_string())).ok();
}
Ok(())
}
async fn process_write_queue<T>(
mut write_stream: T,
write_rx: smol::channel::Receiver<DecodedPdu>,
) -> anyhow::Result<()>
where
T: AsyncWrite + Unpin,
{
loop {
let decoded = write_rx.recv().await?;
log::trace!("writing pdu with serial {}", decoded.serial);
decoded
.pdu
.encode_async(&mut write_stream, decoded.serial)
.await?;
}
}
async fn read_pdus<T>(
mut read_stream: T,
item_tx: smol::channel::Sender<Item>,
) -> anyhow::Result<()>
where
T: AsyncRead + Unpin,
{
loop {
let decoded = Pdu::decode_async(&mut read_stream).await?;
item_tx.send(Item::Pdu(decoded)).await?;
}
}
async fn multiplex(
write_tx: smol::channel::Sender<DecodedPdu>,
item_rx: smol::channel::Receiver<Item>,
) -> anyhow::Result<()> {
let pdu_sender = PduSender::with_smol(write_tx);
let mut handler = SessionHandler::new(pdu_sender);
loop {
let item = match item_rx.recv().await {
Ok(item) => item,
Err(err) => {
log::error!("{}", err);
return Ok(());
}
};
match item {
Item::Notif(MuxNotification::PaneOutput(pane_id)) => {
handler.schedule_pane_push(pane_id);
}
Item::Notif(MuxNotification::WindowCreated(_window_id)) => {}
Item::Pdu(decoded) => {
handler.process_one(decoded);
}
Item::Done(e) => {
log::error!("{}", e);
return Ok(());
}
}
}
}
pub async fn process<T>(stream: T) -> anyhow::Result<()>
where
T: 'static,
T: TryClone,
T: std::io::Read,
T: std::io::Write,
T: AsRawDesc,
{
let write_stream = smol::Async::new(stream.try_to_clone()?)?;
let read_stream = smol::Async::new(stream)?;
process_async(write_stream, read_stream).await
}
pub async fn process_async<T>(write_stream: T, read_stream: T) -> anyhow::Result<()>
where
T: 'static,
T: AsyncRead,
T: AsyncWrite,
T: Unpin,
{
let (item_tx, item_rx) = smol::channel::unbounded::<Item>();
let (write_tx, write_rx) = smol::channel::unbounded::<DecodedPdu>();
// Process the PDU write queue to send to the peer
spawn({
let item_tx = item_tx.clone();
async move {
catch(
async move { process_write_queue(write_stream, write_rx).await },
item_tx,
)
.await
}
});
{
let mux = Mux::get().expect("to be running on gui thread");
let tx = item_tx.clone();
mux.subscribe(move |n| tx.try_send(Item::Notif(n)).is_ok());
}
{
let item_tx = item_tx.clone();
spawn(async move {
let tx = item_tx.clone();
catch(async move { read_pdus(read_stream, item_tx).await }, tx).await
});
}
catch(async move { multiplex(write_tx, item_rx).await }, item_tx).await
}

View File

@ -1,10 +1,7 @@
use crate::sessionhandler::{PduSender, SessionHandler};
use crate::{UnixListener, UnixStream};
use crate::UnixListener;
use anyhow::{anyhow, Context as _};
use codec::{DecodedPdu, Pdu};
use config::{create_user_owned_dirs, UnixDomain};
use mux::{Mux, MuxNotification};
use promise::spawn::{spawn, spawn_into_main_thread};
use promise::spawn::spawn_into_main_thread;
pub struct LocalListener {
listener: UnixListener,
@ -24,7 +21,12 @@ impl LocalListener {
for stream in self.listener.incoming() {
match stream {
Ok(stream) => {
spawn_into_main_thread(async move { Self::process(stream) });
spawn_into_main_thread(async move {
crate::dispatch::process(stream).await.map_err(|e| {
log::error!("welp: {:?}", e);
e
})
});
}
Err(err) => {
log::error!("accept failed: {}", err);
@ -33,60 +35,6 @@ impl LocalListener {
}
}
}
async fn process(stream: UnixStream) -> anyhow::Result<()> {
let mut write_stream = smol::Async::new(stream.try_clone()?)?;
let mut read_stream = smol::Async::new(stream)?;
let (write_tx, write_rx) = smol::channel::unbounded::<DecodedPdu>();
// Process the PDU write queue to send to the peer
spawn(async move {
while let Ok(decoded) = write_rx.recv().await {
log::trace!("writing pdu with serial {}", decoded.serial);
decoded
.pdu
.encode_async(&mut write_stream, decoded.serial)
.await?;
}
Ok::<(), anyhow::Error>(())
});
let pdu_sender = PduSender::with_smol(write_tx);
let mut handler = SessionHandler::new(pdu_sender);
enum Item {
Notif(MuxNotification),
Pdu(DecodedPdu),
}
let (item_tx, item_rx) = smol::channel::unbounded::<Item>();
{
let mux = Mux::get().expect("to be running on gui thread");
let tx = item_tx.clone();
mux.subscribe(move |n| tx.try_send(Item::Notif(n)).is_ok());
}
spawn(async move {
while let Ok(decoded) = Pdu::decode_async(&mut read_stream).await {
item_tx.send(Item::Pdu(decoded)).await?;
}
Ok::<(), anyhow::Error>(())
});
while let Ok(notif) = item_rx.recv().await {
match notif {
Item::Notif(MuxNotification::PaneOutput(pane_id)) => {
handler.schedule_pane_push(pane_id);
}
Item::Notif(MuxNotification::WindowCreated(_window_id)) => {}
Item::Pdu(decoded) => {
handler.process_one(decoded);
}
}
}
Ok(())
}
}
/// Take care when setting up the listener socket;
@ -95,7 +43,7 @@ impl LocalListener {
/// that prevent other users from manipulating its contents.
fn safely_create_sock_path(unix_dom: &UnixDomain) -> anyhow::Result<UnixListener> {
let sock_path = &unix_dom.socket_path();
log::debug!("setting up {}", sock_path.display());
log::error!("setting up {}", sock_path.display());
let sock_dir = sock_path
.parent()

View File

@ -1,21 +1,16 @@
use anyhow::{anyhow, bail, Context, Error};
use config::{configuration, TlsDomainServer};
use crossbeam::channel::unbounded as channel;
use log::error;
use config::configuration;
use mux::activity::Activity;
use mux::domain::{Domain, LocalDomain};
use mux::Mux;
use portable_pty::cmdbuilder::CommandBuilder;
use promise::spawn::spawn_into_main_thread;
use promise::*;
use std::ffi::OsString;
use std::net::TcpListener;
use std::path::Path;
use std::rc::Rc;
use std::sync::Arc;
use std::thread;
use structopt::*;
mod daemonize;
#[cfg(unix)]
use std::os::unix::net::{UnixListener, UnixStream};
#[cfg(windows)]
@ -48,8 +43,16 @@ struct Opt {
prog: Vec<OsString>,
}
fn main() -> anyhow::Result<()> {
fn main() {
pretty_env_logger::init_timed();
if let Err(err) = run() {
eprintln!("boo {}", err);
log::error!("{}", err);
std::process::exit(1);
}
}
fn run() -> anyhow::Result<()> {
//stats::Stats::init()?;
let _saver = umask::UmaskSaver::new();
@ -62,34 +65,7 @@ fn main() -> anyhow::Result<()> {
#[cfg(unix)]
{
if opts.daemonize {
let stdout = config.daemon_options.open_stdout()?;
let stderr = config.daemon_options.open_stderr()?;
let mut daemonize = daemonize::Daemonize::new()
.stdout(stdout)
.stderr(stderr)
.working_directory(config::HOME_DIR.clone());
if !config::running_under_wsl() {
// pid file locking is only partly function when running under
// WSL 1; it is possible for the pid file to exist after a reboot
// and for attempts to open and lock it to fail when there are no
// other processes that might possibly hold a lock on it.
// So, we only use a pid file when not under WSL.
daemonize = daemonize.pid_file(config.daemon_options.pid_file());
}
if let Err(err) = daemonize.start() {
use daemonize::DaemonizeError;
match err {
DaemonizeError::OpenPidfile
| DaemonizeError::LockPidfile(_)
| DaemonizeError::ChownPidfile(_)
| DaemonizeError::WritePid => {
bail!("{} {}", err, config.daemon_options.pid_file().display());
}
DaemonizeError::ChangeDirectory => {
bail!("{} {}", err, config::HOME_DIR.display());
}
_ => return Err(err.into()),
daemonize::daemonize(&config)?;
}
}
@ -109,8 +85,6 @@ fn main() -> anyhow::Result<()> {
] {
std::env::remove_var(name);
}
}
}
let need_builder = !opts.prog.is_empty() || opts.cwd.is_some();
@ -132,22 +106,12 @@ fn main() -> anyhow::Result<()> {
let mux = Rc::new(mux::Mux::new(Some(domain.clone())));
Mux::set_mux(&mux);
let (tx, rx) = channel();
let executor = promise::spawn::SimpleExecutor::new();
let tx_main = tx.clone();
let tx_low = tx.clone();
let queue_func = move |f: SpawnFunc| {
tx_main.send(f).ok();
};
let queue_func_low = move |f: SpawnFunc| {
tx_low.send(f).ok();
};
promise::spawn::set_schedulers(
Box::new(move |task| queue_func(Box::new(move || task.run()))),
Box::new(move |task| queue_func_low(Box::new(move || task.run()))),
);
spawn_listener()?;
spawn_listener().map_err(|e| {
log::error!("problem spawning listeners: {:?}", e);
e
})?;
let activity = Activity::new();
@ -159,13 +123,10 @@ fn main() -> anyhow::Result<()> {
});
loop {
match rx.recv() {
Ok(func) => func(),
Err(err) => bail!("while waiting for events: {:?}", err),
}
executor.tick()?;
if Mux::get().unwrap().is_empty() && mux::activity::Activity::count() == 0 {
log::info!("No more tabs; all done!");
log::error!("No more tabs; all done!");
return Ok(());
}
}
@ -191,11 +152,10 @@ fn terminate_with_error(err: anyhow::Error) -> ! {
std::process::exit(1);
}
mod clientsession;
mod dispatch;
mod local;
mod ossl;
mod pki;
mod pollable;
mod sessionhandler;
lazy_static::lazy_static! {
@ -214,5 +174,6 @@ pub fn spawn_listener() -> anyhow::Result<()> {
for tls_server in &config.tls_servers {
ossl::spawn_tls_listener(tls_server)?;
}
Ok(())
}

View File

@ -1,12 +1,69 @@
use super::*;
use crate::PKI;
use anyhow::{anyhow, Context, Error};
use config::TlsDomainServer;
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod, SslStream, SslVerifyMode};
use openssl::x509::X509;
use promise::spawn::spawn_into_main_thread;
use std::net::TcpListener;
use std::net::TcpStream;
use std::path::Path;
use std::sync::Arc;
struct OpenSSLNetListener {
acceptor: Arc<SslAcceptor>,
listener: TcpListener,
}
struct AsyncSslStream {
s: SslStream<TcpStream>,
}
impl AsyncSslStream {
pub fn new(s: SslStream<TcpStream>) -> Self {
Self { s }
}
}
impl crate::dispatch::TryClone for AsyncSslStream {
fn try_to_clone(&self) -> anyhow::Result<Self> {
use foreign_types_shared::ForeignTypeRef;
let stream = self.s.get_ref().try_clone()?;
let s = unsafe { SslStream::from_raw_parts(self.s.ssl().as_ptr(), stream) };
Ok(Self { s })
}
}
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for AsyncSslStream {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
self.s.get_ref().as_raw_fd()
}
}
#[cfg(windows)]
impl std::os::windows::io::AsRawSocket for AsyncSslStream {
fn as_raw_socket(&self) -> std::os::windows::io::RawSocket {
self.s.get_ref().as_raw_socket()
}
}
impl crate::dispatch::AsRawDesc for AsyncSslStream {}
impl std::io::Read for AsyncSslStream {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.s.read(buf)
}
}
impl std::io::Write for AsyncSslStream {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
self.s.write(buf)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
self.s.flush()
}
}
impl OpenSSLNetListener {
pub fn new(listener: TcpListener, acceptor: SslAcceptor) -> Self {
Self {
@ -57,7 +114,7 @@ impl OpenSSLNetListener {
);
Ok(())
} else {
bail!("CN `{}` did not match $USER `{}`", cn_str, wanted_unix_name);
anyhow::bail!("CN `{}` did not match $USER `{}`", cn_str, wanted_unix_name);
}
}
}
@ -72,22 +129,20 @@ impl OpenSSLNetListener {
match acceptor.accept(stream) {
Ok(stream) => {
if let Err(err) = Self::verify_peer_cert(&stream) {
error!("problem with peer cert: {}", err);
log::error!("problem with peer cert: {}", err);
break;
}
spawn_into_main_thread(async move {
let mut session = clientsession::ClientSession::new(stream);
thread::spawn(move || session.run());
crate::dispatch::process(AsyncSslStream::new(stream)).await
});
}
Err(e) => {
error!("failed TlsAcceptor: {}", e);
log::error!("failed TlsAcceptor: {}", e);
}
}
}
Err(err) => {
error!("accept failed: {}", err);
log::error!("accept failed: {}", err);
return;
}
}
@ -156,6 +211,8 @@ pub fn spawn_tls_listener(tls_server: &TlsDomainServer) -> Result<(), Error> {
let acceptor = acceptor.build();
log::error!("listening with TLS on {:?}", tls_server.bind_address);
let mut net_listener = OpenSSLNetListener::new(
TcpListener::bind(&tls_server.bind_address).with_context(|| {
format!(
@ -165,7 +222,7 @@ pub fn spawn_tls_listener(tls_server: &TlsDomainServer) -> Result<(), Error> {
})?,
acceptor,
);
thread::spawn(move || {
std::thread::spawn(move || {
net_listener.run();
});
Ok(())

View File

@ -1,131 +0,0 @@
use crate::UnixStream;
use anyhow::Error;
use crossbeam::channel::{unbounded as channel, Receiver, Sender, TryRecvError};
use filedescriptor::*;
use std::cell::RefCell;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::sync::{Arc, Mutex};
pub trait ReadAndWrite: std::io::Read + std::io::Write + Send + AsPollFd {
fn set_non_blocking(&self, non_blocking: bool) -> anyhow::Result<()>;
fn has_read_buffered(&self) -> bool;
}
impl ReadAndWrite for UnixStream {
fn set_non_blocking(&self, non_blocking: bool) -> anyhow::Result<()> {
self.set_nonblocking(non_blocking)?;
Ok(())
}
fn has_read_buffered(&self) -> bool {
false
}
}
impl ReadAndWrite for openssl::ssl::SslStream<std::net::TcpStream> {
fn set_non_blocking(&self, non_blocking: bool) -> anyhow::Result<()> {
self.get_ref().set_nonblocking(non_blocking)?;
Ok(())
}
fn has_read_buffered(&self) -> bool {
self.ssl().pending() != 0
}
}
pub struct PollableSender<T> {
sender: Sender<T>,
write: Arc<Mutex<FileDescriptor>>,
}
impl<T: Send + Sync + 'static> PollableSender<T> {
pub fn send(&self, item: T) -> anyhow::Result<()> {
// Attempt to write to the pipe; if it fails due to
// being full, that's fine: it means that the other end
// is going to be signalled already so they won't miss
// anything if our write doesn't happen.
self.write.lock().unwrap().write_all(b"x").ok();
self.sender.send(item).map_err(Error::msg)?;
Ok(())
}
}
impl<T> Clone for PollableSender<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
write: self.write.clone(),
}
}
}
pub struct PollableReceiver<T> {
receiver: Receiver<T>,
read: RefCell<FileDescriptor>,
}
impl<T> PollableReceiver<T> {
pub fn try_recv(&self) -> Result<T, TryRecvError> {
// try to drain the pipe.
// We do this regardless of whether we popped an item
// so that we avoid being in a perpetually signalled state.
let mut byte = [0u8; 64];
self.read.borrow_mut().read(&mut byte).ok();
Ok(self.receiver.try_recv()?)
}
}
impl<T> AsPollFd for PollableReceiver<T> {
fn as_poll_fd(&self) -> pollfd {
self.read.borrow().as_socket_descriptor().as_poll_fd()
}
}
/// A channel that can be polled together with a socket.
/// This uses the self-pipe trick but with a unix domain
/// socketpair.
/// In theory this should also work on windows, but will require
/// windows 10 w/unix domain socket support.
pub fn pollable_channel<T>() -> anyhow::Result<(PollableSender<T>, PollableReceiver<T>)> {
let (sender, receiver) = channel();
let (mut write, mut read) = socketpair()?;
write.set_non_blocking(true)?;
read.set_non_blocking(true)?;
Ok((
PollableSender {
sender,
write: Arc::new(Mutex::new(FileDescriptor::new(write))),
},
PollableReceiver {
receiver,
read: RefCell::new(FileDescriptor::new(read)),
},
))
}
pub trait AsPollFd {
fn as_poll_fd(&self) -> pollfd;
}
impl AsPollFd for SocketDescriptor {
fn as_poll_fd(&self) -> pollfd {
pollfd {
fd: *self,
events: POLLIN,
revents: 0,
}
}
}
impl AsPollFd for openssl::ssl::SslStream<TcpStream> {
fn as_poll_fd(&self) -> pollfd {
self.get_ref().as_socket_descriptor().as_poll_fd()
}
}
impl AsPollFd for UnixStream {
fn as_poll_fd(&self) -> pollfd {
self.as_socket_descriptor().as_poll_fd()
}
}

View File

@ -1,4 +1,3 @@
use crate::pollable::*;
use crate::PKI;
use anyhow::anyhow;
use codec::*;
@ -28,12 +27,6 @@ impl PduSender {
(self.func)(pdu)
}
pub fn with_pollable(p: PollableSender<DecodedPdu>) -> Self {
Self {
func: Arc::new(move |pdu| p.send(pdu)),
}
}
pub fn with_smol(p: smol::channel::Sender<DecodedPdu>) -> Self {
Self {
func: Arc::new(move |pdu| p.try_send(pdu).map_err(|e| anyhow!("{:?}", e))),

View File

@ -23,7 +23,6 @@ thiserror = "1.0"
base64 = "0.10"
rangeset = { path = "../rangeset" }
bitflags = "1.0"
bstr = "0.2"
codec = { path = "../codec" }
config = { path = "../config" }
crossbeam = "0.7"
@ -31,12 +30,10 @@ dirs = "2.0"
downcast-rs = "1.0"
euclid = "0.20"
filedescriptor = { version="0.7", path = "../filedescriptor" }
filenamegen = "0.2"
pretty_env_logger = "0.4"
freetype = { path = "../deps/freetype" }
image = "0.23"
harfbuzz = { path = "../deps/harfbuzz" }
hostname = "0.3"
lazy_static = "1.4"
libc = "0.2"
log = "0.4"

View File

@ -16,9 +16,6 @@ use std::sync::Arc;
use structopt::StructOpt;
use tabout::{tabulate_output, Alignment, Column};
// This module defines a macro, so it must be referenced before any other mods
mod scripting;
mod connui;
mod gui;
use config::keyassignment;
@ -650,7 +647,6 @@ fn terminate_with_error(err: anyhow::Error) -> ! {
}
fn main() {
config::assign_lua_factory(scripting::make_lua_context);
config::assign_error_callback(crate::connui::show_configuration_error_message);
notify_on_panic();
if let Err(e) = run() {
@ -834,8 +830,11 @@ fn run() -> anyhow::Result<()> {
SubCommand::Serial(serial) => run_serial(config, &serial),
SubCommand::Connect(connect) => run_mux_client(config, &connect),
SubCommand::ImageCat(cmd) => cmd.run(),
SubCommand::Cli(cli) => {
// Start a front end so that the futures executor is running
SubCommand::Cli(cli) => run_cli(config, cli),
}
}
async fn run_cli_async(config: config::ConfigHandle, cli: CliCommand) -> anyhow::Result<()> {
let initial = true;
let mut ui = crate::connui::ConnectionUI::new_headless();
let client = Client::new_default_unix_domain(initial, &mut ui)?;
@ -868,7 +867,7 @@ fn run() -> anyhow::Result<()> {
},
];
let mut data = vec![];
let panes = block_on(client.list_panes())?; // FIXME: blocking
let panes = client.list_panes().await?;
for tabroot in panes.tabs {
let mut cursor = tabroot.into_tree().cursor();
@ -916,7 +915,8 @@ fn run() -> anyhow::Result<()> {
.parse()?,
};
let spawned = block_on(client.split_pane(codec::SplitPane {
let spawned = client
.split_pane(codec::SplitPane {
pane_id,
direction: if horizontal {
SplitDirection::Horizontal
@ -931,7 +931,8 @@ fn run() -> anyhow::Result<()> {
Some(builder)
},
command_dir: cwd.and_then(|c| c.to_str().map(|s| s.to_string())),
}))?;
})
.await?;
log::debug!("{:?}", spawned);
println!("{}", spawned.pane_id);
@ -948,7 +949,7 @@ fn run() -> anyhow::Result<()> {
Mux::set_mux(&mux);
let unix_dom = config.unix_domains.first().unwrap();
let sock_path = unix_dom.socket_path();
let stream = unix_connect_with_retry(&sock_path)?;
let stream = unix_connect_with_retry(&sock_path, false)?;
// Keep the threads below alive forever; they'll
// exit the process when they're done.
@ -969,13 +970,26 @@ fn run() -> anyhow::Result<()> {
});
}
CliSubCommand::TlsCreds => {
let creds = block_on(client.get_tls_creds())?;
let creds = client.get_tls_creds().await?;
codec::Pdu::GetTlsCredsResponse(creds).encode(std::io::stdout().lock(), 0)?;
}
}
Ok(())
}
fn run_cli(config: config::ConfigHandle, cli: CliCommand) -> anyhow::Result<()> {
let executor = promise::spawn::SimpleExecutor::new();
promise::spawn::spawn(async move {
match run_cli_async(config, cli).await {
Ok(_) => std::process::exit(0),
Err(err) => {
terminate_with_error(err);
}
}
});
loop {
executor.tick()?;
}
}
fn consume_stream<F: Read, T: Write>(mut from_stream: F, mut to_stream: T) -> anyhow::Result<()> {

View File

@ -100,7 +100,14 @@ async fn process_unilateral_inner_async(
local_domain_id: DomainId,
decoded: DecodedPdu,
) -> anyhow::Result<()> {
let mux = Mux::get().unwrap();
let mux = match Mux::get() {
Some(mux) => mux,
None => {
// This can happen for some client scenarios; it is ok to ignore it.
return Ok(());
}
};
let client_domain = mux
.get_domain(local_domain_id)
.ok_or_else(|| anyhow!("no such domain {}", local_domain_id))?;
@ -260,9 +267,16 @@ fn client_thread(
}
}
pub fn unix_connect_with_retry(path: &Path) -> Result<UnixStream, std::io::Error> {
pub fn unix_connect_with_retry(
path: &Path,
just_spawned: bool,
) -> Result<UnixStream, std::io::Error> {
let mut error = std::io::Error::last_os_error();
if just_spawned {
std::thread::sleep(std::time::Duration::from_millis(200));
}
for iter in 0..10 {
if iter > 0 {
std::thread::sleep(std::time::Duration::from_millis(iter * 10));
@ -458,7 +472,7 @@ impl Reconnectable {
ui.output_str(&format!("Connect to {}\n", sock_path.display()));
info!("connect to {}", sock_path.display());
let stream = match unix_connect_with_retry(&sock_path) {
let stream = match unix_connect_with_retry(&sock_path, false) {
Ok(stream) => stream,
Err(e) => {
if unix_dom.no_serve_automatically || !initial {
@ -483,15 +497,26 @@ impl Reconnectable {
let mut cmd = CommandBuilder::new(&argv[0]);
cmd.args(&argv[1..]);
let mut child = pair.slave.spawn_command(cmd)?;
drop(pair.slave);
let mut reader = pair.master.try_clone_reader()?;
drop(pair.master);
let mut s = String::new();
reader.read_to_string(&mut s).ok();
log::error!("server output: {}", s);
let status = child.wait()?;
if !status.success() {
log::error!("{:?} failed with status {:?}", argv, status);
}
log::error!("{:?} completed with status {:?}", argv, status);
drop(child);
drop(pair.slave);
unix_connect_with_retry(&sock_path)
.with_context(|| format!("failed to connect to {}", sock_path.display()))?
unix_connect_with_retry(&sock_path, true).with_context(|| {
format!(
"(after spawning server) failed to connect to {}",
sock_path.display()
)
})?
}
};
@ -781,7 +806,7 @@ impl Client {
}
async fn detach(local_domain_id: DomainId) -> anyhow::Result<()> {
let mux = Mux::get().unwrap();
if let Some(mux) = Mux::get() {
let client_domain = mux
.get_domain(local_domain_id)
.ok_or_else(|| anyhow!("no such domain {}", local_domain_id))?;
@ -792,6 +817,7 @@ impl Client {
anyhow!("domain {} is not a ClientDomain instance", local_domain_id)
})?;
client_domain.perform_detach();
}
Ok(())
}
promise::spawn::spawn_into_main_thread(async move {