feat: add service name to app-server (#12319)

Add service name to the app-server so that the app can use it's own
service name

This is on thread level because later we might plan the app-server to
become a singleton on the computer
This commit is contained in:
jif-oai 2026-02-25 09:51:42 +00:00 committed by GitHub
parent 6a3233da64
commit 10c04e11b8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 165 additions and 157 deletions

View file

@ -2190,6 +2190,12 @@
"type": "null"
}
]
},
"serviceName": {
"type": [
"string",
"null"
]
}
},
"type": "object"

View file

@ -13323,6 +13323,12 @@
"type": "null"
}
]
},
"serviceName": {
"type": [
"string",
"null"
]
}
},
"title": "ThreadStartParams",

View file

@ -150,6 +150,12 @@
"type": "null"
}
]
},
"serviceName": {
"type": [
"string",
"null"
]
}
},
"title": "ThreadStartParams",

View file

@ -6,7 +6,7 @@ import type { JsonValue } from "../serde_json/JsonValue";
import type { AskForApproval } from "./AskForApproval";
import type { SandboxMode } from "./SandboxMode";
export type ThreadStartParams = {model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, ephemeral?: boolean | null, /**
export type ThreadStartParams = {model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, serviceName?: string | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, ephemeral?: boolean | null, /**
* If true, opt into emitting raw Responses API items on the event stream.
* This is for internal use only (e.g. Codex Cloud).
*/

View file

@ -1661,6 +1661,8 @@ pub struct ThreadStartParams {
#[ts(optional = nullable)]
pub config: Option<HashMap<String, JsonValue>>,
#[ts(optional = nullable)]
pub service_name: Option<String>,
#[ts(optional = nullable)]
pub base_instructions: Option<String>,
#[ts(optional = nullable)]
pub developer_instructions: Option<String>,

View file

@ -170,6 +170,7 @@ Start a fresh thread when you need a new Codex conversation.
"approvalPolicy": "never",
"sandbox": "workspaceWrite",
"personality": "friendly",
"serviceName": "my_app_server_client", // optional metrics tag (`service_name`)
// Experimental: requires opt-in
"dynamicTools": [
{

View file

@ -1956,6 +1956,7 @@ impl CodexMessageProcessor {
approval_policy,
sandbox,
config,
service_name,
base_instructions,
developer_instructions,
dynamic_tools,
@ -2023,7 +2024,12 @@ impl CodexMessageProcessor {
match self
.thread_manager
.start_thread_with_tools(config, core_dynamic_tools, persist_extended_history)
.start_thread_with_tools_and_service_name(
config,
core_dynamic_tools,
persist_extended_history,
service_name,
)
.await
{
Ok(new_conv) => {

View file

@ -146,6 +146,34 @@ model_reasoning_effort = "high"
Ok(())
}
#[tokio::test]
async fn thread_start_accepts_metrics_service_name() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let req_id = mcp
.send_thread_start_request(ThreadStartParams {
service_name: Some("my_app_server_client".to_string()),
..Default::default()
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
assert!(!thread.id.is_empty(), "thread id should not be empty");
Ok(())
}
#[tokio::test]
async fn thread_start_ephemeral_remains_pathless() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View file

@ -83,7 +83,7 @@ impl AgentControl {
let new_thread = match session_source {
Some(session_source) => {
state
.spawn_new_thread_with_source(config, self.clone(), session_source, false)
.spawn_new_thread_with_source(config, self.clone(), session_source, false, None)
.await?
}
None => state.spawn_new_thread(config, self.clone()).await?,

View file

@ -315,6 +315,7 @@ impl Codex {
agent_control: AgentControl,
dynamic_tools: Vec<DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
) -> CodexResult<CodexSpawnOk> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
@ -421,6 +422,7 @@ impl Codex {
codex_home: config.codex_home.clone(),
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name,
session_source,
dynamic_tools,
persist_extended_history,
@ -772,6 +774,8 @@ pub(crate) struct SessionConfiguration {
// TODO(pakrym): Remove config from here
original_config_do_not_use: Arc<Config>,
/// Optional service name tag for session metrics.
metrics_service_name: Option<String>,
/// Source of the session (cli, vscode, exec, mcp, ...)
session_source: SessionSource,
dynamic_tools: Vec<DynamicToolSpec>,
@ -1190,7 +1194,7 @@ impl Session {
let auth = auth.as_ref();
let auth_mode = auth.map(CodexAuth::auth_mode).map(TelemetryAuthMode::from);
let otel_manager = OtelManager::new(
let mut otel_manager = OtelManager::new(
conversation_id,
session_configuration.collaboration_mode.model(),
session_configuration.collaboration_mode.model(),
@ -1202,6 +1206,9 @@ impl Session {
terminal::user_agent(),
session_configuration.session_source.clone(),
);
if let Some(service_name) = session_configuration.metrics_service_name.as_deref() {
otel_manager = otel_manager.with_metrics_service_name(service_name);
}
config.features.emit_metrics(&otel_manager);
otel_manager.counter(
"codex.thread.started",
@ -7641,6 +7648,7 @@ mod tests {
codex_home: config.codex_home.clone(),
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@ -7732,6 +7740,7 @@ mod tests {
codex_home: config.codex_home.clone(),
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@ -8042,6 +8051,7 @@ mod tests {
codex_home: config.codex_home.clone(),
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@ -8093,6 +8103,7 @@ mod tests {
codex_home: config.codex_home.clone(),
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@ -8172,6 +8183,7 @@ mod tests {
codex_home: config.codex_home.clone(),
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@ -8328,6 +8340,7 @@ mod tests {
codex_home: config.codex_home.clone(),
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
session_source: SessionSource::Exec,
dynamic_tools,
persist_extended_history: false,

View file

@ -59,6 +59,7 @@ pub(crate) async fn run_codex_thread_interactive(
parent_session.services.agent_control.clone(),
Vec::new(),
false,
None,
)
.await?;
let codex = Arc::new(codex);

View file

@ -291,6 +291,22 @@ impl ThreadManager {
config: Config,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
self.start_thread_with_tools_and_service_name(
config,
dynamic_tools,
persist_extended_history,
None,
)
.await
}
pub async fn start_thread_with_tools_and_service_name(
&self,
config: Config,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
) -> CodexResult<NewThread> {
self.state
.spawn_thread(
@ -300,6 +316,7 @@ impl ThreadManager {
self.agent_control(),
dynamic_tools,
persist_extended_history,
metrics_service_name,
)
.await
}
@ -330,6 +347,7 @@ impl ThreadManager {
self.agent_control(),
Vec::new(),
persist_extended_history,
None,
)
.await
}
@ -371,6 +389,7 @@ impl ThreadManager {
self.agent_control(),
Vec::new(),
persist_extended_history,
None,
)
.await
}
@ -421,8 +440,14 @@ impl ThreadManagerState {
config: Config,
agent_control: AgentControl,
) -> CodexResult<NewThread> {
self.spawn_new_thread_with_source(config, agent_control, self.session_source.clone(), false)
.await
self.spawn_new_thread_with_source(
config,
agent_control,
self.session_source.clone(),
false,
None,
)
.await
}
pub(crate) async fn spawn_new_thread_with_source(
@ -431,6 +456,7 @@ impl ThreadManagerState {
agent_control: AgentControl,
session_source: SessionSource,
persist_extended_history: bool,
metrics_service_name: Option<String>,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
@ -440,6 +466,7 @@ impl ThreadManagerState {
session_source,
Vec::new(),
persist_extended_history,
metrics_service_name,
)
.await
}
@ -460,11 +487,13 @@ impl ThreadManagerState {
session_source,
Vec::new(),
false,
None,
)
.await
}
/// Spawn a new thread with optional history and register it with the manager.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn spawn_thread(
&self,
config: Config,
@ -473,6 +502,7 @@ impl ThreadManagerState {
agent_control: AgentControl,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
@ -482,6 +512,7 @@ impl ThreadManagerState {
self.session_source.clone(),
dynamic_tools,
persist_extended_history,
metrics_service_name,
)
.await
}
@ -496,6 +527,7 @@ impl ThreadManagerState {
session_source: SessionSource,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
metrics_service_name: Option<String>,
) -> CodexResult<NewThread> {
let watch_registration = self.file_watcher.register_config(&config);
let CodexSpawnOk {
@ -511,6 +543,7 @@ impl ThreadManagerState {
agent_control,
dynamic_tools,
persist_extended_history,
metrics_service_name,
)
.await?;
self.finalize_thread_spawn(codex, thread_id, watch_registration)

View file

@ -86,7 +86,6 @@ mod live_reload;
mod model_info_overrides;
mod model_overrides;
mod model_switching;
mod model_tools;
mod model_visible_layout;
mod models_cache_ttl;
mod models_etag_responses;

View file

@ -1,149 +0,0 @@
#![allow(clippy::unwrap_used)]
use codex_core::features::Feature;
use codex_protocol::config_types::WebSearchMode;
use core_test_support::responses;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
#[allow(clippy::expect_used)]
fn tool_identifiers(body: &serde_json::Value) -> Vec<String> {
body["tools"]
.as_array()
.unwrap()
.iter()
.map(|tool| {
tool.get("name")
.and_then(|v| v.as_str())
.or_else(|| tool.get("type").and_then(|v| v.as_str()))
.map(std::string::ToString::to_string)
.expect("tool should have either name or type")
})
.collect()
}
#[allow(clippy::expect_used)]
async fn collect_tool_identifiers_for_model(model: &str) -> Vec<String> {
let server = start_mock_server().await;
let sse = responses::sse(vec![
responses::ev_response_created(model),
responses::ev_completed(model),
]);
let resp_mock = responses::mount_sse_once(&server, sse).await;
let mut builder = test_codex()
.with_model(model)
// Keep tool expectations stable when the default web_search mode changes.
.with_config(|config| {
config
.web_search_mode
.set(WebSearchMode::Cached)
.expect("test web_search_mode should satisfy constraints");
config.features.enable(Feature::CollaborationModes);
});
let test = builder
.build(&server)
.await
.expect("create test Codex conversation");
test.submit_turn("hello tools").await.expect("submit turn");
let body = resp_mock.single_request().body_json();
tool_identifiers(&body)
}
fn expected_default_tools(shell_tool: &str, tail: &[&str]) -> Vec<String> {
let mut tools = if cfg!(windows) {
vec![shell_tool.to_string()]
} else {
vec!["exec_command".to_string(), "write_stdin".to_string()]
};
tools.extend(tail.iter().map(|tool| (*tool).to_string()));
tools
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn model_selects_expected_tools() {
skip_if_no_network!();
use pretty_assertions::assert_eq;
let gpt51_codex_max_tools = collect_tool_identifiers_for_model("gpt-5.1-codex-max").await;
assert_eq!(
gpt51_codex_max_tools,
expected_default_tools(
"shell_command",
&[
"update_plan",
"request_user_input",
"apply_patch",
"web_search",
"view_image",
],
),
"gpt-5.1-codex-max should expose the apply_patch tool",
);
let gpt5_codex_tools = collect_tool_identifiers_for_model("gpt-5-codex").await;
assert_eq!(
gpt5_codex_tools,
expected_default_tools(
"shell_command",
&[
"update_plan",
"request_user_input",
"apply_patch",
"web_search",
"view_image",
],
),
"gpt-5-codex should expose the apply_patch tool",
);
let gpt51_codex_tools = collect_tool_identifiers_for_model("gpt-5.1-codex").await;
assert_eq!(
gpt51_codex_tools,
expected_default_tools(
"shell_command",
&[
"update_plan",
"request_user_input",
"apply_patch",
"web_search",
"view_image",
],
),
"gpt-5.1-codex should expose the apply_patch tool",
);
let gpt5_tools = collect_tool_identifiers_for_model("gpt-5").await;
assert_eq!(
gpt5_tools,
expected_default_tools(
"shell",
&[
"update_plan",
"request_user_input",
"web_search",
"view_image",
],
),
"gpt-5 should expose the apply_patch tool",
);
let gpt51_tools = collect_tool_identifiers_for_model("gpt-5.1").await;
assert_eq!(
gpt51_tools,
expected_default_tools(
"shell_command",
&[
"update_plan",
"request_user_input",
"apply_patch",
"web_search",
"view_image",
],
),
"gpt-5.1 should expose the apply_patch tool",
);
}

View file

@ -45,6 +45,7 @@ pub struct OtelEventMetadata {
pub(crate) account_id: Option<String>,
pub(crate) account_email: Option<String>,
pub(crate) originator: String,
pub(crate) service_name: Option<String>,
pub(crate) session_source: String,
pub(crate) model: String,
pub(crate) slug: String,
@ -67,6 +68,11 @@ impl OtelManager {
self
}
pub fn with_metrics_service_name(mut self, service_name: &str) -> Self {
self.metadata.service_name = Some(sanitize_metric_tag_value(service_name));
self
}
pub fn with_metrics(mut self, metrics: MetricsClient) -> Self {
self.metrics = Some(metrics);
self.metrics_use_metadata_tags = true;
@ -197,7 +203,7 @@ impl OtelManager {
if !self.metrics_use_metadata_tags {
return Ok(Vec::new());
}
let mut tags = Vec::with_capacity(6);
let mut tags = Vec::with_capacity(7);
Self::push_metadata_tag(&mut tags, "auth_mode", self.metadata.auth_mode.as_deref())?;
Self::push_metadata_tag(
&mut tags,
@ -209,6 +215,11 @@ impl OtelManager {
"originator",
Some(self.metadata.originator.as_str()),
)?;
Self::push_metadata_tag(
&mut tags,
"service_name",
self.metadata.service_name.as_deref(),
)?;
Self::push_metadata_tag(&mut tags, "model", Some(self.metadata.model.as_str()))?;
Self::push_metadata_tag(&mut tags, "app.version", Some(self.metadata.app_version))?;
Ok(tags)

View file

@ -79,6 +79,7 @@ impl OtelManager {
account_id,
account_email,
originator: sanitize_metric_tag_value(originator.as_str()),
service_name: None,
session_source: session_source.to_string(),
model: model.to_owned(),
slug: slug.to_owned(),

View file

@ -109,3 +109,47 @@ fn manager_allows_disabling_metadata_tags() -> Result<()> {
Ok(())
}
#[test]
fn manager_attaches_optional_service_name_tag() -> Result<()> {
let (metrics, exporter) = build_metrics_with_defaults(&[])?;
let manager = OtelManager::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
None,
None,
None,
"test_originator".to_string(),
false,
"tty".to_string(),
SessionSource::Cli,
)
.with_metrics_service_name("my_app_server_client")
.with_metrics(metrics);
manager.counter("codex.session_started", 1, &[]);
manager.shutdown_metrics()?;
let resource_metrics = latest_metrics(&exporter);
let metric =
find_metric(&resource_metrics, "codex.session_started").expect("counter metric missing");
let attrs = match metric.data() {
AggregatedMetrics::U64(data) => match data {
MetricData::Sum(sum) => {
let points: Vec<_> = sum.data_points().collect();
assert_eq!(points.len(), 1);
attributes_to_map(points[0].attributes())
}
_ => panic!("unexpected counter aggregation"),
},
_ => panic!("unexpected counter data type"),
};
assert_eq!(
attrs.get("service_name"),
Some(&"my_app_server_client".to_string())
);
Ok(())
}