From 9d7013eab084013572069b5d784cb78e78e35691 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 25 Feb 2026 10:31:37 -0800 Subject: [PATCH] Handle websocket timeout (#12791) Sometimes websockets will timeout with 400 error, ensure we retry it. --- .../src/endpoint/responses_websocket.rs | 75 ++++++++++++++----- .../core/tests/suite/client_websockets.rs | 38 ++++++++++ 2 files changed, 94 insertions(+), 19 deletions(-) diff --git a/codex-rs/codex-api/src/endpoint/responses_websocket.rs b/codex-rs/codex-api/src/endpoint/responses_websocket.rs index bdd32fbd5..925f7d52d 100644 --- a/codex-rs/codex-api/src/endpoint/responses_websocket.rs +++ b/codex-rs/codex-api/src/endpoint/responses_websocket.rs @@ -164,6 +164,8 @@ const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state"; const X_MODELS_ETAG_HEADER: &str = "x-models-etag"; const X_REASONING_INCLUDED_HEADER: &str = "x-reasoning-included"; const OPENAI_MODEL_HEADER: &str = "openai-model"; +const WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE: &str = "websocket_connection_limit_reached"; +const WEBSOCKET_CONNECTION_LIMIT_REACHED_MESSAGE: &str = "Responses websocket connection limit reached (60 minutes). Create a new websocket connection to continue."; pub struct ResponsesWebsocketConnection { stream: Arc>>, @@ -417,6 +419,12 @@ fn map_ws_error(err: WsError, url: &Url) -> ApiError { } } +#[derive(Debug, Deserialize)] +struct WrappedWebsocketError { + code: Option, + message: Option, +} + #[derive(Debug, Deserialize)] struct WrappedWebsocketErrorEvent { #[serde(rename = "type")] @@ -424,7 +432,7 @@ struct WrappedWebsocketErrorEvent { #[serde(alias = "status_code")] status: Option, #[serde(default)] - error: Option, + error: Option, #[serde(default)] headers: Option>, } @@ -437,7 +445,10 @@ fn parse_wrapped_websocket_error_event(payload: &str) -> Option Option { +fn map_wrapped_websocket_error_event( + event: WrappedWebsocketErrorEvent, + original_payload: String, +) -> Option { let WrappedWebsocketErrorEvent { status, error, @@ -445,28 +456,29 @@ fn map_wrapped_websocket_error_event(event: WrappedWebsocketErrorEvent) -> Optio .. } = event; + if let Some(error) = error.as_ref() + && let Some(code) = error.code.as_deref() + && code == WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE + { + return Some(ApiError::Retryable { + message: error + .message + .clone() + .unwrap_or_else(|| WEBSOCKET_CONNECTION_LIMIT_REACHED_MESSAGE.to_string()), + delay: None, + }); + } + let status = StatusCode::from_u16(status?).ok()?; if status.is_success() { return None; } - let body = error.map(|error| { - serde_json::to_string_pretty(&serde_json::json!({ - "error": error - })) - .unwrap_or_else(|_| { - serde_json::json!({ - "error": error - }) - .to_string() - }) - }); - Some(ApiError::Transport(TransportError::Http { status, url: None, headers: headers.map(json_headers_to_http_headers), - body, + body: Some(original_payload), })) } @@ -551,7 +563,8 @@ async fn run_websocket_response_stream( Message::Text(text) => { trace!("websocket event: {text}"); if let Some(wrapped_error) = parse_wrapped_websocket_error_event(&text) - && let Some(error) = map_wrapped_websocket_error_event(wrapped_error) + && let Some(error) = + map_wrapped_websocket_error_event(wrapped_error, text.to_string()) { return Err(error); } @@ -639,7 +652,7 @@ mod tests { let wrapped_error = parse_wrapped_websocket_error_event(&payload) .expect("expected websocket error payload to be parsed"); - let api_error = map_wrapped_websocket_error_event(wrapped_error) + let api_error = map_wrapped_websocket_error_event(wrapped_error, payload) .expect("expected websocket error payload to map to ApiError"); let ApiError::Transport(TransportError::Http { @@ -699,7 +712,7 @@ mod tests { let wrapped_error = parse_wrapped_websocket_error_event(&payload) .expect("expected websocket error payload to be parsed"); - let api_error = map_wrapped_websocket_error_event(wrapped_error) + let api_error = map_wrapped_websocket_error_event(wrapped_error, payload) .expect("expected websocket error payload to map to ApiError"); let ApiError::Transport(TransportError::Http { status, body, .. }) = api_error else { panic!("expected ApiError::Transport(Http)"); @@ -710,6 +723,30 @@ mod tests { assert!(body.contains("Model does not support image inputs")); } + #[test] + fn parse_wrapped_websocket_error_event_with_connection_limit_maps_retryable() { + let payload = json!({ + "type": "error", + "status": 400, + "error": { + "type": "invalid_request_error", + "code": "websocket_connection_limit_reached", + "message": "Responses websocket connection limit reached (60 minutes). Create a new websocket connection to continue." + } + }) + .to_string(); + + let wrapped_error = parse_wrapped_websocket_error_event(&payload) + .expect("expected websocket error payload to be parsed"); + let api_error = map_wrapped_websocket_error_event(wrapped_error, payload) + .expect("expected websocket error payload to map to ApiError"); + let ApiError::Retryable { message, delay } = api_error else { + panic!("expected ApiError::Retryable"); + }; + assert_eq!(message, WEBSOCKET_CONNECTION_LIMIT_REACHED_MESSAGE); + assert_eq!(delay, None); + } + #[test] fn parse_wrapped_websocket_error_event_without_status_is_not_mapped() { let payload = json!({ @@ -727,7 +764,7 @@ mod tests { let wrapped_error = parse_wrapped_websocket_error_event(&payload) .expect("expected websocket error payload to be parsed"); - let api_error = map_wrapped_websocket_error_event(wrapped_error); + let api_error = map_wrapped_websocket_error_event(wrapped_error, payload); assert!(api_error.is_none()); } diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 3dec0c440..369d69ed2 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -923,6 +923,44 @@ async fn responses_websocket_invalid_request_error_with_status_is_forwarded() { server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn responses_websocket_connection_limit_error_reconnects_and_completes() { + skip_if_no_network!(); + + let websocket_connection_limit_error = json!({ + "type": "error", + "status": 400, + "error": { + "type": "invalid_request_error", + "code": "websocket_connection_limit_reached", + "message": "Responses websocket connection limit reached (60 minutes). Create a new websocket connection to continue." + } + }); + + let server = start_websocket_server(vec![ + vec![vec![websocket_connection_limit_error]], + vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]], + ]) + .await; + let mut builder = test_codex().with_config(|config| { + config.model_provider.request_max_retries = Some(0); + config.model_provider.stream_max_retries = Some(1); + }); + let test = builder + .build_with_websocket_server(&server) + .await + .expect("build websocket codex"); + + test.submit_turn("hello") + .await + .expect("submission should reconnect after websocket connection limit error"); + + let total_websocket_requests: usize = server.connections().iter().map(Vec::len).sum(); + assert_eq!(total_websocket_requests, 2); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn responses_websocket_appends_on_prefix() { skip_if_no_network!();