Remove Responses V1 websocket implementation (#13364)
V2 is the way to go!
This commit is contained in:
parent
8da7e4bdae
commit
69df12efb3
15 changed files with 133 additions and 353 deletions
|
|
@ -67,8 +67,6 @@ pub enum ResponseEvent {
|
|||
Completed {
|
||||
response_id: String,
|
||||
token_usage: Option<TokenUsage>,
|
||||
/// Whether the client can append more items to a long-running websocket response.
|
||||
can_append: bool,
|
||||
},
|
||||
OutputTextDelta(String),
|
||||
ReasoningSummaryDelta {
|
||||
|
|
@ -211,20 +209,12 @@ pub struct ResponseCreateWsRequest {
|
|||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ResponseAppendWsRequest {
|
||||
pub input: Vec<ResponseItem>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
}
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub enum ResponsesWsRequest {
|
||||
#[serde(rename = "response.create")]
|
||||
ResponseCreate(ResponseCreateWsRequest),
|
||||
#[serde(rename = "response.append")]
|
||||
ResponseAppend(ResponseAppendWsRequest),
|
||||
}
|
||||
|
||||
pub fn create_text_param_for_request(
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ pub use crate::common::MemorySummarizeInput;
|
|||
pub use crate::common::MemorySummarizeOutput;
|
||||
pub use crate::common::RawMemory;
|
||||
pub use crate::common::RawMemoryMetadata;
|
||||
pub use crate::common::ResponseAppendWsRequest;
|
||||
pub use crate::common::ResponseCreateWsRequest;
|
||||
pub use crate::common::ResponseEvent;
|
||||
pub use crate::common::ResponseStream;
|
||||
|
|
|
|||
|
|
@ -118,14 +118,6 @@ struct ResponseCompleted {
|
|||
usage: Option<ResponseCompletedUsage>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ResponseDone {
|
||||
#[serde(default)]
|
||||
id: Option<String>,
|
||||
#[serde(default)]
|
||||
usage: Option<ResponseCompletedUsage>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ResponseCompletedUsage {
|
||||
input_tokens: i64,
|
||||
|
|
@ -324,7 +316,6 @@ pub fn process_responses_event(
|
|||
return Ok(Some(ResponseEvent::Completed {
|
||||
response_id: resp.id,
|
||||
token_usage: resp.usage.map(Into::into),
|
||||
can_append: false,
|
||||
}));
|
||||
}
|
||||
Err(err) => {
|
||||
|
|
@ -335,31 +326,6 @@ pub fn process_responses_event(
|
|||
}
|
||||
}
|
||||
}
|
||||
"response.done" => {
|
||||
if let Some(resp_val) = event.response {
|
||||
match serde_json::from_value::<ResponseDone>(resp_val) {
|
||||
Ok(resp) => {
|
||||
return Ok(Some(ResponseEvent::Completed {
|
||||
response_id: resp.id.unwrap_or_default(),
|
||||
token_usage: resp.usage.map(Into::into),
|
||||
can_append: true,
|
||||
}));
|
||||
}
|
||||
Err(err) => {
|
||||
let error = format!("failed to parse ResponseCompleted: {err}");
|
||||
debug!("{error}");
|
||||
return Err(ResponsesEventError::Api(ApiError::Stream(error)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!("response.done missing response payload");
|
||||
return Ok(Some(ResponseEvent::Completed {
|
||||
response_id: String::new(),
|
||||
token_usage: None,
|
||||
can_append: true,
|
||||
}));
|
||||
}
|
||||
"response.output_item.added" => {
|
||||
if let Some(item_val) = event.item {
|
||||
if let Ok(item) = serde_json::from_value::<ResponseItem>(item_val) {
|
||||
|
|
@ -639,11 +605,9 @@ mod tests {
|
|||
Ok(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
can_append,
|
||||
}) => {
|
||||
assert_eq!(response_id, "resp1");
|
||||
assert!(token_usage.is_none());
|
||||
assert!(!can_append);
|
||||
}
|
||||
other => panic!("unexpected third event: {other:?}"),
|
||||
}
|
||||
|
|
@ -677,69 +641,6 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn response_done_emits_incremental_completed() {
|
||||
let done = json!({
|
||||
"type": "response.done",
|
||||
"response": {
|
||||
"usage": {
|
||||
"input_tokens": 1,
|
||||
"input_tokens_details": null,
|
||||
"output_tokens": 2,
|
||||
"output_tokens_details": null,
|
||||
"total_tokens": 3
|
||||
}
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let sse1 = format!("event: response.done\ndata: {done}\n\n");
|
||||
|
||||
let events = collect_events(&[sse1.as_bytes()]).await;
|
||||
|
||||
assert_eq!(events.len(), 1);
|
||||
|
||||
match &events[0] {
|
||||
Ok(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
can_append,
|
||||
}) => {
|
||||
assert_eq!(response_id, "");
|
||||
assert!(token_usage.is_some());
|
||||
assert!(*can_append);
|
||||
}
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn response_done_without_payload_emits_completed() {
|
||||
let done = json!({
|
||||
"type": "response.done"
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let sse1 = format!("event: response.done\ndata: {done}\n\n");
|
||||
|
||||
let events = collect_events(&[sse1.as_bytes()]).await;
|
||||
|
||||
assert_eq!(events.len(), 1);
|
||||
|
||||
match &events[0] {
|
||||
Ok(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
can_append,
|
||||
}) => {
|
||||
assert_eq!(response_id, "");
|
||||
assert!(token_usage.is_none());
|
||||
assert!(*can_append);
|
||||
}
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn emits_completed_without_stream_end() {
|
||||
let completed = json!({
|
||||
|
|
@ -770,11 +671,9 @@ mod tests {
|
|||
Ok(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
can_append,
|
||||
}) => {
|
||||
assert_eq!(response_id, "resp1");
|
||||
assert!(token_usage.is_none());
|
||||
assert!(!can_append);
|
||||
}
|
||||
other => panic!("unexpected event: {other:?}"),
|
||||
}
|
||||
|
|
@ -996,8 +895,7 @@ mod tests {
|
|||
&events[1],
|
||||
ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage: None,
|
||||
can_append: false
|
||||
token_usage: None
|
||||
} if response_id == "resp-1"
|
||||
);
|
||||
}
|
||||
|
|
@ -1033,8 +931,7 @@ mod tests {
|
|||
&events[2],
|
||||
ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage: None,
|
||||
can_append: false
|
||||
token_usage: None
|
||||
} if response_id == "resp-1"
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -159,11 +159,9 @@ async fn responses_stream_parses_items_and_completed_end_to_end() -> Result<()>
|
|||
ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
can_append,
|
||||
} => {
|
||||
assert_eq!(response_id, "resp1");
|
||||
assert!(token_usage.is_none());
|
||||
assert!(!can_append);
|
||||
}
|
||||
other => panic!("unexpected third event: {other:?}"),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,9 +20,8 @@
|
|||
//!
|
||||
//! ## Retry-Budget Tradeoff
|
||||
//!
|
||||
//! V2 request prewarm is treated as the first websocket connection attempt for a turn. If it
|
||||
//! fails, normal stream retry/fallback logic handles recovery on the same turn. V1 prewarm
|
||||
//! remains connection-only.
|
||||
//! WebSocket prewarm is treated as the first websocket connection attempt for a turn. If it
|
||||
//! fails, normal stream retry/fallback logic handles recovery on the same turn.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
|
@ -43,7 +42,6 @@ use codex_api::MemorySummarizeOutput as ApiMemorySummarizeOutput;
|
|||
use codex_api::RawMemory as ApiRawMemory;
|
||||
use codex_api::RequestTelemetry;
|
||||
use codex_api::ReqwestTransport;
|
||||
use codex_api::ResponseAppendWsRequest;
|
||||
use codex_api::ResponseCreateWsRequest;
|
||||
use codex_api::ResponsesApiRequest;
|
||||
use codex_api::ResponsesClient as ApiResponsesClient;
|
||||
|
|
@ -101,32 +99,19 @@ use crate::model_provider_info::WireApi;
|
|||
use crate::tools::spec::create_tools_json_for_responses_api;
|
||||
|
||||
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
|
||||
pub const OPENAI_BETA_RESPONSES_WEBSOCKETS: &str = "responses_websockets=2026-02-04";
|
||||
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
||||
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
|
||||
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
|
||||
"x-responsesapi-include-timing-metrics";
|
||||
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ResponsesWebsocketVersion {
|
||||
V1,
|
||||
V2,
|
||||
}
|
||||
|
||||
pub fn ws_version_from_features(config: &Config) -> Option<ResponsesWebsocketVersion> {
|
||||
match (
|
||||
config
|
||||
pub fn ws_version_from_features(config: &Config) -> bool {
|
||||
config
|
||||
.features
|
||||
.enabled(crate::features::Feature::ResponsesWebsockets)
|
||||
|| config
|
||||
.features
|
||||
.enabled(crate::features::Feature::ResponsesWebsockets),
|
||||
config
|
||||
.features
|
||||
.enabled(crate::features::Feature::ResponsesWebsocketsV2),
|
||||
) {
|
||||
(_, true) => Some(ResponsesWebsocketVersion::V2),
|
||||
(true, false) => Some(ResponsesWebsocketVersion::V1),
|
||||
(false, false) => None,
|
||||
}
|
||||
.enabled(crate::features::Feature::ResponsesWebsocketsV2)
|
||||
}
|
||||
|
||||
/// Session-scoped state shared by all [`ModelClient`] clones.
|
||||
|
|
@ -140,7 +125,7 @@ struct ModelClientState {
|
|||
provider: ModelProviderInfo,
|
||||
session_source: SessionSource,
|
||||
model_verbosity: Option<VerbosityConfig>,
|
||||
responses_websocket_version: Option<ResponsesWebsocketVersion>,
|
||||
responses_websockets_enabled_by_feature: bool,
|
||||
enable_request_compression: bool,
|
||||
include_timing_metrics: bool,
|
||||
beta_features_header: Option<String>,
|
||||
|
|
@ -180,8 +165,8 @@ pub struct ModelClient {
|
|||
/// The session establishes a Responses WebSocket connection lazily and reuses it across multiple
|
||||
/// requests within the turn. It also caches per-turn state:
|
||||
///
|
||||
/// - The last full request, so subsequent calls can use `response.append` only when the current
|
||||
/// request is an incremental extension of the previous one.
|
||||
/// - The last full request, so subsequent calls can reuse incremental websocket request payloads
|
||||
/// only when the current request is an incremental extension of the previous one.
|
||||
/// - The `x-codex-turn-state` sticky-routing token, which must be replayed for all requests within
|
||||
/// the same turn.
|
||||
///
|
||||
|
|
@ -208,7 +193,6 @@ pub struct ModelClientSession {
|
|||
struct LastResponse {
|
||||
response_id: String,
|
||||
items_added: Vec<ResponseItem>,
|
||||
can_append: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
|
@ -235,7 +219,7 @@ impl ModelClient {
|
|||
provider: ModelProviderInfo,
|
||||
session_source: SessionSource,
|
||||
model_verbosity: Option<VerbosityConfig>,
|
||||
responses_websocket_version: Option<ResponsesWebsocketVersion>,
|
||||
responses_websockets_enabled_by_feature: bool,
|
||||
enable_request_compression: bool,
|
||||
include_timing_metrics: bool,
|
||||
beta_features_header: Option<String>,
|
||||
|
|
@ -247,7 +231,7 @@ impl ModelClient {
|
|||
provider,
|
||||
session_source,
|
||||
model_verbosity,
|
||||
responses_websocket_version,
|
||||
responses_websockets_enabled_by_feature,
|
||||
enable_request_compression,
|
||||
include_timing_metrics,
|
||||
beta_features_header,
|
||||
|
|
@ -391,25 +375,21 @@ impl ModelClient {
|
|||
request_telemetry
|
||||
}
|
||||
|
||||
/// Returns the active Responses-over-WebSocket version for this session.
|
||||
/// Returns whether the Responses-over-WebSocket transport is active for this session.
|
||||
///
|
||||
/// This combines provider capability and feature gating; both must be true for websocket paths
|
||||
/// to be eligible.
|
||||
///
|
||||
/// If websockets are only enabled via model preference (no explicit feature flag), prefer the
|
||||
/// current v2 behavior.
|
||||
pub fn active_ws_version(&self, model_info: &ModelInfo) -> Option<ResponsesWebsocketVersion> {
|
||||
pub fn responses_websocket_enabled(&self, model_info: &ModelInfo) -> bool {
|
||||
if !self.state.provider.supports_websockets
|
||||
|| self.state.disable_websockets.load(Ordering::Relaxed)
|
||||
{
|
||||
return None;
|
||||
return false;
|
||||
}
|
||||
|
||||
match self.state.responses_websocket_version {
|
||||
Some(version) => Some(version),
|
||||
None if model_info.prefer_websockets => Some(ResponsesWebsocketVersion::V2),
|
||||
None => None,
|
||||
}
|
||||
self.state.responses_websockets_enabled_by_feature || model_info.prefer_websockets
|
||||
}
|
||||
|
||||
/// Returns auth + provider configuration resolved from the current session auth state.
|
||||
|
|
@ -442,12 +422,10 @@ impl ModelClient {
|
|||
otel_manager: &OtelManager,
|
||||
api_provider: codex_api::Provider,
|
||||
api_auth: CoreAuthProvider,
|
||||
ws_version: ResponsesWebsocketVersion,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> std::result::Result<ApiWebSocketConnection, ApiError> {
|
||||
let headers =
|
||||
self.build_websocket_headers(ws_version, turn_state.as_ref(), turn_metadata_header);
|
||||
let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header);
|
||||
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(otel_manager);
|
||||
ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
||||
.connect(
|
||||
|
|
@ -465,7 +443,6 @@ impl ModelClient {
|
|||
/// replayed on reconnect within the same turn.
|
||||
fn build_websocket_headers(
|
||||
&self,
|
||||
ws_version: ResponsesWebsocketVersion,
|
||||
turn_state: Option<&Arc<OnceLock<String>>>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> ApiHeaderMap {
|
||||
|
|
@ -478,13 +455,9 @@ impl ModelClient {
|
|||
headers.extend(build_conversation_headers(Some(
|
||||
self.state.conversation_id.to_string(),
|
||||
)));
|
||||
let responses_websockets_beta_header = match ws_version {
|
||||
ResponsesWebsocketVersion::V2 => RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE,
|
||||
ResponsesWebsocketVersion::V1 => OPENAI_BETA_RESPONSES_WEBSOCKETS,
|
||||
};
|
||||
headers.insert(
|
||||
OPENAI_BETA_HEADER,
|
||||
HeaderValue::from_static(responses_websockets_beta_header),
|
||||
HeaderValue::from_static(RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE),
|
||||
);
|
||||
if self.state.include_timing_metrics {
|
||||
headers.insert(
|
||||
|
|
@ -613,8 +586,9 @@ impl ModelClientSession {
|
|||
last_response: Option<&LastResponse>,
|
||||
allow_empty_delta: bool,
|
||||
) -> Option<Vec<ResponseItem>> {
|
||||
// Checks whether the current request is an incremental append to the previous request.
|
||||
// We only append when non-input request fields are unchanged and `input` is a strict
|
||||
// Checks whether the current request is an incremental extension of the previous request.
|
||||
// We only reuse an incremental input delta when non-input request fields are unchanged and
|
||||
// `input` is a strict
|
||||
// extension of the previous known input. Server-returned output items are treated as part
|
||||
// of the baseline so we do not resend them.
|
||||
let previous_request = self.websocket_session.last_request.as_ref()?;
|
||||
|
|
@ -659,42 +633,26 @@ impl ModelClientSession {
|
|||
&mut self,
|
||||
payload: ResponseCreateWsRequest,
|
||||
request: &ResponsesApiRequest,
|
||||
ws_version: ResponsesWebsocketVersion,
|
||||
) -> ResponsesWsRequest {
|
||||
let Some(last_response) = self.get_last_response() else {
|
||||
return ResponsesWsRequest::ResponseCreate(payload);
|
||||
};
|
||||
let allow_empty_delta = matches!(ws_version, ResponsesWebsocketVersion::V2);
|
||||
let Some(append_items) =
|
||||
self.get_incremental_items(request, Some(&last_response), allow_empty_delta)
|
||||
let Some(incremental_items) =
|
||||
self.get_incremental_items(request, Some(&last_response), true)
|
||||
else {
|
||||
return ResponsesWsRequest::ResponseCreate(payload);
|
||||
};
|
||||
|
||||
match ws_version {
|
||||
ResponsesWebsocketVersion::V2 => {
|
||||
if last_response.response_id.is_empty() {
|
||||
trace!("incremental request failed, no previous response id");
|
||||
return ResponsesWsRequest::ResponseCreate(payload);
|
||||
}
|
||||
|
||||
ResponsesWsRequest::ResponseCreate(ResponseCreateWsRequest {
|
||||
previous_response_id: Some(last_response.response_id),
|
||||
input: append_items,
|
||||
..payload
|
||||
})
|
||||
}
|
||||
ResponsesWebsocketVersion::V1 => {
|
||||
if !last_response.can_append {
|
||||
trace!("incremental request failed, can't append");
|
||||
return ResponsesWsRequest::ResponseCreate(payload);
|
||||
}
|
||||
ResponsesWsRequest::ResponseAppend(ResponseAppendWsRequest {
|
||||
input: append_items,
|
||||
client_metadata: payload.client_metadata,
|
||||
})
|
||||
}
|
||||
if last_response.response_id.is_empty() {
|
||||
trace!("incremental request failed, no previous response id");
|
||||
return ResponsesWsRequest::ResponseCreate(payload);
|
||||
}
|
||||
|
||||
ResponsesWsRequest::ResponseCreate(ResponseCreateWsRequest {
|
||||
previous_response_id: Some(last_response.response_id),
|
||||
input: incremental_items,
|
||||
..payload
|
||||
})
|
||||
}
|
||||
|
||||
/// Opportunistically preconnects a websocket for this turn-scoped client session.
|
||||
|
|
@ -705,9 +663,9 @@ impl ModelClientSession {
|
|||
otel_manager: &OtelManager,
|
||||
model_info: &ModelInfo,
|
||||
) -> std::result::Result<(), ApiError> {
|
||||
let Some(ws_version) = self.client.active_ws_version(model_info) else {
|
||||
if !self.client.responses_websocket_enabled(model_info) {
|
||||
return Ok(());
|
||||
};
|
||||
}
|
||||
if self.websocket_session.connection.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
|
@ -724,7 +682,6 @@ impl ModelClientSession {
|
|||
otel_manager,
|
||||
client_setup.api_provider,
|
||||
client_setup.api_auth,
|
||||
ws_version,
|
||||
Some(Arc::clone(&self.turn_state)),
|
||||
None,
|
||||
)
|
||||
|
|
@ -738,7 +695,6 @@ impl ModelClientSession {
|
|||
otel_manager: &OtelManager,
|
||||
api_provider: codex_api::Provider,
|
||||
api_auth: CoreAuthProvider,
|
||||
ws_version: ResponsesWebsocketVersion,
|
||||
turn_metadata_header: Option<&str>,
|
||||
options: &ApiResponsesOptions,
|
||||
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
|
||||
|
|
@ -760,7 +716,6 @@ impl ModelClientSession {
|
|||
otel_manager,
|
||||
api_provider,
|
||||
api_auth,
|
||||
ws_version,
|
||||
Some(turn_state),
|
||||
turn_metadata_header,
|
||||
)
|
||||
|
|
@ -862,7 +817,6 @@ impl ModelClientSession {
|
|||
&mut self,
|
||||
prompt: &Prompt,
|
||||
model_info: &ModelInfo,
|
||||
ws_version: ResponsesWebsocketVersion,
|
||||
otel_manager: &OtelManager,
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
|
|
@ -901,7 +855,6 @@ impl ModelClientSession {
|
|||
otel_manager,
|
||||
client_setup.api_provider,
|
||||
client_setup.api_auth,
|
||||
ws_version,
|
||||
turn_metadata_header,
|
||||
&options,
|
||||
)
|
||||
|
|
@ -922,7 +875,7 @@ impl ModelClientSession {
|
|||
Err(err) => return Err(map_api_error(err)),
|
||||
}
|
||||
|
||||
let ws_request = self.prepare_websocket_request(ws_payload, &request, ws_version);
|
||||
let ws_request = self.prepare_websocket_request(ws_payload, &request);
|
||||
self.websocket_session.last_request = Some(request);
|
||||
let stream_result = self
|
||||
.websocket_session
|
||||
|
|
@ -971,17 +924,10 @@ impl ModelClientSession {
|
|||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) -> Result<()> {
|
||||
let Some(ws_version) = self.client.active_ws_version(model_info) else {
|
||||
return Ok(());
|
||||
};
|
||||
if self.websocket_session.last_request.is_some() {
|
||||
if !self.client.responses_websocket_enabled(model_info) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if matches!(ws_version, ResponsesWebsocketVersion::V1) {
|
||||
self.preconnect_websocket(otel_manager, model_info)
|
||||
.await
|
||||
.map_err(map_api_error)?;
|
||||
if self.websocket_session.last_request.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
|
@ -989,7 +935,6 @@ impl ModelClientSession {
|
|||
.stream_responses_websocket(
|
||||
prompt,
|
||||
model_info,
|
||||
ws_version,
|
||||
otel_manager,
|
||||
effort,
|
||||
summary,
|
||||
|
|
@ -1038,12 +983,11 @@ impl ModelClientSession {
|
|||
let wire_api = self.client.state.provider.wire_api;
|
||||
match wire_api {
|
||||
WireApi::Responses => {
|
||||
if let Some(ws_version) = self.client.active_ws_version(model_info) {
|
||||
if self.client.responses_websocket_enabled(model_info) {
|
||||
match self
|
||||
.stream_responses_websocket(
|
||||
prompt,
|
||||
model_info,
|
||||
ws_version,
|
||||
otel_manager,
|
||||
effort,
|
||||
summary,
|
||||
|
|
@ -1085,7 +1029,7 @@ impl ModelClientSession {
|
|||
otel_manager: &OtelManager,
|
||||
model_info: &ModelInfo,
|
||||
) -> bool {
|
||||
let websocket_enabled = self.client.active_ws_version(model_info).is_some();
|
||||
let websocket_enabled = self.client.responses_websocket_enabled(model_info);
|
||||
let activated = self.activate_http_fallback(websocket_enabled);
|
||||
if activated {
|
||||
warn!("falling back to HTTP");
|
||||
|
|
@ -1183,7 +1127,6 @@ where
|
|||
Ok(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
can_append,
|
||||
}) => {
|
||||
if let Some(usage) = &token_usage {
|
||||
otel_manager.sse_event_completed(
|
||||
|
|
@ -1198,14 +1141,12 @@ where
|
|||
let _ = sender.send(LastResponse {
|
||||
response_id: response_id.clone(),
|
||||
items_added: std::mem::take(&mut items_added),
|
||||
can_append,
|
||||
});
|
||||
}
|
||||
if tx_event
|
||||
.send(Ok(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
can_append,
|
||||
}))
|
||||
.await
|
||||
.is_err()
|
||||
|
|
@ -1335,7 +1276,7 @@ mod tests {
|
|||
provider,
|
||||
session_source,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
|
|
|
|||
|
|
@ -5572,12 +5572,10 @@ async fn run_sampling_request(
|
|||
// transient reconnect messages. In debug builds, keep full visibility for diagnosis.
|
||||
let report_error = retries > 1
|
||||
|| cfg!(debug_assertions)
|
||||
|| sess
|
||||
|| !sess
|
||||
.services
|
||||
.model_client
|
||||
.active_ws_version(&turn_context.model_info)
|
||||
.is_none();
|
||||
|
||||
.responses_websocket_enabled(&turn_context.model_info);
|
||||
if report_error {
|
||||
// Surface retry information to any UI/front‑end so the
|
||||
// user understands what is happening instead of staring
|
||||
|
|
@ -6418,7 +6416,6 @@ async fn try_run_sampling_request(
|
|||
ResponseEvent::Completed {
|
||||
response_id: _,
|
||||
token_usage,
|
||||
can_append: _,
|
||||
} => {
|
||||
flush_assistant_text_segments_all(
|
||||
&sess,
|
||||
|
|
@ -9194,7 +9191,7 @@ mod tests {
|
|||
handlers::run_user_shell_command(&session, "sub-id".to_string(), "echo shell".to_string())
|
||||
.await;
|
||||
|
||||
let deadline = StdDuration::from_secs(5);
|
||||
let deadline = StdDuration::from_secs(15);
|
||||
let start = std::time::Instant::now();
|
||||
loop {
|
||||
let remaining = deadline.saturating_sub(start.elapsed());
|
||||
|
|
|
|||
|
|
@ -109,7 +109,8 @@ async fn run_compact_task_inner(
|
|||
let max_retries = turn_context.provider.stream_max_retries();
|
||||
let mut retries = 0;
|
||||
let mut client_session = sess.services.model_client.new_session();
|
||||
// Reuse one client session so turn-scoped state (sticky routing, websocket append tracking)
|
||||
// Reuse one client session so turn-scoped state (sticky routing, websocket incremental
|
||||
// request tracking)
|
||||
// survives retries within this compact turn.
|
||||
|
||||
loop {
|
||||
|
|
|
|||
|
|
@ -147,7 +147,6 @@ pub(crate) use codex_shell_command::powershell;
|
|||
|
||||
pub use client::ModelClient;
|
||||
pub use client::ModelClientSession;
|
||||
pub use client::ResponsesWebsocketVersion;
|
||||
pub use client::X_CODEX_TURN_METADATA_HEADER;
|
||||
pub use client::ws_version_from_features;
|
||||
pub use client_common::Prompt;
|
||||
|
|
|
|||
|
|
@ -582,25 +582,6 @@ pub fn ev_completed(id: &str) -> Value {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn ev_done() -> Value {
|
||||
serde_json::json!({
|
||||
"type": "response.done",
|
||||
"response": {
|
||||
"usage": {"input_tokens":0,"input_tokens_details":null,"output_tokens":0,"output_tokens_details":null,"total_tokens":0}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn ev_done_with_id(id: &str) -> Value {
|
||||
serde_json::json!({
|
||||
"type": "response.done",
|
||||
"response": {
|
||||
"id": id,
|
||||
"usage": {"input_tokens":0,"input_tokens_details":null,"output_tokens":0,"output_tokens_details":null,"total_tokens":0}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Convenience: SSE event for a created response with a specific id.
|
||||
pub fn ev_response_created(id: &str) -> Value {
|
||||
serde_json::json!({
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ use codex_core::ModelClient;
|
|||
use codex_core::ModelProviderInfo;
|
||||
use codex_core::Prompt;
|
||||
use codex_core::ResponseEvent;
|
||||
use codex_core::ResponsesWebsocketVersion;
|
||||
use codex_core::WireApi;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
|
|
@ -92,7 +91,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
|
|||
provider.clone(),
|
||||
session_source,
|
||||
config.model_verbosity,
|
||||
None::<ResponsesWebsocketVersion>,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
|
|
@ -205,7 +204,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
|
|||
provider.clone(),
|
||||
session_source,
|
||||
config.model_verbosity,
|
||||
None::<ResponsesWebsocketVersion>,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
|
|
@ -317,7 +316,7 @@ async fn responses_respects_model_info_overrides_from_config() {
|
|||
provider.clone(),
|
||||
session_source,
|
||||
config.model_verbosity,
|
||||
None::<ResponsesWebsocketVersion>,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
|
|
|
|||
|
|
@ -4,8 +4,6 @@ use codex_protocol::config_types::ServiceTier;
|
|||
use core_test_support::responses::WebSocketConnectionConfig;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_done;
|
||||
use core_test_support::responses::ev_done_with_id;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::ev_shell_command_call;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
|
|
@ -27,7 +25,7 @@ async fn websocket_test_codex_shell_chain() -> Result<()> {
|
|||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_shell_command_call(call_id, "echo websocket"),
|
||||
ev_done(),
|
||||
ev_completed("resp-1"),
|
||||
],
|
||||
vec![
|
||||
ev_response_created("resp-2"),
|
||||
|
|
@ -59,36 +57,30 @@ async fn websocket_test_codex_shell_chain() -> Result<()> {
|
|||
.body_json();
|
||||
|
||||
assert_eq!(first_turn["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(second_turn["type"].as_str(), Some("response.append"));
|
||||
assert_eq!(second_turn["type"].as_str(), Some("response.create"));
|
||||
|
||||
let append_items = second_turn
|
||||
let input_items = second_turn
|
||||
.get("input")
|
||||
.and_then(Value::as_array)
|
||||
.expect("response.append input array");
|
||||
assert!(!append_items.is_empty());
|
||||
|
||||
let output_item = append_items
|
||||
.iter()
|
||||
.find(|item| item.get("type").and_then(Value::as_str) == Some("function_call_output"))
|
||||
.expect("function_call_output in append");
|
||||
assert_eq!(
|
||||
output_item.get("call_id").and_then(Value::as_str),
|
||||
Some(call_id)
|
||||
);
|
||||
.expect("second response.create input array");
|
||||
assert!(!input_items.is_empty());
|
||||
|
||||
server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn websocket_first_turn_uses_preconnect_and_create() -> Result<()> {
|
||||
async fn websocket_first_turn_uses_startup_prewarm_and_create() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "hello"),
|
||||
ev_completed("resp-1"),
|
||||
]]])
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![ev_response_created("warm-1"), ev_completed("warm-1")],
|
||||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "hello"),
|
||||
ev_completed("resp-1"),
|
||||
],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
|
|
@ -101,11 +93,14 @@ async fn websocket_first_turn_uses_preconnect_and_create() -> Result<()> {
|
|||
|
||||
assert_eq!(server.handshakes().len(), 1);
|
||||
let connection = server.single_connection();
|
||||
assert_eq!(connection.len(), 1);
|
||||
let turn = connection
|
||||
assert_eq!(connection.len(), 2);
|
||||
let warmup = connection
|
||||
.first()
|
||||
.expect("missing turn request")
|
||||
.expect("missing warmup request")
|
||||
.body_json();
|
||||
let turn = connection.get(1).expect("missing turn request").body_json();
|
||||
assert_eq!(warmup["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(warmup["generate"].as_bool(), Some(false));
|
||||
assert!(
|
||||
turn["tools"]
|
||||
.as_array()
|
||||
|
|
@ -119,15 +114,18 @@ async fn websocket_first_turn_uses_preconnect_and_create() -> Result<()> {
|
|||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn websocket_first_turn_handles_handshake_delay_with_preconnect() -> Result<()> {
|
||||
async fn websocket_first_turn_handles_handshake_delay_with_startup_prewarm() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server_with_headers(vec![WebSocketConnectionConfig {
|
||||
requests: vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "hello"),
|
||||
ev_completed("resp-1"),
|
||||
]],
|
||||
requests: vec![
|
||||
vec![ev_response_created("warm-1"), ev_completed("warm-1")],
|
||||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "hello"),
|
||||
ev_completed("resp-1"),
|
||||
],
|
||||
],
|
||||
response_headers: Vec::new(),
|
||||
// Delay handshake so turn processing must tolerate websocket startup latency.
|
||||
accept_delay: Some(Duration::from_millis(150)),
|
||||
|
|
@ -144,11 +142,14 @@ async fn websocket_first_turn_handles_handshake_delay_with_preconnect() -> Resul
|
|||
|
||||
assert_eq!(server.handshakes().len(), 1);
|
||||
let connection = server.single_connection();
|
||||
assert_eq!(connection.len(), 1);
|
||||
let turn = connection
|
||||
assert_eq!(connection.len(), 2);
|
||||
let warmup = connection
|
||||
.first()
|
||||
.expect("missing turn request")
|
||||
.expect("missing warmup request")
|
||||
.body_json();
|
||||
let turn = connection.get(1).expect("missing turn request").body_json();
|
||||
assert_eq!(warmup["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(warmup["generate"].as_bool(), Some(false));
|
||||
assert!(
|
||||
turn["tools"]
|
||||
.as_array()
|
||||
|
|
@ -167,7 +168,7 @@ async fn websocket_v2_test_codex_shell_chain() -> Result<()> {
|
|||
|
||||
let call_id = "shell-command-call";
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")],
|
||||
vec![ev_response_created("warm-1"), ev_completed("warm-1")],
|
||||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_shell_command_call(call_id, "echo websocket"),
|
||||
|
|
@ -251,7 +252,7 @@ async fn websocket_v2_first_turn_uses_updated_fast_tier_after_startup_prewarm()
|
|||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")],
|
||||
vec![ev_response_created("warm-1"), ev_completed("warm-1")],
|
||||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "fast"),
|
||||
|
|
@ -300,7 +301,7 @@ async fn websocket_v2_first_turn_drops_fast_tier_after_startup_prewarm() -> Resu
|
|||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")],
|
||||
vec![ev_response_created("warm-1"), ev_completed("warm-1")],
|
||||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "standard"),
|
||||
|
|
@ -349,7 +350,7 @@ async fn websocket_v2_next_turn_uses_updated_service_tier() -> Result<()> {
|
|||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")],
|
||||
vec![ev_response_created("warm-1"), ev_completed("warm-1")],
|
||||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "fast"),
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ use codex_core::ModelProviderInfo;
|
|||
use codex_core::NewThread;
|
||||
use codex_core::Prompt;
|
||||
use codex_core::ResponseEvent;
|
||||
use codex_core::ResponsesWebsocketVersion;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::auth::AuthCredentialsStoreMode;
|
||||
|
|
@ -1643,7 +1642,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
|
|||
provider.clone(),
|
||||
SessionSource::Exec,
|
||||
config.model_verbosity,
|
||||
None::<ResponsesWebsocketVersion>,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
None,
|
||||
|
|
|
|||
|
|
@ -31,8 +31,6 @@ use core_test_support::responses::WebSocketConnectionConfig;
|
|||
use core_test_support::responses::WebSocketTestServer;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_done;
|
||||
use core_test_support::responses::ev_done_with_id;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
use core_test_support::responses::start_websocket_server_with_headers;
|
||||
|
|
@ -50,7 +48,6 @@ use tracing_test::traced_test;
|
|||
|
||||
const MODEL: &str = "gpt-5.2-codex";
|
||||
const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
|
||||
const OPENAI_BETA_RESPONSES_WEBSOCKETS: &str = "responses_websockets=2026-02-04";
|
||||
const WS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
|
||||
|
||||
struct WebsocketTestHarness {
|
||||
|
|
@ -89,7 +86,7 @@ async fn responses_websocket_streams_request() {
|
|||
let handshake = server.single_handshake();
|
||||
assert_eq!(
|
||||
handshake.header(OPENAI_BETA_HEADER),
|
||||
Some(OPENAI_BETA_RESPONSES_WEBSOCKETS.to_string())
|
||||
Some(WS_V2_BETA_HEADER_VALUE.to_string())
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
|
|
@ -125,7 +122,7 @@ async fn responses_websocket_request_prewarm_reuses_connection() {
|
|||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")],
|
||||
vec![ev_response_created("warm-1"), ev_completed("warm-1")],
|
||||
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
|
||||
]])
|
||||
.await;
|
||||
|
|
@ -244,7 +241,7 @@ async fn responses_websocket_request_prewarm_is_reused_even_with_header_changes(
|
|||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![ev_response_created("warm-1"), ev_done_with_id("warm-1")],
|
||||
vec![ev_response_created("warm-1"), ev_completed("warm-1")],
|
||||
vec![ev_response_created("resp-1"), ev_completed("resp-1")],
|
||||
]])
|
||||
.await;
|
||||
|
|
@ -344,13 +341,6 @@ async fn responses_websocket_prewarm_uses_v2_when_model_prefers_websockets_and_f
|
|||
.map(str::trim)
|
||||
.any(|value| value == WS_V2_BETA_HEADER_VALUE)
|
||||
);
|
||||
assert!(
|
||||
!openai_beta_header
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS)
|
||||
);
|
||||
|
||||
stream_until_complete(&mut client_session, &harness, &prompt).await;
|
||||
assert_eq!(server.handshakes().len(), 1);
|
||||
let connection = server.single_connection();
|
||||
|
|
@ -404,13 +394,6 @@ async fn responses_websocket_preconnect_runs_when_only_v2_feature_enabled() {
|
|||
.map(str::trim)
|
||||
.any(|value| value == WS_V2_BETA_HEADER_VALUE)
|
||||
);
|
||||
assert!(
|
||||
!openai_beta_header
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS)
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
|
|
@ -422,7 +405,7 @@ async fn responses_websocket_v2_requests_use_v2_when_model_prefers_websockets()
|
|||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "assistant output"),
|
||||
ev_done_with_id("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
],
|
||||
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
|
||||
]])
|
||||
|
|
@ -460,13 +443,6 @@ async fn responses_websocket_v2_requests_use_v2_when_model_prefers_websockets()
|
|||
.map(str::trim)
|
||||
.any(|value| value == WS_V2_BETA_HEADER_VALUE)
|
||||
);
|
||||
assert!(
|
||||
!openai_beta_header
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS)
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
|
|
@ -478,7 +454,7 @@ async fn responses_websocket_v2_incremental_requests_are_reused_across_turns() {
|
|||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "assistant output"),
|
||||
ev_done_with_id("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
],
|
||||
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
|
||||
]])
|
||||
|
|
@ -522,7 +498,7 @@ async fn responses_websocket_v2_wins_when_both_features_enabled() {
|
|||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "assistant output"),
|
||||
ev_done_with_id("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
],
|
||||
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
|
||||
]])
|
||||
|
|
@ -560,13 +536,6 @@ async fn responses_websocket_v2_wins_when_both_features_enabled() {
|
|||
.map(str::trim)
|
||||
.any(|value| value == WS_V2_BETA_HEADER_VALUE)
|
||||
);
|
||||
assert!(
|
||||
!openai_beta_header
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS)
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
|
|
@ -835,7 +804,14 @@ async fn responses_websocket_usage_limit_error_emits_rate_limit_event() {
|
|||
}
|
||||
});
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![usage_limit_error]]]).await;
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
ev_response_created("resp-prewarm"),
|
||||
ev_completed("resp-prewarm"),
|
||||
],
|
||||
vec![usage_limit_error],
|
||||
]])
|
||||
.await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
config.model_provider.stream_max_retries = Some(0);
|
||||
|
|
@ -913,7 +889,14 @@ async fn responses_websocket_invalid_request_error_with_status_is_forwarded() {
|
|||
}
|
||||
});
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![invalid_request_error]]]).await;
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
ev_response_created("resp-prewarm"),
|
||||
ev_completed("resp-prewarm"),
|
||||
],
|
||||
vec![invalid_request_error],
|
||||
]])
|
||||
.await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.model_provider.request_max_retries = Some(0);
|
||||
config.model_provider.stream_max_retries = Some(0);
|
||||
|
|
@ -990,14 +973,14 @@ async fn responses_websocket_connection_limit_error_reconnects_and_completes() {
|
|||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_appends_on_prefix() {
|
||||
async fn responses_websocket_uses_incremental_create_on_prefix() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "assistant output"),
|
||||
ev_done(),
|
||||
ev_completed("resp-1"),
|
||||
],
|
||||
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
|
||||
]])
|
||||
|
|
@ -1024,24 +1007,25 @@ async fn responses_websocket_appends_on_prefix() {
|
|||
assert_eq!(first["model"].as_str(), Some(MODEL));
|
||||
assert_eq!(first["stream"], serde_json::Value::Bool(true));
|
||||
assert_eq!(first["input"].as_array().map(Vec::len), Some(1));
|
||||
let expected_append = serde_json::json!({
|
||||
"type": "response.append",
|
||||
"input": serde_json::to_value(&prompt_two.input[2..]).expect("serialize append items"),
|
||||
});
|
||||
assert_eq!(second, expected_append);
|
||||
assert_eq!(second["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(second["previous_response_id"].as_str(), Some("resp-1"));
|
||||
assert_eq!(
|
||||
second["input"],
|
||||
serde_json::to_value(&prompt_two.input[2..]).expect("serialize incremental items")
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_forwards_turn_metadata_on_create_and_append() {
|
||||
async fn responses_websocket_forwards_turn_metadata_on_initial_and_incremental_create() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "assistant output"),
|
||||
ev_done(),
|
||||
ev_completed("resp-1"),
|
||||
],
|
||||
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
|
||||
]])
|
||||
|
|
@ -1085,7 +1069,8 @@ async fn responses_websocket_forwards_turn_metadata_on_create_and_append() {
|
|||
first["client_metadata"]["x-codex-turn-metadata"].as_str(),
|
||||
Some(first_turn_metadata)
|
||||
);
|
||||
assert_eq!(second["type"].as_str(), Some("response.append"));
|
||||
assert_eq!(second["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(second["previous_response_id"].as_str(), Some("resp-1"));
|
||||
assert_eq!(
|
||||
second["client_metadata"]["x-codex-turn-metadata"].as_str(),
|
||||
Some(enriched_turn_metadata)
|
||||
|
|
@ -1107,7 +1092,7 @@ async fn responses_websocket_forwards_turn_metadata_on_create_and_append() {
|
|||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_creates_on_prefix_when_previous_completion_cannot_append() {
|
||||
async fn responses_websocket_uses_previous_response_id_when_prefix_after_completed() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![
|
||||
|
|
@ -1137,9 +1122,10 @@ async fn responses_websocket_creates_on_prefix_when_previous_completion_cannot_a
|
|||
let second = connection.get(1).expect("missing request").body_json();
|
||||
|
||||
assert_eq!(second["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(second["previous_response_id"].as_str(), Some("resp-1"));
|
||||
assert_eq!(
|
||||
second["input"],
|
||||
serde_json::to_value(&prompt_two.input).expect("serialize full input")
|
||||
serde_json::to_value(&prompt_two.input[2..]).expect("serialize incremental input")
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
|
|
@ -1222,7 +1208,7 @@ async fn responses_websocket_v2_creates_with_previous_response_id_on_prefix() {
|
|||
vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_assistant_message("msg-1", "assistant output"),
|
||||
ev_done_with_id("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
],
|
||||
vec![ev_response_created("resp-2"), ev_completed("resp-2")],
|
||||
]])
|
||||
|
|
@ -1409,13 +1395,6 @@ async fn responses_websocket_v2_sets_openai_beta_header() {
|
|||
.map(str::trim)
|
||||
.any(|value| value == WS_V2_BETA_HEADER_VALUE)
|
||||
);
|
||||
assert!(
|
||||
!openai_beta_header
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.any(|value| value == OPENAI_BETA_RESPONSES_WEBSOCKETS)
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ use anyhow::Result;
|
|||
use core_test_support::responses::WebSocketConnectionConfig;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_done;
|
||||
use core_test_support::responses::ev_reasoning_item;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::ev_shell_command_call;
|
||||
|
|
@ -100,7 +99,7 @@ async fn websocket_turn_state_persists_within_turn_and_resets_after() -> Result<
|
|||
ev_response_created("resp-1"),
|
||||
ev_reasoning_item("rsn-1", &["thinking"], &[]),
|
||||
ev_shell_command_call(call_id, "echo websocket"),
|
||||
ev_done(),
|
||||
ev_completed("resp-1"),
|
||||
]],
|
||||
response_headers: vec![(TURN_STATE_HEADER.to_string(), "ts-1".to_string())],
|
||||
accept_delay: None,
|
||||
|
|
|
|||
|
|
@ -66,9 +66,9 @@ async fn websocket_fallback_switches_to_http_on_upgrade_required_connect() -> Re
|
|||
.filter(|req| req.method == Method::POST && req.url.path().ends_with("/responses"))
|
||||
.count();
|
||||
|
||||
// Startup prewarm now only preconnects for v1 (one websocket GET with no request body).
|
||||
// The first turn then attempts websocket once, sees 426, and falls back to HTTP.
|
||||
assert_eq!(websocket_attempts, 2);
|
||||
// The startup prewarm request sees 426 and immediately switches the session to HTTP fallback,
|
||||
// so the first turn goes straight to HTTP with no additional websocket connect attempt.
|
||||
assert_eq!(websocket_attempts, 1);
|
||||
assert_eq!(http_attempts, 1);
|
||||
assert_eq!(response_mock.requests().len(), 1);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue