From f917932b3e49f745865fdea41a1ccfbd400fc177 Mon Sep 17 00:00:00 2001 From: Jon Heinritz Date: Sat, 1 Mar 2025 10:28:09 +0100 Subject: ipc: support long living sockets --- src/ipc/server.rs | 126 +++++++++++++++++++++++++++++------------------------- 1 file changed, 68 insertions(+), 58 deletions(-) diff --git a/src/ipc/server.rs b/src/ipc/server.rs index ef9d9c87..cbcf3f67 100644 --- a/src/ipc/server.rs +++ b/src/ipc/server.rs @@ -185,76 +185,86 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) { async fn handle_client(ctx: ClientCtx, stream: Async<'static, UnixStream>) -> anyhow::Result<()> { let (read, mut write) = stream.split(); - let mut buf = String::new(); - - // Read a single line to allow extensibility in the future to keep reading. - BufReader::new(read) - .read_line(&mut buf) - .await - .context("error reading request")?; - - let request = serde_json::from_str(&buf) - .context("error parsing request") - .map_err(|err| err.to_string()); - let requested_error = matches!(request, Ok(Request::ReturnError)); - let requested_event_stream = matches!(request, Ok(Request::EventStream)); - - let reply = match request { - Ok(request) => process(&ctx, request).await, - Err(err) => Err(err), - }; + let mut read = BufReader::new(read); - if let Err(err) = &reply { - if !requested_error { - warn!("error processing IPC request: {err:?}"); + loop { + // Don't keep buf around to avoid clients wasting RAM by filling it with bogus data. + let mut buf = Vec::new(); + let res = read.read_until(b'\n', &mut buf).await; + match res { + Ok(0) => return Ok(()), + Ok(_) => (), + // Normal client disconnection. + Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Ok(()), + Err(err) => { + return Err(err).context("error reading request"); + } } - } - let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?; - buf.push(b'\n'); - write.write_all(&buf).await.context("error writing reply")?; + let request = serde_json::from_slice(&buf) + .context("error parsing request") + .map_err(|err| err.to_string()); + let requested_error = matches!(request, Ok(Request::ReturnError)); + let requested_event_stream = matches!(request, Ok(Request::EventStream)); - if requested_event_stream { - let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE); - let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); - - // Spawn a task for the client. - let client = EventStreamClient { - events: events_rx, - disconnect: disconnect_rx, - write: Box::new(write) as _, + let reply = match request { + Ok(request) => process(&ctx, request).await, + Err(err) => Err(err), }; - let future = async move { - if let Err(err) = handle_event_stream_client(client).await { - warn!("error handling IPC event stream client: {err:?}"); - } - }; - if let Err(err) = ctx.scheduler.schedule(future) { - warn!("error scheduling IPC event stream future: {err:?}"); - } - // Send the initial state. - { - let state = ctx.event_stream_state.borrow(); - for event in state.replicate() { - events_tx - .try_send(event) - .expect("initial event burst had more events than buffer size"); + if let Err(err) = &reply { + if !requested_error { + warn!("error processing IPC request: {err:?}"); } } - // Add it to the list. - { - let mut streams = ctx.event_streams.borrow_mut(); - let sender = EventStreamSender { - events: events_tx, - disconnect: disconnect_tx, + buf.clear(); + serde_json::to_writer(&mut buf, &reply).context("error formatting reply")?; + buf.push(b'\n'); + write.write_all(&buf).await.context("error writing reply")?; + + if requested_event_stream { + let (events_tx, events_rx) = async_channel::bounded(EVENT_STREAM_BUFFER_SIZE); + let (disconnect_tx, disconnect_rx) = async_channel::bounded(1); + + // Spawn a task for the client. + let client = EventStreamClient { + events: events_rx, + disconnect: disconnect_rx, + write: Box::new(write) as _, }; - streams.push(sender); + let future = async move { + if let Err(err) = handle_event_stream_client(client).await { + warn!("error handling IPC event stream client: {err:?}"); + } + }; + if let Err(err) = ctx.scheduler.schedule(future) { + warn!("error scheduling IPC event stream future: {err:?}"); + } + + // Send the initial state. + { + let state = ctx.event_stream_state.borrow(); + for event in state.replicate() { + events_tx + .try_send(event) + .expect("initial event burst had more events than buffer size"); + } + } + + // Add it to the list. + { + let mut streams = ctx.event_streams.borrow_mut(); + let sender = EventStreamSender { + events: events_tx, + disconnect: disconnect_tx, + }; + streams.push(sender); + } + + return Ok(()); } } - - Ok(()) } async fn process(ctx: &ClientCtx, request: Request) -> Reply { -- cgit