1
1
mirror of https://github.com/wez/wezterm.git synced 2024-11-22 22:42:48 +03:00

pty: try_clone_writer -> take_writer

This breaking API change allows us to explicitly generate EOF when the
taken writer is dropped.

The examples have been updated to show how to manage read, write
and waiting without deadlock for both linux and windows.
Need to confirm that this is still good on macOS, but my
confidence is high.

I've also removed ssh2 support from this crate as part of this
change. We haven't used it directly in wezterm in a long while
and removing it from here means that there is slightly less code
to keep compiling over and over.

refs: https://github.com/wez/wezterm/discussions/2392
refs: https://github.com/wez/wezterm/issues/1396
This commit is contained in:
Wez Furlong 2022-08-12 07:38:18 -07:00
parent 259210dc2d
commit e6421d1b72
18 changed files with 193 additions and 442 deletions

3
Cargo.lock generated
View File

@ -3295,7 +3295,7 @@ dependencies = [
[[package]]
name = "portable-pty"
version = "0.7.0"
version = "0.8.0"
dependencies = [
"anyhow",
"bitflags",
@ -3312,7 +3312,6 @@ dependencies = [
"shared_library",
"shell-words",
"smol",
"ssh2",
"winapi",
"winreg",
]

View File

@ -18,8 +18,10 @@ use downcast_rs::{impl_downcast, Downcast};
use portable_pty::{native_pty_system, CommandBuilder, PtySystem};
use std::collections::HashMap;
use std::ffi::OsString;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use wezterm_term::TerminalSize;
static DOMAIN_ID: ::std::sync::atomic::AtomicUsize = ::std::sync::atomic::AtomicUsize::new(0);
@ -438,6 +440,34 @@ impl LocalDomain {
}
}
/// Allows sharing the writer between the Pane and the Terminal.
/// This could potentially be eliminated in the future if we can
/// teach the Pane impl to reference the writer in the Termninal,
/// but the Pane trait returns a RefMut and that makes it a bit
/// awkward at the moment.
#[derive(Clone)]
pub(crate) struct WriterWrapper {
writer: Arc<Mutex<Box<dyn Write + Send>>>,
}
impl WriterWrapper {
pub fn new(writer: Box<dyn Write + Send>) -> Self {
Self {
writer: Arc::new(Mutex::new(writer)),
}
}
}
impl std::io::Write for WriterWrapper {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.writer.lock().unwrap().write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
self.writer.lock().unwrap().flush()
}
}
#[async_trait(?Send)]
impl Domain for LocalDomain {
async fn spawn_pane(
@ -467,14 +497,14 @@ impl Domain for LocalDomain {
let child = pair.slave.spawn_command(cmd)?;
log::trace!("spawned: {:?}", child);
let writer = pair.master.try_clone_writer()?;
let writer = WriterWrapper::new(pair.master.take_writer()?);
let mut terminal = wezterm_term::Terminal::new(
size,
std::sync::Arc::new(config::TermConfig::new()),
"WezTerm",
config::wezterm_version(),
Box::new(writer),
Box::new(writer.clone()),
);
if self.is_conpty() {
terminal.enable_conpty_quirks();
@ -485,6 +515,7 @@ impl Domain for LocalDomain {
terminal,
child,
pair.master,
Box::new(writer),
self.id,
command_description,
));

View File

@ -15,7 +15,7 @@ use std::borrow::Cow;
use std::cell::{RefCell, RefMut};
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::io::Result as IoResult;
use std::io::{Result as IoResult, Write};
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -55,6 +55,7 @@ pub struct LocalPane {
terminal: RefCell<Terminal>,
process: RefCell<ProcessState>,
pty: RefCell<Box<dyn MasterPty>>,
writer: RefCell<Box<dyn Write>>,
domain_id: DomainId,
tmux_domain: RefCell<Option<Arc<TmuxDomainState>>>,
proc_list: RefCell<Option<CachedProcInfo>>,
@ -294,7 +295,7 @@ impl Pane for LocalPane {
fn writer(&self) -> RefMut<dyn std::io::Write> {
Mux::get().unwrap().record_input_for_current_identity();
self.pty.borrow_mut()
self.writer.borrow_mut()
}
fn reader(&self) -> anyhow::Result<Option<Box<dyn std::io::Read + Send>>> {
@ -799,6 +800,7 @@ impl LocalPane {
mut terminal: Terminal,
process: Box<dyn Child + Send>,
pty: Box<dyn MasterPty>,
writer: Box<dyn Write>,
domain_id: DomainId,
command_description: String,
) -> Self {
@ -809,6 +811,7 @@ impl LocalPane {
tmux_domain: None,
}));
terminal.set_notification_handler(Box::new(LocalPaneNotifHandler { pane_id }));
Self {
pane_id,
terminal: RefCell::new(terminal),
@ -819,6 +822,7 @@ impl LocalPane {
killed: false,
}),
pty: RefCell::new(pty),
writer: RefCell::new(writer),
domain_id,
tmux_domain: RefCell::new(None),
proc_list: RefCell::new(None),

View File

@ -1,5 +1,5 @@
use crate::connui::ConnectionUI;
use crate::domain::{alloc_domain_id, Domain, DomainId, DomainState};
use crate::domain::{alloc_domain_id, Domain, DomainId, DomainState, WriterWrapper};
use crate::localpane::LocalPane;
use crate::pane::{alloc_pane_id, Pane, PaneId};
use crate::Mux;
@ -520,7 +520,7 @@ fn connect_ssh_session(
// Obtain the real stdin/stdout for the pty
let reader = pty.try_clone_reader()?;
let writer = pty.try_clone_writer()?;
let writer = pty.take_writer()?;
// And send them to the wrapped reader/writer
stdin_tx
@ -583,7 +583,7 @@ impl Domain for RemoteSshDomain {
pty = Box::new(concrete_pty);
child = Box::new(concrete_child);
writer = Box::new(pty.try_clone_writer()?);
writer = Box::new(pty.take_writer()?);
} else {
// We're starting the session
let (session, events) = Session::connect(self.ssh_config()?)?;
@ -667,12 +667,14 @@ impl Domain for RemoteSshDomain {
// eg: tmux integration to be tunnelled via the remote
// session without duplicating a lot of logic over here.
let writer = WriterWrapper::new(writer);
let terminal = wezterm_term::Terminal::new(
size,
std::sync::Arc::new(config::TermConfig::new()),
"WezTerm",
config::wezterm_version(),
writer,
Box::new(writer.clone()),
);
let pane: Rc<dyn Pane> = Rc::new(LocalPane::new(
@ -680,6 +682,7 @@ impl Domain for RemoteSshDomain {
terminal,
child,
pty,
Box::new(writer),
self.id,
"RemoteSshDomain".to_string(),
));
@ -990,7 +993,7 @@ impl portable_pty::MasterPty for WrappedSshPty {
}
}
fn try_clone_writer(&self) -> anyhow::Result<Box<(dyn Write + Send + 'static)>> {
fn take_writer(&self) -> anyhow::Result<Box<(dyn Write + Send + 'static)>> {
anyhow::bail!("writer must be created during bootstrap");
}

View File

@ -1,4 +1,4 @@
use crate::domain::DomainId;
use crate::domain::{DomainId, WriterWrapper};
use crate::localpane::LocalPane;
use crate::pane::alloc_pane_id;
use crate::tab::{Tab, TabId};
@ -128,7 +128,7 @@ impl TmuxDomainState {
cmd_queue: self.cmd_queue.clone(),
master_pane: ref_pane,
};
let writer = pane_pty.try_clone_writer()?;
let writer = WriterWrapper::new(pane_pty.take_writer()?);
let mux = Mux::get().expect("should be called at main thread");
let size = TerminalSize {
rows: pane.pane_height as usize,
@ -147,7 +147,7 @@ impl TmuxDomainState {
std::sync::Arc::new(config::TermConfig::new()),
"WezTerm",
config::wezterm_version(),
Box::new(writer),
Box::new(writer.clone()),
);
let local_pane: Rc<dyn Pane> = Rc::new(LocalPane::new(
@ -155,6 +155,7 @@ impl TmuxDomainState {
terminal,
Box::new(child),
Box::new(pane_pty),
Box::new(writer),
self.domain_id,
"tmux pane".to_string(),
));

View File

@ -143,7 +143,7 @@ impl MasterPty for TmuxPty {
Ok(Box::new(self.reader.try_clone()?))
}
fn try_clone_writer(&self) -> Result<Box<dyn Write + Send>, anyhow::Error> {
fn take_writer(&self) -> Result<Box<dyn Write + Send>, anyhow::Error> {
Ok(Box::new(TmuxPtyWriter {
domain_id: self.domain_id,
master_pane: self.master_pane.clone(),

View File

@ -1,6 +1,6 @@
[package]
name = "portable-pty"
version = "0.7.0"
version = "0.8.0"
authors = ["Wez Furlong"]
edition = "2018"
repository = "https://github.com/wez/wezterm"
@ -19,12 +19,10 @@ shell-words = "1.1"
serde_derive = {version="1.0", optional=true}
serde = {version="1.0", optional=true}
serial = "0.4"
ssh2 = {optional=true, version="0.9"}
[features]
default = []
serde_support = ["serde", "serde_derive"]
ssh = ["ssh2"]
[target."cfg(windows)".dependencies]
bitflags = "1.3"

View File

@ -3,6 +3,7 @@
//! pipes involved and it is easy to get blocked/deadlocked if care and attention
//! is not paid to those pipes!
use portable_pty::{CommandBuilder, NativePtySystem, PtySize, PtySystem};
use std::sync::mpsc::channel;
fn main() {
let pty_system = NativePtySystem::default();
@ -18,35 +19,63 @@ fn main() {
let cmd = CommandBuilder::new("whoami");
let mut child = pair.slave.spawn_command(cmd).unwrap();
// Release any handles owned by the slave: we don't need it now
// that we've spawned the child.
drop(pair.slave);
{
// Obtain the writer.
// When the writer is dropped, EOF will be sent to
// the program that was spawned.
// It is important to take the writer even if you don't
// send anything to its stdin so that EOF can be
// generated, otherwise you risk deadlocking yourself.
let mut writer = pair.master.take_writer().unwrap();
// This example doesn't need to write anything, but if you
// want to send data to the child, you'd set `to_write` to
// that data and do it like this:
let to_write = "";
if !to_write.is_empty() {
// To avoid deadlock, wrt. reading and waiting, we send
// data to the stdin of the child in a different thread.
std::thread::spawn(move || {
writer.write_all(to_write.as_bytes()).unwrap();
});
}
}
// Read the output in another thread.
// This is important because it is easy to encounter a situation
// where read/write buffers fill and block either your process
// or the spawned process.
let (tx, rx) = channel();
let mut reader = pair.master.try_clone_reader().unwrap();
// We hold handles on the pty. Now that the child is complete
// there are no processes remaining that will write to it until
// we spawn more. We're not going to do that in this example,
// so we should close it down. If we didn't drop it explicitly
// here, then the attempt to read its output would block forever
// waiting for a future child that will never be spawned.
std::thread::spawn(move || {
// Consume the output from the child
let mut s = String::new();
reader.read_to_string(&mut s).unwrap();
tx.send(s).unwrap();
});
// Wait for the child to complete
println!("child status: {:?}", child.wait().unwrap());
// Take care to drop the master after our processes are
// done, as some platforms get unhappy if it is dropped
// sooner than that.
drop(pair.master);
// Consume the output from the child
let mut s = String::new();
reader.read_to_string(&mut s).unwrap();
// Now wait for the output to be read by our reader thread
let output = rx.recv().unwrap();
// We print with escapes escaped because the windows conpty
// implementation synthesizes title change escape sequences
// in the output stream and it can be confusing to see those
// printed out raw in another terminal.
print!("output: ");
for c in s.escape_debug() {
for c in output.escape_debug() {
print!("{}", c);
}
// Note that we're waiting until after we've read the output
// to call `wait` on the process.
// On macOS Catalina, waiting on the process seems to prevent
// its output from making it into the pty.
println!("child status: {:?}", child.wait().unwrap());
}

View File

@ -26,14 +26,32 @@ fn main() -> anyhow::Result<()> {
let slave = pair.slave;
let mut child = smol::unblock(move || slave.spawn_command(cmd)).await?;
{
// Obtain the writer.
// When the writer is dropped, EOF will be sent to
// the program that was spawned.
// It is important to take the writer even if you don't
// send anything to its stdin so that EOF can be
// generated, otherwise you risk deadlocking yourself.
let writer = pair.master.take_writer()?;
// Explicitly generate EOF
drop(writer);
}
println!(
"child status: {:?}",
smol::unblock(move || child
.wait()
.map_err(|e| anyhow!("waiting for child: {}", e)))
.await?
);
let reader = pair.master.try_clone_reader()?;
// We hold handles on the pty. Now that the child is complete
// there are no processes remaining that will write to it until
// we spawn more. We're not going to do that in this example,
// so we should close it down. If we didn't drop it explicitly
// here, then the attempt to read its output would block forever
// waiting for a future child that will never be spawned.
// Take care to drop the master after our processes are
// done, as some platforms get unhappy if it is dropped
// sooner than that.
drop(pair.master);
let mut lines = smol::io::BufReader::new(smol::Unblock::new(reader)).lines();
@ -50,17 +68,6 @@ fn main() -> anyhow::Result<()> {
println!();
}
// Note that we're waiting until after we've read the output
// to call `wait` on the process.
// On macOS Catalina, waiting on the process seems to prevent
// its output from making it into the pty.
println!(
"child status: {:?}",
smol::unblock(move || child
.wait()
.map_err(|e| anyhow!("waiting for child: {}", e)))
.await?
);
Ok(())
})
}

View File

@ -37,12 +37,6 @@
//! # Ok::<(), Error>(())
//! ```
//!
//! ## ssh2
//!
//! If the `ssh` feature is enabled, this crate exposes an
//! `ssh::SshSession` type that can wrap an established ssh
//! session with an implementation of `PtySystem`, allowing
//! you to use the same pty interface with remote ptys.
use anyhow::Error;
use downcast_rs::{impl_downcast, Downcast};
#[cfg(unix)]
@ -61,9 +55,6 @@ pub mod unix;
#[cfg(windows)]
pub mod win;
#[cfg(feature = "ssh")]
pub mod ssh;
pub mod serial;
/// Represents the size of the visible display area in the pty
@ -94,7 +85,7 @@ impl Default for PtySize {
}
/// Represents the master/control end of the pty
pub trait MasterPty: std::io::Write {
pub trait MasterPty {
/// Inform the kernel and thus the child process that the window resized.
/// It will update the winsize information maintained by the kernel,
/// and generate a signal for the child to notice and update its state.
@ -105,9 +96,10 @@ pub trait MasterPty: std::io::Write {
/// via this stream.
fn try_clone_reader(&self) -> Result<Box<dyn std::io::Read + Send>, Error>;
/// Obtain a writable handle; writing to it will send data to the
/// slave end. This is equivalent to the Write impl on MasterPty
/// itself, but allows splitting it off into a separate object.
fn try_clone_writer(&self) -> Result<Box<dyn std::io::Write + Send>, Error>;
/// slave end.
/// Dropping the writer will send EOF to the slave end.
/// It is invalid to take the writer more than once.
fn take_writer(&self) -> Result<Box<dyn std::io::Write + Send>, Error>;
/// If applicable to the type of the tty, return the local process id
/// of the process group or session leader

View File

@ -14,6 +14,7 @@ use filedescriptor::FileDescriptor;
use serial::{
BaudRate, CharSize, FlowControl, Parity, PortSettings, SerialPort, StopBits, SystemPort,
};
use std::cell::RefCell;
use std::ffi::{OsStr, OsString};
use std::io::{Read, Result as IoResult, Write};
use std::sync::{Arc, Mutex};
@ -91,7 +92,10 @@ impl PtySystem for SerialTty {
slave: Box::new(Slave {
port: Arc::clone(&port),
}),
master: Box::new(Master { port }),
master: Box::new(Master {
port,
took_writer: RefCell::new(false),
}),
})
}
}
@ -186,9 +190,14 @@ impl ChildKiller for SerialChildKiller {
struct Master {
port: Handle,
took_writer: RefCell<bool>,
}
impl Write for Master {
struct MasterWriter {
port: Handle,
}
impl Write for MasterWriter {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
self.port.lock().unwrap().write(buf)
}
@ -217,9 +226,13 @@ impl MasterPty for Master {
Ok(Box::new(Reader { fd }))
}
fn try_clone_writer(&self) -> anyhow::Result<Box<dyn std::io::Write + Send>> {
fn take_writer(&self) -> anyhow::Result<Box<dyn std::io::Write + Send>> {
if *self.took_writer.borrow() {
anyhow::bail!("cannot take writer more than once");
}
*self.took_writer.borrow_mut() = true;
let port = Arc::clone(&self.port);
Ok(Box::new(Master { port }))
Ok(Box::new(MasterWriter { port }))
}
#[cfg(unix)]

View File

@ -1,352 +0,0 @@
//! This module implements a remote pty via ssh2.
//! While it offers a `PtySystem` implementation on the `SshSession`
//! struct, we don't include ssh in `PtySystemSelection` because
//! there is a non-trivial amount of setup that is required to
//! initiate a connection somewhere and to authenticate that session
//! before we can get to a point where `openpty` will be able to run.
use crate::{
Child, ChildKiller, CommandBuilder, ExitStatus, MasterPty, PtyPair, PtySize, PtySystem,
SlavePty,
};
use filedescriptor::{AsRawSocketDescriptor, POLLIN};
use ssh2::{Channel, Session};
use std::collections::HashMap;
use std::io::{Read, Result as IoResult, Write};
use std::sync::{Arc, Condvar, Mutex};
/// Represents a pty channel within a session.
struct SshPty {
channel: Channel,
/// The size that we last set; we need to remember it in order to
/// return it via `get_size`.
size: PtySize,
}
/// The internal state that tracks the ssh session.
/// It owns the Session and indirectly owns the Channel instances.
/// The ownership is important: both must be protected by the same
/// mutable borrow in order to respect the threading model of libssh2.
/// We do so by ensuring that a Mutex wraps SessionInner.
struct SessionInner {
session: Session,
ptys: HashMap<usize, SshPty>,
next_channel_id: usize,
term: String,
/// an instance of SshReader owns the wait for read and subsequent
/// wakeup broadcast
waiting_for_read: bool,
}
#[derive(Debug)]
struct SessionHolder {
locked_inner: Mutex<SessionInner>,
read_waiters: Condvar,
}
// An anemic impl of Debug to satisfy some indirect trait bounds
impl std::fmt::Debug for SessionInner {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
fmt.debug_struct("SessionInner")
.field("next_channel_id", &self.next_channel_id)
.finish()
}
}
/// The `SshSession` struct wraps an `ssh2::Session` instance.
/// The session is expected to have been pre-connected and pre-authenticated
/// by the calling the application.
/// Once established and wrapped into an `SshSession`, the `SshSession`
/// implements the `PtySystem` trait and exposes the `openpty` function
/// that can be used to return a remote pty via ssh.
pub struct SshSession {
inner: Arc<SessionHolder>,
}
impl SshSession {
/// Wrap an `ssh2::Session` in such a way that we can safely map it
/// into the `portable-pty` object model.
/// The `ssh2::Session` must be pre-connected (eg: `ssh2::Session::handshake`
/// must have been successfully completed) and pre-authenticated so that
/// internal calls made to `ssh2::Channel::exec` can be made.
/// The `term` parameter specifies the term name for the remote host in
/// the case that a pty needs to be allocated.
pub fn new(session: Session, term: &str) -> Self {
Self {
inner: Arc::new(SessionHolder {
locked_inner: Mutex::new(SessionInner {
session,
ptys: HashMap::new(),
next_channel_id: 1,
term: term.to_string(),
waiting_for_read: false,
}),
read_waiters: Condvar::new(),
}),
}
}
}
impl PtySystem for SshSession {
fn openpty(&self, size: PtySize) -> anyhow::Result<PtyPair> {
let mut inner = self.inner.locked_inner.lock().unwrap();
let mut channel = inner.session.channel_session()?;
channel.handle_extended_data(ssh2::ExtendedData::Merge)?;
channel.request_pty(
&inner.term,
None,
Some((
size.cols.into(),
size.rows.into(),
size.pixel_width.into(),
size.pixel_height.into(),
)),
)?;
let id = {
let id = inner.next_channel_id;
inner.next_channel_id += 1;
inner.ptys.insert(id, SshPty { channel, size });
id
};
let pty = PtyHandle {
id,
inner: Arc::clone(&self.inner),
};
Ok(PtyPair {
slave: Box::new(SshSlave { pty: pty.clone() }),
master: Box::new(SshMaster { pty }),
})
}
}
/// Represents a handle to a Channel
#[derive(Clone, Debug)]
struct PtyHandle {
id: usize,
inner: Arc<SessionHolder>,
}
impl PtyHandle {
/// Acquire the session mutex and then perform a lambda on the Channel
fn with_channel<R, F: FnMut(&mut Channel) -> R>(&self, mut f: F) -> R {
let mut inner = self.inner.locked_inner.lock().unwrap();
f(&mut inner.ptys.get_mut(&self.id).unwrap().channel)
}
/// Acquire the session mutex and then perform a lambda on the SshPty
fn with_pty<R, F: FnMut(&mut SshPty) -> R>(&self, mut f: F) -> R {
let mut inner = self.inner.locked_inner.lock().unwrap();
f(&mut inner.ptys.get_mut(&self.id).unwrap())
}
}
struct SshMaster {
pty: PtyHandle,
}
impl Write for SshMaster {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
self.pty.with_channel(|channel| channel.write(buf))
}
fn flush(&mut self) -> Result<(), std::io::Error> {
self.pty.with_channel(|channel| channel.flush())
}
}
impl MasterPty for SshMaster {
fn resize(&self, size: PtySize) -> anyhow::Result<()> {
self.pty.with_pty(|pty| {
pty.channel.request_pty_size(
size.cols.into(),
size.rows.into(),
Some(size.pixel_width.into()),
Some(size.pixel_height.into()),
)?;
pty.size = size;
Ok(())
})
}
fn get_size(&self) -> anyhow::Result<PtySize> {
Ok(self.pty.with_pty(|pty| pty.size))
}
fn try_clone_reader(&self) -> anyhow::Result<Box<dyn std::io::Read + Send>> {
Ok(Box::new(SshReader {
pty: self.pty.clone(),
}))
}
fn try_clone_writer(&self) -> anyhow::Result<Box<dyn std::io::Write + Send>> {
Ok(Box::new(SshMaster {
pty: self.pty.clone(),
}))
}
#[cfg(unix)]
fn process_group_leader(&self) -> Option<libc::pid_t> {
// N/A: there is no local process
None
}
}
struct SshSlave {
pty: PtyHandle,
}
impl SlavePty for SshSlave {
fn spawn_command(&self, cmd: CommandBuilder) -> anyhow::Result<Box<dyn Child + Send + Sync>> {
self.pty.with_channel(|channel| {
for (key, val) in cmd.iter_extra_env_as_str() {
if let Err(err) = channel.setenv(key, val) {
// Depending on the server configuration, a given
// setenv request may not succeed, but that doesn't
// prevent the connection from being set up.
log::error!("ssh: setenv {}={} failed: {}", key, val, err);
}
}
if cmd.is_default_prog() {
channel.shell()?;
} else {
let command = cmd.as_unix_command_line()?;
channel.exec(&command)?;
}
let child: Box<dyn Child + Send + Sync> = Box::new(SshChild {
pty: self.pty.clone(),
});
Ok(child)
})
}
}
#[derive(Debug)]
struct SshChild {
pty: PtyHandle,
}
impl Child for SshChild {
fn try_wait(&mut self) -> IoResult<Option<ExitStatus>> {
let mut lock = self.pty.inner.locked_inner.try_lock();
if let Ok(ref mut inner) = lock {
let ssh_pty = inner.ptys.get_mut(&self.pty.id).unwrap();
if ssh_pty.channel.eof() {
Ok(Some(ExitStatus::with_exit_code(
ssh_pty.channel.exit_status()? as u32,
)))
} else {
Ok(None)
}
} else {
Ok(None)
}
}
fn wait(&mut self) -> IoResult<ExitStatus> {
self.pty.with_channel(|channel| {
channel.close()?;
channel.wait_close()?;
Ok(ExitStatus::with_exit_code(channel.exit_status()? as u32))
})
}
fn process_id(&self) -> Option<u32> {
None
}
#[cfg(windows)]
fn as_raw_handle(&self) -> Option<std::os::windows::io::RawHandle> {
None
}
}
impl ChildKiller for SshChild {
fn kill(&mut self) -> IoResult<()> {
self.pty.with_channel(|channel| channel.send_eof())?;
Ok(())
}
fn clone_killer(&self) -> Box<dyn ChildKiller + Send + Sync> {
Box::new(SshChildKiller {
pty: self.pty.clone(),
})
}
}
#[derive(Debug)]
struct SshChildKiller {
pty: PtyHandle,
}
impl ChildKiller for SshChildKiller {
fn kill(&mut self) -> IoResult<()> {
self.pty.with_channel(|channel| channel.send_eof())?;
Ok(())
}
fn clone_killer(&self) -> Box<dyn ChildKiller + Send + Sync> {
Box::new(SshChildKiller {
pty: self.pty.clone(),
})
}
}
struct SshReader {
pty: PtyHandle,
}
impl Read for SshReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
loop {
let mut inner = self.pty.inner.locked_inner.lock().unwrap();
inner.session.set_blocking(false);
let res = inner.ptys.get_mut(&self.pty.id).unwrap().channel.read(buf);
inner.session.set_blocking(true);
match res {
Ok(size) => return Ok(size),
Err(err) => match err.kind() {
std::io::ErrorKind::WouldBlock => {}
_ => return Err(err),
},
};
// No data available for this channel, so we'll wait.
// If we're the first SshReader to do this, we'll perform the
// OS level poll() call for ourselves, otherwise we'll block
// on the condvar
if inner.waiting_for_read {
self.pty.inner.read_waiters.wait(inner).ok();
} else {
let socket = inner.session.as_socket_descriptor();
// We own waiting for read
inner.waiting_for_read = true;
// Unlock and wait
drop(inner);
let mut pfd = [filedescriptor::pollfd {
fd: socket,
events: POLLIN,
revents: 0,
}];
filedescriptor::poll(&mut pfd, None).ok();
// re-acquire the lock to release our ownership of the poll
// and to wake up the others
let mut inner = self.pty.inner.locked_inner.lock().unwrap();
inner.waiting_for_read = false;
// Wake all readers and we'll all race to read our next
// iteration
self.pty.inner.read_waiters.notify_all();
drop(inner);
}
}
}
}

View File

@ -4,6 +4,7 @@ use crate::{Child, CommandBuilder, MasterPty, PtyPair, PtySize, PtySystem, Slave
use anyhow::{bail, Error};
use filedescriptor::FileDescriptor;
use libc::{self, winsize};
use std::cell::RefCell;
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::process::CommandExt;
@ -41,6 +42,7 @@ fn openpty(size: PtySize) -> anyhow::Result<(UnixMasterPty, UnixSlavePty)> {
let master = UnixMasterPty {
fd: PtyFd(unsafe { FileDescriptor::from_raw_fd(master) }),
took_writer: RefCell::new(false),
};
let slave = UnixSlavePty {
fd: PtyFd(unsafe { FileDescriptor::from_raw_fd(slave) }),
@ -261,6 +263,7 @@ impl PtyFd {
/// The file descriptor will be closed when the Pty is dropped.
struct UnixMasterPty {
fd: PtyFd,
took_writer: RefCell<bool>,
}
/// Represents the slave end of a pty.
@ -311,9 +314,13 @@ impl MasterPty for UnixMasterPty {
Ok(Box::new(fd))
}
fn try_clone_writer(&self) -> Result<Box<dyn Write + Send>, Error> {
fn take_writer(&self) -> Result<Box<dyn Write + Send>, Error> {
if *self.took_writer.borrow() {
anyhow::bail!("cannot take writer more than once");
}
*self.took_writer.borrow_mut() = true;
let fd = PtyFd(self.fd.try_clone()?);
Ok(Box::new(UnixMasterPty { fd }))
Ok(Box::new(UnixMasterWriter { fd }))
}
fn process_group_leader(&self) -> Option<libc::pid_t> {
@ -324,7 +331,28 @@ impl MasterPty for UnixMasterPty {
}
}
impl Write for UnixMasterPty {
/// Represents the master end of a pty.
/// EOT will be sent, and then the file descriptor will be closed when
/// the Pty is dropped.
struct UnixMasterWriter {
fd: PtyFd,
}
impl Drop for UnixMasterWriter {
fn drop(&mut self) {
let mut t: libc::termios = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
if unsafe { libc::tcgetattr(self.fd.0.as_raw_fd(), &mut t) } == 0 {
// EOF is only interpreted after a newline, so if it is set,
// we send a newline followed by EOF.
let eot = t.c_cc[libc::VEOF];
if eot != 0 {
let _ = self.fd.0.write_all(&[b'\r', b'\n', eot]);
}
}
}
}
impl Write for UnixMasterWriter {
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
self.fd.write(buf)
}

View File

@ -3,7 +3,6 @@ use crate::win::psuedocon::PsuedoCon;
use crate::{Child, MasterPty, PtyPair, PtySize, PtySystem, SlavePty};
use anyhow::Error;
use filedescriptor::{FileDescriptor, Pipe};
use std::io;
use std::sync::{Arc, Mutex};
use winapi::um::wincon::COORD;
@ -28,7 +27,7 @@ impl PtySystem for ConPtySystem {
inner: Arc::new(Mutex::new(Inner {
con,
readable: stdout.read,
writable: stdin.write,
writable: Some(stdin.write),
size,
})),
};
@ -47,7 +46,7 @@ impl PtySystem for ConPtySystem {
struct Inner {
con: PsuedoCon,
readable: FileDescriptor,
writable: FileDescriptor,
writable: Option<FileDescriptor>,
size: PtySize,
}
@ -97,17 +96,15 @@ impl MasterPty for ConPtyMasterPty {
Ok(Box::new(self.inner.lock().unwrap().readable.try_clone()?))
}
fn try_clone_writer(&self) -> anyhow::Result<Box<dyn std::io::Write + Send>> {
Ok(Box::new(self.inner.lock().unwrap().writable.try_clone()?))
}
}
impl io::Write for ConPtyMasterPty {
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
self.inner.lock().unwrap().writable.write(buf)
}
fn flush(&mut self) -> Result<(), io::Error> {
Ok(())
fn take_writer(&self) -> anyhow::Result<Box<dyn std::io::Write + Send>> {
Ok(Box::new(
self.inner
.lock()
.unwrap()
.writable
.take()
.ok_or_else(|| anyhow::anyhow!("writer already taken"))?,
))
}
}

View File

@ -50,7 +50,7 @@ mux = { path = "../mux" }
mux-lua = { path = "../lua-api-crates/mux" }
open = "3.0"
ordered-float = "3.0"
portable-pty = { path = "../pty", features = ["serde_support", "ssh"]}
portable-pty = { path = "../pty", features = ["serde_support"]}
promise = { path = "../promise" }
pulldown-cmark = "0.9"
rangeset = { path = "../rangeset" }

View File

@ -26,7 +26,7 @@ filedescriptor = { version="0.8", path = "../filedescriptor" }
filenamegen = "0.2"
libc = "0.2"
log = "0.4"
portable-pty = { version="0.7", path = "../pty" }
portable-pty = { version="0.8", path = "../pty" }
regex = "1"
smol = "1.2"
ssh2 = {version="0.9.3", features=["openssl-on-win32"], optional = true}

View File

@ -67,7 +67,7 @@ impl portable_pty::MasterPty for SshPty {
Ok(Box::new(reader))
}
fn try_clone_writer(&self) -> anyhow::Result<Box<(dyn Write + Send + 'static)>> {
fn take_writer(&self) -> anyhow::Result<Box<(dyn Write + Send + 'static)>> {
let writer = self.writer.try_clone()?;
Ok(Box::new(writer))
}

View File

@ -360,7 +360,7 @@ impl RecordCommand {
writeln!(cast_file, "{}", serde_json::to_string(&header)?)?;
let pty_system = native_pty_system();
let mut pair = pty_system.openpty(size)?;
let pair = pty_system.openpty(size)?;
let cmd = config.build_prog(
if self.prog.is_empty() {
@ -423,11 +423,12 @@ impl RecordCommand {
let mut child_status = None;
let first_output = Instant::now();
let mut buffer = vec![];
let mut writer = pair.master.take_writer()?;
for msg in rx {
match msg {
Message::Stdin(data) => {
pair.master.write_all(&data)?;
writer.write_all(&data)?;
}
Message::Stdout(mut data) => {
let elapsed = first_output.elapsed().as_secs_f32();