1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-27 15:37:29 +03:00
wezterm/wezterm-mux-server-impl/src/dispatch.rs

109 lines
3.4 KiB
Rust
Raw Normal View History

use crate::sessionhandler::{PduSender, SessionHandler};
use crate::UnixStream;
use anyhow::Context;
use async_ossl::AsyncSslStream;
use codec::{DecodedPdu, Pdu};
use futures::FutureExt;
use mux::{Mux, MuxNotification};
use smol::prelude::*;
use smol::Async;
#[cfg(unix)]
pub trait AsRawDesc: std::os::unix::io::AsRawFd {}
#[cfg(windows)]
pub trait AsRawDesc: std::os::windows::io::AsRawSocket {}
impl AsRawDesc for UnixStream {}
impl AsRawDesc for AsyncSslStream {}
#[derive(Debug)]
enum Item {
Notif(MuxNotification),
WritePdu(DecodedPdu),
Readable,
}
pub async fn process<T>(stream: T) -> anyhow::Result<()>
where
T: 'static,
T: std::io::Read,
T: std::io::Write,
T: AsRawDesc,
T: std::fmt::Debug,
{
let stream = smol::Async::new(stream)?;
process_async(stream).await
}
pub async fn process_async<T>(mut stream: Async<T>) -> anyhow::Result<()>
where
T: 'static,
T: std::io::Read,
T: std::io::Write,
T: std::fmt::Debug,
{
2020-12-30 03:33:58 +03:00
log::trace!("process_async called");
let (item_tx, item_rx) = smol::channel::unbounded::<Item>();
let pdu_sender = PduSender::new({
let item_tx = item_tx.clone();
move |pdu| {
item_tx
.try_send(Item::WritePdu(pdu))
.map_err(|e| anyhow::anyhow!("{:?}", e))
}
});
let mut handler = SessionHandler::new(pdu_sender);
{
let mux = Mux::get().expect("to be running on gui thread");
let tx = item_tx.clone();
mux.subscribe(move |n| tx.try_send(Item::Notif(n)).is_ok());
}
loop {
let rx_msg = item_rx.recv();
let wait_for_read = stream.readable().map(|_| Ok(Item::Readable));
match smol::future::or(rx_msg, wait_for_read).await {
Ok(Item::Readable) => {
let decoded = Pdu::decode_async(&mut stream).await?;
handler.process_one(decoded);
}
Ok(Item::WritePdu(decoded)) => {
decoded
.pdu
.encode_async(&mut stream, decoded.serial)
.await?;
stream.flush().await.context("flushing PDU to client")?;
}
Ok(Item::Notif(MuxNotification::PaneOutput(pane_id))) => {
handler.schedule_pane_push(pane_id);
}
Ok(Item::Notif(MuxNotification::PaneRemoved(pane_id))) => {
Pdu::PaneRemoved(codec::PaneRemoved { pane_id })
.encode_async(&mut stream, 0)
.await?;
stream.flush().await.context("flushing PDU to client")?;
}
Ok(Item::Notif(MuxNotification::Alert { pane_id, alert })) => {
{
let per_pane = handler.per_pane(pane_id);
let mut per_pane = per_pane.lock().unwrap();
per_pane.notifications.push(alert);
}
handler.schedule_pane_push(pane_id);
}
Ok(Item::Notif(MuxNotification::WindowRemoved(_window_id))) => {}
Ok(Item::Notif(MuxNotification::WindowCreated(_window_id))) => {}
Ok(Item::Notif(MuxNotification::WindowInvalidated(_window_id))) => {}
Ok(Item::Notif(MuxNotification::Empty)) => {}
Err(err) => {
log::error!("process_async Err {}", err);
return Ok(());
}
}
}
}