mirror of
https://github.com/wez/wezterm.git
synced 2024-11-23 15:04:36 +03:00
pty: remove experimental awaitable bits
This commit is contained in:
parent
65cc7bb286
commit
c54c64f395
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -2200,20 +2200,17 @@ name = "portable-pty"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"bitflags 1.2.1",
|
||||
"filedescriptor",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"mio",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serial",
|
||||
"shared_library",
|
||||
"shell-words",
|
||||
"ssh2",
|
||||
"tokio",
|
||||
"uds_windows",
|
||||
"winapi 0.3.8",
|
||||
]
|
||||
|
@ -9,7 +9,6 @@ license = "MIT"
|
||||
documentation = "https://docs.rs/portable-pty"
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
anyhow = "1.0"
|
||||
filedescriptor = { version="0.7", path = "../filedescriptor" }
|
||||
log = "0.4"
|
||||
@ -19,16 +18,12 @@ serde_derive = {version="1.0", optional=true}
|
||||
serde = {version="1.0", optional=true}
|
||||
serial = "0.4"
|
||||
ssh2 = {optional=true, version="0.8"}
|
||||
tokio = { version = "0.2", features = ["io-driver", "io-util"] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
serde_support = ["serde", "serde_derive"]
|
||||
ssh = ["ssh2"]
|
||||
|
||||
[target."cfg(unix)".dependencies]
|
||||
mio = { version = "0.6" }
|
||||
|
||||
[target."cfg(windows)".dependencies]
|
||||
bitflags = "1.0"
|
||||
lazy_static = "1.3"
|
||||
@ -42,6 +37,3 @@ winapi = { version = "0.3", features = [
|
||||
"namedpipeapi",
|
||||
"synchapi",
|
||||
]}
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "0.2", features = ["rt-threaded", "macros"] }
|
||||
|
@ -1,49 +0,0 @@
|
||||
use portable_pty::awaitable::native_pty_system;
|
||||
use portable_pty::{CommandBuilder, PtySize};
|
||||
use tokio::prelude::*;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let pty_system = native_pty_system();
|
||||
|
||||
let pair = pty_system
|
||||
.openpty(PtySize {
|
||||
rows: 24,
|
||||
cols: 80,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let cmd = CommandBuilder::new("whoami");
|
||||
let child = pair.slave.spawn_command(cmd).await?;
|
||||
// Release any handles owned by the slave: we don't need it now
|
||||
// that we've spawned the child, and if we don't drop it, we'll
|
||||
// block a one of the awaits we perform below.
|
||||
drop(pair.slave);
|
||||
|
||||
let reader = pair.master.try_clone_reader()?;
|
||||
println!("child status: {:?}", child.await?);
|
||||
|
||||
// We hold handles on the pty. Now that the child is complete
|
||||
// there are no processes remaining that will write to it until
|
||||
// we spawn more. We're not going to do that in this example,
|
||||
// so we should close it down. If we didn't drop it explicitly
|
||||
// here, then the attempt to read its output would block forever
|
||||
// waiting for a future child that will never be spawned.
|
||||
drop(pair.master);
|
||||
|
||||
let mut lines = tokio::io::BufReader::new(reader).lines();
|
||||
while let Some(line) = lines.next_line().await? {
|
||||
// We print with escapes escaped because the windows conpty
|
||||
// implementation synthesizes title change escape sequences
|
||||
// in the output stream and it can be confusing to see those
|
||||
// printed out raw in another terminal.
|
||||
print!("output: len={} ", line.len());
|
||||
for c in line.escape_debug() {
|
||||
print!("{}", c);
|
||||
}
|
||||
println!();
|
||||
}
|
||||
Ok(())
|
||||
}
|
@ -1,56 +0,0 @@
|
||||
use super::*;
|
||||
use async_trait::async_trait;
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Represents the master/control end of the pty
|
||||
#[async_trait(?Send)]
|
||||
pub trait MasterPty: tokio::io::AsyncWrite {
|
||||
/// Inform the kernel and thus the child process that the window resized.
|
||||
/// It will update the winsize information maintained by the kernel,
|
||||
/// and generate a signal for the child to notice and update its state.
|
||||
async fn resize(&self, size: PtySize) -> anyhow::Result<()>;
|
||||
/// Retrieves the size of the pty as known by the kernel
|
||||
async fn get_size(&self) -> anyhow::Result<PtySize>;
|
||||
/// Obtain a readable handle; output from the slave(s) is readable
|
||||
/// via this stream.
|
||||
fn try_clone_reader(&self) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send>>>;
|
||||
}
|
||||
|
||||
/// Represents a child process spawned into the pty.
|
||||
/// This handle can be used to wait for or terminate that child process.
|
||||
/// awaiting the Child yields the ExitStatus when the child completes.
|
||||
pub trait Child:
|
||||
std::fmt::Debug + std::future::Future<Output = anyhow::Result<ExitStatus>>
|
||||
{
|
||||
/// Request termination of the child process
|
||||
fn kill(&mut self) -> IoResult<()>;
|
||||
}
|
||||
|
||||
/// Represents the slave side of a pty.
|
||||
/// Can be used to spawn processes into the pty.
|
||||
#[async_trait(?Send)]
|
||||
pub trait SlavePty {
|
||||
/// Spawns the command specified by the provided CommandBuilder
|
||||
async fn spawn_command(&self, cmd: CommandBuilder) -> anyhow::Result<Pin<Box<dyn Child>>>;
|
||||
}
|
||||
|
||||
pub struct PtyPair {
|
||||
// slave is listed first so that it is dropped first.
|
||||
// The drop order is stable and specified by rust rfc 1857
|
||||
pub slave: Pin<Box<dyn SlavePty>>,
|
||||
pub master: Pin<Box<dyn MasterPty>>,
|
||||
}
|
||||
|
||||
/// The `PtySystem` trait allows an application to work with multiple
|
||||
/// possible Pty implementations at runtime.
|
||||
#[async_trait(?Send)]
|
||||
pub trait PtySystem {
|
||||
/// Create a new Pty instance with the window size set to the specified
|
||||
/// dimensions. Returns a (master, slave) Pty pair. The master side
|
||||
/// is used to drive the slave side.
|
||||
async fn openpty(&self, size: PtySize) -> anyhow::Result<PtyPair>;
|
||||
}
|
||||
|
||||
pub fn native_pty_system() -> Box<dyn PtySystem> {
|
||||
Box::new(NativePtySystem::default())
|
||||
}
|
@ -59,7 +59,6 @@ pub mod win;
|
||||
#[cfg(feature = "ssh")]
|
||||
pub mod ssh;
|
||||
|
||||
pub mod awaitable;
|
||||
pub mod serial;
|
||||
|
||||
/// Represents the size of the visible display area in the pty
|
||||
|
232
pty/src/unix.rs
232
pty/src/unix.rs
@ -1,21 +1,15 @@
|
||||
//! Working with pseudo-terminals
|
||||
|
||||
use crate::{Child, CommandBuilder, ExitStatus, MasterPty, PtyPair, PtySize, PtySystem, SlavePty};
|
||||
use anyhow::{bail, Context as _, Error};
|
||||
use async_trait::async_trait;
|
||||
use crate::{Child, CommandBuilder, MasterPty, PtyPair, PtySize, PtySystem, SlavePty};
|
||||
use anyhow::{bail, Error};
|
||||
use filedescriptor::FileDescriptor;
|
||||
use libc::{self, winsize};
|
||||
use mio::unix::EventedFd;
|
||||
use mio::{self, Evented, PollOpt, Ready, Token};
|
||||
use std::io;
|
||||
use std::io::{Read, Write};
|
||||
use std::mem;
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
|
||||
use std::os::unix::process::CommandExt;
|
||||
use std::pin::Pin;
|
||||
use std::ptr;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::io::PollEvented;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct UnixPtySystem {}
|
||||
@ -74,25 +68,6 @@ impl PtySystem for UnixPtySystem {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl crate::awaitable::PtySystem for UnixPtySystem {
|
||||
async fn openpty(&self, size: PtySize) -> anyhow::Result<crate::awaitable::PtyPair> {
|
||||
let (mut master, mut slave) = openpty(size)?;
|
||||
|
||||
master.fd.set_non_blocking(true)?;
|
||||
slave.fd.set_non_blocking(true)?;
|
||||
|
||||
Ok(crate::awaitable::PtyPair {
|
||||
master: Box::pin(AwaitableMasterPty {
|
||||
io: PollEvented::new(master.fd)?,
|
||||
}),
|
||||
slave: Box::pin(AwaitableSlavePty {
|
||||
io: PollEvented::new(slave.fd)?,
|
||||
}),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct PtyFd(pub FileDescriptor);
|
||||
impl std::ops::Deref for PtyFd {
|
||||
type Target = FileDescriptor;
|
||||
@ -218,32 +193,6 @@ impl PtyFd {
|
||||
}
|
||||
}
|
||||
|
||||
impl Evented for PtyFd {
|
||||
fn register(
|
||||
&self,
|
||||
poll: &mio::Poll,
|
||||
token: Token,
|
||||
interest: Ready,
|
||||
opts: PollOpt,
|
||||
) -> std::io::Result<()> {
|
||||
EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts)
|
||||
}
|
||||
|
||||
fn reregister(
|
||||
&self,
|
||||
poll: &mio::Poll,
|
||||
token: Token,
|
||||
interest: Ready,
|
||||
opts: PollOpt,
|
||||
) -> std::io::Result<()> {
|
||||
EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts)
|
||||
}
|
||||
|
||||
fn deregister(&self, poll: &mio::Poll) -> std::io::Result<()> {
|
||||
EventedFd(&self.0.as_raw_fd()).deregister(poll)
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the master end of a pty.
|
||||
/// The file descriptor will be closed when the Pty is dropped.
|
||||
struct UnixMasterPty {
|
||||
@ -304,180 +253,3 @@ impl Write for UnixMasterPty {
|
||||
self.fd.flush()
|
||||
}
|
||||
}
|
||||
|
||||
struct AwaitableSlavePty {
|
||||
io: PollEvented<PtyFd>,
|
||||
}
|
||||
|
||||
struct AwaitableMasterPty {
|
||||
io: PollEvented<PtyFd>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AwaitableChild {
|
||||
pid: libc::pid_t,
|
||||
waiting: Option<std::sync::mpsc::Receiver<anyhow::Result<ExitStatus>>>,
|
||||
}
|
||||
|
||||
impl crate::awaitable::Child for AwaitableChild {
|
||||
fn kill(&mut self) -> std::io::Result<()> {
|
||||
let res = unsafe { libc::kill(self.pid, libc::SIGKILL) };
|
||||
if res != 0 {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::future::Future for AwaitableChild {
|
||||
type Output = anyhow::Result<ExitStatus>;
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<anyhow::Result<ExitStatus>> {
|
||||
if self.waiting.is_none() {
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let pid = self.pid;
|
||||
let waker = cx.waker().clone();
|
||||
std::thread::spawn(move || loop {
|
||||
let mut status = 0;
|
||||
let reaped = unsafe { libc::waitpid(pid, &mut status, 0) };
|
||||
let err = std::io::Error::last_os_error();
|
||||
if reaped == pid {
|
||||
let exit_code = if unsafe { libc::WIFEXITED(status) } {
|
||||
unsafe { libc::WEXITSTATUS(status) as u32 }
|
||||
} else {
|
||||
1
|
||||
};
|
||||
tx.send(Ok(ExitStatus::with_exit_code(exit_code))).ok();
|
||||
waker.wake();
|
||||
return;
|
||||
}
|
||||
|
||||
if err.kind() == std::io::ErrorKind::Interrupted {
|
||||
continue;
|
||||
}
|
||||
|
||||
tx.send(Err(err).context("waitpid result")).ok();
|
||||
waker.wake();
|
||||
return;
|
||||
});
|
||||
self.waiting = Some(rx);
|
||||
}
|
||||
|
||||
match self.waiting.as_mut().unwrap().try_recv() {
|
||||
Ok(status) => Poll::Ready(status),
|
||||
Err(std::sync::mpsc::TryRecvError::Empty) => Poll::Pending,
|
||||
Err(err) => Poll::Ready(Err(err).context("receiving process wait status")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl crate::awaitable::SlavePty for AwaitableSlavePty {
|
||||
async fn spawn_command(
|
||||
&self,
|
||||
builder: CommandBuilder,
|
||||
) -> anyhow::Result<Pin<Box<dyn crate::awaitable::Child>>> {
|
||||
let child = self.io.get_ref().spawn_command(builder)?;
|
||||
let pid = child.id() as libc::pid_t;
|
||||
let child: Pin<Box<dyn crate::awaitable::Child>> =
|
||||
Box::pin(AwaitableChild { pid, waiting: None });
|
||||
Ok(child)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl crate::awaitable::MasterPty for AwaitableMasterPty {
|
||||
async fn resize(&self, size: PtySize) -> Result<(), Error> {
|
||||
self.io.get_ref().resize(size)
|
||||
}
|
||||
|
||||
async fn get_size(&self) -> Result<PtySize, Error> {
|
||||
self.io.get_ref().get_size()
|
||||
}
|
||||
|
||||
fn try_clone_reader(&self) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send>>> {
|
||||
let mut fd = self.io.get_ref().try_clone()?;
|
||||
fd.set_non_blocking(true)?;
|
||||
Ok(Box::pin(AwaitableMasterPty {
|
||||
io: PollEvented::new(PtyFd(fd))?,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl AwaitableMasterPty {
|
||||
fn poll_write_impl(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
if Poll::Pending == self.io.poll_write_ready(cx)? {
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
match self.io.get_mut().0.write(buf) {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
self.io.clear_write_ready(cx)?;
|
||||
Poll::Pending
|
||||
}
|
||||
x => Poll::Ready(x),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio::io::AsyncWrite for AwaitableMasterPty {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
self.poll_write_impl(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context,
|
||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio::io::AsyncRead for AwaitableMasterPty {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
poll_read_impl(&mut self.io, cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_read_impl(
|
||||
io: &mut PollEvented<PtyFd>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
if Poll::Pending == io.poll_read_ready(cx, Ready::readable())? {
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
match io.get_mut().read(buf) {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
io.clear_read_ready(cx, Ready::readable())?;
|
||||
Poll::Pending
|
||||
}
|
||||
x => Poll::Ready(x),
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio::io::AsyncRead for AwaitableSlavePty {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
poll_read_impl(&mut self.io, cx, buf)
|
||||
}
|
||||
}
|
||||
|
@ -1,414 +0,0 @@
|
||||
use crate::cmdbuilder::CommandBuilder;
|
||||
use crate::win::conpty::ConPtySystem;
|
||||
use crate::win::psuedocon::PsuedoCon;
|
||||
use crate::win::readbuf::ReadBuffer;
|
||||
use crate::PtySize;
|
||||
use anyhow::anyhow;
|
||||
use async_trait::async_trait;
|
||||
use filedescriptor::{FileDescriptor, Pipe};
|
||||
use std::io::{self, Error as IoError, Read, Write};
|
||||
use std::os::windows::io::AsRawHandle;
|
||||
use std::os::windows::raw::HANDLE;
|
||||
use std::pin::Pin;
|
||||
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TryRecvError};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use winapi::um::namedpipeapi::PeekNamedPipe;
|
||||
use winapi::um::wincon::COORD;
|
||||
|
||||
struct AwaitableConPtySlavePty {
|
||||
inner: Arc<Mutex<AwaitableInner>>,
|
||||
}
|
||||
|
||||
struct AwaitableConPtyMasterPty {
|
||||
inner: Arc<Mutex<AwaitableInner>>,
|
||||
}
|
||||
|
||||
enum WriteRequest {
|
||||
Data(Vec<u8>),
|
||||
Resize(PtySize, Waker, Sender<anyhow::Result<()>>),
|
||||
}
|
||||
|
||||
struct AwaitableInner {
|
||||
con: Arc<PsuedoCon>,
|
||||
size: PtySize,
|
||||
write_tx: Sender<WriteRequest>,
|
||||
reader: Arc<Mutex<AwaitableReader>>,
|
||||
}
|
||||
|
||||
/// PTYs on Windows are restricted to synchronous operation, so we cannot
|
||||
/// simply use IOCP to manage our asynchronous reads or writes.
|
||||
/// The AwaitableReader implements a little facade that allows us to
|
||||
/// schedule a blocking read of a desirable size in a worker thread.
|
||||
/// We can then use non-blocking calls on a channel to poll for completion.
|
||||
struct AwaitableReader {
|
||||
wait_for_read: Sender<(Waker, usize)>,
|
||||
read_buffer: ReadBuffer,
|
||||
read_results_rx: Receiver<std::io::Result<Vec<u8>>>,
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl crate::awaitable::SlavePty for AwaitableConPtySlavePty {
|
||||
async fn spawn_command(
|
||||
&self,
|
||||
cmd: CommandBuilder,
|
||||
) -> anyhow::Result<Pin<Box<dyn crate::awaitable::Child>>> {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
let child = inner.con.spawn_command(cmd)?;
|
||||
Ok(Box::pin(child))
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio::io::AsyncRead for AwaitableConPtyMasterPty {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.inner
|
||||
.lock()
|
||||
.unwrap()
|
||||
.reader
|
||||
.lock()
|
||||
.unwrap()
|
||||
.poll_read_impl(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio::io::AsyncRead for AwaitableReaderArc {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.0.lock().unwrap().poll_read_impl(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio::io::AsyncWrite for AwaitableConPtyMasterPty {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
self.inner.lock().unwrap().poll_write_impl(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context,
|
||||
) -> std::task::Poll<Result<(), std::io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl crate::awaitable::MasterPty for AwaitableConPtyMasterPty {
|
||||
async fn resize(&self, size: PtySize) -> anyhow::Result<()> {
|
||||
enum ResizeState {
|
||||
NotRequested,
|
||||
Waiting(Receiver<anyhow::Result<()>>),
|
||||
Done,
|
||||
}
|
||||
|
||||
struct ResizeFuture {
|
||||
state: ResizeState,
|
||||
inner: Arc<Mutex<AwaitableInner>>,
|
||||
size: PtySize,
|
||||
}
|
||||
|
||||
impl std::future::Future for ResizeFuture {
|
||||
type Output = anyhow::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match std::mem::replace(&mut self.state, ResizeState::Done) {
|
||||
ResizeState::NotRequested => {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
let (tx, rx) = channel();
|
||||
if let Err(err) = inner.write_tx.send(WriteRequest::Resize(
|
||||
self.size.clone(),
|
||||
cx.waker().clone(),
|
||||
tx,
|
||||
)) {
|
||||
return Poll::Ready(Err(anyhow!(
|
||||
"sending write request to pty failed: {}",
|
||||
err
|
||||
)));
|
||||
}
|
||||
|
||||
drop(inner);
|
||||
self.state = ResizeState::Waiting(rx);
|
||||
Poll::Pending
|
||||
}
|
||||
ResizeState::Waiting(rx) => match rx.try_recv() {
|
||||
Ok(res) => {
|
||||
// We just successfully changed the size, so
|
||||
// record the new size
|
||||
self.inner.lock().unwrap().size = self.size;
|
||||
Poll::Ready(res)
|
||||
}
|
||||
Err(TryRecvError::Empty) => {
|
||||
self.state = ResizeState::Waiting(rx);
|
||||
Poll::Pending
|
||||
}
|
||||
Err(err) => Poll::Ready(Err(anyhow!(
|
||||
"receiving write results from pty failed: {}",
|
||||
err
|
||||
))),
|
||||
},
|
||||
ResizeState::Done => Poll::Ready(Err(anyhow!("polling a completed future?"))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let future = ResizeFuture {
|
||||
state: ResizeState::NotRequested,
|
||||
inner: Arc::clone(&self.inner),
|
||||
size,
|
||||
};
|
||||
future.await
|
||||
}
|
||||
|
||||
async fn get_size(&self) -> anyhow::Result<PtySize> {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
Ok(inner.size.clone())
|
||||
}
|
||||
|
||||
fn try_clone_reader(&self) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send>>> {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
Ok(Box::pin(AwaitableReaderArc(inner.reader.clone())))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
impl crate::awaitable::PtySystem for ConPtySystem {
|
||||
async fn openpty(&self, size: PtySize) -> anyhow::Result<crate::awaitable::PtyPair> {
|
||||
let stdin = Pipe::new()?;
|
||||
let stdout = Pipe::new()?;
|
||||
|
||||
let con = PsuedoCon::new(
|
||||
COORD {
|
||||
X: size.cols as i16,
|
||||
Y: size.rows as i16,
|
||||
},
|
||||
stdin.read,
|
||||
stdout.write,
|
||||
)?;
|
||||
|
||||
let master = AwaitableConPtyMasterPty {
|
||||
inner: Arc::new(Mutex::new(AwaitableInner::new(
|
||||
con,
|
||||
stdout.read,
|
||||
stdin.write,
|
||||
size,
|
||||
))),
|
||||
};
|
||||
|
||||
let slave = AwaitableConPtySlavePty {
|
||||
inner: master.inner.clone(),
|
||||
};
|
||||
|
||||
Ok(crate::awaitable::PtyPair {
|
||||
master: Box::pin(master),
|
||||
slave: Box::pin(slave),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn peek_pipe_len(pipe: HANDLE) -> std::io::Result<usize> {
|
||||
let mut bytes_avail = 0;
|
||||
|
||||
let res = unsafe {
|
||||
PeekNamedPipe(
|
||||
pipe,
|
||||
std::ptr::null_mut(),
|
||||
0,
|
||||
std::ptr::null_mut(),
|
||||
&mut bytes_avail,
|
||||
std::ptr::null_mut(),
|
||||
)
|
||||
};
|
||||
if res == 0 {
|
||||
return Err(IoError::last_os_error());
|
||||
}
|
||||
|
||||
Ok(bytes_avail as usize)
|
||||
}
|
||||
|
||||
impl AwaitableInner {
|
||||
fn new(
|
||||
con: PsuedoCon,
|
||||
readable: FileDescriptor,
|
||||
writable: FileDescriptor,
|
||||
size: PtySize,
|
||||
) -> Self {
|
||||
let con = Arc::new(con);
|
||||
let (write_tx, write_rx) = channel();
|
||||
let write_thread_con = Arc::clone(&con);
|
||||
std::thread::spawn(move || Self::writer_thread(write_rx, write_thread_con, writable));
|
||||
|
||||
let (wait_for_read, wait_rx) = channel();
|
||||
|
||||
let (read_results_tx, read_results_rx) = sync_channel(1);
|
||||
std::thread::spawn(move || {
|
||||
AwaitableReader::reader_thread(wait_rx, readable, read_results_tx)
|
||||
});
|
||||
|
||||
Self {
|
||||
con,
|
||||
size,
|
||||
write_tx,
|
||||
reader: Arc::new(Mutex::new(AwaitableReader {
|
||||
wait_for_read,
|
||||
read_results_rx,
|
||||
read_buffer: ReadBuffer::new(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
fn writer_thread(
|
||||
to_write: Receiver<WriteRequest>,
|
||||
con: Arc<PsuedoCon>,
|
||||
mut writable: FileDescriptor,
|
||||
) {
|
||||
while let Ok(item) = to_write.recv() {
|
||||
match item {
|
||||
WriteRequest::Data(data) => {
|
||||
if let Err(_err) = writable.write_all(&data) {
|
||||
// FIXME: set errored flag?
|
||||
// Right now we defer error detection to
|
||||
// the read side
|
||||
}
|
||||
}
|
||||
WriteRequest::Resize(size, waker, results) => {
|
||||
let res = con.resize(COORD {
|
||||
X: size.cols as i16,
|
||||
Y: size.rows as i16,
|
||||
});
|
||||
let res = results.send(res);
|
||||
waker.wake();
|
||||
if res.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn poll_write_impl(
|
||||
&mut self,
|
||||
_cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
// The poll_write API has EAGAIN semantics which are too
|
||||
// awkward to emulate here. We'll simply buffer up the
|
||||
// write and claim that it succeeded.
|
||||
let len = buf.len();
|
||||
if let Err(err) = self.write_tx.send(WriteRequest::Data(buf.to_vec())) {
|
||||
Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
format!("unable to queue write ({}); treating as EOF", err),
|
||||
)))
|
||||
} else {
|
||||
Poll::Ready(Ok(len))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AwaitableReader {
|
||||
fn poll_read_impl(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
|
||||
// Try to satisfy from the buffer
|
||||
let len = self.read_buffer.consume(buf);
|
||||
if len > 0 {
|
||||
return Poll::Ready(Ok(len));
|
||||
}
|
||||
|
||||
match self.read_results_rx.try_recv() {
|
||||
// A successful read; store to the buffer and then return
|
||||
// an appropriate portion of it to our caller.
|
||||
Ok(Ok(recv_buf)) => {
|
||||
self.read_buffer.append(&recv_buf);
|
||||
|
||||
let len = self.read_buffer.consume(buf);
|
||||
if len > 0 {
|
||||
return Poll::Ready(Ok(len));
|
||||
}
|
||||
|
||||
// Probably impossible, but if we get here, we'll fall
|
||||
// through below and request some more data to be read
|
||||
}
|
||||
|
||||
// Map BrokenPipe errors to EOF for easier POSIX compatibility
|
||||
Ok(Err(err)) if err.kind() == std::io::ErrorKind::BrokenPipe => {
|
||||
return Poll::Ready(Ok(0))
|
||||
}
|
||||
|
||||
// Other errors are returned to the caller
|
||||
Ok(Err(err)) => return Poll::Ready(Err(err)),
|
||||
|
||||
Err(TryRecvError::Empty) => {
|
||||
// There are no reasults ready to read yet.
|
||||
// We'll queue up a read below.
|
||||
}
|
||||
|
||||
// There's a problem with the channel, most likely we're partially
|
||||
// destroyed: relay this as an EOF error (distinct from EOF) so
|
||||
// that it bubbles up as an actual error condition
|
||||
Err(err) => {
|
||||
return Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
format!("unable to receive read results ({}); treating as EOF", err),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
// Ask the reader thread to do some IO for us
|
||||
match self.wait_for_read.send((cx.waker().clone(), buf.len())) {
|
||||
Ok(_) => Poll::Pending,
|
||||
Err(err) => Poll::Ready(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
format!("unable to queue read ({}); treating as EOF", err),
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn reader_thread(
|
||||
wait_requests: Receiver<(Waker, usize)>,
|
||||
mut readable: FileDescriptor,
|
||||
read_results: SyncSender<std::io::Result<Vec<u8>>>,
|
||||
) {
|
||||
while let Ok((waker, size)) = wait_requests.recv() {
|
||||
// If there is data already present in the pipe, we know that
|
||||
// our read will immediately succeed for that size, so prefer
|
||||
// to size our local vector to match.
|
||||
// If there is no data available to read, take the size from
|
||||
// the caller.
|
||||
let res = match peek_pipe_len(readable.as_raw_handle()) {
|
||||
Ok(avail) => {
|
||||
let size = if avail == 0 { size } else { size.min(avail) };
|
||||
|
||||
let mut buf = vec![0u8; size];
|
||||
|
||||
readable.read(&mut buf).map(|size| {
|
||||
buf.resize(size, 0);
|
||||
buf
|
||||
})
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
};
|
||||
|
||||
let broken = read_results.send(res).is_err();
|
||||
waker.wake();
|
||||
if broken {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AwaitableReaderArc(pub Arc<Mutex<AwaitableReader>>);
|
@ -10,7 +10,6 @@ use winapi::um::processthreadsapi::*;
|
||||
use winapi::um::synchapi::WaitForSingleObject;
|
||||
use winapi::um::winbase::INFINITE;
|
||||
|
||||
mod awaitable;
|
||||
pub mod conpty;
|
||||
mod procthreadattr;
|
||||
mod psuedocon;
|
||||
@ -77,12 +76,6 @@ impl Child for WinChild {
|
||||
}
|
||||
}
|
||||
|
||||
impl crate::awaitable::Child for WinChild {
|
||||
fn kill(&mut self) -> IoResult<()> {
|
||||
self.do_kill()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::future::Future for WinChild {
|
||||
type Output = anyhow::Result<ExitStatus>;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user