diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 027c2561b..650b6f31a 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -438,6 +438,13 @@ server_request_definitions! { response: v2::CommandExecutionRequestApprovalResponse, }, + /// Sent when approval is requested for a specific file change. + /// This request is used for Turns started via turn/start. + FileChangeRequestApproval => "item/fileChange/requestApproval" { + params: v2::FileChangeRequestApprovalParams, + response: v2::FileChangeRequestApprovalResponse, + }, + /// DEPRECATED APIs below /// Request to approve a patch. /// This request is used for Turns started via the legacy APIs (i.e. SendUserTurn, SendUserMessage). diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 3c82d908d..6b61245bc 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -794,20 +794,23 @@ pub struct FileUpdateChange { } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] -#[serde(rename_all = "camelCase")] +#[serde(tag = "type", rename_all = "camelCase")] +#[ts(tag = "type")] #[ts(export_to = "v2/")] pub enum PatchChangeKind { Add, Delete, - Update, + Update { move_path: Option }, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] pub enum PatchApplyStatus { + InProgress, Completed, Failed, + Declined, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] @@ -982,6 +985,26 @@ pub struct CommandExecutionRequestApprovalResponse { pub accept_settings: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct FileChangeRequestApprovalParams { + pub thread_id: String, + pub turn_id: String, + pub item_id: String, + /// Optional explanatory reason (e.g. request for extra write access). + pub reason: Option, + /// [UNSTABLE] When set, the agent is asking the user to allow writes under this root + /// for the remainder of the session (unclear if this is honored today). + pub grant_root: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[ts(export_to = "v2/")] +pub struct FileChangeRequestApprovalResponse { + pub decision: ApprovalDecision, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server-test-client/src/main.rs b/codex-rs/app-server-test-client/src/main.rs index 6130b2d51..7f2b75bd2 100644 --- a/codex-rs/app-server-test-client/src/main.rs +++ b/codex-rs/app-server-test-client/src/main.rs @@ -24,6 +24,8 @@ use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::CommandExecutionRequestAcceptSettings; use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::FileChangeRequestApprovalParams; +use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::GetAccountRateLimitsResponse; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::InitializeResponse; @@ -677,6 +679,9 @@ impl CodexClient { ServerRequest::CommandExecutionRequestApproval { request_id, params } => { self.handle_command_execution_request_approval(request_id, params)?; } + ServerRequest::FileChangeRequestApproval { request_id, params } => { + self.approve_file_change_request(request_id, params)?; + } other => { bail!("received unsupported server request: {other:?}"); } @@ -717,6 +722,37 @@ impl CodexClient { Ok(()) } + fn approve_file_change_request( + &mut self, + request_id: RequestId, + params: FileChangeRequestApprovalParams, + ) -> Result<()> { + let FileChangeRequestApprovalParams { + thread_id, + turn_id, + item_id, + reason, + grant_root, + } = params; + + println!( + "\n< fileChange approval requested for thread {thread_id}, turn {turn_id}, item {item_id}" + ); + if let Some(reason) = reason.as_deref() { + println!("< reason: {reason}"); + } + if let Some(grant_root) = grant_root.as_deref() { + println!("< grant root: {}", grant_root.display()); + } + + let response = FileChangeRequestApprovalResponse { + decision: ApprovalDecision::Accept, + }; + self.send_server_request_response(request_id, &response)?; + println!("< approved fileChange request for item {item_id}"); + Ok(()) + } + fn send_server_request_response(&mut self, request_id: RequestId, response: &T) -> Result<()> where T: Serialize, diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index b4fdbe3e0..d4b00e67f 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -15,12 +15,17 @@ use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::ExecCommandApprovalParams; use codex_app_server_protocol::ExecCommandApprovalResponse; +use codex_app_server_protocol::FileChangeRequestApprovalParams; +use codex_app_server_protocol::FileChangeRequestApprovalResponse; +use codex_app_server_protocol::FileUpdateChange; use codex_app_server_protocol::InterruptConversationResponse; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::McpToolCallError; use codex_app_server_protocol::McpToolCallResult; use codex_app_server_protocol::McpToolCallStatus; +use codex_app_server_protocol::PatchApplyStatus; +use codex_app_server_protocol::PatchChangeKind as V2PatchChangeKind; use codex_app_server_protocol::ReasoningSummaryPartAddedNotification; use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; use codex_app_server_protocol::ReasoningTextDeltaNotification; @@ -40,6 +45,7 @@ use codex_core::protocol::Event; use codex_core::protocol::EventMsg; use codex_core::protocol::ExecApprovalRequestEvent; use codex_core::protocol::ExecCommandEndEvent; +use codex_core::protocol::FileChange as CoreFileChange; use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; use codex_core::protocol::Op; @@ -47,7 +53,9 @@ use codex_core::protocol::ReviewDecision; use codex_core::review_format::format_review_findings_block; use codex_protocol::ConversationId; use codex_protocol::protocol::ReviewOutputEvent; +use std::collections::HashMap; use std::convert::TryFrom; +use std::path::PathBuf; use std::sync::Arc; use tokio::sync::oneshot; use tracing::error; @@ -70,24 +78,74 @@ pub(crate) async fn apply_bespoke_event_handling( } EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, + turn_id, changes, reason, grant_root, - }) => { - let params = ApplyPatchApprovalParams { - conversation_id, - call_id, - file_changes: changes, - reason, - grant_root, - }; - let rx = outgoing - .send_request(ServerRequestPayload::ApplyPatchApproval(params)) - .await; - tokio::spawn(async move { - on_patch_approval_response(event_id, rx, conversation).await; - }); - } + }) => match api_version { + ApiVersion::V1 => { + let params = ApplyPatchApprovalParams { + conversation_id, + call_id, + file_changes: changes.clone(), + reason, + grant_root, + }; + let rx = outgoing + .send_request(ServerRequestPayload::ApplyPatchApproval(params)) + .await; + tokio::spawn(async move { + on_patch_approval_response(event_id, rx, conversation).await; + }); + } + ApiVersion::V2 => { + // Until we migrate the core to be aware of a first class FileChangeItem + // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. + let item_id = call_id.clone(); + let patch_changes = convert_patch_changes(&changes); + + let first_start = { + let mut map = turn_summary_store.lock().await; + let summary = map.entry(conversation_id).or_default(); + summary.file_change_started.insert(item_id.clone()) + }; + if first_start { + let item = ThreadItem::FileChange { + id: item_id.clone(), + changes: patch_changes.clone(), + status: PatchApplyStatus::InProgress, + }; + let notification = ItemStartedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemStarted(notification)) + .await; + } + + let params = FileChangeRequestApprovalParams { + thread_id: conversation_id.to_string(), + turn_id: turn_id.clone(), + item_id: item_id.clone(), + reason, + grant_root, + }; + let rx = outgoing + .send_request(ServerRequestPayload::FileChangeRequestApproval(params)) + .await; + tokio::spawn(async move { + on_file_change_request_approval_response( + event_id, + conversation_id, + item_id, + patch_changes, + rx, + conversation, + outgoing, + turn_summary_store, + ) + .await; + }); + } + }, EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { call_id, turn_id, @@ -244,6 +302,49 @@ pub(crate) async fn apply_bespoke_event_handling( .send_server_notification(ServerNotification::ItemCompleted(notification)) .await; } + EventMsg::PatchApplyBegin(patch_begin_event) => { + // Until we migrate the core to be aware of a first class FileChangeItem + // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. + let item_id = patch_begin_event.call_id.clone(); + + let first_start = { + let mut map = turn_summary_store.lock().await; + let summary = map.entry(conversation_id).or_default(); + summary.file_change_started.insert(item_id.clone()) + }; + if first_start { + let item = ThreadItem::FileChange { + id: item_id.clone(), + changes: convert_patch_changes(&patch_begin_event.changes), + status: PatchApplyStatus::InProgress, + }; + let notification = ItemStartedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemStarted(notification)) + .await; + } + } + EventMsg::PatchApplyEnd(patch_end_event) => { + // Until we migrate the core to be aware of a first class FileChangeItem + // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. + let item_id = patch_end_event.call_id.clone(); + + let status = if patch_end_event.success { + PatchApplyStatus::Completed + } else { + PatchApplyStatus::Failed + }; + let changes = convert_patch_changes(&patch_end_event.changes); + complete_file_change_item( + conversation_id, + item_id, + changes, + status, + outgoing.as_ref(), + &turn_summary_store, + ) + .await; + } EventMsg::ExecCommandBegin(exec_command_begin_event) => { let item = ThreadItem::CommandExecution { id: exec_command_begin_event.call_id.clone(), @@ -365,6 +466,32 @@ async fn emit_turn_completed_with_status( .await; } +async fn complete_file_change_item( + conversation_id: ConversationId, + item_id: String, + changes: Vec, + status: PatchApplyStatus, + outgoing: &OutgoingMessageSender, + turn_summary_store: &TurnSummaryStore, +) { + { + let mut map = turn_summary_store.lock().await; + if let Some(summary) = map.get_mut(&conversation_id) { + summary.file_change_started.remove(&item_id); + } + } + + let item = ThreadItem::FileChange { + id: item_id, + changes, + status, + }; + let notification = ItemCompletedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemCompleted(notification)) + .await; +} + async fn find_and_remove_turn_summary( conversation_id: ConversationId, turn_summary_store: &TurnSummaryStore, @@ -512,6 +639,110 @@ fn render_review_output_text(output: &ReviewOutputEvent) -> String { } } +fn convert_patch_changes(changes: &HashMap) -> Vec { + let mut converted: Vec = changes + .iter() + .map(|(path, change)| FileUpdateChange { + path: path.to_string_lossy().into_owned(), + kind: map_patch_change_kind(change), + diff: format_file_change_diff(change), + }) + .collect(); + converted.sort_by(|a, b| a.path.cmp(&b.path)); + converted +} + +fn map_patch_change_kind(change: &CoreFileChange) -> V2PatchChangeKind { + match change { + CoreFileChange::Add { .. } => V2PatchChangeKind::Add, + CoreFileChange::Delete { .. } => V2PatchChangeKind::Delete, + CoreFileChange::Update { move_path, .. } => V2PatchChangeKind::Update { + move_path: move_path.clone(), + }, + } +} + +fn format_file_change_diff(change: &CoreFileChange) -> String { + match change { + CoreFileChange::Add { content } => content.clone(), + CoreFileChange::Delete { content } => content.clone(), + CoreFileChange::Update { + unified_diff, + move_path, + } => { + if let Some(path) = move_path { + format!("{unified_diff}\n\nMoved to: {}", path.display()) + } else { + unified_diff.clone() + } + } + } +} + +#[allow(clippy::too_many_arguments)] +async fn on_file_change_request_approval_response( + event_id: String, + conversation_id: ConversationId, + item_id: String, + changes: Vec, + receiver: oneshot::Receiver, + codex: Arc, + outgoing: Arc, + turn_summary_store: TurnSummaryStore, +) { + let response = receiver.await; + let (decision, completion_status) = match response { + Ok(value) => { + let response = serde_json::from_value::(value) + .unwrap_or_else(|err| { + error!("failed to deserialize FileChangeRequestApprovalResponse: {err}"); + FileChangeRequestApprovalResponse { + decision: ApprovalDecision::Decline, + } + }); + + let (decision, completion_status) = match response.decision { + ApprovalDecision::Accept => (ReviewDecision::Approved, None), + ApprovalDecision::Decline => { + (ReviewDecision::Denied, Some(PatchApplyStatus::Declined)) + } + ApprovalDecision::Cancel => { + (ReviewDecision::Abort, Some(PatchApplyStatus::Declined)) + } + }; + // Allow EventMsg::PatchApplyEnd to emit ItemCompleted for accepted patches. + // Only short-circuit on declines/cancels/failures. + (decision, completion_status) + } + Err(err) => { + error!("request failed: {err:?}"); + (ReviewDecision::Denied, Some(PatchApplyStatus::Failed)) + } + }; + + if let Some(status) = completion_status { + complete_file_change_item( + conversation_id, + item_id, + changes, + status, + outgoing.as_ref(), + &turn_summary_store, + ) + .await; + } + + if let Err(err) = codex + .submit(Op::PatchApproval { + id: event_id, + decision, + }) + .await + { + error!("failed to submit PatchApproval: {err}"); + } +} + async fn on_command_execution_request_approval_response( event_id: String, receiver: oneshot::Receiver, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 8b6539eb6..bf9f0b940 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -139,6 +139,7 @@ use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::user_input::UserInput as CoreInputItem; use codex_utils_json_to_toml::json_to_toml; use std::collections::HashMap; +use std::collections::HashSet; use std::ffi::OsStr; use std::io::Error as IoError; use std::path::Path; @@ -162,6 +163,7 @@ pub(crate) type PendingInterrupts = Arc, + pub(crate) file_change_started: HashSet, } pub(crate) type TurnSummaryStore = Arc>>; diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 8c4cf4e67..b644315d9 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -1,14 +1,20 @@ use anyhow::Result; use app_test_support::McpProcess; +use app_test_support::create_apply_patch_sse_response; use app_test_support::create_final_assistant_message_sse_response; use app_test_support::create_mock_chat_completions_server; use app_test_support::create_mock_chat_completions_server_unchecked; use app_test_support::create_shell_sse_response; use app_test_support::to_response; +use codex_app_server_protocol::ApprovalDecision; use codex_app_server_protocol::CommandExecutionStatus; +use codex_app_server_protocol::FileChangeRequestApprovalResponse; +use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::PatchApplyStatus; +use codex_app_server_protocol::PatchChangeKind; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ThreadItem; @@ -471,6 +477,300 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> { Ok(()) } +#[tokio::test] +async fn turn_start_file_change_approval_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let patch = r#"*** Begin Patch +*** Add File: README.md ++new line +*** End Patch +"#; + let responses = vec![ + create_apply_patch_sse_response(patch, "patch-call")?, + create_final_assistant_message_sse_response("patch applied")?, + ]; + let server = create_mock_chat_completions_server(responses).await; + create_config_toml(&codex_home, &server.uri(), "untrusted")?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + cwd: Some(workspace.to_string_lossy().into_owned()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "apply patch".into(), + }], + cwd: Some(workspace.clone()), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + let started_file_change = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let started_notif = mcp + .read_stream_until_notification_message("item/started") + .await?; + let started: ItemStartedNotification = + serde_json::from_value(started_notif.params.clone().expect("item/started params"))?; + if let ThreadItem::FileChange { .. } = started.item { + return Ok::(started.item); + } + } + }) + .await??; + let ThreadItem::FileChange { + ref id, + status, + ref changes, + } = started_file_change + else { + unreachable!("loop ensures we break on file change items"); + }; + assert_eq!(id, "patch-call"); + assert_eq!(status, PatchApplyStatus::InProgress); + let started_changes = changes.clone(); + + let server_req = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::FileChangeRequestApproval { request_id, params } = server_req else { + panic!("expected FileChangeRequestApproval request") + }; + assert_eq!(params.item_id, "patch-call"); + assert_eq!(params.thread_id, thread.id); + assert_eq!(params.turn_id, turn.id); + let expected_readme_path = workspace.join("README.md"); + let expected_readme_path = expected_readme_path.to_string_lossy().into_owned(); + pretty_assertions::assert_eq!( + started_changes, + vec![codex_app_server_protocol::FileUpdateChange { + path: expected_readme_path.clone(), + kind: PatchChangeKind::Add, + diff: "new line\n".to_string(), + }] + ); + + mcp.send_response( + request_id, + serde_json::to_value(FileChangeRequestApprovalResponse { + decision: ApprovalDecision::Accept, + })?, + ) + .await?; + + let completed_file_change = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let completed_notif = mcp + .read_stream_until_notification_message("item/completed") + .await?; + let completed: ItemCompletedNotification = serde_json::from_value( + completed_notif + .params + .clone() + .expect("item/completed params"), + )?; + if let ThreadItem::FileChange { .. } = completed.item { + return Ok::(completed.item); + } + } + }) + .await??; + let ThreadItem::FileChange { ref id, status, .. } = completed_file_change else { + unreachable!("loop ensures we break on file change items"); + }; + assert_eq!(id, "patch-call"); + assert_eq!(status, PatchApplyStatus::Completed); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + let readme_contents = std::fs::read_to_string(expected_readme_path)?; + assert_eq!(readme_contents, "new line\n"); + + Ok(()) +} + +#[tokio::test] +async fn turn_start_file_change_approval_decline_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let patch = r#"*** Begin Patch +*** Add File: README.md ++new line +*** End Patch +"#; + let responses = vec![ + create_apply_patch_sse_response(patch, "patch-call")?, + create_final_assistant_message_sse_response("patch declined")?, + ]; + let server = create_mock_chat_completions_server(responses).await; + create_config_toml(&codex_home, &server.uri(), "untrusted")?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + cwd: Some(workspace.to_string_lossy().into_owned()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "apply patch".into(), + }], + cwd: Some(workspace.clone()), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + let started_file_change = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let started_notif = mcp + .read_stream_until_notification_message("item/started") + .await?; + let started: ItemStartedNotification = + serde_json::from_value(started_notif.params.clone().expect("item/started params"))?; + if let ThreadItem::FileChange { .. } = started.item { + return Ok::(started.item); + } + } + }) + .await??; + let ThreadItem::FileChange { + ref id, + status, + ref changes, + } = started_file_change + else { + unreachable!("loop ensures we break on file change items"); + }; + assert_eq!(id, "patch-call"); + assert_eq!(status, PatchApplyStatus::InProgress); + let started_changes = changes.clone(); + + let server_req = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::FileChangeRequestApproval { request_id, params } = server_req else { + panic!("expected FileChangeRequestApproval request") + }; + assert_eq!(params.item_id, "patch-call"); + assert_eq!(params.thread_id, thread.id); + assert_eq!(params.turn_id, turn.id); + let expected_readme_path = workspace.join("README.md"); + let expected_readme_path_str = expected_readme_path.to_string_lossy().into_owned(); + pretty_assertions::assert_eq!( + started_changes, + vec![codex_app_server_protocol::FileUpdateChange { + path: expected_readme_path_str.clone(), + kind: PatchChangeKind::Add, + diff: "new line\n".to_string(), + }] + ); + + mcp.send_response( + request_id, + serde_json::to_value(FileChangeRequestApprovalResponse { + decision: ApprovalDecision::Decline, + })?, + ) + .await?; + + let completed_file_change = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let completed_notif = mcp + .read_stream_until_notification_message("item/completed") + .await?; + let completed: ItemCompletedNotification = serde_json::from_value( + completed_notif + .params + .clone() + .expect("item/completed params"), + )?; + if let ThreadItem::FileChange { .. } = completed.item { + return Ok::(completed.item); + } + } + }) + .await??; + let ThreadItem::FileChange { ref id, status, .. } = completed_file_change else { + unreachable!("loop ensures we break on file change items"); + }; + assert_eq!(id, "patch-call"); + assert_eq!(status, PatchApplyStatus::Declined); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + assert!( + !expected_readme_path.exists(), + "declined patch should not be applied" + ); + + Ok(()) +} + // Helper to create a config.toml pointing at the mock model server. fn create_config_toml( codex_home: &Path, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 182ca67f4..4d9285dd2 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -919,6 +919,7 @@ impl Session { let event = EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, + turn_id: turn_context.sub_id.clone(), changes, reason, grant_root, diff --git a/codex-rs/core/src/tools/events.rs b/codex-rs/core/src/tools/events.rs index 131b8f698..37df12d4b 100644 --- a/codex-rs/core/src/tools/events.rs +++ b/codex-rs/core/src/tools/events.rs @@ -179,15 +179,17 @@ impl ToolEmitter { ctx.turn, EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id: ctx.call_id.to_string(), + turn_id: ctx.turn.sub_id.clone(), auto_approved: *auto_approved, changes: changes.clone(), }), ) .await; } - (Self::ApplyPatch { .. }, ToolEventStage::Success(output)) => { + (Self::ApplyPatch { changes, .. }, ToolEventStage::Success(output)) => { emit_patch_end( ctx, + changes.clone(), output.stdout.text.clone(), output.stderr.text.clone(), output.exit_code == 0, @@ -195,11 +197,12 @@ impl ToolEmitter { .await; } ( - Self::ApplyPatch { .. }, + Self::ApplyPatch { changes, .. }, ToolEventStage::Failure(ToolEventFailure::Output(output)), ) => { emit_patch_end( ctx, + changes.clone(), output.stdout.text.clone(), output.stderr.text.clone(), output.exit_code == 0, @@ -207,10 +210,17 @@ impl ToolEmitter { .await; } ( - Self::ApplyPatch { .. }, + Self::ApplyPatch { changes, .. }, ToolEventStage::Failure(ToolEventFailure::Message(message)), ) => { - emit_patch_end(ctx, String::new(), (*message).to_string(), false).await; + emit_patch_end( + ctx, + changes.clone(), + String::new(), + (*message).to_string(), + false, + ) + .await; } ( Self::UnifiedExec { @@ -409,15 +419,23 @@ async fn emit_exec_end( .await; } -async fn emit_patch_end(ctx: ToolEventCtx<'_>, stdout: String, stderr: String, success: bool) { +async fn emit_patch_end( + ctx: ToolEventCtx<'_>, + changes: HashMap, + stdout: String, + stderr: String, + success: bool, +) { ctx.session .send_event( ctx.turn, EventMsg::PatchApplyEnd(PatchApplyEndEvent { call_id: ctx.call_id.to_string(), + turn_id: ctx.turn.sub_id.clone(), stdout, stderr, success, + changes, }), ) .await; diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index 2d550fea4..1f007bbe0 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -346,6 +346,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { call_id, auto_approved, changes, + .. }) => { // Store metadata so we can calculate duration later when we // receive the corresponding PatchApplyEnd event. diff --git a/codex-rs/exec/src/exec_events.rs b/codex-rs/exec/src/exec_events.rs index 64288e138..23f471e22 100644 --- a/codex-rs/exec/src/exec_events.rs +++ b/codex-rs/exec/src/exec_events.rs @@ -166,6 +166,7 @@ pub struct FileUpdateChange { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)] #[serde(rename_all = "snake_case")] pub enum PatchApplyStatus { + InProgress, Completed, Failed, } diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index 5053f6192..cbcd5458b 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -822,6 +822,7 @@ fn patch_apply_success_produces_item_completed_patchapply() { "p1", EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id: "call-1".to_string(), + turn_id: "turn-1".to_string(), auto_approved: true, changes: changes.clone(), }), @@ -834,9 +835,11 @@ fn patch_apply_success_produces_item_completed_patchapply() { "p2", EventMsg::PatchApplyEnd(PatchApplyEndEvent { call_id: "call-1".to_string(), + turn_id: "turn-1".to_string(), stdout: "applied 3 changes".to_string(), stderr: String::new(), success: true, + changes: changes.clone(), }), ); let out_end = ep.collect_thread_events(&end); @@ -891,6 +894,7 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() { "p1", EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id: "call-2".to_string(), + turn_id: "turn-2".to_string(), auto_approved: false, changes: changes.clone(), }), @@ -902,9 +906,11 @@ fn patch_apply_failure_produces_item_completed_patchapply_failed() { "p2", EventMsg::PatchApplyEnd(PatchApplyEndEvent { call_id: "call-2".to_string(), + turn_id: "turn-2".to_string(), stdout: String::new(), stderr: "failed to apply".to_string(), success: false, + changes: changes.clone(), }), ); let out_end = ep.collect_thread_events(&end); diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 93dc7764d..8dccb5125 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -210,6 +210,7 @@ async fn run_codex_tool_session_inner( } EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, + turn_id: _, reason, grant_root, changes, diff --git a/codex-rs/protocol/src/approvals.rs b/codex-rs/protocol/src/approvals.rs index f7c5fc604..25f5e90e9 100644 --- a/codex-rs/protocol/src/approvals.rs +++ b/codex-rs/protocol/src/approvals.rs @@ -57,6 +57,10 @@ pub struct ExecApprovalRequestEvent { pub struct ApplyPatchApprovalRequestEvent { /// Responses API call id for the associated patch apply call, if available. pub call_id: String, + /// Turn ID that this patch belongs to. + /// Uses `#[serde(default)]` for backwards compatibility with older senders. + #[serde(default)] + pub turn_id: String, pub changes: HashMap, /// Optional explanatory reason (e.g. request for extra write access). #[serde(skip_serializing_if = "Option::is_none")] diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 1825d7636..f20e41283 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1374,6 +1374,10 @@ pub struct StreamInfoEvent { pub struct PatchApplyBeginEvent { /// Identifier so this can be paired with the PatchApplyEnd event. pub call_id: String, + /// Turn ID that this patch belongs to. + /// Uses `#[serde(default)]` for backwards compatibility. + #[serde(default)] + pub turn_id: String, /// If true, there was no ApplyPatchApprovalRequest for this patch. pub auto_approved: bool, /// The changes to be applied. @@ -1384,12 +1388,19 @@ pub struct PatchApplyBeginEvent { pub struct PatchApplyEndEvent { /// Identifier for the PatchApplyBegin that finished. pub call_id: String, + /// Turn ID that this patch belongs to. + /// Uses `#[serde(default)]` for backwards compatibility. + #[serde(default)] + pub turn_id: String, /// Captured stdout (summary printed by apply_patch). pub stdout: String, /// Captured stderr (parser errors, IO failures, etc.). pub stderr: String, /// Whether the patch was applied successfully. pub success: bool, + /// The changes that were applied (mirrors PatchApplyBeginEvent::changes). + #[serde(default)] + pub changes: HashMap, } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 987a956fe..a5728ab14 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -1464,6 +1464,7 @@ impl ChatWidget { // }), msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id: "1".to_string(), + turn_id: "turn-1".to_string(), changes: HashMap::from([ ( PathBuf::from("/tmp/test.txt"), diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index d60a20b8f..1393e5288 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -1912,6 +1912,7 @@ fn approval_modal_patch_snapshot() { ); let ev = ApplyPatchApprovalRequestEvent { call_id: "call-approve-patch".into(), + turn_id: "turn-approve-patch".into(), changes, reason: Some("The model wants to apply changes".into()), grant_root: Some(PathBuf::from("/tmp")), @@ -2164,6 +2165,7 @@ fn apply_patch_events_emit_history_cells() { ); let ev = ApplyPatchApprovalRequestEvent { call_id: "c1".into(), + turn_id: "turn-c1".into(), changes, reason: None, grant_root: None, @@ -2204,6 +2206,7 @@ fn apply_patch_events_emit_history_cells() { ); let begin = PatchApplyBeginEvent { call_id: "c1".into(), + turn_id: "turn-c1".into(), auto_approved: true, changes: changes2, }; @@ -2220,11 +2223,20 @@ fn apply_patch_events_emit_history_cells() { ); // 3) End apply success -> success cell + let mut end_changes = HashMap::new(); + end_changes.insert( + PathBuf::from("foo.txt"), + FileChange::Add { + content: "hello\n".to_string(), + }, + ); let end = PatchApplyEndEvent { call_id: "c1".into(), + turn_id: "turn-c1".into(), stdout: "ok\n".into(), stderr: String::new(), success: true, + changes: end_changes, }; chat.handle_codex_event(Event { id: "s1".into(), @@ -2252,6 +2264,7 @@ fn apply_patch_manual_approval_adjusts_header() { id: "s1".into(), msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id: "c1".into(), + turn_id: "turn-c1".into(), changes: proposed_changes, reason: None, grant_root: None, @@ -2270,6 +2283,7 @@ fn apply_patch_manual_approval_adjusts_header() { id: "s1".into(), msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id: "c1".into(), + turn_id: "turn-c1".into(), auto_approved: false, changes: apply_changes, }), @@ -2299,6 +2313,7 @@ fn apply_patch_manual_flow_snapshot() { id: "s1".into(), msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id: "c1".into(), + turn_id: "turn-c1".into(), changes: proposed_changes, reason: Some("Manual review required".into()), grant_root: None, @@ -2321,6 +2336,7 @@ fn apply_patch_manual_flow_snapshot() { id: "s1".into(), msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id: "c1".into(), + turn_id: "turn-c1".into(), auto_approved: false, changes: apply_changes, }), @@ -2348,6 +2364,7 @@ fn apply_patch_approval_sends_op_with_submission_id() { ); let ev = ApplyPatchApprovalRequestEvent { call_id: "call-999".into(), + turn_id: "turn-999".into(), changes, reason: None, grant_root: None, @@ -2387,6 +2404,7 @@ fn apply_patch_full_flow_integration_like() { id: "sub-xyz".into(), msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id: "call-1".into(), + turn_id: "turn-call-1".into(), changes, reason: None, grant_root: None, @@ -2427,17 +2445,25 @@ fn apply_patch_full_flow_integration_like() { id: "sub-xyz".into(), msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent { call_id: "call-1".into(), + turn_id: "turn-call-1".into(), auto_approved: false, changes: changes2, }), }); + let mut end_changes = HashMap::new(); + end_changes.insert( + PathBuf::from("pkg.rs"), + FileChange::Add { content: "".into() }, + ); chat.handle_codex_event(Event { id: "sub-xyz".into(), msg: EventMsg::PatchApplyEnd(PatchApplyEndEvent { call_id: "call-1".into(), + turn_id: "turn-call-1".into(), stdout: String::from("ok"), stderr: String::new(), success: true, + changes: end_changes, }), }); } @@ -2458,6 +2484,7 @@ fn apply_patch_untrusted_shows_approval_modal() { id: "sub-1".into(), msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id: "call-1".into(), + turn_id: "turn-call-1".into(), changes, reason: None, grant_root: None, @@ -2506,6 +2533,7 @@ fn apply_patch_request_shows_diff_summary() { id: "sub-apply".into(), msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id: "call-apply".into(), + turn_id: "turn-apply".into(), changes, reason: None, grant_root: None,