Add WebSocket event stream endpoint

This commit is contained in:
Marcin Kulik 2024-07-04 15:57:19 +02:00
parent 6c71814812
commit f50dbd68aa
5 changed files with 193 additions and 30 deletions

13
Cargo.lock generated
View File

@ -123,6 +123,7 @@ dependencies = [
"pin-project-lite",
"rustversion",
"serde",
"serde_urlencoded",
"sha1",
"sync_wrapper 1.0.1",
"tokio",
@ -860,6 +861,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "sha1"
version = "0.10.6"

View File

@ -15,7 +15,7 @@ anyhow = "1.0.81"
clap = { version = "4.5.4", features = ["derive"] }
serde = "1.0.203"
tokio = { version = "1.38.0", features = ["full"] }
axum = { version = "0.7.5", default-features = false, features = ["http1", "ws"] }
axum = { version = "0.7.5", default-features = false, features = ["http1", "ws", "query"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
futures-util = "0.3.30"
rust-embed = "8.4.0"

View File

@ -33,7 +33,7 @@
<script>
const loc = window.location;
const src = loc.protocol.replace("http", "ws") + '//' + loc.host + '/ws/live';
const src = loc.protocol.replace("http", "ws") + '//' + loc.host + '/ws/alis';
const opts = {
logger: console,

View File

@ -1,7 +1,7 @@
use crate::session;
use anyhow::Result;
use axum::{
extract::{connect_info::ConnectInfo, ws, State},
extract::{connect_info::ConnectInfo, ws, Query, State},
http::{header, StatusCode, Uri},
response::IntoResponse,
routing::get,
@ -9,6 +9,7 @@ use axum::{
};
use futures_util::{sink, stream, StreamExt};
use rust_embed::RustEmbed;
use serde::Deserialize;
use serde_json::json;
use std::borrow::Cow;
use std::future::{self, Future, IntoFuture};
@ -32,7 +33,8 @@ pub async fn start(
eprintln!("live preview available at http://{addr}");
let app: Router<()> = Router::new()
.route("/ws/live", get(ws_handler))
.route("/ws/alis", get(alis_handler))
.route("/ws/events", get(event_stream_handler))
.with_state(clients_tx)
.fallback(static_handler);
@ -43,19 +45,21 @@ pub async fn start(
.into_future())
}
async fn ws_handler(
/// ALiS protocol handler
///
/// This endpoint implements ALiS (asciinema live stream) protocol (https://docs.asciinema.org/manual/alis/).
/// It allows pointing asciinema player directly to ht to get a real-time terminal preview.
async fn alis_handler(
ws: ws::WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
ConnectInfo(_addr): ConnectInfo<SocketAddr>,
State(clients_tx): State<mpsc::Sender<session::Client>>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| async move {
eprintln!("websocket client {addr} connected");
let _ = handle_socket(socket, clients_tx).await;
eprintln!("websocket client {addr} disconnected");
let _ = handle_alis_socket(socket, clients_tx).await;
})
}
async fn handle_socket(
async fn handle_alis_socket(
socket: ws::WebSocket,
clients_tx: mpsc::Sender<session::Client>,
) -> Result<()> {
@ -64,7 +68,7 @@ async fn handle_socket(
let result = session::stream(&clients_tx)
.await?
.map(ws_result)
.filter_map(alis_message)
.chain(stream::once(future::ready(Ok(close_message()))))
.forward(sink)
.await;
@ -75,25 +79,151 @@ async fn handle_socket(
Ok(())
}
fn ws_result(
async fn alis_message(
event: Result<session::Event, BroadcastStreamRecvError>,
) -> Result<ws::Message, axum::Error> {
) -> Option<Result<ws::Message, axum::Error>> {
use session::Event::*;
event.map_err(axum::Error::new).map(|event| match event {
Init(time, cols, rows, init) => json_message(json!({
match event {
Ok(Init(time, cols, rows, seq, _text)) => Some(Ok(json_message(json!({
"time": time,
"cols": cols,
"rows": rows,
"time": time,
"init": init
})),
"init": seq,
})))),
Stdout(time, data) => json_message(json!([time, "o", data])),
Ok(Output(time, data)) => Some(Ok(json_message(json!([time, "o", data])))),
Resize(time, cols, rows) => json_message(json!([time, "r", format!("{cols}x{rows}")])),
Ok(Resize(time, cols, rows)) => Some(Ok(json_message(json!([
time,
"r",
format!("{cols}x{rows}")
])))),
Ok(Snapshot(_, _, _, _)) => None,
Err(e) => Some(Err(axum::Error::new(e))),
}
}
#[derive(Debug, Deserialize)]
struct EventsParams {
sub: Option<String>,
}
#[derive(Default, Copy, Clone)]
struct EventSubscription {
init: bool,
snapshot: bool,
resize: bool,
output: bool,
}
impl From<String> for EventSubscription {
fn from(value: String) -> Self {
let mut sub = EventSubscription::default();
for s in value.split(',') {
match s {
"init" => sub.init = true,
"output" => sub.output = true,
"resize" => sub.resize = true,
"snapshot" => sub.snapshot = true,
_ => (),
}
}
sub
}
}
/// Event stream handler
///
/// This endpoint allows the client to subscribe to selected events and have them delivered as they occur.
/// Query param `sub` should be set to a comma-separated list desired of events.
/// See above for a list of supported events.
async fn event_stream_handler(
ws: ws::WebSocketUpgrade,
Query(params): Query<EventsParams>,
ConnectInfo(_addr): ConnectInfo<SocketAddr>,
State(clients_tx): State<mpsc::Sender<session::Client>>,
) -> impl IntoResponse {
let sub = params.sub.unwrap_or_default().into();
ws.on_upgrade(move |socket| async move {
let _ = handle_event_stream_socket(socket, clients_tx, sub).await;
})
}
async fn handle_event_stream_socket(
socket: ws::WebSocket,
clients_tx: mpsc::Sender<session::Client>,
sub: EventSubscription,
) -> Result<()> {
let (sink, stream) = socket.split();
let drainer = tokio::spawn(stream.map(Ok).forward(sink::drain()));
let result = session::stream(&clients_tx)
.await?
.filter_map(move |e| event_stream_message(e, sub))
.chain(stream::once(future::ready(Ok(close_message()))))
.forward(sink)
.await;
drainer.abort();
result?;
Ok(())
}
async fn event_stream_message(
event: Result<session::Event, BroadcastStreamRecvError>,
sub: EventSubscription,
) -> Option<Result<ws::Message, axum::Error>> {
use session::Event::*;
match event {
Ok(Init(_time, cols, rows, seq, text)) if sub.init => Some(Ok(json_message(json!({
"type": "init",
"data": json!({
"cols": cols,
"rows": rows,
"seq": seq,
"text": text,
})
})))),
Ok(Output(_time, data)) if sub.output => Some(Ok(json_message(json!({
"type": "output",
"data": json!({
"seq": data
})
})))),
Ok(Resize(_time, cols, rows)) if sub.resize => Some(Ok(json_message(json!({
"type": "resize",
"data": json!({
"cols": cols,
"rows": rows,
})
})))),
Ok(Snapshot(cols, rows, seq, text)) if sub.snapshot => Some(Ok(json_message(json!({
"type": "snapshot",
"data": json!({
"cols": cols,
"rows": rows,
"seq": seq,
"text": text,
})
})))),
Ok(_) => None,
Err(e) => Some(Err(axum::Error::new(e))),
}
}
fn json_message(value: serde_json::Value) -> ws::Message {
ws::Message::Text(value.to_string())
}

View File

@ -15,9 +15,10 @@ pub struct Session {
#[derive(Clone)]
pub enum Event {
Init(f64, usize, usize, String),
Stdout(f64, String),
Init(f64, usize, usize, String, String),
Output(f64, String),
Resize(f64, usize, usize),
Snapshot(usize, usize, String, String),
}
pub struct Client(oneshot::Sender<Subscription>);
@ -44,7 +45,7 @@ impl Session {
pub fn output(&mut self, data: String) {
self.vt.feed_str(&data);
let time = self.start_time.elapsed().as_secs_f64();
let _ = self.broadcast_tx.send(Event::Stdout(time, data));
let _ = self.broadcast_tx.send(Event::Output(time, data));
self.stream_time = time;
self.last_event_time = Instant::now();
}
@ -58,12 +59,14 @@ impl Session {
}
pub fn get_text(&self) -> String {
self.vt
.lines()
.iter()
.map(|l| l.text())
.collect::<Vec<_>>()
.join("\n")
let (cols, rows) = self.vt.size();
let text = self.text_view();
let _ = self
.broadcast_tx
.send(Event::Snapshot(cols, rows, self.vt.dump(), text.clone()));
text
}
pub fn cursor_key_app_mode(&self) -> bool {
@ -72,7 +75,15 @@ impl Session {
pub fn subscribe(&self) -> Subscription {
let (cols, rows) = self.vt.size();
let init = Event::Init(self.elapsed_time(), cols, rows, self.vt.dump());
let init = Event::Init(
self.elapsed_time(),
cols,
rows,
self.vt.dump(),
self.text_view(),
);
let broadcast_rx = self.broadcast_tx.subscribe();
Subscription { init, broadcast_rx }
@ -81,6 +92,15 @@ impl Session {
fn elapsed_time(&self) -> f64 {
self.stream_time + self.last_event_time.elapsed().as_secs_f64()
}
fn text_view(&self) -> String {
self.vt
.view()
.iter()
.map(|l| l.text())
.collect::<Vec<_>>()
.join("\n")
}
}
fn build_vt(cols: usize, rows: usize) -> avt::Vt {