Stabilize websocket response.failed error delivery (#14017)

## What changed
- Drop failed websocket connections immediately after a terminal stream
error instead of awaiting a graceful close handshake before forwarding
the error to the caller.
- Keep the success path and the closed-connection guard behavior
unchanged.

## Why this fixes the flake
- The failing integration test waits for the second websocket stream to
surface the model error before issuing a follow-up request.
- On slower runners, the old error path awaited
`ws_stream.close().await` before sending the error downstream. If that
close handshake stalled, the test kept waiting for an error that had
already happened server-side and nextest timed it out.
- Dropping the failed websocket immediately makes the terminal error
observable right away and marks the session closed so the next request
reconnects cleanly instead of depending on a best-effort close
handshake.

## Code or test?
- This is a production logic fix in `codex-api`. The existing websocket
integration test already exercises the regression path.
This commit is contained in:
Ahmed Ibrahim 2026-03-10 17:59:41 -07:00 committed by Michael Bolin
parent 285b3a5143
commit c8446d7cf3
5 changed files with 102 additions and 32 deletions

View file

@@ -53,9 +53,6 @@ enum WsCommand {
message: Message,
tx_result: oneshot::Sender<Result<(), WsError>>,
},
Close {
tx_result: oneshot::Sender<Result<(), WsError>>,
},
}
impl WsStream {
@@ -80,11 +77,6 @@ impl WsStream {
break;
}
}
WsCommand::Close { tx_result } => {
let result = inner.close(None).await;
let _ = tx_result.send(result);
break;
}
}
}
message = inner.next() => {
@@ -144,11 +136,6 @@ impl WsStream {
.await
}
async fn close(&self) -> Result<(), WsError> {
self.request(|tx_result| WsCommand::Close { tx_result })
.await
}
async fn next(&mut self) -> Option<Result<Message, WsError>> {
self.rx_message.recv().await
}
@@ -242,26 +229,32 @@ impl ResponsesWebsocketConnection {
.await;
}
let mut guard = stream.lock().await;
let Some(ws_stream) = guard.as_mut() else {
let _ = tx_event
.send(Err(ApiError::Stream(
"websocket connection is closed".to_string(),
)))
.await;
return;
let result = {
let Some(ws_stream) = guard.as_mut() else {
let _ = tx_event
.send(Err(ApiError::Stream(
"websocket connection is closed".to_string(),
)))
.await;
return;
};
run_websocket_response_stream(
ws_stream,
tx_event.clone(),
request_body,
idle_timeout,
telemetry,
)
.await
};
if let Err(err) = run_websocket_response_stream(
ws_stream,
tx_event.clone(),
request_body,
idle_timeout,
telemetry,
)
.await
{
let _ = ws_stream.close().await;
*guard = None;
if let Err(err) = result {
// A terminal stream error should reach the caller immediately. Waiting for a
// graceful close handshake here can stall indefinitely and mask the error.
let failed_stream = guard.take();
drop(guard);
drop(failed_stream);
let _ = tx_event.send(Err(err)).await;
}
});

View file

@@ -416,6 +416,11 @@ pub struct WebSocketConnectionConfig {
/// Tests use this to force websocket setup into an in-flight state so first-turn warmup paths
/// can be exercised deterministically.
pub accept_delay: Option<Duration>,
/// Whether the server should send a websocket close frame after all scripted responses.
///
/// Tests can disable this to simulate a peer that surfaces a terminal event but never
/// completes the close handshake.
pub close_after_requests: bool,
}
pub struct WebSocketTestServer {
@@ -1168,6 +1173,7 @@ pub async fn start_websocket_server(connections: Vec<Vec<Vec<Value>>>) -> WebSoc
requests,
response_headers: Vec::new(),
accept_delay: None,
close_after_requests: true,
})
.collect();
start_websocket_server_with_headers(connections).await
@@ -1261,6 +1267,7 @@ pub async fn start_websocket_server_with_headers(
log.push(Vec::new());
log.len() - 1
};
let close_after_requests = connection.close_after_requests;
for request_events in connection.requests {
let Some(Ok(message)) = ws_stream.next().await else {
break;
@@ -1324,7 +1331,12 @@
}
}
let _ = ws_stream.close(None).await;
if close_after_requests {
let _ = ws_stream.close(None).await;
} else {
let _ = shutdown_rx.await;
return;
}
if connections.lock().unwrap().is_empty() {
return;

View file

@@ -129,6 +129,7 @@ async fn websocket_first_turn_handles_handshake_delay_with_startup_prewarm() ->
response_headers: Vec::new(),
// Delay handshake so turn processing must tolerate websocket startup latency.
accept_delay: Some(Duration::from_millis(150)),
close_after_requests: true,
}])
.await;

View file

@@ -653,6 +653,7 @@ async fn responses_websocket_emits_reasoning_included_event() {
requests: vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]],
response_headers: vec![("X-Reasoning-Included".to_string(), "true".to_string())],
accept_delay: None,
close_after_requests: true,
}])
.await;
@@ -725,6 +726,7 @@ async fn responses_websocket_emits_rate_limit_events() {
("X-Reasoning-Included".to_string(), "true".to_string()),
],
accept_delay: None,
close_after_requests: true,
}])
.await;
@@ -1369,6 +1371,65 @@ async fn responses_websocket_v2_after_error_uses_full_create_without_previous_re
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_v2_surfaces_terminal_error_without_close_handshake() {
skip_if_no_network!();
let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
requests: vec![
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
vec![json!({
"type": "response.failed",
"response": {
"error": {
"code": "invalid_prompt",
"message": "synthetic websocket failure"
}
}
})],
],
response_headers: Vec::new(),
accept_delay: None,
close_after_requests: false,
}])
.await;
let harness = websocket_harness_with_v2(&server, true).await;
let mut session = harness.client.new_session();
let prompt_one = prompt_with_input(vec![message_item("hello")]);
let prompt_two = prompt_with_input(vec![message_item("hello"), message_item("second")]);
stream_until_complete(&mut session, &harness, &prompt_one).await;
let mut second_stream = session
.stream(
&prompt_two,
&harness.model_info,
&harness.session_telemetry,
harness.effort,
harness.summary,
None,
None,
)
.await
.expect("websocket stream failed");
let saw_error = tokio::time::timeout(Duration::from_secs(2), async {
while let Some(event) = second_stream.next().await {
if event.is_err() {
return true;
}
}
false
})
.await
.expect("timed out waiting for terminal websocket error");
assert!(saw_error, "expected second websocket stream to error");
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_v2_sets_openai_beta_header() {
skip_if_no_network!();

View file

@@ -103,6 +103,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
]],
response_headers: vec![(TURN_STATE_HEADER.to_string(), "ts-1".to_string())],
accept_delay: None,
close_after_requests: true,
},
WebSocketConnectionConfig {
requests: vec![vec![
@@ -112,6 +113,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
]],
response_headers: Vec::new(),
accept_delay: None,
close_after_requests: true,
},
WebSocketConnectionConfig {
requests: vec![vec![
@@ -121,6 +123,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
]],
response_headers: Vec::new(),
accept_delay: None,
close_after_requests: true,
},
])
.await;