This commit is contained in:
Ivan Molodetskikh 2024-07-12 17:43:39 +09:00 committed by GitHub
commit ac0c587391
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 290 additions and 23 deletions

View File

@ -36,6 +36,11 @@ pub enum Request {
Workspaces,
/// Request information about the focused output.
FocusedOutput,
/// Start continuously receiving events from the compositor.
///
/// The compositor should reply with `Reply::Ok(Response::Handled)`, then continuously send
/// [`Event`]s, one per line.
EventStream,
/// Respond with an error (for testing error handling).
ReturnError,
}
@ -538,6 +543,10 @@ pub enum OutputConfigChanged {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))]
pub struct Workspace {
/// Unique id of this workspace.
///
/// This id remains constant regardless of the workspace moving around and across monitors.
pub id: u64,
/// Index of the workspace on its monitor.
///
/// This is the same index you can use for requests like `niri msg action focus-workspace`.
@ -552,6 +561,51 @@ pub struct Workspace {
pub is_active: bool,
}
/// A compositor event.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Event {
/// A new workspace was created.
WorkspaceCreated {
/// The new workspace.
workspace: Workspace,
},
/// A workspace was removed.
WorkspaceRemoved {
/// Id of the removed workspace.
id: u64,
},
/// A workspace was switched on an output.
///
/// This doesn't mean the workspace became focused, just that it's now the active workspace on
/// its output.
WorkspaceSwitched {
/// Output where the workspace was switched.
output: String,
/// Id of the newly active workspace.
id: u64,
},
/// A workspace moved on an output or to a different output.
WorkspaceMoved {
/// Id of the moved workspace.
id: u64,
/// New output of the workspace.
output: String,
/// New position of the workspace on the output.
idx: u8,
},
/// Window focus changed.
WindowFocused {
// FIXME: replace with id, and WindowCreated/Removed.
/// The newly focused window, or `None` if no window is now focused.
window: Option<Window>,
},
/// The keyboard layout changed.
KeyboardLayoutChanged {
/// Name of the newly active layout.
name: String,
},
}
impl FromStr for WorkspaceReferenceArg {
type Err = &'static str;

View File

@ -1,12 +1,12 @@
//! Helper for blocking communication over the niri socket.
use std::env;
use std::io::{self, Read, Write};
use std::io::{self, BufRead, BufReader, Write};
use std::net::Shutdown;
use std::os::unix::net::UnixStream;
use std::path::Path;
use crate::{Reply, Request};
use crate::{Event, Reply, Request};
/// Name of the environment variable containing the niri IPC socket path.
pub const SOCKET_PATH_ENV: &str = "NIRI_SOCKET";
@ -47,17 +47,31 @@ impl Socket {
/// * `Ok(Ok(response))`: successful [`Response`](crate::Response) from niri
/// * `Ok(Err(message))`: error message from niri
/// * `Err(error)`: error communicating with niri
pub fn send(self, request: Request) -> io::Result<Reply> {
///
/// This method also returns a blocking function that you can call to keep reading [`Event`]s
/// after requesting an [`EventStream`][Request::EventStream]. This function is not useful
/// otherwise.
pub fn send(self, request: Request) -> io::Result<(Reply, impl FnMut() -> io::Result<Event>)> {
let Self { mut stream } = self;
let mut buf = serde_json::to_vec(&request).unwrap();
stream.write_all(&buf)?;
let mut buf = serde_json::to_string(&request).unwrap();
stream.write_all(buf.as_bytes())?;
stream.shutdown(Shutdown::Write)?;
buf.clear();
stream.read_to_end(&mut buf)?;
let mut reader = BufReader::new(stream);
let reply = serde_json::from_slice(&buf)?;
Ok(reply)
buf.clear();
reader.read_line(&mut buf)?;
let reply = serde_json::from_str(&buf)?;
let events = move || {
buf.clear();
reader.read_line(&mut buf)?;
let event = serde_json::from_str(&buf)?;
Ok(event)
};
Ok((reply, events))
}
}

View File

@ -86,6 +86,8 @@ pub enum Msg {
#[command(subcommand)]
action: OutputAction,
},
/// Start continuously receiving events from the compositor.
EventStream,
/// Print the version of the running niri instance.
Version,
/// Request an error from the running niri instance.

View File

@ -1,6 +1,6 @@
use anyhow::{anyhow, bail, Context};
use niri_ipc::{
LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response, Socket, Transform,
Event, LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response, Socket, Transform,
};
use serde_json::json;
@ -19,12 +19,13 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
action: action.clone(),
},
Msg::Workspaces => Request::Workspaces,
Msg::EventStream => Request::EventStream,
Msg::RequestError => Request::ReturnError,
};
let socket = Socket::connect().context("error connecting to the niri socket")?;
let reply = socket
let (reply, mut read_event) = socket
.send(request)
.context("error communicating with niri")?;
@ -35,6 +36,7 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
Socket::connect()
.and_then(|socket| socket.send(Request::Version))
.ok()
.map(|(reply, _read_event)| reply)
}
_ => None,
};
@ -238,6 +240,37 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
println!("{is_active}{idx}{name}");
}
}
Msg::EventStream => {
let Response::Handled = response else {
bail!("unexpected response: expected Handled, got {response:?}");
};
println!("Started reading events.");
loop {
let event = read_event().context("error reading event from niri")?;
match event {
Event::WorkspaceCreated { workspace } => {
println!("Workspace created: {workspace:?}");
}
Event::WorkspaceRemoved { id } => {
println!("Workspace removed: {id}");
}
Event::WorkspaceSwitched { output, id } => {
println!("Workspace switched on output \"{output}\": {id}");
}
Event::WorkspaceMoved { id, output, idx } => {
println!("Workspace moved: {id} to output \"{output}\", index {idx}");
}
Event::WindowFocused { window } => {
println!("Window focused: {window:?}");
}
Event::KeyboardLayoutChanged { name } => {
println!("Keyboard layout changed: \"{name}\"");
}
}
}
}
}
Ok(())

