fix: Emit response.completed immediately for Responses SSE (#9170)

we see windows test failures like this:
https://github.com/openai/codex/actions/runs/20930055601/job/60138344260.

The issue is that SSE connections sometimes remain open after the
completion event esp. for windows. We should emit the completion event
and return immediately. this is consistent with the protocol:

> The Model streams responses back in an SSE, which are collected until
"completed" message and the SSE terminates

from
https://github.com/openai/codex/blob/dev/cc/fix-windows-test/codex-rs/docs/protocol_v1.md#L37.

this helps us achieve parity with responses websocket logic here:
https://github.com/openai/codex/blob/dev/cc/fix-windows-test/codex-rs/codex-api/src/endpoint/responses_websocket.rs#L220-L227.
This commit is contained in:
Celia Chen 2026-01-14 10:05:00 -08:00 committed by GitHub
parent 3d322fa9d8
commit 02f67bace8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -290,7 +290,6 @@ pub async fn process_sse(
telemetry: Option<Arc<dyn SseTelemetry>>,
) {
let mut stream = stream.eventsource();
let mut response_completed: Option<ResponseEvent> = None;
let mut response_error: Option<ApiError> = None;
loop {
@ -307,17 +306,10 @@ pub async fn process_sse(
return;
}
Ok(None) => {
match response_completed.take() {
Some(event) => {
let _ = tx_event.send(Ok(event)).await;
}
None => {
let error = response_error.unwrap_or(ApiError::Stream(
"stream closed before response.completed".into(),
));
let _ = tx_event.send(Err(error)).await;
}
}
let error = response_error.unwrap_or(ApiError::Stream(
"stream closed before response.completed".into(),
));
let _ = tx_event.send(Err(error)).await;
return;
}
Err(_) => {
@ -341,9 +333,11 @@ pub async fn process_sse(
match process_responses_event(event) {
Ok(Some(event)) => {
if matches!(event, ResponseEvent::Completed { .. }) {
response_completed = Some(event);
} else if tx_event.send(Ok(event)).await.is_err() {
let is_completed = matches!(event, ResponseEvent::Completed { .. });
if tx_event.send(Ok(event)).await.is_err() {
return;
}
if is_completed {
return;
}
}
@ -405,7 +399,9 @@ fn rate_limit_regex() -> &'static regex_lite::Regex {
mod tests {
use super::*;
use assert_matches::assert_matches;
use bytes::Bytes;
use codex_protocol::models::ResponseItem;
use futures::stream;
use pretty_assertions::assert_eq;
use serde_json::json;
use tokio::sync::mpsc;
@ -607,6 +603,44 @@ mod tests {
}
}
#[tokio::test]
async fn emits_completed_without_stream_end() {
let completed = json!({
"type": "response.completed",
"response": { "id": "resp1" }
})
.to_string();
let sse1 = format!("event: response.completed\ndata: {completed}\n\n");
let stream = stream::iter(vec![Ok(Bytes::from(sse1))]).chain(stream::pending());
let stream: ByteStream = Box::pin(stream);
let (tx, mut rx) = mpsc::channel::<Result<ResponseEvent, ApiError>>(8);
tokio::spawn(process_sse(stream, tx, idle_timeout(), None));
let events = tokio::time::timeout(Duration::from_millis(1000), async {
let mut events = Vec::new();
while let Some(ev) = rx.recv().await {
events.push(ev);
}
events
})
.await
.expect("timed out collecting events");
assert_eq!(events.len(), 1);
match &events[0] {
Ok(ResponseEvent::Completed {
response_id,
token_usage,
}) => {
assert_eq!(response_id, "resp1");
assert!(token_usage.is_none());
}
other => panic!("unexpected event: {other:?}"),
}
}
#[tokio::test]
async fn error_when_error_event() {
let raw_error = r#"{"type":"response.failed","sequence_number":3,"response":{"id":"resp_689bcf18d7f08194bf3440ba62fe05d803fee0cdac429894","object":"response","created_at":1755041560,"status":"failed","background":false,"error":{"code":"rate_limit_exceeded","message":"Rate limit reached for gpt-5.1 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more."}, "usage":null,"user":null,"metadata":{}}}"#;