From 0e8d359da900512129750cd6cce0a7799ddea28e Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 4 Feb 2026 16:58:48 -0800 Subject: [PATCH] Session-level model client (#10664) Make ModelClient a session-scoped object. Move state that is session level onto the client, and make state that is per-turn explicit on corresponding methods. Stop taking a huge Config object, instead only pass in values that are actually needed. --------- Co-authored-by: Josh McKinney --- codex-rs/core/src/client.rs | 473 +++++++++++------- codex-rs/core/src/codex.rs | 138 +++-- codex-rs/core/src/compact.rs | 35 +- codex-rs/core/src/compact_remote.rs | 11 +- codex-rs/core/src/lib.rs | 2 - codex-rs/core/src/memory_trace.rs | 13 +- codex-rs/core/src/state/service.rs | 5 +- codex-rs/core/src/transport_manager.rs | 22 - codex-rs/core/tests/responses_headers.rs | 100 ++-- codex-rs/core/tests/suite/client.rs | 32 +- .../core/tests/suite/client_websockets.rs | 101 ++-- .../core/tests/suite/stream_no_completed.rs | 2 +- 12 files changed, 601 insertions(+), 333 deletions(-) delete mode 100644 codex-rs/core/src/transport_manager.rs diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 0a0f702b1..570cdc830 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -1,5 +1,21 @@ +//! Session- and turn-scoped helpers for talking to model provider APIs. +//! +//! `ModelClient` is intended to live for the lifetime of a Codex session and holds the stable +//! configuration and state needed to talk to a provider (auth, provider selection, conversation id, +//! and feature-gated request behavior). +//! +//! Per-turn settings (model selection, reasoning controls, telemetry context, web search +//! eligibility, and turn metadata) are passed explicitly to streaming and unary methods so that the +//! turn lifetime is visible at the call site. +//! +//! A [`ModelClientSession`] is created per turn and is used to stream one or more Responses API +//! requests during that turn. It caches a Responses WebSocket connection (opened lazily) and +//! stores per-turn state such as the `x-codex-turn-state` token used for sticky routing. + use std::sync::Arc; use std::sync::OnceLock; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use crate::api_bridge::CoreAuthProvider; use crate::api_bridge::auth_provider_from_auth; @@ -33,7 +49,7 @@ use codex_otel::OtelManager; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; -use codex_protocol::config_types::WebSearchMode; +use codex_protocol::config_types::Verbosity as VerbosityConfig; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; @@ -58,17 +74,13 @@ use crate::auth::RefreshTokenError; use crate::client_common::Prompt; use crate::client_common::ResponseEvent; use crate::client_common::ResponseStream; -use crate::config::Config; use crate::default_client::build_reqwest_client; use crate::error::CodexErr; use crate::error::Result; -use crate::features::FEATURES; -use crate::features::Feature; use crate::flags::CODEX_RS_SSE_FIXTURE; use crate::model_provider_info::ModelProviderInfo; use crate::model_provider_info::WireApi; use crate::tools::spec::create_tools_json_for_responses_api; -use crate::transport_manager::TransportManager; pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible"; pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state"; @@ -76,31 +88,60 @@ pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata"; pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str = "x-responsesapi-include-timing-metrics"; +/// Session-scoped state shared by all [`ModelClient`] clones. +/// +/// This is intentionally kept minimal so `ModelClient` does not need to hold a full `Config`. Most +/// configuration is per turn and is passed explicitly to streaming/unary methods. #[derive(Debug)] struct ModelClientState { - config: Arc, auth_manager: Option>, - model_info: ModelInfo, - otel_manager: OtelManager, - provider: ModelProviderInfo, conversation_id: ThreadId, - effort: Option, - summary: ReasoningSummaryConfig, + provider: ModelProviderInfo, session_source: SessionSource, - transport_manager: TransportManager, + model_verbosity: Option, + enable_responses_websockets: bool, + enable_request_compression: bool, + include_timing_metrics: bool, + beta_features_header: Option, + disable_websockets: AtomicBool, } +/// A session-scoped client for model-provider API calls. +/// +/// This holds configuration and state that should be shared across turns within a Codex session +/// (auth, provider selection, conversation id, feature-gated request behavior, and transport +/// fallback state). +/// +/// WebSocket fallback is session-scoped: once a turn activates the HTTP fallback, subsequent turns +/// will also use HTTP for the remainder of the session. +/// +/// Turn-scoped settings (model selection, reasoning controls, telemetry context, web search +/// eligibility, and turn metadata) are passed explicitly to the relevant methods to keep turn +/// lifetime visible at the call site. +/// +/// This type is cheap to clone. #[derive(Debug, Clone)] pub struct ModelClient { state: Arc, } +/// A turn-scoped streaming session created from a [`ModelClient`]. +/// +/// The session lazily establishes a Responses WebSocket connection (and reuses it across multiple +/// requests) and caches per-turn state: +/// +/// - The last request's input items, so subsequent calls can use `response.append` when the input +/// is an incremental extension of the previous request. +/// - The `x-codex-turn-state` sticky-routing token, which must be replayed for all requests within +/// the same turn. +/// +/// Create a fresh `ModelClientSession` for each Codex turn. Reusing it across turns would replay +/// the previous turn's sticky-routing token into the next turn, which violates the client/server +/// contract and can cause routing bugs. pub struct ModelClientSession { - state: Arc, + client: ModelClient, connection: Option, websocket_last_items: Vec, - transport_manager: TransportManager, - turn_metadata_header: Option, /// Turn state for sticky routing. /// /// This is an `OnceLock` that stores the turn state value received from the server @@ -114,54 +155,65 @@ pub struct ModelClientSession { turn_state: Arc>, } -#[allow(clippy::too_many_arguments)] impl ModelClient { + #[allow(clippy::too_many_arguments)] + /// Creates a new session-scoped `ModelClient`. + /// + /// All arguments are expected to be stable for the lifetime of a Codex session. Per-turn values + /// are passed to [`ModelClientSession::stream`] (and other turn-scoped methods) explicitly. pub fn new( - config: Arc, auth_manager: Option>, - model_info: ModelInfo, - otel_manager: OtelManager, - provider: ModelProviderInfo, - effort: Option, - summary: ReasoningSummaryConfig, conversation_id: ThreadId, + provider: ModelProviderInfo, session_source: SessionSource, - transport_manager: TransportManager, + model_verbosity: Option, + enable_responses_websockets: bool, + enable_request_compression: bool, + include_timing_metrics: bool, + beta_features_header: Option, ) -> Self { Self { state: Arc::new(ModelClientState { - config, auth_manager, - model_info, - otel_manager, - provider, conversation_id, - effort, - summary, + provider, session_source, - transport_manager, + model_verbosity, + enable_responses_websockets, + enable_request_compression, + include_timing_metrics, + beta_features_header, + disable_websockets: AtomicBool::new(false), }), } } - pub fn new_session(&self, turn_metadata_header: Option) -> ModelClientSession { + /// Creates a fresh turn-scoped streaming session. + /// + /// This does not open any network connections; the WebSocket connection is established lazily + /// when the first WebSocket stream request is issued. + pub fn new_session(&self) -> ModelClientSession { ModelClientSession { - state: Arc::clone(&self.state), + client: self.clone(), connection: None, websocket_last_items: Vec::new(), - transport_manager: self.state.transport_manager.clone(), - turn_metadata_header, turn_state: Arc::new(OnceLock::new()), } } -} -impl ModelClient { /// Compacts the current conversation history using the Compact endpoint. /// /// This is a unary call (no streaming) that returns a new list of /// `ResponseItem`s representing the compacted transcript. - pub async fn compact_conversation_history(&self, prompt: &Prompt) -> Result> { + /// + /// The model selection and telemetry context are passed explicitly to keep `ModelClient` + /// session-scoped. + pub async fn compact_conversation_history( + &self, + prompt: &Prompt, + model_info: &ModelInfo, + otel_manager: &OtelManager, + ) -> Result> { if prompt.input.is_empty() { return Ok(Vec::new()); } @@ -176,13 +228,13 @@ impl ModelClient { .to_api_provider(auth.as_ref().map(CodexAuth::internal_auth_mode))?; let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?; let transport = ReqwestTransport::new(build_reqwest_client()); - let request_telemetry = self.build_request_telemetry(); + let request_telemetry = Self::build_request_telemetry(otel_manager); let client = ApiCompactClient::new(transport, api_provider, api_auth) .with_telemetry(Some(request_telemetry)); let instructions = prompt.base_instructions.text.clone(); let payload = ApiCompactionInput { - model: &self.state.model_info.slug, + model: &model_info.slug, input: &prompt.input, instructions: &instructions, }; @@ -197,9 +249,15 @@ impl ModelClient { /// Builds memory summaries for each provided normalized trace. /// /// This is a unary call (no streaming) to `/v1/memories/trace_summarize`. + /// + /// The model selection, reasoning effort, and telemetry context are passed explicitly to keep + /// `ModelClient` session-scoped. pub async fn summarize_memory_traces( &self, traces: Vec, + model_info: &ModelInfo, + effort: Option, + otel_manager: &OtelManager, ) -> Result> { if traces.is_empty() { return Ok(Vec::new()); @@ -216,14 +274,14 @@ impl ModelClient { .to_api_provider(auth.as_ref().map(CodexAuth::internal_auth_mode))?; let api_auth = auth_provider_from_auth(auth, &self.state.provider)?; let transport = ReqwestTransport::new(build_reqwest_client()); - let request_telemetry = self.build_request_telemetry(); + let request_telemetry = Self::build_request_telemetry(otel_manager); let client = ApiMemoriesClient::new(transport, api_provider, api_auth) .with_telemetry(Some(request_telemetry)); let payload = ApiMemoryTraceSummarizeInput { - model: self.state.model_info.slug.clone(), + model: model_info.slug.clone(), traces, - reasoning: self.state.effort.map(|effort| Reasoning { + reasoning: effort.map(|effort| Reasoning { effort: Some(effort), summary: None, }), @@ -250,79 +308,62 @@ impl ModelClient { } extra_headers } + + /// Builds request telemetry for unary API calls (e.g., Compact endpoint). + fn build_request_telemetry(otel_manager: &OtelManager) -> Arc { + let telemetry = Arc::new(ApiTelemetry::new(otel_manager.clone())); + let request_telemetry: Arc = telemetry; + request_telemetry + } } impl ModelClientSession { - /// Streams a single model turn using the configured Responses transport. - pub async fn stream(&mut self, prompt: &Prompt) -> Result { - let wire_api = self.state.provider.wire_api; - match wire_api { - WireApi::Responses => { - let websocket_enabled = self.responses_websocket_enabled() - && !self.transport_manager.disable_websockets(); - - if websocket_enabled { - self.stream_responses_websocket(prompt).await - } else { - self.stream_responses_api(prompt).await - } - } - } + fn disable_websockets(&self) -> bool { + self.client.state.disable_websockets.load(Ordering::Relaxed) } - pub(crate) fn try_switch_fallback_transport(&mut self) -> bool { - let websocket_enabled = self.responses_websocket_enabled(); - let activated = self - .transport_manager - .activate_http_fallback(websocket_enabled); - if activated { - warn!("falling back to HTTP"); - self.state.otel_manager.counter( - "codex.transport.fallback_to_http", - 1, - &[("from_wire_api", "responses_websocket")], - ); - - self.connection = None; - self.websocket_last_items.clear(); - } - activated + fn activate_http_fallback(&self, websocket_enabled: bool) -> bool { + websocket_enabled + && !self + .client + .state + .disable_websockets + .swap(true, Ordering::Relaxed) } fn responses_websocket_enabled(&self) -> bool { - self.state.provider.supports_websockets - && self - .state - .config - .features - .enabled(Feature::ResponsesWebsockets) + self.client.state.provider.supports_websockets + && self.client.state.enable_responses_websockets } - fn build_responses_request(&self, prompt: &Prompt) -> Result { + fn build_responses_request(prompt: &Prompt) -> Result { let instructions = prompt.base_instructions.text.clone(); let tools_json: Vec = create_tools_json_for_responses_api(&prompt.tools)?; Ok(build_api_prompt(prompt, instructions, tools_json)) } + #[allow(clippy::too_many_arguments)] fn build_responses_options( &self, prompt: &Prompt, + model_info: &ModelInfo, + effort: Option, + summary: ReasoningSummaryConfig, + web_search_eligible: bool, + turn_metadata_header: Option<&str>, compression: Compression, ) -> ApiResponsesOptions { - let turn_metadata_header = self - .turn_metadata_header - .as_deref() - .and_then(|value| HeaderValue::from_str(value).ok()); - let model_info = &self.state.model_info; + let turn_metadata_header = + turn_metadata_header.and_then(|value| HeaderValue::from_str(value).ok()); let default_reasoning_effort = model_info.default_reasoning_level; let reasoning = if model_info.supports_reasoning_summaries { Some(Reasoning { - effort: self.state.effort.or(default_reasoning_effort), - summary: if self.state.summary == ReasoningSummaryConfig::None { + effort: effort.or(default_reasoning_effort), + summary: if summary == ReasoningSummaryConfig::None { None } else { - Some(self.state.summary) + Some(summary) }, }) } else { @@ -336,12 +377,12 @@ impl ModelClientSession { }; let verbosity = if model_info.support_verbosity { - self.state - .config + self.client + .state .model_verbosity .or(model_info.default_verbosity) } else { - if self.state.config.model_verbosity.is_some() { + if self.client.state.model_verbosity.is_some() { warn!( "model_verbosity is set but ignored as the model does not support verbosity: {}", model_info.slug @@ -351,7 +392,7 @@ impl ModelClientSession { }; let text = create_text_param_for_request(verbosity, &prompt.output_schema); - let conversation_id = self.state.conversation_id.to_string(); + let conversation_id = self.client.state.conversation_id.to_string(); ApiResponsesOptions { reasoning, @@ -360,9 +401,10 @@ impl ModelClientSession { text, store_override: None, conversation_id: Some(conversation_id), - session_source: Some(self.state.session_source.clone()), + session_source: Some(self.client.state.session_source.clone()), extra_headers: build_responses_headers( - &self.state.config, + self.client.state.beta_features_header.as_deref(), + web_search_eligible, Some(&self.turn_state), turn_metadata_header.as_ref(), ), @@ -388,6 +430,7 @@ impl ModelClientSession { fn prepare_websocket_request( &self, + model_slug: &str, api_prompt: &ApiPrompt, options: &ApiResponsesOptions, ) -> ResponsesWsRequest { @@ -408,7 +451,7 @@ impl ModelClientSession { let store = store_override.unwrap_or(false); let payload = ResponseCreateWsRequest { - model: self.state.model_info.slug.clone(), + model: model_slug.to_string(), instructions: api_prompt.instructions.clone(), input: api_prompt.input.clone(), tools: api_prompt.tools.clone(), @@ -427,6 +470,7 @@ impl ModelClientSession { async fn websocket_connection( &mut self, + otel_manager: &OtelManager, api_provider: codex_api::Provider, api_auth: CoreAuthProvider, options: &ApiResponsesOptions, @@ -439,13 +483,13 @@ impl ModelClientSession { if needs_new { let mut headers = options.extra_headers.clone(); headers.extend(build_conversation_headers(options.conversation_id.clone())); - if self.state.config.features.enabled(Feature::RuntimeMetrics) { + if self.client.state.include_timing_metrics { headers.insert( X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER, HeaderValue::from_static("true"), ); } - let websocket_telemetry = self.build_websocket_telemetry(); + let websocket_telemetry = Self::build_websocket_telemetry(otel_manager); let new_conn: ApiWebSocketConnection = ApiWebSocketResponsesClient::new(api_provider, api_auth) .connect( @@ -463,13 +507,9 @@ impl ModelClientSession { } fn responses_request_compression(&self, auth: Option<&crate::auth::CodexAuth>) -> Compression { - if self - .state - .config - .features - .enabled(Feature::EnableRequestCompression) + if self.client.state.enable_request_compression && auth.is_some_and(CodexAuth::is_chatgpt_auth) - && self.state.provider.is_openai() + && self.client.state.provider.is_openai() { Compression::Zstd } else { @@ -481,17 +521,29 @@ impl ModelClientSession { /// /// Handles SSE fixtures, reasoning summaries, verbosity, and the /// `text` controls used for output schemas. - async fn stream_responses_api(&self, prompt: &Prompt) -> Result { + #[allow(clippy::too_many_arguments)] + async fn stream_responses_api( + &self, + prompt: &Prompt, + model_info: &ModelInfo, + otel_manager: &OtelManager, + effort: Option, + summary: ReasoningSummaryConfig, + web_search_eligible: bool, + turn_metadata_header: Option<&str>, + ) -> Result { if let Some(path) = &*CODEX_RS_SSE_FIXTURE { warn!(path, "Streaming from fixture"); - let stream = - codex_api::stream_from_fixture(path, self.state.provider.stream_idle_timeout()) - .map_err(map_api_error)?; - return Ok(map_response_stream(stream, self.state.otel_manager.clone())); + let stream = codex_api::stream_from_fixture( + path, + self.client.state.provider.stream_idle_timeout(), + ) + .map_err(map_api_error)?; + return Ok(map_response_stream(stream, otel_manager.clone())); } - let auth_manager = self.state.auth_manager.clone(); - let api_prompt = self.build_responses_request(prompt)?; + let auth_manager = self.client.state.auth_manager.clone(); + let api_prompt = Self::build_responses_request(prompt)?; let mut auth_recovery = auth_manager .as_ref() @@ -502,26 +554,35 @@ impl ModelClientSession { None => None, }; let api_provider = self + .client .state .provider .to_api_provider(auth.as_ref().map(CodexAuth::internal_auth_mode))?; - let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?; + let api_auth = auth_provider_from_auth(auth.clone(), &self.client.state.provider)?; let transport = ReqwestTransport::new(build_reqwest_client()); - let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry(); + let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(otel_manager); let compression = self.responses_request_compression(auth.as_ref()); let client = ApiResponsesClient::new(transport, api_provider, api_auth) .with_telemetry(Some(request_telemetry), Some(sse_telemetry)); - let options = self.build_responses_options(prompt, compression); + let options = self.build_responses_options( + prompt, + model_info, + effort, + summary, + web_search_eligible, + turn_metadata_header, + compression, + ); let stream_result = client - .stream_prompt(&self.state.model_info.slug, &api_prompt, options) + .stream_prompt(&model_info.slug, &api_prompt, options) .await; match stream_result { Ok(stream) => { - return Ok(map_response_stream(stream, self.state.otel_manager.clone())); + return Ok(map_response_stream(stream, otel_manager.clone())); } Err(ApiError::Transport( unauthorized_transport @ TransportError::Http { status, .. }, @@ -535,9 +596,19 @@ impl ModelClientSession { } /// Streams a turn via the Responses API over WebSocket transport. - async fn stream_responses_websocket(&mut self, prompt: &Prompt) -> Result { - let auth_manager = self.state.auth_manager.clone(); - let api_prompt = self.build_responses_request(prompt)?; + #[allow(clippy::too_many_arguments)] + async fn stream_responses_websocket( + &mut self, + prompt: &Prompt, + model_info: &ModelInfo, + otel_manager: &OtelManager, + effort: Option, + summary: ReasoningSummaryConfig, + web_search_eligible: bool, + turn_metadata_header: Option<&str>, + ) -> Result { + let auth_manager = self.client.state.auth_manager.clone(); + let api_prompt = Self::build_responses_request(prompt)?; let mut auth_recovery = auth_manager .as_ref() @@ -548,17 +619,31 @@ impl ModelClientSession { None => None, }; let api_provider = self + .client .state .provider .to_api_provider(auth.as_ref().map(CodexAuth::internal_auth_mode))?; - let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?; + let api_auth = auth_provider_from_auth(auth.clone(), &self.client.state.provider)?; let compression = self.responses_request_compression(auth.as_ref()); - let options = self.build_responses_options(prompt, compression); - let request = self.prepare_websocket_request(&api_prompt, &options); + let options = self.build_responses_options( + prompt, + model_info, + effort, + summary, + web_search_eligible, + turn_metadata_header, + compression, + ); + let request = self.prepare_websocket_request(&model_info.slug, &api_prompt, &options); let connection = match self - .websocket_connection(api_provider.clone(), api_auth.clone(), &options) + .websocket_connection( + otel_manager, + api_provider.clone(), + api_auth.clone(), + &options, + ) .await { Ok(connection) => connection, @@ -577,35 +662,97 @@ impl ModelClientSession { .map_err(map_api_error)?; self.websocket_last_items = api_prompt.input.clone(); - return Ok(map_response_stream( - stream_result, - self.state.otel_manager.clone(), - )); + return Ok(map_response_stream(stream_result, otel_manager.clone())); } } /// Builds request and SSE telemetry for streaming API calls. - fn build_streaming_telemetry(&self) -> (Arc, Arc) { - let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone())); + fn build_streaming_telemetry( + otel_manager: &OtelManager, + ) -> (Arc, Arc) { + let telemetry = Arc::new(ApiTelemetry::new(otel_manager.clone())); let request_telemetry: Arc = telemetry.clone(); let sse_telemetry: Arc = telemetry; (request_telemetry, sse_telemetry) } /// Builds telemetry for the Responses API WebSocket transport. - fn build_websocket_telemetry(&self) -> Arc { - let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone())); + fn build_websocket_telemetry(otel_manager: &OtelManager) -> Arc { + let telemetry = Arc::new(ApiTelemetry::new(otel_manager.clone())); let websocket_telemetry: Arc = telemetry; websocket_telemetry } -} -impl ModelClient { - /// Builds request telemetry for unary API calls (e.g., Compact endpoint). - fn build_request_telemetry(&self) -> Arc { - let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone())); - let request_telemetry: Arc = telemetry; - request_telemetry + #[allow(clippy::too_many_arguments)] + /// Streams a single model request within the current turn. + /// + /// The caller is responsible for passing per-turn settings explicitly (model selection, + /// reasoning settings, telemetry context, web search eligibility, and turn metadata). This + /// method will prefer the Responses WebSocket transport when enabled and healthy, and will + /// fall back to the HTTP Responses API transport otherwise. + pub async fn stream( + &mut self, + prompt: &Prompt, + model_info: &ModelInfo, + otel_manager: &OtelManager, + effort: Option, + summary: ReasoningSummaryConfig, + web_search_eligible: bool, + turn_metadata_header: Option<&str>, + ) -> Result { + let wire_api = self.client.state.provider.wire_api; + match wire_api { + WireApi::Responses => { + let websocket_enabled = + self.responses_websocket_enabled() && !self.disable_websockets(); + + if websocket_enabled { + self.stream_responses_websocket( + prompt, + model_info, + otel_manager, + effort, + summary, + web_search_eligible, + turn_metadata_header, + ) + .await + } else { + self.stream_responses_api( + prompt, + model_info, + otel_manager, + effort, + summary, + web_search_eligible, + turn_metadata_header, + ) + .await + } + } + } + } + + /// Permanently disables WebSockets for this Codex session and resets WebSocket state. + /// + /// This is used after exhausting the provider retry budget, to force subsequent requests onto + /// the HTTP transport. Returns `true` if this call activated fallback, or `false` if fallback + /// was already active. + pub(crate) fn try_switch_fallback_transport(&mut self, otel_manager: &OtelManager) -> bool { + let websocket_enabled = self.responses_websocket_enabled(); + let activated = self.activate_http_fallback(websocket_enabled); + if activated { + warn!("falling back to HTTP"); + otel_manager.counter( + "codex.transport.fallback_to_http", + 1, + &[("from_wire_api", "responses_websocket")], + ); + + self.connection = None; + self.websocket_last_items.clear(); + } + activated } } @@ -620,44 +767,30 @@ fn build_api_prompt(prompt: &Prompt, instructions: String, tools_json: Vec ApiHeaderMap { - let enabled = FEATURES - .iter() - .filter_map(|spec| { - if spec.stage.experimental_menu_description().is_some() - && config.features.enabled(spec.id) - { - Some(spec.key) - } else { - None - } - }) - .collect::>(); - let value = enabled.join(","); - let mut headers = ApiHeaderMap::new(); - if !value.is_empty() - && let Ok(header_value) = HeaderValue::from_str(value.as_str()) - { - headers.insert("x-codex-beta-features", header_value); - } - headers -} - +/// Builds the extra headers attached to Responses API requests. +/// +/// These headers implement Codex-specific conventions: +/// +/// - `x-codex-beta-features`: comma-separated beta feature keys enabled for the session. +/// - `x-oai-web-search-eligible`: whether this turn is allowed to use web search. +/// - `x-codex-turn-state`: sticky routing token captured earlier in the turn. +/// - `x-codex-turn-metadata`: optional per-turn metadata for observability. fn build_responses_headers( - config: &Config, + beta_features_header: Option<&str>, + web_search_eligible: bool, turn_state: Option<&Arc>>, turn_metadata_header: Option<&HeaderValue>, ) -> ApiHeaderMap { - let mut headers = experimental_feature_headers(config); + let mut headers = ApiHeaderMap::new(); + if let Some(value) = beta_features_header + && !value.is_empty() + && let Ok(header_value) = HeaderValue::from_str(value) + { + headers.insert("x-codex-beta-features", header_value); + } headers.insert( WEB_SEARCH_ELIGIBLE_HEADER, - HeaderValue::from_static( - if matches!(config.web_search_mode, Some(WebSearchMode::Disabled)) { - "false" - } else { - "true" - }, - ), + HeaderValue::from_static(if web_search_eligible { "true" } else { "false" }), ); if let Some(turn_state) = turn_state && let Some(state) = turn_state.get() diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index c522e35b7..cef00f64e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -20,6 +20,7 @@ use crate::compact::should_use_remote_compact_task; use crate::compact_remote::run_inline_remote_auto_compact_task; use crate::connectors; use crate::exec_policy::ExecPolicyManager; +use crate::features::FEATURES; use crate::features::Feature; use crate::features::Features; use crate::features::maybe_push_unstable_features_warning; @@ -32,7 +33,6 @@ use crate::stream_events_utils::handle_non_tool_response_item; use crate::stream_events_utils::handle_output_item_done; use crate::stream_events_utils::last_assistant_message_from_item; use crate::terminal; -use crate::transport_manager::TransportManager; use crate::truncate::TruncationPolicy; use crate::turn_metadata::build_turn_metadata_header; use crate::user_notification::UserNotifier; @@ -488,7 +488,6 @@ pub(crate) struct Session { #[derive(Debug)] pub(crate) struct TurnContext { pub(crate) sub_id: String, - pub(crate) client: ModelClient, pub(crate) config: Arc, pub(crate) auth_manager: Option>, pub(crate) model_info: ModelInfo, @@ -497,7 +496,6 @@ pub(crate) struct TurnContext { pub(crate) reasoning_effort: Option, pub(crate) reasoning_summary: ReasoningSummaryConfig, pub(crate) session_source: SessionSource, - pub(crate) transport_manager: TransportManager, /// The session's current working directory. All relative paths provided by /// the model as well as sandbox policies are resolved against this path /// instead of `std::env::current_dir()`. @@ -681,6 +679,33 @@ pub(crate) struct SessionSettingsUpdate { } impl Session { + /// Builds the `x-codex-beta-features` header value for this session. + /// + /// `ModelClient` is session-scoped and intentionally does not depend on the full `Config`, so + /// we precompute the comma-separated list of enabled experimental feature keys at session + /// creation time and thread it into the client. + fn build_model_client_beta_features_header(config: &Config) -> Option { + let beta_features_header = FEATURES + .iter() + .filter_map(|spec| { + if spec.stage.experimental_menu_description().is_some() + && config.features.enabled(spec.id) + { + Some(spec.key) + } else { + None + } + }) + .collect::>() + .join(","); + + if beta_features_header.is_empty() { + None + } else { + Some(beta_features_header) + } + } + /// Don't expand the number of mutated arguments on config. We are in the process of getting rid of it. pub(crate) fn build_per_turn_config(session_configuration: &SessionConfiguration) -> Config { // todo(aibrahim): store this state somewhere else so we don't need to mut config @@ -735,9 +760,7 @@ impl Session { session_configuration: &SessionConfiguration, per_turn_config: Config, model_info: ModelInfo, - conversation_id: ThreadId, sub_id: String, - transport_manager: TransportManager, ) -> TurnContext { let reasoning_effort = session_configuration.collaboration_mode.reasoning_effort(); let reasoning_summary = session_configuration.model_reasoning_summary; @@ -746,23 +769,10 @@ impl Session { model_info.slug.as_str(), ); let session_source = session_configuration.session_source.clone(); - let auth_manager_for_context = auth_manager.clone(); - let provider_for_context = provider.clone(); - let transport_manager_for_context = transport_manager.clone(); - let otel_manager_for_context = otel_manager.clone(); + let auth_manager_for_context = auth_manager; + let provider_for_context = provider; + let otel_manager_for_context = otel_manager; let per_turn_config = Arc::new(per_turn_config); - let client = ModelClient::new( - per_turn_config.clone(), - auth_manager, - model_info.clone(), - otel_manager, - provider, - reasoning_effort, - reasoning_summary, - conversation_id, - session_source.clone(), - transport_manager, - ); let tools_config = ToolsConfig::new(&ToolsConfigParams { model_info: &model_info, @@ -773,7 +783,6 @@ impl Session { let cwd = session_configuration.cwd.clone(); TurnContext { sub_id, - client, config: per_turn_config.clone(), auth_manager: auth_manager_for_context, model_info: model_info.clone(), @@ -782,7 +791,6 @@ impl Session { reasoning_effort, reasoning_summary, session_source, - transport_manager: transport_manager_for_context, cwd, developer_instructions: session_configuration.developer_instructions.clone(), compact_prompt: session_configuration.compact_prompt.clone(), @@ -1020,7 +1028,17 @@ impl Session { file_watcher, agent_control, state_db: state_db_ctx.clone(), - transport_manager: TransportManager::new(), + model_client: ModelClient::new( + Some(Arc::clone(&auth_manager)), + conversation_id, + session_configuration.provider.clone(), + session_configuration.session_source.clone(), + config.model_verbosity, + config.features.enabled(Feature::ResponsesWebsockets), + config.features.enabled(Feature::EnableRequestCompression), + config.features.enabled(Feature::RuntimeMetrics), + Self::build_model_client_beta_features_header(config.as_ref()), + ), }; let sess = Arc::new(Session { @@ -1351,9 +1369,7 @@ impl Session { &session_configuration, per_turn_config, model_info, - self.conversation_id, sub_id, - self.services.transport_manager.clone(), ); if let Some(final_schema) = final_output_json_schema { @@ -3353,25 +3369,11 @@ async fn spawn_review_thread( let reasoning_effort = per_turn_config.model_reasoning_effort; let reasoning_summary = per_turn_config.model_reasoning_summary; let session_source = parent_turn_context.session_source.clone(); - let transport_manager = parent_turn_context.transport_manager.clone(); let per_turn_config = Arc::new(per_turn_config); - let client = ModelClient::new( - per_turn_config.clone(), - auth_manager, - model_info.clone(), - otel_manager, - provider, - reasoning_effort, - reasoning_summary, - sess.conversation_id, - session_source.clone(), - transport_manager.clone(), - ); let review_turn_context = TurnContext { sub_id: sub_id.to_string(), - client, config: per_turn_config, auth_manager: auth_manager_for_context, model_info: model_info.clone(), @@ -3380,7 +3382,6 @@ async fn spawn_review_thread( reasoning_effort, reasoning_summary, session_source, - transport_manager, tools_config, features: parent_turn_context.features.clone(), ghost_snapshot: parent_turn_context.ghost_snapshot.clone(), @@ -3605,7 +3606,9 @@ pub(crate) async fn run_turn( let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())); let turn_metadata_header = turn_context.resolve_turn_metadata_header().await; - let mut client_session = turn_context.client.new_session(turn_metadata_header); + // `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse + // one instance across retries within this turn. + let mut client_session = sess.services.model_client.new_session(); loop { // Note that pending_input would be something like a message the user @@ -3658,6 +3661,7 @@ pub(crate) async fn run_turn( Arc::clone(&turn_context), Arc::clone(&turn_diff_tracker), &mut client_session, + turn_metadata_header.as_deref(), sampling_request_input, tool_selection, cancellation_token.child_token(), @@ -3844,6 +3848,7 @@ struct SamplingRequestToolSelection<'a> { skill_name_counts_lower: &'a HashMap, } +#[allow(clippy::too_many_arguments)] #[instrument(level = "trace", skip_all, fields( @@ -3857,6 +3862,7 @@ async fn run_sampling_request( turn_context: Arc, turn_diff_tracker: SharedTurnDiffTracker, client_session: &mut ModelClientSession, + turn_metadata_header: Option<&str>, input: Vec, tool_selection: SamplingRequestToolSelection<'_>, cancellation_token: CancellationToken, @@ -3914,6 +3920,7 @@ async fn run_sampling_request( Arc::clone(&sess), Arc::clone(&turn_context), client_session, + turn_metadata_header, Arc::clone(&turn_diff_tracker), &prompt, cancellation_token.child_token(), @@ -3943,7 +3950,9 @@ async fn run_sampling_request( // Use the configured provider-specific stream retry budget. let max_retries = turn_context.provider.stream_max_retries(); - if retries >= max_retries && client_session.try_switch_fallback_transport() { + if retries >= max_retries + && client_session.try_switch_fallback_transport(&turn_context.otel_manager) + { sess.send_event( &turn_context, EventMsg::Warning(WarningEvent { @@ -4396,6 +4405,7 @@ async fn try_run_sampling_request( sess: Arc, turn_context: Arc, client_session: &mut ModelClientSession, + turn_metadata_header: Option<&str>, turn_diff_tracker: SharedTurnDiffTracker, prompt: &Prompt, cancellation_token: CancellationToken, @@ -4426,8 +4436,20 @@ async fn try_run_sampling_request( ); sess.persist_rollout_items(&[rollout_item]).await; + let web_search_eligible = !matches!( + turn_context.config.web_search_mode, + Some(WebSearchMode::Disabled) + ); let mut stream = client_session - .stream(prompt) + .stream( + prompt, + &turn_context.model_info, + &turn_context.otel_manager, + turn_context.reasoning_effort, + turn_context.reasoning_summary, + web_search_eligible, + turn_metadata_header, + ) .instrument(trace_span!("stream_request")) .or_cancel(&cancellation_token) .await??; @@ -5594,7 +5616,17 @@ mod tests { file_watcher, agent_control, state_db: None, - transport_manager: TransportManager::new(), + model_client: ModelClient::new( + Some(auth_manager.clone()), + conversation_id, + session_configuration.provider.clone(), + session_configuration.session_source.clone(), + config.model_verbosity, + config.features.enabled(Feature::ResponsesWebsockets), + config.features.enabled(Feature::EnableRequestCompression), + config.features.enabled(Feature::RuntimeMetrics), + Session::build_model_client_beta_features_header(config.as_ref()), + ), }; let turn_context = Session::make_turn_context( @@ -5604,9 +5636,7 @@ mod tests { &session_configuration, per_turn_config, model_info, - conversation_id, "turn_id".to_string(), - services.transport_manager.clone(), ); let session = Session { @@ -5716,7 +5746,17 @@ mod tests { file_watcher, agent_control, state_db: None, - transport_manager: TransportManager::new(), + model_client: ModelClient::new( + Some(Arc::clone(&auth_manager)), + conversation_id, + session_configuration.provider.clone(), + session_configuration.session_source.clone(), + config.model_verbosity, + config.features.enabled(Feature::ResponsesWebsockets), + config.features.enabled(Feature::EnableRequestCompression), + config.features.enabled(Feature::RuntimeMetrics), + Session::build_model_client_beta_features_header(config.as_ref()), + ), }; let turn_context = Arc::new(Session::make_turn_context( @@ -5726,9 +5766,7 @@ mod tests { &session_configuration, per_turn_config, model_info, - conversation_id, "turn_id".to_string(), - services.transport_manager.clone(), )); let session = Arc::new(Session { diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index c8aba5661..91b95759c 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::ModelProviderInfo; use crate::Prompt; +use crate::client::ModelClientSession; use crate::client_common::ResponseEvent; use crate::codex::Session; use crate::codex::TurnContext; @@ -19,6 +20,7 @@ use crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; use crate::truncate::truncate_text; use crate::util::backoff; +use codex_protocol::config_types::WebSearchMode; use codex_protocol::items::ContextCompactionItem; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; @@ -87,6 +89,10 @@ async fn run_compact_task_inner( let max_retries = turn_context.provider.stream_max_retries(); let mut retries = 0; + let turn_metadata_header = turn_context.resolve_turn_metadata_header().await; + let mut client_session = sess.services.model_client.new_session(); + // Reuse one client session so turn-scoped state (sticky routing, websocket append tracking) + // survives retries within this compact turn. // TODO: If we need to guarantee the persisted mode always matches the prompt used for this // turn, capture it in TurnContext at creation time. Using SessionConfiguration here avoids @@ -119,7 +125,14 @@ async fn run_compact_task_inner( personality: turn_context.personality, ..Default::default() }; - let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await; + let attempt_result = drain_to_completed( + &sess, + turn_context.as_ref(), + &mut client_session, + turn_metadata_header.as_deref(), + &prompt, + ) + .await; match attempt_result { Ok(()) => { @@ -335,11 +348,25 @@ fn build_compacted_history_with_limit( async fn drain_to_completed( sess: &Session, turn_context: &TurnContext, + client_session: &mut ModelClientSession, + turn_metadata_header: Option<&str>, prompt: &Prompt, ) -> CodexResult<()> { - let turn_metadata_header = turn_context.resolve_turn_metadata_header().await; - let mut client_session = turn_context.client.new_session(turn_metadata_header); - let mut stream = client_session.stream(prompt).await?; + let web_search_eligible = !matches!( + turn_context.config.web_search_mode, + Some(WebSearchMode::Disabled) + ); + let mut stream = client_session + .stream( + prompt, + &turn_context.model_info, + &turn_context.otel_manager, + turn_context.reasoning_effort, + turn_context.reasoning_summary, + web_search_eligible, + turn_metadata_header, + ) + .await?; loop { let maybe_event = stream.next().await; let Some(event) = maybe_event else { diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index fbb6863c3..f46084bd9 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -76,9 +76,14 @@ async fn run_remote_compact_task_inner_impl( output_schema: None, }; - let mut new_history = turn_context - .client - .compact_conversation_history(&prompt) + let mut new_history = sess + .services + .model_client + .compact_conversation_history( + &prompt, + &turn_context.model_info, + &turn_context.otel_manager, + ) .await?; if !ghost_snapshots.is_empty() { diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 869df93d0..50a1f8538 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -40,7 +40,6 @@ pub mod landlock; pub mod mcp; mod mcp_connection_manager; pub mod models_manager; -mod transport_manager; pub use mcp_connection_manager::MCP_SANDBOX_STATE_CAPABILITY; pub use mcp_connection_manager::MCP_SANDBOX_STATE_METHOD; pub use mcp_connection_manager::SandboxState; @@ -122,7 +121,6 @@ pub use rollout::list::read_head_for_summary; pub use rollout::list::read_session_meta_line; pub use rollout::rollout_date_parts; pub use rollout::session_index::find_thread_names_by_ids; -pub use transport_manager::TransportManager; mod function_tool; mod state; mod tasks; diff --git a/codex-rs/core/src/memory_trace.rs b/codex-rs/core/src/memory_trace.rs index 807ff64df..651998014 100644 --- a/codex-rs/core/src/memory_trace.rs +++ b/codex-rs/core/src/memory_trace.rs @@ -6,6 +6,9 @@ use crate::error::CodexErr; use crate::error::Result; use codex_api::MemoryTrace as ApiMemoryTrace; use codex_api::MemoryTraceMetadata as ApiMemoryTraceMetadata; +use codex_otel::OtelManager; +use codex_protocol::openai_models::ModelInfo; +use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use serde_json::Map; use serde_json::Value; @@ -27,9 +30,15 @@ struct PreparedTrace { /// /// The request/response wiring mirrors the memory trace summarize E2E flow: /// `/v1/memories/trace_summarize` with one output object per input trace. +/// +/// The caller provides the model selection, reasoning effort, and telemetry context explicitly so +/// the session-scoped [`ModelClient`] can be reused across turns. pub async fn build_memories_from_trace_files( client: &ModelClient, trace_paths: &[PathBuf], + model_info: &ModelInfo, + effort: Option, + otel_manager: &OtelManager, ) -> Result> { if trace_paths.is_empty() { return Ok(Vec::new()); @@ -41,7 +50,9 @@ pub async fn build_memories_from_trace_files( } let traces = prepared.iter().map(|trace| trace.payload.clone()).collect(); - let output = client.summarize_memory_traces(traces).await?; + let output = client + .summarize_memory_traces(traces, model_info, effort, otel_manager) + .await?; if output.len() != prepared.len() { return Err(CodexErr::InvalidRequest(format!( "unexpected memory summarize output length: expected {}, got {}", diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index e9a028bfd..0e9c03803 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -4,6 +4,7 @@ use crate::AuthManager; use crate::RolloutRecorder; use crate::agent::AgentControl; use crate::analytics_client::AnalyticsEventsClient; +use crate::client::ModelClient; use crate::exec_policy::ExecPolicyManager; use crate::file_watcher::FileWatcher; use crate::mcp_connection_manager::McpConnectionManager; @@ -11,7 +12,6 @@ use crate::models_manager::manager::ModelsManager; use crate::skills::SkillsManager; use crate::state_db::StateDbHandle; use crate::tools::sandboxing::ApprovalStore; -use crate::transport_manager::TransportManager; use crate::unified_exec::UnifiedExecProcessManager; use crate::user_notification::UserNotifier; use codex_otel::OtelManager; @@ -37,5 +37,6 @@ pub(crate) struct SessionServices { pub(crate) file_watcher: Arc, pub(crate) agent_control: AgentControl, pub(crate) state_db: Option, - pub(crate) transport_manager: TransportManager, + /// Session-scoped model client shared across turns. + pub(crate) model_client: ModelClient, } diff --git a/codex-rs/core/src/transport_manager.rs b/codex-rs/core/src/transport_manager.rs deleted file mode 100644 index 2b5d09500..000000000 --- a/codex-rs/core/src/transport_manager.rs +++ /dev/null @@ -1,22 +0,0 @@ -use std::sync::Arc; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; - -#[derive(Clone, Debug, Default)] -pub struct TransportManager { - disable_websockets: Arc, -} - -impl TransportManager { - pub fn new() -> Self { - Self::default() - } - - pub fn disable_websockets(&self) -> bool { - self.disable_websockets.load(Ordering::Relaxed) - } - - pub fn activate_http_fallback(&self, websocket_enabled: bool) -> bool { - websocket_enabled && !self.disable_websockets.swap(true, Ordering::Relaxed) - } -} diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index fd65bcf32..9e70264bf 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -10,7 +10,6 @@ use codex_core::ModelProviderInfo; use codex_core::Prompt; use codex_core::ResponseEvent; use codex_core::ResponseItem; -use codex_core::TransportManager; use codex_core::WEB_SEARCH_ELIGIBLE_HEADER; use codex_core::WireApi; use codex_core::models_manager::manager::ModelsManager; @@ -88,19 +87,19 @@ async fn responses_stream_includes_subagent_header_on_review() { session_source.clone(), ); - let mut client_session = ModelClient::new( - Arc::clone(&config), + let web_search_eligible = !matches!(config.web_search_mode, Some(WebSearchMode::Disabled)); + let client = ModelClient::new( None, - model_info, - otel_manager, - provider, - effort, - summary, conversation_id, + provider.clone(), session_source, - TransportManager::new(), - ) - .new_session(None); + config.model_verbosity, + false, + false, + false, + None, + ); + let mut client_session = client.new_session(); let mut prompt = Prompt::default(); prompt.input = vec![ResponseItem::Message { @@ -113,7 +112,18 @@ async fn responses_stream_includes_subagent_header_on_review() { phase: None, }]; - let mut stream = client_session.stream(&prompt).await.expect("stream failed"); + let mut stream = client_session + .stream( + &prompt, + &model_info, + &otel_manager, + effort, + summary, + web_search_eligible, + None, + ) + .await + .expect("stream failed"); while let Some(event) = stream.next().await { if matches!(event, Ok(ResponseEvent::Completed { .. })) { break; @@ -188,19 +198,19 @@ async fn responses_stream_includes_subagent_header_on_other() { session_source.clone(), ); - let mut client_session = ModelClient::new( - Arc::clone(&config), + let web_search_eligible = !matches!(config.web_search_mode, Some(WebSearchMode::Disabled)); + let client = ModelClient::new( None, - model_info, - otel_manager, - provider, - effort, - summary, conversation_id, + provider.clone(), session_source, - TransportManager::new(), - ) - .new_session(None); + config.model_verbosity, + false, + false, + false, + None, + ); + let mut client_session = client.new_session(); let mut prompt = Prompt::default(); prompt.input = vec![ResponseItem::Message { @@ -213,7 +223,18 @@ async fn responses_stream_includes_subagent_header_on_other() { phase: None, }]; - let mut stream = client_session.stream(&prompt).await.expect("stream failed"); + let mut stream = client_session + .stream( + &prompt, + &model_info, + &otel_manager, + effort, + summary, + web_search_eligible, + None, + ) + .await + .expect("stream failed"); while let Some(event) = stream.next().await { if matches!(event, Ok(ResponseEvent::Completed { .. })) { break; @@ -346,19 +367,19 @@ async fn responses_respects_model_info_overrides_from_config() { session_source.clone(), ); - let mut client = ModelClient::new( - Arc::clone(&config), + let web_search_eligible = !matches!(config.web_search_mode, Some(WebSearchMode::Disabled)); + let client = ModelClient::new( None, - model_info, - otel_manager, - provider, - effort, - summary, conversation_id, + provider.clone(), session_source, - TransportManager::new(), - ) - .new_session(None); + config.model_verbosity, + false, + false, + false, + None, + ); + let mut client_session = client.new_session(); let mut prompt = Prompt::default(); prompt.input = vec![ResponseItem::Message { @@ -371,7 +392,18 @@ async fn responses_respects_model_info_overrides_from_config() { phase: None, }]; - let mut stream = client.stream(&prompt).await.expect("stream failed"); + let mut stream = client_session + .stream( + &prompt, + &model_info, + &otel_manager, + effort, + summary, + web_search_eligible, + None, + ) + .await + .expect("stream failed"); while let Some(event) = stream.next().await { if matches!(event, Ok(ResponseEvent::Completed { .. })) { break; diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index b6743a683..731ce97c0 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -11,7 +11,6 @@ use codex_core::Prompt; use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::ThreadManager; -use codex_core::TransportManager; use codex_core::WireApi; use codex_core::auth::AuthCredentialsStoreMode; use codex_core::built_in_model_providers; @@ -1264,19 +1263,18 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { SessionSource::Exec, ); - let mut client = ModelClient::new( - Arc::clone(&config), + let client = ModelClient::new( None, - model_info, - otel_manager, - provider, - effort, - summary, conversation_id, + provider.clone(), SessionSource::Exec, - TransportManager::new(), - ) - .new_session(None); + config.model_verbosity, + false, + false, + false, + None, + ); + let mut client_session = client.new_session(); let mut prompt = Prompt::default(); prompt.input.push(ResponseItem::Reasoning { @@ -1340,8 +1338,16 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() { output: "ok".into(), }); - let mut stream = client - .stream(&prompt) + let mut stream = client_session + .stream( + &prompt, + &model_info, + &otel_manager, + effort, + summary, + true, + None, + ) .await .expect("responses stream to start"); diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 15005caa4..e3a1ef0a8 100644 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -8,7 +8,6 @@ use codex_core::ModelProviderInfo; use codex_core::Prompt; use codex_core::ResponseEvent; use codex_core::ResponseItem; -use codex_core::TransportManager; use codex_core::WireApi; use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER; use codex_core::features::Feature; @@ -20,6 +19,8 @@ use codex_otel::metrics::MetricsConfig; use codex_protocol::ThreadId; use codex_protocol::account::PlanType; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::openai_models::ModelInfo; +use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use core_test_support::load_default_config_for_test; use core_test_support::responses::WebSocketConnectionConfig; use core_test_support::responses::WebSocketTestServer; @@ -42,6 +43,10 @@ const MODEL: &str = "gpt-5.2-codex"; struct WebsocketTestHarness { _codex_home: TempDir, client: ModelClient, + model_info: ModelInfo, + effort: Option, + summary: ReasoningSummary, + web_search_eligible: bool, otel_manager: OtelManager, } @@ -56,10 +61,10 @@ async fn responses_websocket_streams_request() { .await; let harness = websocket_harness(&server).await; - let mut session = harness.client.new_session(None); + let mut client_session = harness.client.new_session(); let prompt = prompt_with_input(vec![message_item("hello")]); - stream_until_complete(&mut session, &prompt).await; + stream_until_complete(&mut client_session, &harness, &prompt).await; let connection = server.single_connection(); assert_eq!(connection.len(), 1); @@ -86,10 +91,10 @@ async fn responses_websocket_emits_websocket_telemetry_events() { let harness = websocket_harness(&server).await; harness.otel_manager.reset_runtime_metrics(); - let mut session = harness.client.new_session(None); + let mut client_session = harness.client.new_session(); let prompt = prompt_with_input(vec![message_item("hello")]); - stream_until_complete(&mut session, &prompt).await; + stream_until_complete(&mut client_session, &harness, &prompt).await; tokio::time::sleep(Duration::from_millis(10)).await; @@ -124,10 +129,10 @@ async fn responses_websocket_includes_timing_metrics_header_when_runtime_metrics let harness = websocket_harness_with_runtime_metrics(&server, true).await; harness.otel_manager.reset_runtime_metrics(); - let mut session = harness.client.new_session(None); + let mut client_session = harness.client.new_session(); let prompt = prompt_with_input(vec![message_item("hello")]); - stream_until_complete(&mut session, &prompt).await; + stream_until_complete(&mut client_session, &harness, &prompt).await; tokio::time::sleep(Duration::from_millis(10)).await; let handshake = server.single_handshake(); @@ -157,10 +162,10 @@ async fn responses_websocket_omits_timing_metrics_header_when_runtime_metrics_di .await; let harness = websocket_harness_with_runtime_metrics(&server, false).await; - let mut session = harness.client.new_session(None); + let mut client_session = harness.client.new_session(); let prompt = prompt_with_input(vec![message_item("hello")]); - stream_until_complete(&mut session, &prompt).await; + stream_until_complete(&mut client_session, &harness, &prompt).await; let handshake = server.single_handshake(); assert_eq!( @@ -182,11 +187,19 @@ async fn responses_websocket_emits_reasoning_included_event() { .await; let harness = websocket_harness(&server).await; - let mut session = harness.client.new_session(None); + let mut client_session = harness.client.new_session(); let prompt = prompt_with_input(vec![message_item("hello")]); - let mut stream = session - .stream(&prompt) + let mut stream = client_session + .stream( + &prompt, + &harness.model_info, + &harness.otel_manager, + harness.effort, + harness.summary, + harness.web_search_eligible, + None, + ) .await .expect("websocket stream failed"); @@ -245,11 +258,19 @@ async fn responses_websocket_emits_rate_limit_events() { .await; let harness = websocket_harness(&server).await; - let mut session = harness.client.new_session(None); + let mut client_session = harness.client.new_session(); let prompt = prompt_with_input(vec![message_item("hello")]); - let mut stream = session - .stream(&prompt) + let mut stream = client_session + .stream( + &prompt, + &harness.model_info, + &harness.otel_manager, + harness.effort, + harness.summary, + harness.web_search_eligible, + None, + ) .await .expect("websocket stream failed"); @@ -300,12 +321,12 @@ async fn responses_websocket_appends_on_prefix() { .await; let harness = websocket_harness(&server).await; - let mut session = harness.client.new_session(None); + let mut client_session = harness.client.new_session(); let prompt_one = prompt_with_input(vec![message_item("hello")]); let prompt_two = prompt_with_input(vec![message_item("hello"), message_item("second")]); - stream_until_complete(&mut session, &prompt_one).await; - stream_until_complete(&mut session, &prompt_two).await; + stream_until_complete(&mut client_session, &harness, &prompt_one).await; + stream_until_complete(&mut client_session, &harness, &prompt_two).await; let connection = server.single_connection(); assert_eq!(connection.len(), 2); @@ -336,12 +357,12 @@ async fn responses_websocket_creates_on_non_prefix() { .await; let harness = websocket_harness(&server).await; - let mut session = harness.client.new_session(None); + let mut client_session = harness.client.new_session(); let prompt_one = prompt_with_input(vec![message_item("hello")]); let prompt_two = prompt_with_input(vec![message_item("different")]); - stream_until_complete(&mut session, &prompt_one).await; - stream_until_complete(&mut session, &prompt_two).await; + stream_until_complete(&mut client_session, &harness, &prompt_one).await; + stream_until_complete(&mut client_session, &harness, &prompt_two).await; let connection = server.single_connection(); assert_eq!(connection.len(), 2); @@ -431,29 +452,47 @@ async fn websocket_harness_with_runtime_metrics( SessionSource::Exec, ) .with_metrics(metrics); + let effort = None; + let summary = ReasoningSummary::Auto; + let web_search_eligible = true; let client = ModelClient::new( - Arc::clone(&config), None, - model_info, - otel_manager.clone(), - provider.clone(), - None, - ReasoningSummary::Auto, conversation_id, + provider.clone(), SessionSource::Exec, - TransportManager::new(), + config.model_verbosity, + true, + false, + runtime_metrics_enabled, + None, ); WebsocketTestHarness { _codex_home: codex_home, client, + model_info, + effort, + summary, + web_search_eligible, otel_manager, } } -async fn stream_until_complete(session: &mut ModelClientSession, prompt: &Prompt) { - let mut stream = session - .stream(prompt) +async fn stream_until_complete( + client_session: &mut ModelClientSession, + harness: &WebsocketTestHarness, + prompt: &Prompt, +) { + let mut stream = client_session + .stream( + prompt, + &harness.model_info, + &harness.otel_manager, + harness.effort, + harness.summary, + harness.web_search_eligible, + None, + ) .await .expect("websocket stream failed"); diff --git a/codex-rs/core/tests/suite/stream_no_completed.rs b/codex-rs/core/tests/suite/stream_no_completed.rs index e8b130970..6528e6277 100644 --- a/codex-rs/core/tests/suite/stream_no_completed.rs +++ b/codex-rs/core/tests/suite/stream_no_completed.rs @@ -71,7 +71,7 @@ async fn retries_on_early_close() { name: "openai".into(), base_url: Some(format!("{}/v1", server.uri())), // Environment variable that should exist in the test environment. - // ModelClientSession will return an error if the environment variable for the + // ModelClient will return an error if the environment variable for the // provider is not set. env_key: Some("PATH".into()), env_key_instructions: None,