1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-24 22:01:47 +03:00

re-structure client in readiness for unilateral responses

This commit is contained in:
Wez Furlong 2019-06-16 18:41:57 -07:00
parent cf2b4896c5
commit 6ceefb2576
4 changed files with 179 additions and 41 deletions

View File

@ -15,6 +15,7 @@ embed-resource = "1.1"
base91 = { path = "base91" }
bitflags = "1.0"
clipboard = "0.5"
crossbeam-channel = "0.3"
dirs = "1.0"
downcast-rs = "1.0"
euclid = "0.19"
@ -77,6 +78,7 @@ winapi = { version = "0.3", features = [
"fileapi",
"namedpipeapi",
"synchapi",
"winsock2",
]}
[target.'cfg(any(target_os = "android", all(unix, not(target_os = "macos"))))'.dependencies]

View File

@ -3,21 +3,133 @@ use crate::config::Config;
use crate::server::codec::*;
use crate::server::listener::IdentitySource;
use crate::server::UnixStream;
use crossbeam_channel::{unbounded as channel, Receiver, Sender, TryRecvError};
use failure::{bail, err_msg, format_err, Fallible};
use filedescriptor::{AsRawFileDescriptor, FileDescriptor, RawFileDescriptor};
#[cfg(unix)]
use libc::{poll, pollfd, POLLERR, POLLIN};
use log::info;
use native_tls::TlsConnector;
use promise::{Future, Promise};
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::TryInto;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::path::Path;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::Arc;
use std::thread;
#[cfg(windows)]
use winapi::um::winsock2::{WSAPoll as poll, POLLERR, POLLIN, WSAPOLLFD as pollfd};
pub trait ReadAndWrite: std::io::Read + std::io::Write + Send {}
impl ReadAndWrite for UnixStream {}
impl ReadAndWrite for native_tls::TlsStream<std::net::TcpStream> {}
struct PollableSender<T> {
sender: Sender<T>,
write: RefCell<FileDescriptor>,
}
impl<T> PollableSender<T> {
pub fn send(&self, item: T) -> Fallible<()> {
self.write.borrow_mut().write(b"x")?;
self.sender.send(item).map_err(|e| format_err!("{}", e))?;
Ok(())
}
}
impl<T> Clone for PollableSender<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
write: RefCell::new(
self.write
.borrow()
.try_clone()
.expect("failed to clone PollableSender fd"),
),
}
}
}
struct PollableReceiver<T> {
receiver: Receiver<T>,
read: RefCell<FileDescriptor>,
}
impl<T> PollableReceiver<T> {
pub fn try_recv(&self) -> Result<T, TryRecvError> {
let item = self.receiver.try_recv()?;
let mut byte = [0u8];
self.read.borrow_mut().read(&mut byte).ok();
Ok(item)
}
}
impl<T> AsPollFd for PollableReceiver<T> {
fn as_poll_fd(&self) -> pollfd {
self.read.borrow().as_raw_file_descriptor().as_poll_fd()
}
}
/// A channel that can be polled together with a socket.
/// This uses the self-pipe trick but with a unix domain
/// socketpair.
/// In theory this should also work on windows, but will require
/// windows 10 w/unix domain socket support.
fn pollable_channel<T>() -> Fallible<(PollableSender<T>, PollableReceiver<T>)> {
let (sender, receiver) = channel();
let (write, read) = UnixStream::pair()?;
Ok((
PollableSender {
sender,
write: RefCell::new(FileDescriptor::new(write)),
},
PollableReceiver {
receiver,
read: RefCell::new(FileDescriptor::new(read)),
},
))
}
pub trait AsPollFd {
fn as_poll_fd(&self) -> pollfd;
}
impl AsPollFd for RawFileDescriptor {
fn as_poll_fd(&self) -> pollfd {
pollfd {
fd: *self,
events: POLLIN | POLLERR,
revents: 0,
}
}
}
impl AsPollFd for native_tls::TlsStream<TcpStream> {
fn as_poll_fd(&self) -> pollfd {
self.get_ref().as_raw_file_descriptor().as_poll_fd()
}
}
impl AsPollFd for UnixStream {
fn as_poll_fd(&self) -> pollfd {
self.as_raw_file_descriptor().as_poll_fd()
}
}
pub trait ReadAndWrite: std::io::Read + std::io::Write + Send + AsPollFd {
fn set_non_blocking(&self, non_blocking: bool) -> Fallible<()>;
}
impl ReadAndWrite for UnixStream {
fn set_non_blocking(&self, non_blocking: bool) -> Fallible<()> {
self.set_nonblocking(non_blocking)?;
Ok(())
}
}
impl ReadAndWrite for native_tls::TlsStream<std::net::TcpStream> {
fn set_non_blocking(&self, non_blocking: bool) -> Fallible<()> {
self.get_ref().set_nonblocking(non_blocking)?;
Ok(())
}
}
enum ReaderMessage {
SendPdu { pdu: Pdu, promise: Promise<Pdu> },
@ -25,7 +137,7 @@ enum ReaderMessage {
#[derive(Clone)]
pub struct Client {
sender: Sender<ReaderMessage>,
sender: PollableSender<ReaderMessage>,
}
macro_rules! rpc {
@ -59,49 +171,64 @@ macro_rules! rpc {
fn client_thread(
mut stream: Box<dyn ReadAndWrite>,
rx: Receiver<ReaderMessage>,
_unilaterals: Option<Sender<Pdu>>,
rx: PollableReceiver<ReaderMessage>,
mut unilaterals: Option<Sender<Pdu>>,
) -> Fallible<()> {
let mut next_serial = 1u64;
let mut promises = HashMap::new();
let mut read_buffer = Vec::with_capacity(1024);
loop {
let msg = if promises.is_empty() {
// If we don't have any results to read back, then we can and
// should block on an incoming request, otherwise we'll busy
// wait in this loop
match rx.recv() {
Ok(msg) => Some(msg),
Err(err) => bail!("Client was destroyed: {}", err),
}
} else {
let mut poll_array = [rx.as_poll_fd(), stream.as_poll_fd()];
unsafe { poll(poll_array.as_mut_ptr(), poll_array.len() as _, -1 as _) };
log::trace!(
"out: {}, in: {}",
poll_array[0].revents,
poll_array[1].revents
);
if poll_array[0].revents != 0 {
match rx.try_recv() {
Ok(msg) => Some(msg),
Err(TryRecvError::Empty) => None,
Err(TryRecvError::Disconnected) => bail!("Client was destroyed"),
}
};
if let Some(msg) = msg {
match msg {
ReaderMessage::SendPdu { pdu, promise } => {
let serial = next_serial;
next_serial += 1;
promises.insert(serial, promise);
Ok(msg) => match msg {
ReaderMessage::SendPdu { pdu, promise } => {
let serial = next_serial;
next_serial += 1;
promises.insert(serial, promise);
pdu.encode(&mut stream, serial)?;
stream.flush()?;
}
}
pdu.encode(&mut stream, serial)?;
stream.flush()?;
}
},
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => bail!("Client was destroyed"),
};
}
if !promises.is_empty() {
let decoded = Pdu::decode(&mut stream)?;
if let Some(mut promise) = promises.remove(&decoded.serial) {
promise.result(Ok(decoded.pdu));
if poll_array[1].revents != 0 {
// When TLS is enabled on a stream, it may require a mixture of
// reads AND writes in order to satisfy a given read or write.
// As a result, we may appear ready to read a PDU, but may not
// be able to read a complete PDU.
// Set to non-blocking mode while we try to decode a packet to
// avoid blocking.
stream.set_non_blocking(true)?;
let res = Pdu::try_read_and_decode(&mut stream, &mut read_buffer);
stream.set_non_blocking(false)?;
if let Some(decoded) = res? {
if decoded.serial == 0 {
if let Some(uni) = unilaterals.as_mut() {
uni.send(decoded.pdu)?;
} else {
log::error!("got unilateral, but there is no handler");
}
} else if let Some(mut promise) = promises.remove(&decoded.serial) {
promise.result(Ok(decoded.pdu));
} else {
log::error!(
"got serial {} without a corresponding promise",
decoded.serial
);
}
} else {
log::error!(
"got serial {} without a corresponding promise",
decoded.serial
);
log::trace!("spurious/incomplete read wakeup");
}
}
}
@ -109,7 +236,7 @@ fn client_thread(
impl Client {
pub fn new(stream: Box<dyn ReadAndWrite>, unilaterals: Option<Sender<Pdu>>) -> Self {
let (sender, receiver) = channel();
let (sender, receiver) = pollable_channel().expect("failed to create pollable_channel");
thread::spawn(move || {
if let Err(e) = client_thread(stream, receiver, unilaterals) {

View File

@ -280,7 +280,15 @@ impl Pdu {
}
let mut buf = [0u8; 4096];
let size = r.read(&mut buf)?;
let size = match r.read(&mut buf) {
Ok(size) => size,
Err(err) => {
if err.kind() == std::io::ErrorKind::WouldBlock {
return Ok(None);
}
return Err(err.into());
}
};
if size == 0 {
return Err(
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "End Of File").into(),

View File

@ -343,6 +343,7 @@ impl Renderable for RenderableState {
surface.flush_changes_older_than(seq);
let selection = *self.selection_range.lock().unwrap();
*self.something_changed.lock().unwrap() = false;
*self.local_sequence.borrow_mut() = seq;
surface
.screen_lines()
.into_iter()