2020-10-04 19:39:28 +03:00
|
|
|
use crate::sessionhandler::{PduSender, SessionHandler};
|
|
|
|
use crate::UnixStream;
|
2020-10-05 07:07:40 +03:00
|
|
|
use anyhow::Context;
|
|
|
|
use async_ossl::AsyncSslStream;
|
2020-10-04 19:39:28 +03:00
|
|
|
use codec::{DecodedPdu, Pdu};
|
2020-10-05 07:07:40 +03:00
|
|
|
use futures::FutureExt;
|
2020-10-04 19:39:28 +03:00
|
|
|
use mux::{Mux, MuxNotification};
|
2020-10-05 07:07:40 +03:00
|
|
|
use smol::prelude::*;
|
|
|
|
use smol::Async;
|
2020-10-04 19:39:28 +03:00
|
|
|
|
|
|
|
#[cfg(unix)]
|
|
|
|
pub trait AsRawDesc: std::os::unix::io::AsRawFd {}
|
|
|
|
#[cfg(windows)]
|
|
|
|
pub trait AsRawDesc: std::os::windows::io::AsRawSocket {}
|
|
|
|
|
|
|
|
impl AsRawDesc for UnixStream {}
|
2020-10-05 07:07:40 +03:00
|
|
|
impl AsRawDesc for AsyncSslStream {}
|
2020-10-04 19:39:28 +03:00
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
enum Item {
|
|
|
|
Notif(MuxNotification),
|
2020-10-05 07:07:40 +03:00
|
|
|
WritePdu(DecodedPdu),
|
|
|
|
Readable,
|
2020-10-04 19:39:28 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn process<T>(stream: T) -> anyhow::Result<()>
|
|
|
|
where
|
|
|
|
T: 'static,
|
|
|
|
T: std::io::Read,
|
|
|
|
T: std::io::Write,
|
|
|
|
T: AsRawDesc,
|
2020-10-05 07:07:40 +03:00
|
|
|
T: std::fmt::Debug,
|
2020-10-04 19:39:28 +03:00
|
|
|
{
|
2020-10-05 07:07:40 +03:00
|
|
|
let stream = smol::Async::new(stream)?;
|
|
|
|
process_async(stream).await
|
2020-10-04 19:39:28 +03:00
|
|
|
}
|
|
|
|
|
2020-10-05 07:07:40 +03:00
|
|
|
pub async fn process_async<T>(mut stream: Async<T>) -> anyhow::Result<()>
|
2020-10-04 19:39:28 +03:00
|
|
|
where
|
|
|
|
T: 'static,
|
2020-10-05 07:07:40 +03:00
|
|
|
T: std::io::Read,
|
|
|
|
T: std::io::Write,
|
|
|
|
T: std::fmt::Debug,
|
2020-10-04 19:39:28 +03:00
|
|
|
{
|
2020-12-30 03:33:58 +03:00
|
|
|
log::trace!("process_async called");
|
2020-10-05 07:07:40 +03:00
|
|
|
|
2020-10-04 19:39:28 +03:00
|
|
|
let (item_tx, item_rx) = smol::channel::unbounded::<Item>();
|
|
|
|
|
2020-10-05 07:07:40 +03:00
|
|
|
let pdu_sender = PduSender::new({
|
2020-10-04 19:39:28 +03:00
|
|
|
let item_tx = item_tx.clone();
|
2020-10-05 07:07:40 +03:00
|
|
|
move |pdu| {
|
|
|
|
item_tx
|
|
|
|
.try_send(Item::WritePdu(pdu))
|
|
|
|
.map_err(|e| anyhow::anyhow!("{:?}", e))
|
2020-10-04 19:39:28 +03:00
|
|
|
}
|
|
|
|
});
|
2020-10-05 07:07:40 +03:00
|
|
|
let mut handler = SessionHandler::new(pdu_sender);
|
2020-10-04 19:39:28 +03:00
|
|
|
|
|
|
|
{
|
|
|
|
let mux = Mux::get().expect("to be running on gui thread");
|
|
|
|
let tx = item_tx.clone();
|
|
|
|
mux.subscribe(move |n| tx.try_send(Item::Notif(n)).is_ok());
|
|
|
|
}
|
|
|
|
|
2020-10-05 07:07:40 +03:00
|
|
|
loop {
|
|
|
|
let rx_msg = item_rx.recv();
|
|
|
|
let wait_for_read = stream.readable().map(|_| Ok(Item::Readable));
|
2020-10-04 19:39:28 +03:00
|
|
|
|
2020-10-05 07:07:40 +03:00
|
|
|
match smol::future::or(rx_msg, wait_for_read).await {
|
|
|
|
Ok(Item::Readable) => {
|
|
|
|
let decoded = Pdu::decode_async(&mut stream).await?;
|
|
|
|
handler.process_one(decoded);
|
|
|
|
}
|
|
|
|
Ok(Item::WritePdu(decoded)) => {
|
|
|
|
decoded
|
|
|
|
.pdu
|
|
|
|
.encode_async(&mut stream, decoded.serial)
|
|
|
|
.await?;
|
|
|
|
stream.flush().await.context("flushing PDU to client")?;
|
|
|
|
}
|
|
|
|
Ok(Item::Notif(MuxNotification::PaneOutput(pane_id))) => {
|
|
|
|
handler.schedule_pane_push(pane_id);
|
|
|
|
}
|
|
|
|
Ok(Item::Notif(MuxNotification::WindowCreated(_window_id))) => {}
|
|
|
|
Err(err) => {
|
|
|
|
log::error!("process_async Err {}", err);
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-10-04 19:39:28 +03:00
|
|
|
}
|