mirror of
https://github.com/wez/wezterm.git
synced 2024-12-26 14:54:16 +03:00
6b67ae842c
Use some heuristics to verify the data that is about to be parsed; this can help to detect eg: data being output to stdout prior to us sending any encoded data to the remote mux. In addition, add a timeout to help avoid waiting forever in the case that we didn't detect a problem. refs: https://github.com/wez/wezterm/issues/1860
170 lines
6.2 KiB
Rust
170 lines
6.2 KiB
Rust
use crate::sessionhandler::{PduSender, SessionHandler};
|
|
use crate::UnixStream;
|
|
use anyhow::Context;
|
|
use async_ossl::AsyncSslStream;
|
|
use codec::{DecodedPdu, Pdu};
|
|
use futures::FutureExt;
|
|
use mux::{Mux, MuxNotification};
|
|
use smol::prelude::*;
|
|
use smol::Async;
|
|
|
|
/// Marker trait for stream types that expose a raw OS descriptor.
///
/// On unix this is any type implementing `AsRawFd`; the Windows variant of
/// the trait (below) is bounded on `AsRawSocket` instead. Having a single
/// name lets `process` state one bound that works on both platforms.
#[cfg(unix)]
pub trait AsRawDesc: std::os::unix::io::AsRawFd {}

