Compare commits

...

7 Commits

Author SHA1 Message Date
Marcin Kulik
b5b44f6347 Fix the event name in README 2024-07-04 19:53:51 +02:00
Marcin Kulik
2785f42f32 Words 2024-07-04 18:15:53 +02:00
Marcin Kulik
caab207d4d Document WebSocket API 2024-07-04 18:13:56 +02:00
Marcin Kulik
f4e3727e34 Document live preview endpoint 2024-07-04 17:59:52 +02:00
Marcin Kulik
b0ba80e4a5 Make stdio API respond to getView via event subscription 2024-07-04 17:38:53 +02:00
Marcin Kulik
980e6873f1
Merge pull request #13 from andyk/events-endpoint
Add WebSocket event stream endpoint
2024-07-04 17:06:29 +02:00
Marcin Kulik
f50dbd68aa Add WebSocket event stream endpoint 2024-07-04 15:57:19 +02:00
10 changed files with 312 additions and 63 deletions

13
Cargo.lock generated
View File

@ -123,6 +123,7 @@ dependencies = [
"pin-project-lite",
"rustversion",
"serde",
"serde_urlencoded",
"sha1",
"sync_wrapper 1.0.1",
"tokio",
@ -860,6 +861,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "sha1"
version = "0.10.6"

View File

@ -15,7 +15,7 @@ anyhow = "1.0.81"
clap = { version = "4.5.4", features = ["derive"] }
serde = "1.0.203"
tokio = { version = "1.38.0", features = ["full"] }
axum = { version = "0.7.5", default-features = false, features = ["http1", "ws"] }
axum = { version = "0.7.5", default-features = false, features = ["http1", "ws", "query"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
futures-util = "0.3.30"
rust-embed = "8.4.0"

View File

@ -1,6 +1,6 @@
# ht - headless terminal
`ht` (short for *headless terminal*) is a command line program that wraps an arbitrary other binary (e.g. `bash`, `vim`, etc.) with a VT100 style terminal interface--i.e. a pseudoterminal client (PTY) plus terminal server--and allows easy programmatic access to the input and output of that terminal (via JSON over stdin/stdout). `ht` is built in rust and works on MacOS and Linux.
`ht` (short for *headless terminal*) is a command line program that wraps an arbitrary other binary (e.g. `bash`, `vim`, etc.) with a VT100 style terminal interface--i.e. a pseudoterminal client (PTY) plus terminal server--and allows easy programmatic access to the input and output of that terminal (via JSON over STDIN/STDOUT). `ht` is built in rust and works on MacOS and Linux.
<img src="https://andykonwinski.com/assets/img/headless-terminal.png" alt="screenshot of raw terminal output vs ht output" align="right" style="width:450px">
@ -67,19 +67,39 @@ size can also be dynamically changed - see [resize command](#resize) below.
Run `ht -h` or `ht --help` to see all available options.
## Live terminal preview
ht comes with a built-in HTTP server which provides a handy live terminal preview page.
To enable it, start ht with `-l` / `--listen` option. This will print the URL of
the live preview.
By default it listens on `127.0.0.1` and a system assigned, dynamic port. If you
need it to bind to another interface, or a specific port, pass the address to
the `-l` option, e.g. `-l 0.0.0.0:9999`.
## API
Communication with ht is performed via stdin, stdout and stderr.
ht provides 2 types of API: STDIO and WebSocket.
ht uses simple JSON-based protocol for sending commands to its stdin. Each
The STDIO API allows control and introspection of the terminal using STDIN,
STDOUT and STDERR.
WebSocket API provides several endpoints for getting terminal updates in
real-time. Websocket API is _not_ enabled by default, and requires starting the
built-in HTTP server with `-l` / `--listen` option.
### STDIO API
ht uses simple JSON-based protocol for sending commands to its STDIN. Each
command must be sent on a separate line and be a JSON object having `"type"`
field set to one of the supported commands (below).
ht sends responses (where applicable) to its stdout, as JSON-encoded objects.
ht sends responses (where applicable) to its STDOUT, as JSON-encoded objects.
Diagnostic messages (notices, errors) are printed to stderr.
Diagnostic messages (notices, errors) are printed to STDERR.
### sendKeys
#### sendKeys
`sendKeys` command allows sending keys to a process running in the virtual
terminal as if the keys were pressed on a keyboard.
@ -94,7 +114,7 @@ Each element of the `keys` array can be either a key name or an arbitrary text.
If a key is not matched by any supported key name then the text is sent to the
process as is, i.e. like when using the `input` command.
This command doesn't produce any output on stdout.
This command doesn't produce any output on STDOUT.
The key and modifier specifications were inspired by
[tmux](https://github.com/tmux/tmux/wiki/Modifier-Keys).
@ -135,7 +155,7 @@ etc. For text characters, instead of specifying e.g. `S-a` just use upper case
Alt modifier can be used with any Unicode character and most special key names.
### input
#### input
`input` command allows sending arbitrary raw input to a process running in the
virtual terminal.
@ -156,9 +176,9 @@ payload:
{ "type": "input", "payload": "\u0003" }
```
This command doesn't produce any output on stdout.
This command doesn't produce any output on STDOUT.
### getView
#### getView
`getView` command allows obtaining a textual view of a terminal window.
@ -166,14 +186,14 @@ This command doesn't produce any output on stdout.
{ "type": "getView" }
```
This command responds with the current view on stdout. The view is a multi-line
This command responds with the current view on STDOUT. The view is a multi-line
string, where each line represents a terminal row.
```json
{ "view": "[user@host dir]$ \n \n..." }
```
### resize
#### resize
`resize` command allows resizing the virtual terminal window dynamically by
specifying new width (`cols`) and height (`rows`).
@ -182,15 +202,50 @@ specifying new width (`cols`) and height (`rows`).
{ "type": "resize", "cols": 80, "rows": 24 }
```
This command doesn't produce any output on stdout.
This command doesn't produce any output on STDOUT.
### WebSocket API
The WebSocket API currently provides 2 endpoints:
#### `/ws/events`
This endpoint allows the client to subscribe to events that happen in ht.
Query param `sub` should be set to a comma-separated list of desired events.
E.g. `/ws/events?sub=init,snapshot`.
Events are delivered as JSON encoded strings, using WebSocket text message type.
Every event contains 2 fields:
- `type` - type of event,
- `data` - associated data, specific to each event type.
Supported events:
- `init` - similar to `snapshot` (see below) but sent only once, as the first event after establishing connection
- `output` - terminal output
- `resize` - terminal resize
- `snapshot` - view snapshot taken (e.g. with `getView`)
TODO: describe the associated data for the above event types.
#### `/ws/alis`
This endpoint implements JSON flavor of [asciinema live stream
protocol](https://github.com/asciinema/asciinema-player/blob/develop/src/driver/websocket.js),
therefore allows pointing asciinema player directly to ht to get a real-time
terminal preview. This endpoint is used by the live terminal preview page
mentioned above.
## Testing on command line
ht is aimed at programmatic use given its JSON-based API, however one can play
with it by just launching it in a normal desktop terminal emulator and typing in
JSON-encoded commands from keyboard and observing the output on stdout.
JSON-encoded commands from keyboard and observing the output on STDOUT.
[rlwrap](https://github.com/hanslub42/rlwrap) can be used to wrap stdin in a
[rlwrap](https://github.com/hanslub42/rlwrap) can be used to wrap STDIN in a
readline based editable prompt, which also provides history (up/down arrows).
To use `rlwrap` with `ht`:

View File

@ -33,7 +33,7 @@
<script>
const loc = window.location;
const src = loc.protocol.replace("http", "ws") + '//' + loc.host + '/ws/live';
const src = loc.protocol.replace("http", "ws") + '//' + loc.host + '/ws/alis';
const opts = {
logger: console,

View File

@ -1,9 +1,11 @@
use crate::command::{self, Command, InputSeq};
use crate::session::{self, Event};
use anyhow::Result;
use serde::{de::DeserializeOwned, Deserialize};
use std::io;
use std::thread;
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
#[derive(Debug, Deserialize)]
struct InputArgs {
@ -21,14 +23,41 @@ struct ResizeArgs {
rows: usize,
}
pub async fn start(command_tx: mpsc::Sender<Command>) -> Result<()> {
pub async fn start(
command_tx: mpsc::Sender<Command>,
clients_tx: mpsc::Sender<session::Client>,
) -> Result<()> {
let (input_tx, mut input_rx) = mpsc::unbounded_channel();
thread::spawn(|| read_stdin(input_tx));
let mut events = session::stream(&clients_tx).await?;
while let Some(line) = input_rx.recv().await {
match parse_line(&line) {
Ok(command) => command_tx.send(command).await?,
Err(e) => eprintln!("command parse error: {e}"),
loop {
tokio::select! {
line = input_rx.recv() => {
match line {
Some(line) => {
match parse_line(&line) {
Ok(command) => command_tx.send(command).await?,
Err(e) => eprintln!("command parse error: {e}"),
}
}
None => break
}
}
event = events.next() => {
match event {
Some(Ok(Event::Snapshot(_cols, _rows, _seq, text))) => {
let msg = serde_json::json!({ "view": text });
println!("{}", serde_json::to_string(&msg).unwrap());
}
Some(_) => (),
None => break
}
}
}
}
@ -67,7 +96,7 @@ fn build_command(value: serde_json::Value) -> Result<Command, String> {
Ok(Command::Resize(args.cols, args.rows))
}
Some("getView") => Ok(Command::GetView),
Some("getView") => Ok(Command::Snapshot),
other => Err(format!("invalid command type: {other:?}")),
}
@ -434,7 +463,7 @@ mod test {
#[test]
fn parse_get_view() {
let command = parse_line(r#"{ "type": "getView" }"#).unwrap();
assert!(matches!(command, Command::GetView));
assert!(matches!(command, Command::Snapshot));
}
#[test]

View File

@ -16,8 +16,8 @@ pub struct Cli {
pub command: Vec<String>,
/// Enable HTTP server
#[arg(short, long, default_missing_value = "127.0.0.1:0", num_args = 0..=1)]
pub listen_addr: Option<SocketAddr>,
#[arg(short, long, value_name = "LISTEN_ADDR", default_missing_value = "127.0.0.1:0", num_args = 0..=1)]
pub listen: Option<SocketAddr>,
}
impl Cli {

View File

@ -1,7 +1,7 @@
#[derive(Debug)]
pub enum Command {
Input(Vec<InputSeq>),
GetView,
Snapshot,
Resize(usize, usize),
}

View File

@ -22,8 +22,8 @@ async fn main() -> Result<()> {
let (command_tx, command_rx) = mpsc::channel(1024);
let (clients_tx, clients_rx) = mpsc::channel(1);
start_http_server(cli.listen_addr, clients_tx).await?;
let api = start_api(command_tx);
start_http_server(cli.listen, clients_tx.clone()).await?;
let api = start_api(command_tx, clients_tx);
let pty = start_pty(cli.command, &cli.size, input_rx, output_tx)?;
let session = build_session(&cli.size);
run_event_loop(output_rx, input_tx, command_rx, clients_rx, session, api).await?;
@ -34,8 +34,11 @@ fn build_session(size: &cli::Size) -> Session {
Session::new(size.cols(), size.rows())
}
fn start_api(command_tx: mpsc::Sender<Command>) -> JoinHandle<Result<()>> {
tokio::spawn(api::start(command_tx))
fn start_api(
command_tx: mpsc::Sender<Command>,
clients_tx: mpsc::Sender<session::Client>,
) -> JoinHandle<Result<()>> {
tokio::spawn(api::start(command_tx, clients_tx))
}
fn start_pty(
@ -96,9 +99,8 @@ async fn run_event_loop(
input_tx.send(data).await?;
}
Some(Command::GetView) => {
let resp = serde_json::json!({ "view": session.get_text() });
println!("{}", serde_json::to_string(&resp).unwrap());
Some(Command::Snapshot) => {
session.snapshot();
}
Some(Command::Resize(cols, rows)) => {

View File

@ -1,7 +1,7 @@
use crate::session;
use anyhow::Result;
use axum::{
extract::{connect_info::ConnectInfo, ws, State},
extract::{connect_info::ConnectInfo, ws, Query, State},
http::{header, StatusCode, Uri},
response::IntoResponse,
routing::get,
@ -9,6 +9,7 @@ use axum::{
};
use futures_util::{sink, stream, StreamExt};
use rust_embed::RustEmbed;
use serde::Deserialize;
use serde_json::json;
use std::borrow::Cow;
use std::future::{self, Future, IntoFuture};
@ -32,7 +33,8 @@ pub async fn start(
eprintln!("live preview available at http://{addr}");
let app: Router<()> = Router::new()
.route("/ws/live", get(ws_handler))
.route("/ws/alis", get(alis_handler))
.route("/ws/events", get(event_stream_handler))
.with_state(clients_tx)
.fallback(static_handler);
@ -43,19 +45,21 @@ pub async fn start(
.into_future())
}
async fn ws_handler(
/// ALiS protocol handler
///
/// This endpoint implements ALiS (asciinema live stream) protocol (https://docs.asciinema.org/manual/alis/).
/// It allows pointing asciinema player directly to ht to get a real-time terminal preview.
async fn alis_handler(
ws: ws::WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
ConnectInfo(_addr): ConnectInfo<SocketAddr>,
State(clients_tx): State<mpsc::Sender<session::Client>>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| async move {
eprintln!("websocket client {addr} connected");
let _ = handle_socket(socket, clients_tx).await;
eprintln!("websocket client {addr} disconnected");
let _ = handle_alis_socket(socket, clients_tx).await;
})
}
async fn handle_socket(
async fn handle_alis_socket(
socket: ws::WebSocket,
clients_tx: mpsc::Sender<session::Client>,
) -> Result<()> {
@ -64,7 +68,7 @@ async fn handle_socket(
let result = session::stream(&clients_tx)
.await?
.map(ws_result)
.filter_map(alis_message)
.chain(stream::once(future::ready(Ok(close_message()))))
.forward(sink)
.await;
@ -75,25 +79,151 @@ async fn handle_socket(
Ok(())
}
fn ws_result(
async fn alis_message(
event: Result<session::Event, BroadcastStreamRecvError>,
) -> Result<ws::Message, axum::Error> {
) -> Option<Result<ws::Message, axum::Error>> {
use session::Event::*;
event.map_err(axum::Error::new).map(|event| match event {
Init(time, cols, rows, init) => json_message(json!({
match event {
Ok(Init(time, cols, rows, seq, _text)) => Some(Ok(json_message(json!({
"time": time,
"cols": cols,
"rows": rows,
"time": time,
"init": init
})),
"init": seq,
})))),
Stdout(time, data) => json_message(json!([time, "o", data])),
Ok(Output(time, data)) => Some(Ok(json_message(json!([time, "o", data])))),
Resize(time, cols, rows) => json_message(json!([time, "r", format!("{cols}x{rows}")])),
Ok(Resize(time, cols, rows)) => Some(Ok(json_message(json!([
time,
"r",
format!("{cols}x{rows}")
])))),
Ok(Snapshot(_, _, _, _)) => None,
Err(e) => Some(Err(axum::Error::new(e))),
}
}
#[derive(Debug, Deserialize)]
struct EventsParams {
sub: Option<String>,
}
#[derive(Default, Copy, Clone)]
struct EventSubscription {
init: bool,
snapshot: bool,
resize: bool,
output: bool,
}
impl From<String> for EventSubscription {
fn from(value: String) -> Self {
let mut sub = EventSubscription::default();
for s in value.split(',') {
match s {
"init" => sub.init = true,
"output" => sub.output = true,
"resize" => sub.resize = true,
"snapshot" => sub.snapshot = true,
_ => (),
}
}
sub
}
}
/// Event stream handler
///
/// This endpoint allows the client to subscribe to selected events and have them delivered as they occur.
/// Query param `sub` should be set to a comma-separated list desired of events.
/// See above for a list of supported events.
async fn event_stream_handler(
ws: ws::WebSocketUpgrade,
Query(params): Query<EventsParams>,
ConnectInfo(_addr): ConnectInfo<SocketAddr>,
State(clients_tx): State<mpsc::Sender<session::Client>>,
) -> impl IntoResponse {
let sub = params.sub.unwrap_or_default().into();
ws.on_upgrade(move |socket| async move {
let _ = handle_event_stream_socket(socket, clients_tx, sub).await;
})
}
async fn handle_event_stream_socket(
socket: ws::WebSocket,
clients_tx: mpsc::Sender<session::Client>,
sub: EventSubscription,
) -> Result<()> {
let (sink, stream) = socket.split();
let drainer = tokio::spawn(stream.map(Ok).forward(sink::drain()));
let result = session::stream(&clients_tx)
.await?
.filter_map(move |e| event_stream_message(e, sub))
.chain(stream::once(future::ready(Ok(close_message()))))
.forward(sink)
.await;
drainer.abort();
result?;
Ok(())
}
async fn event_stream_message(
event: Result<session::Event, BroadcastStreamRecvError>,
sub: EventSubscription,
) -> Option<Result<ws::Message, axum::Error>> {
use session::Event::*;
match event {
Ok(Init(_time, cols, rows, seq, text)) if sub.init => Some(Ok(json_message(json!({
"type": "init",
"data": json!({
"cols": cols,
"rows": rows,
"seq": seq,
"text": text,
})
})))),
Ok(Output(_time, data)) if sub.output => Some(Ok(json_message(json!({
"type": "output",
"data": json!({
"seq": data
})
})))),
Ok(Resize(_time, cols, rows)) if sub.resize => Some(Ok(json_message(json!({
"type": "resize",
"data": json!({
"cols": cols,
"rows": rows,
})
})))),
Ok(Snapshot(cols, rows, seq, text)) if sub.snapshot => Some(Ok(json_message(json!({
"type": "snapshot",
"data": json!({
"cols": cols,
"rows": rows,
"seq": seq,
"text": text,
})
})))),
Ok(_) => None,
Err(e) => Some(Err(axum::Error::new(e))),
}
}
fn json_message(value: serde_json::Value) -> ws::Message {
ws::Message::Text(value.to_string())
}

View File

@ -15,9 +15,10 @@ pub struct Session {
#[derive(Clone)]
pub enum Event {
Init(f64, usize, usize, String),
Stdout(f64, String),
Init(f64, usize, usize, String, String),
Output(f64, String),
Resize(f64, usize, usize),
Snapshot(usize, usize, String, String),
}
pub struct Client(oneshot::Sender<Subscription>);
@ -44,7 +45,7 @@ impl Session {
pub fn output(&mut self, data: String) {
self.vt.feed_str(&data);
let time = self.start_time.elapsed().as_secs_f64();
let _ = self.broadcast_tx.send(Event::Stdout(time, data));
let _ = self.broadcast_tx.send(Event::Output(time, data));
self.stream_time = time;
self.last_event_time = Instant::now();
}
@ -57,13 +58,15 @@ impl Session {
self.last_event_time = Instant::now();
}
pub fn get_text(&self) -> String {
self.vt
.lines()
.iter()
.map(|l| l.text())
.collect::<Vec<_>>()
.join("\n")
pub fn snapshot(&self) {
let (cols, rows) = self.vt.size();
let _ = self.broadcast_tx.send(Event::Snapshot(
cols,
rows,
self.vt.dump(),
self.text_view(),
));
}
pub fn cursor_key_app_mode(&self) -> bool {
@ -72,7 +75,15 @@ impl Session {
pub fn subscribe(&self) -> Subscription {
let (cols, rows) = self.vt.size();
let init = Event::Init(self.elapsed_time(), cols, rows, self.vt.dump());
let init = Event::Init(
self.elapsed_time(),
cols,
rows,
self.vt.dump(),
self.text_view(),
);
let broadcast_rx = self.broadcast_tx.subscribe();
Subscription { init, broadcast_rx }
@ -81,6 +92,15 @@ impl Session {
fn elapsed_time(&self) -> f64 {
self.stream_time + self.last_event_time.elapsed().as_secs_f64()
}
fn text_view(&self) -> String {
self.vt
.view()
.iter()
.map(|l| l.text())
.collect::<Vec<_>>()
.join("\n")
}
}
fn build_vt(cols: usize, rows: usize) -> avt::Vt {