From 0c09dc3c033eadb76a64d61b368972e11eb90fbd Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Tue, 13 Jan 2026 22:14:41 -0800 Subject: [PATCH] feat: add threadId to MCP server messages (#9192) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This favors `threadId` instead of `conversationId` so we use the same terms as https://developers.openai.com/codex/sdk/. To test the local build: ``` cd codex-rs cargo build --bin codex npx -y @modelcontextprotocol/inspector ./target/debug/codex mcp-server ``` I sent: ```json { "method": "tools/call", "params": { "name": "codex", "arguments": { "prompt": "favorite ls option?" }, "_meta": { "progressToken": 0 } } } ``` and got: ```json { "content": [ { "type": "text", "text": "`ls -lah` (or `ls -alh`) — long listing, includes dotfiles, human-readable sizes." } ], "structuredContent": { "threadId": "019bbb20-bff6-7130-83aa-bf45ab33250e" } } ``` and successfully used the `threadId` in the follow-up with the `codex-reply` tool call: ```json { "method": "tools/call", "params": { "name": "codex-reply", "arguments": { "prompt": "what is the long versoin", "threadId": "019bbb20-bff6-7130-83aa-bf45ab33250e" }, "_meta": { "progressToken": 1 } } } ``` whose response also has the `threadId`: ```json { "content": [ { "type": "text", "text": "Long listing is `ls -l` (adds permissions, owner/group, size, timestamp)." } ], "structuredContent": { "threadId": "019bbb20-bff6-7130-83aa-bf45ab33250e" } } ``` Fixes https://github.com/openai/codex/issues/3712. --- codex-rs/mcp-server/src/codex_tool_config.rs | 40 +++++- codex-rs/mcp-server/src/codex_tool_runner.rs | 127 +++++++++++++----- codex-rs/mcp-server/src/exec_approval.rs | 5 + codex-rs/mcp-server/src/message_processor.rs | 42 +++--- codex-rs/mcp-server/src/outgoing_message.rs | 79 ++++++++++- codex-rs/mcp-server/src/patch_approval.rs | 7 +- codex-rs/mcp-server/tests/suite/codex_tool.rs | 60 ++++++--- 7 files changed, 270 insertions(+), 90 deletions(-) diff --git a/codex-rs/mcp-server/src/codex_tool_config.rs b/codex-rs/mcp-server/src/codex_tool_config.rs index 26fdc0732..f5237bfc0 100644 --- a/codex-rs/mcp-server/src/codex_tool_config.rs +++ b/codex-rs/mcp-server/src/codex_tool_config.rs @@ -3,6 +3,7 @@ use codex_core::config::Config; use codex_core::config::ConfigOverrides; use codex_core::protocol::AskForApproval; +use codex_protocol::ThreadId; use codex_protocol::config_types::SandboxMode; use codex_utils_json_to_toml::json_to_toml; use mcp_types::Tool; @@ -185,13 +186,36 @@ impl CodexToolCallParam { #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct CodexToolCallReplyParam { - /// The conversation id for this Codex session. - pub conversation_id: String, + /// DEPRECATED: use threadId instead. + #[serde(default, skip_serializing_if = "Option::is_none")] + conversation_id: Option, + + /// The thread id for this Codex session. + /// This field is required, but we keep it optional here for backward + /// compatibility for clients that still use conversationId. + #[serde(default, skip_serializing_if = "Option::is_none")] + thread_id: Option, /// The *next user prompt* to continue the Codex conversation. pub prompt: String, } +impl CodexToolCallReplyParam { + pub(crate) fn get_thread_id(&self) -> anyhow::Result { + if let Some(thread_id) = &self.thread_id { + let thread_id = ThreadId::from_string(thread_id)?; + Ok(thread_id) + } else if let Some(conversation_id) = &self.conversation_id { + let thread_id = ThreadId::from_string(conversation_id)?; + Ok(thread_id) + } else { + Err(anyhow::anyhow!( + "either threadId or conversationId must be provided" + )) + } + } +} + /// Builds a `Tool` definition for the `codex-reply` tool-call. pub(crate) fn create_tool_for_codex_tool_call_reply_param() -> Tool { let schema = SchemaSettings::draft2019_09() @@ -217,8 +241,7 @@ pub(crate) fn create_tool_for_codex_tool_call_reply_param() -> Tool { input_schema: tool_input_schema, output_schema: None, description: Some( - "Continue a Codex conversation by providing the conversation id and prompt." - .to_string(), + "Continue a Codex conversation by providing the thread id and prompt.".to_string(), ), annotations: None, } @@ -317,20 +340,23 @@ mod tests { let tool = create_tool_for_codex_tool_call_reply_param(); let tool_json = serde_json::to_value(&tool).expect("tool serializes"); let expected_tool_json = serde_json::json!({ - "description": "Continue a Codex conversation by providing the conversation id and prompt.", + "description": "Continue a Codex conversation by providing the thread id and prompt.", "inputSchema": { "properties": { "conversationId": { - "description": "The conversation id for this Codex session.", + "description": "DEPRECATED: use threadId instead.", "type": "string" }, "prompt": { "description": "The *next user prompt* to continue the Codex conversation.", "type": "string" }, + "threadId": { + "description": "The thread id for this Codex session. This field is required, but we keep it optional here for backward compatibility for clients that still use conversationId.", + "type": "string" + } }, "required": [ - "conversationId", "prompt", ], "type": "object", diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 1ee4cbd7f..c7aaa6190 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -32,6 +32,26 @@ use tokio::sync::Mutex; pub(crate) const INVALID_PARAMS_ERROR_CODE: i64 = -32602; +/// To adhere to MCP `tools/call` response format, include the Codex +/// `threadId` in the `structured_content` field of the response. +fn create_call_tool_result_with_thread_id( + thread_id: ThreadId, + text: String, + is_error: Option, +) -> CallToolResult { + CallToolResult { + content: vec![ContentBlock::TextContent(TextContent { + r#type: "text".to_string(), + text, + annotations: None, + })], + is_error, + structured_content: Some(json!({ + "threadId": thread_id, + })), + } +} + /// Run a complete Codex session and stream events back to the client. /// /// On completion (success or error) the function sends the appropriate @@ -73,7 +93,10 @@ pub async fn run_codex_tool_session( outgoing .send_event_as_notification( &session_configured_event, - Some(OutgoingNotificationMeta::new(Some(id.clone()))), + Some(OutgoingNotificationMeta { + request_id: Some(id.clone()), + thread_id: Some(thread_id), + }), ) .await; @@ -100,27 +123,40 @@ pub async fn run_codex_tool_session( if let Err(e) = thread.submit_with_id(submission).await { tracing::error!("Failed to submit initial prompt: {e}"); + let result = create_call_tool_result_with_thread_id( + thread_id, + format!("Failed to submit initial prompt: {e}"), + Some(true), + ); + outgoing.send_response(id.clone(), result).await; // unregister the id so we don't keep it in the map running_requests_id_to_codex_uuid.lock().await.remove(&id); return; } - run_codex_tool_session_inner(thread, outgoing, id, running_requests_id_to_codex_uuid).await; + run_codex_tool_session_inner( + thread_id, + thread, + outgoing, + id, + running_requests_id_to_codex_uuid, + ) + .await; } pub async fn run_codex_tool_session_reply( - conversation: Arc, + thread_id: ThreadId, + thread: Arc, outgoing: Arc, request_id: RequestId, prompt: String, running_requests_id_to_codex_uuid: Arc>>, - conversation_id: ThreadId, ) { running_requests_id_to_codex_uuid .lock() .await - .insert(request_id.clone(), conversation_id); - if let Err(e) = conversation + .insert(request_id.clone(), thread_id); + if let Err(e) = thread .submit(Op::UserInput { items: vec![UserInput::Text { text: prompt }], final_output_json_schema: None, @@ -128,6 +164,12 @@ pub async fn run_codex_tool_session_reply( .await { tracing::error!("Failed to submit user input: {e}"); + let result = create_call_tool_result_with_thread_id( + thread_id, + format!("Failed to submit user input: {e}"), + Some(true), + ); + outgoing.send_response(request_id.clone(), result).await; // unregister the id so we don't keep it in the map running_requests_id_to_codex_uuid .lock() @@ -137,7 +179,8 @@ pub async fn run_codex_tool_session_reply( } run_codex_tool_session_inner( - conversation, + thread_id, + thread, outgoing, request_id, running_requests_id_to_codex_uuid, @@ -146,7 +189,8 @@ pub async fn run_codex_tool_session_reply( } async fn run_codex_tool_session_inner( - codex: Arc, + thread_id: ThreadId, + thread: Arc, outgoing: Arc, request_id: RequestId, running_requests_id_to_codex_uuid: Arc>>, @@ -159,12 +203,15 @@ async fn run_codex_tool_session_inner( // Stream events until the task needs to pause for user interaction or // completes. loop { - match codex.next_event().await { + match thread.next_event().await { Ok(event) => { outgoing .send_event_as_notification( &event, - Some(OutgoingNotificationMeta::new(Some(request_id.clone()))), + Some(OutgoingNotificationMeta { + request_id: Some(request_id.clone()), + thread_id: Some(thread_id), + }), ) .await; @@ -182,21 +229,24 @@ async fn run_codex_tool_session_inner( command, cwd, outgoing.clone(), - codex.clone(), + thread.clone(), request_id.clone(), request_id_str.clone(), event.id.clone(), call_id, parsed_cmd, + thread_id, ) .await; continue; } EventMsg::Error(err_event) => { - // Return a response to conclude the tool call when the Codex session reports an error (e.g., interruption). - let result = json!({ - "error": err_event.message, - }); + // Always respond in tools/call's expected shape, and include conversationId so the client can resume. + let result = create_call_tool_result_with_thread_id( + thread_id, + err_event.message, + Some(true), + ); outgoing.send_response(request_id.clone(), result).await; break; } @@ -220,10 +270,11 @@ async fn run_codex_tool_session_inner( grant_root, changes, outgoing.clone(), - codex.clone(), + thread.clone(), request_id.clone(), request_id_str.clone(), event.id.clone(), + thread_id, ) .await; continue; @@ -233,15 +284,7 @@ async fn run_codex_tool_session_inner( Some(msg) => msg, None => "".to_string(), }; - let result = CallToolResult { - content: vec![ContentBlock::TextContent(TextContent { - r#type: "text".to_string(), - text, - annotations: None, - })], - is_error: None, - structured_content: None, - }; + let result = create_call_tool_result_with_thread_id(thread_id, text, None); outgoing.send_response(request_id.clone(), result).await; // unregister the id so we don't keep it in the map running_requests_id_to_codex_uuid @@ -317,20 +360,32 @@ async fn run_codex_tool_session_inner( } } Err(e) => { - let result = CallToolResult { - content: vec![ContentBlock::TextContent(TextContent { - r#type: "text".to_string(), - text: format!("Codex runtime error: {e}"), - annotations: None, - })], - is_error: Some(true), - // TODO(mbolin): Could present the error in a more - // structured way. - structured_content: None, - }; + let result = create_call_tool_result_with_thread_id( + thread_id, + format!("Codex runtime error: {e}"), + Some(true), + ); outgoing.send_response(request_id.clone(), result).await; break; } } } } + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn call_tool_result_includes_thread_id_in_structured_content() { + let thread_id = ThreadId::new(); + let result = create_call_tool_result_with_thread_id(thread_id, "done".to_string(), None); + assert_eq!( + result.structured_content, + Some(json!({ + "threadId": thread_id, + })) + ); + } +} diff --git a/codex-rs/mcp-server/src/exec_approval.rs b/codex-rs/mcp-server/src/exec_approval.rs index 47f52caf7..a98099dcf 100644 --- a/codex-rs/mcp-server/src/exec_approval.rs +++ b/codex-rs/mcp-server/src/exec_approval.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use codex_core::CodexThread; use codex_core::protocol::Op; use codex_core::protocol::ReviewDecision; +use codex_protocol::ThreadId; use codex_protocol::parse_command::ParsedCommand; use mcp_types::ElicitRequest; use mcp_types::ElicitRequestParamsRequestedSchema; @@ -30,6 +31,8 @@ pub struct ExecApprovalElicitRequestParams { // These are additional fields the client can use to // correlate the request with the codex tool call. + #[serde(rename = "threadId")] + pub thread_id: ThreadId, pub codex_elicitation: String, pub codex_mcp_tool_call_id: String, pub codex_event_id: String, @@ -59,6 +62,7 @@ pub(crate) async fn handle_exec_approval_request( event_id: String, call_id: String, codex_parsed_cmd: Vec, + thread_id: ThreadId, ) { let escaped_command = shlex::try_join(command.iter().map(String::as_str)).unwrap_or_else(|_| command.join(" ")); @@ -74,6 +78,7 @@ pub(crate) async fn handle_exec_approval_request( properties: json!({}), required: None, }, + thread_id, codex_elicitation: "exec-approval".to_string(), codex_mcp_tool_call_id: tool_call_id.clone(), codex_event_id: event_id.clone(), diff --git a/codex-rs/mcp-server/src/message_processor.rs b/codex-rs/mcp-server/src/message_processor.rs index dcf5411a0..33bad85d7 100644 --- a/codex-rs/mcp-server/src/message_processor.rs +++ b/codex-rs/mcp-server/src/message_processor.rs @@ -433,10 +433,7 @@ impl MessageProcessor { tracing::info!("tools/call -> params: {:?}", arguments); // parse arguments - let CodexToolCallReplyParam { - conversation_id, - prompt, - } = match arguments { + let codex_tool_call_reply_param: CodexToolCallReplyParam = match arguments { Some(json_val) => match serde_json::from_value::(json_val) { Ok(params) => params, Err(e) => { @@ -457,12 +454,12 @@ impl MessageProcessor { }, None => { tracing::error!( - "Missing arguments for codex-reply tool-call; the `conversation_id` and `prompt` fields are required." + "Missing arguments for codex-reply tool-call; the `thread_id` and `prompt` fields are required." ); let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_owned(), - text: "Missing arguments for codex-reply tool-call; the `conversation_id` and `prompt` fields are required.".to_owned(), + text: "Missing arguments for codex-reply tool-call; the `thread_id` and `prompt` fields are required.".to_owned(), annotations: None, })], is_error: Some(true), @@ -473,14 +470,15 @@ impl MessageProcessor { return; } }; - let conversation_id = match ThreadId::from_string(&conversation_id) { + + let thread_id = match codex_tool_call_reply_param.get_thread_id() { Ok(id) => id, Err(e) => { - tracing::error!("Failed to parse conversation_id: {e}"); + tracing::error!("Failed to parse thread_id: {e}"); let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_owned(), - text: format!("Failed to parse conversation_id: {e}"), + text: format!("Failed to parse thread_id: {e}"), annotations: None, })], is_error: Some(true), @@ -496,18 +494,20 @@ impl MessageProcessor { let outgoing = self.outgoing.clone(); let running_requests_id_to_codex_uuid = self.running_requests_id_to_codex_uuid.clone(); - let codex = match self.thread_manager.get_thread(conversation_id).await { + let codex = match self.thread_manager.get_thread(thread_id).await { Ok(c) => c, Err(_) => { - tracing::warn!("Session not found for conversation_id: {conversation_id}"); + tracing::warn!("Session not found for thread_id: {thread_id}"); let result = CallToolResult { content: vec![ContentBlock::TextContent(TextContent { r#type: "text".to_owned(), - text: format!("Session not found for conversation_id: {conversation_id}"), + text: format!("Session not found for thread_id: {thread_id}"), annotations: None, })], is_error: Some(true), - structured_content: None, + structured_content: Some(json!({ + "threadId": thread_id, + })), }; outgoing.send_response(request_id, result).await; return; @@ -515,19 +515,19 @@ impl MessageProcessor { }; // Spawn the long-running reply handler. + let prompt = codex_tool_call_reply_param.prompt.clone(); tokio::spawn({ let outgoing = outgoing.clone(); - let prompt = prompt.clone(); let running_requests_id_to_codex_uuid = running_requests_id_to_codex_uuid.clone(); async move { crate::codex_tool_runner::run_codex_tool_session_reply( + thread_id, codex, outgoing, request_id, prompt, running_requests_id_to_codex_uuid, - conversation_id, ) .await; } @@ -563,8 +563,8 @@ impl MessageProcessor { RequestId::Integer(i) => i.to_string(), }; - // Obtain the conversation id while holding the first lock, then release. - let conversation_id = { + // Obtain the thread id while holding the first lock, then release. + let thread_id = { let map_guard = self.running_requests_id_to_codex_uuid.lock().await; match map_guard.get(&request_id) { Some(id) => *id, @@ -574,13 +574,13 @@ impl MessageProcessor { } } }; - tracing::info!("conversation_id: {conversation_id}"); + tracing::info!("thread_id: {thread_id}"); - // Obtain the Codex conversation from the server. - let codex_arc = match self.thread_manager.get_thread(conversation_id).await { + // Obtain the Codex thread from the server. + let codex_arc = match self.thread_manager.get_thread(thread_id).await { Ok(c) => c, Err(_) => { - tracing::warn!("Session not found for conversation_id: {conversation_id}"); + tracing::warn!("Session not found for thread_id: {thread_id}"); return; } }; diff --git a/codex-rs/mcp-server/src/outgoing_message.rs b/codex-rs/mcp-server/src/outgoing_message.rs index fef5c8bac..0b05bc360 100644 --- a/codex-rs/mcp-server/src/outgoing_message.rs +++ b/codex-rs/mcp-server/src/outgoing_message.rs @@ -3,6 +3,7 @@ use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; use codex_core::protocol::Event; +use codex_protocol::ThreadId; use mcp_types::JSONRPC_VERSION; use mcp_types::JSONRPCError; use mcp_types::JSONRPCErrorError; @@ -209,12 +210,11 @@ pub(crate) struct OutgoingNotificationParams { #[serde(rename_all = "camelCase")] pub(crate) struct OutgoingNotificationMeta { pub request_id: Option, -} -impl OutgoingNotificationMeta { - pub(crate) fn new(request_id: Option) -> Self { - Self { request_id } - } + /// Because multiple threads may be multiplexed over a single MCP connection, + /// include the `threadId` in the notification meta. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub thread_id: Option, } #[derive(Debug, Clone, PartialEq, Serialize)] @@ -251,12 +251,12 @@ mod tests { let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); - let conversation_id = ThreadId::new(); + let thread_id = ThreadId::new(); let rollout_file = NamedTempFile::new()?; let event = Event { id: "1".to_string(), msg: EventMsg::SessionConfigured(SessionConfiguredEvent { - session_id: conversation_id, + session_id: thread_id, model: "gpt-4o".to_string(), model_provider_id: "test-provider".to_string(), approval_policy: AskForApproval::Never, @@ -313,6 +313,7 @@ mod tests { }; let meta = OutgoingNotificationMeta { request_id: Some(RequestId::String("123".to_string())), + thread_id: None, }; outgoing_message_sender @@ -348,4 +349,68 @@ mod tests { assert_eq!(params.unwrap(), expected_params); Ok(()) } + + #[tokio::test] + async fn test_send_event_as_notification_with_meta_and_thread_id() -> Result<()> { + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::(); + let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); + + let thread_id = ThreadId::new(); + let rollout_file = NamedTempFile::new()?; + let session_configured_event = SessionConfiguredEvent { + session_id: thread_id, + model: "gpt-4o".to_string(), + model_provider_id: "test-provider".to_string(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::ReadOnly, + cwd: PathBuf::from("/home/user/project"), + reasoning_effort: Some(ReasoningEffort::default()), + history_log_id: 1, + history_entry_count: 1000, + initial_messages: None, + rollout_path: rollout_file.path().to_path_buf(), + }; + let event = Event { + id: "1".to_string(), + msg: EventMsg::SessionConfigured(session_configured_event.clone()), + }; + let meta = OutgoingNotificationMeta { + request_id: Some(RequestId::String("123".to_string())), + thread_id: Some(thread_id), + }; + + outgoing_message_sender + .send_event_as_notification(&event, Some(meta)) + .await; + + let result = outgoing_rx.recv().await.unwrap(); + let OutgoingMessage::Notification(OutgoingNotification { method, params }) = result else { + panic!("expected Notification for first message"); + }; + assert_eq!(method, "codex/event"); + let expected_params = json!({ + "_meta": { + "requestId": "123", + "threadId": thread_id.to_string(), + }, + "id": "1", + "msg": { + "type": "session_configured", + "session_id": session_configured_event.session_id, + "model": "gpt-4o", + "model_provider_id": "test-provider", + "approval_policy": "never", + "sandbox_policy": { + "type": "read-only" + }, + "cwd": "/home/user/project", + "reasoning_effort": session_configured_event.reasoning_effort, + "history_log_id": session_configured_event.history_log_id, + "history_entry_count": session_configured_event.history_entry_count, + "rollout_path": rollout_file.path().to_path_buf(), + } + }); + assert_eq!(params.unwrap(), expected_params); + Ok(()) + } } diff --git a/codex-rs/mcp-server/src/patch_approval.rs b/codex-rs/mcp-server/src/patch_approval.rs index 00e4f204a..5c3073959 100644 --- a/codex-rs/mcp-server/src/patch_approval.rs +++ b/codex-rs/mcp-server/src/patch_approval.rs @@ -6,6 +6,7 @@ use codex_core::CodexThread; use codex_core::protocol::FileChange; use codex_core::protocol::Op; use codex_core::protocol::ReviewDecision; +use codex_protocol::ThreadId; use mcp_types::ElicitRequest; use mcp_types::ElicitRequestParamsRequestedSchema; use mcp_types::JSONRPCErrorError; @@ -19,11 +20,13 @@ use tracing::error; use crate::codex_tool_runner::INVALID_PARAMS_ERROR_CODE; use crate::outgoing_message::OutgoingMessageSender; -#[derive(Debug, Serialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct PatchApprovalElicitRequestParams { pub message: String, #[serde(rename = "requestedSchema")] pub requested_schema: ElicitRequestParamsRequestedSchema, + #[serde(rename = "threadId")] + pub thread_id: ThreadId, pub codex_elicitation: String, pub codex_mcp_tool_call_id: String, pub codex_event_id: String, @@ -51,6 +54,7 @@ pub(crate) async fn handle_patch_approval_request( request_id: RequestId, tool_call_id: String, event_id: String, + thread_id: ThreadId, ) { let mut message_lines = Vec::new(); if let Some(r) = &reason { @@ -65,6 +69,7 @@ pub(crate) async fn handle_patch_approval_request( properties: json!({}), required: None, }, + thread_id, codex_elicitation: "patch-approval".to_string(), codex_mcp_tool_call_id: tool_call_id.clone(), codex_event_id: event_id.clone(), diff --git a/codex-rs/mcp-server/tests/suite/codex_tool.rs b/codex-rs/mcp-server/tests/suite/codex_tool.rs index c0f9310c5..dfe365120 100644 --- a/codex-rs/mcp-server/tests/suite/codex_tool.rs +++ b/codex-rs/mcp-server/tests/suite/codex_tool.rs @@ -119,6 +119,7 @@ async fn shell_command_approval_triggers_elicitation() -> anyhow::Result<()> { workdir_for_shell_function_call.path(), codex_request_id.to_string(), params.codex_event_id.clone(), + params.thread_id, )?; assert_eq!(expected_elicitation_request, elicitation_request); @@ -158,7 +159,10 @@ async fn shell_command_approval_triggers_elicitation() -> anyhow::Result<()> { "text": "File created!", "type": "text" } - ] + ], + "structuredContent": { + "threadId": params.thread_id, + } }), }, codex_response @@ -175,6 +179,7 @@ fn create_expected_elicitation_request( workdir: &Path, codex_mcp_tool_call_id: String, codex_event_id: String, + thread_id: codex_protocol::ThreadId, ) -> anyhow::Result { let expected_message = format!( "Allow Codex to run `{}` in `{}`?", @@ -193,6 +198,7 @@ fn create_expected_elicitation_request( properties: json!({}), required: None, }, + thread_id, codex_elicitation: "exec-approval".to_string(), codex_mcp_tool_call_id, codex_event_id, @@ -260,7 +266,13 @@ async fn patch_approval_triggers_elicitation() -> anyhow::Result<()> { ) .await??; - let elicitation_request_id = RequestId::Integer(0); + let elicitation_request_id = elicitation_request.id.clone(); + let params = serde_json::from_value::( + elicitation_request + .params + .clone() + .ok_or_else(|| anyhow::anyhow!("elicitation_request.params must be set"))?, + )?; let mut expected_changes = HashMap::new(); expected_changes.insert( @@ -277,7 +289,8 @@ async fn patch_approval_triggers_elicitation() -> anyhow::Result<()> { None, // No grant_root expected None, // No reason expected codex_request_id.to_string(), - "1".to_string(), + params.codex_event_id.clone(), + params.thread_id, )?; assert_eq!(expected_elicitation_request, elicitation_request); @@ -307,7 +320,10 @@ async fn patch_approval_triggers_elicitation() -> anyhow::Result<()> { "text": "Patch has been applied successfully!", "type": "text" } - ] + ], + "structuredContent": { + "threadId": params.thread_id, + } }), }, codex_response @@ -331,7 +347,7 @@ async fn test_codex_tool_passes_base_instructions() { } async fn codex_tool_passes_base_instructions() -> anyhow::Result<()> { - #![expect(clippy::unwrap_used)] + #![expect(clippy::expect_used, clippy::unwrap_used)] let server = create_mock_chat_completions_server(vec![create_final_assistant_message_sse_response( @@ -360,20 +376,26 @@ async fn codex_tool_passes_base_instructions() -> anyhow::Result<()> { mcp_process.read_stream_until_response_message(RequestId::Integer(codex_request_id)), ) .await??; + assert_eq!(codex_response.jsonrpc, JSONRPC_VERSION); + assert_eq!(codex_response.id, RequestId::Integer(codex_request_id)); assert_eq!( - JSONRPCResponse { - jsonrpc: JSONRPC_VERSION.into(), - id: RequestId::Integer(codex_request_id), - result: json!({ - "content": [ - { - "text": "Enjoy!", - "type": "text" - } - ] - }), - }, - codex_response + codex_response.result, + json!({ + "content": [ + { + "text": "Enjoy!", + "type": "text" + } + ], + "structuredContent": { + "threadId": codex_response + .result + .get("structuredContent") + .and_then(|v| v.get("threadId")) + .and_then(serde_json::Value::as_str) + .expect("codex tool response should include structuredContent.threadId"), + } + }) ); let requests = server.received_requests().await.unwrap(); @@ -412,6 +434,7 @@ fn create_expected_patch_approval_elicitation_request( reason: Option, codex_mcp_tool_call_id: String, codex_event_id: String, + thread_id: codex_protocol::ThreadId, ) -> anyhow::Result { let mut message_lines = Vec::new(); if let Some(r) = &reason { @@ -430,6 +453,7 @@ fn create_expected_patch_approval_elicitation_request( properties: json!({}), required: None, }, + thread_id, codex_elicitation: "patch-approval".to_string(), codex_mcp_tool_call_id, codex_event_id,