diff --git a/codex-rs/codex-api/src/common.rs b/codex-rs/codex-api/src/common.rs index b6126840d..31b4dcdb4 100644 --- a/codex-rs/codex-api/src/common.rs +++ b/codex-rs/codex-api/src/common.rs @@ -67,8 +67,6 @@ pub enum ResponseEvent { Completed { response_id: String, token_usage: Option, - /// Whether the client can append more items to a long-running websocket response. - can_append: bool, }, OutputTextDelta(String), ReasoningSummaryDelta { @@ -211,20 +209,12 @@ pub struct ResponseCreateWsRequest { pub client_metadata: Option>, } -#[derive(Debug, Serialize)] -pub struct ResponseAppendWsRequest { - pub input: Vec, - #[serde(skip_serializing_if = "Option::is_none")] - pub client_metadata: Option>, -} #[derive(Debug, Serialize)] #[serde(tag = "type")] #[allow(clippy::large_enum_variant)] pub enum ResponsesWsRequest { #[serde(rename = "response.create")] ResponseCreate(ResponseCreateWsRequest), - #[serde(rename = "response.append")] - ResponseAppend(ResponseAppendWsRequest), } pub fn create_text_param_for_request( diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index b9152ec57..138929602 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -19,7 +19,6 @@ pub use crate::common::MemorySummarizeInput; pub use crate::common::MemorySummarizeOutput; pub use crate::common::RawMemory; pub use crate::common::RawMemoryMetadata; -pub use crate::common::ResponseAppendWsRequest; pub use crate::common::ResponseCreateWsRequest; pub use crate::common::ResponseEvent; pub use crate::common::ResponseStream; diff --git a/codex-rs/codex-api/src/sse/responses.rs b/codex-rs/codex-api/src/sse/responses.rs index 3c039380c..d01e1d918 100644 --- a/codex-rs/codex-api/src/sse/responses.rs +++ b/codex-rs/codex-api/src/sse/responses.rs @@ -118,14 +118,6 @@ struct ResponseCompleted { usage: Option, } -#[derive(Debug, Deserialize)] -struct ResponseDone { - #[serde(default)] - id: Option, - #[serde(default)] - usage: Option, -} - #[derive(Debug, Deserialize)] struct ResponseCompletedUsage { input_tokens: i64, @@ -324,7 +316,6 @@ pub fn process_responses_event( return Ok(Some(ResponseEvent::Completed { response_id: resp.id, token_usage: resp.usage.map(Into::into), - can_append: false, })); } Err(err) => { @@ -335,31 +326,6 @@ pub fn process_responses_event( } } } - "response.done" => { - if let Some(resp_val) = event.response { - match serde_json::from_value::(resp_val) { - Ok(resp) => { - return Ok(Some(ResponseEvent::Completed { - response_id: resp.id.unwrap_or_default(), - token_usage: resp.usage.map(Into::into), - can_append: true, - })); - } - Err(err) => { - let error = format!("failed to parse ResponseCompleted: {err}"); - debug!("{error}"); - return Err(ResponsesEventError::Api(ApiError::Stream(error))); - } - } - } - - debug!("response.done missing response payload"); - return Ok(Some(ResponseEvent::Completed { - response_id: String::new(), - token_usage: None, - can_append: true, - })); - } "response.output_item.added" => { if let Some(item_val) = event.item { if let Ok(item) = serde_json::from_value::(item_val) { @@ -639,11 +605,9 @@ mod tests { Ok(ResponseEvent::Completed { response_id, token_usage, - can_append, }) => { assert_eq!(response_id, "resp1"); assert!(token_usage.is_none()); - assert!(!can_append); } other => panic!("unexpected third event: {other:?}"), } @@ -677,69 +641,6 @@ mod tests { } } - #[tokio::test] - async fn response_done_emits_incremental_completed() { - let done = json!({ - "type": "response.done", - "response": { - "usage": { - "input_tokens": 1, - "input_tokens_details": null, - "output_tokens": 2, - "output_tokens_details": null, - "total_tokens": 3 - } - } - }) - .to_string(); - - let sse1 = format!("event: response.done\ndata: {done}\n\n"); - - let events = collect_events(&[sse1.as_bytes()]).await; - - assert_eq!(events.len(), 1); - - match &events[0] { - Ok(ResponseEvent::Completed { - response_id, - token_usage, - can_append, - }) => { - assert_eq!(response_id, ""); - assert!(token_usage.is_some()); - assert!(*can_append); - } - other => panic!("unexpected event: {other:?}"), - } - } - - #[tokio::test] - async fn response_done_without_payload_emits_completed() { - let done = json!({ - "type": "response.done" - }) - .to_string(); - - let sse1 = format!("event: response.done\ndata: {done}\n\n"); - - let events = collect_events(&[sse1.as_bytes()]).await; - - assert_eq!(events.len(), 1); - - match &events[0] { - Ok(ResponseEvent::Completed { - response_id, - token_usage, - can_append, - }) => { - assert_eq!(response_id, ""); - assert!(token_usage.is_none()); - assert!(*can_append); - } - other => panic!("unexpected event: {other:?}"), - } - } - #[tokio::test] async fn emits_completed_without_stream_end() { let completed = json!({ @@ -770,11 +671,9 @@ mod tests { Ok(ResponseEvent::Completed { response_id, token_usage, - can_append, }) => { assert_eq!(response_id, "resp1"); assert!(token_usage.is_none()); - assert!(!can_append); } other => panic!("unexpected event: {other:?}"), } @@ -996,8 +895,7 @@ mod tests { &events[1], ResponseEvent::Completed { response_id, - token_usage: None, - can_append: false + token_usage: None } if response_id == "resp-1" ); } @@ -1033,8 +931,7 @@ mod tests { &events[2], ResponseEvent::Completed { response_id, - token_usage: None, - can_append: false + token_usage: None } if response_id == "resp-1" ); } diff --git a/codex-rs/codex-api/tests/sse_end_to_end.rs b/codex-rs/codex-api/tests/sse_end_to_end.rs index ca840d1b9..f625fdcd9 100644 --- a/codex-rs/codex-api/tests/sse_end_to_end.rs +++ b/codex-rs/codex-api/tests/sse_end_to_end.rs @@ -159,11 +159,9 @@ async fn responses_stream_parses_items_and_completed_end_to_end() -> Result<()> ResponseEvent::Completed { response_id, token_usage, - can_append, } => { assert_eq!(response_id, "resp1"); assert!(token_usage.is_none()); - assert!(!can_append); } other => panic!("unexpected third event: {other:?}"), } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 534b1ec5e..9e5974b48 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -20,9 +20,8 @@ //! //! ## Retry-Budget Tradeoff //! -//! V2 request prewarm is treated as the first websocket connection attempt for a turn. If it -//! fails, normal stream retry/fallback logic handles recovery on the same turn. V1 prewarm -//! remains connection-only. +//! WebSocket prewarm is treated as the first websocket connection attempt for a turn. If it +//! fails, normal stream retry/fallback logic handles recovery on the same turn. use std::collections::HashMap; use std::sync::Arc; @@ -43,7 +42,6 @@ use codex_api::MemorySummarizeOutput as ApiMemorySummarizeOutput; use codex_api::RawMemory as ApiRawMemory; use codex_api::RequestTelemetry; use codex_api::ReqwestTransport; -use codex_api::ResponseAppendWsRequest; use codex_api::ResponseCreateWsRequest; use codex_api::ResponsesApiRequest; use codex_api::ResponsesClient as ApiResponsesClient; @@ -101,32 +99,19 @@ use crate::model_provider_info::WireApi; use crate::tools::spec::create_tools_json_for_responses_api; pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta"; -pub const OPENAI_BETA_RESPONSES_WEBSOCKETS: &str = "responses_websockets=2026-02-04"; pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state"; pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata"; pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str = "x-responsesapi-include-timing-metrics"; const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06"; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ResponsesWebsocketVersion { - V1, - V2, -} - -pub fn ws_version_from_features(config: &Config) -> Option { - match ( - config +pub fn ws_version_from_features(config: &Config) -> bool { + config + .features + .enabled(crate::features::Feature::ResponsesWebsockets) + || config .features - .enabled(crate::features::Feature::ResponsesWebsockets), - config - .features - .enabled(crate::features::Feature::ResponsesWebsocketsV2), - ) { - (_, true) => Some(ResponsesWebsocketVersion::V2), - (true, false) => Some(ResponsesWebsocketVersion::V1), - (false, false) => None, - } + .enabled(crate::features::Feature::ResponsesWebsocketsV2) } /// Session-scoped state shared by all [`ModelClient`] clones. @@ -140,7 +125,7 @@ struct ModelClientState { provider: ModelProviderInfo, session_source: SessionSource, model_verbosity: Option, - responses_websocket_version: Option, + responses_websockets_enabled_by_feature: bool, enable_request_compression: bool, include_timing_metrics: bool, beta_features_header: Option, @@ -180,8 +165,8 @@ pub struct ModelClient { /// The session establishes a Responses WebSocket connection lazily and reuses it across multiple /// requests within the turn. It also caches per-turn state: /// -/// - The last full request, so subsequent calls can use `response.append` only when the current -/// request is an incremental extension of the previous one. +/// - The last full request, so subsequent calls can reuse incremental websocket request payloads +/// only when the current request is an incremental extension of the previous one. /// - The `x-codex-turn-state` sticky-routing token, which must be replayed for all requests within /// the same turn. /// @@ -208,7 +193,6 @@ pub struct ModelClientSession { struct LastResponse { response_id: String, items_added: Vec, - can_append: bool, } #[derive(Debug, Default)] @@ -235,7 +219,7 @@ impl ModelClient { provider: ModelProviderInfo, session_source: SessionSource, model_verbosity: Option, - responses_websocket_version: Option, + responses_websockets_enabled_by_feature: bool, enable_request_compression: bool, include_timing_metrics: bool, beta_features_header: Option, @@ -247,7 +231,7 @@ impl ModelClient { provider, session_source, model_verbosity, - responses_websocket_version, + responses_websockets_enabled_by_feature, enable_request_compression, include_timing_metrics, beta_features_header, @@ -391,25 +375,21 @@ impl ModelClient { request_telemetry } - /// Returns the active Responses-over-WebSocket version for this session. + /// Returns whether the Responses-over-WebSocket transport is active for this session. /// /// This combines provider capability and feature gating; both must be true for websocket paths /// to be eligible. /// /// If websockets are only enabled via model preference (no explicit feature flag), prefer the /// current v2 behavior. - pub fn active_ws_version(&self, model_info: &ModelInfo) -> Option { + pub fn responses_websocket_enabled(&self, model_info: &ModelInfo) -> bool { if !self.state.provider.supports_websockets || self.state.disable_websockets.load(Ordering::Relaxed) { - return None; + return false; } - match self.state.responses_websocket_version { - Some(version) => Some(version), - None if model_info.prefer_websockets => Some(ResponsesWebsocketVersion::V2), - None => None, - } + self.state.responses_websockets_enabled_by_feature || model_info.prefer_websockets } /// Returns auth + provider configuration resolved from the current session auth state. @@ -442,12 +422,10 @@ impl ModelClient { otel_manager: &OtelManager, api_provider: codex_api::Provider, api_auth: CoreAuthProvider, - ws_version: ResponsesWebsocketVersion, turn_state: Option>>, turn_metadata_header: Option<&str>, ) -> std::result::Result { - let headers = - self.build_websocket_headers(ws_version, turn_state.as_ref(), turn_metadata_header); + let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header); let websocket_telemetry = ModelClientSession::build_websocket_telemetry(otel_manager); ApiWebSocketResponsesClient::new(api_provider, api_auth) .connect( @@ -465,7 +443,6 @@ impl ModelClient { /// replayed on reconnect within the same turn. fn build_websocket_headers( &self, - ws_version: ResponsesWebsocketVersion, turn_state: Option<&Arc>>, turn_metadata_header: Option<&str>, ) -> ApiHeaderMap { @@ -478,13 +455,9 @@ impl ModelClient { headers.extend(build_conversation_headers(Some( self.state.conversation_id.to_string(), ))); - let responses_websockets_beta_header = match ws_version { - ResponsesWebsocketVersion::V2 => RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE, - ResponsesWebsocketVersion::V1 => OPENAI_BETA_RESPONSES_WEBSOCKETS, - }; headers.insert( OPENAI_BETA_HEADER, - HeaderValue::from_static(responses_websockets_beta_header), + HeaderValue::from_static(RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE), ); if self.state.include_timing_metrics { headers.insert( @@ -613,8 +586,9 @@ impl ModelClientSession { last_response: Option<&LastResponse>, allow_empty_delta: bool, ) -> Option> { - // Checks whether the current request is an incremental append to the previous request. - // We only append when non-input request fields are unchanged and `input` is a strict + // Checks whether the current request is an incremental extension of the previous request. + // We only reuse an incremental input delta when non-input request fields are unchanged and + // `input` is a strict // extension of the previous known input. Server-returned output items are treated as part // of the baseline so we do not resend them. let previous_request = self.websocket_session.last_request.as_ref()?; @@ -659,42 +633,26 @@ impl ModelClientSession { &mut self, payload: ResponseCreateWsRequest, request: &ResponsesApiRequest, - ws_version: ResponsesWebsocketVersion, ) -> ResponsesWsRequest { let Some(last_response) = self.get_last_response() else { return ResponsesWsRequest::ResponseCreate(payload); }; - let allow_empty_delta = matches!(ws_version, ResponsesWebsocketVersion::V2); - let Some(append_items) = - self.get_incremental_items(request, Some(&last_response), allow_empty_delta) + let Some(incremental_items) = + self.get_incremental_items(request, Some(&last_response), true) else { return ResponsesWsRequest::ResponseCreate(payload); }; - match ws_version { - ResponsesWebsocketVersion::V2 => { - if last_response.response_id.is_empty() { - trace!("incremental request failed, no previous response id"); - return ResponsesWsRequest::ResponseCreate(payload); - } - - ResponsesWsRequest::ResponseCreate(ResponseCreateWsRequest { - previous_response_id: Some(last_response.response_id), - input: append_items, - ..payload - }) - } - ResponsesWebsocketVersion::V1 => { - if !last_response.can_append { - trace!("incremental request failed, can't append"); - return ResponsesWsRequest::ResponseCreate(payload); - } - ResponsesWsRequest::ResponseAppend(ResponseAppendWsRequest { - input: append_items, - client_metadata: payload.client_metadata, - }) - } + if last_response.response_id.is_empty() { + trace!("incremental request failed, no previous response id"); + return ResponsesWsRequest::ResponseCreate(payload); } + + ResponsesWsRequest::ResponseCreate(ResponseCreateWsRequest { + previous_response_id: Some(last_response.response_id), + input: incremental_items, + ..payload + }) } /// Opportunistically preconnects a websocket for this turn-scoped client session. @@ -705,9 +663,9 @@ impl ModelClientSession { otel_manager: &OtelManager, model_info: &ModelInfo, ) -> std::result::Result<(), ApiError> { - let Some(ws_version) = self.client.active_ws_version(model_info) else { + if !self.client.responses_websocket_enabled(model_info) { return Ok(()); - }; + } if self.websocket_session.connection.is_some() { return Ok(()); } @@ -724,7 +682,6 @@ impl ModelClientSession { otel_manager, client_setup.api_provider, client_setup.api_auth, - ws_version, Some(Arc::clone(&self.turn_state)), None, ) @@ -738,7 +695,6 @@ impl ModelClientSession { otel_manager: &OtelManager, api_provider: codex_api::Provider, api_auth: CoreAuthProvider, - ws_version: ResponsesWebsocketVersion, turn_metadata_header: Option<&str>, options: &ApiResponsesOptions, ) -> std::result::Result<&ApiWebSocketConnection, ApiError> { @@ -760,7 +716,6 @@ impl ModelClientSession { otel_manager, api_provider, api_auth, - ws_version, Some(turn_state), turn_metadata_header, ) @@ -862,7 +817,6 @@ impl ModelClientSession { &mut self, prompt: &Prompt, model_info: &ModelInfo, - ws_version: ResponsesWebsocketVersion, otel_manager: &OtelManager, effort: Option, summary: ReasoningSummaryConfig, @@ -901,7 +855,6 @@ impl ModelClientSession { otel_manager, client_setup.api_provider, client_setup.api_auth, - ws_version, turn_metadata_header, &options, ) @@ -922,7 +875,7 @@ impl ModelClientSession { Err(err) => return Err(map_api_error(err)), } - let ws_request = self.prepare_websocket_request(ws_payload, &request, ws_version); + let ws_request = self.prepare_websocket_request(ws_payload, &request); self.websocket_session.last_request = Some(request); let stream_result = self .websocket_session @@ -971,17 +924,10 @@ impl ModelClientSession { service_tier: Option, turn_metadata_header: Option<&str>, ) -> Result<()> { - let Some(ws_version) = self.client.active_ws_version(model_info) else { - return Ok(()); - }; - if self.websocket_session.last_request.is_some() { + if !self.client.responses_websocket_enabled(model_info) { return Ok(()); } - - if matches!(ws_version, ResponsesWebsocketVersion::V1) { - self.preconnect_websocket(otel_manager, model_info) - .await - .map_err(map_api_error)?; + if self.websocket_session.last_request.is_some() { return Ok(()); } @@ -989,7 +935,6 @@ impl ModelClientSession { .stream_responses_websocket( prompt, model_info, - ws_version, otel_manager, effort, summary, @@ -1038,12 +983,11 @@ impl ModelClientSession { let wire_api = self.client.state.provider.wire_api; match wire_api { WireApi::Responses => { - if let Some(ws_version) = self.client.active_ws_version(model_info) { + if self.client.responses_websocket_enabled(model_info) { match self .stream_responses_websocket( prompt, model_info, - ws_version, otel_manager, effort, summary, @@ -1085,7 +1029,7 @@ impl ModelClientSession { otel_manager: &OtelManager, model_info: &ModelInfo, ) -> bool { - let websocket_enabled = self.client.active_ws_version(model_info).is_some(); + let websocket_enabled = self.client.responses_websocket_enabled(model_info); let activated = self.activate_http_fallback(websocket_enabled); if activated { warn!("falling back to HTTP"); @@ -1183,7 +1127,6 @@ where Ok(ResponseEvent::Completed { response_id, token_usage, - can_append, }) => { if let Some(usage) = &token_usage { otel_manager.sse_event_completed( @@ -1198,14 +1141,12 @@ where let _ = sender.send(LastResponse { response_id: response_id.clone(), items_added: std::mem::take(&mut items_added), - can_append, }); } if tx_event .send(Ok(ResponseEvent::Completed { response_id, token_usage, - can_append, })) .await .is_err() @@ -1335,7 +1276,7 @@ mod tests { provider, session_source, None, - None, + false, false, false, None, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7b540aa4d..3a28a3e83 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -5572,12 +5572,10 @@ async fn run_sampling_request( // transient reconnect messages. In debug builds, keep full visibility for diagnosis. let report_error = retries > 1 || cfg!(debug_assertions) - || sess + || !sess .services .model_client - .active_ws_version(&turn_context.model_info) - .is_none(); - + .responses_websocket_enabled(&turn_context.model_info); if report_error { // Surface retry information to any UI/front‑end so the // user understands what is happening instead of staring @@ -6418,7 +6416,6 @@ async fn try_run_sampling_request( ResponseEvent::Completed { response_id: _, token_usage, - can_append: _, } => { flush_assistant_text_segments_all( &sess, @@ -9194,7 +9191,7 @@ mod tests { handlers::run_user_shell_command(&session, "sub-id".to_string(), "echo shell".to_string()) .await; - let deadline = StdDuration::from_secs(5); + let deadline = StdDuration::from_secs(15); let start = std::time::Instant::now(); loop { let remaining = deadline.saturating_sub(start.elapsed()); diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index baabf8a42..6c7dbb105 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -109,7 +109,8 @@ async fn run_compact_task_inner( let max_retries = turn_context.provider.stream_max_retries(); let mut retries = 0; let mut client_session = sess.services.model_client.new_session(); - // Reuse one client session so turn-scoped state (sticky routing, websocket append tracking) + // Reuse one client session so turn-scoped state (sticky routing, websocket incremental + // request tracking) // survives retries within this compact turn. loop { diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index ef988f1c5..828bbe214 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -147,7 +147,6 @@ pub(crate) use codex_shell_command::powershell; pub use client::ModelClient; pub use client::ModelClientSession; -pub use client::ResponsesWebsocketVersion; pub use client::X_CODEX_TURN_METADATA_HEADER; pub use client::ws_version_from_features; pub use client_common::Prompt; diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index c1a40e90f..4f2e89636 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -582,25 +582,6 @@ pub fn ev_completed(id: &str) -> Value { }) } -pub fn ev_done() -> Value { - serde_json::json!({ - "type": "response.done", - "response": { - "usage": {"input_tokens":0,"input_tokens_details":null,"output_tokens":0,"output_tokens_details":null,"total_tokens":0} - } - }) -} - -pub fn ev_done_with_id(id: &str) -> Value { - serde_json::json!({ - "type": "response.done", - "response": { - "id": id, - "usage": {"input_tokens":0,"input_tokens_details":null,"output_tokens":0,"output_tokens_details":null,"total_tokens":0} - } - }) -} - /// Convenience: SSE event for a created response with a specific id. pub fn ev_response_created(id: &str) -> Value { serde_json::json!({ diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index a46c45f23..be41a25a0 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -6,7 +6,6 @@ use codex_core::ModelClient; use codex_core::ModelProviderInfo; use codex_core::Prompt; use codex_core::ResponseEvent; -use codex_core::ResponsesWebsocketVersion; use codex_core::WireApi; use codex_otel::OtelManager; use codex_otel::TelemetryAuthMode; @@ -92,7 +91,7 @@ async fn responses_stream_includes_subagent_header_on_review() { provider.clone(), session_source, config.model_verbosity, - None::, + false, false, false, None, @@ -205,7 +204,7 @@ async fn responses_stream_includes_subagent_header_on_other() { provider.clone(), session_source, config.model_verbosity, - None::, + false, false, false, None, @@ -317,7 +316,7 @@ async fn responses_respects_model_info_overrides_from_config() { provider.clone(), session_source, config.model_verbosity, - None::, + false, false, false, None, diff --git a/codex-rs/core/tests/suite/agent_websocket.rs b/codex-rs/core/tests/suite/agent_websocket.rs index fdd9f316d..0db3cf61a 100644 --- a/codex-rs/core/tests/suite/agent_websocket.rs +++ b/codex-rs/core/tests/suite/agent_websocket.rs @@ -4,8 +4,6 @@ use codex_protocol::config_types::ServiceTier; use core_test_support::responses::WebSocketConnectionConfig; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; -use core_test_support::responses::ev_done; -use core_test_support::responses::ev_done_with_id; use core_test_support::responses::ev_response_created; use core_test_support::responses::ev_shell_command_call; use core_test_support::responses::start_websocket_server; @@ -27,7 +25,7 @@ async fn websocket_test_codex_shell_chain() -> Result<()> { vec![ ev_response_created("resp-1"), ev_shell_command_call(call_id, "echo websocket"), - ev_done(), + ev_completed("resp-1"), ], vec![ ev_response_created("resp-2"), @@ -59,36 +57,30 @@ async fn websocket_test_codex_shell_chain() -> Result<()> { .body_json(); assert_eq!(first_turn["type"].as_str(), Some("response.create")); - assert_eq!(second_turn["type"].as_str(), Some("response.append")); + assert_eq!(second_turn["type"].as_str(), Some("response.create")); - let append_items = second_turn + let input_items = second_turn .get("input") .and_then(Value::as_array) - .expect("response.append input array"); - assert!(!append_items.is_empty()); - - let output_item = append_items - .iter() - .find(|item| item.get("type").and_then(Value::as_str) == Some("function_call_output")) - .expect("function_call_output in append"); - assert_eq!( - output_item.get("call_id").and_then(Value::as_str), - Some(call_id) - ); + .expect("second response.create input array"); + assert!(!input_items.is_empty()); server.shutdown().await; Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn websocket_first_turn_uses_preconnect_and_create() -> Result<()> { +async fn websocket_first_turn_uses_startup_prewarm_and_create() -> Result<()> { skip_if_no_network!(Ok(())); - let server = start_websocket_server(vec![vec![vec![ - ev_response_created("resp-1"), - ev_assistant_message("msg-1", "hello"), - ev_completed("resp-1"), - ]]]) + let server = start_websocket_server(vec![vec![ + vec![ev_response_created("warm-1"), ev_completed("warm-1")], + vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "hello"), + ev_completed("resp-1"), + ], + ]]) .await; let mut builder = test_codex(); @@ -101,11 +93,14 @@ async fn websocket_first_turn_uses_preconnect_and_create() -> Result<()> { assert_eq!(server.handshakes().len(), 1); let connection = server.single_connection(); - assert_eq!(connection.len(), 1); - let turn = connection + assert_eq!(connection.len(), 2); + let warmup = connection .first() - .expect("missing turn request") + .expect("missing warmup request") .body_json(); + let turn = connection.get(1).expect("missing turn request").body_json(); + assert_eq!(warmup["type"].as_str(), Some("response.create")); + assert_eq!(warmup["generate"].as_bool(), Some(false)); assert!( turn["tools"] .as_array() @@ -119,15 +114,18 @@ async fn websocket_first_turn_uses_preconnect_and_create() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn websocket_first_turn_handles_handshake_delay_with_preconnect() -> Result<()> { +async fn websocket_first_turn_handles_handshake_delay_with_startup_prewarm() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig { - requests: vec![vec![ - ev_response_created("resp-1"), - ev_assistant_message("msg-1", "hello"), - ev_completed("resp-1"), - ]], + requests: vec![ + vec![ev_response_created("warm-1"), ev_completed("warm-1")], + vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "hello"), + ev_completed("resp-1"), + ], + ], response_headers: Vec::new(), // Delay handshake so turn processing must tolerate websocket startup latency. accept_delay: Some(Duration::from_millis(150)), @@ -144,11 +142,14 @@ async fn websocket_first_turn_handles_handshake_delay_with_preconnect() -> Resul assert_eq!(server.handshakes().len(), 1); let connection = server.single_connection(); - assert_eq!(connection.len(), 1); - let turn = connection + assert_eq!(connection.len(), 2); + let warmup = connection .first() - .expect("missing turn request") + .expect("missing warmup request") .body_json(); + let turn = connection.get(1).expect("missing turn request").body_json(); + assert_eq!(warmup["type"].as_str(), Some("response.create")); + assert_eq!(warmup["generate"].as_bool(), Some(false)); assert!( turn["tools"] .as_array() @@ -167,7 +168,7 @@ async fn websocket_v2_test_codex_shell_chain() -> Result<()> { let call_id = "shell-command-call"; let server = start_websocket_server(vec![vec![ - vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")], + vec![ev_response_created("warm-1"), ev_completed("warm-1")], vec![ ev_response_created("resp-1"), ev_shell_command_call(call_id, "echo websocket"), @@ -251,7 +252,7 @@ async fn websocket_v2_first_turn_uses_updated_fast_tier_after_startup_prewarm() skip_if_no_network!(Ok(())); let server = start_websocket_server(vec![vec![ - vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")], + vec![ev_response_created("warm-1"), ev_completed("warm-1")], vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "fast"), @@ -300,7 +301,7 @@ async fn websocket_v2_first_turn_drops_fast_tier_after_startup_prewarm() -> Resu skip_if_no_network!(Ok(())); let server = start_websocket_server(vec![vec![ - vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")], + vec![ev_response_created("warm-1"), ev_completed("warm-1")], vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "standard"), @@ -349,7 +350,7 @@ async fn websocket_v2_next_turn_uses_updated_service_tier() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_websocket_server(vec![vec![ - vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")], + vec![ev_response_created("warm-1"), ev_completed("warm-1")], vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "fast"), diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 8521c91eb..d4a8ce9f3 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -4,7 +4,6 @@ use codex_core::ModelProviderInfo; use codex_core::NewThread; use codex_core::Prompt; use codex_core::ResponseEvent; -use codex_core::ResponsesWebsocketVersion; use codex_core::ThreadManager; use codex_core::WireApi; use codex_core::auth::AuthCredentialsStoreMode; @@ -1643,7 +1642,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { provider.clone(), SessionSource::Exec, config.model_verbosity, - None::, + false, false, false, None, diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 2c2dc7c7c..b2098c039 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -31,8 +31,6 @@ use core_test_support::responses::WebSocketConnectionConfig; use core_test_support::responses::WebSocketTestServer; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; -use core_test_support::responses::ev_done; -use core_test_support::responses::ev_done_with_id; use core_test_support::responses::ev_response_created; use core_test_support::responses::start_websocket_server; use core_test_support::responses::start_websocket_server_with_headers; @@ -50,7 +48,6 @@ use tracing_test::traced_test; const MODEL: &str = "gpt-5.2-codex"; const OPENAI_BETA_HEADER: &str = "OpenAI-Beta"; -const OPENAI_BETA_RESPONSES_WEBSOCKETS: &str = "responses_websockets=2026-02-04"; const WS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06"; struct WebsocketTestHarness { @@ -89,7 +86,7 @@ async fn responses_websocket_streams_request() { let handshake = server.single_handshake(); assert_eq!( handshake.header(OPENAI_BETA_HEADER), - Some(OPENAI_BETA_RESPONSES_WEBSOCKETS.to_string()) + Some(WS_V2_BETA_HEADER_VALUE.to_string()) ); server.shutdown().await; @@ -125,7 +122,7 @@ async fn responses_websocket_request_prewarm_reuses_connection() { skip_if_no_network!(); let server = start_websocket_server(vec![vec![ - vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")], + vec![ev_response_created("warm-1"), ev_completed("warm-1")], vec![ev_response_created("resp-1"), ev_completed("resp-1")], ]]) .await; @@ -244,7 +241,7 @@ async fn responses_websocket_request_prewarm_is_reused_even_with_header_changes( skip_if_no_network!(); let server = start_websocket_server(vec![vec![ - vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")], + vec![ev_response_created("warm-1"), ev_completed("warm-1")], vec![ev_response_created("resp-1"), ev_completed("resp-1")], ]]) .await; @@ -344,13 +341,6 @@ async fn responses_websocket_prewarm_uses_v2_when_model_prefers_websockets_and_f .map(str::trim) .any(|value| value == WS_V2_BETA_HEADER_VALUE) ); - assert!( - !openai_beta_header - .split(',') - .map(str::trim) - .any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS) - ); - stream_until_complete(&mut client_session, &harness, &prompt).await; assert_eq!(server.handshakes().len(), 1); let connection = server.single_connection(); @@ -404,13 +394,6 @@ async fn responses_websocket_preconnect_runs_when_only_v2_feature_enabled() { .map(str::trim) .any(|value| value == WS_V2_BETA_HEADER_VALUE) ); - assert!( - !openai_beta_header - .split(',') - .map(str::trim) - .any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS) - ); - server.shutdown().await; } @@ -422,7 +405,7 @@ async fn responses_websocket_v2_requests_use_v2_when_model_prefers_websockets() vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "assistant output"), - ev_done_with_id("resp-1"), + ev_completed("resp-1"), ], vec![ev_response_created("resp-2"), ev_completed("resp-2")], ]]) @@ -460,13 +443,6 @@ async fn responses_websocket_v2_requests_use_v2_when_model_prefers_websockets() .map(str::trim) .any(|value| value == WS_V2_BETA_HEADER_VALUE) ); - assert!( - !openai_beta_header - .split(',') - .map(str::trim) - .any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS) - ); - server.shutdown().await; } @@ -478,7 +454,7 @@ async fn responses_websocket_v2_incremental_requests_are_reused_across_turns() { vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "assistant output"), - ev_done_with_id("resp-1"), + ev_completed("resp-1"), ], vec![ev_response_created("resp-2"), ev_completed("resp-2")], ]]) @@ -522,7 +498,7 @@ async fn responses_websocket_v2_wins_when_both_features_enabled() { vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "assistant output"), - ev_done_with_id("resp-1"), + ev_completed("resp-1"), ], vec![ev_response_created("resp-2"), ev_completed("resp-2")], ]]) @@ -560,13 +536,6 @@ async fn responses_websocket_v2_wins_when_both_features_enabled() { .map(str::trim) .any(|value| value == WS_V2_BETA_HEADER_VALUE) ); - assert!( - !openai_beta_header - .split(',') - .map(str::trim) - .any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS) - ); - server.shutdown().await; } @@ -835,7 +804,14 @@ async fn responses_websocket_usage_limit_error_emits_rate_limit_event() { } }); - let server = start_websocket_server(vec![vec![vec![usage_limit_error]]]).await; + let server = start_websocket_server(vec![vec![ + vec![ + ev_response_created("resp-prewarm"), + ev_completed("resp-prewarm"), + ], + vec![usage_limit_error], + ]]) + .await; let mut builder = test_codex().with_config(|config| { config.model_provider.request_max_retries = Some(0); config.model_provider.stream_max_retries = Some(0); @@ -913,7 +889,14 @@ async fn responses_websocket_invalid_request_error_with_status_is_forwarded() { } }); - let server = start_websocket_server(vec![vec![vec![invalid_request_error]]]).await; + let server = start_websocket_server(vec![vec![ + vec![ + ev_response_created("resp-prewarm"), + ev_completed("resp-prewarm"), + ], + vec![invalid_request_error], + ]]) + .await; let mut builder = test_codex().with_config(|config| { config.model_provider.request_max_retries = Some(0); config.model_provider.stream_max_retries = Some(0); @@ -990,14 +973,14 @@ async fn responses_websocket_connection_limit_error_reconnects_and_completes() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn responses_websocket_appends_on_prefix() { +async fn responses_websocket_uses_incremental_create_on_prefix() { skip_if_no_network!(); let server = start_websocket_server(vec![vec![ vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "assistant output"), - ev_done(), + ev_completed("resp-1"), ], vec![ev_response_created("resp-2"), ev_completed("resp-2")], ]]) @@ -1024,24 +1007,25 @@ async fn responses_websocket_appends_on_prefix() { assert_eq!(first["model"].as_str(), Some(MODEL)); assert_eq!(first["stream"], serde_json::Value::Bool(true)); assert_eq!(first["input"].as_array().map(Vec::len), Some(1)); - let expected_append = serde_json::json!({ - "type": "response.append", - "input": serde_json::to_value(&prompt_two.input[2..]).expect("serialize append items"), - }); - assert_eq!(second, expected_append); + assert_eq!(second["type"].as_str(), Some("response.create")); + assert_eq!(second["previous_response_id"].as_str(), Some("resp-1")); + assert_eq!( + second["input"], + serde_json::to_value(&prompt_two.input[2..]).expect("serialize incremental items") + ); server.shutdown().await; } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn responses_websocket_forwards_turn_metadata_on_create_and_append() { +async fn responses_websocket_forwards_turn_metadata_on_initial_and_incremental_create() { skip_if_no_network!(); let server = start_websocket_server(vec![vec![ vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "assistant output"), - ev_done(), + ev_completed("resp-1"), ], vec![ev_response_created("resp-2"), ev_completed("resp-2")], ]]) @@ -1085,7 +1069,8 @@ async fn responses_websocket_forwards_turn_metadata_on_create_and_append() { first["client_metadata"]["x-codex-turn-metadata"].as_str(), Some(first_turn_metadata) ); - assert_eq!(second["type"].as_str(), Some("response.append")); + assert_eq!(second["type"].as_str(), Some("response.create")); + assert_eq!(second["previous_response_id"].as_str(), Some("resp-1")); assert_eq!( second["client_metadata"]["x-codex-turn-metadata"].as_str(), Some(enriched_turn_metadata) @@ -1107,7 +1092,7 @@ async fn responses_websocket_forwards_turn_metadata_on_create_and_append() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn responses_websocket_creates_on_prefix_when_previous_completion_cannot_append() { +async fn responses_websocket_uses_previous_response_id_when_prefix_after_completed() { skip_if_no_network!(); let server = start_websocket_server(vec![vec![ @@ -1137,9 +1122,10 @@ async fn responses_websocket_creates_on_prefix_when_previous_completion_cannot_a let second = connection.get(1).expect("missing request").body_json(); assert_eq!(second["type"].as_str(), Some("response.create")); + assert_eq!(second["previous_response_id"].as_str(), Some("resp-1")); assert_eq!( second["input"], - serde_json::to_value(&prompt_two.input).expect("serialize full input") + serde_json::to_value(&prompt_two.input[2..]).expect("serialize incremental input") ); server.shutdown().await; @@ -1222,7 +1208,7 @@ async fn responses_websocket_v2_creates_with_previous_response_id_on_prefix() { vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "assistant output"), - ev_done_with_id("resp-1"), + ev_completed("resp-1"), ], vec![ev_response_created("resp-2"), ev_completed("resp-2")], ]]) @@ -1409,13 +1395,6 @@ async fn responses_websocket_v2_sets_openai_beta_header() { .map(str::trim) .any(|value| value == WS_V2_BETA_HEADER_VALUE) ); - assert!( - !openai_beta_header - .split(',') - .map(str::trim) - .any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS) - ); - server.shutdown().await; } diff --git a/codex-rs/core/tests/suite/turn_state.rs b/codex-rs/core/tests/suite/turn_state.rs index 62bd97439..c068cfafb 100644 --- a/codex-rs/core/tests/suite/turn_state.rs +++ b/codex-rs/core/tests/suite/turn_state.rs @@ -4,7 +4,6 @@ use anyhow::Result; use core_test_support::responses::WebSocketConnectionConfig; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; -use core_test_support::responses::ev_done; use core_test_support::responses::ev_reasoning_item; use core_test_support::responses::ev_response_created; use core_test_support::responses::ev_shell_command_call; @@ -100,7 +99,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result< ev_response_created("resp-1"), ev_reasoning_item("rsn-1", &["thinking"], &[]), ev_shell_command_call(call_id, "echo websocket"), - ev_done(), + ev_completed("resp-1"), ]], response_headers: vec![(TURN_STATE_HEADER.to_string(), "ts-1".to_string())], accept_delay: None, diff --git a/codex-rs/core/tests/suite/websocket_fallback.rs b/codex-rs/core/tests/suite/websocket_fallback.rs index 5be4bc910..fadd427c1 100644 --- a/codex-rs/core/tests/suite/websocket_fallback.rs +++ b/codex-rs/core/tests/suite/websocket_fallback.rs @@ -66,9 +66,9 @@ async fn websocket_fallback_switches_to_http_on_upgrade_required_connect() -> Re .filter(|req| req.method == Method::POST && req.url.path().ends_with("/responses")) .count(); - // Startup prewarm now only preconnects for v1 (one websocket GET with no request body). - // The first turn then attempts websocket once, sees 426, and falls back to HTTP. - assert_eq!(websocket_attempts, 2); + // The startup prewarm request sees 426 and immediately switches the session to HTTP fallback, + // so the first turn goes straight to HTTP with no additional websocket connect attempt. + assert_eq!(websocket_attempts, 1); assert_eq!(http_attempts, 1); assert_eq!(response_mock.requests().len(), 1);