diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 1f8463a60..46d248e22 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -859,6 +859,7 @@ pub enum CommandExecutionStatus { InProgress, Completed, Failed, + Declined, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 387aabc32..0c2445d85 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -175,12 +175,20 @@ pub(crate) async fn apply_bespoke_event_handling( }); } ApiVersion::V2 => { + let item_id = call_id.clone(); + let command_actions = parsed_cmd + .iter() + .cloned() + .map(V2ParsedCommand::from) + .collect::>(); + let command_string = shlex_join(&command); + let params = CommandExecutionRequestApprovalParams { thread_id: conversation_id.to_string(), turn_id: turn_id.clone(), // Until we migrate the core to be aware of a first class CommandExecutionItem // and emit the corresponding EventMsg, we repurpose the call_id as the item_id. - item_id: call_id.clone(), + item_id: item_id.clone(), reason, risk: risk.map(V2SandboxCommandAssessment::from), }; @@ -190,8 +198,17 @@ pub(crate) async fn apply_bespoke_event_handling( )) .await; tokio::spawn(async move { - on_command_execution_request_approval_response(event_id, rx, conversation) - .await; + on_command_execution_request_approval_response( + event_id, + item_id, + command_string, + cwd, + command_actions, + rx, + conversation, + outgoing, + ) + .await; }); } }, @@ -370,16 +387,21 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::ExecCommandBegin(exec_command_begin_event) => { + let item_id = exec_command_begin_event.call_id.clone(); + let command_actions = exec_command_begin_event + .parsed_cmd + .into_iter() + .map(V2ParsedCommand::from) + .collect::>(); + let command = shlex_join(&exec_command_begin_event.command); + let cwd = exec_command_begin_event.cwd; + let item = ThreadItem::CommandExecution { - id: exec_command_begin_event.call_id.clone(), - command: shlex_join(&exec_command_begin_event.command), - cwd: exec_command_begin_event.cwd, + id: item_id, + command, + cwd, status: CommandExecutionStatus::InProgress, - command_actions: exec_command_begin_event - .parsed_cmd - .into_iter() - .map(V2ParsedCommand::from) - .collect(), + command_actions, aggregated_output: None, exit_code: None, duration_ms: None, @@ -417,6 +439,10 @@ pub(crate) async fn apply_bespoke_event_handling( } else { CommandExecutionStatus::Failed }; + let command_actions = parsed_cmd + .into_iter() + .map(V2ParsedCommand::from) + .collect::>(); let aggregated_output = if aggregated_output.is_empty() { None @@ -431,7 +457,7 @@ pub(crate) async fn apply_bespoke_event_handling( command: shlex_join(&command), cwd, status, - command_actions: parsed_cmd.into_iter().map(V2ParsedCommand::from).collect(), + command_actions, aggregated_output, exit_code: Some(exit_code), duration_ms: Some(duration_ms), @@ -516,6 +542,30 @@ async fn complete_file_change_item( .await; } +async fn complete_command_execution_item( + item_id: String, + command: String, + cwd: PathBuf, + command_actions: Vec, + status: CommandExecutionStatus, + outgoing: &OutgoingMessageSender, +) { + let item = ThreadItem::CommandExecution { + id: item_id, + command, + cwd, + status, + command_actions, + aggregated_output: None, + exit_code: None, + duration_ms: None, + }; + let notification = ItemCompletedNotification { item }; + outgoing + .send_server_notification(ServerNotification::ItemCompleted(notification)) + .await; +} + async fn find_and_remove_turn_summary( conversation_id: ConversationId, turn_summary_store: &TurnSummaryStore, @@ -765,42 +815,68 @@ async fn on_file_change_request_approval_response( } } +#[allow(clippy::too_many_arguments)] async fn on_command_execution_request_approval_response( event_id: String, + item_id: String, + command: String, + cwd: PathBuf, + command_actions: Vec, receiver: oneshot::Receiver, conversation: Arc, + outgoing: Arc, ) { let response = receiver.await; - let value = match response { - Ok(value) => value, + let (decision, completion_status) = match response { + Ok(value) => { + let response = serde_json::from_value::(value) + .unwrap_or_else(|err| { + error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}"); + CommandExecutionRequestApprovalResponse { + decision: ApprovalDecision::Decline, + accept_settings: None, + } + }); + + let CommandExecutionRequestApprovalResponse { + decision, + accept_settings, + } = response; + + let (decision, completion_status) = match (decision, accept_settings) { + (ApprovalDecision::Accept, Some(settings)) if settings.for_session => { + (ReviewDecision::ApprovedForSession, None) + } + (ApprovalDecision::Accept, _) => (ReviewDecision::Approved, None), + (ApprovalDecision::Decline, _) => ( + ReviewDecision::Denied, + Some(CommandExecutionStatus::Declined), + ), + (ApprovalDecision::Cancel, _) => ( + ReviewDecision::Abort, + Some(CommandExecutionStatus::Declined), + ), + }; + (decision, completion_status) + } Err(err) => { error!("request failed: {err:?}"); - return; + (ReviewDecision::Denied, Some(CommandExecutionStatus::Failed)) } }; - let response = serde_json::from_value::(value) - .unwrap_or_else(|err| { - error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}"); - CommandExecutionRequestApprovalResponse { - decision: ApprovalDecision::Decline, - accept_settings: None, - } - }); + if let Some(status) = completion_status { + complete_command_execution_item( + item_id.clone(), + command.clone(), + cwd.clone(), + command_actions.clone(), + status, + outgoing.as_ref(), + ) + .await; + } - let CommandExecutionRequestApprovalResponse { - decision, - accept_settings, - } = response; - - let decision = match (decision, accept_settings) { - (ApprovalDecision::Accept, Some(settings)) if settings.for_session => { - ReviewDecision::ApprovedForSession - } - (ApprovalDecision::Accept, _) => ReviewDecision::Approved, - (ApprovalDecision::Decline, _) => ReviewDecision::Denied, - (ApprovalDecision::Cancel, _) => ReviewDecision::Abort, - }; if let Err(err) = conversation .submit(Op::ExecApproval { id: event_id, diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index a25a9acea..de4d1cd2c 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -8,6 +8,7 @@ use app_test_support::create_shell_command_sse_response; use app_test_support::format_with_current_shell_display; use app_test_support::to_response; use codex_app_server_protocol::ApprovalDecision; +use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::ItemCompletedNotification; @@ -329,6 +330,145 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> { Ok(()) } +#[tokio::test] +async fn turn_start_exec_approval_decline_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().to_path_buf(); + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let responses = vec![ + create_shell_command_sse_response( + vec![ + "python3".to_string(), + "-c".to_string(), + "print(42)".to_string(), + ], + None, + Some(5000), + "call-decline", + )?, + create_final_assistant_message_sse_response("done")?, + ]; + let server = create_mock_chat_completions_server(responses).await; + create_config_toml(codex_home.as_path(), &server.uri(), "untrusted")?; + + let mut mcp = McpProcess::new(codex_home.as_path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run python".to_string(), + }], + cwd: Some(workspace.clone()), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + let started_command_execution = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let started_notif = mcp + .read_stream_until_notification_message("item/started") + .await?; + let started: ItemStartedNotification = + serde_json::from_value(started_notif.params.clone().expect("item/started params"))?; + if let ThreadItem::CommandExecution { .. } = started.item { + return Ok::(started.item); + } + } + }) + .await??; + let ThreadItem::CommandExecution { id, status, .. } = started_command_execution else { + unreachable!("loop ensures we break on command execution items"); + }; + assert_eq!(id, "call-decline"); + assert_eq!(status, CommandExecutionStatus::InProgress); + + let server_req = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req else { + panic!("expected CommandExecutionRequestApproval request") + }; + assert_eq!(params.item_id, "call-decline"); + assert_eq!(params.thread_id, thread.id); + assert_eq!(params.turn_id, turn.id); + + mcp.send_response( + request_id, + serde_json::to_value(CommandExecutionRequestApprovalResponse { + decision: ApprovalDecision::Decline, + accept_settings: None, + })?, + ) + .await?; + + let completed_command_execution = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let completed_notif = mcp + .read_stream_until_notification_message("item/completed") + .await?; + let completed: ItemCompletedNotification = serde_json::from_value( + completed_notif + .params + .clone() + .expect("item/completed params"), + )?; + if let ThreadItem::CommandExecution { .. } = completed.item { + return Ok::(completed.item); + } + } + }) + .await??; + let ThreadItem::CommandExecution { + id, + status, + exit_code, + aggregated_output, + .. + } = completed_command_execution + else { + unreachable!("loop ensures we break on command execution items"); + }; + assert_eq!(id, "call-decline"); + assert_eq!(status, CommandExecutionStatus::Declined); + assert!(exit_code.is_none()); + assert!(aggregated_output.is_none()); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + Ok(()) +} + #[tokio::test] async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/exec/src/exec_events.rs b/codex-rs/exec/src/exec_events.rs index 23f471e22..f3726dad7 100644 --- a/codex-rs/exec/src/exec_events.rs +++ b/codex-rs/exec/src/exec_events.rs @@ -144,6 +144,7 @@ pub enum CommandExecutionStatus { InProgress, Completed, Failed, + Declined, } /// A command executed by the agent.