Plumb MCP turn metadata through _meta (#15190)
## Summary Some background. We're looking to instrument GA turns end to end. Right now a big gap is grouping mcp tool calls with their codex sessions. We send session id and turn id headers to the responses call but not the mcp/wham calls. Ideally we could pass the args as headers like with responses, but given the setup of the rmcp client, we can't send as headers without either changing the rmcp package upstream to allow per request headers or introducing a mutex which break concurrency. An earlier attempt made the assumption that we had 1 client per thread, which allowed us to set headers at the start of a turn. @pakrym mentioned that this assumption might break in the near future. So the solution now is to package the turn metadata/session id into the _meta field in the post body and pull out in codex-backend. - send turn metadata to MCP servers via `tools/call` `_meta` instead of assuming per-thread request headers on shared clients - preserve the existing `_codex_apps` metadata while adding `x-codex-turn-metadata` for all MCP tool calls - extend tests to cover both custom MCP servers and the codex apps search flow --------- Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
parent
2254ec4f30
commit
2bee37fe69
8 changed files with 139 additions and 12 deletions
|
|
@ -1286,6 +1286,7 @@ impl Session {
|
|||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn make_turn_context(
|
||||
conversation_id: ThreadId,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
provider: ModelProviderInfo,
|
||||
|
|
@ -1336,6 +1337,7 @@ impl Session {
|
|||
|
||||
let cwd = session_configuration.cwd.clone();
|
||||
let turn_metadata_state = Arc::new(TurnMetadataState::new(
|
||||
conversation_id.to_string(),
|
||||
sub_id.clone(),
|
||||
cwd.clone(),
|
||||
session_configuration.sandbox_policy.get(),
|
||||
|
|
@ -2394,6 +2396,7 @@ impl Session {
|
|||
.skills_for_config(&per_turn_config),
|
||||
);
|
||||
let mut turn_context: TurnContext = Self::make_turn_context(
|
||||
self.conversation_id,
|
||||
Some(Arc::clone(&self.services.auth_manager)),
|
||||
&self.services.session_telemetry,
|
||||
session_configuration.provider.clone(),
|
||||
|
|
@ -5220,6 +5223,7 @@ async fn spawn_review_thread(
|
|||
let per_turn_config = Arc::new(per_turn_config);
|
||||
let review_turn_id = sub_id.to_string();
|
||||
let turn_metadata_state = Arc::new(TurnMetadataState::new(
|
||||
sess.conversation_id.to_string(),
|
||||
review_turn_id.clone(),
|
||||
parent_turn_context.cwd.clone(),
|
||||
parent_turn_context.sandbox_policy.get(),
|
||||
|
|
|
|||
|
|
@ -2517,6 +2517,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
|||
|
||||
let skills_outcome = Arc::new(services.skills_manager.skills_for_config(&per_turn_config));
|
||||
let turn_context = Session::make_turn_context(
|
||||
conversation_id,
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
&session_telemetry,
|
||||
session_configuration.provider.clone(),
|
||||
|
|
@ -3315,6 +3316,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
|
|||
|
||||
let skills_outcome = Arc::new(services.skills_manager.skills_for_config(&per_turn_config));
|
||||
let turn_context = Arc::new(Session::make_turn_context(
|
||||
conversation_id,
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
&session_telemetry,
|
||||
session_configuration.provider.clone(),
|
||||
|
|
|
|||
|
|
@ -119,7 +119,8 @@ pub(crate) async fn handle_mcp_tool_call(
|
|||
);
|
||||
return CallToolResult::from_result(result);
|
||||
}
|
||||
let request_meta = build_mcp_tool_call_request_meta(&server, metadata.as_ref());
|
||||
let request_meta =
|
||||
build_mcp_tool_call_request_meta(turn_context.as_ref(), &server, metadata.as_ref());
|
||||
|
||||
let tool_call_begin_event = EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
|
|
@ -390,18 +391,30 @@ pub(crate) struct McpToolApprovalMetadata {
|
|||
const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
|
||||
|
||||
fn build_mcp_tool_call_request_meta(
|
||||
turn_context: &TurnContext,
|
||||
server: &str,
|
||||
metadata: Option<&McpToolApprovalMetadata>,
|
||||
) -> Option<serde_json::Value> {
|
||||
if server != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return None;
|
||||
let mut request_meta = serde_json::Map::new();
|
||||
|
||||
if let Some(turn_metadata) = turn_context.turn_metadata_state.current_meta_value() {
|
||||
request_meta.insert(
|
||||
crate::X_CODEX_TURN_METADATA_HEADER.to_string(),
|
||||
turn_metadata,
|
||||
);
|
||||
}
|
||||
|
||||
let codex_apps_meta = metadata.and_then(|metadata| metadata.codex_apps_meta.as_ref())?;
|
||||
if server == CODEX_APPS_MCP_SERVER_NAME
|
||||
&& let Some(codex_apps_meta) =
|
||||
metadata.and_then(|metadata| metadata.codex_apps_meta.clone())
|
||||
{
|
||||
request_meta.insert(
|
||||
MCP_TOOL_CODEX_APPS_META_KEY.to_string(),
|
||||
serde_json::Value::Object(codex_apps_meta),
|
||||
);
|
||||
}
|
||||
|
||||
Some(serde_json::json!({
|
||||
MCP_TOOL_CODEX_APPS_META_KEY: codex_apps_meta,
|
||||
}))
|
||||
(!request_meta.is_empty()).then_some(serde_json::Value::Object(request_meta))
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
|
|
|
|||
|
|
@ -439,8 +439,39 @@ fn sanitize_mcp_tool_result_for_model_preserves_image_when_supported() {
|
|||
assert_eq!(got, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tool_call_request_meta_includes_codex_apps_meta() {
|
||||
#[tokio::test]
|
||||
async fn mcp_tool_call_request_meta_includes_turn_metadata_for_custom_server() {
|
||||
let (_, turn_context) = make_session_and_context().await;
|
||||
let expected_turn_metadata = serde_json::from_str::<serde_json::Value>(
|
||||
&turn_context
|
||||
.turn_metadata_state
|
||||
.current_header_value()
|
||||
.expect("turn metadata header"),
|
||||
)
|
||||
.expect("turn metadata json");
|
||||
|
||||
let meta =
|
||||
build_mcp_tool_call_request_meta(&turn_context, "custom_server", /*metadata*/ None)
|
||||
.expect("custom servers should receive turn metadata");
|
||||
|
||||
assert_eq!(
|
||||
meta,
|
||||
serde_json::json!({
|
||||
crate::X_CODEX_TURN_METADATA_HEADER: expected_turn_metadata,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn codex_apps_tool_call_request_meta_includes_turn_metadata_and_codex_apps_meta() {
|
||||
let (_, turn_context) = make_session_and_context().await;
|
||||
let expected_turn_metadata = serde_json::from_str::<serde_json::Value>(
|
||||
&turn_context
|
||||
.turn_metadata_state
|
||||
.current_header_value()
|
||||
.expect("turn metadata header"),
|
||||
)
|
||||
.expect("turn metadata json");
|
||||
let metadata = McpToolApprovalMetadata {
|
||||
annotations: None,
|
||||
connector_id: Some("calendar".to_string()),
|
||||
|
|
@ -461,8 +492,13 @@ fn codex_apps_tool_call_request_meta_includes_codex_apps_meta() {
|
|||
};
|
||||
|
||||
assert_eq!(
|
||||
build_mcp_tool_call_request_meta(CODEX_APPS_MCP_SERVER_NAME, Some(&metadata)),
|
||||
build_mcp_tool_call_request_meta(
|
||||
&turn_context,
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
Some(&metadata),
|
||||
),
|
||||
Some(serde_json::json!({
|
||||
crate::X_CODEX_TURN_METADATA_HEADER: expected_turn_metadata,
|
||||
MCP_TOOL_CODEX_APPS_META_KEY: {
|
||||
"resource_uri": "connector://calendar/tools/calendar_create_event",
|
||||
"contains_mcp_source": true,
|
||||
|
|
|
|||
|
|
@ -53,6 +53,8 @@ impl From<WorkspaceGitMetadata> for TurnMetadataWorkspace {
|
|||
|
||||
#[derive(Clone, Debug, Serialize, Default)]
|
||||
pub(crate) struct TurnMetadataBag {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
session_id: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
turn_id: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
|
||||
|
|
@ -68,6 +70,7 @@ impl TurnMetadataBag {
|
|||
}
|
||||
|
||||
fn build_turn_metadata_bag(
|
||||
session_id: Option<String>,
|
||||
turn_id: Option<String>,
|
||||
sandbox: Option<String>,
|
||||
repo_root: Option<String>,
|
||||
|
|
@ -81,6 +84,7 @@ fn build_turn_metadata_bag(
|
|||
}
|
||||
|
||||
TurnMetadataBag {
|
||||
session_id,
|
||||
turn_id,
|
||||
workspaces,
|
||||
sandbox,
|
||||
|
|
@ -104,6 +108,7 @@ pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Op
|
|||
}
|
||||
|
||||
build_turn_metadata_bag(
|
||||
/*session_id*/ None,
|
||||
/*turn_id*/ None,
|
||||
sandbox.map(ToString::to_string),
|
||||
repo_root,
|
||||
|
|
@ -128,6 +133,7 @@ pub(crate) struct TurnMetadataState {
|
|||
|
||||
impl TurnMetadataState {
|
||||
pub(crate) fn new(
|
||||
session_id: String,
|
||||
turn_id: String,
|
||||
cwd: PathBuf,
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
|
|
@ -136,6 +142,7 @@ impl TurnMetadataState {
|
|||
let repo_root = get_git_repo_root(&cwd).map(|root| root.to_string_lossy().into_owned());
|
||||
let sandbox = Some(sandbox_tag(sandbox_policy, windows_sandbox_level).to_string());
|
||||
let base_metadata = build_turn_metadata_bag(
|
||||
Some(session_id),
|
||||
Some(turn_id),
|
||||
sandbox,
|
||||
/*repo_root*/ None,
|
||||
|
|
@ -168,6 +175,11 @@ impl TurnMetadataState {
|
|||
Some(self.base_header.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn current_meta_value(&self) -> Option<serde_json::Value> {
|
||||
self.current_header_value()
|
||||
.and_then(|header| serde_json::from_str(&header).ok())
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_git_enrichment_task(&self) {
|
||||
if self.repo_root.is_none() {
|
||||
return;
|
||||
|
|
@ -189,6 +201,7 @@ impl TurnMetadataState {
|
|||
};
|
||||
|
||||
let enriched_metadata = build_turn_metadata_bag(
|
||||
state.base_metadata.session_id.clone(),
|
||||
state.base_metadata.turn_id.clone(),
|
||||
state.base_metadata.sandbox.clone(),
|
||||
Some(repo_root),
|
||||
|
|
|
|||
|
|
@ -67,6 +67,7 @@ fn turn_metadata_state_uses_platform_sandbox_tag() {
|
|||
let sandbox_policy = SandboxPolicy::new_read_only_policy();
|
||||
|
||||
let state = TurnMetadataState::new(
|
||||
"session-a".to_string(),
|
||||
"turn-a".to_string(),
|
||||
cwd,
|
||||
&sandbox_policy,
|
||||
|
|
@ -76,7 +77,9 @@ fn turn_metadata_state_uses_platform_sandbox_tag() {
|
|||
let header = state.current_header_value().expect("header");
|
||||
let json: Value = serde_json::from_str(&header).expect("json");
|
||||
let sandbox_name = json.get("sandbox").and_then(Value::as_str);
|
||||
let session_id = json.get("session_id").and_then(Value::as_str);
|
||||
|
||||
let expected_sandbox = sandbox_tag(&sandbox_policy, WindowsSandboxLevel::Disabled);
|
||||
assert_eq!(sandbox_name, Some(expected_sandbox));
|
||||
assert_eq!(session_id, Some("session-a"));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -424,6 +424,39 @@ async fn tool_search_returns_deferred_tools_without_follow_up_tool_injection() -
|
|||
let requests = mock.requests();
|
||||
assert_eq!(requests.len(), 3);
|
||||
|
||||
let apps_tool_call = server
|
||||
.received_requests()
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.find_map(|request| {
|
||||
let body: Value = serde_json::from_slice(&request.body).ok()?;
|
||||
(request.url.path() == "/api/codex/apps"
|
||||
&& body.get("method").and_then(Value::as_str) == Some("tools/call"))
|
||||
.then_some(body)
|
||||
})
|
||||
.expect("apps tools/call request should be recorded");
|
||||
|
||||
assert_eq!(
|
||||
apps_tool_call.pointer("/params/_meta/_codex_apps"),
|
||||
Some(&json!({
|
||||
"resource_uri": CALENDAR_CREATE_EVENT_RESOURCE_URI,
|
||||
"contains_mcp_source": true,
|
||||
"connector_id": "calendar",
|
||||
}))
|
||||
);
|
||||
assert_eq!(
|
||||
apps_tool_call.pointer("/params/_meta/x-codex-turn-metadata/session_id"),
|
||||
Some(&json!(test.session_configured.session_id.to_string()))
|
||||
);
|
||||
assert!(
|
||||
apps_tool_call
|
||||
.pointer("/params/_meta/x-codex-turn-metadata/turn_id")
|
||||
.and_then(Value::as_str)
|
||||
.is_some_and(|turn_id| !turn_id.is_empty()),
|
||||
"apps tools/call should include turn metadata turn_id: {apps_tool_call:?}"
|
||||
);
|
||||
|
||||
let first_request_tools = tool_names(&requests[0].body_json());
|
||||
assert!(
|
||||
first_request_tools
|
||||
|
|
|
|||
|
|
@ -723,7 +723,7 @@ impl RmcpClient {
|
|||
None => None,
|
||||
};
|
||||
let rmcp_params = CallToolRequestParams {
|
||||
meta,
|
||||
meta: None,
|
||||
name: name.into(),
|
||||
arguments,
|
||||
task: None,
|
||||
|
|
@ -731,7 +731,30 @@ impl RmcpClient {
|
|||
let result = self
|
||||
.run_service_operation("tools/call", timeout, move |service| {
|
||||
let rmcp_params = rmcp_params.clone();
|
||||
async move { service.call_tool(rmcp_params).await }.boxed()
|
||||
let meta = meta.clone();
|
||||
async move {
|
||||
let result = service
|
||||
.peer()
|
||||
.send_request_with_option(
|
||||
ClientRequest::CallToolRequest(rmcp::model::CallToolRequest {
|
||||
method: Default::default(),
|
||||
params: rmcp_params,
|
||||
extensions: Default::default(),
|
||||
}),
|
||||
rmcp::service::PeerRequestOptions {
|
||||
timeout: None,
|
||||
meta,
|
||||
},
|
||||
)
|
||||
.await?
|
||||
.await_response()
|
||||
.await?;
|
||||
match result {
|
||||
ServerResult::CallToolResult(result) => Ok(result),
|
||||
_ => Err(rmcp::service::ServiceError::UnexpectedResponse),
|
||||
}
|
||||
}
|
||||
.boxed()
|
||||
})
|
||||
.await?;
|
||||
self.persist_oauth_tokens().await;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue