From c8446d7cf3e749420a1963ecb17c574601652467 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 10 Mar 2026 17:59:41 -0700 Subject: [PATCH] Stabilize websocket response.failed error delivery (#14017) ## What changed - Drop failed websocket connections immediately after a terminal stream error instead of awaiting a graceful close handshake before forwarding the error to the caller. - Keep the success path and the closed-connection guard behavior unchanged. ## Why this fixes the flake - The failing integration test waits for the second websocket stream to surface the model error before issuing a follow-up request. - On slower runners, the old error path awaited `ws_stream.close().await` before sending the error downstream. If that close handshake stalled, the test kept waiting for an error that had already happened server-side and nextest timed it out. - Dropping the failed websocket immediately makes the terminal error observable right away and marks the session closed so the next request reconnects cleanly instead of depending on a best-effort close handshake. ## Code or test? - This is a production logic fix in `codex-api`. The existing websocket integration test already exercises the regression path; this patch also adds `responses_websocket_v2_surfaces_terminal_error_without_close_handshake` and a `close_after_requests` test-server option to cover a peer that never completes the close handshake. 
--- .../src/endpoint/responses_websocket.rs | 55 ++++++++--------- codex-rs/core/tests/common/responses.rs | 14 ++++- codex-rs/core/tests/suite/agent_websocket.rs | 1 + .../core/tests/suite/client_websockets.rs | 61 +++++++++++++++++++ codex-rs/core/tests/suite/turn_state.rs | 3 + 5 files changed, 102 insertions(+), 32 deletions(-) diff --git a/codex-rs/codex-api/src/endpoint/responses_websocket.rs b/codex-rs/codex-api/src/endpoint/responses_websocket.rs index 925f7d52d..30af9783a 100644 --- a/codex-rs/codex-api/src/endpoint/responses_websocket.rs +++ b/codex-rs/codex-api/src/endpoint/responses_websocket.rs @@ -53,9 +53,6 @@ enum WsCommand { message: Message, tx_result: oneshot::Sender>, }, - Close { - tx_result: oneshot::Sender>, - }, } impl WsStream { @@ -80,11 +77,6 @@ impl WsStream { break; } } - WsCommand::Close { tx_result } => { - let result = inner.close(None).await; - let _ = tx_result.send(result); - break; - } } } message = inner.next() => { @@ -144,11 +136,6 @@ impl WsStream { .await } - async fn close(&self) -> Result<(), WsError> { - self.request(|tx_result| WsCommand::Close { tx_result }) - .await - } - async fn next(&mut self) -> Option> { self.rx_message.recv().await } @@ -242,26 +229,32 @@ impl ResponsesWebsocketConnection { .await; } let mut guard = stream.lock().await; - let Some(ws_stream) = guard.as_mut() else { - let _ = tx_event - .send(Err(ApiError::Stream( - "websocket connection is closed".to_string(), - ))) - .await; - return; + let result = { + let Some(ws_stream) = guard.as_mut() else { + let _ = tx_event + .send(Err(ApiError::Stream( + "websocket connection is closed".to_string(), + ))) + .await; + return; + }; + + run_websocket_response_stream( + ws_stream, + tx_event.clone(), + request_body, + idle_timeout, + telemetry, + ) + .await }; - if let Err(err) = run_websocket_response_stream( - ws_stream, - tx_event.clone(), - request_body, - idle_timeout, - telemetry, - ) - .await - { - let _ = ws_stream.close().await; - *guard = None; + 
if let Err(err) = result { + // A terminal stream error should reach the caller immediately. Waiting for a + // graceful close handshake here can stall indefinitely and mask the error. + let failed_stream = guard.take(); + drop(guard); + drop(failed_stream); let _ = tx_event.send(Err(err)).await; } }); diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index d07b155f6..cf7c03f4d 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -416,6 +416,11 @@ pub struct WebSocketConnectionConfig { /// Tests use this to force websocket setup into an in-flight state so first-turn warmup paths /// can be exercised deterministically. pub accept_delay: Option, + /// Whether the server should send a websocket close frame after all scripted responses. + /// + /// Tests can disable this to simulate a peer that surfaces a terminal event but never + /// completes the close handshake. + pub close_after_requests: bool, } pub struct WebSocketTestServer { @@ -1168,6 +1173,7 @@ pub async fn start_websocket_server(connections: Vec>>) -> WebSoc requests, response_headers: Vec::new(), accept_delay: None, + close_after_requests: true, }) .collect(); start_websocket_server_with_headers(connections).await @@ -1261,6 +1267,7 @@ pub async fn start_websocket_server_with_headers( log.push(Vec::new()); log.len() - 1 }; + let close_after_requests = connection.close_after_requests; for request_events in connection.requests { let Some(Ok(message)) = ws_stream.next().await else { break; @@ -1324,7 +1331,12 @@ pub async fn start_websocket_server_with_headers( } } - let _ = ws_stream.close(None).await; + if close_after_requests { + let _ = ws_stream.close(None).await; + } else { + let _ = shutdown_rx.await; + return; + } if connections.lock().unwrap().is_empty() { return; diff --git a/codex-rs/core/tests/suite/agent_websocket.rs b/codex-rs/core/tests/suite/agent_websocket.rs index 5e81452a4..45752f182 100644 --- 
a/codex-rs/core/tests/suite/agent_websocket.rs +++ b/codex-rs/core/tests/suite/agent_websocket.rs @@ -129,6 +129,7 @@ async fn websocket_first_turn_handles_handshake_delay_with_startup_prewarm() -> response_headers: Vec::new(), // Delay handshake so turn processing must tolerate websocket startup latency. accept_delay: Some(Duration::from_millis(150)), + close_after_requests: true, }]) .await; diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index cda634448..0850f6e54 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -653,6 +653,7 @@ async fn responses_websocket_emits_reasoning_included_event() { requests: vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]], response_headers: vec![("X-Reasoning-Included".to_string(), "true".to_string())], accept_delay: None, + close_after_requests: true, }]) .await; @@ -725,6 +726,7 @@ async fn responses_websocket_emits_rate_limit_events() { ("X-Reasoning-Included".to_string(), "true".to_string()), ], accept_delay: None, + close_after_requests: true, }]) .await; @@ -1369,6 +1371,65 @@ async fn responses_websocket_v2_after_error_uses_full_create_without_previous_re server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn responses_websocket_v2_surfaces_terminal_error_without_close_handshake() { + skip_if_no_network!(); + + let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig { + requests: vec![ + vec![ev_response_created("resp-1"), ev_completed("resp-1")], + vec![json!({ + "type": "response.failed", + "response": { + "error": { + "code": "invalid_prompt", + "message": "synthetic websocket failure" + } + } + })], + ], + response_headers: Vec::new(), + accept_delay: None, + close_after_requests: false, + }]) + .await; + + let harness = websocket_harness_with_v2(&server, true).await; + let mut session = 
harness.client.new_session(); + let prompt_one = prompt_with_input(vec![message_item("hello")]); + let prompt_two = prompt_with_input(vec![message_item("hello"), message_item("second")]); + + stream_until_complete(&mut session, &harness, &prompt_one).await; + + let mut second_stream = session + .stream( + &prompt_two, + &harness.model_info, + &harness.session_telemetry, + harness.effort, + harness.summary, + None, + None, + ) + .await + .expect("websocket stream failed"); + + let saw_error = tokio::time::timeout(Duration::from_secs(2), async { + while let Some(event) = second_stream.next().await { + if event.is_err() { + return true; + } + } + false + }) + .await + .expect("timed out waiting for terminal websocket error"); + + assert!(saw_error, "expected second websocket stream to error"); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn responses_websocket_v2_sets_openai_beta_header() { skip_if_no_network!(); diff --git a/codex-rs/core/tests/suite/turn_state.rs b/codex-rs/core/tests/suite/turn_state.rs index c068cfafb..7a930af60 100644 --- a/codex-rs/core/tests/suite/turn_state.rs +++ b/codex-rs/core/tests/suite/turn_state.rs @@ -103,6 +103,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result< ]], response_headers: vec![(TURN_STATE_HEADER.to_string(), "ts-1".to_string())], accept_delay: None, + close_after_requests: true, }, WebSocketConnectionConfig { requests: vec![vec![ @@ -112,6 +113,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result< ]], response_headers: Vec::new(), accept_delay: None, + close_after_requests: true, }, WebSocketConnectionConfig { requests: vec![vec![ @@ -121,6 +123,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result< ]], response_headers: Vec::new(), accept_delay: None, + close_after_requests: true, }, ]) .await;