Handle websocket timeout (#12791)
Sometimes websockets will timeout with 400 error, ensure we retry it.
This commit is contained in:
parent
7b39e76a66
commit
9d7013eab0
2 changed files with 94 additions and 19 deletions
|
|
@ -164,6 +164,8 @@ const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
|||
const X_MODELS_ETAG_HEADER: &str = "x-models-etag";
|
||||
const X_REASONING_INCLUDED_HEADER: &str = "x-reasoning-included";
|
||||
const OPENAI_MODEL_HEADER: &str = "openai-model";
|
||||
const WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE: &str = "websocket_connection_limit_reached";
|
||||
const WEBSOCKET_CONNECTION_LIMIT_REACHED_MESSAGE: &str = "Responses websocket connection limit reached (60 minutes). Create a new websocket connection to continue.";
|
||||
|
||||
pub struct ResponsesWebsocketConnection {
|
||||
stream: Arc<Mutex<Option<WsStream>>>,
|
||||
|
|
@ -417,6 +419,12 @@ fn map_ws_error(err: WsError, url: &Url) -> ApiError {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct WrappedWebsocketError {
|
||||
code: Option<String>,
|
||||
message: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct WrappedWebsocketErrorEvent {
|
||||
#[serde(rename = "type")]
|
||||
|
|
@ -424,7 +432,7 @@ struct WrappedWebsocketErrorEvent {
|
|||
#[serde(alias = "status_code")]
|
||||
status: Option<u16>,
|
||||
#[serde(default)]
|
||||
error: Option<Value>,
|
||||
error: Option<WrappedWebsocketError>,
|
||||
#[serde(default)]
|
||||
headers: Option<JsonMap<String, Value>>,
|
||||
}
|
||||
|
|
@ -437,7 +445,10 @@ fn parse_wrapped_websocket_error_event(payload: &str) -> Option<WrappedWebsocket
|
|||
Some(event)
|
||||
}
|
||||
|
||||
fn map_wrapped_websocket_error_event(event: WrappedWebsocketErrorEvent) -> Option<ApiError> {
|
||||
fn map_wrapped_websocket_error_event(
|
||||
event: WrappedWebsocketErrorEvent,
|
||||
original_payload: String,
|
||||
) -> Option<ApiError> {
|
||||
let WrappedWebsocketErrorEvent {
|
||||
status,
|
||||
error,
|
||||
|
|
@ -445,28 +456,29 @@ fn map_wrapped_websocket_error_event(event: WrappedWebsocketErrorEvent) -> Optio
|
|||
..
|
||||
} = event;
|
||||
|
||||
if let Some(error) = error.as_ref()
|
||||
&& let Some(code) = error.code.as_deref()
|
||||
&& code == WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE
|
||||
{
|
||||
return Some(ApiError::Retryable {
|
||||
message: error
|
||||
.message
|
||||
.clone()
|
||||
.unwrap_or_else(|| WEBSOCKET_CONNECTION_LIMIT_REACHED_MESSAGE.to_string()),
|
||||
delay: None,
|
||||
});
|
||||
}
|
||||
|
||||
let status = StatusCode::from_u16(status?).ok()?;
|
||||
if status.is_success() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let body = error.map(|error| {
|
||||
serde_json::to_string_pretty(&serde_json::json!({
|
||||
"error": error
|
||||
}))
|
||||
.unwrap_or_else(|_| {
|
||||
serde_json::json!({
|
||||
"error": error
|
||||
})
|
||||
.to_string()
|
||||
})
|
||||
});
|
||||
|
||||
Some(ApiError::Transport(TransportError::Http {
|
||||
status,
|
||||
url: None,
|
||||
headers: headers.map(json_headers_to_http_headers),
|
||||
body,
|
||||
body: Some(original_payload),
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
@ -551,7 +563,8 @@ async fn run_websocket_response_stream(
|
|||
Message::Text(text) => {
|
||||
trace!("websocket event: {text}");
|
||||
if let Some(wrapped_error) = parse_wrapped_websocket_error_event(&text)
|
||||
&& let Some(error) = map_wrapped_websocket_error_event(wrapped_error)
|
||||
&& let Some(error) =
|
||||
map_wrapped_websocket_error_event(wrapped_error, text.to_string())
|
||||
{
|
||||
return Err(error);
|
||||
}
|
||||
|
|
@ -639,7 +652,7 @@ mod tests {
|
|||
|
||||
let wrapped_error = parse_wrapped_websocket_error_event(&payload)
|
||||
.expect("expected websocket error payload to be parsed");
|
||||
let api_error = map_wrapped_websocket_error_event(wrapped_error)
|
||||
let api_error = map_wrapped_websocket_error_event(wrapped_error, payload)
|
||||
.expect("expected websocket error payload to map to ApiError");
|
||||
|
||||
let ApiError::Transport(TransportError::Http {
|
||||
|
|
@ -699,7 +712,7 @@ mod tests {
|
|||
|
||||
let wrapped_error = parse_wrapped_websocket_error_event(&payload)
|
||||
.expect("expected websocket error payload to be parsed");
|
||||
let api_error = map_wrapped_websocket_error_event(wrapped_error)
|
||||
let api_error = map_wrapped_websocket_error_event(wrapped_error, payload)
|
||||
.expect("expected websocket error payload to map to ApiError");
|
||||
let ApiError::Transport(TransportError::Http { status, body, .. }) = api_error else {
|
||||
panic!("expected ApiError::Transport(Http)");
|
||||
|
|
@ -710,6 +723,30 @@ mod tests {
|
|||
assert!(body.contains("Model does not support image inputs"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_wrapped_websocket_error_event_with_connection_limit_maps_retryable() {
|
||||
let payload = json!({
|
||||
"type": "error",
|
||||
"status": 400,
|
||||
"error": {
|
||||
"type": "invalid_request_error",
|
||||
"code": "websocket_connection_limit_reached",
|
||||
"message": "Responses websocket connection limit reached (60 minutes). Create a new websocket connection to continue."
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let wrapped_error = parse_wrapped_websocket_error_event(&payload)
|
||||
.expect("expected websocket error payload to be parsed");
|
||||
let api_error = map_wrapped_websocket_error_event(wrapped_error, payload)
|
||||
.expect("expected websocket error payload to map to ApiError");
|
||||
let ApiError::Retryable { message, delay } = api_error else {
|
||||
panic!("expected ApiError::Retryable");
|
||||
};
|
||||
assert_eq!(message, WEBSOCKET_CONNECTION_LIMIT_REACHED_MESSAGE);
|
||||
assert_eq!(delay, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_wrapped_websocket_error_event_without_status_is_not_mapped() {
|
||||
let payload = json!({
|
||||
|
|
@ -727,7 +764,7 @@ mod tests {
|
|||
|
||||
let wrapped_error = parse_wrapped_websocket_error_event(&payload)
|
||||
.expect("expected websocket error payload to be parsed");
|
||||
let api_error = map_wrapped_websocket_error_event(wrapped_error);
|
||||
let api_error = map_wrapped_websocket_error_event(wrapped_error, payload);
|
||||
assert!(api_error.is_none());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -923,6 +923,44 @@ async fn responses_websocket_invalid_request_error_with_status_is_forwarded() {
|
|||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_connection_limit_error_reconnects_and_completes() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let websocket_connection_limit_error = json!({
|
||||
"type": "error",
|
||||
"status": 400,
|
||||
"error": {
|
||||
"type": "invalid_request_error",
|
||||
"code": "websocket_connection_limit_reached",
|
||||
"message": "Responses websocket connection limit reached (60 minutes). Create a new websocket connection to continue."
|
||||
}
|
||||
});
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
vec![vec![websocket_connection_limit_error]],
|
||||
vec![vec![ev_response_created("resp-1"), ev_completed("resp-1")]],
|
||||
])
|
||||
.await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
config.model_provider.stream_max_retries = Some(1);
|
||||
});
|
||||
let test = builder
|
||||
.build_with_websocket_server(&server)
|
||||
.await
|
||||
.expect("build websocket codex");
|
||||
|
||||
test.submit_turn("hello")
|
||||
.await
|
||||
.expect("submission should reconnect after websocket connection limit error");
|
||||
|
||||
let total_websocket_requests: usize = server.connections().iter().map(Vec::len).sum();
|
||||
assert_eq!(total_websocket_requests, 2);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_appends_on_prefix() {
|
||||
skip_if_no_network!();
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue