diff --git a/codex-rs/codex-api/src/sse/responses.rs b/codex-rs/codex-api/src/sse/responses.rs index 7f0981c5c..cc4c03819 100644 --- a/codex-rs/codex-api/src/sse/responses.rs +++ b/codex-rs/codex-api/src/sse/responses.rs @@ -290,7 +290,6 @@ pub async fn process_sse( telemetry: Option>, ) { let mut stream = stream.eventsource(); - let mut response_completed: Option = None; let mut response_error: Option = None; loop { @@ -307,17 +306,10 @@ pub async fn process_sse( return; } Ok(None) => { - match response_completed.take() { - Some(event) => { - let _ = tx_event.send(Ok(event)).await; - } - None => { - let error = response_error.unwrap_or(ApiError::Stream( - "stream closed before response.completed".into(), - )); - let _ = tx_event.send(Err(error)).await; - } - } + let error = response_error.unwrap_or(ApiError::Stream( + "stream closed before response.completed".into(), + )); + let _ = tx_event.send(Err(error)).await; return; } Err(_) => { @@ -341,9 +333,11 @@ pub async fn process_sse( match process_responses_event(event) { Ok(Some(event)) => { - if matches!(event, ResponseEvent::Completed { .. }) { - response_completed = Some(event); - } else if tx_event.send(Ok(event)).await.is_err() { + let is_completed = matches!(event, ResponseEvent::Completed { .. }); + if tx_event.send(Ok(event)).await.is_err() { + return; + } + if is_completed { return; } } @@ -405,7 +399,9 @@ fn rate_limit_regex() -> &'static regex_lite::Regex { mod tests { use super::*; use assert_matches::assert_matches; + use bytes::Bytes; use codex_protocol::models::ResponseItem; + use futures::stream; use pretty_assertions::assert_eq; use serde_json::json; use tokio::sync::mpsc; @@ -607,6 +603,44 @@ mod tests { } } + #[tokio::test] + async fn emits_completed_without_stream_end() { + let completed = json!({ + "type": "response.completed", + "response": { "id": "resp1" } + }) + .to_string(); + + let sse1 = format!("event: response.completed\ndata: {completed}\n\n"); + let stream = stream::iter(vec![Ok(Bytes::from(sse1))]).chain(stream::pending()); + let stream: ByteStream = Box::pin(stream); + + let (tx, mut rx) = mpsc::channel::>(8); + tokio::spawn(process_sse(stream, tx, idle_timeout(), None)); + + let events = tokio::time::timeout(Duration::from_millis(1000), async { + let mut events = Vec::new(); + while let Some(ev) = rx.recv().await { + events.push(ev); + } + events + }) + .await + .expect("timed out collecting events"); + + assert_eq!(events.len(), 1); + match &events[0] { + Ok(ResponseEvent::Completed { + response_id, + token_usage, + }) => { + assert_eq!(response_id, "resp1"); + assert!(token_usage.is_none()); + } + other => panic!("unexpected event: {other:?}"), + } + } + #[tokio::test] async fn error_when_error_event() { let raw_error = r#"{"type":"response.failed","sequence_number":3,"response":{"id":"resp_689bcf18d7f08194bf3440ba62fe05d803fee0cdac429894","object":"response","created_at":1755041560,"status":"failed","background":false,"error":{"code":"rate_limit_exceeded","message":"Rate limit reached for gpt-5.1 in organization org-AAA on tokens per min (TPM): Limit 30000, Used 22999, Requested 12528. Please try again in 11.054s. Visit https://platform.openai.com/account/rate-limits to learn more."}, "usage":null,"user":null,"metadata":{}}}"#;