View File

@ -1,14 +1,18 @@
use std::cell::RefCell;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::{env, io, process};
use anyhow::Context;
use async_channel::{Receiver, Sender, TrySendError};
use calloop::futures::Scheduler;
use calloop::io::Async;
use directories::BaseDirs;
use futures_util::io::{AsyncReadExt, BufReader};
use futures_util::{AsyncBufReadExt, AsyncWriteExt};
use niri_ipc::{OutputConfigChanged, Reply, Request, Response};
use futures_util::{select_biased, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, FutureExt as _};
use niri_ipc::{Event, OutputConfigChanged, Reply, Request, Response};
use smithay::desktop::Window;
use smithay::reexports::calloop::generic::Generic;
use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction};
@ -20,14 +24,33 @@ use crate::backend::IpcOutputMap;
use crate::niri::State;
use crate::utils::version;
// If an event stream client fails to read events fast enough that we accumulate more than this
// number in our buffer, we drop that event stream client.
const EVENT_STREAM_BUFFER_SIZE: usize = 64;
pub struct IpcServer {
pub socket_path: PathBuf,
event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
focused_window: Arc<Mutex<Option<Window>>>,
}
struct ClientCtx {
event_loop: LoopHandle<'static, State>,
scheduler: Scheduler<()>,
ipc_outputs: Arc<Mutex<IpcOutputMap>>,
ipc_focused_window: Arc<Mutex<Option<Window>>>,
focused_window: Arc<Mutex<Option<Window>>>,
event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
}
struct EventStreamClient {
events: Receiver<Event>,
disconnect: Receiver<()>,
write: Box<dyn AsyncWrite + Unpin>,
}
struct EventStreamSender {
events: Sender<Event>,
disconnect: Sender<()>,
}
impl IpcServer {
@ -59,7 +82,62 @@ impl IpcServer {
})
.unwrap();
Ok(Self { socket_path })
Ok(Self {
socket_path,
event_streams: Rc::new(RefCell::new(Vec::new())),
focused_window: Arc::new(Mutex::new(None)),
})
}
fn send_event(&self, event: Event) {
let mut streams = self.event_streams.borrow_mut();
let mut to_remove = Vec::new();
for (idx, stream) in streams.iter_mut().enumerate() {
match stream.events.try_send(event.clone()) {
Ok(()) => (),
Err(TrySendError::Closed(_)) => to_remove.push(idx),
Err(TrySendError::Full(_)) => {
warn!(
"disconnecting IPC event stream client \
because it is reading events too slowly"
);
to_remove.push(idx);
}
}
}
for idx in to_remove.into_iter().rev() {
let stream = streams.swap_remove(idx);
let _ = stream.disconnect.send_blocking(());
}
}
pub fn focused_window_changed(&self, focused_window: Option<Window>) {
let mut guard = self.focused_window.lock().unwrap();
if *guard == focused_window {
return;
}
guard.clone_from(&focused_window);
drop(guard);
let window = focused_window.map(|window| {
let wl_surface = window.toplevel().expect("no X11 support").wl_surface();
with_states(wl_surface, |states| {
let role = states
.data_map
.get::<XdgToplevelSurfaceData>()
.unwrap()
.lock()
.unwrap();
niri_ipc::Window {
title: role.title.clone(),
app_id: role.app_id.clone(),
}
})
});
self.send_event(Event::WindowFocused { window })
}
}
@ -89,10 +167,14 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
}
};
let ipc_server = state.niri.ipc_server.as_ref().unwrap();
let ctx = ClientCtx {
event_loop: state.niri.event_loop.clone(),
scheduler: state.niri.scheduler.clone(),
ipc_outputs: state.backend.ipc_outputs(),
ipc_focused_window: state.niri.ipc_focused_window.clone(),
focused_window: ipc_server.focused_window.clone(),
event_streams: ipc_server.event_streams.clone(),
};
let future = async move {
@ -105,7 +187,7 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
}
}
async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> {
async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> {
let (read, mut write) = stream.split();
let mut buf = String::new();
@ -119,6 +201,7 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow:
.context("error parsing request")
.map_err(|err| err.to_string());
let requested_error = matches!(request, Ok(Request::ReturnError));
let requested_event_stream = matches!(request, Ok(Request::EventStream));
let reply = match request {
Ok(request) => process(&ctx, request).await,
@ -131,9 +214,60 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow:
}
}
let buf = serde_json::to_vec(&reply).context("error formatting reply")?;
let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?;
buf.push(b'\n');
write.write_all(&buf).await.context("error writing reply")?;
if requested_event_stream {
let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE);
let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
// Spawn a task for the client.
let client = EventStreamClient {
events: events_rx,
disconnect: disconnect_rx,
write: Box::new(write) as _,
};
let future = async move {
if let Err(err) = handle_event_stream_client(client).await {
warn!("error handling IPC event stream client: {err:?}");
}
};
if let Err(err) = ctx.scheduler.schedule(future) {
warn!("error scheduling IPC event stream future: {err:?}");
}
// Send the initial state.
let window = ctx.focused_window.lock().unwrap().clone();
let window = window.map(|window| {
let wl_surface = window.toplevel().expect("no X11 support").wl_surface();
with_states(wl_surface, |states| {
let role = states
.data_map
.get::<XdgToplevelSurfaceData>()
.unwrap()
.lock()
.unwrap();
niri_ipc::Window {
title: role.title.clone(),
app_id: role.app_id.clone(),
}
})
});
events_tx.try_send(Event::WindowFocused { window }).unwrap();
// Add it to the list.
{
let mut streams = ctx.event_streams.borrow_mut();
let sender = EventStreamSender {
events: events_tx,
disconnect: disconnect_tx,
};
streams.push(sender);
}
}
Ok(())
}
@ -147,7 +281,7 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
Response::Outputs(outputs.collect())
}
Request::FocusedWindow => {
let window = ctx.ipc_focused_window.lock().unwrap().clone();
let window = ctx.focused_window.lock().unwrap().clone();
let window = window.map(|window| {
let wl_surface = window.toplevel().expect("no X11 support").wl_surface();
with_states(wl_surface, |states| {
@ -235,7 +369,35 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
let output = result.map_err(|_| String::from("error getting active output info"))?;
Response::FocusedOutput(output)
}
Request::EventStream => Response::Handled,
};
Ok(response)
}
async fn handle_event_stream_client(client: EventStreamClient) -> anyhow::Result<()> {
let EventStreamClient {
events,
disconnect,
mut write,
} = client;
while let Ok(event) = events.recv().await {
let mut buf = serde_json::to_vec(&event).context("error formatting event")?;
buf.push(b'\n');
let res = select_biased! {
_ = disconnect.recv().fuse() => return Ok(()),
res = write.write_all(&buf).fuse() => res,
};
match res {
Ok(()) => (),
// Normal client disconnection.
Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
res @ Err(_) => res.context("error writing event")?,
}
}
Ok(())
}

View File

@ -2483,6 +2483,7 @@ impl<W: LayoutElement> Layout<W> {
for monitor in monitors {
for (idx, workspace) in monitor.workspaces.iter().enumerate() {
workspaces.push(niri_ipc::Workspace {
id: u64::from(workspace.id().0),
idx: u8::try_from(idx + 1).unwrap_or(u8::MAX),
name: workspace.name.clone(),
output: Some(monitor.output.name()),
@ -2497,6 +2498,7 @@ impl<W: LayoutElement> Layout<W> {
.iter()
.enumerate()
.map(|(idx, ws)| niri_ipc::Workspace {
id: u64::from(ws.id().0),
idx: u8::try_from(idx + 1).unwrap_or(u8::MAX),
name: ws.name.clone(),
output: None,

View File

@ -122,7 +122,7 @@ pub struct OutputId(String);
static WORKSPACE_ID_COUNTER: IdCounter = IdCounter::new();
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WorkspaceId(u32);
pub struct WorkspaceId(pub u32);
impl WorkspaceId {
fn next() -> WorkspaceId {

View File

@ -279,7 +279,6 @@ pub struct Niri {
pub ipc_server: Option<IpcServer>,
pub ipc_outputs_changed: bool,
pub ipc_focused_window: Arc<Mutex<Option<Window>>>,
// Casts are dropped before PipeWire to prevent a double-free (yay).
pub casts: Vec<Cast>,
@ -834,7 +833,9 @@ impl State {
}
}
*self.niri.ipc_focused_window.lock().unwrap() = newly_focused_window;
if let Some(server) = &self.niri.ipc_server {
server.focused_window_changed(newly_focused_window);
}
if let Some(grab) = self.niri.popup_grab.as_mut() {
if Some(&grab.root) != focus.surface() {
@ -1774,7 +1775,6 @@ impl Niri {
ipc_server,
ipc_outputs_changed: false,
ipc_focused_window: Arc::new(Mutex::new(None)),
pipewire,
casts: vec![],