diff options
Diffstat (limited to 'src/ipc/server.rs')
| -rw-r--r-- | src/ipc/server.rs | 416 |
1 files changed, 371 insertions, 45 deletions
diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 5d2c8524..af2c4fa2 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -1,15 +1,20 @@ +use std::cell::RefCell; +use std::collections::HashSet; use std::os::unix::net::{UnixListener, UnixStream}; use std::path::PathBuf; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::{env, io, process}; use anyhow::Context; +use async_channel::{Receiver, Sender, TrySendError}; +use calloop::futures::Scheduler; use calloop::io::Async; use directories::BaseDirs; use futures_util::io::{AsyncReadExt, BufReader}; -use futures_util::{AsyncBufReadExt, AsyncWriteExt}; -use niri_ipc::{KeyboardLayouts, OutputConfigChanged, Reply, Request, Response}; -use smithay::desktop::Window; +use futures_util::{select_biased, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, FutureExt as _}; +use niri_ipc::state::{EventStreamState, EventStreamStatePart as _}; +use niri_ipc::{Event, KeyboardLayouts, OutputConfigChanged, Reply, Request, Response, Workspace}; use smithay::input::keyboard::XkbContextHandler; use smithay::reexports::calloop::generic::Generic; use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction}; @@ -18,17 +23,38 @@ use smithay::wayland::compositor::with_states; use smithay::wayland::shell::xdg::XdgToplevelSurfaceData; use crate::backend::IpcOutputMap; +use crate::layout::workspace::WorkspaceId; use crate::niri::State; use crate::utils::version; +use crate::window::Mapped; + +// If an event stream client fails to read events fast enough that we accumulate more than this +// number in our buffer, we drop that event stream client. +const EVENT_STREAM_BUFFER_SIZE: usize = 64; pub struct IpcServer { pub socket_path: PathBuf, + event_streams: Rc<RefCell<Vec<EventStreamSender>>>, + event_stream_state: Rc<RefCell<EventStreamState>>, } struct ClientCtx { event_loop: LoopHandle<'static, State>, + scheduler: Scheduler<()>, ipc_outputs: Arc<Mutex<IpcOutputMap>>, - ipc_focused_window: Arc<Mutex<Option<Window>>>, + event_streams: Rc<RefCell<Vec<EventStreamSender>>>, + event_stream_state: Rc<RefCell<EventStreamState>>, +} + +struct EventStreamClient { + events: Receiver<Event>, + disconnect: Receiver<()>, + write: Box<dyn AsyncWrite + Unpin>, +} + +struct EventStreamSender { + events: Sender<Event>, + disconnect: Sender<()>, } impl IpcServer { @@ -60,7 +86,43 @@ impl IpcServer { }) .unwrap(); - Ok(Self { socket_path }) + Ok(Self { + socket_path, + event_streams: Rc::new(RefCell::new(Vec::new())), + event_stream_state: Rc::new(RefCell::new(EventStreamState::default())), + }) + } + + fn send_event(&self, event: Event) { + let mut streams = self.event_streams.borrow_mut(); + let mut to_remove = Vec::new(); + for (idx, stream) in streams.iter_mut().enumerate() { + match stream.events.try_send(event.clone()) { + Ok(()) => (), + Err(TrySendError::Closed(_)) => to_remove.push(idx), + Err(TrySendError::Full(_)) => { + warn!( + "disconnecting IPC event stream client \ + because it is reading events too slowly" + ); + to_remove.push(idx); + } + } + } + + for idx in to_remove.into_iter().rev() { + let stream = streams.swap_remove(idx); + let _ = stream.disconnect.send_blocking(()); + } + } + + pub fn keyboard_layout_switched(&self, new_idx: u8) { + let mut state = self.event_stream_state.borrow_mut(); + let state = &mut state.keyboard_layouts; + + let event = Event::KeyboardLayoutSwitched { idx: new_idx }; + state.apply(event.clone()); + self.send_event(event); } } @@ -90,10 +152,14 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) { } }; + let ipc_server = state.niri.ipc_server.as_ref().unwrap(); + let ctx = ClientCtx { event_loop: state.niri.event_loop.clone(), + scheduler: state.niri.scheduler.clone(), ipc_outputs: state.backend.ipc_outputs(), - ipc_focused_window: state.niri.ipc_focused_window.clone(), + event_streams: ipc_server.event_streams.clone(), + event_stream_state: ipc_server.event_stream_state.clone(), }; let future = async move { @@ -106,7 +172,7 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) { } } -async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> { +async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> { let (read, mut write) = stream.split(); let mut buf = String::new(); @@ -120,6 +186,7 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow: .context("error parsing request") .map_err(|err| err.to_string()); let requested_error = matches!(request, Ok(Request::ReturnError)); + let requested_event_stream = matches!(request, Ok(Request::EventStream)); let reply = match request { Ok(request) => process(&ctx, request).await, @@ -136,6 +203,46 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow: buf.push(b'\n'); write.write_all(&buf).await.context("error writing reply")?; + if requested_event_stream { + let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE); + let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); + + // Spawn a task for the client. + let client = EventStreamClient { + events: events_rx, + disconnect: disconnect_rx, + write: Box::new(write) as _, + }; + let future = async move { + if let Err(err) = handle_event_stream_client(client).await { + warn!("error handling IPC event stream client: {err:?}"); + } + }; + if let Err(err) = ctx.scheduler.schedule(future) { + warn!("error scheduling IPC event stream future: {err:?}"); + } + + // Send the initial state. + { + let state = ctx.event_stream_state.borrow(); + for event in state.replicate() { + events_tx + .try_send(event) + .expect("initial event burst had more events than buffer size"); + } + } + + // Add it to the list. + { + let mut streams = ctx.event_streams.borrow_mut(); + let sender = EventStreamSender { + events: events_tx, + disconnect: disconnect_tx, + }; + streams.push(sender); + } + } + Ok(()) } @@ -149,23 +256,9 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply { Response::Outputs(outputs.collect()) } Request::FocusedWindow => { - let window = ctx.ipc_focused_window.lock().unwrap().clone(); - let window = window.map(|window| { - let wl_surface = window.toplevel().expect("no X11 support").wl_surface(); - with_states(wl_surface, |states| { - let role = states - .data_map - .get::<XdgToplevelSurfaceData>() - .unwrap() - .lock() - .unwrap(); - - niri_ipc::Window { - title: role.title.clone(), - app_id: role.app_id.clone(), - } - }) - }); + let state = ctx.event_stream_state.borrow(); + let windows = &state.windows.windows; + let window = windows.values().find(|win| win.is_focused).cloned(); Response::FocusedWindow(window) } Request::Action(action) => { @@ -202,13 +295,8 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply { Response::OutputConfigChanged(response) } Request::Workspaces => { - let (tx, rx) = async_channel::bounded(1); - ctx.event_loop.insert_idle(move |state| { - let workspaces = state.niri.layout.ipc_workspaces(); - let _ = tx.send_blocking(workspaces); - }); - let result = rx.recv().await; - let workspaces = result.map_err(|_| String::from("error getting workspace info"))?; + let state = ctx.event_stream_state.borrow(); + let workspaces = state.workspaces.workspaces.values().cloned().collect(); Response::Workspaces(workspaces) } Request::FocusedOutput => { @@ -238,23 +326,261 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply { Response::FocusedOutput(output) } Request::KeyboardLayouts => { - let (tx, rx) = async_channel::bounded(1); - ctx.event_loop.insert_idle(move |state| { - let keyboard = state.niri.seat.get_keyboard().unwrap(); - let layout = keyboard.with_xkb_state(state, |context| { - let layouts = context.keymap().layouts(); - KeyboardLayouts { - names: layouts.map(str::to_owned).collect(), - current_idx: context.active_layout().0 as u8, - } - }); - let _ = tx.send_blocking(layout); - }); - let result = rx.recv().await; - let layout = result.map_err(|_| String::from("error getting layout info"))?; + let state = ctx.event_stream_state.borrow(); + let layout = state.keyboard_layouts.keyboard_layouts.clone(); + let layout = layout.expect("keyboard layouts should be set at startup"); Response::KeyboardLayouts(layout) } + Request::EventStream => Response::Handled, }; Ok(response) } + +async fn handle_event_stream_client(client: EventStreamClient) -> anyhow::Result<()> { + let EventStreamClient { + events, + disconnect, + mut write, + } = client; + + while let Ok(event) = events.recv().await { + let mut buf = serde_json::to_vec(&event).context("error formatting event")?; + buf.push(b'\n'); + + let res = select_biased! { + _ = disconnect.recv().fuse() => return Ok(()), + res = write.write_all(&buf).fuse() => res, + }; + + match res { + Ok(()) => (), + // Normal client disconnection. + Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()), + res @ Err(_) => res.context("error writing event")?, + } + } + + Ok(()) +} + +fn make_ipc_window(mapped: &Mapped, workspace_id: Option<WorkspaceId>) -> niri_ipc::Window { + let wl_surface = mapped.toplevel().wl_surface(); + with_states(wl_surface, |states| { + let role = states + .data_map + .get::<XdgToplevelSurfaceData>() + .unwrap() + .lock() + .unwrap(); + + niri_ipc::Window { + id: u64::from(mapped.id().get()), + title: role.title.clone(), + app_id: role.app_id.clone(), + workspace_id: workspace_id.map(|id| u64::from(id.0)), + is_focused: mapped.is_focused(), + } + }) +} + +impl State { + pub fn ipc_keyboard_layouts_changed(&mut self) { + let keyboard = self.niri.seat.get_keyboard().unwrap(); + let keyboard_layouts = keyboard.with_xkb_state(self, |context| { + let layouts = context.keymap().layouts(); + KeyboardLayouts { + names: layouts.map(str::to_owned).collect(), + current_idx: context.active_layout().0 as u8, + } + }); + + let Some(server) = &self.niri.ipc_server else { + return; + }; + + let mut state = server.event_stream_state.borrow_mut(); + let state = &mut state.keyboard_layouts; + + let event = Event::KeyboardLayoutsChanged { keyboard_layouts }; + state.apply(event.clone()); + server.send_event(event); + } + + pub fn ipc_refresh_layout(&mut self) { + self.ipc_refresh_workspaces(); + self.ipc_refresh_windows(); + } + + fn ipc_refresh_workspaces(&mut self) { + let Some(server) = &self.niri.ipc_server else { + return; + }; + + let _span = tracy_client::span!("State::ipc_refresh_workspaces"); + + let mut state = server.event_stream_state.borrow_mut(); + let state = &mut state.workspaces; + + let mut events = Vec::new(); + let layout = &self.niri.layout; + let focused_ws_id = layout.active_workspace().map(|ws| u64::from(ws.id().0)); + + // Check for workspace changes. + let mut seen = HashSet::new(); + let mut need_workspaces_changed = false; + for (mon, ws_idx, ws) in layout.workspaces() { + let id = u64::from(ws.id().0); + seen.insert(id); + + let Some(ipc_ws) = state.workspaces.get(&id) else { + // A new workspace was added. + need_workspaces_changed = true; + break; + }; + + // Check for any changes that we can't signal as individual events. + let output_name = mon.map(|mon| mon.output_name()); + if ipc_ws.idx != u8::try_from(ws_idx + 1).unwrap_or(u8::MAX) + || ipc_ws.name != ws.name + || ipc_ws.output.as_ref() != output_name + { + need_workspaces_changed = true; + break; + } + + let active_window_id = ws.active_window().map(|win| u64::from(win.id().get())); + if ipc_ws.active_window_id != active_window_id { + events.push(Event::WorkspaceActiveWindowChanged { + workspace_id: id, + active_window_id, + }); + } + + // Check if this workspace became focused. + let is_focused = Some(id) == focused_ws_id; + if is_focused && !ipc_ws.is_focused { + events.push(Event::WorkspaceActivated { id, focused: true }); + continue; + } + + // Check if this workspace became active. + let is_active = mon.map_or(false, |mon| mon.active_workspace_idx == ws_idx); + if is_active && !ipc_ws.is_active { + events.push(Event::WorkspaceActivated { id, focused: false }); + } + } + + // Check if any workspaces were removed. + if !need_workspaces_changed && state.workspaces.keys().any(|id| !seen.contains(id)) { + need_workspaces_changed = true; + } + + if need_workspaces_changed { + events.clear(); + + let workspaces = layout + .workspaces() + .map(|(mon, ws_idx, ws)| { + let id = u64::from(ws.id().0); + Workspace { + id, + idx: u8::try_from(ws_idx + 1).unwrap_or(u8::MAX), + name: ws.name.clone(), + output: mon.map(|mon| mon.output_name().clone()), + is_active: mon.map_or(false, |mon| mon.active_workspace_idx == ws_idx), + is_focused: Some(id) == focused_ws_id, + active_window_id: ws.active_window().map(|win| u64::from(win.id().get())), + } + }) + .collect(); + + events.push(Event::WorkspacesChanged { workspaces }); + } + + for event in events { + state.apply(event.clone()); + server.send_event(event); + } + } + + fn ipc_refresh_windows(&mut self) { + let Some(server) = &self.niri.ipc_server else { + return; + }; + + let _span = tracy_client::span!("State::ipc_refresh_windows"); + + let mut state = server.event_stream_state.borrow_mut(); + let state = &mut state.windows; + + let mut events = Vec::new(); + let layout = &self.niri.layout; + + // Check for window changes. + let mut seen = HashSet::new(); + let mut focused_id = None; + layout.with_windows(|mapped, _, ws_id| { + let id = u64::from(mapped.id().get()); + seen.insert(id); + + if mapped.is_focused() { + focused_id = Some(id); + } + + let Some(ipc_win) = state.windows.get(&id) else { + let window = make_ipc_window(mapped, Some(ws_id)); + events.push(Event::WindowOpenedOrChanged { window }); + return; + }; + + let workspace_id = Some(u64::from(ws_id.0)); + let mut changed = ipc_win.workspace_id != workspace_id; + + let wl_surface = mapped.toplevel().wl_surface(); + changed |= with_states(wl_surface, |states| { + let role = states + .data_map + .get::<XdgToplevelSurfaceData>() + .unwrap() + .lock() + .unwrap(); + + ipc_win.title != role.title || ipc_win.app_id != role.app_id + }); + + if changed { + let window = make_ipc_window(mapped, Some(ws_id)); + events.push(Event::WindowOpenedOrChanged { window }); + return; + } + + if mapped.is_focused() && !ipc_win.is_focused { + events.push(Event::WindowFocusChanged { id: Some(id) }); + } + }); + + // Check for closed windows. + let mut ipc_focused_id = None; + for (id, ipc_win) in &state.windows { + if !seen.contains(id) { + events.push(Event::WindowClosed { id: *id }); + } + + if ipc_win.is_focused { + ipc_focused_id = Some(id); + } + } + + // Extra check for focus becoming None, since the checks above only work for focus becoming + // a different window. + if focused_id.is_none() && ipc_focused_id.is_some() { + events.push(Event::WindowFocusChanged { id: None }); + } + + for event in events { + state.apply(event.clone()); + server.send_event(event); + } + } +} |
