From 2036a5f5e07c99eb1930f9e8ec207e8bf7e94afe Mon Sep 17 00:00:00 2001 From: colby-oai <228809017+colby-oai@users.noreply.github.com> Date: Fri, 20 Feb 2026 10:26:19 -0500 Subject: [PATCH] Add MCP server context to otel tool_result logs (#12267) Summary - capture the origin for each configured MCP server and expose it via the connection manager - plumb MCP server name/origin into tool logging and emit codex.tool_result events with those fields - add unit coverage for origin parsing and extend OTEL tests to assert empty MCP fields for non-MCP tools - currently not logging full urls or url paths to prevent logging potentially sensitive data Testing - Not run (not requested) --- codex-rs/core/src/mcp_connection_manager.rs | 50 ++++++++++++++++ codex-rs/core/src/tools/registry.rs | 21 +++++++ codex-rs/core/tests/suite/otel.rs | 62 ++++++++++++++++++++ codex-rs/otel/src/traces/otel_manager.rs | 13 ++++ codex-rs/otel/tests/suite/runtime_summary.rs | 2 + 5 files changed, 148 insertions(+) diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index c76e19809..05ac2a9a9 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -73,6 +73,7 @@ use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::instrument; use tracing::warn; +use url::Url; use crate::codex::INITIAL_SUBMIT_ID; use crate::config::types::McpServerConfig; @@ -507,6 +508,7 @@ pub struct SandboxState { /// A thin wrapper around a set of running [`RmcpClient`] instances. pub(crate) struct McpConnectionManager { clients: HashMap, + server_origins: HashMap, elicitation_requests: ElicitationRequestManager, } @@ -514,6 +516,7 @@ impl McpConnectionManager { pub(crate) fn new_uninitialized(approval_policy: &Constrained) -> Self { Self { clients: HashMap::new(), + server_origins: HashMap::new(), elicitation_requests: ElicitationRequestManager::new(approval_policy.value()), } } @@ -529,6 +532,10 @@ impl McpConnectionManager { !self.clients.is_empty() } + pub(crate) fn server_origin(&self, server_name: &str) -> Option<&str> { + self.server_origins.get(server_name).map(String::as_str) + } + pub fn set_approval_policy(&self, approval_policy: &Constrained) { if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() { *policy = approval_policy.value(); @@ -548,10 +555,14 @@ impl McpConnectionManager { ) -> (Self, CancellationToken) { let cancel_token = CancellationToken::new(); let mut clients = HashMap::new(); + let mut server_origins = HashMap::new(); let mut join_set = JoinSet::new(); let elicitation_requests = ElicitationRequestManager::new(approval_policy.value()); let mcp_servers = mcp_servers.clone(); for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) { + if let Some(origin) = transport_origin(&cfg.transport) { + server_origins.insert(server_name.clone(), origin); + } let cancel_token = cancel_token.child_token(); let _ = emit_update( &tx_event, @@ -624,6 +635,7 @@ impl McpConnectionManager { } let manager = Self { clients, + server_origins, elicitation_requests: elicitation_requests.clone(), }; tokio::spawn(async move { @@ -1448,6 +1460,16 @@ fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) { } } +fn transport_origin(transport: &McpServerTransportConfig) -> Option { + match transport { + McpServerTransportConfig::StreamableHttp { url, .. } => { + let parsed = Url::parse(url).ok()?; + Some(parsed.origin().ascii_serialization()) + } + McpServerTransportConfig::Stdio { .. } => Some("stdio".to_string()), + } +} + async fn list_tools_for_client_uncached( server_name: &str, client: &Arc, @@ -2164,4 +2186,32 @@ mod tests { display ); } + + #[test] + fn transport_origin_extracts_http_origin() { + let transport = McpServerTransportConfig::StreamableHttp { + url: "https://example.com:8443/path?query=1".to_string(), + bearer_token_env_var: None, + http_headers: None, + env_http_headers: None, + }; + + assert_eq!( + transport_origin(&transport), + Some("https://example.com:8443".to_string()) + ); + } + + #[test] + fn transport_origin_is_stdio_for_stdio_transport() { + let transport = McpServerTransportConfig::Stdio { + command: "server".to_string(), + args: Vec::new(), + env: None, + env_vars: Vec::new(), + cwd: None, + }; + + assert_eq!(transport_origin(&transport), Some("stdio".to_string())); + } } diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index 4ffb7ca10..7a0be68df 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -102,6 +102,21 @@ impl ToolRegistry { sandbox_policy_tag(&invocation.turn.sandbox_policy), ), ]; + let (mcp_server, mcp_server_origin) = match &invocation.payload { + ToolPayload::Mcp { server, .. } => { + let manager = invocation + .session + .services + .mcp_connection_manager + .read() + .await; + let origin = manager.server_origin(server).map(str::to_owned); + (Some(server.clone()), origin) + } + _ => (None, None), + }; + let mcp_server_ref = mcp_server.as_deref(); + let mcp_server_origin_ref = mcp_server_origin.as_deref(); let handler = match self.handler(tool_name.as_ref()) { Some(handler) => handler, @@ -116,6 +131,8 @@ impl ToolRegistry { false, &message, &metric_tags, + mcp_server_ref, + mcp_server_origin_ref, ); return Err(FunctionCallError::RespondToModel(message)); } @@ -131,6 +148,8 @@ impl ToolRegistry { false, &message, &metric_tags, + mcp_server_ref, + mcp_server_origin_ref, ); return Err(FunctionCallError::Fatal(message)); } @@ -146,6 +165,8 @@ impl ToolRegistry { &call_id_owned, log_payload.as_ref(), &metric_tags, + mcp_server_ref, + mcp_server_origin_ref, || { let handler = handler.clone(); let output_cell = &output_cell; diff --git a/codex-rs/core/tests/suite/otel.rs b/codex-rs/core/tests/suite/otel.rs index fd3d1ca16..03f87aec1 100644 --- a/codex-rs/core/tests/suite/otel.rs +++ b/codex-rs/core/tests/suite/otel.rs @@ -32,6 +32,64 @@ use tracing_test::traced_test; use tracing_subscriber::fmt::format::FmtSpan; use tracing_test::internal::MockWriter; +fn extract_log_field(line: &str, key: &str) -> Option { + let quoted_prefix = format!("{key}=\""); + if let Some(start) = line.find("ed_prefix) { + let value_start = start + quoted_prefix.len(); + if let Some(end_rel) = line[value_start..].find('"') { + return Some(line[value_start..value_start + end_rel].to_string()); + } + } + + let bare_prefix = format!("{key}="); + for token in line.split_whitespace() { + let trimmed = token.trim_end_matches(','); + if let Some(value) = trimmed.strip_prefix(&bare_prefix) { + return Some(value.to_string()); + } + } + + None +} + +fn assert_empty_mcp_tool_fields(line: &str) -> Result<(), String> { + let mcp_server = extract_log_field(line, "mcp_server") + .ok_or_else(|| "missing mcp_server field".to_string())?; + if !mcp_server.is_empty() { + return Err(format!("expected empty mcp_server, got {mcp_server}")); + } + + let mcp_server_origin = extract_log_field(line, "mcp_server_origin") + .ok_or_else(|| "missing mcp_server_origin field".to_string())?; + if !mcp_server_origin.is_empty() { + return Err(format!( + "expected empty mcp_server_origin, got {mcp_server_origin}" + )); + } + + Ok(()) +} + +#[test] +fn extract_log_field_handles_empty_bare_values() { + let line = "event.name=\"codex.tool_result\" mcp_server= mcp_server_origin="; + assert_eq!(extract_log_field(line, "mcp_server"), Some(String::new())); + assert_eq!( + extract_log_field(line, "mcp_server_origin"), + Some(String::new()) + ); +} + +#[test] +fn extract_log_field_does_not_confuse_similar_keys() { + let line = "event.name=\"codex.tool_result\" mcp_server_origin=stdio"; + assert_eq!(extract_log_field(line, "mcp_server"), None); + assert_eq!( + extract_log_field(line, "mcp_server_origin"), + Some("stdio".to_string()) + ); +} + #[tokio::test] #[traced_test] async fn responses_api_emits_api_request_event() { @@ -687,6 +745,7 @@ async fn handle_response_item_records_tool_result_for_custom_tool_call() { if !line.contains("success=false") { return Err("missing success field".to_string()); } + assert_empty_mcp_tool_fields(line)?; Ok(()) }); @@ -756,6 +815,7 @@ async fn handle_response_item_records_tool_result_for_function_call() { if !line.contains("success=false") { return Err("missing success field".to_string()); } + assert_empty_mcp_tool_fields(line)?; Ok(()) }); @@ -828,6 +888,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_missing_ids() if !line.contains("success=false") { return Err("missing success field".to_string()); } + assert_empty_mcp_tool_fields(line)?; Ok(()) }); @@ -899,6 +960,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_call() { if !line.contains("success=false") { return Err("missing success field".to_string()); } + assert_empty_mcp_tool_fields(line)?; Ok(()) }); diff --git a/codex-rs/otel/src/traces/otel_manager.rs b/codex-rs/otel/src/traces/otel_manager.rs index e3ba85eab..9331801f9 100644 --- a/codex-rs/otel/src/traces/otel_manager.rs +++ b/codex-rs/otel/src/traces/otel_manager.rs @@ -588,12 +588,15 @@ impl OtelManager { ); } + #[allow(clippy::too_many_arguments)] pub async fn log_tool_result_with_tags( &self, tool_name: &str, call_id: &str, arguments: &str, extra_tags: &[(&str, &str)], + mcp_server: Option<&str>, + mcp_server_origin: Option<&str>, f: F, ) -> Result<(String, bool), E> where @@ -618,6 +621,8 @@ impl OtelManager { success, output.as_ref(), extra_tags, + mcp_server, + mcp_server_origin, ); result @@ -641,6 +646,8 @@ impl OtelManager { duration_ms = %Duration::ZERO.as_millis(), success = %false, output = %error, + mcp_server = "", + mcp_server_origin = "", ); } @@ -654,6 +661,8 @@ impl OtelManager { success: bool, output: &str, extra_tags: &[(&str, &str)], + mcp_server: Option<&str>, + mcp_server_origin: Option<&str>, ) { let success_str = if success { "true" } else { "false" }; let mut tags = Vec::with_capacity(2 + extra_tags.len()); @@ -662,6 +671,8 @@ impl OtelManager { tags.extend_from_slice(extra_tags); self.counter(TOOL_CALL_COUNT_METRIC, 1, &tags); self.record_duration(TOOL_CALL_DURATION_METRIC, duration, &tags); + let mcp_server = mcp_server.unwrap_or(""); + let mcp_server_origin = mcp_server_origin.unwrap_or(""); tracing::event!( tracing::Level::INFO, event.name = "codex.tool_result", @@ -681,6 +692,8 @@ impl OtelManager { duration_ms = %duration.as_millis(), success = %success_str, output = %output, + mcp_server = %mcp_server, + mcp_server_origin = %mcp_server_origin, ); } diff --git a/codex-rs/otel/tests/suite/runtime_summary.rs b/codex-rs/otel/tests/suite/runtime_summary.rs index 0737e0797..4020e70b4 100644 --- a/codex-rs/otel/tests/suite/runtime_summary.rs +++ b/codex-rs/otel/tests/suite/runtime_summary.rs @@ -44,6 +44,8 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<( true, "ok", &[], + None, + None, ); manager.record_api_request(1, Some(200), None, Duration::from_millis(300)); manager.record_websocket_request(Duration::from_millis(400), None);