From 2ae1f81d84fecfb147ed9f2f2350f9bbb49f30b2 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Fri, 21 Nov 2025 09:19:39 -0800 Subject: [PATCH] [app-server] feat: add Declined status for command exec (#7101) Add a `Declined` status for when we request an approval from the user and the user declines. This allows us to distinguish from commands that actually ran, but failed. This behaves similarly to apply_patch / FileChange, which does the same thing. --- .../app-server-protocol/src/protocol/v2.rs | 1 + .../app-server/src/bespoke_event_handling.rs | 148 +++++++++++++----- .../app-server/tests/suite/v2/turn_start.rs | 140 +++++++++++++++++ codex-rs/exec/src/exec_events.rs | 1 + 4 files changed, 254 insertions(+), 36 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 1f8463a60..46d248e22 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -859,6 +859,7 @@ pub enum CommandExecutionStatus { InProgress, Completed, Failed, + Declined, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 387aabc32..0c2445d85 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -175,12 +175,20 @@ pub(crate) async fn apply_bespoke_event_handling( }); } ApiVersion::V2 => { + let item_id = call_id.clone(); + let command_actions = parsed_cmd + .iter() + .cloned() + .map(V2ParsedCommand::from) + .collect::>(); + let command_string = shlex_join(&command); + let params = CommandExecutionRequestApprovalParams { thread_id: conversation_id.to_string(), turn_id: turn_id.clone(), // Until we migrate the core to be aware of a first class CommandExecutionItem // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. - item_id: call_id.clone(), + item_id: item_id.clone(), reason, risk: risk.map(V2SandboxCommandAssessment::from), }; @@ -190,8 +198,17 @@ pub(crate) async fn apply_bespoke_event_handling( )) .await; tokio::spawn(async move { - on_command_execution_request_approval_response(event_id, rx, conversation) - .await; + on_command_execution_request_approval_response( + event_id, + item_id, + command_string, + cwd, + command_actions, + rx, + conversation, + outgoing, + ) + .await; }); } }, @@ -370,16 +387,21 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::ExecCommandBegin(exec_command_begin_event) => { + let item_id = exec_command_begin_event.call_id.clone(); + let command_actions = exec_command_begin_event + .parsed_cmd + .into_iter() + .map(V2ParsedCommand::from) + .collect::>(); + let command = shlex_join(&exec_command_begin_event.command); + let cwd = exec_command_begin_event.cwd; + let item = ThreadItem::CommandExecution { - id: exec_command_begin_event.call_id.clone(), - command: shlex_join(&exec_command_begin_event.command), - cwd: exec_command_begin_event.cwd, + id: item_id, + command, + cwd, status: CommandExecutionStatus::InProgress, - command_actions: exec_command_begin_event - .parsed_cmd - .into_iter() - .map(V2ParsedCommand::from) - .collect(), + command_actions, aggregated_output: None, exit_code: None, duration_ms: None, @@ -417,6 +439,10 @@ pub(crate) async fn apply_bespoke_event_handling( } else { CommandExecutionStatus::Failed }; + let command_actions = parsed_cmd + .into_iter() + .map(V2ParsedCommand::from) + .collect::>(); let aggregated_output = if aggregated_output.is_empty() { None @@ -431,7 +457,7 @@ pub(crate) async fn apply_bespoke_event_handling( command: shlex_join(&command), cwd, status, - command_actions: parsed_cmd.into_iter().map(V2ParsedCommand::from).collect(), + command_actions, aggregated_output, exit_code: Some(exit_code), duration_ms: Some(duration_ms), @@ -516,6 +542,30 @@ async fn complete_file_change_item( .await; } +async fn complete_command_execution_item( + item_id: String, + command: String, + cwd: PathBuf, + command_actions: Vec, + status: CommandExecutionStatus, + outgoing: &OutgoingMessageSender, +) { + let item = ThreadItem::CommandExecution { + id: item_id, + command, + cwd, + status, + command_actions, + aggregated_output: None, + exit_code: None, + duration_ms: None, + }; + let notification = ItemCompletedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemCompleted(notification)) + .await; +} + async fn find_and_remove_turn_summary( conversation_id: ConversationId, turn_summary_store: &TurnSummaryStore, @@ -765,42 +815,68 @@ async fn on_file_change_request_approval_response( } } +#[allow(clippy::too_many_arguments)] async fn on_command_execution_request_approval_response( event_id: String, + item_id: String, + command: String, + cwd: PathBuf, + command_actions: Vec, receiver: oneshot::Receiver, conversation: Arc, + outgoing: Arc, ) { let response = receiver.await; - let value = match response { - Ok(value) => value, + let (decision, completion_status) = match response { + Ok(value) => { + let response = serde_json::from_value::(value) + .unwrap_or_else(|err| { + error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}"); + CommandExecutionRequestApprovalResponse { + decision: ApprovalDecision::Decline, + accept_settings: None, + } + }); + + let CommandExecutionRequestApprovalResponse { + decision, + accept_settings, + } = response; + + let (decision, completion_status) = match (decision, accept_settings) { + (ApprovalDecision::Accept, Some(settings)) if settings.for_session => { + (ReviewDecision::ApprovedForSession, None) + } + (ApprovalDecision::Accept, _) => (ReviewDecision::Approved, None), + (ApprovalDecision::Decline, _) => ( + ReviewDecision::Denied, + Some(CommandExecutionStatus::Declined), + ), + (ApprovalDecision::Cancel, _) => ( + ReviewDecision::Abort, + Some(CommandExecutionStatus::Declined), + ), + }; + (decision, completion_status) + } Err(err) => { error!("request failed: {err:?}"); - return; + (ReviewDecision::Denied, Some(CommandExecutionStatus::Failed)) } }; - let response = serde_json::from_value::(value) - .unwrap_or_else(|err| { - error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}"); - CommandExecutionRequestApprovalResponse { - decision: ApprovalDecision::Decline, - accept_settings: None, - } - }); + if let Some(status) = completion_status { + complete_command_execution_item( + item_id.clone(), + command.clone(), + cwd.clone(), + command_actions.clone(), + status, + outgoing.as_ref(), + ) + .await; + } - let CommandExecutionRequestApprovalResponse { - decision, - accept_settings, - } = response; - - let decision = match (decision, accept_settings) { - (ApprovalDecision::Accept, Some(settings)) if settings.for_session => { - ReviewDecision::ApprovedForSession - } - (ApprovalDecision::Accept, _) => ReviewDecision::Approved, - (ApprovalDecision::Decline, _) => ReviewDecision::Denied, - (ApprovalDecision::Cancel, _) => ReviewDecision::Abort, - }; if let Err(err) = conversation .submit(Op::ExecApproval { id: event_id, diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index a25a9acea..de4d1cd2c 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -8,6 +8,7 @@ use app_test_support::create_shell_command_sse_response; use app_test_support::format_with_current_shell_display; use app_test_support::to_response; use codex_app_server_protocol::ApprovalDecision; +use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::ItemCompletedNotification; @@ -329,6 +330,145 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> { Ok(()) } +#[tokio::test] +async fn turn_start_exec_approval_decline_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().to_path_buf(); + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let responses = vec![ + create_shell_command_sse_response( + vec![ + "python3".to_string(), + "-c".to_string(), + "print(42)".to_string(), + ], + None, + Some(5000), + "call-decline", + )?, + create_final_assistant_message_sse_response("done")?, + ]; + let server = create_mock_chat_completions_server(responses).await; + create_config_toml(codex_home.as_path(), &server.uri(), "untrusted")?; + + let mut mcp = McpProcess::new(codex_home.as_path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run python".to_string(), + }], + cwd: Some(workspace.clone()), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + let started_command_execution = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let started_notif = mcp + .read_stream_until_notification_message("item/started") + .await?; + let started: ItemStartedNotification = + serde_json::from_value(started_notif.params.clone().expect("item/started params"))?; + if let ThreadItem::CommandExecution { .. } = started.item { + return Ok::(started.item); + } + } + }) + .await??; + let ThreadItem::CommandExecution { id, status, .. } = started_command_execution else { + unreachable!("loop ensures we break on command execution items"); + }; + assert_eq!(id, "call-decline"); + assert_eq!(status, CommandExecutionStatus::InProgress); + + let server_req = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req else { + panic!("expected CommandExecutionRequestApproval request") + }; + assert_eq!(params.item_id, "call-decline"); + assert_eq!(params.thread_id, thread.id); + assert_eq!(params.turn_id, turn.id); + + mcp.send_response( + request_id, + serde_json::to_value(CommandExecutionRequestApprovalResponse { + decision: ApprovalDecision::Decline, + accept_settings: None, + })?, + ) + .await?; + + let completed_command_execution = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let completed_notif = mcp + .read_stream_until_notification_message("item/completed") + .await?; + let completed: ItemCompletedNotification = serde_json::from_value( + completed_notif + .params + .clone() + .expect("item/completed params"), + )?; + if let ThreadItem::CommandExecution { .. } = completed.item { + return Ok::(completed.item); + } + } + }) + .await??; + let ThreadItem::CommandExecution { + id, + status, + exit_code, + aggregated_output, + .. + } = completed_command_execution + else { + unreachable!("loop ensures we break on command execution items"); + }; + assert_eq!(id, "call-decline"); + assert_eq!(status, CommandExecutionStatus::Declined); + assert!(exit_code.is_none()); + assert!(aggregated_output.is_none()); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + Ok(()) +} + #[tokio::test] async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/exec/src/exec_events.rs b/codex-rs/exec/src/exec_events.rs index 23f471e22..f3726dad7 100644 --- a/codex-rs/exec/src/exec_events.rs +++ b/codex-rs/exec/src/exec_events.rs @@ -144,6 +144,7 @@ pub enum CommandExecutionStatus { InProgress, Completed, Failed, + Declined, } /// A command executed by the agent.