From 02f67bace8f7dc3e264b27b4c884b0c187cb166f Mon Sep 17 00:00:00 2001 From: Celia Chen Date: Wed, 14 Jan 2026 10:05:00 -0800 Subject: [PATCH] fix: Emit response.completed immediately for Responses SSE (#9170) we see windows test failures like this: https://github.com/openai/codex/actions/runs/20930055601/job/60138344260. The issue is that SSE connections sometimes remain open after the completion event esp. for windows. We should emit the completion event and return immediately. this is consistent with the protocol: > The Model streams responses back in an SSE, which are collected until "completed" message and the SSE terminates from https://github.com/openai/codex/blob/dev/cc/fix-windows-test/codex-rs/docs/protocol_v1.md#L37. this helps us achieve parity with responses websocket logic here: https://github.com/openai/codex/blob/dev/cc/fix-windows-test/codex-rs/codex-api/src/endpoint/responses_websocket.rs#L220-L227. --- codex-rs/codex-api/src/sse/responses.rs | 64 +++++++++++++++++++------ 1 file changed, 49 insertions(+), 15 deletions(-) diff --git a/codex-rs/codex-api/src/sse/responses.rs b/codex-rs/codex-api/src/sse/responses.rs index 7f0981c5c..cc4c03819 100644 --- a/codex-rs/codex-api/src/sse/responses.rs +++ b/codex-rs/codex-api/src/sse/responses.rs @@ -290,7 +290,6 @@ pub async fn process_sse( telemetry: Option>, ) { let mut stream = stream.eventsource(); - let mut response_completed: Option = None; let mut response_error: Option = None; loop { @@ -307,17 +306,10 @@ pub async fn process_sse( return; } Ok(None) => { - match response_completed.take() { - Some(event) => { - let _ = tx_event.send(Ok(event)).await; - } - None => { - let error = response_error.unwrap_or(ApiError::Stream( - "stream closed before response.completed".into(), - )); - let _ = tx_event.send(Err(error)).await; - } - } + let error = response_error.unwrap_or(ApiError::Stream( + "stream closed before response.completed".into(), + )); + let _ = tx_event.send(Err(error)).await; return; } Err(_) => { @@ -341,9 +333,11 @@ pub async fn process_sse( match process_responses_event(event) { Ok(Some(event)) => { - if matches!(event, ResponseEvent::Completed { .. }) { - response_completed = Some(event); - } else if tx_event.send(Ok(event)).await.is_err() { + let is_completed = matches!(event, ResponseEvent::Completed { .. }); + if tx_event.send(Ok(event)).await.is_err() { + return; + } + if is_completed { return; } } @@ -405,7 +399,9 @@ fn rate_limit_regex() -> &'static regex_lite::Regex { mod tests { use super::*; use assert_matches::assert_matches; + use bytes::Bytes; use codex_protocol::models::ResponseItem; + use futures::stream; use pretty_assertions::assert_eq; use serde_json::json; use tokio::sync::mpsc; @@ -607,6 +603,44 @@ mod tests { } } + #[tokio::test] + async fn emits_completed_without_stream_end() { + let completed = json!({ + "type": "response.completed", + "response": { "id": "resp1" } + }) + .to_string(); + + let sse1 = format!("event: response.completed\ndata: {completed}\n\n"); + let stream = stream::iter(vec![Ok(Bytes::from(sse1))]).chain(stream::pending()); + let stream: ByteStream = Box::pin(stream); + + let (tx, mut rx) = mpsc::channel::>(8); + tokio::spawn(process_sse(stream, tx, idle_timeout(), None)); + + let events = tokio::time::timeout(Duration::from_millis(1000), async { + let mut events = Vec::new(); + while let Some(ev) = rx.recv().await { + events.push(ev); + } + events + }) + .await + .expect("timed out collecting events"); + + assert_eq!(events.len(), 1); + match &events[0] { + Ok(ResponseEvent::Completed { + response_id, + token_usage, + }) => { + assert_eq!(response_id, "resp1"); + assert!(token_usage.is_none()); + } + other => panic!("unexpected event: {other:?}"), + } + } + #[tokio::test] async fn error_when_error_event() { let raw_error = r#"{"type":"response.failed","sequence_number":3,"response":{"id":"resp_689bcf18d7f08194bf3440ba62fe05d803fee0cdac429894","object":"response","created_at":1755041560,"status":"failed","background":false,"error":{"code":"rate_limit_exceeded","message":"Rate limit reached for gpt-5.1 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more."}, "usage":null,"user":null,"metadata":{}}}"#;