mirror of
https://github.com/YaLTeR/niri.git
synced 2024-10-05 16:27:51 +03:00
[WIP] Draft event stream IPC
This commit is contained in:
parent
212b262faa
commit
f0e5bbd103
@ -35,6 +35,11 @@ pub enum Request {
|
||||
Workspaces,
|
||||
/// Request information about the focused output.
|
||||
FocusedOutput,
|
||||
/// Start continuously receiving events from the compositor.
|
||||
///
|
||||
/// The compositor should reply with `Reply::Ok(Response::Handled)`, then continuously send
|
||||
/// [`Event`]s, one per line.
|
||||
EventStream,
|
||||
/// Respond with an error (for testing error handling).
|
||||
ReturnError,
|
||||
}
|
||||
@ -507,6 +512,10 @@ pub enum OutputConfigChanged {
|
||||
/// A workspace.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Workspace {
|
||||
/// Unique id of this workspace.
|
||||
///
|
||||
/// This id remains constant regardless of the workspace moving around and across monitors.
|
||||
pub id: u64,
|
||||
/// Index of the workspace on its monitor.
|
||||
///
|
||||
/// This is the same index you can use for requests like `niri msg action focus-workspace`.
|
||||
@ -521,6 +530,51 @@ pub struct Workspace {
|
||||
pub is_active: bool,
|
||||
}
|
||||
|
||||
/// A compositor event.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum Event {
|
||||
/// A new workspace was created.
|
||||
WorkspaceCreated {
|
||||
/// The new workspace.
|
||||
workspace: Workspace,
|
||||
},
|
||||
/// A workspace was removed.
|
||||
WorkspaceRemoved {
|
||||
/// Id of the removed workspace.
|
||||
id: u64,
|
||||
},
|
||||
/// A workspace was switched on an output.
|
||||
///
|
||||
/// This doesn't mean the workspace became focused, just that it's now the active workspace on
|
||||
/// its output.
|
||||
WorkspaceSwitched {
|
||||
/// Output where the workspace was switched.
|
||||
output: String,
|
||||
/// Id of the newly active workspace.
|
||||
id: u64,
|
||||
},
|
||||
/// A workspace moved on an output or to a different output.
|
||||
WorkspaceMoved {
|
||||
/// Id of the moved workspace.
|
||||
id: u64,
|
||||
/// New output of the workspace.
|
||||
output: String,
|
||||
/// New position of the workspace on the output.
|
||||
idx: u8,
|
||||
},
|
||||
/// Window focus changed.
|
||||
WindowFocused {
|
||||
// FIXME: replace with id, and WindowCreated/Removed.
|
||||
/// The newly focused window, or `None` if no window is now focused.
|
||||
window: Option<Window>,
|
||||
},
|
||||
/// The keyboard layout changed.
|
||||
KeyboardLayoutChanged {
|
||||
/// Name of the newly active layout.
|
||||
name: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl FromStr for WorkspaceReferenceArg {
|
||||
type Err = &'static str;
|
||||
|
||||
|
@ -6,7 +6,7 @@ use std::net::Shutdown;
|
||||
use std::os::unix::net::UnixStream;
|
||||
use std::path::Path;
|
||||
|
||||
use crate::{Reply, Request};
|
||||
use crate::{Event, Reply, Request};
|
||||
|
||||
/// Name of the environment variable containing the niri IPC socket path.
|
||||
pub const SOCKET_PATH_ENV: &str = "NIRI_SOCKET";
|
||||
@ -47,7 +47,11 @@ impl Socket {
|
||||
/// * `Ok(Ok(response))`: successful [`Response`](crate::Response) from niri
|
||||
/// * `Ok(Err(message))`: error message from niri
|
||||
/// * `Err(error)`: error communicating with niri
|
||||
pub fn send(self, request: Request) -> io::Result<Reply> {
|
||||
///
|
||||
/// This method also returns a blocking function that you can call to keep reading [`Event`]s
|
||||
/// after requesting an [`EventStream`][Request::EventStream]. This function is not useful
|
||||
/// otherwise.
|
||||
pub fn send(self, request: Request) -> io::Result<(Reply, impl FnMut() -> io::Result<Event>)> {
|
||||
let Self { mut stream } = self;
|
||||
|
||||
let mut buf = serde_json::to_string(&request).unwrap();
|
||||
@ -60,6 +64,14 @@ impl Socket {
|
||||
reader.read_line(&mut buf)?;
|
||||
|
||||
let reply = serde_json::from_str(&buf)?;
|
||||
Ok(reply)
|
||||
|
||||
let events = move || {
|
||||
buf.clear();
|
||||
reader.read_line(&mut buf)?;
|
||||
let event = serde_json::from_str(&buf)?;
|
||||
Ok(event)
|
||||
};
|
||||
|
||||
Ok((reply, events))
|
||||
}
|
||||
}
|
||||
|
@ -80,6 +80,8 @@ pub enum Msg {
|
||||
#[command(subcommand)]
|
||||
action: OutputAction,
|
||||
},
|
||||
/// Start continuously receiving events from the compositor.
|
||||
EventStream,
|
||||
/// Print the version of the running niri instance.
|
||||
Version,
|
||||
/// Request an error from the running niri instance.
|
||||
|
@ -1,6 +1,6 @@
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use niri_ipc::{
|
||||
LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response, Socket, Transform,
|
||||
Event, LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response, Socket, Transform,
|
||||
};
|
||||
use serde_json::json;
|
||||
|
||||
@ -19,12 +19,13 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
|
||||
action: action.clone(),
|
||||
},
|
||||
Msg::Workspaces => Request::Workspaces,
|
||||
Msg::EventStream => Request::EventStream,
|
||||
Msg::RequestError => Request::ReturnError,
|
||||
};
|
||||
|
||||
let socket = Socket::connect().context("error connecting to the niri socket")?;
|
||||
|
||||
let reply = socket
|
||||
let (reply, mut read_event) = socket
|
||||
.send(request)
|
||||
.context("error communicating with niri")?;
|
||||
|
||||
@ -35,6 +36,7 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
|
||||
Socket::connect()
|
||||
.and_then(|socket| socket.send(Request::Version))
|
||||
.ok()
|
||||
.map(|(reply, _read_event)| reply)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
@ -238,6 +240,37 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
|
||||
println!("{is_active}{idx}{name}");
|
||||
}
|
||||
}
|
||||
Msg::EventStream => {
|
||||
let Response::Handled = response else {
|
||||
bail!("unexpected response: expected Handled, got {response:?}");
|
||||
};
|
||||
|
||||
println!("Started reading events.");
|
||||
|
||||
loop {
|
||||
let event = read_event().context("error reading event from niri")?;
|
||||
match event {
|
||||
Event::WorkspaceCreated { workspace } => {
|
||||
println!("Workspace created: {workspace:?}");
|
||||
}
|
||||
Event::WorkspaceRemoved { id } => {
|
||||
println!("Workspace removed: {id}");
|
||||
}
|
||||
Event::WorkspaceSwitched { output, id } => {
|
||||
println!("Workspace switched on output \"{output}\": {id}");
|
||||
}
|
||||
Event::WorkspaceMoved { id, output, idx } => {
|
||||
println!("Workspace moved: {id} to output \"{output}\", index {idx}");
|
||||
}
|
||||
Event::WindowFocused { window } => {
|
||||
println!("Window focused: {window:?}");
|
||||
}
|
||||
Event::KeyboardLayoutChanged { name } => {
|
||||
println!("Keyboard layout changed: \"{name}\"");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -1,14 +1,18 @@
|
||||
use std::cell::RefCell;
|
||||
use std::os::unix::net::{UnixListener, UnixStream};
|
||||
use std::path::PathBuf;
|
||||
use std::rc::Rc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{env, io, process};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_channel::{Receiver, Sender, TrySendError};
|
||||
use calloop::futures::Scheduler;
|
||||
use calloop::io::Async;
|
||||
use directories::BaseDirs;
|
||||
use futures_util::io::{AsyncReadExt, BufReader};
|
||||
use futures_util::{AsyncBufReadExt, AsyncWriteExt};
|
||||
use niri_ipc::{OutputConfigChanged, Reply, Request, Response};
|
||||
use futures_util::{select_biased, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, FutureExt as _};
|
||||
use niri_ipc::{Event, OutputConfigChanged, Reply, Request, Response};
|
||||
use smithay::desktop::Window;
|
||||
use smithay::reexports::calloop::generic::Generic;
|
||||
use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction};
|
||||
@ -20,14 +24,33 @@ use crate::backend::IpcOutputMap;
|
||||
use crate::niri::State;
|
||||
use crate::utils::version;
|
||||
|
||||
// If an event stream client fails to read events fast enough that we accumulate more than this
|
||||
// number in our buffer, we drop that event stream client.
|
||||
const EVENT_STREAM_BUFFER_SIZE: usize = 64;
|
||||
|
||||
pub struct IpcServer {
|
||||
pub socket_path: PathBuf,
|
||||
event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
|
||||
focused_window: Arc<Mutex<Option<Window>>>,
|
||||
}
|
||||
|
||||
struct ClientCtx {
|
||||
event_loop: LoopHandle<'static, State>,
|
||||
scheduler: Scheduler<()>,
|
||||
ipc_outputs: Arc<Mutex<IpcOutputMap>>,
|
||||
ipc_focused_window: Arc<Mutex<Option<Window>>>,
|
||||
focused_window: Arc<Mutex<Option<Window>>>,
|
||||
event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
|
||||
}
|
||||
|
||||
struct EventStreamClient {
|
||||
events: Receiver<Event>,
|
||||
disconnect: Receiver<()>,
|
||||
write: Box<dyn AsyncWrite + Unpin>,
|
||||
}
|
||||
|
||||
struct EventStreamSender {
|
||||
events: Sender<Event>,
|
||||
disconnect: Sender<()>,
|
||||
}
|
||||
|
||||
impl IpcServer {
|
||||
@ -59,7 +82,62 @@ impl IpcServer {
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Ok(Self { socket_path })
|
||||
Ok(Self {
|
||||
socket_path,
|
||||
event_streams: Rc::new(RefCell::new(Vec::new())),
|
||||
focused_window: Arc::new(Mutex::new(None)),
|
||||
})
|
||||
}
|
||||
|
||||
fn send_event(&self, event: Event) {
|
||||
let mut streams = self.event_streams.borrow_mut();
|
||||
let mut to_remove = Vec::new();
|
||||
for (idx, stream) in streams.iter_mut().enumerate() {
|
||||
match stream.events.try_send(event.clone()) {
|
||||
Ok(()) => (),
|
||||
Err(TrySendError::Closed(_)) => to_remove.push(idx),
|
||||
Err(TrySendError::Full(_)) => {
|
||||
warn!(
|
||||
"disconnecting IPC event stream client \
|
||||
because it is reading events too slowly"
|
||||
);
|
||||
to_remove.push(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for idx in to_remove.into_iter().rev() {
|
||||
let stream = streams.swap_remove(idx);
|
||||
let _ = stream.disconnect.send_blocking(());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn focused_window_changed(&self, focused_window: Option<Window>) {
|
||||
let mut guard = self.focused_window.lock().unwrap();
|
||||
if *guard == focused_window {
|
||||
return;
|
||||
}
|
||||
|
||||
guard.clone_from(&focused_window);
|
||||
drop(guard);
|
||||
|
||||
let window = focused_window.map(|window| {
|
||||
let wl_surface = window.toplevel().expect("no X11 support").wl_surface();
|
||||
with_states(wl_surface, |states| {
|
||||
let role = states
|
||||
.data_map
|
||||
.get::<XdgToplevelSurfaceData>()
|
||||
.unwrap()
|
||||
.lock()
|
||||
.unwrap();
|
||||
|
||||
niri_ipc::Window {
|
||||
title: role.title.clone(),
|
||||
app_id: role.app_id.clone(),
|
||||
}
|
||||
})
|
||||
});
|
||||
self.send_event(Event::WindowFocused { window })
|
||||
}
|
||||
}
|
||||
|
||||
@ -89,10 +167,14 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
|
||||
}
|
||||
};
|
||||
|
||||
let ipc_server = state.niri.ipc_server.as_ref().unwrap();
|
||||
|
||||
let ctx = ClientCtx {
|
||||
event_loop: state.niri.event_loop.clone(),
|
||||
scheduler: state.niri.scheduler.clone(),
|
||||
ipc_outputs: state.backend.ipc_outputs(),
|
||||
ipc_focused_window: state.niri.ipc_focused_window.clone(),
|
||||
focused_window: ipc_server.focused_window.clone(),
|
||||
event_streams: ipc_server.event_streams.clone(),
|
||||
};
|
||||
|
||||
let future = async move {
|
||||
@ -105,7 +187,7 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> {
|
||||
async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> {
|
||||
let (read, mut write) = stream.split();
|
||||
let mut buf = String::new();
|
||||
|
||||
@ -119,6 +201,7 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow:
|
||||
.context("error parsing request")
|
||||
.map_err(|err| err.to_string());
|
||||
let requested_error = matches!(request, Ok(Request::ReturnError));
|
||||
let requested_event_stream = matches!(request, Ok(Request::EventStream));
|
||||
|
||||
let reply = match request {
|
||||
Ok(request) => process(&ctx, request).await,
|
||||
@ -135,6 +218,56 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow:
|
||||
buf.push(b'\n');
|
||||
write.write_all(&buf).await.context("error writing reply")?;
|
||||
|
||||
if requested_event_stream {
|
||||
let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE);
|
||||
let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
|
||||
|
||||
// Spawn a task for the client.
|
||||
let client = EventStreamClient {
|
||||
events: events_rx,
|
||||
disconnect: disconnect_rx,
|
||||
write: Box::new(write) as _,
|
||||
};
|
||||
let future = async move {
|
||||
if let Err(err) = handle_event_stream_client(client).await {
|
||||
warn!("error handling IPC event stream client: {err:?}");
|
||||
}
|
||||
};
|
||||
if let Err(err) = ctx.scheduler.schedule(future) {
|
||||
warn!("error scheduling IPC event stream future: {err:?}");
|
||||
}
|
||||
|
||||
// Send the initial state.
|
||||
let window = ctx.focused_window.lock().unwrap().clone();
|
||||
let window = window.map(|window| {
|
||||
let wl_surface = window.toplevel().expect("no X11 support").wl_surface();
|
||||
with_states(wl_surface, |states| {
|
||||
let role = states
|
||||
.data_map
|
||||
.get::<XdgToplevelSurfaceData>()
|
||||
.unwrap()
|
||||
.lock()
|
||||
.unwrap();
|
||||
|
||||
niri_ipc::Window {
|
||||
title: role.title.clone(),
|
||||
app_id: role.app_id.clone(),
|
||||
}
|
||||
})
|
||||
});
|
||||
events_tx.try_send(Event::WindowFocused { window }).unwrap();
|
||||
|
||||
// Add it to the list.
|
||||
{
|
||||
let mut streams = ctx.event_streams.borrow_mut();
|
||||
let sender = EventStreamSender {
|
||||
events: events_tx,
|
||||
disconnect: disconnect_tx,
|
||||
};
|
||||
streams.push(sender);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -147,7 +280,7 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
|
||||
Response::Outputs(ipc_outputs)
|
||||
}
|
||||
Request::FocusedWindow => {
|
||||
let window = ctx.ipc_focused_window.lock().unwrap().clone();
|
||||
let window = ctx.focused_window.lock().unwrap().clone();
|
||||
let window = window.map(|window| {
|
||||
let wl_surface = window.toplevel().expect("no X11 support").wl_surface();
|
||||
with_states(wl_surface, |states| {
|
||||
@ -234,7 +367,35 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
|
||||
let output = result.map_err(|_| String::from("error getting active output info"))?;
|
||||
Response::FocusedOutput(output)
|
||||
}
|
||||
Request::EventStream => Response::Handled,
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn handle_event_stream_client(client: EventStreamClient) -> anyhow::Result<()> {
|
||||
let EventStreamClient {
|
||||
events,
|
||||
disconnect,
|
||||
mut write,
|
||||
} = client;
|
||||
|
||||
while let Ok(event) = events.recv().await {
|
||||
let mut buf = serde_json::to_vec(&event).context("error formatting event")?;
|
||||
buf.push(b'\n');
|
||||
|
||||
let res = select_biased! {
|
||||
_ = disconnect.recv().fuse() => return Ok(()),
|
||||
res = write.write_all(&buf).fuse() => res,
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(()) => (),
|
||||
// Normal client disconnection.
|
||||
Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
|
||||
res @ Err(_) => res.context("error writing event")?,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -2369,6 +2369,7 @@ impl<W: LayoutElement> Layout<W> {
|
||||
for monitor in monitors {
|
||||
for (idx, workspace) in monitor.workspaces.iter().enumerate() {
|
||||
workspaces.push(niri_ipc::Workspace {
|
||||
id: u64::from(workspace.id().0),
|
||||
idx: u8::try_from(idx + 1).unwrap_or(u8::MAX),
|
||||
name: workspace.name.clone(),
|
||||
output: Some(monitor.output.name()),
|
||||
@ -2383,6 +2384,7 @@ impl<W: LayoutElement> Layout<W> {
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, ws)| niri_ipc::Workspace {
|
||||
id: u64::from(ws.id().0),
|
||||
idx: u8::try_from(idx + 1).unwrap_or(u8::MAX),
|
||||
name: ws.name.clone(),
|
||||
output: None,
|
||||
|
@ -122,7 +122,7 @@ pub struct OutputId(String);
|
||||
static WORKSPACE_ID_COUNTER: IdCounter = IdCounter::new();
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub struct WorkspaceId(u32);
|
||||
pub struct WorkspaceId(pub u32);
|
||||
|
||||
impl WorkspaceId {
|
||||
fn next() -> WorkspaceId {
|
||||
|
@ -270,7 +270,6 @@ pub struct Niri {
|
||||
|
||||
pub ipc_server: Option<IpcServer>,
|
||||
pub ipc_outputs_changed: bool,
|
||||
pub ipc_focused_window: Arc<Mutex<Option<Window>>>,
|
||||
|
||||
// Casts are dropped before PipeWire to prevent a double-free (yay).
|
||||
pub casts: Vec<Cast>,
|
||||
@ -791,7 +790,9 @@ impl State {
|
||||
}
|
||||
}
|
||||
|
||||
*self.niri.ipc_focused_window.lock().unwrap() = newly_focused_window;
|
||||
if let Some(server) = &self.niri.ipc_server {
|
||||
server.focused_window_changed(newly_focused_window);
|
||||
}
|
||||
|
||||
if let Some(grab) = self.niri.popup_grab.as_mut() {
|
||||
if Some(&grab.root) != focus.surface() {
|
||||
@ -1581,7 +1582,6 @@ impl Niri {
|
||||
|
||||
ipc_server,
|
||||
ipc_outputs_changed: false,
|
||||
ipc_focused_window: Arc::new(Mutex::new(None)),
|
||||
|
||||
pipewire,
|
||||
casts: vec![],
|
||||
|
Loading…
Reference in New Issue
Block a user