mirror of
https://github.com/andyk/ht.git
synced 2024-10-04 01:08:04 +03:00
Merge pull request #13 from andyk/events-endpoint
Add WebSocket event stream endpoint
This commit is contained in:
commit
980e6873f1
13
Cargo.lock
generated
13
Cargo.lock
generated
@ -123,6 +123,7 @@ dependencies = [
|
|||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"rustversion",
|
"rustversion",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_urlencoded",
|
||||||
"sha1",
|
"sha1",
|
||||||
"sync_wrapper 1.0.1",
|
"sync_wrapper 1.0.1",
|
||||||
"tokio",
|
"tokio",
|
||||||
@ -860,6 +861,18 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_urlencoded"
|
||||||
|
version = "0.7.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
|
||||||
|
dependencies = [
|
||||||
|
"form_urlencoded",
|
||||||
|
"itoa",
|
||||||
|
"ryu",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sha1"
|
name = "sha1"
|
||||||
version = "0.10.6"
|
version = "0.10.6"
|
||||||
|
@ -15,7 +15,7 @@ anyhow = "1.0.81"
|
|||||||
clap = { version = "4.5.4", features = ["derive"] }
|
clap = { version = "4.5.4", features = ["derive"] }
|
||||||
serde = "1.0.203"
|
serde = "1.0.203"
|
||||||
tokio = { version = "1.38.0", features = ["full"] }
|
tokio = { version = "1.38.0", features = ["full"] }
|
||||||
axum = { version = "0.7.5", default-features = false, features = ["http1", "ws"] }
|
axum = { version = "0.7.5", default-features = false, features = ["http1", "ws", "query"] }
|
||||||
tokio-stream = { version = "0.1.15", features = ["sync"] }
|
tokio-stream = { version = "0.1.15", features = ["sync"] }
|
||||||
futures-util = "0.3.30"
|
futures-util = "0.3.30"
|
||||||
rust-embed = "8.4.0"
|
rust-embed = "8.4.0"
|
||||||
|
@ -33,7 +33,7 @@
|
|||||||
|
|
||||||
<script>
|
<script>
|
||||||
const loc = window.location;
|
const loc = window.location;
|
||||||
const src = loc.protocol.replace("http", "ws") + '//' + loc.host + '/ws/live';
|
const src = loc.protocol.replace("http", "ws") + '//' + loc.host + '/ws/alis';
|
||||||
|
|
||||||
const opts = {
|
const opts = {
|
||||||
logger: console,
|
logger: console,
|
||||||
|
166
src/server.rs
166
src/server.rs
@ -1,7 +1,7 @@
|
|||||||
use crate::session;
|
use crate::session;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{connect_info::ConnectInfo, ws, State},
|
extract::{connect_info::ConnectInfo, ws, Query, State},
|
||||||
http::{header, StatusCode, Uri},
|
http::{header, StatusCode, Uri},
|
||||||
response::IntoResponse,
|
response::IntoResponse,
|
||||||
routing::get,
|
routing::get,
|
||||||
@ -9,6 +9,7 @@ use axum::{
|
|||||||
};
|
};
|
||||||
use futures_util::{sink, stream, StreamExt};
|
use futures_util::{sink, stream, StreamExt};
|
||||||
use rust_embed::RustEmbed;
|
use rust_embed::RustEmbed;
|
||||||
|
use serde::Deserialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::future::{self, Future, IntoFuture};
|
use std::future::{self, Future, IntoFuture};
|
||||||
@ -32,7 +33,8 @@ pub async fn start(
|
|||||||
eprintln!("live preview available at http://{addr}");
|
eprintln!("live preview available at http://{addr}");
|
||||||
|
|
||||||
let app: Router<()> = Router::new()
|
let app: Router<()> = Router::new()
|
||||||
.route("/ws/live", get(ws_handler))
|
.route("/ws/alis", get(alis_handler))
|
||||||
|
.route("/ws/events", get(event_stream_handler))
|
||||||
.with_state(clients_tx)
|
.with_state(clients_tx)
|
||||||
.fallback(static_handler);
|
.fallback(static_handler);
|
||||||
|
|
||||||
@ -43,19 +45,21 @@ pub async fn start(
|
|||||||
.into_future())
|
.into_future())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn ws_handler(
|
/// ALiS protocol handler
|
||||||
|
///
|
||||||
|
/// This endpoint implements ALiS (asciinema live stream) protocol (https://docs.asciinema.org/manual/alis/).
|
||||||
|
/// It allows pointing asciinema player directly to ht to get a real-time terminal preview.
|
||||||
|
async fn alis_handler(
|
||||||
ws: ws::WebSocketUpgrade,
|
ws: ws::WebSocketUpgrade,
|
||||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
ConnectInfo(_addr): ConnectInfo<SocketAddr>,
|
||||||
State(clients_tx): State<mpsc::Sender<session::Client>>,
|
State(clients_tx): State<mpsc::Sender<session::Client>>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
ws.on_upgrade(move |socket| async move {
|
ws.on_upgrade(move |socket| async move {
|
||||||
eprintln!("websocket client {addr} connected");
|
let _ = handle_alis_socket(socket, clients_tx).await;
|
||||||
let _ = handle_socket(socket, clients_tx).await;
|
|
||||||
eprintln!("websocket client {addr} disconnected");
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_socket(
|
async fn handle_alis_socket(
|
||||||
socket: ws::WebSocket,
|
socket: ws::WebSocket,
|
||||||
clients_tx: mpsc::Sender<session::Client>,
|
clients_tx: mpsc::Sender<session::Client>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
@ -64,7 +68,7 @@ async fn handle_socket(
|
|||||||
|
|
||||||
let result = session::stream(&clients_tx)
|
let result = session::stream(&clients_tx)
|
||||||
.await?
|
.await?
|
||||||
.map(ws_result)
|
.filter_map(alis_message)
|
||||||
.chain(stream::once(future::ready(Ok(close_message()))))
|
.chain(stream::once(future::ready(Ok(close_message()))))
|
||||||
.forward(sink)
|
.forward(sink)
|
||||||
.await;
|
.await;
|
||||||
@ -75,25 +79,151 @@ async fn handle_socket(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ws_result(
|
async fn alis_message(
|
||||||
event: Result<session::Event, BroadcastStreamRecvError>,
|
event: Result<session::Event, BroadcastStreamRecvError>,
|
||||||
) -> Result<ws::Message, axum::Error> {
|
) -> Option<Result<ws::Message, axum::Error>> {
|
||||||
use session::Event::*;
|
use session::Event::*;
|
||||||
|
|
||||||
event.map_err(axum::Error::new).map(|event| match event {
|
match event {
|
||||||
Init(time, cols, rows, init) => json_message(json!({
|
Ok(Init(time, cols, rows, seq, _text)) => Some(Ok(json_message(json!({
|
||||||
|
"time": time,
|
||||||
"cols": cols,
|
"cols": cols,
|
||||||
"rows": rows,
|
"rows": rows,
|
||||||
"time": time,
|
"init": seq,
|
||||||
"init": init
|
})))),
|
||||||
})),
|
|
||||||
|
|
||||||
Stdout(time, data) => json_message(json!([time, "o", data])),
|
Ok(Output(time, data)) => Some(Ok(json_message(json!([time, "o", data])))),
|
||||||
|
|
||||||
Resize(time, cols, rows) => json_message(json!([time, "r", format!("{cols}x{rows}")])),
|
Ok(Resize(time, cols, rows)) => Some(Ok(json_message(json!([
|
||||||
|
time,
|
||||||
|
"r",
|
||||||
|
format!("{cols}x{rows}")
|
||||||
|
])))),
|
||||||
|
|
||||||
|
Ok(Snapshot(_, _, _, _)) => None,
|
||||||
|
|
||||||
|
Err(e) => Some(Err(axum::Error::new(e))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct EventsParams {
|
||||||
|
sub: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Copy, Clone)]
|
||||||
|
struct EventSubscription {
|
||||||
|
init: bool,
|
||||||
|
snapshot: bool,
|
||||||
|
resize: bool,
|
||||||
|
output: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<String> for EventSubscription {
|
||||||
|
fn from(value: String) -> Self {
|
||||||
|
let mut sub = EventSubscription::default();
|
||||||
|
|
||||||
|
for s in value.split(',') {
|
||||||
|
match s {
|
||||||
|
"init" => sub.init = true,
|
||||||
|
"output" => sub.output = true,
|
||||||
|
"resize" => sub.resize = true,
|
||||||
|
"snapshot" => sub.snapshot = true,
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sub
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Event stream handler
|
||||||
|
///
|
||||||
|
/// This endpoint allows the client to subscribe to selected events and have them delivered as they occur.
|
||||||
|
/// Query param `sub` should be set to a comma-separated list desired of events.
|
||||||
|
/// See above for a list of supported events.
|
||||||
|
async fn event_stream_handler(
|
||||||
|
ws: ws::WebSocketUpgrade,
|
||||||
|
Query(params): Query<EventsParams>,
|
||||||
|
ConnectInfo(_addr): ConnectInfo<SocketAddr>,
|
||||||
|
State(clients_tx): State<mpsc::Sender<session::Client>>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let sub = params.sub.unwrap_or_default().into();
|
||||||
|
|
||||||
|
ws.on_upgrade(move |socket| async move {
|
||||||
|
let _ = handle_event_stream_socket(socket, clients_tx, sub).await;
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_event_stream_socket(
|
||||||
|
socket: ws::WebSocket,
|
||||||
|
clients_tx: mpsc::Sender<session::Client>,
|
||||||
|
sub: EventSubscription,
|
||||||
|
) -> Result<()> {
|
||||||
|
let (sink, stream) = socket.split();
|
||||||
|
let drainer = tokio::spawn(stream.map(Ok).forward(sink::drain()));
|
||||||
|
|
||||||
|
let result = session::stream(&clients_tx)
|
||||||
|
.await?
|
||||||
|
.filter_map(move |e| event_stream_message(e, sub))
|
||||||
|
.chain(stream::once(future::ready(Ok(close_message()))))
|
||||||
|
.forward(sink)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
drainer.abort();
|
||||||
|
result?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn event_stream_message(
|
||||||
|
event: Result<session::Event, BroadcastStreamRecvError>,
|
||||||
|
sub: EventSubscription,
|
||||||
|
) -> Option<Result<ws::Message, axum::Error>> {
|
||||||
|
use session::Event::*;
|
||||||
|
|
||||||
|
match event {
|
||||||
|
Ok(Init(_time, cols, rows, seq, text)) if sub.init => Some(Ok(json_message(json!({
|
||||||
|
"type": "init",
|
||||||
|
"data": json!({
|
||||||
|
"cols": cols,
|
||||||
|
"rows": rows,
|
||||||
|
"seq": seq,
|
||||||
|
"text": text,
|
||||||
|
})
|
||||||
|
})))),
|
||||||
|
|
||||||
|
Ok(Output(_time, data)) if sub.output => Some(Ok(json_message(json!({
|
||||||
|
"type": "output",
|
||||||
|
"data": json!({
|
||||||
|
"seq": data
|
||||||
|
})
|
||||||
|
})))),
|
||||||
|
|
||||||
|
Ok(Resize(_time, cols, rows)) if sub.resize => Some(Ok(json_message(json!({
|
||||||
|
"type": "resize",
|
||||||
|
"data": json!({
|
||||||
|
"cols": cols,
|
||||||
|
"rows": rows,
|
||||||
|
})
|
||||||
|
})))),
|
||||||
|
|
||||||
|
Ok(Snapshot(cols, rows, seq, text)) if sub.snapshot => Some(Ok(json_message(json!({
|
||||||
|
"type": "snapshot",
|
||||||
|
"data": json!({
|
||||||
|
"cols": cols,
|
||||||
|
"rows": rows,
|
||||||
|
"seq": seq,
|
||||||
|
"text": text,
|
||||||
|
})
|
||||||
|
})))),
|
||||||
|
|
||||||
|
Ok(_) => None,
|
||||||
|
|
||||||
|
Err(e) => Some(Err(axum::Error::new(e))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn json_message(value: serde_json::Value) -> ws::Message {
|
fn json_message(value: serde_json::Value) -> ws::Message {
|
||||||
ws::Message::Text(value.to_string())
|
ws::Message::Text(value.to_string())
|
||||||
}
|
}
|
||||||
|
@ -15,9 +15,10 @@ pub struct Session {
|
|||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub enum Event {
|
pub enum Event {
|
||||||
Init(f64, usize, usize, String),
|
Init(f64, usize, usize, String, String),
|
||||||
Stdout(f64, String),
|
Output(f64, String),
|
||||||
Resize(f64, usize, usize),
|
Resize(f64, usize, usize),
|
||||||
|
Snapshot(usize, usize, String, String),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Client(oneshot::Sender<Subscription>);
|
pub struct Client(oneshot::Sender<Subscription>);
|
||||||
@ -44,7 +45,7 @@ impl Session {
|
|||||||
pub fn output(&mut self, data: String) {
|
pub fn output(&mut self, data: String) {
|
||||||
self.vt.feed_str(&data);
|
self.vt.feed_str(&data);
|
||||||
let time = self.start_time.elapsed().as_secs_f64();
|
let time = self.start_time.elapsed().as_secs_f64();
|
||||||
let _ = self.broadcast_tx.send(Event::Stdout(time, data));
|
let _ = self.broadcast_tx.send(Event::Output(time, data));
|
||||||
self.stream_time = time;
|
self.stream_time = time;
|
||||||
self.last_event_time = Instant::now();
|
self.last_event_time = Instant::now();
|
||||||
}
|
}
|
||||||
@ -58,12 +59,14 @@ impl Session {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_text(&self) -> String {
|
pub fn get_text(&self) -> String {
|
||||||
self.vt
|
let (cols, rows) = self.vt.size();
|
||||||
.lines()
|
let text = self.text_view();
|
||||||
.iter()
|
|
||||||
.map(|l| l.text())
|
let _ = self
|
||||||
.collect::<Vec<_>>()
|
.broadcast_tx
|
||||||
.join("\n")
|
.send(Event::Snapshot(cols, rows, self.vt.dump(), text.clone()));
|
||||||
|
|
||||||
|
text
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cursor_key_app_mode(&self) -> bool {
|
pub fn cursor_key_app_mode(&self) -> bool {
|
||||||
@ -72,7 +75,15 @@ impl Session {
|
|||||||
|
|
||||||
pub fn subscribe(&self) -> Subscription {
|
pub fn subscribe(&self) -> Subscription {
|
||||||
let (cols, rows) = self.vt.size();
|
let (cols, rows) = self.vt.size();
|
||||||
let init = Event::Init(self.elapsed_time(), cols, rows, self.vt.dump());
|
|
||||||
|
let init = Event::Init(
|
||||||
|
self.elapsed_time(),
|
||||||
|
cols,
|
||||||
|
rows,
|
||||||
|
self.vt.dump(),
|
||||||
|
self.text_view(),
|
||||||
|
);
|
||||||
|
|
||||||
let broadcast_rx = self.broadcast_tx.subscribe();
|
let broadcast_rx = self.broadcast_tx.subscribe();
|
||||||
|
|
||||||
Subscription { init, broadcast_rx }
|
Subscription { init, broadcast_rx }
|
||||||
@ -81,6 +92,15 @@ impl Session {
|
|||||||
fn elapsed_time(&self) -> f64 {
|
fn elapsed_time(&self) -> f64 {
|
||||||
self.stream_time + self.last_event_time.elapsed().as_secs_f64()
|
self.stream_time + self.last_event_time.elapsed().as_secs_f64()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn text_view(&self) -> String {
|
||||||
|
self.vt
|
||||||
|
.view()
|
||||||
|
.iter()
|
||||||
|
.map(|l| l.text())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_vt(cols: usize, rows: usize) -> avt::Vt {
|
fn build_vt(cols: usize, rows: usize) -> avt::Vt {
|
||||||
|
Loading…
Reference in New Issue
Block a user