1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-26 14:54:16 +03:00
wezterm/wezterm-mux-server-impl/src/dispatch.rs
Wez Furlong 02eb0b4294
mux: rename Mux::get() -> try_get(), add "infallible" Mux::get()
This allows removing a bunch of unwrap/expect calls.

However, my primary motive was to replace the cases where we used
Mux::get() == None to indicate that we were not on the main thread.

A separate API has been added to test for that explicitly rather than
implicitly.
2022-12-19 11:55:35 -07:00

171 lines
6.2 KiB
Rust

use crate::sessionhandler::{PduSender, SessionHandler};
use crate::UnixStream;
use anyhow::Context;
use async_ossl::AsyncSslStream;
use codec::{DecodedPdu, Pdu};
use futures::FutureExt;
use mux::{Mux, MuxNotification};
use smol::prelude::*;
use smol::Async;
/// Marker trait for stream types that expose their raw OS handle.
/// On unix the required supertrait is `AsRawFd`.
#[cfg(unix)]
pub trait AsRawDesc: std::os::unix::io::AsRawFd {}
/// Marker trait for stream types that expose their raw OS handle.
/// On windows the required supertrait is `AsRawSocket`.
#[cfg(windows)]
pub trait AsRawDesc: std::os::windows::io::AsRawSocket {}
// Opt the two concrete stream types into the marker trait so that they
// satisfy the `AsRawDesc` bound required by `process`.
impl AsRawDesc for UnixStream {}
impl AsRawDesc for AsyncSslStream {}
/// The kinds of event that the loop in `process_async` waits for and
/// dispatches on.
#[derive(Debug)]
enum Item {
/// A notification delivered through the `Mux` subscription.
Notif(MuxNotification),
/// A PDU queued through the `PduSender` callback, to be encoded and
/// written to the client stream.
WritePdu(DecodedPdu),
/// The client stream reported that it is readable.
Readable,
}
/// Wraps a blocking `stream` in `smol::Async` and hands it to
/// [`process_async`] to run the client session.
///
/// # Errors
///
/// Returns an error if `smol::Async::new` fails for `stream`; otherwise
/// yields whatever `process_async` returns.
pub async fn process<T>(stream: T) -> anyhow::Result<()>
where
    T: 'static,
    T: std::io::Read,
    T: std::io::Write,
    T: AsRawDesc,
    T: std::fmt::Debug,
{
    process_async(smol::Async::new(stream)?).await
}
/// Runs the session loop for one connected client on `stream`.
///
/// A single unbounded channel carries two kinds of event: PDUs that the
/// `SessionHandler` wants written to the client, and notifications from
/// the `Mux` subscription. Each loop iteration races a receive on that
/// channel against the stream becoming readable and handles whichever
/// completes first.
///
/// # Returns
///
/// `Ok(())` when the client is considered gone: an `UnexpectedEof` while
/// decoding a PDU, a `BrokenPipe` while writing a queued PDU, or a failure
/// to receive from the internal channel (which is logged).
///
/// # Errors
///
/// Any other decode/encode/flush failure is returned, with context added
/// for decode failures, queued-PDU encode failures and every flush failure.
pub async fn process_async<T>(mut stream: Async<T>) -> anyhow::Result<()>
where
    T: 'static,
    T: std::io::Read,
    T: std::io::Write,
    T: std::fmt::Debug,
{
    log::trace!("process_async called");

    let (item_tx, item_rx) = smol::channel::unbounded::<Item>();

    // PDUs emitted by the handler are queued on the channel rather than
    // written directly; the loop below performs the actual write.
    let pdu_tx = item_tx.clone();
    let pdu_sender = PduSender::new(move |pdu| {
        pdu_tx
            .try_send(Item::WritePdu(pdu))
            .map_err(|e| anyhow::anyhow!("{:?}", e))
    });
    let mut handler = SessionHandler::new(pdu_sender);

    // Forward mux notifications onto the same channel. The subscriber's
    // return value reports whether the item could be queued.
    let notif_tx = item_tx.clone();
    Mux::get().subscribe(move |n| notif_tx.try_send(Item::Notif(n)).is_ok());

    loop {
        let rx_msg = item_rx.recv();
        let wait_for_read = stream.readable().map(|_| Ok(Item::Readable));

        let item = match smol::future::or(rx_msg, wait_for_read).await {
            Ok(item) => item,
            Err(err) => {
                log::error!("process_async Err {}", err);
                return Ok(());
            }
        };

        match item {
            Item::Readable => {
                let decoded = match Pdu::decode_async(&mut stream, None).await {
                    Ok(decoded) => decoded,
                    Err(err) => {
                        // Client disconnected: no need to make a noise
                        let client_gone = err
                            .root_cause()
                            .downcast_ref::<std::io::Error>()
                            .map_or(false, |io_err| {
                                io_err.kind() == std::io::ErrorKind::UnexpectedEof
                            });
                        if client_gone {
                            return Ok(());
                        }
                        return Err(err).context("reading Pdu from client");
                    }
                };
                handler.process_one(decoded);
            }

            Item::WritePdu(decoded) => {
                if let Err(err) = decoded.pdu.encode_async(&mut stream, decoded.serial).await {
                    // Client disconnected: no need to make a noise
                    let pipe_closed = err
                        .root_cause()
                        .downcast_ref::<std::io::Error>()
                        .map_or(false, |io_err| {
                            io_err.kind() == std::io::ErrorKind::BrokenPipe
                        });
                    if pipe_closed {
                        return Ok(());
                    }
                    return Err(err).context("encoding PDU to client");
                }

                if let Err(err) = stream.flush().await {
                    // Client disconnected: no need to make a noise
                    if err.kind() == std::io::ErrorKind::BrokenPipe {
                        return Ok(());
                    }
                    return Err(err).context("flushing PDU to client");
                }
            }

            // NOTE(review): the arms below that write to the stream propagate
            // every failure (including BrokenPipe) as an error, unlike the
            // WritePdu arm above which treats BrokenPipe as a quiet disconnect.
            Item::Notif(notif) => match notif {
                MuxNotification::PaneOutput(pane_id) => {
                    handler.schedule_pane_push(pane_id);
                }

                MuxNotification::PaneRemoved(pane_id) => {
                    let pdu = Pdu::PaneRemoved(codec::PaneRemoved { pane_id });
                    pdu.encode_async(&mut stream, 0).await?;
                    stream.flush().await.context("flushing PDU to client")?;
                }

                MuxNotification::Alert { pane_id, alert } => {
                    // The lock guard is a temporary that is released at the
                    // end of this statement, before the push is scheduled.
                    handler
                        .per_pane(pane_id)
                        .lock()
                        .unwrap()
                        .notifications
                        .push(alert);
                    handler.schedule_pane_push(pane_id);
                }

                MuxNotification::AssignClipboard {
                    pane_id,
                    selection,
                    clipboard,
                } => {
                    let pdu = Pdu::SetClipboard(codec::SetClipboard {
                        pane_id,
                        clipboard,
                        selection,
                    });
                    pdu.encode_async(&mut stream, 0).await?;
                    stream.flush().await.context("flushing PDU to client")?;
                }

                MuxNotification::WindowWorkspaceChanged(window_id) => {
                    // Resolve the workspace name inside a block so the mux
                    // handle is dropped before awaiting on the stream.
                    let workspace = {
                        let mux = Mux::get();
                        mux.get_window(window_id)
                            .map(|w| w.get_workspace().to_string())
                    };
                    if let Some(workspace) = workspace {
                        let pdu = Pdu::WindowWorkspaceChanged(codec::WindowWorkspaceChanged {
                            window_id,
                            workspace,
                        });
                        pdu.encode_async(&mut stream, 0).await?;
                        stream.flush().await.context("flushing PDU to client")?;
                    }
                }

                // Notifications that require no action for this client.
                // Listed explicitly (no wildcard) so that a newly added
                // variant produces a compile error here.
                MuxNotification::PaneAdded(_)
                | MuxNotification::SaveToDownloads { .. }
                | MuxNotification::TabAddedToWindow { .. }
                | MuxNotification::WindowRemoved(_)
                | MuxNotification::WindowCreated(_)
                | MuxNotification::WindowInvalidated(_)
                | MuxNotification::ActiveWorkspaceChanged(_)
                | MuxNotification::Empty => {}
            },
        }
    }
}