aboutsummaryrefslogtreecommitdiff
path: root/src/ipc/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipc/server.rs')
-rw-r--r--src/ipc/server.rs416
1 files changed, 371 insertions, 45 deletions
diff --git a/src/ipc/server.rs b/src/ipc/server.rs
index 5d2c8524..af2c4fa2 100644
--- a/src/ipc/server.rs
+++ b/src/ipc/server.rs
@@ -1,15 +1,20 @@
+use std::cell::RefCell;
+use std::collections::HashSet;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
+use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::{env, io, process};
use anyhow::Context;
+use async_channel::{Receiver, Sender, TrySendError};
+use calloop::futures::Scheduler;
use calloop::io::Async;
use directories::BaseDirs;
use futures_util::io::{AsyncReadExt, BufReader};
-use futures_util::{AsyncBufReadExt, AsyncWriteExt};
-use niri_ipc::{KeyboardLayouts, OutputConfigChanged, Reply, Request, Response};
-use smithay::desktop::Window;
+use futures_util::{select_biased, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, FutureExt as _};
+use niri_ipc::state::{EventStreamState, EventStreamStatePart as _};
+use niri_ipc::{Event, KeyboardLayouts, OutputConfigChanged, Reply, Request, Response, Workspace};
use smithay::input::keyboard::XkbContextHandler;
use smithay::reexports::calloop::generic::Generic;
use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction};
@@ -18,17 +23,38 @@ use smithay::wayland::compositor::with_states;
use smithay::wayland::shell::xdg::XdgToplevelSurfaceData;
use crate::backend::IpcOutputMap;
+use crate::layout::workspace::WorkspaceId;
use crate::niri::State;
use crate::utils::version;
+use crate::window::Mapped;
+
+// If an event stream client fails to read events fast enough that we accumulate more than this
+// number in our buffer, we drop that event stream client.
+const EVENT_STREAM_BUFFER_SIZE: usize = 64;
pub struct IpcServer {
pub socket_path: PathBuf,
+ event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
+ event_stream_state: Rc<RefCell<EventStreamState>>,
}
struct ClientCtx {
event_loop: LoopHandle<'static, State>,
+ scheduler: Scheduler<()>,
ipc_outputs: Arc<Mutex<IpcOutputMap>>,
- ipc_focused_window: Arc<Mutex<Option<Window>>>,
+ event_streams: Rc<RefCell<Vec<EventStreamSender>>>,
+ event_stream_state: Rc<RefCell<EventStreamState>>,
+}
+
+struct EventStreamClient {
+ events: Receiver<Event>,
+ disconnect: Receiver<()>,
+ write: Box<dyn AsyncWrite + Unpin>,
+}
+
+struct EventStreamSender {
+ events: Sender<Event>,
+ disconnect: Sender<()>,
}
impl IpcServer {
@@ -60,7 +86,43 @@ impl IpcServer {
})
.unwrap();
- Ok(Self { socket_path })
+ Ok(Self {
+ socket_path,
+ event_streams: Rc::new(RefCell::new(Vec::new())),
+ event_stream_state: Rc::new(RefCell::new(EventStreamState::default())),
+ })
+ }
+
+ fn send_event(&self, event: Event) {
+ let mut streams = self.event_streams.borrow_mut();
+ let mut to_remove = Vec::new();
+ for (idx, stream) in streams.iter_mut().enumerate() {
+ match stream.events.try_send(event.clone()) {
+ Ok(()) => (),
+ Err(TrySendError::Closed(_)) => to_remove.push(idx),
+ Err(TrySendError::Full(_)) => {
+ warn!(
+ "disconnecting IPC event stream client \
+ because it is reading events too slowly"
+ );
+ to_remove.push(idx);
+ }
+ }
+ }
+
+ for idx in to_remove.into_iter().rev() {
+ let stream = streams.swap_remove(idx);
+ let _ = stream.disconnect.send_blocking(());
+ }
+ }
+
+ pub fn keyboard_layout_switched(&self, new_idx: u8) {
+ let mut state = self.event_stream_state.borrow_mut();
+ let state = &mut state.keyboard_layouts;
+
+ let event = Event::KeyboardLayoutSwitched { idx: new_idx };
+ state.apply(event.clone());
+ self.send_event(event);
}
}
@@ -90,10 +152,14 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
}
};
+ let ipc_server = state.niri.ipc_server.as_ref().unwrap();
+
let ctx = ClientCtx {
event_loop: state.niri.event_loop.clone(),
+ scheduler: state.niri.scheduler.clone(),
ipc_outputs: state.backend.ipc_outputs(),
- ipc_focused_window: state.niri.ipc_focused_window.clone(),
+ event_streams: ipc_server.event_streams.clone(),
+ event_stream_state: ipc_server.event_stream_state.clone(),
};
let future = async move {
@@ -106,7 +172,7 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
}
}
-async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> {
+async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> {
let (read, mut write) = stream.split();
let mut buf = String::new();
@@ -120,6 +186,7 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow:
.context("error parsing request")
.map_err(|err| err.to_string());
let requested_error = matches!(request, Ok(Request::ReturnError));
+ let requested_event_stream = matches!(request, Ok(Request::EventStream));
let reply = match request {
Ok(request) => process(&ctx, request).await,
@@ -136,6 +203,46 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow:
buf.push(b'\n');
write.write_all(&buf).await.context("error writing reply")?;
+ if requested_event_stream {
+ let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE);
+ let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
+
+ // Spawn a task for the client.
+ let client = EventStreamClient {
+ events: events_rx,
+ disconnect: disconnect_rx,
+ write: Box::new(write) as _,
+ };
+ let future = async move {
+ if let Err(err) = handle_event_stream_client(client).await {
+ warn!("error handling IPC event stream client: {err:?}");
+ }
+ };
+ if let Err(err) = ctx.scheduler.schedule(future) {
+ warn!("error scheduling IPC event stream future: {err:?}");
+ }
+
+ // Send the initial state.
+ {
+ let state = ctx.event_stream_state.borrow();
+ for event in state.replicate() {
+ events_tx
+ .try_send(event)
+ .expect("initial event burst had more events than buffer size");
+ }
+ }
+
+ // Add it to the list.
+ {
+ let mut streams = ctx.event_streams.borrow_mut();
+ let sender = EventStreamSender {
+ events: events_tx,
+ disconnect: disconnect_tx,
+ };
+ streams.push(sender);
+ }
+ }
+
Ok(())
}
@@ -149,23 +256,9 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
Response::Outputs(outputs.collect())
}
Request::FocusedWindow => {
- let window = ctx.ipc_focused_window.lock().unwrap().clone();
- let window = window.map(|window| {
- let wl_surface = window.toplevel().expect("no X11 support").wl_surface();
- with_states(wl_surface, |states| {
- let role = states
- .data_map
- .get::<XdgToplevelSurfaceData>()
- .unwrap()
- .lock()
- .unwrap();
-
- niri_ipc::Window {
- title: role.title.clone(),
- app_id: role.app_id.clone(),
- }
- })
- });
+ let state = ctx.event_stream_state.borrow();
+ let windows = &state.windows.windows;
+ let window = windows.values().find(|win| win.is_focused).cloned();
Response::FocusedWindow(window)
}
Request::Action(action) => {
@@ -202,13 +295,8 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
Response::OutputConfigChanged(response)
}
Request::Workspaces => {
- let (tx, rx) = async_channel::bounded(1);
- ctx.event_loop.insert_idle(move |state| {
- let workspaces = state.niri.layout.ipc_workspaces();
- let _ = tx.send_blocking(workspaces);
- });
- let result = rx.recv().await;
- let workspaces = result.map_err(|_| String::from("error getting workspace info"))?;
+ let state = ctx.event_stream_state.borrow();
+ let workspaces = state.workspaces.workspaces.values().cloned().collect();
Response::Workspaces(workspaces)
}
Request::FocusedOutput => {
@@ -238,23 +326,261 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply {
Response::FocusedOutput(output)
}
Request::KeyboardLayouts => {
- let (tx, rx) = async_channel::bounded(1);
- ctx.event_loop.insert_idle(move |state| {
- let keyboard = state.niri.seat.get_keyboard().unwrap();
- let layout = keyboard.with_xkb_state(state, |context| {
- let layouts = context.keymap().layouts();
- KeyboardLayouts {
- names: layouts.map(str::to_owned).collect(),
- current_idx: context.active_layout().0 as u8,
- }
- });
- let _ = tx.send_blocking(layout);
- });
- let result = rx.recv().await;
- let layout = result.map_err(|_| String::from("error getting layout info"))?;
+ let state = ctx.event_stream_state.borrow();
+ let layout = state.keyboard_layouts.keyboard_layouts.clone();
+ let layout = layout.expect("keyboard layouts should be set at startup");
Response::KeyboardLayouts(layout)
}
+ Request::EventStream => Response::Handled,
};
Ok(response)
}
+
+async fn handle_event_stream_client(client: EventStreamClient) -> anyhow::Result<()> {
+ let EventStreamClient {
+ events,
+ disconnect,
+ mut write,
+ } = client;
+
+ while let Ok(event) = events.recv().await {
+ let mut buf = serde_json::to_vec(&event).context("error formatting event")?;
+ buf.push(b'\n');
+
+ let res = select_biased! {
+ _ = disconnect.recv().fuse() => return Ok(()),
+ res = write.write_all(&buf).fuse() => res,
+ };
+
+ match res {
+ Ok(()) => (),
+ // Normal client disconnection.
+ Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
+ res @ Err(_) => res.context("error writing event")?,
+ }
+ }
+
+ Ok(())
+}
+
+fn make_ipc_window(mapped: &Mapped, workspace_id: Option<WorkspaceId>) -> niri_ipc::Window {
+ let wl_surface = mapped.toplevel().wl_surface();
+ with_states(wl_surface, |states| {
+ let role = states
+ .data_map
+ .get::<XdgToplevelSurfaceData>()
+ .unwrap()
+ .lock()
+ .unwrap();
+
+ niri_ipc::Window {
+ id: u64::from(mapped.id().get()),
+ title: role.title.clone(),
+ app_id: role.app_id.clone(),
+ workspace_id: workspace_id.map(|id| u64::from(id.0)),
+ is_focused: mapped.is_focused(),
+ }
+ })
+}
+
+impl State {
+ pub fn ipc_keyboard_layouts_changed(&mut self) {
+ let keyboard = self.niri.seat.get_keyboard().unwrap();
+ let keyboard_layouts = keyboard.with_xkb_state(self, |context| {
+ let layouts = context.keymap().layouts();
+ KeyboardLayouts {
+ names: layouts.map(str::to_owned).collect(),
+ current_idx: context.active_layout().0 as u8,
+ }
+ });
+
+ let Some(server) = &self.niri.ipc_server else {
+ return;
+ };
+
+ let mut state = server.event_stream_state.borrow_mut();
+ let state = &mut state.keyboard_layouts;
+
+ let event = Event::KeyboardLayoutsChanged { keyboard_layouts };
+ state.apply(event.clone());
+ server.send_event(event);
+ }
+
+ pub fn ipc_refresh_layout(&mut self) {
+ self.ipc_refresh_workspaces();
+ self.ipc_refresh_windows();
+ }
+
+ fn ipc_refresh_workspaces(&mut self) {
+ let Some(server) = &self.niri.ipc_server else {
+ return;
+ };
+
+ let _span = tracy_client::span!("State::ipc_refresh_workspaces");
+
+ let mut state = server.event_stream_state.borrow_mut();
+ let state = &mut state.workspaces;
+
+ let mut events = Vec::new();
+ let layout = &self.niri.layout;
+ let focused_ws_id = layout.active_workspace().map(|ws| u64::from(ws.id().0));
+
+ // Check for workspace changes.
+ let mut seen = HashSet::new();
+ let mut need_workspaces_changed = false;
+ for (mon, ws_idx, ws) in layout.workspaces() {
+ let id = u64::from(ws.id().0);
+ seen.insert(id);
+
+ let Some(ipc_ws) = state.workspaces.get(&id) else {
+ // A new workspace was added.
+ need_workspaces_changed = true;
+ break;
+ };
+
+ // Check for any changes that we can't signal as individual events.
+ let output_name = mon.map(|mon| mon.output_name());
+ if ipc_ws.idx != u8::try_from(ws_idx + 1).unwrap_or(u8::MAX)
+ || ipc_ws.name != ws.name
+ || ipc_ws.output.as_ref() != output_name
+ {
+ need_workspaces_changed = true;
+ break;
+ }
+
+ let active_window_id = ws.active_window().map(|win| u64::from(win.id().get()));
+ if ipc_ws.active_window_id != active_window_id {
+ events.push(Event::WorkspaceActiveWindowChanged {
+ workspace_id: id,
+ active_window_id,
+ });
+ }
+
+ // Check if this workspace became focused.
+ let is_focused = Some(id) == focused_ws_id;
+ if is_focused && !ipc_ws.is_focused {
+ events.push(Event::WorkspaceActivated { id, focused: true });
+ continue;
+ }
+
+ // Check if this workspace became active.
+ let is_active = mon.map_or(false, |mon| mon.active_workspace_idx == ws_idx);
+ if is_active && !ipc_ws.is_active {
+ events.push(Event::WorkspaceActivated { id, focused: false });
+ }
+ }
+
+ // Check if any workspaces were removed.
+ if !need_workspaces_changed && state.workspaces.keys().any(|id| !seen.contains(id)) {
+ need_workspaces_changed = true;
+ }
+
+ if need_workspaces_changed {
+ events.clear();
+
+ let workspaces = layout
+ .workspaces()
+ .map(|(mon, ws_idx, ws)| {
+ let id = u64::from(ws.id().0);
+ Workspace {
+ id,
+ idx: u8::try_from(ws_idx + 1).unwrap_or(u8::MAX),
+ name: ws.name.clone(),
+ output: mon.map(|mon| mon.output_name().clone()),
+ is_active: mon.map_or(false, |mon| mon.active_workspace_idx == ws_idx),
+ is_focused: Some(id) == focused_ws_id,
+ active_window_id: ws.active_window().map(|win| u64::from(win.id().get())),
+ }
+ })
+ .collect();
+
+ events.push(Event::WorkspacesChanged { workspaces });
+ }
+
+ for event in events {
+ state.apply(event.clone());
+ server.send_event(event);
+ }
+ }
+
+ fn ipc_refresh_windows(&mut self) {
+ let Some(server) = &self.niri.ipc_server else {
+ return;
+ };
+
+ let _span = tracy_client::span!("State::ipc_refresh_windows");
+
+ let mut state = server.event_stream_state.borrow_mut();
+ let state = &mut state.windows;
+
+ let mut events = Vec::new();
+ let layout = &self.niri.layout;
+
+ // Check for window changes.
+ let mut seen = HashSet::new();
+ let mut focused_id = None;
+ layout.with_windows(|mapped, _, ws_id| {
+ let id = u64::from(mapped.id().get());
+ seen.insert(id);
+
+ if mapped.is_focused() {
+ focused_id = Some(id);
+ }
+
+ let Some(ipc_win) = state.windows.get(&id) else {
+ let window = make_ipc_window(mapped, Some(ws_id));
+ events.push(Event::WindowOpenedOrChanged { window });
+ return;
+ };
+
+ let workspace_id = Some(u64::from(ws_id.0));
+ let mut changed = ipc_win.workspace_id != workspace_id;
+
+ let wl_surface = mapped.toplevel().wl_surface();
+ changed |= with_states(wl_surface, |states| {
+ let role = states
+ .data_map
+ .get::<XdgToplevelSurfaceData>()
+ .unwrap()
+ .lock()
+ .unwrap();
+
+ ipc_win.title != role.title || ipc_win.app_id != role.app_id
+ });
+
+ if changed {
+ let window = make_ipc_window(mapped, Some(ws_id));
+ events.push(Event::WindowOpenedOrChanged { window });
+ return;
+ }
+
+ if mapped.is_focused() && !ipc_win.is_focused {
+ events.push(Event::WindowFocusChanged { id: Some(id) });
+ }
+ });
+
+ // Check for closed windows.
+ let mut ipc_focused_id = None;
+ for (id, ipc_win) in &state.windows {
+ if !seen.contains(id) {
+ events.push(Event::WindowClosed { id: *id });
+ }
+
+ if ipc_win.is_focused {
+ ipc_focused_id = Some(id);
+ }
+ }
+
+ // Extra check for focus becoming None, since the checks above only work for focus becoming
+ // a different window.
+ if focused_id.is_none() && ipc_focused_id.is_some() {
+ events.push(Event::WindowFocusChanged { id: None });
+ }
+
+ for event in events {
+ state.apply(event.clone());
+ server.send_event(event);
+ }
+ }
+}