1
1
mirror of https://github.com/wez/wezterm.git synced 2024-12-25 14:22:37 +03:00

can now push changes from server->client

Need to emit them in more places.  The idea is that this helps
to reduce the event->display latency.
This commit is contained in:
Wez Furlong 2019-06-17 08:47:24 -07:00
parent 2f5b710b3e
commit c18dd96472
6 changed files with 152 additions and 87 deletions

View File

@ -146,10 +146,10 @@ fn run_terminal_gui(config: Arc<config::Config>, opts: &StartCommand) -> Result<
};
let domain: Arc<dyn Domain> = if opts.mux_client_as_default_domain {
let client = Client::new_unix_domain(&config, None)?;
let client = Client::new_unix_domain(&config)?;
Arc::new(ClientDomain::new(client))
} else if opts.mux_tls_client_as_default_domain {
let client = Client::new_tls(&config, None)?;
let client = Client::new_tls(&config)?;
Arc::new(ClientDomain::new(client))
} else {
Arc::new(LocalDomain::new(&config)?)
@ -210,7 +210,7 @@ fn main() -> Result<(), Error> {
run_terminal_gui(config, &start)
}
SubCommand::Cli(cli) => {
let client = Client::new_unix_domain(&config, None)?;
let client = Client::new_unix_domain(&config)?;
match cli.sub {
CliSubCommand::List => {
let cols = vec![

View File

@ -1,10 +1,13 @@
#![allow(dead_code)]
use crate::config::Config;
use crate::frontend::gui_executor;
use crate::mux::Mux;
use crate::server::codec::*;
use crate::server::listener::IdentitySource;
use crate::server::pollable::*;
use crate::server::tab::ClientTab;
use crate::server::UnixStream;
use crossbeam_channel::{Sender, TryRecvError};
use crossbeam_channel::TryRecvError;
use failure::{bail, err_msg, format_err, Fallible};
use log::info;
use native_tls::TlsConnector;
@ -55,23 +58,32 @@ macro_rules! rpc {
};
}
fn process_unilateral(decoded: DecodedPdu) -> Fallible<()> {
if let Some(tab_id) = decoded.pdu.tab_id() {
let pdu = decoded.pdu;
Future::with_executor(gui_executor().unwrap(), move || {
let mux = Mux::get().unwrap();
let tab = mux
.get_tab(tab_id)
.ok_or_else(|| format_err!("no such tab {}", tab_id))?;
let client_tab = tab.downcast_ref::<ClientTab>().unwrap();
client_tab.process_unilateral(pdu)
});
} else {
bail!("don't know how to handle {:?}", decoded);
}
Ok(())
}
fn client_thread(
mut stream: Box<dyn ReadAndWrite>,
rx: PollableReceiver<ReaderMessage>,
mut unilaterals: Option<Sender<Pdu>>,
) -> Fallible<()> {
let mut next_serial = 1u64;
let mut promises = HashMap::new();
let mut read_buffer = Vec::with_capacity(1024);
loop {
let mut poll_array = [rx.as_poll_fd(), stream.as_poll_fd()];
poll_for_read(&mut poll_array);
log::trace!(
"out: {}, in: {}",
poll_array[0].revents,
poll_array[1].revents
);
if poll_array[0].revents != 0 {
loop {
match rx.try_recv() {
Ok(msg) => match msg {
ReaderMessage::SendPdu { pdu, promise } => {
@ -83,49 +95,57 @@ fn client_thread(
stream.flush()?;
}
},
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => bail!("Client was destroyed"),
};
}
if poll_array[1].revents != 0 {
let mut poll_array = [rx.as_poll_fd(), stream.as_poll_fd()];
poll_for_read(&mut poll_array);
log::trace!(
"out: {}, in: {}",
poll_array[0].revents,
poll_array[1].revents
);
if poll_array[1].revents != 0 || stream.has_read_buffered() {
// When TLS is enabled on a stream, it may require a mixture of
// reads AND writes in order to satisfy a given read or write.
// As a result, we may appear ready to read a PDU, but may not
// be able to read a complete PDU.
// Set to non-blocking mode while we try to decode a packet to
// avoid blocking.
stream.set_non_blocking(true)?;
let res = Pdu::try_read_and_decode(&mut stream, &mut read_buffer);
stream.set_non_blocking(false)?;
if let Some(decoded) = res? {
if decoded.serial == 0 {
if let Some(uni) = unilaterals.as_mut() {
uni.send(decoded.pdu)?;
loop {
stream.set_non_blocking(true)?;
let res = Pdu::try_read_and_decode(&mut stream, &mut read_buffer);
stream.set_non_blocking(false)?;
if let Some(decoded) = res? {
log::trace!("decoded serial {}", decoded.serial);
if decoded.serial == 0 {
process_unilateral(decoded)?;
} else if let Some(mut promise) = promises.remove(&decoded.serial) {
promise.result(Ok(decoded.pdu));
} else {
log::error!("got unilateral, but there is no handler");
log::error!(
"got serial {} without a corresponding promise",
decoded.serial
);
}
} else if let Some(mut promise) = promises.remove(&decoded.serial) {
promise.result(Ok(decoded.pdu));
} else {
log::error!(
"got serial {} without a corresponding promise",
decoded.serial
);
log::trace!("spurious/incomplete read wakeup");
break;
}
} else {
log::trace!("spurious/incomplete read wakeup");
}
}
}
}
impl Client {
pub fn new(stream: Box<dyn ReadAndWrite>, unilaterals: Option<Sender<Pdu>>) -> Self {
pub fn new(stream: Box<dyn ReadAndWrite>) -> Self {
let (sender, receiver) = pollable_channel().expect("failed to create pollable_channel");
thread::spawn(move || {
if let Err(e) = client_thread(stream, receiver, unilaterals) {
if let Err(e) = client_thread(stream, receiver) {
log::error!("client thread ended: {}", e);
}
});
@ -133,10 +153,7 @@ impl Client {
Self { sender }
}
pub fn new_unix_domain(
config: &Arc<Config>,
unilaterals: Option<Sender<Pdu>>,
) -> Fallible<Self> {
pub fn new_unix_domain(config: &Arc<Config>) -> Fallible<Self> {
let sock_path = Path::new(
config
.mux_server_unix_domain_socket_path
@ -145,10 +162,10 @@ impl Client {
);
info!("connect to {}", sock_path.display());
let stream = Box::new(UnixStream::connect(sock_path)?);
Ok(Self::new(stream, unilaterals))
Ok(Self::new(stream))
}
pub fn new_tls(config: &Arc<Config>, unilaterals: Option<Sender<Pdu>>) -> Fallible<Self> {
pub fn new_tls(config: &Arc<Config>) -> Fallible<Self> {
let remote_address = config
.mux_server_remote_address
.as_ref()
@ -191,7 +208,7 @@ impl Client {
e
)
})?);
Ok(Self::new(stream, unilaterals))
Ok(Self::new(stream))
}
pub fn send_pdu(&self, pdu: Pdu) -> Future<Pdu> {
@ -216,9 +233,5 @@ impl Client {
rpc!(key_down, SendKeyDown, UnitResponse);
rpc!(mouse_event, SendMouseEvent, SendMouseEventResponse);
rpc!(resize, Resize, UnitResponse);
rpc!(
get_tab_render_changes,
GetTabRenderChanges,
GetTabRenderChangesResponse
);
rpc!(get_tab_render_changes, GetTabRenderChanges, UnitResponse);
}

View File

@ -298,6 +298,15 @@ impl Pdu {
buffer.extend_from_slice(&buf[0..size]);
}
}
pub fn tab_id(&self) -> Option<TabId> {
match self {
Pdu::GetTabRenderChangesResponse(GetTabRenderChangesResponse { tab_id, .. }) => {
Some(*tab_id)
}
_ => None,
}
}
}
#[derive(Deserialize, Serialize, PartialEq, Debug)]

View File

@ -197,6 +197,34 @@ pub struct ClientSession<S: ReadAndWrite> {
to_write_tx: PollableSender<DecodedPdu>,
}
fn maybe_push_tab_changes(
surfaces: &Arc<Mutex<HashMap<TabId, ClientSurfaceState>>>,
tab: &Rc<dyn Tab>,
sender: PollableSender<DecodedPdu>,
) -> Fallible<()> {
let tab_id = tab.tab_id();
let mut surfaces = surfaces.lock().unwrap();
let (rows, cols) = tab.renderer().physical_dimensions();
let surface = surfaces
.entry(tab_id)
.or_insert_with(|| ClientSurfaceState::new(cols, rows));
surface.update_surface_from_screen(&tab);
let (new_seq, changes) = surface.get_and_flush_changes(surface.last_seq);
if !changes.is_empty() {
sender.send(DecodedPdu {
pdu: Pdu::GetTabRenderChangesResponse(GetTabRenderChangesResponse {
tab_id,
sequence_no: surface.last_seq,
changes,
}),
serial: 0,
})?;
surface.last_seq = new_seq;
}
Ok(())
}
struct ClientSurfaceState {
surface: Surface,
last_seq: SequenceNo,
@ -315,26 +343,31 @@ impl<S: ReadAndWrite> ClientSession<S> {
fn process(&mut self) -> Result<(), Error> {
let mut read_buffer = Vec::with_capacity(1024);
loop {
let mut poll_array = [self.to_write_rx.as_poll_fd(), self.stream.as_poll_fd()];
poll_for_read(&mut poll_array);
if poll_array[0].revents != 0 {
loop {
match self.to_write_rx.try_recv() {
Ok(decoded) => {
log::trace!("writing pdu with serial {}", decoded.serial);
decoded.pdu.encode(&mut self.stream, decoded.serial)?;
self.stream.flush()?;
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => bail!("ClientSession was destroyed"),
};
}
if poll_array[1].revents != 0 {
self.stream.set_non_blocking(true)?;
let res = Pdu::try_read_and_decode(&mut self.stream, &mut read_buffer);
self.stream.set_non_blocking(false)?;
if let Some(decoded) = res? {
self.process_one(decoded)?;
let mut poll_array = [self.to_write_rx.as_poll_fd(), self.stream.as_poll_fd()];
poll_for_read(&mut poll_array);
if poll_array[1].revents != 0 || self.stream.has_read_buffered() {
loop {
self.stream.set_non_blocking(true)?;
let res = Pdu::try_read_and_decode(&mut self.stream, &mut read_buffer);
self.stream.set_non_blocking(false)?;
if let Some(decoded) = res? {
self.process_one(decoded)?;
} else {
break;
}
}
}
}
@ -351,7 +384,7 @@ impl<S: ReadAndWrite> ClientSession<S> {
reason: format!("Error: {}", err),
}),
};
log::trace!("processing time {:?}", start.elapsed());
log::trace!("{} processing time {:?}", serial, start.elapsed());
sender.send(DecodedPdu { pdu, serial })
});
Ok(())
@ -418,22 +451,28 @@ impl<S: ReadAndWrite> ClientSession<S> {
}
Pdu::WriteToTab(WriteToTab { tab_id, data }) => {
let surfaces = Arc::clone(&self.surfaces_by_tab);
let sender = self.to_write_tx.clone();
Future::with_executor(self.executor.clone_executor(), move || {
let mux = Mux::get().unwrap();
let tab = mux
.get_tab(tab_id)
.ok_or_else(|| format_err!("no such tab {}", tab_id))?;
tab.writer().write_all(&data)?;
maybe_push_tab_changes(&surfaces, &tab, sender)?;
Ok(Pdu::UnitResponse(UnitResponse {}))
})
}
Pdu::SendPaste(SendPaste { tab_id, data }) => {
let surfaces = Arc::clone(&self.surfaces_by_tab);
let sender = self.to_write_tx.clone();
Future::with_executor(self.executor.clone_executor(), move || {
let mux = Mux::get().unwrap();
let tab = mux
.get_tab(tab_id)
.ok_or_else(|| format_err!("no such tab {}", tab_id))?;
tab.send_paste(&data)?;
maybe_push_tab_changes(&surfaces, &tab, sender)?;
Ok(Pdu::UnitResponse(UnitResponse {}))
})
}
@ -450,16 +489,21 @@ impl<S: ReadAndWrite> ClientSession<S> {
}
Pdu::SendKeyDown(SendKeyDown { tab_id, event }) => {
let surfaces = Arc::clone(&self.surfaces_by_tab);
let sender = self.to_write_tx.clone();
Future::with_executor(self.executor.clone_executor(), move || {
let mux = Mux::get().unwrap();
let tab = mux
.get_tab(tab_id)
.ok_or_else(|| format_err!("no such tab {}", tab_id))?;
tab.key_down(event.key, event.modifiers)?;
maybe_push_tab_changes(&surfaces, &tab, sender)?;
Ok(Pdu::UnitResponse(UnitResponse {}))
})
}
Pdu::SendMouseEvent(SendMouseEvent { tab_id, event }) => {
let surfaces = Arc::clone(&self.surfaces_by_tab);
let sender = self.to_write_tx.clone();
Future::with_executor(self.executor.clone_executor(), move || {
let mux = Mux::get().unwrap();
let tab = mux
@ -471,6 +515,8 @@ impl<S: ReadAndWrite> ClientSession<S> {
title: None,
};
tab.mouse_event(event, &mut host)?;
maybe_push_tab_changes(&surfaces, &tab, sender)?;
Ok(Pdu::SendMouseEventResponse(SendMouseEventResponse {
clipboard: host.clipboard,
selection_range: tab.selection_range(),
@ -500,31 +546,16 @@ impl<S: ReadAndWrite> ClientSession<S> {
}))
}),
Pdu::GetTabRenderChanges(GetTabRenderChanges {
tab_id,
sequence_no,
}) => {
Pdu::GetTabRenderChanges(GetTabRenderChanges { tab_id, .. }) => {
let surfaces = Arc::clone(&self.surfaces_by_tab);
let sender = self.to_write_tx.clone();
Future::with_executor(self.executor.clone_executor(), move || {
let mux = Mux::get().unwrap();
let tab = mux
.get_tab(tab_id)
.ok_or_else(|| format_err!("no such tab {}", tab_id))?;
let mut surfaces = surfaces.lock().unwrap();
let (rows, cols) = tab.renderer().physical_dimensions();
let surface = surfaces
.entry(tab_id)
.or_insert_with(|| ClientSurfaceState::new(cols, rows));
surface.update_surface_from_screen(&tab);
let (new_seq, changes) = surface.get_and_flush_changes(sequence_no);
Ok(Pdu::GetTabRenderChangesResponse(
GetTabRenderChangesResponse {
tab_id,
sequence_no: new_seq,
changes,
},
))
maybe_push_tab_changes(&surfaces, &tab, sender)?;
Ok(Pdu::UnitResponse(UnitResponse {}))
})
}

View File

@ -12,18 +12,25 @@ use winapi::um::winsock2::{WSAPoll as poll, POLLERR, POLLIN, WSAPOLLFD as pollfd
pub trait ReadAndWrite: std::io::Read + std::io::Write + Send + AsPollFd {
fn set_non_blocking(&self, non_blocking: bool) -> Fallible<()>;
fn has_read_buffered(&self) -> bool;
}
impl ReadAndWrite for UnixStream {
fn set_non_blocking(&self, non_blocking: bool) -> Fallible<()> {
self.set_nonblocking(non_blocking)?;
Ok(())
}
fn has_read_buffered(&self) -> bool {
false
}
}
impl ReadAndWrite for native_tls::TlsStream<std::net::TcpStream> {
fn set_non_blocking(&self, non_blocking: bool) -> Fallible<()> {
self.get_ref().set_nonblocking(non_blocking)?;
Ok(())
}
fn has_read_buffered(&self) -> bool {
self.buffered_read_size().unwrap_or(0) != 0
}
}
pub struct PollableSender<T> {

View File

@ -5,7 +5,7 @@ use crate::mux::tab::{alloc_tab_id, Tab, TabId};
use crate::server::client::Client;
use crate::server::codec::*;
use crate::server::domain::ClientInner;
use failure::Fallible;
use failure::{bail, Fallible};
use filedescriptor::Pipe;
use log::error;
use portable_pty::PtySize;
@ -171,6 +171,19 @@ impl ClientTab {
reader,
}
}
pub fn process_unilateral(&self, pdu: Pdu) -> Fallible<()> {
match pdu {
Pdu::GetTabRenderChangesResponse(delta) => {
log::trace!("new delta {}", delta.sequence_no);
self.renderable
.borrow()
.apply_changes_to_surface(delta.sequence_no, delta.changes);
}
_ => bail!("unhandled unilateral pdu: {:?}", pdu),
};
Ok(())
}
}
impl Tab for ClientTab {
@ -271,7 +284,7 @@ struct RenderableState {
remote_tab_id: TabId,
last_poll: RefCell<Instant>,
dead: RefCell<bool>,
poll_future: RefCell<Option<Future<GetTabRenderChangesResponse>>>,
poll_future: RefCell<Option<Future<UnitResponse>>>,
surface: RefCell<Surface>,
remote_sequence: RefCell<SequenceNo>,
local_sequence: RefCell<SequenceNo>,
@ -298,15 +311,7 @@ impl RenderableState {
.map(Future::is_ready)
.unwrap_or(false);
if ready {
let delta = self.poll_future.borrow_mut().take().unwrap().wait()?;
log::trace!(
"poll: got delta {} {} in {:?}",
delta.sequence_no,
delta.changes.len(),
self.last_poll.borrow().elapsed()
);
self.apply_changes_to_surface(delta.sequence_no, delta.changes);
self.poll_future.borrow_mut().take().unwrap().wait()?;
*self.last_poll.borrow_mut() = Instant::now();
} else if self.poll_future.borrow().is_some() {
// We have a poll in progress