diff options
| author | Ivan Molodetskikh <yalterz@gmail.com> | 2024-06-20 12:04:10 +0300 |
|---|---|---|
| committer | Ivan Molodetskikh <yalterz@gmail.com> | 2024-09-01 23:47:19 -0700 |
| commit | 30b213601a4f71d65a2227fa68ffb1ab2a69f671 (patch) | |
| tree | e68d9db212c6a4ac610ec0f80bb3e5db83950a67 | |
| parent | 8eb34b2e185aa0e0affea450226369cd3f9e6a78 (diff) | |
| download | niri-30b213601a4f71d65a2227fa68ffb1ab2a69f671.tar.gz niri-30b213601a4f71d65a2227fa68ffb1ab2a69f671.tar.bz2 niri-30b213601a4f71d65a2227fa68ffb1ab2a69f671.zip | |
Implement the event stream IPC
| -rw-r--r-- | niri-ipc/src/lib.rs | 98 | ||||
| -rw-r--r-- | niri-ipc/src/socket.rs | 18 | ||||
| -rw-r--r-- | niri-ipc/src/state.rs | 188 | ||||
| -rw-r--r-- | src/cli.rs | 2 | ||||
| -rw-r--r-- | src/input/mod.rs | 17 | ||||
| -rw-r--r-- | src/ipc/client.rs | 64 | ||||
| -rw-r--r-- | src/ipc/server.rs | 416 | ||||
| -rw-r--r-- | src/layout/mod.rs | 64 | ||||
| -rw-r--r-- | src/layout/workspace.rs | 11 | ||||
| -rw-r--r-- | src/niri.rs | 31 | ||||
| -rw-r--r-- | src/protocols/foreign_toplevel.rs | 2 | ||||
| -rw-r--r-- | wiki/IPC.md | 20 |
12 files changed, 827 insertions, 104 deletions
diff --git a/niri-ipc/src/lib.rs b/niri-ipc/src/lib.rs index ecf202f2..e6058ca1 100644 --- a/niri-ipc/src/lib.rs +++ b/niri-ipc/src/lib.rs @@ -9,6 +9,8 @@ use serde::{Deserialize, Serialize}; mod socket; pub use socket::{Socket, SOCKET_PATH_ENV}; +pub mod state; + /// Request from client to niri. #[derive(Debug, Serialize, Deserialize, Clone)] #[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))] @@ -38,6 +40,11 @@ pub enum Request { FocusedOutput, /// Request information about the keyboard layout. KeyboardLayouts, + /// Start continuously receiving events from the compositor. + /// + /// The compositor should reply with `Reply::Ok(Response::Handled)`, then continuously send + /// [`Event`]s, one per line. + EventStream, /// Respond with an error (for testing error handling). ReturnError, } @@ -536,10 +543,18 @@ pub enum Transform { #[derive(Serialize, Deserialize, Debug, Clone)] #[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))] pub struct Window { + /// Unique id of this window. + pub id: u64, /// Title, if set. pub title: Option<String>, /// Application ID, if set. pub app_id: Option<String>, + /// Id of the workspace this window is on, if any. + pub workspace_id: Option<u64>, + /// Whether this window is currently focused. + /// + /// There can be either one focused window or zero (e.g. when a layer-shell surface has focus). + pub is_focused: bool, } /// Output configuration change result. @@ -556,6 +571,10 @@ pub enum OutputConfigChanged { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))] pub struct Workspace { + /// Unique id of this workspace. + /// + /// This id remains constant regardless of the workspace moving around and across monitors. + pub id: u64, /// Index of the workspace on its monitor. /// /// This is the same index you can use for requests like `niri msg action focus-workspace`. @@ -567,7 +586,15 @@ pub struct Workspace { /// Can be `None` if no outputs are currently connected. pub output: Option<String>, /// Whether the workspace is currently active on its output. + /// + /// Every output has one active workspace, the one that is currently visible on that output. pub is_active: bool, + /// Whether the workspace is currently focused. + /// + /// There's only one focused workspace across all outputs. + pub is_focused: bool, + /// Id of the active window on this workspace, if any. + pub active_window_id: Option<u64>, } /// Configured keyboard layouts. @@ -580,6 +607,77 @@ pub struct KeyboardLayouts { pub current_idx: u8, } +/// A compositor event. +#[derive(Serialize, Deserialize, Debug, Clone)] +#[cfg_attr(feature = "json-schema", derive(schemars::JsonSchema))] +pub enum Event { + /// The workspace configuration has changed. + WorkspacesChanged { + /// The new workspace configuration. + /// + /// This configuration completely replaces the previous configuration. I.e. if any + /// workspaces are missing from here, then they were deleted. + workspaces: Vec<Workspace>, + }, + /// A workspace was activated on an output. + /// + /// This doesn't always mean the workspace became focused, just that it's now the active + /// workspace on its output. All other workspaces on the same output become inactive. + WorkspaceActivated { + /// Id of the newly active workspace. + id: u64, + /// Whether this workspace also became focused. + /// + /// If `true`, this is now the single focused workspace. All other workspaces are no longer + /// focused, but they may remain active on their respective outputs. + focused: bool, + }, + /// An active window changed on a workspace. + WorkspaceActiveWindowChanged { + /// Id of the workspace on which the active window changed. + workspace_id: u64, + /// Id of the new active window, if any. + active_window_id: Option<u64>, + }, + /// The window configuration has changed. + WindowsChanged { + /// The new window configuration. + /// + /// This configuration completely replaces the previous configuration. I.e. if any windows + /// are missing from here, then they were closed. + windows: Vec<Window>, + }, + /// A new toplevel window was opened, or an existing toplevel window changed. + WindowOpenedOrChanged { + /// The new or updated window. + /// + /// If the window is focused, all other windows are no longer focused. + window: Window, + }, + /// A toplevel window was closed. + WindowClosed { + /// Id of the removed window. + id: u64, + }, + /// Window focus changed. + /// + /// All other windows are no longer focused. + WindowFocusChanged { + /// Id of the newly focused window, or `None` if no window is now focused. + id: Option<u64>, + }, + /// The configured keyboard layouts have changed. + KeyboardLayoutsChanged { + /// The new keyboard layout configuration. + keyboard_layouts: KeyboardLayouts, + }, + /// The keyboard layout switched. + KeyboardLayoutSwitched { + /// Index of the newly active layout. + idx: u8, + }, +} + impl FromStr for WorkspaceReferenceArg { type Err = &'static str; diff --git a/niri-ipc/src/socket.rs b/niri-ipc/src/socket.rs index 3964f000..d629f1a4 100644 --- a/niri-ipc/src/socket.rs +++ b/niri-ipc/src/socket.rs @@ -6,7 +6,7 @@ use std::net::Shutdown; use std::os::unix::net::UnixStream; use std::path::Path; -use crate::{Reply, Request}; +use crate::{Event, Reply, Request}; /// Name of the environment variable containing the niri IPC socket path. pub const SOCKET_PATH_ENV: &str = "NIRI_SOCKET"; @@ -47,7 +47,11 @@ impl Socket { /// * `Ok(Ok(response))`: successful [`Response`](crate::Response) from niri /// * `Ok(Err(message))`: error message from niri /// * `Err(error)`: error communicating with niri - pub fn send(self, request: Request) -> io::Result<Reply> { + /// + /// This method also returns a blocking function that you can call to keep reading [`Event`]s + /// after requesting an [`EventStream`][Request::EventStream]. This function is not useful + /// otherwise. + pub fn send(self, request: Request) -> io::Result<(Reply, impl FnMut() -> io::Result<Event>)> { let Self { mut stream } = self; let mut buf = serde_json::to_string(&request).unwrap(); @@ -60,6 +64,14 @@ impl Socket { reader.read_line(&mut buf)?; let reply = serde_json::from_str(&buf)?; - Ok(reply) + + let events = move || { + buf.clear(); + reader.read_line(&mut buf)?; + let event = serde_json::from_str(&buf)?; + Ok(event) + }; + + Ok((reply, events)) } } diff --git a/niri-ipc/src/state.rs b/niri-ipc/src/state.rs new file mode 100644 index 00000000..8d2d1744 --- /dev/null +++ b/niri-ipc/src/state.rs @@ -0,0 +1,188 @@ +//! Helpers for keeping track of the event stream state. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; + +use crate::{Event, KeyboardLayouts, Window, Workspace}; + +/// Part of the state communicated via the event stream. +pub trait EventStreamStatePart { + /// Returns a sequence of events that replicates this state from default initialization. + fn replicate(&self) -> Vec<Event>; + + /// Applies the event to this state. + /// + /// Returns `None` after applying the event, and `Some(event)` if the event is ignored by this + /// part of the state. + fn apply(&mut self, event: Event) -> Option<Event>; +} + +/// The full state communicated over the event stream. +/// +/// Different parts of the state are not guaranteed to be consistent across every single event +/// sent by niri. For example, you may receive the first [`Event::WindowOpenedOrChanged`] for a +/// just-opened window *after* an [`Event::WorkspaceActiveWindowChanged`] for that window. Between +/// these two events, the workspace active window id refers to a window that does not yet exist in +/// the windows state part. +#[derive(Debug, Default)] +pub struct EventStreamState { + /// State of workspaces. + pub workspaces: WorkspacesState, + + /// State of workspaces. + pub windows: WindowsState, + + /// State of the keyboard layouts. + pub keyboard_layouts: KeyboardLayoutsState, +} + +/// The workspaces state communicated over the event stream. +#[derive(Debug, Default)] +pub struct WorkspacesState { + /// Map from a workspace id to the workspace. + pub workspaces: HashMap<u64, Workspace>, +} + +/// The windows state communicated over the event stream. +#[derive(Debug, Default)] +pub struct WindowsState { + /// Map from a window id to the window. + pub windows: HashMap<u64, Window>, +} + +/// The keyboard layout state communicated over the event stream. +#[derive(Debug, Default)] +pub struct KeyboardLayoutsState { + /// Configured keyboard layouts. + pub keyboard_layouts: Option<KeyboardLayouts>, +} + +impl EventStreamStatePart for EventStreamState { + fn replicate(&self) -> Vec<Event> { + let mut events = Vec::new(); + events.extend(self.workspaces.replicate()); + events.extend(self.windows.replicate()); + events.extend(self.keyboard_layouts.replicate()); + events + } + + fn apply(&mut self, event: Event) -> Option<Event> { + let event = self.workspaces.apply(event)?; + let event = self.windows.apply(event)?; + let event = self.keyboard_layouts.apply(event)?; + Some(event) + } +} + +impl EventStreamStatePart for WorkspacesState { + fn replicate(&self) -> Vec<Event> { + let workspaces = self.workspaces.values().cloned().collect(); + vec![Event::WorkspacesChanged { workspaces }] + } + + fn apply(&mut self, event: Event) -> Option<Event> { + match event { + Event::WorkspacesChanged { workspaces } => { + self.workspaces = workspaces.into_iter().map(|ws| (ws.id, ws)).collect(); + } + Event::WorkspaceActivated { id, focused } => { + let ws = self.workspaces.get(&id); + let ws = ws.expect("activated workspace was missing from the map"); + let output = ws.output.clone(); + + for ws in self.workspaces.values_mut() { + let got_activated = ws.id == id; + if ws.output == output { + ws.is_active = got_activated; + } + + if focused { + ws.is_focused = got_activated; + } + } + } + Event::WorkspaceActiveWindowChanged { + workspace_id, + active_window_id, + } => { + let ws = self.workspaces.get_mut(&workspace_id); + let ws = ws.expect("changed workspace was missing from the map"); + ws.active_window_id = active_window_id; + } + event => return Some(event), + } + None + } +} + +impl EventStreamStatePart for WindowsState { + fn replicate(&self) -> Vec<Event> { + let windows = self.windows.values().cloned().collect(); + vec![Event::WindowsChanged { windows }] + } + + fn apply(&mut self, event: Event) -> Option<Event> { + match event { + Event::WindowsChanged { windows } => { + self.windows = windows.into_iter().map(|win| (win.id, win)).collect(); + } + Event::WindowOpenedOrChanged { window } => { + let (id, is_focused) = match self.windows.entry(window.id) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + *entry = window; + (entry.id, entry.is_focused) + } + Entry::Vacant(entry) => { + let entry = entry.insert(window); + (entry.id, entry.is_focused) + } + }; + + if is_focused { + for win in self.windows.values_mut() { + if win.id != id { + win.is_focused = false; + } + } + } + } + Event::WindowClosed { id } => { + let win = self.windows.remove(&id); + win.expect("closed window was missing from the map"); + } + Event::WindowFocusChanged { id } => { + for win in self.windows.values_mut() { + win.is_focused = Some(win.id) == id; + } + } + event => return Some(event), + } + None + } +} + +impl EventStreamStatePart for KeyboardLayoutsState { + fn replicate(&self) -> Vec<Event> { + if let Some(keyboard_layouts) = self.keyboard_layouts.clone() { + vec![Event::KeyboardLayoutsChanged { keyboard_layouts }] + } else { + vec![] + } + } + + fn apply(&mut self, event: Event) -> Option<Event> { + match event { + Event::KeyboardLayoutsChanged { keyboard_layouts } => { + self.keyboard_layouts = Some(keyboard_layouts); + } + Event::KeyboardLayoutSwitched { idx } => { + let kb = self.keyboard_layouts.as_mut(); + let kb = kb.expect("keyboard layouts must be set before a layout can be switched"); + kb.current_idx = idx; + } + event => return Some(event), + } + None + } +} @@ -88,6 +88,8 @@ pub enum Msg { }, /// Get the configured keyboard layouts. KeyboardLayouts, + /// Start continuously receiving events from the compositor. + EventStream, /// Print the version of the running niri instance. Version, /// Request an error from the running niri instance. diff --git a/src/input/mod.rs b/src/input/mod.rs index 6e7f203c..38827415 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -16,7 +16,7 @@ use smithay::backend::input::{ TabletToolProximityEvent, TabletToolTipEvent, TabletToolTipState, TouchEvent, }; use smithay::backend::libinput::LibinputInputBackend; -use smithay::input::keyboard::{keysyms, FilterResult, Keysym, ModifiersState}; +use smithay::input::keyboard::{keysyms, FilterResult, Keysym, ModifiersState, XkbContextHandler}; use smithay::input::pointer::{ AxisFrame, ButtonEvent, CursorIcon, CursorImageStatus, Focus, GestureHoldBeginEvent, GestureHoldEndEvent, GesturePinchBeginEvent, GesturePinchEndEvent, GesturePinchUpdateEvent, @@ -539,13 +539,18 @@ impl State { } } Action::SwitchLayout(action) => { - self.niri.seat.get_keyboard().unwrap().with_xkb_state( - self, - |mut state| match action { + let keyboard = &self.niri.seat.get_keyboard().unwrap(); + let new_idx = keyboard.with_xkb_state(self, |mut state| { + match action { LayoutSwitchTarget::Next => state.cycle_next_layout(), LayoutSwitchTarget::Prev => state.cycle_prev_layout(), - }, - ); + }; + state.active_layout().0 + }); + + if let Some(server) = &self.niri.ipc_server { + server.keyboard_layout_switched(new_idx as u8); + } } Action::MoveColumnLeft => { self.niri.layout.move_left(); diff --git a/src/ipc/client.rs b/src/ipc/client.rs index e9312933..ea6121dc 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, bail, Context}; use niri_ipc::{ - KeyboardLayouts, LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response, Socket, - Transform, + Event, KeyboardLayouts, LogicalOutput, Mode, Output, OutputConfigChanged, Request, Response, + Socket, Transform, }; use serde_json::json; @@ -21,12 +21,13 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> { }, Msg::Workspaces => Request::Workspaces, Msg::KeyboardLayouts => Request::KeyboardLayouts, + Msg::EventStream => Request::EventStream, Msg::RequestError => Request::ReturnError, }; let socket = Socket::connect().context("error connecting to the niri socket")?; - let reply = socket + let (reply, mut read_event) = socket .send(request) .context("error communicating with niri")?; @@ -37,6 +38,7 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> { Socket::connect() .and_then(|socket| socket.send(Request::Version)) .ok() + .map(|(reply, _read_event)| reply) } _ => None, }; @@ -261,6 +263,62 @@ pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> { println!("{is_active}{idx} {name}"); } } + Msg::EventStream => { + let Response::Handled = response else { + bail!("unexpected response: expected Handled, got {response:?}"); + }; + + if !json { + println!("Started reading events."); + } + + loop { + let event = read_event().context("error reading event from niri")?; + + if json { + let event = serde_json::to_string(&event).context("error formatting event")?; + println!("{event}"); + continue; + } + + match event { + Event::WorkspacesChanged { workspaces } => { + println!("Workspaces changed: {workspaces:?}"); + } + Event::WorkspaceActivated { id, focused } => { + let word = if focused { "focused" } else { "activated" }; + println!("Workspace {word}: {id}"); + } + Event::WorkspaceActiveWindowChanged { + workspace_id, + active_window_id, + } => { + println!( + "Workspace {workspace_id}: \ + active window changed to {active_window_id:?}" + ); + } + Event::WindowsChanged { windows } => { + println!("Windows changed: {windows:?}"); + } + Event::WindowOpenedOrChanged { window } => { + println!("Window opened or changed: {window:?}"); + } + Event::WindowClosed { id } => { + println!("Window closed: {id}"); + } + Event::WindowFocusChanged { id } => { + println!("Window focus changed: {id:?}"); + } + Event::KeyboardLayoutsChanged { keyboard_layouts } => { + println!("Keyboard layouts changed: {keyboard_layouts:?}"); + } + Event::KeyboardLayoutSwitched { idx } => { + println!("Keyboard layout switched: {idx}"); + } + } + } + } } Ok(()) diff --git a/src/ipc/server.rs b/src/ipc/server.rs index 5d2c8524..af2c4fa2 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -1,15 +1,20 @@ +use std::cell::RefCell; +use std::collections::HashSet; use std::os::unix::net::{UnixListener, UnixStream}; use std::path::PathBuf; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::{env, io, process}; use anyhow::Context; +use async_channel::{Receiver, Sender, TrySendError}; +use calloop::futures::Scheduler; use calloop::io::Async; use directories::BaseDirs; use futures_util::io::{AsyncReadExt, BufReader}; -use futures_util::{AsyncBufReadExt, AsyncWriteExt}; -use niri_ipc::{KeyboardLayouts, OutputConfigChanged, Reply, Request, Response}; -use smithay::desktop::Window; +use futures_util::{select_biased, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, FutureExt as _}; +use niri_ipc::state::{EventStreamState, EventStreamStatePart as _}; +use niri_ipc::{Event, KeyboardLayouts, OutputConfigChanged, Reply, Request, Response, Workspace}; use smithay::input::keyboard::XkbContextHandler; use smithay::reexports::calloop::generic::Generic; use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction}; @@ -18,17 +23,38 @@ use smithay::wayland::compositor::with_states; use smithay::wayland::shell::xdg::XdgToplevelSurfaceData; use crate::backend::IpcOutputMap; +use crate::layout::workspace::WorkspaceId; use crate::niri::State; use crate::utils::version; +use crate::window::Mapped; + +// If an event stream client fails to read events fast enough that we accumulate more than this +// number in our buffer, we drop that event stream client. +const EVENT_STREAM_BUFFER_SIZE: usize = 64; pub struct IpcServer { pub socket_path: PathBuf, + event_streams: Rc<RefCell<Vec<EventStreamSender>>>, + event_stream_state: Rc<RefCell<EventStreamState>>, } struct ClientCtx { event_loop: LoopHandle<'static, State>, + scheduler: Scheduler<()>, ipc_outputs: Arc<Mutex<IpcOutputMap>>, - ipc_focused_window: Arc<Mutex<Option<Window>>>, + event_streams: Rc<RefCell<Vec<EventStreamSender>>>, + event_stream_state: Rc<RefCell<EventStreamState>>, +} + +struct EventStreamClient { + events: Receiver<Event>, + disconnect: Receiver<()>, + write: Box<dyn AsyncWrite + Unpin>, +} + +struct EventStreamSender { + events: Sender<Event>, + disconnect: Sender<()>, } impl IpcServer { @@ -60,7 +86,43 @@ impl IpcServer { }) .unwrap(); - Ok(Self { socket_path }) + Ok(Self { + socket_path, + event_streams: Rc::new(RefCell::new(Vec::new())), + event_stream_state: Rc::new(RefCell::new(EventStreamState::default())), + }) + } + + fn send_event(&self, event: Event) { + let mut streams = self.event_streams.borrow_mut(); + let mut to_remove = Vec::new(); + for (idx, stream) in streams.iter_mut().enumerate() { + match stream.events.try_send(event.clone()) { + Ok(()) => (), + Err(TrySendError::Closed(_)) => to_remove.push(idx), + Err(TrySendError::Full(_)) => { + warn!( + "disconnecting IPC event stream client \ + because it is reading events too slowly" + ); + to_remove.push(idx); + } + } + } + + for idx in to_remove.into_iter().rev() { + let stream = streams.swap_remove(idx); + let _ = stream.disconnect.send_blocking(()); + } + } + + pub fn keyboard_layout_switched(&self, new_idx: u8) { + let mut state = self.event_stream_state.borrow_mut(); + let state = &mut state.keyboard_layouts; + + let event = Event::KeyboardLayoutSwitched { idx: new_idx }; + state.apply(event.clone()); + self.send_event(event); } } @@ -90,10 +152,14 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) { } }; + let ipc_server = state.niri.ipc_server.as_ref().unwrap(); + let ctx = ClientCtx { event_loop: state.niri.event_loop.clone(), + scheduler: state.niri.scheduler.clone(), ipc_outputs: state.backend.ipc_outputs(), - ipc_focused_window: state.niri.ipc_focused_window.clone(), + event_streams: ipc_server.event_streams.clone(), + event_stream_state: ipc_server.event_stream_state.clone(), }; let future = async move { @@ -106,7 +172,7 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) { } } -async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> { +async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> { let (read, mut write) = stream.split(); let mut buf = String::new(); @@ -120,6 +186,7 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow: .context("error parsing request") .map_err(|err| err.to_string()); let requested_error = matches!(request, Ok(Request::ReturnError)); + let requested_event_stream = matches!(request, Ok(Request::EventStream)); let reply = match request { Ok(request) => process(&ctx, request).await, @@ -136,6 +203,46 @@ async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow: buf.push(b'\n'); write.write_all(&buf).await.context("error writing reply")?; + if requested_event_stream { + let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE); + let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); + + // Spawn a task for the client. + let client = EventStreamClient { + events: events_rx, + disconnect: disconnect_rx, + write: Box::new(write) as _, + }; + let future = async move { + if let Err(err) = handle_event_stream_client(client).await { + warn!("error handling IPC event stream client: {err:?}"); + } + }; + if let Err(err) = ctx.scheduler.schedule(future) { + warn!("error scheduling IPC event stream future: {err:?}"); + } + + // Send the initial state. + { + let state = ctx.event_stream_state.borrow(); + for event in state.replicate() { + events_tx + .try_send(event) + .expect("initial event burst had more events than buffer size"); + } + } + + // Add it to the list. + { + let mut streams = ctx.event_streams.borrow_mut(); + let sender = EventStreamSender { + events: events_tx, + disconnect: disconnect_tx, + }; + streams.push(sender); + } + } + Ok(()) } @@ -149,23 +256,9 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply { Response::Outputs(outputs.collect()) } Request::FocusedWindow => { - let window = ctx.ipc_focused_window.lock().unwrap().clone(); - let window = window.map(|window| { - let wl_surface = window.toplevel().expect("no X11 support").wl_surface(); - with_states(wl_surface, |states| { - let role = states - .data_map - .get::<XdgToplevelSurfaceData>() - .unwrap() - .lock() - .unwrap(); - - niri_ipc::Window { - title: role.title.clone(), - app_id: role.app_id.clone(), - } - }) - }); + let state = ctx.event_stream_state.borrow(); + let windows = &state.windows.windows; + let window = windows.values().find(|win| win.is_focused).cloned(); Response::FocusedWindow(window) } Request::Action(action) => { @@ -202,13 +295,8 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply { Response::OutputConfigChanged(response) } Request::Workspaces => { - let (tx, rx) = async_channel::bounded(1); - ctx.event_loop.insert_idle(move |state| { - let workspaces = state.niri.layout.ipc_workspaces(); - let _ = tx.send_blocking(workspaces); - }); - let result = rx.recv().await; - let workspaces = result.map_err(|_| String::from("error getting workspace info"))?; + let state = ctx.event_stream_state.borrow(); + let workspaces = state.workspaces.workspaces.values().cloned().collect(); Response::Workspaces(workspaces) } Request::FocusedOutput => { @@ -238,23 +326,261 @@ async fn process(ctx: &ClientCtx, request: Request) -> Reply { Response::FocusedOutput(output) } Request::KeyboardLayouts => { - let (tx, rx) = async_channel::bounded(1); - ctx.event_loop.insert_idle(move |state| { - let keyboard = state.niri.seat.get_keyboard().unwrap(); - let layout = keyboard.with_xkb_state(state, |context| { - let layouts = context.keymap().layouts(); - KeyboardLayouts { - names: layouts.map(str::to_owned).collect(), - current_idx: context.active_layout().0 as u8, - } - }); - let _ = tx.send_blocking(layout); - }); - let result = rx.recv().await; - let layout = result.map_err(|_| String::from("error getting layout info"))?; + let state = ctx.event_stream_state.borrow(); + let layout = state.keyboard_layouts.keyboard_layouts.clone(); + let layout = layout.expect("keyboard layouts should be set at startup"); Response::KeyboardLayouts(layout) } + Request::EventStream => Response::Handled, }; Ok(response) } + +async fn handle_event_stream_client(client: EventStreamClient) -> anyhow::Result<()> { + let EventStreamClient { + events, + disconnect, + mut write, + } = client; + + while let Ok(event) = events.recv().await { + let mut buf = serde_json::to_vec(&event).context("error formatting event")?; + buf.push(b'\n'); + + let res = select_biased! { + _ = disconnect.recv().fuse() => return Ok(()), + res = write.write_all(&buf).fuse() => res, + }; + + match res { + Ok(()) => (), + // Normal client disconnection. + Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()), + res @ Err(_) => res.context("error writing event")?, + } + } + + Ok(()) +} + +fn make_ipc_window(mapped: &Mapped, workspace_id: Option<WorkspaceId>) -> niri_ipc::Window { + let wl_surface = mapped.toplevel().wl_surface(); + with_states(wl_surface, |states| { + let role = states + .data_map + .get::<XdgToplevelSurfaceData>() + .unwrap() + .lock() + .unwrap(); + + niri_ipc::Window { + id: u64::from(mapped.id().get()), + title: role.title.clone(), + app_id: role.app_id.clone(), + workspace_id: workspace_id.map(|id| u64::from(id.0)), + is_focused: mapped.is_focused(), + } + }) +} + +impl State { + pub fn ipc_keyboard_layouts_changed(&mut self) { + let keyboard = self.niri.seat.get_keyboard().unwrap(); + let keyboard_layouts = keyboard.with_xkb_state(self, |context| { + let layouts = context.keymap().layouts(); + KeyboardLayouts { + names: layouts.map(str::to_owned).collect(), + current_idx: context.active_layout().0 as u8, + } + }); + + let Some(server) = &self.niri.ipc_server else { + return; + }; + + let mut state = server.event_stream_state.borrow_mut(); + let state = &mut state.keyboard_layouts; + + let event = Event::KeyboardLayoutsChanged { keyboard_layouts }; + state.apply(event.clone()); + server.send_event(event); + } + + pub fn ipc_refresh_layout(&mut self) { + self.ipc_refresh_workspaces(); + self.ipc_refresh_windows(); + } + + fn ipc_refresh_workspaces(&mut self) { + let Some(server) = &self.niri.ipc_server else { + return; + }; + + let _span = tracy_client::span!("State::ipc_refresh_workspaces"); + + let mut state = server.event_stream_state.borrow_mut(); + let state = &mut state.workspaces; + + let mut events = Vec::new(); + let layout = &self.niri.layout; + let focused_ws_id = layout.active_workspace().map(|ws| u64::from(ws.id().0)); + + // Check for workspace changes. + let mut seen = HashSet::new(); + let mut need_workspaces_changed = false; + for (mon, ws_idx, ws) in layout.workspaces() { + let id = u64::from(ws.id().0); + seen.insert(id); + + let Some(ipc_ws) = state.workspaces.get |
