Session-level model client (#10664)

Make ModelClient a session-scoped object.
Move session-level state onto the client, and make per-turn state explicit on
the corresponding methods.
Stop taking a huge Config object; instead, pass in only the values that are
actually needed.

---------

Co-authored-by: Josh McKinney <joshka@openai.com>
pakrym-oai 2026-02-04 16:58:48 -08:00 committed by GitHub
parent 224c9f768d
commit 0e8d359da9
12 changed files with 601 additions and 333 deletions
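
For orientation, a hedged sketch of the resulting call pattern (bindings such as auth_manager, provider, and turn_context are placeholders for whatever the caller already holds; the actual signatures appear in the diff below):

// Session setup: only session-stable values, no Config.
let model_client = ModelClient::new(
    Some(auth_manager),          // Option<Arc<AuthManager>>
    conversation_id,             // ThreadId
    provider,                    // ModelProviderInfo
    session_source,              // SessionSource
    model_verbosity,             // Option<VerbosityConfig>
    enable_responses_websockets, // bool, from a feature flag
    enable_request_compression,  // bool, from a feature flag
    include_timing_metrics,      // bool, from a feature flag
    beta_features_header,        // Option<String>, precomputed once per session
);

// Each turn: a fresh turn-scoped session, with per-turn values passed explicitly.
let mut client_session = model_client.new_session();
let mut stream = client_session
    .stream(
        &prompt,
        &turn_context.model_info,
        &turn_context.otel_manager,
        turn_context.reasoning_effort,
        turn_context.reasoning_summary,
        web_search_eligible,
        turn_metadata_header.as_deref(),
    )
    .await?;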


@ -1,5 +1,21 @@
//! Session- and turn-scoped helpers for talking to model provider APIs.
//!
//! `ModelClient` is intended to live for the lifetime of a Codex session and holds the stable
//! configuration and state needed to talk to a provider (auth, provider selection, conversation id,
//! and feature-gated request behavior).
//!
//! Per-turn settings (model selection, reasoning controls, telemetry context, web search
//! eligibility, and turn metadata) are passed explicitly to streaming and unary methods so that the
//! turn lifetime is visible at the call site.
//!
//! A [`ModelClientSession`] is created per turn and is used to stream one or more Responses API
//! requests during that turn. It caches a Responses WebSocket connection (opened lazily) and
//! stores per-turn state such as the `x-codex-turn-state` token used for sticky routing.
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use crate::api_bridge::CoreAuthProvider;
use crate::api_bridge::auth_provider_from_auth;
@ -33,7 +49,7 @@ use codex_otel::OtelManager;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::config_types::Verbosity as VerbosityConfig;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
@ -58,17 +74,13 @@ use crate::auth::RefreshTokenError;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::config::Config;
use crate::default_client::build_reqwest_client;
use crate::error::CodexErr;
use crate::error::Result;
use crate::features::FEATURES;
use crate::features::Feature;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::transport_manager::TransportManager;
pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
@ -76,31 +88,60 @@ pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";
/// Session-scoped state shared by all [`ModelClient`] clones.
///
/// This is intentionally kept minimal so `ModelClient` does not need to hold a full `Config`. Most
/// configuration is per turn and is passed explicitly to streaming/unary methods.
#[derive(Debug)]
struct ModelClientState {
config: Arc<Config>,
auth_manager: Option<Arc<AuthManager>>,
model_info: ModelInfo,
otel_manager: OtelManager,
provider: ModelProviderInfo,
conversation_id: ThreadId,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
provider: ModelProviderInfo,
session_source: SessionSource,
transport_manager: TransportManager,
model_verbosity: Option<VerbosityConfig>,
enable_responses_websockets: bool,
enable_request_compression: bool,
include_timing_metrics: bool,
beta_features_header: Option<String>,
disable_websockets: AtomicBool,
}
/// A session-scoped client for model-provider API calls.
///
/// This holds configuration and state that should be shared across turns within a Codex session
/// (auth, provider selection, conversation id, feature-gated request behavior, and transport
/// fallback state).
///
/// WebSocket fallback is session-scoped: once a turn activates the HTTP fallback, subsequent turns
/// will also use HTTP for the remainder of the session.
///
/// Turn-scoped settings (model selection, reasoning controls, telemetry context, web search
/// eligibility, and turn metadata) are passed explicitly to the relevant methods to keep turn
/// lifetime visible at the call site.
///
/// This type is cheap to clone.
#[derive(Debug, Clone)]
pub struct ModelClient {
state: Arc<ModelClientState>,
}
/// A turn-scoped streaming session created from a [`ModelClient`].
///
/// The session lazily establishes a Responses WebSocket connection (and reuses it across multiple
/// requests) and caches per-turn state:
///
/// - The last request's input items, so subsequent calls can use `response.append` when the input
/// is an incremental extension of the previous request.
/// - The `x-codex-turn-state` sticky-routing token, which must be replayed for all requests within
/// the same turn.
///
/// Create a fresh `ModelClientSession` for each Codex turn. Reusing it across turns would replay
/// the previous turn's sticky-routing token into the next turn, which violates the client/server
/// contract and can cause routing bugs.
pub struct ModelClientSession {
state: Arc<ModelClientState>,
client: ModelClient,
connection: Option<ApiWebSocketConnection>,
websocket_last_items: Vec<ResponseItem>,
transport_manager: TransportManager,
turn_metadata_header: Option<String>,
/// Turn state for sticky routing.
///
/// This is an `OnceLock` that stores the turn state value received from the server
@ -114,54 +155,65 @@ pub struct ModelClientSession {
turn_state: Arc<OnceLock<String>>,
}
#[allow(clippy::too_many_arguments)]
impl ModelClient {
#[allow(clippy::too_many_arguments)]
/// Creates a new session-scoped `ModelClient`.
///
/// All arguments are expected to be stable for the lifetime of a Codex session. Per-turn values
/// are passed to [`ModelClientSession::stream`] (and other turn-scoped methods) explicitly.
pub fn new(
config: Arc<Config>,
auth_manager: Option<Arc<AuthManager>>,
model_info: ModelInfo,
otel_manager: OtelManager,
provider: ModelProviderInfo,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
conversation_id: ThreadId,
provider: ModelProviderInfo,
session_source: SessionSource,
transport_manager: TransportManager,
model_verbosity: Option<VerbosityConfig>,
enable_responses_websockets: bool,
enable_request_compression: bool,
include_timing_metrics: bool,
beta_features_header: Option<String>,
) -> Self {
Self {
state: Arc::new(ModelClientState {
config,
auth_manager,
model_info,
otel_manager,
provider,
conversation_id,
effort,
summary,
provider,
session_source,
transport_manager,
model_verbosity,
enable_responses_websockets,
enable_request_compression,
include_timing_metrics,
beta_features_header,
disable_websockets: AtomicBool::new(false),
}),
}
}
pub fn new_session(&self, turn_metadata_header: Option<String>) -> ModelClientSession {
/// Creates a fresh turn-scoped streaming session.
///
/// This does not open any network connections; the WebSocket connection is established lazily
/// when the first WebSocket stream request is issued.
pub fn new_session(&self) -> ModelClientSession {
ModelClientSession {
state: Arc::clone(&self.state),
client: self.clone(),
connection: None,
websocket_last_items: Vec::new(),
transport_manager: self.state.transport_manager.clone(),
turn_metadata_header,
turn_state: Arc::new(OnceLock::new()),
}
}
}
impl ModelClient {
/// Compacts the current conversation history using the Compact endpoint.
///
/// This is a unary call (no streaming) that returns a new list of
/// `ResponseItem`s representing the compacted transcript.
pub async fn compact_conversation_history(&self, prompt: &Prompt) -> Result<Vec<ResponseItem>> {
///
/// The model selection and telemetry context are passed explicitly to keep `ModelClient`
/// session-scoped.
pub async fn compact_conversation_history(
&self,
prompt: &Prompt,
model_info: &ModelInfo,
otel_manager: &OtelManager,
) -> Result<Vec<ResponseItem>> {
if prompt.input.is_empty() {
return Ok(Vec::new());
}
@ -176,13 +228,13 @@ impl ModelClient {
.to_api_provider(auth.as_ref().map(CodexAuth::internal_auth_mode))?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = self.build_request_telemetry();
let request_telemetry = Self::build_request_telemetry(otel_manager);
let client = ApiCompactClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry));
let instructions = prompt.base_instructions.text.clone();
let payload = ApiCompactionInput {
model: &self.state.model_info.slug,
model: &model_info.slug,
input: &prompt.input,
instructions: &instructions,
};
@ -197,9 +249,15 @@ impl ModelClient {
/// Builds memory summaries for each provided normalized trace.
///
/// This is a unary call (no streaming) to `/v1/memories/trace_summarize`.
///
/// The model selection, reasoning effort, and telemetry context are passed explicitly to keep
/// `ModelClient` session-scoped.
pub async fn summarize_memory_traces(
&self,
traces: Vec<ApiMemoryTrace>,
model_info: &ModelInfo,
effort: Option<ReasoningEffortConfig>,
otel_manager: &OtelManager,
) -> Result<Vec<ApiMemoryTraceSummaryOutput>> {
if traces.is_empty() {
return Ok(Vec::new());
@ -216,14 +274,14 @@ impl ModelClient {
.to_api_provider(auth.as_ref().map(CodexAuth::internal_auth_mode))?;
let api_auth = auth_provider_from_auth(auth, &self.state.provider)?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = self.build_request_telemetry();
let request_telemetry = Self::build_request_telemetry(otel_manager);
let client = ApiMemoriesClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry));
let payload = ApiMemoryTraceSummarizeInput {
model: self.state.model_info.slug.clone(),
model: model_info.slug.clone(),
traces,
reasoning: self.state.effort.map(|effort| Reasoning {
reasoning: effort.map(|effort| Reasoning {
effort: Some(effort),
summary: None,
}),
@ -250,79 +308,62 @@ impl ModelClient {
}
extra_headers
}
/// Builds request telemetry for unary API calls (e.g., Compact endpoint).
fn build_request_telemetry(otel_manager: &OtelManager) -> Arc<dyn RequestTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(otel_manager.clone()));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
request_telemetry
}
}
impl ModelClientSession {
/// Streams a single model turn using the configured Responses transport.
pub async fn stream(&mut self, prompt: &Prompt) -> Result<ResponseStream> {
let wire_api = self.state.provider.wire_api;
match wire_api {
WireApi::Responses => {
let websocket_enabled = self.responses_websocket_enabled()
&& !self.transport_manager.disable_websockets();
if websocket_enabled {
self.stream_responses_websocket(prompt).await
} else {
self.stream_responses_api(prompt).await
}
}
}
fn disable_websockets(&self) -> bool {
self.client.state.disable_websockets.load(Ordering::Relaxed)
}
pub(crate) fn try_switch_fallback_transport(&mut self) -> bool {
let websocket_enabled = self.responses_websocket_enabled();
let activated = self
.transport_manager
.activate_http_fallback(websocket_enabled);
if activated {
warn!("falling back to HTTP");
self.state.otel_manager.counter(
"codex.transport.fallback_to_http",
1,
&[("from_wire_api", "responses_websocket")],
);
self.connection = None;
self.websocket_last_items.clear();
}
activated
fn activate_http_fallback(&self, websocket_enabled: bool) -> bool {
websocket_enabled
&& !self
.client
.state
.disable_websockets
.swap(true, Ordering::Relaxed)
}
fn responses_websocket_enabled(&self) -> bool {
self.state.provider.supports_websockets
&& self
.state
.config
.features
.enabled(Feature::ResponsesWebsockets)
self.client.state.provider.supports_websockets
&& self.client.state.enable_responses_websockets
}
fn build_responses_request(&self, prompt: &Prompt) -> Result<ApiPrompt> {
fn build_responses_request(prompt: &Prompt) -> Result<ApiPrompt> {
let instructions = prompt.base_instructions.text.clone();
let tools_json: Vec<Value> = create_tools_json_for_responses_api(&prompt.tools)?;
Ok(build_api_prompt(prompt, instructions, tools_json))
}
#[allow(clippy::too_many_arguments)]
fn build_responses_options(
&self,
prompt: &Prompt,
model_info: &ModelInfo,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
web_search_eligible: bool,
turn_metadata_header: Option<&str>,
compression: Compression,
) -> ApiResponsesOptions {
let turn_metadata_header = self
.turn_metadata_header
.as_deref()
.and_then(|value| HeaderValue::from_str(value).ok());
let model_info = &self.state.model_info;
let turn_metadata_header =
turn_metadata_header.and_then(|value| HeaderValue::from_str(value).ok());
let default_reasoning_effort = model_info.default_reasoning_level;
let reasoning = if model_info.supports_reasoning_summaries {
Some(Reasoning {
effort: self.state.effort.or(default_reasoning_effort),
summary: if self.state.summary == ReasoningSummaryConfig::None {
effort: effort.or(default_reasoning_effort),
summary: if summary == ReasoningSummaryConfig::None {
None
} else {
Some(self.state.summary)
Some(summary)
},
})
} else {
@ -336,12 +377,12 @@ impl ModelClientSession {
};
let verbosity = if model_info.support_verbosity {
self.state
.config
self.client
.state
.model_verbosity
.or(model_info.default_verbosity)
} else {
if self.state.config.model_verbosity.is_some() {
if self.client.state.model_verbosity.is_some() {
warn!(
"model_verbosity is set but ignored as the model does not support verbosity: {}",
model_info.slug
@ -351,7 +392,7 @@ impl ModelClientSession {
};
let text = create_text_param_for_request(verbosity, &prompt.output_schema);
let conversation_id = self.state.conversation_id.to_string();
let conversation_id = self.client.state.conversation_id.to_string();
ApiResponsesOptions {
reasoning,
@ -360,9 +401,10 @@ impl ModelClientSession {
text,
store_override: None,
conversation_id: Some(conversation_id),
session_source: Some(self.state.session_source.clone()),
session_source: Some(self.client.state.session_source.clone()),
extra_headers: build_responses_headers(
&self.state.config,
self.client.state.beta_features_header.as_deref(),
web_search_eligible,
Some(&self.turn_state),
turn_metadata_header.as_ref(),
),
@ -388,6 +430,7 @@ impl ModelClientSession {
fn prepare_websocket_request(
&self,
model_slug: &str,
api_prompt: &ApiPrompt,
options: &ApiResponsesOptions,
) -> ResponsesWsRequest {
@ -408,7 +451,7 @@ impl ModelClientSession {
let store = store_override.unwrap_or(false);
let payload = ResponseCreateWsRequest {
model: self.state.model_info.slug.clone(),
model: model_slug.to_string(),
instructions: api_prompt.instructions.clone(),
input: api_prompt.input.clone(),
tools: api_prompt.tools.clone(),
@ -427,6 +470,7 @@ impl ModelClientSession {
async fn websocket_connection(
&mut self,
otel_manager: &OtelManager,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
options: &ApiResponsesOptions,
@ -439,13 +483,13 @@ impl ModelClientSession {
if needs_new {
let mut headers = options.extra_headers.clone();
headers.extend(build_conversation_headers(options.conversation_id.clone()));
if self.state.config.features.enabled(Feature::RuntimeMetrics) {
if self.client.state.include_timing_metrics {
headers.insert(
X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER,
HeaderValue::from_static("true"),
);
}
let websocket_telemetry = self.build_websocket_telemetry();
let websocket_telemetry = Self::build_websocket_telemetry(otel_manager);
let new_conn: ApiWebSocketConnection =
ApiWebSocketResponsesClient::new(api_provider, api_auth)
.connect(
@ -463,13 +507,9 @@ impl ModelClientSession {
}
fn responses_request_compression(&self, auth: Option<&crate::auth::CodexAuth>) -> Compression {
if self
.state
.config
.features
.enabled(Feature::EnableRequestCompression)
if self.client.state.enable_request_compression
&& auth.is_some_and(CodexAuth::is_chatgpt_auth)
&& self.state.provider.is_openai()
&& self.client.state.provider.is_openai()
{
Compression::Zstd
} else {
@ -481,17 +521,29 @@ impl ModelClientSession {
///
/// Handles SSE fixtures, reasoning summaries, verbosity, and the
/// `text` controls used for output schemas.
async fn stream_responses_api(&self, prompt: &Prompt) -> Result<ResponseStream> {
#[allow(clippy::too_many_arguments)]
async fn stream_responses_api(
&self,
prompt: &Prompt,
model_info: &ModelInfo,
otel_manager: &OtelManager,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
web_search_eligible: bool,
turn_metadata_header: Option<&str>,
) -> Result<ResponseStream> {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
warn!(path, "Streaming from fixture");
let stream =
codex_api::stream_from_fixture(path, self.state.provider.stream_idle_timeout())
.map_err(map_api_error)?;
return Ok(map_response_stream(stream, self.state.otel_manager.clone()));
let stream = codex_api::stream_from_fixture(
path,
self.client.state.provider.stream_idle_timeout(),
)
.map_err(map_api_error)?;
return Ok(map_response_stream(stream, otel_manager.clone()));
}
let auth_manager = self.state.auth_manager.clone();
let api_prompt = self.build_responses_request(prompt)?;
let auth_manager = self.client.state.auth_manager.clone();
let api_prompt = Self::build_responses_request(prompt)?;
let mut auth_recovery = auth_manager
.as_ref()
@ -502,26 +554,35 @@ impl ModelClientSession {
None => None,
};
let api_provider = self
.client
.state
.provider
.to_api_provider(auth.as_ref().map(CodexAuth::internal_auth_mode))?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.client.state.provider)?;
let transport = ReqwestTransport::new(build_reqwest_client());
let (request_telemetry, sse_telemetry) = self.build_streaming_telemetry();
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(otel_manager);
let compression = self.responses_request_compression(auth.as_ref());
let client = ApiResponsesClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry), Some(sse_telemetry));
let options = self.build_responses_options(prompt, compression);
let options = self.build_responses_options(
prompt,
model_info,
effort,
summary,
web_search_eligible,
turn_metadata_header,
compression,
);
let stream_result = client
.stream_prompt(&self.state.model_info.slug, &api_prompt, options)
.stream_prompt(&model_info.slug, &api_prompt, options)
.await;
match stream_result {
Ok(stream) => {
return Ok(map_response_stream(stream, self.state.otel_manager.clone()));
return Ok(map_response_stream(stream, otel_manager.clone()));
}
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
@ -535,9 +596,19 @@ impl ModelClientSession {
}
/// Streams a turn via the Responses API over WebSocket transport.
async fn stream_responses_websocket(&mut self, prompt: &Prompt) -> Result<ResponseStream> {
let auth_manager = self.state.auth_manager.clone();
let api_prompt = self.build_responses_request(prompt)?;
#[allow(clippy::too_many_arguments)]
async fn stream_responses_websocket(
&mut self,
prompt: &Prompt,
model_info: &ModelInfo,
otel_manager: &OtelManager,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
web_search_eligible: bool,
turn_metadata_header: Option<&str>,
) -> Result<ResponseStream> {
let auth_manager = self.client.state.auth_manager.clone();
let api_prompt = Self::build_responses_request(prompt)?;
let mut auth_recovery = auth_manager
.as_ref()
@ -548,17 +619,31 @@ impl ModelClientSession {
None => None,
};
let api_provider = self
.client
.state
.provider
.to_api_provider(auth.as_ref().map(CodexAuth::internal_auth_mode))?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.state.provider)?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.client.state.provider)?;
let compression = self.responses_request_compression(auth.as_ref());
let options = self.build_responses_options(prompt, compression);
let request = self.prepare_websocket_request(&api_prompt, &options);
let options = self.build_responses_options(
prompt,
model_info,
effort,
summary,
web_search_eligible,
turn_metadata_header,
compression,
);
let request = self.prepare_websocket_request(&model_info.slug, &api_prompt, &options);
let connection = match self
.websocket_connection(api_provider.clone(), api_auth.clone(), &options)
.websocket_connection(
otel_manager,
api_provider.clone(),
api_auth.clone(),
&options,
)
.await
{
Ok(connection) => connection,
@ -577,35 +662,97 @@ impl ModelClientSession {
.map_err(map_api_error)?;
self.websocket_last_items = api_prompt.input.clone();
return Ok(map_response_stream(
stream_result,
self.state.otel_manager.clone(),
));
return Ok(map_response_stream(stream_result, otel_manager.clone()));
}
}
/// Builds request and SSE telemetry for streaming API calls.
fn build_streaming_telemetry(&self) -> (Arc<dyn RequestTelemetry>, Arc<dyn SseTelemetry>) {
let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone()));
fn build_streaming_telemetry(
otel_manager: &OtelManager,
) -> (Arc<dyn RequestTelemetry>, Arc<dyn SseTelemetry>) {
let telemetry = Arc::new(ApiTelemetry::new(otel_manager.clone()));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry.clone();
let sse_telemetry: Arc<dyn SseTelemetry> = telemetry;
(request_telemetry, sse_telemetry)
}
/// Builds telemetry for the Responses API WebSocket transport.
fn build_websocket_telemetry(&self) -> Arc<dyn WebsocketTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone()));
fn build_websocket_telemetry(otel_manager: &OtelManager) -> Arc<dyn WebsocketTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(otel_manager.clone()));
let websocket_telemetry: Arc<dyn WebsocketTelemetry> = telemetry;
websocket_telemetry
}
}
impl ModelClient {
/// Builds request telemetry for unary API calls (e.g., Compact endpoint).
fn build_request_telemetry(&self) -> Arc<dyn RequestTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(self.state.otel_manager.clone()));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
request_telemetry
#[allow(clippy::too_many_arguments)]
/// Streams a single model request within the current turn.
///
/// The caller is responsible for passing per-turn settings explicitly (model selection,
/// reasoning settings, telemetry context, web search eligibility, and turn metadata). This
/// method will prefer the Responses WebSocket transport when enabled and healthy, and will
/// fall back to the HTTP Responses API transport otherwise.
pub async fn stream(
&mut self,
prompt: &Prompt,
model_info: &ModelInfo,
otel_manager: &OtelManager,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
web_search_eligible: bool,
turn_metadata_header: Option<&str>,
) -> Result<ResponseStream> {
let wire_api = self.client.state.provider.wire_api;
match wire_api {
WireApi::Responses => {
let websocket_enabled =
self.responses_websocket_enabled() && !self.disable_websockets();
if websocket_enabled {
self.stream_responses_websocket(
prompt,
model_info,
otel_manager,
effort,
summary,
web_search_eligible,
turn_metadata_header,
)
.await
} else {
self.stream_responses_api(
prompt,
model_info,
otel_manager,
effort,
summary,
web_search_eligible,
turn_metadata_header,
)
.await
}
}
}
}
/// Permanently disables WebSockets for this Codex session and resets WebSocket state.
///
/// This is used after exhausting the provider retry budget, to force subsequent requests onto
/// the HTTP transport. Returns `true` if this call activated fallback, or `false` if fallback
/// was already active.
pub(crate) fn try_switch_fallback_transport(&mut self, otel_manager: &OtelManager) -> bool {
let websocket_enabled = self.responses_websocket_enabled();
let activated = self.activate_http_fallback(websocket_enabled);
if activated {
warn!("falling back to HTTP");
otel_manager.counter(
"codex.transport.fallback_to_http",
1,
&[("from_wire_api", "responses_websocket")],
);
self.connection = None;
self.websocket_last_items.clear();
}
activated
}
}
@ -620,44 +767,30 @@ fn build_api_prompt(prompt: &Prompt, instructions: String, tools_json: Vec<Value
}
}
fn experimental_feature_headers(config: &Config) -> ApiHeaderMap {
let enabled = FEATURES
.iter()
.filter_map(|spec| {
if spec.stage.experimental_menu_description().is_some()
&& config.features.enabled(spec.id)
{
Some(spec.key)
} else {
None
}
})
.collect::<Vec<_>>();
let value = enabled.join(",");
let mut headers = ApiHeaderMap::new();
if !value.is_empty()
&& let Ok(header_value) = HeaderValue::from_str(value.as_str())
{
headers.insert("x-codex-beta-features", header_value);
}
headers
}
/// Builds the extra headers attached to Responses API requests.
///
/// These headers implement Codex-specific conventions:
///
/// - `x-codex-beta-features`: comma-separated beta feature keys enabled for the session.
/// - `x-oai-web-search-eligible`: whether this turn is allowed to use web search.
/// - `x-codex-turn-state`: sticky routing token captured earlier in the turn.
/// - `x-codex-turn-metadata`: optional per-turn metadata for observability.
fn build_responses_headers(
config: &Config,
beta_features_header: Option<&str>,
web_search_eligible: bool,
turn_state: Option<&Arc<OnceLock<String>>>,
turn_metadata_header: Option<&HeaderValue>,
) -> ApiHeaderMap {
let mut headers = experimental_feature_headers(config);
let mut headers = ApiHeaderMap::new();
if let Some(value) = beta_features_header
&& !value.is_empty()
&& let Ok(header_value) = HeaderValue::from_str(value)
{
headers.insert("x-codex-beta-features", header_value);
}
headers.insert(
WEB_SEARCH_ELIGIBLE_HEADER,
HeaderValue::from_static(
if matches!(config.web_search_mode, Some(WebSearchMode::Disabled)) {
"false"
} else {
"true"
},
),
HeaderValue::from_static(if web_search_eligible { "true" } else { "false" }),
);
if let Some(turn_state) = turn_state
&& let Some(state) = turn_state.get()
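
A hedged sketch of the append decision implied by the ModelClientSession docs above and by the appends_on_prefix / creates_on_non_prefix tests later in this commit (the real logic lives in the session's WebSocket request path; ResponseItem: PartialEq is assumed, and the exact condition may differ):

// The new request's input must strictly extend the previously sent items for the
// session to use `response.append`; otherwise it issues a fresh `response.create`.
fn can_append(last_items: &[ResponseItem], new_items: &[ResponseItem]) -> bool {
    new_items.len() > last_items.len() && new_items.starts_with(last_items)
}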


@ -20,6 +20,7 @@ use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::connectors;
use crate::exec_policy::ExecPolicyManager;
use crate::features::FEATURES;
use crate::features::Feature;
use crate::features::Features;
use crate::features::maybe_push_unstable_features_warning;
@ -32,7 +33,6 @@ use crate::stream_events_utils::handle_non_tool_response_item;
use crate::stream_events_utils::handle_output_item_done;
use crate::stream_events_utils::last_assistant_message_from_item;
use crate::terminal;
use crate::transport_manager::TransportManager;
use crate::truncate::TruncationPolicy;
use crate::turn_metadata::build_turn_metadata_header;
use crate::user_notification::UserNotifier;
@ -488,7 +488,6 @@ pub(crate) struct Session {
#[derive(Debug)]
pub(crate) struct TurnContext {
pub(crate) sub_id: String,
pub(crate) client: ModelClient,
pub(crate) config: Arc<Config>,
pub(crate) auth_manager: Option<Arc<AuthManager>>,
pub(crate) model_info: ModelInfo,
@ -497,7 +496,6 @@ pub(crate) struct TurnContext {
pub(crate) reasoning_effort: Option<ReasoningEffortConfig>,
pub(crate) reasoning_summary: ReasoningSummaryConfig,
pub(crate) session_source: SessionSource,
pub(crate) transport_manager: TransportManager,
/// The session's current working directory. All relative paths provided by
/// the model as well as sandbox policies are resolved against this path
/// instead of `std::env::current_dir()`.
@ -681,6 +679,33 @@ pub(crate) struct SessionSettingsUpdate {
}
impl Session {
/// Builds the `x-codex-beta-features` header value for this session.
///
/// `ModelClient` is session-scoped and intentionally does not depend on the full `Config`, so
/// we precompute the comma-separated list of enabled experimental feature keys at session
/// creation time and thread it into the client.
fn build_model_client_beta_features_header(config: &Config) -> Option<String> {
let beta_features_header = FEATURES
.iter()
.filter_map(|spec| {
if spec.stage.experimental_menu_description().is_some()
&& config.features.enabled(spec.id)
{
Some(spec.key)
} else {
None
}
})
.collect::<Vec<_>>()
.join(",");
if beta_features_header.is_empty() {
None
} else {
Some(beta_features_header)
}
}
/// Don't expand the number of mutated arguments on config. We are in the process of getting rid of it.
pub(crate) fn build_per_turn_config(session_configuration: &SessionConfiguration) -> Config {
// todo(aibrahim): store this state somewhere else so we don't need to mut config
@ -735,9 +760,7 @@ impl Session {
session_configuration: &SessionConfiguration,
per_turn_config: Config,
model_info: ModelInfo,
conversation_id: ThreadId,
sub_id: String,
transport_manager: TransportManager,
) -> TurnContext {
let reasoning_effort = session_configuration.collaboration_mode.reasoning_effort();
let reasoning_summary = session_configuration.model_reasoning_summary;
@ -746,23 +769,10 @@ impl Session {
model_info.slug.as_str(),
);
let session_source = session_configuration.session_source.clone();
let auth_manager_for_context = auth_manager.clone();
let provider_for_context = provider.clone();
let transport_manager_for_context = transport_manager.clone();
let otel_manager_for_context = otel_manager.clone();
let auth_manager_for_context = auth_manager;
let provider_for_context = provider;
let otel_manager_for_context = otel_manager;
let per_turn_config = Arc::new(per_turn_config);
let client = ModelClient::new(
per_turn_config.clone(),
auth_manager,
model_info.clone(),
otel_manager,
provider,
reasoning_effort,
reasoning_summary,
conversation_id,
session_source.clone(),
transport_manager,
);
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_info: &model_info,
@ -773,7 +783,6 @@ impl Session {
let cwd = session_configuration.cwd.clone();
TurnContext {
sub_id,
client,
config: per_turn_config.clone(),
auth_manager: auth_manager_for_context,
model_info: model_info.clone(),
@ -782,7 +791,6 @@ impl Session {
reasoning_effort,
reasoning_summary,
session_source,
transport_manager: transport_manager_for_context,
cwd,
developer_instructions: session_configuration.developer_instructions.clone(),
compact_prompt: session_configuration.compact_prompt.clone(),
@ -1020,7 +1028,17 @@ impl Session {
file_watcher,
agent_control,
state_db: state_db_ctx.clone(),
transport_manager: TransportManager::new(),
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),
conversation_id,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,
config.features.enabled(Feature::ResponsesWebsockets),
config.features.enabled(Feature::EnableRequestCompression),
config.features.enabled(Feature::RuntimeMetrics),
Self::build_model_client_beta_features_header(config.as_ref()),
),
};
let sess = Arc::new(Session {
@ -1351,9 +1369,7 @@ impl Session {
&session_configuration,
per_turn_config,
model_info,
self.conversation_id,
sub_id,
self.services.transport_manager.clone(),
);
if let Some(final_schema) = final_output_json_schema {
@ -3353,25 +3369,11 @@ async fn spawn_review_thread(
let reasoning_effort = per_turn_config.model_reasoning_effort;
let reasoning_summary = per_turn_config.model_reasoning_summary;
let session_source = parent_turn_context.session_source.clone();
let transport_manager = parent_turn_context.transport_manager.clone();
let per_turn_config = Arc::new(per_turn_config);
let client = ModelClient::new(
per_turn_config.clone(),
auth_manager,
model_info.clone(),
otel_manager,
provider,
reasoning_effort,
reasoning_summary,
sess.conversation_id,
session_source.clone(),
transport_manager.clone(),
);
let review_turn_context = TurnContext {
sub_id: sub_id.to_string(),
client,
config: per_turn_config,
auth_manager: auth_manager_for_context,
model_info: model_info.clone(),
@ -3380,7 +3382,6 @@ async fn spawn_review_thread(
reasoning_effort,
reasoning_summary,
session_source,
transport_manager,
tools_config,
features: parent_turn_context.features.clone(),
ghost_snapshot: parent_turn_context.ghost_snapshot.clone(),
@ -3605,7 +3606,9 @@ pub(crate) async fn run_turn(
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
let mut client_session = turn_context.client.new_session(turn_metadata_header);
// `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse
// one instance across retries within this turn.
let mut client_session = sess.services.model_client.new_session();
loop {
// Note that pending_input would be something like a message the user
@ -3658,6 +3661,7 @@ pub(crate) async fn run_turn(
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
&mut client_session,
turn_metadata_header.as_deref(),
sampling_request_input,
tool_selection,
cancellation_token.child_token(),
@ -3844,6 +3848,7 @@ struct SamplingRequestToolSelection<'a> {
skill_name_counts_lower: &'a HashMap<String, usize>,
}
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace",
skip_all,
fields(
@ -3857,6 +3862,7 @@ async fn run_sampling_request(
turn_context: Arc<TurnContext>,
turn_diff_tracker: SharedTurnDiffTracker,
client_session: &mut ModelClientSession,
turn_metadata_header: Option<&str>,
input: Vec<ResponseItem>,
tool_selection: SamplingRequestToolSelection<'_>,
cancellation_token: CancellationToken,
@ -3914,6 +3920,7 @@ async fn run_sampling_request(
Arc::clone(&sess),
Arc::clone(&turn_context),
client_session,
turn_metadata_header,
Arc::clone(&turn_diff_tracker),
&prompt,
cancellation_token.child_token(),
@ -3943,7 +3950,9 @@ async fn run_sampling_request(
// Use the configured provider-specific stream retry budget.
let max_retries = turn_context.provider.stream_max_retries();
if retries >= max_retries && client_session.try_switch_fallback_transport() {
if retries >= max_retries
&& client_session.try_switch_fallback_transport(&turn_context.otel_manager)
{
sess.send_event(
&turn_context,
EventMsg::Warning(WarningEvent {
@ -4396,6 +4405,7 @@ async fn try_run_sampling_request(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
client_session: &mut ModelClientSession,
turn_metadata_header: Option<&str>,
turn_diff_tracker: SharedTurnDiffTracker,
prompt: &Prompt,
cancellation_token: CancellationToken,
@ -4426,8 +4436,20 @@ async fn try_run_sampling_request(
);
sess.persist_rollout_items(&[rollout_item]).await;
let web_search_eligible = !matches!(
turn_context.config.web_search_mode,
Some(WebSearchMode::Disabled)
);
let mut stream = client_session
.stream(prompt)
.stream(
prompt,
&turn_context.model_info,
&turn_context.otel_manager,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
web_search_eligible,
turn_metadata_header,
)
.instrument(trace_span!("stream_request"))
.or_cancel(&cancellation_token)
.await??;
@ -5594,7 +5616,17 @@ mod tests {
file_watcher,
agent_control,
state_db: None,
transport_manager: TransportManager::new(),
model_client: ModelClient::new(
Some(auth_manager.clone()),
conversation_id,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,
config.features.enabled(Feature::ResponsesWebsockets),
config.features.enabled(Feature::EnableRequestCompression),
config.features.enabled(Feature::RuntimeMetrics),
Session::build_model_client_beta_features_header(config.as_ref()),
),
};
let turn_context = Session::make_turn_context(
@ -5604,9 +5636,7 @@ mod tests {
&session_configuration,
per_turn_config,
model_info,
conversation_id,
"turn_id".to_string(),
services.transport_manager.clone(),
);
let session = Session {
@ -5716,7 +5746,17 @@ mod tests {
file_watcher,
agent_control,
state_db: None,
transport_manager: TransportManager::new(),
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),
conversation_id,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,
config.features.enabled(Feature::ResponsesWebsockets),
config.features.enabled(Feature::EnableRequestCompression),
config.features.enabled(Feature::RuntimeMetrics),
Session::build_model_client_beta_features_header(config.as_ref()),
),
};
let turn_context = Arc::new(Session::make_turn_context(
@ -5726,9 +5766,7 @@ mod tests {
&session_configuration,
per_turn_config,
model_info,
conversation_id,
"turn_id".to_string(),
services.transport_manager.clone(),
));
let session = Arc::new(Session {


@ -2,6 +2,7 @@ use std::sync::Arc;
use crate::ModelProviderInfo;
use crate::Prompt;
use crate::client::ModelClientSession;
use crate::client_common::ResponseEvent;
use crate::codex::Session;
use crate::codex::TurnContext;
@ -19,6 +20,7 @@ use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
use crate::util::backoff;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
@ -87,6 +89,10 @@ async fn run_compact_task_inner(
let max_retries = turn_context.provider.stream_max_retries();
let mut retries = 0;
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
let mut client_session = sess.services.model_client.new_session();
// Reuse one client session so turn-scoped state (sticky routing, websocket append tracking)
// survives retries within this compact turn.
// TODO: If we need to guarantee the persisted mode always matches the prompt used for this
// turn, capture it in TurnContext at creation time. Using SessionConfiguration here avoids
@ -119,7 +125,14 @@ async fn run_compact_task_inner(
personality: turn_context.personality,
..Default::default()
};
let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await;
let attempt_result = drain_to_completed(
&sess,
turn_context.as_ref(),
&mut client_session,
turn_metadata_header.as_deref(),
&prompt,
)
.await;
match attempt_result {
Ok(()) => {
@ -335,11 +348,25 @@ fn build_compacted_history_with_limit(
async fn drain_to_completed(
sess: &Session,
turn_context: &TurnContext,
client_session: &mut ModelClientSession,
turn_metadata_header: Option<&str>,
prompt: &Prompt,
) -> CodexResult<()> {
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
let mut client_session = turn_context.client.new_session(turn_metadata_header);
let mut stream = client_session.stream(prompt).await?;
let web_search_eligible = !matches!(
turn_context.config.web_search_mode,
Some(WebSearchMode::Disabled)
);
let mut stream = client_session
.stream(
prompt,
&turn_context.model_info,
&turn_context.otel_manager,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
web_search_eligible,
turn_metadata_header,
)
.await?;
loop {
let maybe_event = stream.next().await;
let Some(event) = maybe_event else {


@ -76,9 +76,14 @@ async fn run_remote_compact_task_inner_impl(
output_schema: None,
};
let mut new_history = turn_context
.client
.compact_conversation_history(&prompt)
let mut new_history = sess
.services
.model_client
.compact_conversation_history(
&prompt,
&turn_context.model_info,
&turn_context.otel_manager,
)
.await?;
if !ghost_snapshots.is_empty() {


@ -40,7 +40,6 @@ pub mod landlock;
pub mod mcp;
mod mcp_connection_manager;
pub mod models_manager;
mod transport_manager;
pub use mcp_connection_manager::MCP_SANDBOX_STATE_CAPABILITY;
pub use mcp_connection_manager::MCP_SANDBOX_STATE_METHOD;
pub use mcp_connection_manager::SandboxState;
@ -122,7 +121,6 @@ pub use rollout::list::read_head_for_summary;
pub use rollout::list::read_session_meta_line;
pub use rollout::rollout_date_parts;
pub use rollout::session_index::find_thread_names_by_ids;
pub use transport_manager::TransportManager;
mod function_tool;
mod state;
mod tasks;


@ -6,6 +6,9 @@ use crate::error::CodexErr;
use crate::error::Result;
use codex_api::MemoryTrace as ApiMemoryTrace;
use codex_api::MemoryTraceMetadata as ApiMemoryTraceMetadata;
use codex_otel::OtelManager;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use serde_json::Map;
use serde_json::Value;
@ -27,9 +30,15 @@ struct PreparedTrace {
///
/// The request/response wiring mirrors the memory trace summarize E2E flow:
/// `/v1/memories/trace_summarize` with one output object per input trace.
///
/// The caller provides the model selection, reasoning effort, and telemetry context explicitly so
/// the session-scoped [`ModelClient`] can be reused across turns.
pub async fn build_memories_from_trace_files(
client: &ModelClient,
trace_paths: &[PathBuf],
model_info: &ModelInfo,
effort: Option<ReasoningEffortConfig>,
otel_manager: &OtelManager,
) -> Result<Vec<BuiltTraceMemory>> {
if trace_paths.is_empty() {
return Ok(Vec::new());
@ -41,7 +50,9 @@ pub async fn build_memories_from_trace_files(
}
let traces = prepared.iter().map(|trace| trace.payload.clone()).collect();
let output = client.summarize_memory_traces(traces).await?;
let output = client
.summarize_memory_traces(traces, model_info, effort, otel_manager)
.await?;
if output.len() != prepared.len() {
return Err(CodexErr::InvalidRequest(format!(
"unexpected memory summarize output length: expected {}, got {}",


@ -4,6 +4,7 @@ use crate::AuthManager;
use crate::RolloutRecorder;
use crate::agent::AgentControl;
use crate::analytics_client::AnalyticsEventsClient;
use crate::client::ModelClient;
use crate::exec_policy::ExecPolicyManager;
use crate::file_watcher::FileWatcher;
use crate::mcp_connection_manager::McpConnectionManager;
@ -11,7 +12,6 @@ use crate::models_manager::manager::ModelsManager;
use crate::skills::SkillsManager;
use crate::state_db::StateDbHandle;
use crate::tools::sandboxing::ApprovalStore;
use crate::transport_manager::TransportManager;
use crate::unified_exec::UnifiedExecProcessManager;
use crate::user_notification::UserNotifier;
use codex_otel::OtelManager;
@ -37,5 +37,6 @@ pub(crate) struct SessionServices {
pub(crate) file_watcher: Arc<FileWatcher>,
pub(crate) agent_control: AgentControl,
pub(crate) state_db: Option<StateDbHandle>,
pub(crate) transport_manager: TransportManager,
/// Session-scoped model client shared across turns.
pub(crate) model_client: ModelClient,
}
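
Turn code reaches the shared client through this services struct; for example, run_turn and the compact task earlier in this commit both begin a turn with:

let mut client_session = sess.services.model_client.new_session();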


@ -1,22 +0,0 @@
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
#[derive(Clone, Debug, Default)]
pub struct TransportManager {
disable_websockets: Arc<AtomicBool>,
}
impl TransportManager {
pub fn new() -> Self {
Self::default()
}
pub fn disable_websockets(&self) -> bool {
self.disable_websockets.load(Ordering::Relaxed)
}
pub fn activate_http_fallback(&self, websocket_enabled: bool) -> bool {
websocket_enabled && !self.disable_websockets.swap(true, Ordering::Relaxed)
}
}
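
The latch deleted here is not lost: ModelClientState now owns a disable_websockets: AtomicBool, and ModelClientSession::activate_http_fallback (in the client module changes earlier in this commit) performs the same swap(true, Ordering::Relaxed) check, so HTTP fallback keeps its once-per-session semantics without a separate TransportManager service.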


@ -10,7 +10,6 @@ use codex_core::ModelProviderInfo;
use codex_core::Prompt;
use codex_core::ResponseEvent;
use codex_core::ResponseItem;
use codex_core::TransportManager;
use codex_core::WEB_SEARCH_ELIGIBLE_HEADER;
use codex_core::WireApi;
use codex_core::models_manager::manager::ModelsManager;
@ -88,19 +87,19 @@ async fn responses_stream_includes_subagent_header_on_review() {
session_source.clone(),
);
let mut client_session = ModelClient::new(
Arc::clone(&config),
let web_search_eligible = !matches!(config.web_search_mode, Some(WebSearchMode::Disabled));
let client = ModelClient::new(
None,
model_info,
otel_manager,
provider,
effort,
summary,
conversation_id,
provider.clone(),
session_source,
TransportManager::new(),
)
.new_session(None);
config.model_verbosity,
false,
false,
false,
None,
);
let mut client_session = client.new_session();
let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message {
@ -113,7 +112,18 @@ async fn responses_stream_includes_subagent_header_on_review() {
phase: None,
}];
let mut stream = client_session.stream(&prompt).await.expect("stream failed");
let mut stream = client_session
.stream(
&prompt,
&model_info,
&otel_manager,
effort,
summary,
web_search_eligible,
None,
)
.await
.expect("stream failed");
while let Some(event) = stream.next().await {
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
break;
@ -188,19 +198,19 @@ async fn responses_stream_includes_subagent_header_on_other() {
session_source.clone(),
);
let mut client_session = ModelClient::new(
Arc::clone(&config),
let web_search_eligible = !matches!(config.web_search_mode, Some(WebSearchMode::Disabled));
let client = ModelClient::new(
None,
model_info,
otel_manager,
provider,
effort,
summary,
conversation_id,
provider.clone(),
session_source,
TransportManager::new(),
)
.new_session(None);
config.model_verbosity,
false,
false,
false,
None,
);
let mut client_session = client.new_session();
let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message {
@ -213,7 +223,18 @@ async fn responses_stream_includes_subagent_header_on_other() {
phase: None,
}];
let mut stream = client_session.stream(&prompt).await.expect("stream failed");
let mut stream = client_session
.stream(
&prompt,
&model_info,
&otel_manager,
effort,
summary,
web_search_eligible,
None,
)
.await
.expect("stream failed");
while let Some(event) = stream.next().await {
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
break;
@ -346,19 +367,19 @@ async fn responses_respects_model_info_overrides_from_config() {
session_source.clone(),
);
let mut client = ModelClient::new(
Arc::clone(&config),
let web_search_eligible = !matches!(config.web_search_mode, Some(WebSearchMode::Disabled));
let client = ModelClient::new(
None,
model_info,
otel_manager,
provider,
effort,
summary,
conversation_id,
provider.clone(),
session_source,
TransportManager::new(),
)
.new_session(None);
config.model_verbosity,
false,
false,
false,
None,
);
let mut client_session = client.new_session();
let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message {
@ -371,7 +392,18 @@ async fn responses_respects_model_info_overrides_from_config() {
phase: None,
}];
let mut stream = client.stream(&prompt).await.expect("stream failed");
let mut stream = client_session
.stream(
&prompt,
&model_info,
&otel_manager,
effort,
summary,
web_search_eligible,
None,
)
.await
.expect("stream failed");
while let Some(event) = stream.next().await {
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
break;


@ -11,7 +11,6 @@ use codex_core::Prompt;
use codex_core::ResponseEvent;
use codex_core::ResponseItem;
use codex_core::ThreadManager;
use codex_core::TransportManager;
use codex_core::WireApi;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::built_in_model_providers;
@ -1264,19 +1263,18 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
SessionSource::Exec,
);
let mut client = ModelClient::new(
Arc::clone(&config),
let client = ModelClient::new(
None,
model_info,
otel_manager,
provider,
effort,
summary,
conversation_id,
provider.clone(),
SessionSource::Exec,
TransportManager::new(),
)
.new_session(None);
config.model_verbosity,
false,
false,
false,
None,
);
let mut client_session = client.new_session();
let mut prompt = Prompt::default();
prompt.input.push(ResponseItem::Reasoning {
@ -1340,8 +1338,16 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
output: "ok".into(),
});
let mut stream = client
.stream(&prompt)
let mut stream = client_session
.stream(
&prompt,
&model_info,
&otel_manager,
effort,
summary,
true,
None,
)
.await
.expect("responses stream to start");


@ -8,7 +8,6 @@ use codex_core::ModelProviderInfo;
use codex_core::Prompt;
use codex_core::ResponseEvent;
use codex_core::ResponseItem;
use codex_core::TransportManager;
use codex_core::WireApi;
use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
use codex_core::features::Feature;
@ -20,6 +19,8 @@ use codex_otel::metrics::MetricsConfig;
use codex_protocol::ThreadId;
use codex_protocol::account::PlanType;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use core_test_support::load_default_config_for_test;
use core_test_support::responses::WebSocketConnectionConfig;
use core_test_support::responses::WebSocketTestServer;
@ -42,6 +43,10 @@ const MODEL: &str = "gpt-5.2-codex";
struct WebsocketTestHarness {
_codex_home: TempDir,
client: ModelClient,
model_info: ModelInfo,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummary,
web_search_eligible: bool,
otel_manager: OtelManager,
}
@ -56,10 +61,10 @@ async fn responses_websocket_streams_request() {
.await;
let harness = websocket_harness(&server).await;
let mut session = harness.client.new_session(None);
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
stream_until_complete(&mut session, &prompt).await;
stream_until_complete(&mut client_session, &harness, &prompt).await;
let connection = server.single_connection();
assert_eq!(connection.len(), 1);
@ -86,10 +91,10 @@ async fn responses_websocket_emits_websocket_telemetry_events() {
let harness = websocket_harness(&server).await;
harness.otel_manager.reset_runtime_metrics();
let mut session = harness.client.new_session(None);
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
stream_until_complete(&mut session, &prompt).await;
stream_until_complete(&mut client_session, &harness, &prompt).await;
tokio::time::sleep(Duration::from_millis(10)).await;
@ -124,10 +129,10 @@ async fn responses_websocket_includes_timing_metrics_header_when_runtime_metrics
let harness = websocket_harness_with_runtime_metrics(&server, true).await;
harness.otel_manager.reset_runtime_metrics();
let mut session = harness.client.new_session(None);
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
stream_until_complete(&mut session, &prompt).await;
stream_until_complete(&mut client_session, &harness, &prompt).await;
tokio::time::sleep(Duration::from_millis(10)).await;
let handshake = server.single_handshake();
@ -157,10 +162,10 @@ async fn responses_websocket_omits_timing_metrics_header_when_runtime_metrics_di
.await;
let harness = websocket_harness_with_runtime_metrics(&server, false).await;
let mut session = harness.client.new_session(None);
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
stream_until_complete(&mut session, &prompt).await;
stream_until_complete(&mut client_session, &harness, &prompt).await;
let handshake = server.single_handshake();
assert_eq!(
@ -182,11 +187,19 @@ async fn responses_websocket_emits_reasoning_included_event() {
.await;
let harness = websocket_harness(&server).await;
let mut session = harness.client.new_session(None);
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
let mut stream = session
.stream(&prompt)
let mut stream = client_session
.stream(
&prompt,
&harness.model_info,
&harness.otel_manager,
harness.effort,
harness.summary,
harness.web_search_eligible,
None,
)
.await
.expect("websocket stream failed");
@ -245,11 +258,19 @@ async fn responses_websocket_emits_rate_limit_events() {
.await;
let harness = websocket_harness(&server).await;
let mut session = harness.client.new_session(None);
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
let mut stream = session
.stream(&prompt)
let mut stream = client_session
.stream(
&prompt,
&harness.model_info,
&harness.otel_manager,
harness.effort,
harness.summary,
harness.web_search_eligible,
None,
)
.await
.expect("websocket stream failed");
@ -300,12 +321,12 @@ async fn responses_websocket_appends_on_prefix() {
.await;
let harness = websocket_harness(&server).await;
let mut session = harness.client.new_session(None);
let mut client_session = harness.client.new_session();
let prompt_one = prompt_with_input(vec![message_item("hello")]);
let prompt_two = prompt_with_input(vec![message_item("hello"), message_item("second")]);
stream_until_complete(&mut session, &prompt_one).await;
stream_until_complete(&mut session, &prompt_two).await;
stream_until_complete(&mut client_session, &harness, &prompt_one).await;
stream_until_complete(&mut client_session, &harness, &prompt_two).await;
let connection = server.single_connection();
assert_eq!(connection.len(), 2);
@ -336,12 +357,12 @@ async fn responses_websocket_creates_on_non_prefix() {
.await;
let harness = websocket_harness(&server).await;
let mut session = harness.client.new_session(None);
let mut client_session = harness.client.new_session();
let prompt_one = prompt_with_input(vec![message_item("hello")]);
let prompt_two = prompt_with_input(vec![message_item("different")]);
stream_until_complete(&mut session, &prompt_one).await;
stream_until_complete(&mut session, &prompt_two).await;
stream_until_complete(&mut client_session, &harness, &prompt_one).await;
stream_until_complete(&mut client_session, &harness, &prompt_two).await;
let connection = server.single_connection();
assert_eq!(connection.len(), 2);
@ -431,29 +452,47 @@ async fn websocket_harness_with_runtime_metrics(
SessionSource::Exec,
)
.with_metrics(metrics);
let effort = None;
let summary = ReasoningSummary::Auto;
let web_search_eligible = true;
let client = ModelClient::new(
Arc::clone(&config),
None,
model_info,
otel_manager.clone(),
provider.clone(),
None,
ReasoningSummary::Auto,
conversation_id,
provider.clone(),
SessionSource::Exec,
TransportManager::new(),
config.model_verbosity,
true,
false,
runtime_metrics_enabled,
None,
);
WebsocketTestHarness {
_codex_home: codex_home,
client,
model_info,
effort,
summary,
web_search_eligible,
otel_manager,
}
}
async fn stream_until_complete(session: &mut ModelClientSession, prompt: &Prompt) {
let mut stream = session
.stream(prompt)
async fn stream_until_complete(
client_session: &mut ModelClientSession,
harness: &WebsocketTestHarness,
prompt: &Prompt,
) {
let mut stream = client_session
.stream(
prompt,
&harness.model_info,
&harness.otel_manager,
harness.effort,
harness.summary,
harness.web_search_eligible,
None,
)
.await
.expect("websocket stream failed");


@ -71,7 +71,7 @@ async fn retries_on_early_close() {
name: "openai".into(),
base_url: Some(format!("{}/v1", server.uri())),
// Environment variable that should exist in the test environment.
// ModelClientSession will return an error if the environment variable for the
// ModelClient will return an error if the environment variable for the
// provider is not set.
env_key: Some("PATH".into()),
env_key_instructions: None,