From 40c85da102054caeb86b7905cd27c69e392c8f92 Mon Sep 17 00:00:00 2001 From: Ivan Molodetskikh Date: Wed, 17 Jan 2024 10:38:32 +0400 Subject: Add an IPC socket and a niri msg outputs subcommand --- src/ipc/client.rs | 106 +++++++++++++++++++++++++++++++++++++++++++++ src/ipc/mod.rs | 2 + src/ipc/server.rs | 127 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100644 src/ipc/client.rs create mode 100644 src/ipc/mod.rs create mode 100644 src/ipc/server.rs (limited to 'src/ipc') diff --git a/src/ipc/client.rs b/src/ipc/client.rs new file mode 100644 index 00000000..c09ef84e --- /dev/null +++ b/src/ipc/client.rs @@ -0,0 +1,106 @@ +use std::env; +use std::io::{Read, Write}; +use std::net::Shutdown; +use std::os::unix::net::UnixStream; + +use anyhow::{bail, Context}; +use niri_ipc::{Mode, Output, Request, Response}; + +use crate::Msg; + +pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> { + let socket_path = env::var_os(niri_ipc::SOCKET_PATH_ENV).with_context(|| { + format!( + "{} is not set, are you running this within niri?", + niri_ipc::SOCKET_PATH_ENV + ) + })?; + + let mut stream = + UnixStream::connect(socket_path).context("error connecting to {socket_path}")?; + + let request = match msg { + Msg::Outputs => Request::Outputs, + }; + let mut buf = serde_json::to_vec(&request).unwrap(); + stream + .write_all(&buf) + .context("error writing IPC request")?; + stream + .shutdown(Shutdown::Write) + .context("error closing IPC stream for writing")?; + + buf.clear(); + stream + .read_to_end(&mut buf) + .context("error reading IPC response")?; + + let response = serde_json::from_slice(&buf).context("error parsing IPC response")?; + match msg { + Msg::Outputs => { + #[allow(irrefutable_let_patterns)] + let Response::Outputs(outputs) = response + else { + bail!("unexpected response: expected Outputs, got {response:?}"); + }; + + if json { + let output = + serde_json::to_string(&outputs).context("error formatting response")?; + println!("{output}"); + return Ok(()); + } + + let mut outputs = outputs.into_iter().collect::>(); + outputs.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + + for (connector, output) in outputs.into_iter() { + let Output { + name, + make, + model, + physical_size, + modes, + current_mode, + } = output; + + println!(r#"Output "{connector}" ({make} - {model} - {name})"#); + + if let Some(current) = current_mode { + let mode = *modes + .get(current) + .context("invalid response: current mode does not exist")?; + let Mode { + width, + height, + refresh_rate, + } = mode; + let refresh = refresh_rate as f64 / 1000.; + println!(" Current mode: {width}x{height} @ {refresh:.3} Hz"); + } else { + println!(" Disabled"); + } + + if let Some((width, height)) = physical_size { + println!(" Physical size: {width}x{height} mm"); + } else { + println!(" Physical size: unknown"); + } + + println!(" Available modes:"); + for mode in modes { + let Mode { + width, + height, + refresh_rate, + } = mode; + let refresh = refresh_rate as f64 / 1000.; + println!(" {width}x{height}@{refresh:.3}"); + } + println!(); + } + } + } + + Ok(()) +} diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs new file mode 100644 index 00000000..c07f47e0 --- /dev/null +++ b/src/ipc/mod.rs @@ -0,0 +1,2 @@ +pub mod client; +pub mod server; diff --git a/src/ipc/server.rs b/src/ipc/server.rs new file mode 100644 index 00000000..d493e861 --- /dev/null +++ b/src/ipc/server.rs @@ -0,0 +1,127 @@ +use std::cell::RefCell; +use std::collections::HashMap; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::PathBuf; +use std::rc::Rc; +use std::{env, io, process}; + +use anyhow::Context; +use calloop::io::Async; +use directories::BaseDirs; +use futures_util::io::{AsyncReadExt, BufReader}; +use futures_util::{AsyncBufReadExt, AsyncWriteExt}; +use niri_ipc::{Request, Response}; +use smithay::reexports::calloop::generic::Generic; +use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction}; +use smithay::reexports::rustix::fs::unlink; + +use crate::niri::State; + +pub struct IpcServer { + pub socket_path: PathBuf, +} + +struct ClientCtx { + ipc_outputs: Rc>>, +} + +impl IpcServer { + pub fn start( + event_loop: &LoopHandle<'static, State>, + wayland_socket_name: &str, + ) -> anyhow::Result { + let _span = tracy_client::span!("Ipc::start"); + + let socket_name = format!("niri.{wayland_socket_name}.{}.sock", process::id()); + let mut socket_path = socket_dir(); + socket_path.push(socket_name); + + let listener = UnixListener::bind(&socket_path).context("error binding socket")?; + listener + .set_nonblocking(true) + .context("error setting socket to non-blocking")?; + + let source = Generic::new(listener, Interest::READ, Mode::Level); + event_loop + .insert_source(source, |_, socket, state| { + match socket.accept() { + Ok((stream, _)) => on_new_ipc_client(state, stream), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => (), + Err(e) => return Err(e), + } + + Ok(PostAction::Continue) + }) + .unwrap(); + + Ok(Self { socket_path }) + } +} + +impl Drop for IpcServer { + fn drop(&mut self) { + let _ = unlink(&self.socket_path); + } +} + +fn socket_dir() -> PathBuf { + BaseDirs::new() + .as_ref() + .and_then(|x| x.runtime_dir()) + .map(|x| x.to_owned()) + .unwrap_or_else(env::temp_dir) +} + +fn on_new_ipc_client(state: &mut State, stream: UnixStream) { + let _span = tracy_client::span!("on_new_ipc_client"); + trace!("new IPC client connected"); + + let stream = match state.niri.event_loop.adapt_io(stream) { + Ok(stream) => stream, + Err(err) => { + warn!("error making IPC stream async: {err:?}"); + return; + } + }; + + let ctx = ClientCtx { + ipc_outputs: state.backend.ipc_outputs(), + }; + + let future = async move { + if let Err(err) = handle_client(ctx, stream).await { + warn!("error handling IPC client: {err:?}"); + } + }; + if let Err(err) = state.niri.scheduler.schedule(future) { + warn!("error scheduling IPC stream future: {err:?}"); + } +} + +async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> { + let (read, mut write) = stream.split(); + let mut buf = String::new(); + + // Read a single line to allow extensibility in the future to keep reading. + BufReader::new(read) + .read_line(&mut buf) + .await + .context("error reading request")?; + + let request: Request = serde_json::from_str(&buf).context("error parsing request")?; + + let response = match request { + Request::Outputs => { + let ipc_outputs = ctx.ipc_outputs.borrow().clone(); + Response::Outputs(ipc_outputs) + } + }; + + let buf = serde_json::to_vec(&response).context("error formatting response")?; + write + .write_all(&buf) + .await + .context("error writing response")?; + + Ok(()) +} -- cgit