/// Marker trait for stream types that expose a raw OS descriptor.
///
/// On Windows this is any type implementing `AsRawSocket`; the unix variant
/// of the trait (above) is bounded on `AsRawFd` instead.
#[cfg(windows)]
pub trait AsRawDesc: std::os::windows::io::AsRawSocket {}
|
|
|
|
impl AsRawDesc for UnixStream {}
|
|
impl AsRawDesc for AsyncSslStream {}
|
|
|
|
#[derive(Debug)]
|
|
enum Item {
|
|
Notif(MuxNotification),
|
|
WritePdu(DecodedPdu),
|
|
Readable,
|
|
}
|
|
|
|
pub async fn process<T>(stream: T) -> anyhow::Result<()>
|
|
where
|
|
T: 'static,
|
|
T: std::io::Read,
|
|
T: std::io::Write,
|
|
T: AsRawDesc,
|
|
T: std::fmt::Debug,
|
|
{
|
|
let stream = smol::Async::new(stream)?;
|
|
process_async(stream).await
|
|
}
|
|
|
|
pub async fn process_async<T>(mut stream: Async<T>) -> anyhow::Result<()>
|
|
where
|
|
T: 'static,
|
|
T: std::io::Read,
|
|
T: std::io::Write,
|
|
T: std::fmt::Debug,
|
|
{
|
|
log::trace!("process_async called");
|
|
|
|
let (item_tx, item_rx) = smol::channel::unbounded::<Item>();
|
|
|
|
let pdu_sender = PduSender::new({
|
|
let item_tx = item_tx.clone();
|
|
move |pdu| {
|
|
item_tx
|
|
.try_send(Item::WritePdu(pdu))
|
|
.map_err(|e| anyhow::anyhow!("{:?}", e))
|
|
}
|
|
});
|
|
let mut handler = SessionHandler::new(pdu_sender);
|
|
|
|
{
|
|
let mux = Mux::get().expect("to be running on gui thread");
|
|
let tx = item_tx.clone();
|
|
mux.subscribe(move |n| tx.try_send(Item::Notif(n)).is_ok());
|
|
}
|
|
|
|
loop {
|
|
let rx_msg = item_rx.recv();
|
|
let wait_for_read = stream.readable().map(|_| Ok(Item::Readable));
|
|
|
|
match smol::future::or(rx_msg, wait_for_read).await {
|
|
Ok(Item::Readable) => {
|
|
let decoded = match Pdu::decode_async(&mut stream, None).await {
|
|
Ok(data) => data,
|
|
Err(err) => {
|
|
if let Some(err) = err.root_cause().downcast_ref::<std::io::Error>() {
|
|
if err.kind() == std::io::ErrorKind::UnexpectedEof {
|
|
// Client disconnected: no need to make a noise
|
|
return Ok(());
|
|
}
|
|
}
|
|
return Err(err).context("reading Pdu from client");
|
|
}
|
|
};
|
|
handler.process_one(decoded);
|
|
}
|
|
Ok(Item::WritePdu(decoded)) => {
|
|
match decoded.pdu.encode_async(&mut stream, decoded.serial).await {
|
|
Ok(()) => {}
|
|
Err(err) => {
|
|
if let Some(err) = err.root_cause().downcast_ref::<std::io::Error>() {
|
|
if err.kind() == std::io::ErrorKind::BrokenPipe {
|
|
// Client disconnected: no need to make a noise
|
|
return Ok(());
|
|
}
|
|
}
|
|
return Err(err).context("encoding PDU to client");
|
|
}
|
|
};
|
|
match stream.flush().await {
|
|
Ok(()) => {}
|
|
Err(err) => {
|
|
if err.kind() == std::io::ErrorKind::BrokenPipe {
|
|
// Client disconnected: no need to make a noise
|
|
return Ok(());
|
|
}
|
|
return Err(err).context("flushing PDU to client");
|
|
}
|
|
}
|
|
}
|
|
Ok(Item::Notif(MuxNotification::PaneOutput(pane_id))) => {
|
|
handler.schedule_pane_push(pane_id);
|
|
}
|
|
Ok(Item::Notif(MuxNotification::PaneAdded(_pane_id))) => {}
|
|
Ok(Item::Notif(MuxNotification::PaneRemoved(pane_id))) => {
|
|
Pdu::PaneRemoved(codec::PaneRemoved { pane_id })
|
|
.encode_async(&mut stream, 0)
|
|
.await?;
|
|
stream.flush().await.context("flushing PDU to client")?;
|
|
}
|
|
Ok(Item::Notif(MuxNotification::Alert { pane_id, alert })) => {
|
|
{
|
|
let per_pane = handler.per_pane(pane_id);
|
|
let mut per_pane = per_pane.lock().unwrap();
|
|
per_pane.notifications.push(alert);
|
|
}
|
|
handler.schedule_pane_push(pane_id);
|
|
}
|
|
Ok(Item::Notif(MuxNotification::SaveToDownloads { .. })) => {}
|
|
Ok(Item::Notif(MuxNotification::AssignClipboard {
|
|
pane_id,
|
|
selection,
|
|
clipboard,
|
|
})) => {
|
|
Pdu::SetClipboard(codec::SetClipboard {
|
|
pane_id,
|
|
clipboard,
|
|
selection,
|
|
})
|
|
.encode_async(&mut stream, 0)
|
|
.await?;
|
|
stream.flush().await.context("flushing PDU to client")?;
|
|
}
|
|
Ok(Item::Notif(MuxNotification::WindowRemoved(_window_id))) => {}
|
|
Ok(Item::Notif(MuxNotification::WindowCreated(_window_id))) => {}
|
|
Ok(Item::Notif(MuxNotification::WindowInvalidated(_window_id))) => {}
|
|
Ok(Item::Notif(MuxNotification::WindowWorkspaceChanged(window_id))) => {
|
|
let workspace = {
|
|
let mux = Mux::get().expect("to be running on gui thread");
|
|
mux.get_window(window_id)
|
|
.map(|w| w.get_workspace().to_string())
|
|
};
|
|
if let Some(workspace) = workspace {
|
|
Pdu::WindowWorkspaceChanged(codec::WindowWorkspaceChanged {
|
|
window_id,
|
|
workspace,
|
|
})
|
|
.encode_async(&mut stream, 0)
|
|
.await?;
|
|
stream.flush().await.context("flushing PDU to client")?;
|
|
}
|
|
}
|
|
Ok(Item::Notif(MuxNotification::ActiveWorkspaceChanged(_))) => {}
|
|
Ok(Item::Notif(MuxNotification::Empty)) => {}
|
|
Err(err) => {
|
|
log::error!("process_async Err {}", err);
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
}
|