use std::cell::RefCell; use std::collections::HashSet; use std::os::unix::net::{UnixListener, UnixStream}; use std::path::PathBuf; use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::{env, io, process}; use anyhow::Context; use async_channel::{Receiver, Sender, TrySendError}; use calloop::futures::Scheduler; use calloop::io::Async; use directories::BaseDirs; use futures_util::io::{AsyncReadExt, BufReader}; use futures_util::{select_biased, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, FutureExt as _}; use niri_config::OutputName; use niri_ipc::state::{EventStreamState, EventStreamStatePart as _}; use niri_ipc::{Event, KeyboardLayouts, OutputConfigChanged, Reply, Request, Response, Workspace}; use smithay::reexports::calloop::generic::Generic; use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction}; use smithay::reexports::rustix::fs::unlink; use smithay::wayland::compositor::with_states; use smithay::wayland::shell::xdg::XdgToplevelSurfaceData; use crate::backend::IpcOutputMap; use crate::layout::workspace::WorkspaceId; use crate::niri::State; use crate::utils::version; use crate::window::Mapped; // If an event stream client fails to read events fast enough that we accumulate more than this // number in our buffer, we drop that event stream client. const EVENT_STREAM_BUFFER_SIZE: usize = 64; pub struct IpcServer { pub socket_path: PathBuf, event_streams: Rc>>, event_stream_state: Rc>, } struct ClientCtx { event_loop: LoopHandle<'static, State>, scheduler: Scheduler<()>, ipc_outputs: Arc>, event_streams: Rc>>, event_stream_state: Rc>, } struct EventStreamClient { events: Receiver, disconnect: Receiver<()>, write: Box, } struct EventStreamSender { events: Sender, disconnect: Sender<()>, } impl IpcServer { pub fn start( event_loop: &LoopHandle<'static, State>, wayland_socket_name: &str, ) -> anyhow::Result { let _span = tracy_client::span!("Ipc::start"); let socket_name = format!("niri.{wayland_socket_name}.{}.sock", process::id()); let mut socket_path = socket_dir(); socket_path.push(socket_name); let listener = UnixListener::bind(&socket_path).context("error binding socket")?; listener .set_nonblocking(true) .context("error setting socket to non-blocking")?; let source = Generic::new(listener, Interest::READ, Mode::Level); event_loop .insert_source(source, |_, socket, state| { match socket.accept() { Ok((stream, _)) => on_new_ipc_client(state, stream), Err(e) if e.kind() == io::ErrorKind::WouldBlock => (), Err(e) => return Err(e), } Ok(PostAction::Continue) }) .unwrap(); Ok(Self { socket_path, event_streams: Rc::new(RefCell::new(Vec::new())), event_stream_state: Rc::new(RefCell::new(EventStreamState::default())), }) } fn send_event(&self, event: Event) { let mut streams = self.event_streams.borrow_mut(); let mut to_remove = Vec::new(); for (idx, stream) in streams.iter_mut().enumerate() { match stream.events.try_send(event.clone()) { Ok(()) => (), Err(TrySendError::Closed(_)) => to_remove.push(idx), Err(TrySendError::Full(_)) => { warn!( "disconnecting IPC event stream client \ because it is reading events too slowly" ); to_remove.push(idx); } } } for idx in to_remove.into_iter().rev() { let stream = streams.swap_remove(idx); let _ = stream.disconnect.send_blocking(()); } } } impl Drop for IpcServer { fn drop(&mut self) { let _ = unlink(&self.socket_path); } } fn socket_dir() -> PathBuf { BaseDirs::new() .as_ref() .and_then(|x| x.runtime_dir()) .map(|x| x.to_owned()) .unwrap_or_else(env::temp_dir) } fn on_new_ipc_client(state: &mut State, stream: UnixStream) { let _span = tracy_client::span!("on_new_ipc_client"); trace!("new IPC client connected"); let stream = match state.niri.event_loop.adapt_io(stream) { Ok(stream) => stream, Err(err) => { warn!("error making IPC stream async: {err:?}"); return; } }; let ipc_server = state.niri.ipc_server.as_ref().unwrap(); let ctx = ClientCtx { event_loop: state.niri.event_loop.clone(), scheduler: state.niri.scheduler.clone(), ipc_outputs: state.backend.ipc_outputs(), event_streams: ipc_server.event_streams.clone(), event_stream_state: ipc_server.event_stream_state.clone(), }; let future = async move { if let Err(err) = handle_client(ctx, stream).await { warn!("error handling IPC client: {err:?}"); } }; if let Err(err) = state.niri.scheduler.schedule(future) { warn!("error scheduling IPC stream future: {err:?}"); } } async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> { let (read, mut write) = stream.split(); let mut buf = String::new(); // Read a single line to allow extensibility in the future to keep reading. BufReader::new(read) .read_line(&mut buf) .await .context("error reading request")?; let request = serde_json::from_str(&buf) .context("error parsing request") .map_err(|err| err.to_string()); let requested_error = matches!(request, Ok(Request::ReturnError)); let requested_event_stream = matches!(request, Ok(Request::EventStream)); let reply = match request { Ok(request) => process(&ctx, request).await, Err(err) => Err(err), }; if let Err(err) = &reply { if !requested_error { warn!("error processing IPC request: {err:?}"); } } let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?; buf.push(b'\n'); write.write_all(&buf).await.context("error writing reply")?; if requested_event_stream { let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE); let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); // Spawn a task for the client. let client = EventStreamClient { events: events_rx, disconnect: disconnect_rx, write: Box::new(write) as _, }; let future = async move { if let Err(err) = handle_event_stream_client(client).await { warn!("error handling IPC event stream client: {err:?}"); } }; if let Err(err) = ctx.scheduler.schedule(future) { warn!("error scheduling IPC event stream future: {err:?}"); } // Send the initial state. { let state = ctx.event_stream_state.borrow(); for event in state.replicate() { events_tx .try_send(event) .expect("initial event burst had more events than buffer size"); } } // Add it to the list. { let mut streams = ctx.event_streams.borrow_mut(); let sender = EventStreamSender { events: events_tx, disconnect: disconnect_tx, }; streams.push(sender); } } Ok(()) } async fn process(ctx: &ClientCtx, request: Request) -> Reply { let response = match request { Request::ReturnError => return Err(String::from("example compositor error")), Request::Version => Response::Version(version()), Request::Outputs => { let ipc_outputs = ctx.ipc_outputs.lock().unwrap().clone(); let outputs = ipc_outputs.values().cloned().map(|o| (o.name.clone(), o)); Response::Outputs(outputs.collect()) } Request::Workspaces => { let state = ctx.event_stream_state.borrow(); let workspaces = state.workspaces.workspaces.values().cloned().collect(); Response::Workspaces(workspaces) } Request::Windows => { let state = ctx.event_stream_state.borrow(); let windows = state.windows.windows.values().cloned().collect(); Response::Windows(windows) } Request::KeyboardLayouts => { let state = ctx.event_stream_state.borrow(); let layout = state.keyboard_layouts.keyboard_layouts.clone(); let layout = layout.expect("keyboard layouts should be set at startup"); Response::KeyboardLayouts(layout) } Request::FocusedWindow => { let state = ctx.event_stream_state.borrow(); let windows = &state.windows.windows; let window = windows.values().find(|win| win.is_focused).cloned(); Response::FocusedWindow(window) } Request::Action(action) => { let (tx, rx) = async_channel::bounded(1); let action = niri_config::Action::from(action); ctx.event_loop.insert_idle(move |state| { state.do_action(action, false); let _ = tx.send_blocking(()); }); // Wait until the action has been processed before returning. This is important for a // few actions, for instance for DoScreenTransition this wait ensures that the screen // contents were sampled into the texture. let _ = rx.recv().await; Response::Handled } Request::Output { output, action } => { let ipc_outputs = ctx.ipc_outputs.lock().unwrap(); let found = ipc_outputs .values() .any(|o| OutputName::from_ipc_output(o).matches(&output)); let response = if found { OutputConfigChanged::Applied } else { OutputConfigChanged::OutputWasMissing }; drop(ipc_outputs); ctx.event_loop.insert_idle(move |state| { state.apply_transient_output_config(&output, action); }); Response::OutputConfigChanged(response) } Request::FocusedOutput => { let (tx, rx) = async_channel::bounded(1); ctx.event_loop.insert_idle(move |state| { let active_output = state .niri .layout .active_output() .map(|output| output.name()); let output = active_output.and_then(|active_output| { state .backend .ipc_outputs() .lock() .unwrap() .values() .find(|o| o.name == active_output) .cloned() }); let _ = tx.send_blocking(output); }); let result = rx.recv().await; let output = result.map_err(|_| String::from("error getting active output info"))?; Response::FocusedOutput(output) } Request::EventStream => Response::Handled, }; Ok(response) } async fn handle_event_stream_client(client: EventStreamClient) -> anyhow::Result<()> { let EventStreamClient { events, disconnect, mut write, } = client; while let Ok(event) = events.recv().await { let mut buf = serde_json::to_vec(&event).context("error formatting event")?; buf.push(b'\n'); let res = select_biased! { _ = disconnect.recv().fuse() => return Ok(()), res = write.write_all(&buf).fuse() => res, }; match res { Ok(()) => (), // Normal client disconnection. Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()), res @ Err(_) => res.context("error writing event")?, } } Ok(()) } fn make_ipc_window(mapped: &Mapped, workspace_id: Option) -> niri_ipc::Window { let wl_surface = mapped.toplevel().wl_surface(); with_states(wl_surface, |states| { let role = states .data_map .get::() .unwrap() .lock() .unwrap(); niri_ipc::Window { id: mapped.id().get(), title: role.title.clone(), app_id: role.app_id.clone(), workspace_id: workspace_id.map(|id| id.get()), is_focused: mapped.is_focused(), } }) } impl State { pub fn ipc_keyboard_layouts_changed(&mut self) { let keyboard = self.niri.seat.get_keyboard().unwrap(); let keyboard_layouts = keyboard.with_xkb_state(self, |context| { let xkb = context.xkb().lock().unwrap(); let layouts = xkb.layouts(); KeyboardLayouts { names: layouts .map(|layout| xkb.layout_name(layout).to_owned()) .collect(), current_idx: xkb.active_layout().0 as u8, } }); let Some(server) = &self.niri.ipc_server else { return; }; let mut state = server.event_stream_state.borrow_mut(); let state = &mut state.keyboard_layouts; let event = Event::KeyboardLayoutsChanged { keyboard_layouts }; state.apply(event.clone()); server.send_event(event); } pub fn ipc_refresh_keyboard_layout_index(&mut self) { let keyboard = self.niri.seat.get_keyboard().unwrap(); let idx = keyboard.with_xkb_state(self, |context| { let xkb = context.xkb().lock().unwrap(); xkb.active_layout().0 as u8 }); let Some(server) = &self.niri.ipc_server else { return; }; let mut state = server.event_stream_state.borrow_mut(); let state = &mut state.keyboard_layouts; if state.keyboard_layouts.as_ref().unwrap().current_idx == idx { return; } let event = Event::KeyboardLayoutSwitched { idx }; state.apply(event.clone()); server.send_event(event); } pub fn ipc_refresh_layout(&mut self) { self.ipc_refresh_workspaces(); self.ipc_refresh_windows(); } fn ipc_refresh_workspaces(&mut self) { let Some(server) = &self.niri.ipc_server else { return; }; let _span = tracy_client::span!("State::ipc_refresh_workspaces"); let mut state = server.event_stream_state.borrow_mut(); let state = &mut state.workspaces; let mut events = Vec::new(); let layout = &self.niri.layout; let focused_ws_id = layout.active_workspace().map(|ws| ws.id().get()); // Check for workspace changes. let mut seen = HashSet::new(); let mut need_workspaces_changed = false; for (mon, ws_idx, ws) in layout.workspaces() { let id = ws.id().get(); seen.insert(id); let Some(ipc_ws) = state.workspaces.get(&id) else { // A new workspace was added. need_workspaces_changed = true; break; }; // Check for any changes that we can't signal as individual events. let output_name = mon.map(|mon| mon.output_name()); if ipc_ws.idx != u8::try_from(ws_idx + 1).unwrap_or(u8::MAX) || ipc_ws.name.as_ref() != ws.name() || ipc_ws.output.as_ref() != output_name { need_workspaces_changed = true; break; } let active_window_id = ws.active_window().map(|win| win.id().get()); if ipc_ws.active_window_id != active_window_id { events.push(Event::WorkspaceActiveWindowChanged { workspace_id: id, active_window_id, }); } // Check if this workspace became focused. let is_focused = Some(id) == focused_ws_id; if is_focused && !ipc_ws.is_focused { events.push(Event::WorkspaceActivated { id, focused: true }); continue; } // Check if this workspace became active. let is_active = mon.map_or(false, |mon| mon.active_workspace_idx() == ws_idx); if is_active && !ipc_ws.is_active { events.push(Event::WorkspaceActivated { id, focused: false }); } } // Check if any workspaces were removed. if !need_workspaces_changed && state.workspaces.keys().any(|id| !seen.contains(id)) { need_workspaces_changed = true; } if need_workspaces_changed { events.clear(); let workspaces = layout .workspaces() .map(|(mon, ws_idx, ws)| { let id = ws.id().get(); Workspace { id, idx: u8::try_from(ws_idx + 1).unwrap_or(u8::MAX), name: ws.name().cloned(), output: mon.map(|mon| mon.output_name().clone()), is_active: mon.map_or(false, |mon| mon.active_workspace_idx() == ws_idx), is_focused: Some(id) == focused_ws_id, active_window_id: ws.active_window().map(|win| win.id().get()), } }) .collect(); events.push(Event::WorkspacesChanged { workspaces }); } for event in events { state.apply(event.clone()); server.send_event(event); } } fn ipc_refresh_windows(&mut self) { let Some(server) = &self.niri.ipc_server else { return; }; let _span = tracy_client::span!("State::ipc_refresh_windows"); let mut state = server.event_stream_state.borrow_mut(); let state = &mut state.windows; let mut events = Vec::new(); let layout = &self.niri.layout; // Check for window changes. let mut seen = HashSet::new(); let mut focused_id = None; layout.with_windows(|mapped, _, ws_id| { let id = mapped.id().get(); seen.insert(id); if mapped.is_focused() { focused_id = Some(id); } let Some(ipc_win) = state.windows.get(&id) else { let window = make_ipc_window(mapped, Some(ws_id)); events.push(Event::WindowOpenedOrChanged { window }); return; }; let workspace_id = Some(ws_id.get()); let mut changed = ipc_win.workspace_id != workspace_id; let wl_surface = mapped.toplevel().wl_surface(); changed |= with_states(wl_surface, |states| { let role = states .data_map .get::() .unwrap() .lock() .unwrap(); ipc_win.title != role.title || ipc_win.app_id != role.app_id }); if changed { let window = make_ipc_window(mapped, Some(ws_id)); events.push(Event::WindowOpenedOrChanged { window }); return; } if mapped.is_focused() && !ipc_win.is_focused { events.push(Event::WindowFocusChanged { id: Some(id) }); } }); // Check for closed windows. let mut ipc_focused_id = None; for (id, ipc_win) in &state.windows { if !seen.contains(id) { events.push(Event::WindowClosed { id: *id }); } if ipc_win.is_focused { ipc_focused_id = Some(id); } } // Extra check for focus becoming None, since the checks above only work for focus becoming // a different window. if focused_id.is_none() && ipc_focused_id.is_some() { events.push(Event::WindowFocusChanged { id: None }); } for event in events { state.apply(event.clone()); server.send_event(event); } } }