diff options
| author | Jon Heinritz <jon.heinritz@protonmail.com> | 2025-03-01 10:28:09 +0100 |
|---|---|---|
| committer | Ivan Molodetskikh <yalterz@gmail.com> | 2025-05-11 21:51:26 -0700 |
| commit | f917932b3e49f745865fdea41a1ccfbd400fc177 (patch) | |
| tree | 857d43e6dfb6af0c4b796cc8f1e8e1579fecda44 | |
| parent | 89b7423ee56da9b7c08b012c9491407e34b1c9f6 (diff) | |
| download | niri-f917932b3e49f745865fdea41a1ccfbd400fc177.tar.gz niri-f917932b3e49f745865fdea41a1ccfbd400fc177.tar.bz2 niri-f917932b3e49f745865fdea41a1ccfbd400fc177.zip | |
ipc: support long living sockets
| -rw-r--r-- | src/ipc/server.rs | 126 |
1 files changed, 68 insertions, 58 deletions
diff --git a/src/ipc/server.rs b/src/ipc/server.rs index ef9d9c87..cbcf3f67 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -185,76 +185,86 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) { async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> { let (read, mut write) = stream.split(); - let mut buf = String::new(); - - // Read a single line to allow extensibility in the future to keep reading. - BufReader::new(read) - .read_line(&mut buf) - .await - .context("error reading request")?; - - let request = serde_json::from_str(&buf) - .context("error parsing request") - .map_err(|err| err.to_string()); - let requested_error = matches!(request, Ok(Request::ReturnError)); - let requested_event_stream = matches!(request, Ok(Request::EventStream)); - - let reply = match request { - Ok(request) => process(&ctx, request).await, - Err(err) => Err(err), - }; + let mut read = BufReader::new(read); - if let Err(err) = &reply { - if !requested_error { - warn!("error processing IPC request: {err:?}"); + loop { + // Don't keep buf around to avoid clients wasting RAM by filling it with bogus data. + let mut buf = Vec::new(); + let res = read.read_until(b'\n', &mut buf).await; + match res { + Ok(0) => return Ok(()), + Ok(_) => (), + // Normal client disconnection. + Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()), + Err(err) => { + return Err(err).context("error reading request"); + } } - } - let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?; - buf.push(b'\n'); - write.write_all(&buf).await.context("error writing reply")?; + let request = serde_json::from_slice(&buf) + .context("error parsing request") + .map_err(|err| err.to_string()); + let requested_error = matches!(request, Ok(Request::ReturnError)); + let requested_event_stream = matches!(request, Ok(Request::EventStream)); - if requested_event_stream { - let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE); - let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); - - // Spawn a task for the client. - let client = EventStreamClient { - events: events_rx, - disconnect: disconnect_rx, - write: Box::new(write) as _, + let reply = match request { + Ok(request) => process(&ctx, request).await, + Err(err) => Err(err), }; - let future = async move { - if let Err(err) = handle_event_stream_client(client).await { - warn!("error handling IPC event stream client: {err:?}"); - } - }; - if let Err(err) = ctx.scheduler.schedule(future) { - warn!("error scheduling IPC event stream future: {err:?}"); - } - // Send the initial state. - { - let state = ctx.event_stream_state.borrow(); - for event in state.replicate() { - events_tx - .try_send(event) - .expect("initial event burst had more events than buffer size"); + if let Err(err) = &reply { + if !requested_error { + warn!("error processing IPC request: {err:?}"); } } - // Add it to the list. - { - let mut streams = ctx.event_streams.borrow_mut(); - let sender = EventStreamSender { - events: events_tx, - disconnect: disconnect_tx, + buf.clear(); + serde_json::to_writer(&mut buf, &reply).context("error formatting reply")?; + buf.push(b'\n'); + write.write_all(&buf).await.context("error writing reply")?; + + if requested_event_stream { + let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE); + let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); + + // Spawn a task for the client. + let client = EventStreamClient { + events: events_rx, + disconnect: disconnect_rx, + write: Box::new(write) as _, }; - streams.push(sender); + let future = async move { + if let Err(err) = handle_event_stream_client(client).await { + warn!("error handling IPC event stream client: {err:?}"); + } + }; + if let Err(err) = ctx.scheduler.schedule(future) { + warn!("error scheduling IPC event stream future: {err:?}"); + } + + // Send the initial state. + { + let state = ctx.event_stream_state.borrow(); + for event in state.replicate() { + events_tx + .try_send(event) + .expect("initial event burst had more events than buffer size"); + } + } + + // Add it to the list. + { + let mut streams = ctx.event_streams.borrow_mut(); + let sender = EventStreamSender { + events: events_tx, + disconnect: disconnect_tx, + }; + streams.push(sender); + } + + return Ok(()); } } - - Ok(()) } async fn process(ctx: &ClientCtx, request: Request) -> Reply { |
