From 212b262faacc74fd5231aba99a07ba0ee3a64d1f Mon Sep 17 00:00:00 2001 From: Ivan Molodetskikh Date: Thu, 20 Jun 2024 09:22:02 +0300 Subject: [PATCH 1/2] ipc: Read only a single line on the client Allow extensibility. --- niri-ipc/src/socket.rs | 14 ++++++++------ src/ipc/server.rs | 3 ++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/niri-ipc/src/socket.rs b/niri-ipc/src/socket.rs index 67b9625..3964f00 100644 --- a/niri-ipc/src/socket.rs +++ b/niri-ipc/src/socket.rs @@ -1,7 +1,7 @@ //! Helper for blocking communication over the niri socket. use std::env; -use std::io::{self, Read, Write}; +use std::io::{self, BufRead, BufReader, Write}; use std::net::Shutdown; use std::os::unix::net::UnixStream; use std::path::Path; @@ -50,14 +50,16 @@ impl Socket { pub fn send(self, request: Request) -> io::Result { let Self { mut stream } = self; - let mut buf = serde_json::to_vec(&request).unwrap(); - stream.write_all(&buf)?; + let mut buf = serde_json::to_string(&request).unwrap(); + stream.write_all(buf.as_bytes())?; stream.shutdown(Shutdown::Write)?; - buf.clear(); - stream.read_to_end(&mut buf)?; + let mut reader = BufReader::new(stream); - let reply = serde_json::from_slice(&buf)?; + buf.clear(); + reader.read_line(&mut buf)?; + + let reply = serde_json::from_str(&buf)?; Ok(reply) } } diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 7987ca5..9a682bd 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -131,7 +131,8 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow: } } - let buf = serde_json::to_vec(&reply).context("error formatting reply")?; + let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?; + buf.push(b'\n'); write.write_all(&buf).await.context("error writing reply")?; Ok(()) From f0e5bbd103a6f10a4b4872d83bcbc8968ad43117 Mon Sep 17 00:00:00 2001 From: Ivan Molodetskikh Date: Thu, 20 Jun 2024 12:04:10 +0300 Subject: [PATCH 2/2] [WIP] Draft event stream IPC --- niri-ipc/src/lib.rs | 54 +++++++++++++ niri-ipc/src/socket.rs | 18 ++++- src/cli.rs | 2 + src/ipc/client.rs | 37 ++++++++- src/ipc/server.rs | 175 ++++++++++++++++++++++++++++++++++++++-- src/layout/mod.rs | 2 + src/layout/workspace.rs | 2 +- src/niri.rs | 6 +- 8 files changed, 280 insertions(+), 16 deletions(-) diff --git a/niri-ipc/src/lib.rs b/niri-ipc/src/lib.rs index 72e050e..978007d 100644 --- a/niri-ipc/src/lib.rs +++ b/niri-ipc/src/lib.rs @@ -35,6 +35,11 @@ pub enum Request { Workspaces, /// Request information about the focused output. FocusedOutput, + /// Start continuously receiving events from the compositor. + /// + /// The compositor should reply with `Reply::Ok(Response::Handled)`, then continuously send + /// [`Event`]s, one per line. + EventStream, /// Respond with an error (for testing error handling). ReturnError, } @@ -507,6 +512,10 @@ pub enum OutputConfigChanged { /// A workspace. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct Workspace { + /// Unique id of this workspace. + /// + /// This id remains constant regardless of the workspace moving around and across monitors. + pub id: u64, /// Index of the workspace on its monitor. /// /// This is the same index you can use for requests like `niri msg action focus-workspace`. @@ -521,6 +530,51 @@ pub struct Workspace { pub is_active: bool, } +/// A compositor event. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Event { + /// A new workspace was created. + WorkspaceCreated { + /// The new workspace. + workspace: Workspace, + }, + /// A workspace was removed. + WorkspaceRemoved { + /// Id of the removed workspace. + id: u64, + }, + /// A workspace was switched on an output. + /// + /// This doesn't mean the workspace became focused, just that it's now the active workspace on + /// its output. + WorkspaceSwitched { + /// Output where the workspace was switched. + output: String, + /// Id of the newly active workspace. + id: u64, + }, + /// A workspace moved on an output or to a different output. + WorkspaceMoved { + /// Id of the moved workspace. + id: u64, + /// New output of the workspace. + output: String, + /// New position of the workspace on the output. + idx: u8, + }, + /// Window focus changed. + WindowFocused { + // FIXME: replace with id, and WindowCreated/Removed. + /// The newly focused window, or `None` if no window is now focused. + window: Option, + }, + /// The keyboard layout changed. + KeyboardLayoutChanged { + /// Name of the newly active layout. + name: String, + }, +} + impl FromStr for WorkspaceReferenceArg { type Err = &'static str; diff --git a/niri-ipc/src/socket.rs b/niri-ipc/src/socket.rs index 3964f00..d629f1a 100644 --- a/niri-ipc/src/socket.rs +++ b/niri-ipc/src/socket.rs @@ -6,7 +6,7 @@ use std::net::Shutdown; use std::os::unix::net::UnixStream; use std::path::Path; -use crate::{Reply, Request}; +use crate::{Event, Reply, Request}; /// Name of the environment variable containing the niri IPC socket path. pub const SOCKET_PATH_ENV: &str = "NIRI_SOCKET"; @@ -47,7 +47,11 @@ impl Socket { /// * `Ok(Ok(response))`: successful [`Response`](crate::Response) from niri /// * `Ok(Err(message))`: error message from niri /// * `Err(error)`: error communicating with niri - pub fn send(self, request: Request) -> io::Result { + /// + /// This method also returns a blocking function that you can call to keep reading [`Event`]s + /// after requesting an [`EventStream`][Request::EventStream]. This function is not useful + /// otherwise. + pub fn send(self, request: Request) -> io::Result<(Reply, impl FnMut() -> io::Result)> { let Self { mut stream } = self; let mut buf = serde_json::to_string(&request).unwrap(); @@ -60,6 +64,14 @@ impl Socket { reader.read_line(&mut buf)?; let reply = serde_json::from_str(&buf)?; - Ok(reply) + + let events = move || { + buf.clear(); + reader.read_line(&mut buf)?; + let event = serde_json::from_str(&buf)?; + Ok(event) + }; + + Ok((reply, events)) } } diff --git a/src/cli.rs b/src/cli.rs index 52489ad..1240067 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -80,6 +80,8 @@ pub enum Msg { #[command(subcommand)] action: OutputAction, }, + /// Start continuously receiving events from the compositor. + EventStream, /// Print the version of the running niri instance. Version, /// Request an error from the running niri instance. diff --git a/src/ipc/client.rs b/src/ipc/client.rs index 3192925..1f4e159 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, bail, Context}; use niri_ipc::{ - LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response, Socket, Transform, + Event, LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response, Socket, Transform, }; use serde_json::json; @@ -19,12 +19,13 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> { action: action.clone(), }, Msg::Workspaces => Request::Workspaces, + Msg::EventStream => Request::EventStream, Msg::RequestError => Request::ReturnError, }; let socket = Socket::connect().context("error connecting to the niri socket")?; - let reply = socket + let (reply, mut read_event) = socket .send(request) .context("error communicating with niri")?; @@ -35,6 +36,7 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> { Socket::connect() .and_then(|socket| socket.send(Request::Version)) .ok() + .map(|(reply, _read_event)| reply) } _ => None, }; @@ -238,6 +240,37 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> { println!("{is_active}{idx}{name}"); } } + Msg::EventStream => { + let Response::Handled = response else { + bail!("unexpected response: expected Handled, got {response:?}"); + }; + + println!("Started reading events."); + + loop { + let event = read_event().context("error reading event from niri")?; + match event { + Event::WorkspaceCreated { workspace } => { + println!("Workspace created: {workspace:?}"); + } + Event::WorkspaceRemoved { id } => { + println!("Workspace removed: {id}"); + } + Event::WorkspaceSwitched { output, id } => { + println!("Workspace switched on output \"{output}\": {id}"); + } + Event::WorkspaceMoved { id, output, idx } => { + println!("Workspace moved: {id} to output \"{output}\", index {idx}"); + } + Event::WindowFocused { window } => { + println!("Window focused: {window:?}"); + } + Event::KeyboardLayoutChanged { name } => { + println!("Keyboard layout changed: \"{name}\""); + } + } + } + } } Ok(()) diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 9a682bd..6561934 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -1,14 +1,18 @@ +use std::cell::RefCell; use std::os::unix::net::{UnixListener, UnixStream}; use std::path::PathBuf; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::{env, io, process}; use anyhow::Context; +use async_channel::{Receiver, Sender, TrySendError}; +use calloop::futures::Scheduler; use calloop::io::Async; use directories::BaseDirs; use futures_util::io::{AsyncReadExt, BufReader}; -use futures_util::{AsyncBufReadExt, AsyncWriteExt}; -use niri_ipc::{OutputConfigChanged, Reply, Request, Response}; +use futures_util::{select_biased, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, FutureExt as _}; +use niri_ipc::{Event, OutputConfigChanged, Reply, Request, Response}; use smithay::desktop::Window; use smithay::reexports::calloop::generic::Generic; use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction}; @@ -20,14 +24,33 @@ use crate::backend::IpcOutputMap; use crate::niri::State; use crate::utils::version; +// If an event stream client fails to read events fast enough that we accumulate more than this +// number in our buffer, we drop that event stream client. +const EVENT_STREAM_BUFFER_SIZE: usize = 64; + pub struct IpcServer { pub socket_path: PathBuf, + event_streams: Rc>>, + focused_window: Arc>>, } struct ClientCtx { event_loop: LoopHandle<'static, State>, + scheduler: Scheduler<()>, ipc_outputs: Arc>, - ipc_focused_window: Arc>>, + focused_window: Arc>>, + event_streams: Rc>>, +} + +struct EventStreamClient { + events: Receiver, + disconnect: Receiver<()>, + write: Box, +} + +struct EventStreamSender { + events: Sender, + disconnect: Sender<()>, } impl IpcServer { @@ -59,7 +82,62 @@ impl IpcServer { }) .unwrap(); - Ok(Self { socket_path }) + Ok(Self { + socket_path, + event_streams: Rc::new(RefCell::new(Vec::new())), + focused_window: Arc::new(Mutex::new(None)), + }) + } + + fn send_event(&self, event: Event) { + let mut streams = self.event_streams.borrow_mut(); + let mut to_remove = Vec::new(); + for (idx, stream) in streams.iter_mut().enumerate() { + match stream.events.try_send(event.clone()) { + Ok(()) => (), + Err(TrySendError::Closed(_)) => to_remove.push(idx), + Err(TrySendError::Full(_)) => { + warn!( + "disconnecting IPC event stream client \ + because it is reading events too slowly" + ); + to_remove.push(idx); + } + } + } + + for idx in to_remove.into_iter().rev() { + let stream = streams.swap_remove(idx); + let _ = stream.disconnect.send_blocking(()); + } + } + + pub fn focused_window_changed(&self, focused_window: Option) { + let mut guard = self.focused_window.lock().unwrap(); + if *guard == focused_window { + return; + } + + guard.clone_from(&focused_window); + drop(guard); + + let window = focused_window.map(|window| { + let wl_surface = window.toplevel().expect("no X11 support").wl_surface(); + with_states(wl_surface, |states| { + let role = states + .data_map + .get::() + .unwrap() + .lock() + .unwrap(); + + niri_ipc::Window { + title: role.title.clone(), + app_id: role.app_id.clone(), + } + }) + }); + self.send_event(Event::WindowFocused { window }) } } @@ -89,10 +167,14 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) { } }; + let ipc_server = state.niri.ipc_server.as_ref().unwrap(); + let ctx = ClientCtx { event_loop: state.niri.event_loop.clone(), + scheduler: state.niri.scheduler.clone(), ipc_outputs: state.backend.ipc_outputs(), - ipc_focused_window: state.niri.ipc_focused_window.clone(), + focused_window: ipc_server.focused_window.clone(), + event_streams: ipc_server.event_streams.clone(), }; let future = async move { @@ -105,7 +187,7 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) { } } -async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> { +async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> { let (read, mut write) = stream.split(); let mut buf = String::new(); @@ -119,6 +201,7 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow: .context("error parsing request") .map_err(|err| err.to_string()); let requested_error = matches!(request, Ok(Request::ReturnError)); + let requested_event_stream = matches!(request, Ok(Request::EventStream)); let reply = match request { Ok(request) => process(&ctx, request).await, @@ -135,6 +218,56 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow: buf.push(b'\n'); write.write_all(&buf).await.context("error writing reply")?; + if requested_event_stream { + let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE); + let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); + + // Spawn a task for the client. + let client = EventStreamClient { + events: events_rx, + disconnect: disconnect_rx, + write: Box::new(write) as _, + }; + let future = async move { + if let Err(err) = handle_event_stream_client(client).await { + warn!("error handling IPC event stream client: {err:?}"); + } + }; + if let Err(err) = ctx.scheduler.schedule(future) { + warn!("error scheduling IPC event stream future: {err:?}"); + } + + // Send the initial state. + let window = ctx.focused_window.lock().unwrap().clone(); + let window = window.map(|window| { + let wl_surface = window.toplevel().expect("no X11 support").wl_surface(); + with_states(wl_surface, |states| { + let role = states + .data_map + .get::() + .unwrap() + .lock() + .unwrap(); + + niri_ipc::Window { + title: role.title.clone(), + app_id: role.app_id.clone(), + } + }) + }); + events_tx.try_send(Event::WindowFocused { window }).unwrap(); + + // Add it to the list. + { + let mut streams = ctx.event_streams.borrow_mut(); + let sender = EventStreamSender { + events: events_tx, + disconnect: disconnect_tx, + }; + streams.push(sender); + } + } + Ok(()) } @@ -147,7 +280,7 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply { Response::Outputs(ipc_outputs) } Request::FocusedWindow => { - let window = ctx.ipc_focused_window.lock().unwrap().clone(); + let window = ctx.focused_window.lock().unwrap().clone(); let window = window.map(|window| { let wl_surface = window.toplevel().expect("no X11 support").wl_surface(); with_states(wl_surface, |states| { @@ -234,7 +367,35 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply { let output = result.map_err(|_| String::from("error getting active output info"))?; Response::FocusedOutput(output) } + Request::EventStream => Response::Handled, }; Ok(response) } + +async fn handle_event_stream_client(client: EventStreamClient) -> anyhow::Result<()> { + let EventStreamClient { + events, + disconnect, + mut write, + } = client; + + while let Ok(event) = events.recv().await { + let mut buf = serde_json::to_vec(&event).context("error formatting event")?; + buf.push(b'\n'); + + let res = select_biased! { + _ = disconnect.recv().fuse() => return Ok(()), + res = write.write_all(&buf).fuse() => res, + }; + + match res { + Ok(()) => (), + // Normal client disconnection. + Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()), + res @ Err(_) => res.context("error writing event")?, + } + } + + Ok(()) +} diff --git a/src/layout/mod.rs b/src/layout/mod.rs index b74ef72..2a1d98c 100644 --- a/src/layout/mod.rs +++ b/src/layout/mod.rs @@ -2369,6 +2369,7 @@ impl Layout { for monitor in monitors { for (idx, workspace) in monitor.workspaces.iter().enumerate() { workspaces.push(niri_ipc::Workspace { + id: u64::from(workspace.id().0), idx: u8::try_from(idx + 1).unwrap_or(u8::MAX), name: workspace.name.clone(), output: Some(monitor.output.name()), @@ -2383,6 +2384,7 @@ impl Layout { .iter() .enumerate() .map(|(idx, ws)| niri_ipc::Workspace { + id: u64::from(ws.id().0), idx: u8::try_from(idx + 1).unwrap_or(u8::MAX), name: ws.name.clone(), output: None, diff --git a/src/layout/workspace.rs b/src/layout/workspace.rs index 5044ece..c3889f9 100644 --- a/src/layout/workspace.rs +++ b/src/layout/workspace.rs @@ -122,7 +122,7 @@ pub struct OutputId(String); static WORKSPACE_ID_COUNTER: IdCounter = IdCounter::new(); #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct WorkspaceId(u32); +pub struct WorkspaceId(pub u32); impl WorkspaceId { fn next() -> WorkspaceId { diff --git a/src/niri.rs b/src/niri.rs index 0a3c3e5..3f00bcc 100644 --- a/src/niri.rs +++ b/src/niri.rs @@ -270,7 +270,6 @@ pub struct Niri { pub ipc_server: Option, pub ipc_outputs_changed: bool, - pub ipc_focused_window: Arc>>, // Casts are dropped before PipeWire to prevent a double-free (yay). pub casts: Vec, @@ -791,7 +790,9 @@ impl State { } } - *self.niri.ipc_focused_window.lock().unwrap() = newly_focused_window; + if let Some(server) = &self.niri.ipc_server { + server.focused_window_changed(newly_focused_window); + } if let Some(grab) = self.niri.popup_grab.as_mut() { if Some(&grab.root) != focus.surface() { @@ -1581,7 +1582,6 @@ impl Niri { ipc_server, ipc_outputs_changed: false, - ipc_focused_window: Arc::new(Mutex::new(None)), pipewire, casts: vec![],