aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJon Heinritz <jon.heinritz@protonmail.com>2025-03-01 10:28:09 +0100
committerIvan Molodetskikh <yalterz@gmail.com>2025-05-11 21:51:26 -0700
commitf917932b3e49f745865fdea41a1ccfbd400fc177 (patch)
tree857d43e6dfb6af0c4b796cc8f1e8e1579fecda44 /src
parent89b7423ee56da9b7c08b012c9491407e34b1c9f6 (diff)
downloadniri-f917932b3e49f745865fdea41a1ccfbd400fc177.tar.gz
niri-f917932b3e49f745865fdea41a1ccfbd400fc177.tar.bz2
niri-f917932b3e49f745865fdea41a1ccfbd400fc177.zip
ipc: support long living sockets
Diffstat (limited to 'src')
-rw-r--r--src/ipc/server.rs126
1 files changed, 68 insertions, 58 deletions
diff --git a/src/ipc/server.rs b/src/ipc/server.rs
index ef9d9c87..cbcf3f67 100644
--- a/src/ipc/server.rs
+++ b/src/ipc/server.rs
@@ -185,76 +185,86 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {
async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> {
let (read, mut write) = stream.split();
- let mut buf = String::new();
-
- // Read a single line to allow extensibility in the future to keep reading.
- BufReader::new(read)
- .read_line(&mut buf)
- .await
- .context("error reading request")?;
-
- let request = serde_json::from_str(&buf)
- .context("error parsing request")
- .map_err(|err| err.to_string());
- let requested_error = matches!(request, Ok(Request::ReturnError));
- let requested_event_stream = matches!(request, Ok(Request::EventStream));
-
- let reply = match request {
- Ok(request) => process(&ctx, request).await,
- Err(err) => Err(err),
- };
+ let mut read = BufReader::new(read);
- if let Err(err) = &reply {
- if !requested_error {
- warn!("error processing IPC request: {err:?}");
+ loop {
+ // Don't keep buf around to avoid clients wasting RAM by filling it with bogus data.
+ let mut buf = Vec::new();
+ let res = read.read_until(b'\n', &mut buf).await;
+ match res {
+ Ok(0) => return Ok(()),
+ Ok(_) => (),
+ // Normal client disconnection.
+ Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()),
+ Err(err) => {
+ return Err(err).context("error reading request");
+ }
}
- }
- let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?;
- buf.push(b'\n');
- write.write_all(&buf).await.context("error writing reply")?;
+ let request = serde_json::from_slice(&buf)
+ .context("error parsing request")
+ .map_err(|err| err.to_string());
+ let requested_error = matches!(request, Ok(Request::ReturnError));
+ let requested_event_stream = matches!(request, Ok(Request::EventStream));
- if requested_event_stream {
- let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE);
- let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
-
- // Spawn a task for the client.
- let client = EventStreamClient {
- events: events_rx,
- disconnect: disconnect_rx,
- write: Box::new(write) as _,
+ let reply = match request {
+ Ok(request) => process(&ctx, request).await,
+ Err(err) => Err(err),
};
- let future = async move {
- if let Err(err) = handle_event_stream_client(client).await {
- warn!("error handling IPC event stream client: {err:?}");
- }
- };
- if let Err(err) = ctx.scheduler.schedule(future) {
- warn!("error scheduling IPC event stream future: {err:?}");
- }
- // Send the initial state.
- {
- let state = ctx.event_stream_state.borrow();
- for event in state.replicate() {
- events_tx
- .try_send(event)
- .expect("initial event burst had more events than buffer size");
+ if let Err(err) = &reply {
+ if !requested_error {
+ warn!("error processing IPC request: {err:?}");
}
}
- // Add it to the list.
- {
- let mut streams = ctx.event_streams.borrow_mut();
- let sender = EventStreamSender {
- events: events_tx,
- disconnect: disconnect_tx,
+ buf.clear();
+ serde_json::to_writer(&mut buf, &reply).context("error formatting reply")?;
+ buf.push(b'\n');
+ write.write_all(&buf).await.context("error writing reply")?;
+
+ if requested_event_stream {
+ let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE);
+ let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
+
+ // Spawn a task for the client.
+ let client = EventStreamClient {
+ events: events_rx,
+ disconnect: disconnect_rx,
+ write: Box::new(write) as _,
};
- streams.push(sender);
+ let future = async move {
+ if let Err(err) = handle_event_stream_client(client).await {
+ warn!("error handling IPC event stream client: {err:?}");
+ }
+ };
+ if let Err(err) = ctx.scheduler.schedule(future) {
+ warn!("error scheduling IPC event stream future: {err:?}");
+ }
+
+ // Send the initial state.
+ {
+ let state = ctx.event_stream_state.borrow();
+ for event in state.replicate() {
+ events_tx
+ .try_send(event)
+ .expect("initial event burst had more events than buffer size");
+ }
+ }
+
+ // Add it to the list.
+ {
+ let mut streams = ctx.event_streams.borrow_mut();
+ let sender = EventStreamSender {
+ events: events_tx,
+ disconnect: disconnect_tx,
+ };
+ streams.push(sender);
+ }
+
+ return Ok(());
}
}
-
- Ok(())
}
async fn process(ctx: &ClientCtx, request: Request) -> Reply {