Add MCP server context to otel tool_result logs (#12267)
Summary - capture the origin for each configured MCP server and expose it via the connection manager - plumb MCP server name/origin into tool logging and emit codex.tool_result events with those fields - add unit coverage for origin parsing and extend OTEL tests to assert empty MCP fields for non-MCP tools - currently not logging full urls or url paths to prevent logging potentially sensitive data Testing - Not run (not requested)
This commit is contained in:
parent
ede561b5d1
commit
2036a5f5e0
5 changed files with 148 additions and 0 deletions
|
|
@ -73,6 +73,7 @@ use tokio::task::JoinSet;
|
|||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::instrument;
|
||||
use tracing::warn;
|
||||
use url::Url;
|
||||
|
||||
use crate::codex::INITIAL_SUBMIT_ID;
|
||||
use crate::config::types::McpServerConfig;
|
||||
|
|
@ -507,6 +508,7 @@ pub struct SandboxState {
|
|||
/// A thin wrapper around a set of running [`RmcpClient`] instances.
|
||||
pub(crate) struct McpConnectionManager {
|
||||
clients: HashMap<String, AsyncManagedClient>,
|
||||
server_origins: HashMap<String, String>,
|
||||
elicitation_requests: ElicitationRequestManager,
|
||||
}
|
||||
|
||||
|
|
@ -514,6 +516,7 @@ impl McpConnectionManager {
|
|||
pub(crate) fn new_uninitialized(approval_policy: &Constrained<AskForApproval>) -> Self {
|
||||
Self {
|
||||
clients: HashMap::new(),
|
||||
server_origins: HashMap::new(),
|
||||
elicitation_requests: ElicitationRequestManager::new(approval_policy.value()),
|
||||
}
|
||||
}
|
||||
|
|
@ -529,6 +532,10 @@ impl McpConnectionManager {
|
|||
!self.clients.is_empty()
|
||||
}
|
||||
|
||||
pub(crate) fn server_origin(&self, server_name: &str) -> Option<&str> {
|
||||
self.server_origins.get(server_name).map(String::as_str)
|
||||
}
|
||||
|
||||
pub fn set_approval_policy(&self, approval_policy: &Constrained<AskForApproval>) {
|
||||
if let Ok(mut policy) = self.elicitation_requests.approval_policy.lock() {
|
||||
*policy = approval_policy.value();
|
||||
|
|
@ -548,10 +555,14 @@ impl McpConnectionManager {
|
|||
) -> (Self, CancellationToken) {
|
||||
let cancel_token = CancellationToken::new();
|
||||
let mut clients = HashMap::new();
|
||||
let mut server_origins = HashMap::new();
|
||||
let mut join_set = JoinSet::new();
|
||||
let elicitation_requests = ElicitationRequestManager::new(approval_policy.value());
|
||||
let mcp_servers = mcp_servers.clone();
|
||||
for (server_name, cfg) in mcp_servers.into_iter().filter(|(_, cfg)| cfg.enabled) {
|
||||
if let Some(origin) = transport_origin(&cfg.transport) {
|
||||
server_origins.insert(server_name.clone(), origin);
|
||||
}
|
||||
let cancel_token = cancel_token.child_token();
|
||||
let _ = emit_update(
|
||||
&tx_event,
|
||||
|
|
@ -624,6 +635,7 @@ impl McpConnectionManager {
|
|||
}
|
||||
let manager = Self {
|
||||
clients,
|
||||
server_origins,
|
||||
elicitation_requests: elicitation_requests.clone(),
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
|
|
@ -1448,6 +1460,16 @@ fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) {
|
|||
}
|
||||
}
|
||||
|
||||
fn transport_origin(transport: &McpServerTransportConfig) -> Option<String> {
|
||||
match transport {
|
||||
McpServerTransportConfig::StreamableHttp { url, .. } => {
|
||||
let parsed = Url::parse(url).ok()?;
|
||||
Some(parsed.origin().ascii_serialization())
|
||||
}
|
||||
McpServerTransportConfig::Stdio { .. } => Some("stdio".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_tools_for_client_uncached(
|
||||
server_name: &str,
|
||||
client: &Arc<RmcpClient>,
|
||||
|
|
@ -2164,4 +2186,32 @@ mod tests {
|
|||
display
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn transport_origin_extracts_http_origin() {
|
||||
let transport = McpServerTransportConfig::StreamableHttp {
|
||||
url: "https://example.com:8443/path?query=1".to_string(),
|
||||
bearer_token_env_var: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
transport_origin(&transport),
|
||||
Some("https://example.com:8443".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn transport_origin_is_stdio_for_stdio_transport() {
|
||||
let transport = McpServerTransportConfig::Stdio {
|
||||
command: "server".to_string(),
|
||||
args: Vec::new(),
|
||||
env: None,
|
||||
env_vars: Vec::new(),
|
||||
cwd: None,
|
||||
};
|
||||
|
||||
assert_eq!(transport_origin(&transport), Some("stdio".to_string()));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -102,6 +102,21 @@ impl ToolRegistry {
|
|||
sandbox_policy_tag(&invocation.turn.sandbox_policy),
|
||||
),
|
||||
];
|
||||
let (mcp_server, mcp_server_origin) = match &invocation.payload {
|
||||
ToolPayload::Mcp { server, .. } => {
|
||||
let manager = invocation
|
||||
.session
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await;
|
||||
let origin = manager.server_origin(server).map(str::to_owned);
|
||||
(Some(server.clone()), origin)
|
||||
}
|
||||
_ => (None, None),
|
||||
};
|
||||
let mcp_server_ref = mcp_server.as_deref();
|
||||
let mcp_server_origin_ref = mcp_server_origin.as_deref();
|
||||
|
||||
let handler = match self.handler(tool_name.as_ref()) {
|
||||
Some(handler) => handler,
|
||||
|
|
@ -116,6 +131,8 @@ impl ToolRegistry {
|
|||
false,
|
||||
&message,
|
||||
&metric_tags,
|
||||
mcp_server_ref,
|
||||
mcp_server_origin_ref,
|
||||
);
|
||||
return Err(FunctionCallError::RespondToModel(message));
|
||||
}
|
||||
|
|
@ -131,6 +148,8 @@ impl ToolRegistry {
|
|||
false,
|
||||
&message,
|
||||
&metric_tags,
|
||||
mcp_server_ref,
|
||||
mcp_server_origin_ref,
|
||||
);
|
||||
return Err(FunctionCallError::Fatal(message));
|
||||
}
|
||||
|
|
@ -146,6 +165,8 @@ impl ToolRegistry {
|
|||
&call_id_owned,
|
||||
log_payload.as_ref(),
|
||||
&metric_tags,
|
||||
mcp_server_ref,
|
||||
mcp_server_origin_ref,
|
||||
|| {
|
||||
let handler = handler.clone();
|
||||
let output_cell = &output_cell;
|
||||
|
|
|
|||
|
|
@ -32,6 +32,64 @@ use tracing_test::traced_test;
|
|||
use tracing_subscriber::fmt::format::FmtSpan;
|
||||
use tracing_test::internal::MockWriter;
|
||||
|
||||
fn extract_log_field(line: &str, key: &str) -> Option<String> {
|
||||
let quoted_prefix = format!("{key}=\"");
|
||||
if let Some(start) = line.find("ed_prefix) {
|
||||
let value_start = start + quoted_prefix.len();
|
||||
if let Some(end_rel) = line[value_start..].find('"') {
|
||||
return Some(line[value_start..value_start + end_rel].to_string());
|
||||
}
|
||||
}
|
||||
|
||||
let bare_prefix = format!("{key}=");
|
||||
for token in line.split_whitespace() {
|
||||
let trimmed = token.trim_end_matches(',');
|
||||
if let Some(value) = trimmed.strip_prefix(&bare_prefix) {
|
||||
return Some(value.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn assert_empty_mcp_tool_fields(line: &str) -> Result<(), String> {
|
||||
let mcp_server = extract_log_field(line, "mcp_server")
|
||||
.ok_or_else(|| "missing mcp_server field".to_string())?;
|
||||
if !mcp_server.is_empty() {
|
||||
return Err(format!("expected empty mcp_server, got {mcp_server}"));
|
||||
}
|
||||
|
||||
let mcp_server_origin = extract_log_field(line, "mcp_server_origin")
|
||||
.ok_or_else(|| "missing mcp_server_origin field".to_string())?;
|
||||
if !mcp_server_origin.is_empty() {
|
||||
return Err(format!(
|
||||
"expected empty mcp_server_origin, got {mcp_server_origin}"
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extract_log_field_handles_empty_bare_values() {
|
||||
let line = "event.name=\"codex.tool_result\" mcp_server= mcp_server_origin=";
|
||||
assert_eq!(extract_log_field(line, "mcp_server"), Some(String::new()));
|
||||
assert_eq!(
|
||||
extract_log_field(line, "mcp_server_origin"),
|
||||
Some(String::new())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extract_log_field_does_not_confuse_similar_keys() {
|
||||
let line = "event.name=\"codex.tool_result\" mcp_server_origin=stdio";
|
||||
assert_eq!(extract_log_field(line, "mcp_server"), None);
|
||||
assert_eq!(
|
||||
extract_log_field(line, "mcp_server_origin"),
|
||||
Some("stdio".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn responses_api_emits_api_request_event() {
|
||||
|
|
@ -687,6 +745,7 @@ async fn handle_response_item_records_tool_result_for_custom_tool_call() {
|
|||
if !line.contains("success=false") {
|
||||
return Err("missing success field".to_string());
|
||||
}
|
||||
assert_empty_mcp_tool_fields(line)?;
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
|
@ -756,6 +815,7 @@ async fn handle_response_item_records_tool_result_for_function_call() {
|
|||
if !line.contains("success=false") {
|
||||
return Err("missing success field".to_string());
|
||||
}
|
||||
assert_empty_mcp_tool_fields(line)?;
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
|
@ -828,6 +888,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_missing_ids()
|
|||
if !line.contains("success=false") {
|
||||
return Err("missing success field".to_string());
|
||||
}
|
||||
assert_empty_mcp_tool_fields(line)?;
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
|
@ -899,6 +960,7 @@ async fn handle_response_item_records_tool_result_for_local_shell_call() {
|
|||
if !line.contains("success=false") {
|
||||
return Err("missing success field".to_string());
|
||||
}
|
||||
assert_empty_mcp_tool_fields(line)?;
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
|
|
|||
|
|
@ -588,12 +588,15 @@ impl OtelManager {
|
|||
);
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn log_tool_result_with_tags<F, Fut, E>(
|
||||
&self,
|
||||
tool_name: &str,
|
||||
call_id: &str,
|
||||
arguments: &str,
|
||||
extra_tags: &[(&str, &str)],
|
||||
mcp_server: Option<&str>,
|
||||
mcp_server_origin: Option<&str>,
|
||||
f: F,
|
||||
) -> Result<(String, bool), E>
|
||||
where
|
||||
|
|
@ -618,6 +621,8 @@ impl OtelManager {
|
|||
success,
|
||||
output.as_ref(),
|
||||
extra_tags,
|
||||
mcp_server,
|
||||
mcp_server_origin,
|
||||
);
|
||||
|
||||
result
|
||||
|
|
@ -641,6 +646,8 @@ impl OtelManager {
|
|||
duration_ms = %Duration::ZERO.as_millis(),
|
||||
success = %false,
|
||||
output = %error,
|
||||
mcp_server = "",
|
||||
mcp_server_origin = "",
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -654,6 +661,8 @@ impl OtelManager {
|
|||
success: bool,
|
||||
output: &str,
|
||||
extra_tags: &[(&str, &str)],
|
||||
mcp_server: Option<&str>,
|
||||
mcp_server_origin: Option<&str>,
|
||||
) {
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
let mut tags = Vec::with_capacity(2 + extra_tags.len());
|
||||
|
|
@ -662,6 +671,8 @@ impl OtelManager {
|
|||
tags.extend_from_slice(extra_tags);
|
||||
self.counter(TOOL_CALL_COUNT_METRIC, 1, &tags);
|
||||
self.record_duration(TOOL_CALL_DURATION_METRIC, duration, &tags);
|
||||
let mcp_server = mcp_server.unwrap_or("");
|
||||
let mcp_server_origin = mcp_server_origin.unwrap_or("");
|
||||
tracing::event!(
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.tool_result",
|
||||
|
|
@ -681,6 +692,8 @@ impl OtelManager {
|
|||
duration_ms = %duration.as_millis(),
|
||||
success = %success_str,
|
||||
output = %output,
|
||||
mcp_server = %mcp_server,
|
||||
mcp_server_origin = %mcp_server_origin,
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -44,6 +44,8 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
|
|||
true,
|
||||
"ok",
|
||||
&[],
|
||||
None,
|
||||
None,
|
||||
);
|
||||
manager.record_api_request(1, Some(200), None, Duration::from_millis(300));
|
||||
manager.record_websocket_request(Duration::from_millis(400), None);
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue