use crate::sessionhandler::{PduSender, SessionHandler}; use crate::UnixStream; use anyhow::Context; use async_ossl::AsyncSslStream; use codec::{DecodedPdu, Pdu}; use futures::FutureExt; use mux::{Mux, MuxNotification}; use smol::prelude::*; use smol::Async; #[cfg(unix)] pub trait AsRawDesc: std::os::unix::io::AsRawFd {} #[cfg(windows)] pub trait AsRawDesc: std::os::windows::io::AsRawSocket {} impl AsRawDesc for UnixStream {} impl AsRawDesc for AsyncSslStream {} #[derive(Debug)] enum Item { Notif(MuxNotification), WritePdu(DecodedPdu), Readable, } pub async fn process(stream: T) -> anyhow::Result<()> where T: 'static, T: std::io::Read, T: std::io::Write, T: AsRawDesc, T: std::fmt::Debug, { let stream = smol::Async::new(stream)?; process_async(stream).await } pub async fn process_async(mut stream: Async) -> anyhow::Result<()> where T: 'static, T: std::io::Read, T: std::io::Write, T: std::fmt::Debug, { log::trace!("process_async called"); let (item_tx, item_rx) = smol::channel::unbounded::(); let pdu_sender = PduSender::new({ let item_tx = item_tx.clone(); move |pdu| { item_tx .try_send(Item::WritePdu(pdu)) .map_err(|e| anyhow::anyhow!("{:?}", e)) } }); let mut handler = SessionHandler::new(pdu_sender); { let mux = Mux::get().expect("to be running on gui thread"); let tx = item_tx.clone(); mux.subscribe(move |n| tx.try_send(Item::Notif(n)).is_ok()); } loop { let rx_msg = item_rx.recv(); let wait_for_read = stream.readable().map(|_| Ok(Item::Readable)); match smol::future::or(rx_msg, wait_for_read).await { Ok(Item::Readable) => { let decoded = Pdu::decode_async(&mut stream).await?; handler.process_one(decoded); } Ok(Item::WritePdu(decoded)) => { decoded .pdu .encode_async(&mut stream, decoded.serial) .await?; stream.flush().await.context("flushing PDU to client")?; } Ok(Item::Notif(MuxNotification::PaneOutput(pane_id))) => { handler.schedule_pane_push(pane_id); } Ok(Item::Notif(MuxNotification::Alert { pane_id, alert: _ })) => { // FIXME: queue notification to send to client! handler.schedule_pane_push(pane_id); } Ok(Item::Notif(MuxNotification::WindowCreated(_window_id))) => {} Ok(Item::Notif(MuxNotification::WindowInvalidated(_window_id))) => {} Ok(Item::Notif(MuxNotification::Empty)) => {} Err(err) => { log::error!("process_async Err {}", err); return Ok(()); } } } }