diff --git a/codex-rs/app-server-test-client/src/lib.rs b/codex-rs/app-server-test-client/src/lib.rs index 3f2dafbfe..7b846c80f 100644 --- a/codex-rs/app-server-test-client/src/lib.rs +++ b/codex-rs/app-server-test-client/src/lib.rs @@ -1,4 +1,5 @@ use std::collections::VecDeque; +use std::ffi::OsString; use std::fs; use std::fs::OpenOptions; use std::io::BufRead; @@ -29,6 +30,7 @@ use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::CommandExecutionApprovalDecision; use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::DynamicToolSpec; use codex_app_server_protocol::FileChangeApprovalDecision; use codex_app_server_protocol::FileChangeRequestApprovalParams; @@ -55,6 +57,7 @@ use codex_app_server_protocol::SendUserMessageParams; use codex_app_server_protocol::SendUserMessageResponse; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; use codex_app_server_protocol::ThreadResumeParams; @@ -78,6 +81,30 @@ use tungstenite::stream::MaybeTlsStream; use url::Url; use uuid::Uuid; +const NOTIFICATIONS_TO_OPT_OUT: &[&str] = &[ + // Legacy codex/event (v1-style) deltas. + "codex/event/agent_message_content_delta", + "codex/event/agent_message_delta", + "codex/event/agent_reasoning_delta", + "codex/event/reasoning_content_delta", + "codex/event/reasoning_raw_content_delta", + "codex/event/exec_command_output_delta", + // Other legacy events. + "codex/event/exec_approval_request", + "codex/event/exec_command_begin", + "codex/event/exec_command_end", + "codex/event/exec_output", + "codex/event/item_started", + "codex/event/item_completed", + // v2 item deltas. + "item/agentMessage/delta", + "item/plan/delta", + "item/commandExecution/outputDelta", + "item/fileChange/outputDelta", + "item/reasoning/summaryTextDelta", + "item/reasoning/textDelta", +]; + /// Minimal launcher that initializes the Codex app-server and logs the handshake. #[derive(Parser)] #[command(author = "Codex", version, about = "Bootstrap Codex app-server", long_about = None)] @@ -180,6 +207,18 @@ enum CliCommand { /// Follow-up user message for the second turn. follow_up_message: String, }, + /// Trigger zsh-fork multi-subcommand approvals and assert expected approval behavior. + #[command(name = "trigger-zsh-fork-multi-cmd-approval")] + TriggerZshForkMultiCmdApproval { + /// Optional prompt; defaults to an explicit `/usr/bin/true && /usr/bin/true` command. + user_message: Option, + /// Minimum number of command-approval callbacks expected in the turn. + #[arg(long, default_value_t = 2)] + min_approvals: usize, + /// One-based approval index to abort (e.g. --abort-on 2 aborts the second approval). + #[arg(long)] + abort_on: Option, + }, /// Trigger the ChatGPT login flow and wait for completion. TestLogin, /// Fetch the current account rate limits from the Codex app-server. @@ -265,6 +304,21 @@ pub fn run() -> Result<()> { &dynamic_tools, ) } + CliCommand::TriggerZshForkMultiCmdApproval { + user_message, + min_approvals, + abort_on, + } => { + let endpoint = resolve_endpoint(codex_bin, url)?; + trigger_zsh_fork_multi_cmd_approval( + &endpoint, + &config_overrides, + user_message, + min_approvals, + abort_on, + &dynamic_tools, + ) + } CliCommand::TestLogin => { ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?; let endpoint = resolve_endpoint(codex_bin, url)?; @@ -470,6 +524,101 @@ fn send_message_v2_endpoint( ) } +fn trigger_zsh_fork_multi_cmd_approval( + endpoint: &Endpoint, + config_overrides: &[String], + user_message: Option, + min_approvals: usize, + abort_on: Option, + dynamic_tools: &Option>, +) -> Result<()> { + if let Some(abort_on) = abort_on + && abort_on == 0 + { + bail!("--abort-on must be >= 1 when provided"); + } + + let default_prompt = "Run this exact command using shell command execution without rewriting or splitting it: /usr/bin/true && /usr/bin/true"; + let message = user_message.unwrap_or_else(|| default_prompt.to_string()); + + let mut client = CodexClient::connect(endpoint, config_overrides)?; + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let thread_response = client.thread_start(ThreadStartParams { + dynamic_tools: dynamic_tools.clone(), + ..Default::default() + })?; + println!("< thread/start response: {thread_response:?}"); + + client.command_approval_behavior = match abort_on { + Some(index) => CommandApprovalBehavior::AbortOn(index), + None => CommandApprovalBehavior::AlwaysAccept, + }; + client.command_approval_count = 0; + client.command_approval_item_ids.clear(); + client.command_execution_statuses.clear(); + client.last_turn_status = None; + + let mut turn_params = TurnStartParams { + thread_id: thread_response.thread.id.clone(), + input: vec![V2UserInput::Text { + text: message, + text_elements: Vec::new(), + }], + ..Default::default() + }; + turn_params.approval_policy = Some(AskForApproval::OnRequest); + turn_params.sandbox_policy = Some(SandboxPolicy::ReadOnly { + access: ReadOnlyAccess::FullAccess, + }); + + let turn_response = client.turn_start(turn_params)?; + println!("< turn/start response: {turn_response:?}"); + client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?; + + if client.command_approval_count < min_approvals { + bail!( + "expected at least {min_approvals} command approvals, got {}", + client.command_approval_count + ); + } + let mut approvals_per_item = std::collections::BTreeMap::new(); + for item_id in &client.command_approval_item_ids { + *approvals_per_item.entry(item_id.clone()).or_insert(0usize) += 1; + } + let max_approvals_for_one_item = approvals_per_item.values().copied().max().unwrap_or(0); + if max_approvals_for_one_item < min_approvals { + bail!( + "expected at least {min_approvals} approvals for one command item, got max {max_approvals_for_one_item} with map {approvals_per_item:?}" + ); + } + + let last_command_status = client.command_execution_statuses.last(); + if abort_on.is_none() { + if last_command_status != Some(&CommandExecutionStatus::Completed) { + bail!("expected completed command execution, got {last_command_status:?}"); + } + if client.last_turn_status != Some(TurnStatus::Completed) { + bail!( + "expected completed turn in all-accept flow, got {:?}", + client.last_turn_status + ); + } + } else if last_command_status == Some(&CommandExecutionStatus::Completed) { + bail!( + "expected non-completed command execution in mixed approval/decline flow, got {last_command_status:?}" + ); + } + + println!( + "[zsh-fork multi-approval summary] approvals={}, approvals_per_item={approvals_per_item:?}, command_statuses={:?}, turn_status={:?}", + client.command_approval_count, client.command_execution_statuses, client.last_turn_status + ); + + Ok(()) +} + fn resume_message_v2( endpoint: &Endpoint, config_overrides: &[String], @@ -791,6 +940,17 @@ enum ClientTransport { struct CodexClient { transport: ClientTransport, pending_notifications: VecDeque, + command_approval_behavior: CommandApprovalBehavior, + command_approval_count: usize, + command_approval_item_ids: Vec, + command_execution_statuses: Vec, + last_turn_status: Option, +} + +#[derive(Debug, Clone, Copy)] +enum CommandApprovalBehavior { + AlwaysAccept, + AbortOn(usize), } impl CodexClient { @@ -804,6 +964,14 @@ impl CodexClient { fn spawn_stdio(codex_bin: &Path, config_overrides: &[String]) -> Result { let codex_bin_display = codex_bin.display(); let mut cmd = Command::new(codex_bin); + if let Some(codex_bin_parent) = codex_bin.parent() { + let mut path = OsString::from(codex_bin_parent.as_os_str()); + if let Some(existing_path) = std::env::var_os("PATH") { + path.push(":"); + path.push(existing_path); + } + cmd.env("PATH", path); + } for override_kv in config_overrides { cmd.arg("--config").arg(override_kv); } @@ -831,6 +999,11 @@ impl CodexClient { stdout: BufReader::new(stdout), }, pending_notifications: VecDeque::new(), + command_approval_behavior: CommandApprovalBehavior::AlwaysAccept, + command_approval_count: 0, + command_approval_item_ids: Vec::new(), + command_execution_statuses: Vec::new(), + last_turn_status: None, }) } @@ -847,6 +1020,11 @@ impl CodexClient { socket: Box::new(socket), }, pending_notifications: VecDeque::new(), + command_approval_behavior: CommandApprovalBehavior::AlwaysAccept, + command_approval_count: 0, + command_approval_item_ids: Vec::new(), + command_execution_statuses: Vec::new(), + last_turn_status: None, }) } @@ -862,7 +1040,12 @@ impl CodexClient { }, capabilities: Some(InitializeCapabilities { experimental_api: true, - opt_out_notification_methods: None, + opt_out_notification_methods: Some( + NOTIFICATIONS_TO_OPT_OUT + .iter() + .map(|method| (*method).to_string()) + .collect(), + ), }), }, }; @@ -1121,10 +1304,14 @@ impl CodexClient { println!("\n< item started: {:?}", payload.item); } ServerNotification::ItemCompleted(payload) => { + if let ThreadItem::CommandExecution { status, .. } = payload.item.clone() { + self.command_execution_statuses.push(status); + } println!("< item completed: {:?}", payload.item); } ServerNotification::TurnCompleted(payload) => { if payload.turn.id == turn_id { + self.last_turn_status = Some(payload.turn.status.clone()); println!("\n< turn/completed notification: {:?}", payload.turn.status); if payload.turn.status == TurnStatus::Failed && let Some(error) = payload.turn.error @@ -1314,6 +1501,8 @@ impl CodexClient { "\n< commandExecution approval requested for thread {thread_id}, turn {turn_id}, item {item_id}, approval {}", approval_id.as_deref().unwrap_or("") ); + self.command_approval_count += 1; + self.command_approval_item_ids.push(item_id.clone()); if let Some(reason) = reason.as_deref() { println!("< reason: {reason}"); } @@ -1332,11 +1521,21 @@ impl CodexClient { println!("< proposed execpolicy amendment: {execpolicy_amendment:?}"); } + let decision = match self.command_approval_behavior { + CommandApprovalBehavior::AlwaysAccept => CommandExecutionApprovalDecision::Accept, + CommandApprovalBehavior::AbortOn(index) if self.command_approval_count == index => { + CommandExecutionApprovalDecision::Cancel + } + CommandApprovalBehavior::AbortOn(_) => CommandExecutionApprovalDecision::Accept, + }; let response = CommandExecutionRequestApprovalResponse { - decision: CommandExecutionApprovalDecision::Accept, + decision: decision.clone(), }; self.send_server_request_response(request_id, &response)?; - println!("< approved commandExecution request for item {item_id}"); + println!( + "< commandExecution decision for approval #{} on item {item_id}: {:?}", + self.command_approval_count, decision + ); Ok(()) } diff --git a/codex-rs/app-server/src/main.rs b/codex-rs/app-server/src/main.rs index 5c4e5eacc..b4c5d098e 100644 --- a/codex-rs/app-server/src/main.rs +++ b/codex-rs/app-server/src/main.rs @@ -23,6 +23,9 @@ struct AppServerArgs { } fn main() -> anyhow::Result<()> { + if codex_core::maybe_run_zsh_exec_wrapper_mode()? { + return Ok(()); + } arg0_dispatch_or_else(|codex_linux_sandbox_exe| async move { let args = AppServerArgs::parse(); let managed_config_path = managed_config_path_from_debug_env(); diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 1eacc2a84..e7705c295 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -28,4 +28,5 @@ mod thread_start; mod thread_unarchive; mod turn_interrupt; mod turn_start; +mod turn_start_zsh_fork; mod turn_steer; diff --git a/codex-rs/app-server/tests/suite/v2/turn_start_zsh_fork.rs b/codex-rs/app-server/tests/suite/v2/turn_start_zsh_fork.rs new file mode 100644 index 000000000..4233ce649 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/turn_start_zsh_fork.rs @@ -0,0 +1,676 @@ +#![cfg(not(windows))] +// +// Running these tests with the patched zsh fork: +// +// The suite uses `CODEX_TEST_ZSH_PATH` when set. Example: +// CODEX_TEST_ZSH_PATH="$HOME/.local/codex-zsh-77045ef/bin/zsh" \ +// cargo test -p codex-app-server turn_start_zsh_fork -- --nocapture +// +// For a single test: +// CODEX_TEST_ZSH_PATH="$HOME/.local/codex-zsh-77045ef/bin/zsh" \ +// cargo test -p codex-app-server turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2 -- --nocapture + +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_final_assistant_message_sse_response; +use app_test_support::create_mock_responses_server_sequence; +use app_test_support::create_shell_command_sse_response; +use app_test_support::to_response; +use codex_app_server_protocol::CommandExecutionApprovalDecision; +use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::CommandExecutionStatus; +use codex_app_server_protocol::ItemCompletedNotification; +use codex_app_server_protocol::ItemStartedNotification; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnCompletedNotification; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStatus; +use codex_app_server_protocol::UserInput as V2UserInput; +use codex_core::features::FEATURES; +use codex_core::features::Feature; +use core_test_support::responses; +use core_test_support::skip_if_no_network; +use pretty_assertions::assert_eq; +use std::collections::BTreeMap; +use std::path::Path; +use tempfile::TempDir; +use tokio::time::timeout; + +#[cfg(windows)] +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(15); +#[cfg(not(windows))] +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn turn_start_shell_zsh_fork_executes_command_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let Some(zsh_path) = find_test_zsh_path() else { + eprintln!("skipping zsh fork test: no zsh executable found"); + return Ok(()); + }; + eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display()); + + let responses = vec![create_shell_command_sse_response( + vec!["echo".to_string(), "hi".to_string()], + None, + Some(5000), + "call-zsh-fork", + )?]; + let server = create_mock_responses_server_sequence(responses).await; + create_config_toml( + &codex_home, + &server.uri(), + "never", + &BTreeMap::from([ + (Feature::ShellZshFork, true), + (Feature::UnifiedExec, false), + (Feature::ShellSnapshot, false), + ]), + &zsh_path, + )?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + cwd: Some(workspace.to_string_lossy().into_owned()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id, + input: vec![V2UserInput::Text { + text: "run echo hi".to_string(), + text_elements: Vec::new(), + }], + cwd: Some(workspace.clone()), + approval_policy: Some(codex_app_server_protocol::AskForApproval::Never), + sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess), + model: Some("mock-model".to_string()), + effort: Some(codex_protocol::openai_models::ReasoningEffort::Medium), + summary: Some(codex_core::protocol_config_types::ReasoningSummary::Auto), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + + let started_command_execution = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let started_notif = mcp + .read_stream_until_notification_message("item/started") + .await?; + let started: ItemStartedNotification = + serde_json::from_value(started_notif.params.clone().expect("item/started params"))?; + if let ThreadItem::CommandExecution { .. } = started.item { + return Ok::(started.item); + } + } + }) + .await??; + let ThreadItem::CommandExecution { + id, + status, + command, + cwd, + .. + } = started_command_execution + else { + unreachable!("loop ensures we break on command execution items"); + }; + assert_eq!(id, "call-zsh-fork"); + assert_eq!(status, CommandExecutionStatus::InProgress); + assert!(command.starts_with(&zsh_path.display().to_string())); + assert!(command.contains(" -lc 'echo hi'")); + assert_eq!(cwd, workspace); + + Ok(()) +} + +#[tokio::test] +async fn turn_start_shell_zsh_fork_exec_approval_decline_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let Some(zsh_path) = find_test_zsh_path() else { + eprintln!("skipping zsh fork decline test: no zsh executable found"); + return Ok(()); + }; + eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display()); + + let responses = vec![ + create_shell_command_sse_response( + vec![ + "python3".to_string(), + "-c".to_string(), + "print(42)".to_string(), + ], + None, + Some(5000), + "call-zsh-fork-decline", + )?, + create_final_assistant_message_sse_response("done")?, + ]; + let server = create_mock_responses_server_sequence(responses).await; + create_config_toml( + &codex_home, + &server.uri(), + "untrusted", + &BTreeMap::from([ + (Feature::ShellZshFork, true), + (Feature::UnifiedExec, false), + (Feature::ShellSnapshot, false), + ]), + &zsh_path, + )?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + cwd: Some(workspace.to_string_lossy().into_owned()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run python".to_string(), + text_elements: Vec::new(), + }], + cwd: Some(workspace.clone()), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + + let server_req = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req else { + panic!("expected CommandExecutionRequestApproval request"); + }; + assert_eq!(params.item_id, "call-zsh-fork-decline"); + assert_eq!(params.thread_id, thread.id); + + mcp.send_response( + request_id, + serde_json::to_value(CommandExecutionRequestApprovalResponse { + decision: CommandExecutionApprovalDecision::Decline, + })?, + ) + .await?; + + let completed_command_execution = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let completed_notif = mcp + .read_stream_until_notification_message("item/completed") + .await?; + let completed: ItemCompletedNotification = serde_json::from_value( + completed_notif + .params + .clone() + .expect("item/completed params"), + )?; + if let ThreadItem::CommandExecution { .. } = completed.item { + return Ok::(completed.item); + } + } + }) + .await??; + let ThreadItem::CommandExecution { + id, + status, + exit_code, + aggregated_output, + .. + } = completed_command_execution + else { + unreachable!("loop ensures we break on command execution items"); + }; + assert_eq!(id, "call-zsh-fork-decline"); + assert_eq!(status, CommandExecutionStatus::Declined); + assert!(exit_code.is_none()); + assert!(aggregated_output.is_none()); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await??; + + Ok(()) +} + +#[tokio::test] +async fn turn_start_shell_zsh_fork_exec_approval_cancel_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let Some(zsh_path) = find_test_zsh_path() else { + eprintln!("skipping zsh fork cancel test: no zsh executable found"); + return Ok(()); + }; + eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display()); + + let responses = vec![create_shell_command_sse_response( + vec![ + "python3".to_string(), + "-c".to_string(), + "print(42)".to_string(), + ], + None, + Some(5000), + "call-zsh-fork-cancel", + )?]; + let server = create_mock_responses_server_sequence(responses).await; + create_config_toml( + &codex_home, + &server.uri(), + "untrusted", + &BTreeMap::from([ + (Feature::ShellZshFork, true), + (Feature::UnifiedExec, false), + (Feature::ShellSnapshot, false), + ]), + &zsh_path, + )?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + cwd: Some(workspace.to_string_lossy().into_owned()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run python".to_string(), + text_elements: Vec::new(), + }], + cwd: Some(workspace.clone()), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + + let server_req = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req else { + panic!("expected CommandExecutionRequestApproval request"); + }; + assert_eq!(params.item_id, "call-zsh-fork-cancel"); + assert_eq!(params.thread_id, thread.id.clone()); + + mcp.send_response( + request_id, + serde_json::to_value(CommandExecutionRequestApprovalResponse { + decision: CommandExecutionApprovalDecision::Cancel, + })?, + ) + .await?; + + let completed_command_execution = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let completed_notif = mcp + .read_stream_until_notification_message("item/completed") + .await?; + let completed: ItemCompletedNotification = serde_json::from_value( + completed_notif + .params + .clone() + .expect("item/completed params"), + )?; + if let ThreadItem::CommandExecution { .. } = completed.item { + return Ok::(completed.item); + } + } + }) + .await??; + let ThreadItem::CommandExecution { id, status, .. } = completed_command_execution else { + unreachable!("loop ensures we break on command execution items"); + }; + assert_eq!(id, "call-zsh-fork-cancel"); + assert_eq!(status, CommandExecutionStatus::Declined); + + let completed_notif = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + let completed: TurnCompletedNotification = serde_json::from_value( + completed_notif + .params + .expect("turn/completed params must be present"), + )?; + assert_eq!(completed.thread_id, thread.id); + assert_eq!(completed.turn.status, TurnStatus::Interrupted); + + Ok(()) +} + +#[tokio::test] +async fn turn_start_shell_zsh_fork_subcommand_decline_marks_parent_declined_v2() -> Result<()> { + skip_if_no_network!(Ok(())); + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let workspace = tmp.path().join("workspace"); + std::fs::create_dir(&workspace)?; + + let Some(zsh_path) = find_test_zsh_path() else { + eprintln!("skipping zsh fork subcommand decline test: no zsh executable found"); + return Ok(()); + }; + if !supports_exec_wrapper_intercept(&zsh_path) { + eprintln!( + "skipping zsh fork subcommand decline test: zsh does not support EXEC_WRAPPER intercepts ({})", + zsh_path.display() + ); + return Ok(()); + } + eprintln!("using zsh path for zsh-fork test: {}", zsh_path.display()); + + let tool_call_arguments = serde_json::to_string(&serde_json::json!({ + "command": "/usr/bin/true && /usr/bin/true", + "workdir": serde_json::Value::Null, + "timeout_ms": 5000 + }))?; + let response = responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_function_call( + "call-zsh-fork-subcommand-decline", + "shell_command", + &tool_call_arguments, + ), + responses::ev_completed("resp-1"), + ]); + let server = create_mock_responses_server_sequence(vec![response]).await; + create_config_toml( + &codex_home, + &server.uri(), + "on-request", + &BTreeMap::from([ + (Feature::ShellZshFork, true), + (Feature::UnifiedExec, false), + (Feature::ShellSnapshot, false), + ]), + &zsh_path, + )?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + cwd: Some(workspace.to_string_lossy().into_owned()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run true true".to_string(), + text_elements: Vec::new(), + }], + cwd: Some(workspace.clone()), + approval_policy: Some(codex_app_server_protocol::AskForApproval::OnRequest), + sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::ReadOnly { + access: codex_app_server_protocol::ReadOnlyAccess::FullAccess, + }), + model: Some("mock-model".to_string()), + effort: Some(codex_protocol::openai_models::ReasoningEffort::Medium), + summary: Some(codex_core::protocol_config_types::ReasoningSummary::Auto), + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + + let mut approval_ids = Vec::new(); + for decision in [ + CommandExecutionApprovalDecision::Accept, + CommandExecutionApprovalDecision::Cancel, + ] { + let server_req = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_request_message(), + ) + .await??; + let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req + else { + panic!("expected CommandExecutionRequestApproval request"); + }; + assert_eq!(params.item_id, "call-zsh-fork-subcommand-decline"); + approval_ids.push( + params + .approval_id + .clone() + .expect("approval_id must be present for zsh subcommand approvals"), + ); + assert_eq!(params.thread_id, thread.id); + mcp.send_response( + request_id, + serde_json::to_value(CommandExecutionRequestApprovalResponse { decision })?, + ) + .await?; + } + + let parent_completed_command_execution = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let completed_notif = mcp + .read_stream_until_notification_message("item/completed") + .await?; + let completed: ItemCompletedNotification = serde_json::from_value( + completed_notif + .params + .clone() + .expect("item/completed params"), + )?; + if let ThreadItem::CommandExecution { id, .. } = &completed.item + && id == "call-zsh-fork-subcommand-decline" + { + return Ok::(completed.item); + } + } + }) + .await??; + + let ThreadItem::CommandExecution { + id, + status, + aggregated_output, + .. + } = parent_completed_command_execution + else { + unreachable!("loop ensures we break on parent command execution item"); + }; + assert_eq!(id, "call-zsh-fork-subcommand-decline"); + assert_eq!(status, CommandExecutionStatus::Declined); + assert!( + aggregated_output.is_none() + || aggregated_output == Some("exec command rejected by user".to_string()) + ); + assert_eq!(approval_ids.len(), 2); + assert_ne!(approval_ids[0], approval_ids[1]); + + Ok(()) +} + +fn create_config_toml( + codex_home: &Path, + server_uri: &str, + approval_policy: &str, + feature_flags: &BTreeMap, + zsh_path: &Path, +) -> std::io::Result<()> { + let mut features = BTreeMap::from([(Feature::RemoteModels, false)]); + for (feature, enabled) in feature_flags { + features.insert(*feature, *enabled); + } + let feature_entries = features + .into_iter() + .map(|(feature, enabled)| { + let key = FEATURES + .iter() + .find(|spec| spec.id == feature) + .map(|spec| spec.key) + .unwrap_or_else(|| panic!("missing feature key for {feature:?}")); + format!("{key} = {enabled}") + }) + .collect::>() + .join("\n"); + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "{approval_policy}" +sandbox_mode = "read-only" +zsh_path = "{zsh_path}" + +model_provider = "mock_provider" + +[features] +{feature_entries} + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"#, + approval_policy = approval_policy, + zsh_path = zsh_path.display() + ), + ) +} + +fn find_test_zsh_path() -> Option { + if let Some(path) = std::env::var_os("CODEX_TEST_ZSH_PATH") { + let path = std::path::PathBuf::from(path); + if path.is_file() { + return Some(path); + } + panic!( + "CODEX_TEST_ZSH_PATH is set but is not a file: {}", + path.display() + ); + } + + for candidate in ["/bin/zsh", "/usr/bin/zsh"] { + let path = Path::new(candidate); + if path.is_file() { + return Some(path.to_path_buf()); + } + } + + let shell = std::env::var_os("SHELL")?; + let shell_path = std::path::PathBuf::from(shell); + if shell_path + .file_name() + .is_some_and(|file_name| file_name == "zsh") + && shell_path.is_file() + { + return Some(shell_path); + } + + None +} + +fn supports_exec_wrapper_intercept(zsh_path: &Path) -> bool { + let status = std::process::Command::new(zsh_path) + .arg("-fc") + .arg("/usr/bin/true") + .env("EXEC_WRAPPER", "/usr/bin/false") + .status(); + match status { + Ok(status) => !status.success(), + Err(_) => false, + } +} diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index cae322c77..33c9f161b 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -543,6 +543,9 @@ fn stage_str(stage: codex_core::features::Stage) -> &'static str { } fn main() -> anyhow::Result<()> { + if codex_core::maybe_run_zsh_exec_wrapper_mode()? { + return Ok(()); + } arg0_dispatch_or_else(|codex_linux_sandbox_exe| async move { cli_main(codex_linux_sandbox_exe).await?; Ok(()) diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index e3f7456c9..bf2d97049 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -292,6 +292,9 @@ "shell_tool": { "type": "boolean" }, + "shell_zsh_fork": { + "type": "boolean" + }, "skill_env_var_dependency_prompt": { "type": "boolean" }, @@ -380,6 +383,14 @@ } ], "default": null + }, + "zsh_path": { + "allOf": [ + { + "$ref": "#/definitions/AbsolutePathBuf" + } + ], + "description": "Optional absolute path to patched zsh used by zsh-exec-bridge-backed shell execution." } }, "type": "object" @@ -1432,6 +1443,9 @@ "shell_tool": { "type": "boolean" }, + "shell_zsh_fork": { + "type": "boolean" + }, "skill_env_var_dependency_prompt": { "type": "boolean" }, @@ -1788,6 +1802,14 @@ "windows_wsl_setup_acknowledged": { "description": "Tracks whether the Windows onboarding screen has been acknowledged.", "type": "boolean" + }, + "zsh_path": { + "allOf": [ + { + "$ref": "#/definitions/AbsolutePathBuf" + } + ], + "description": "Optional absolute path to patched zsh used by zsh-exec-bridge-backed shell execution." } }, "title": "ConfigToml", diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7744480ab..b054423b6 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -241,6 +241,7 @@ use crate::turn_diff_tracker::TurnDiffTracker; use crate::unified_exec::UnifiedExecProcessManager; use crate::util::backoff; use crate::windows_sandbox::WindowsSandboxLevelExt; +use crate::zsh_exec_bridge::ZshExecBridge; use codex_async_utils::OrCancelExt; use codex_otel::OtelManager; use codex_otel::TelemetryAuthMode; @@ -1191,7 +1192,22 @@ impl Session { config.active_profile.clone(), ); - let mut default_shell = shell::default_user_shell(); + let use_zsh_fork_shell = config.features.enabled(Feature::ShellZshFork); + let mut default_shell = if use_zsh_fork_shell { + let zsh_path = config.zsh_path.as_ref().ok_or_else(|| { + anyhow::anyhow!( + "zsh fork feature enabled, but `zsh_path` is not configured; set `zsh_path` in config.toml" + ) + })?; + shell::get_shell(shell::ShellType::Zsh, Some(zsh_path)).ok_or_else(|| { + anyhow::anyhow!( + "zsh fork feature enabled, but zsh_path `{}` is not usable; set `zsh_path` to a valid zsh executable", + zsh_path.display() + ) + })? + } else { + shell::default_user_shell() + }; // Create the mutable state for the Session. let shell_snapshot_tx = if config.features.enabled(Feature::ShellSnapshot) { ShellSnapshot::start_snapshotting( @@ -1262,10 +1278,17 @@ impl Session { (None, None) }; + let zsh_exec_bridge = + ZshExecBridge::new(config.zsh_path.clone(), config.codex_home.clone()); + zsh_exec_bridge + .initialize_for_session(&conversation_id.to_string()) + .await; + let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), + zsh_exec_bridge, analytics_events_client: AnalyticsEventsClient::new( Arc::clone(&config), Arc::clone(&auth_manager), @@ -3969,6 +3992,7 @@ mod handlers { .unified_exec_manager .terminate_all_processes() .await; + sess.services.zsh_exec_bridge.shutdown().await; info!("Shutting down Codex instance"); let history = sess.clone_history().await; let turn_count = history @@ -7201,6 +7225,81 @@ mod tests { } } + #[tokio::test] + async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { + let codex_home = tempfile::tempdir().expect("create temp dir"); + let mut config = build_test_config(codex_home.path()).await; + config.features.enable(Feature::ShellZshFork); + config.zsh_path = None; + let config = Arc::new(config); + + let auth_manager = + AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); + let models_manager = Arc::new(ModelsManager::new( + config.codex_home.clone(), + auth_manager.clone(), + )); + let model = ModelsManager::get_model_offline_for_tests(config.model.as_deref()); + let model_info = + ModelsManager::construct_model_info_offline_for_tests(model.as_str(), &config); + let collaboration_mode = CollaborationMode { + mode: ModeKind::Default, + settings: Settings { + model, + reasoning_effort: config.model_reasoning_effort, + developer_instructions: None, + }, + }; + let session_configuration = SessionConfiguration { + provider: config.model_provider.clone(), + collaboration_mode, + model_reasoning_summary: config.model_reasoning_summary, + developer_instructions: config.developer_instructions.clone(), + user_instructions: config.user_instructions.clone(), + personality: config.personality, + base_instructions: config + .base_instructions + .clone() + .unwrap_or_else(|| model_info.get_model_instructions(config.personality)), + compact_prompt: config.compact_prompt.clone(), + approval_policy: config.permissions.approval_policy.clone(), + sandbox_policy: config.permissions.sandbox_policy.clone(), + windows_sandbox_level: WindowsSandboxLevel::from_config(&config), + cwd: config.cwd.clone(), + codex_home: config.codex_home.clone(), + thread_name: None, + original_config_do_not_use: Arc::clone(&config), + session_source: SessionSource::Exec, + dynamic_tools: Vec::new(), + persist_extended_history: false, + }; + + let (tx_event, _rx_event) = async_channel::unbounded(); + let (agent_status_tx, _agent_status_rx) = watch::channel(AgentStatus::PendingInit); + let result = Session::new( + session_configuration, + Arc::clone(&config), + auth_manager, + models_manager, + ExecPolicyManager::default(), + tx_event, + agent_status_tx, + InitialHistory::New, + SessionSource::Exec, + Arc::new(SkillsManager::new(config.codex_home.clone())), + Arc::new(FileWatcher::noop()), + AgentControl::default(), + ) + .await; + + let err = match result { + Ok(_) => panic!("expected startup to fail"), + Err(err) => err, + }; + let msg = format!("{err:#}"); + assert!(msg.contains("zsh fork feature enabled, but `zsh_path` is not configured")); + } + // todo: use online model info pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { let (tx_event, _rx_event) = async_channel::unbounded(); @@ -7274,6 +7373,7 @@ mod tests { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), + zsh_exec_bridge: ZshExecBridge::default(), analytics_events_client: AnalyticsEventsClient::new( Arc::clone(&config), Arc::clone(&auth_manager), @@ -7422,6 +7522,7 @@ mod tests { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), + zsh_exec_bridge: ZshExecBridge::default(), analytics_events_client: AnalyticsEventsClient::new( Arc::clone(&config), Arc::clone(&auth_manager), diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index 9874cc277..c2b753f51 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -338,6 +338,8 @@ pub struct Config { /// Optional absolute path to the Node runtime used by `js_repl`. pub js_repl_node_path: Option, + /// Optional absolute path to patched zsh used by zsh-exec-bridge-backed shell execution. + pub zsh_path: Option, /// Value to use for `reasoning.effort` when making a request using the /// Responses API. @@ -977,6 +979,8 @@ pub struct ConfigToml { /// Optional absolute path to the Node runtime used by `js_repl`. pub js_repl_node_path: Option, + /// Optional absolute path to patched zsh used by zsh-exec-bridge-backed shell execution. + pub zsh_path: Option, /// Profile to use from the `profiles` map. pub profile: Option, @@ -1355,6 +1359,7 @@ pub struct ConfigOverrides { pub config_profile: Option, pub codex_linux_sandbox_exe: Option, pub js_repl_node_path: Option, + pub zsh_path: Option, pub base_instructions: Option, pub developer_instructions: Option, pub personality: Option, @@ -1482,6 +1487,7 @@ impl Config { config_profile: config_profile_key, codex_linux_sandbox_exe, js_repl_node_path: js_repl_node_path_override, + zsh_path: zsh_path_override, base_instructions, developer_instructions, personality, @@ -1742,6 +1748,9 @@ impl Config { let js_repl_node_path = js_repl_node_path_override .or(config_profile.js_repl_node_path.map(Into::into)) .or(cfg.js_repl_node_path.map(Into::into)); + let zsh_path = zsh_path_override + .or(config_profile.zsh_path.map(Into::into)) + .or(cfg.zsh_path.map(Into::into)); let review_model = override_review_model.or(cfg.review_model); @@ -1866,6 +1875,7 @@ impl Config { file_opener: cfg.file_opener.unwrap_or(UriBasedFileOpener::VsCode), codex_linux_sandbox_exe, js_repl_node_path, + zsh_path, hide_agent_reasoning: cfg.hide_agent_reasoning.unwrap_or(false), show_raw_agent_reasoning: cfg @@ -4196,6 +4206,7 @@ model_verbosity = "high" file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, js_repl_node_path: None, + zsh_path: None, hide_agent_reasoning: false, show_raw_agent_reasoning: false, model_reasoning_effort: Some(ReasoningEffort::High), @@ -4309,6 +4320,7 @@ model_verbosity = "high" file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, js_repl_node_path: None, + zsh_path: None, hide_agent_reasoning: false, show_raw_agent_reasoning: false, model_reasoning_effort: None, @@ -4420,6 +4432,7 @@ model_verbosity = "high" file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, js_repl_node_path: None, + zsh_path: None, hide_agent_reasoning: false, show_raw_agent_reasoning: false, model_reasoning_effort: None, @@ -4517,6 +4530,7 @@ model_verbosity = "high" file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, js_repl_node_path: None, + zsh_path: None, hide_agent_reasoning: false, show_raw_agent_reasoning: false, model_reasoning_effort: Some(ReasoningEffort::High), diff --git a/codex-rs/core/src/config/profile.rs b/codex-rs/core/src/config/profile.rs index ff343cbf5..952331337 100644 --- a/codex-rs/core/src/config/profile.rs +++ b/codex-rs/core/src/config/profile.rs @@ -30,6 +30,8 @@ pub struct ConfigProfile { pub chatgpt_base_url: Option, /// Optional path to a file containing model instructions. pub model_instructions_file: Option, + /// Optional absolute path to patched zsh used by zsh-exec-bridge-backed shell execution. + pub zsh_path: Option, pub js_repl_node_path: Option, /// Deprecated: ignored. Use `model_instructions_file`. #[schemars(skip)] diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs index 0f0f0609a..4e1220b87 100644 --- a/codex-rs/core/src/exec.rs +++ b/codex-rs/core/src/exec.rs @@ -75,7 +75,7 @@ pub struct ExecParams { } /// Mechanism to terminate an exec invocation before it finishes naturally. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum ExecExpiration { Timeout(Duration), DefaultTimeout, @@ -97,7 +97,7 @@ impl From for ExecExpiration { } impl ExecExpiration { - async fn wait(self) { + pub(crate) async fn wait(self) { match self { ExecExpiration::Timeout(duration) => tokio::time::sleep(duration).await, ExecExpiration::DefaultTimeout => { diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 49363641b..86cacae5b 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -84,6 +84,8 @@ pub enum Feature { JsReplToolsOnly, /// Use the single unified PTY-backed exec tool. UnifiedExec, + /// Route shell tool execution through the zsh exec bridge. + ShellZshFork, /// Include the freeform apply_patch tool. ApplyPatchFreeform, /// Allow the model to request web searches that fetch live content. @@ -431,6 +433,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::Stable, default_enabled: !cfg!(windows), }, + FeatureSpec { + id: Feature::ShellZshFork, + key: "shell_zsh_fork", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::ShellSnapshot, key: "shell_snapshot", diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index bbfef7f8f..ccfd340df 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -109,6 +109,7 @@ pub mod terminal; mod tools; pub mod turn_diff_tracker; mod turn_metadata; +mod zsh_exec_bridge; pub use rollout::ARCHIVED_SESSIONS_SUBDIR; pub use rollout::INTERACTIVE_SESSION_SOURCES; pub use rollout::RolloutRecorder; @@ -152,6 +153,7 @@ pub use file_watcher::FileWatcherEvent; pub use safety::get_platform_sandbox; pub use tools::spec::parse_tool_input_schema; pub use turn_metadata::build_turn_metadata_header; +pub use zsh_exec_bridge::maybe_run_zsh_exec_wrapper_mode; // Re-export the protocol types from the standalone `codex-protocol` crate so existing // `codex_core::protocol::...` references continue to work across the workspace. pub use codex_protocol::protocol; diff --git a/codex-rs/core/src/sandboxing/mod.rs b/codex-rs/core/src/sandboxing/mod.rs index 487278b61..4bf8f6609 100644 --- a/codex-rs/core/src/sandboxing/mod.rs +++ b/codex-rs/core/src/sandboxing/mod.rs @@ -164,12 +164,19 @@ impl SandboxManager { SandboxType::MacosSeatbelt => { let mut seatbelt_env = HashMap::new(); seatbelt_env.insert(CODEX_SANDBOX_ENV_VAR.to_string(), "seatbelt".to_string()); + let zsh_exec_bridge_wrapper_socket = env + .get(crate::zsh_exec_bridge::ZSH_EXEC_BRIDGE_WRAPPER_SOCKET_ENV_VAR) + .map(PathBuf::from); + let zsh_exec_bridge_allowed_unix_sockets = zsh_exec_bridge_wrapper_socket + .as_ref() + .map_or_else(Vec::new, |path| vec![path.clone()]); let mut args = create_seatbelt_command_args( command.clone(), policy, sandbox_policy_cwd, enforce_managed_network, network, + &zsh_exec_bridge_allowed_unix_sockets, ); let mut full_command = Vec::with_capacity(1 + args.len()); full_command.push(MACOS_PATH_TO_SEATBELT_EXECUTABLE.to_string()); diff --git a/codex-rs/core/src/seatbelt.rs b/codex-rs/core/src/seatbelt.rs index 7f738797b..6ae1b061a 100644 --- a/codex-rs/core/src/seatbelt.rs +++ b/codex-rs/core/src/seatbelt.rs @@ -39,8 +39,14 @@ pub async fn spawn_command_under_seatbelt( network: Option<&NetworkProxy>, mut env: HashMap, ) -> std::io::Result { - let args = - create_seatbelt_command_args(command, sandbox_policy, sandbox_policy_cwd, false, network); + let args = create_seatbelt_command_args( + command, + sandbox_policy, + sandbox_policy_cwd, + false, + network, + &[], + ); let arg0 = None; env.insert(CODEX_SANDBOX_ENV_VAR.to_string(), "seatbelt".to_string()); spawn_child_async(SpawnChildRequest { @@ -181,6 +187,7 @@ pub(crate) fn create_seatbelt_command_args( sandbox_policy_cwd: &Path, enforce_managed_network: bool, network: Option<&NetworkProxy>, + allowed_unix_socket_paths: &[PathBuf], ) -> Vec { create_seatbelt_command_args_with_extensions( command, @@ -189,6 +196,7 @@ pub(crate) fn create_seatbelt_command_args( enforce_managed_network, network, None, + allowed_unix_socket_paths, ) } @@ -199,6 +207,7 @@ pub(crate) fn create_seatbelt_command_args_with_extensions( enforce_managed_network: bool, network: Option<&NetworkProxy>, extensions: Option<&MacOsSeatbeltProfileExtensions>, + allowed_unix_socket_paths: &[PathBuf], ) -> Vec { let (file_write_policy, file_write_dir_params) = { if sandbox_policy.has_full_disk_write_access() { @@ -295,6 +304,8 @@ pub(crate) fn create_seatbelt_command_args_with_extensions( let proxy = proxy_policy_inputs(network); let network_policy = dynamic_network_policy(sandbox_policy, enforce_managed_network, &proxy); + let (unix_socket_policy, unix_socket_params) = + unix_socket_policy_and_params(allowed_unix_socket_paths); let seatbelt_extensions = extensions.map_or_else( || { // Backward-compatibility default when no extension profile is provided. @@ -303,20 +314,24 @@ pub(crate) fn create_seatbelt_command_args_with_extensions( build_seatbelt_extensions, ); - let full_policy = if seatbelt_extensions.policy.is_empty() { - format!( - "{MACOS_SEATBELT_BASE_POLICY}\n{file_read_policy}\n{file_write_policy}\n{network_policy}" - ) - } else { - format!( - "{MACOS_SEATBELT_BASE_POLICY}\n{file_read_policy}\n{file_write_policy}\n{network_policy}\n{}", - seatbelt_extensions.policy - ) - }; + let mut policy_sections = vec![ + MACOS_SEATBELT_BASE_POLICY.to_string(), + file_read_policy, + file_write_policy, + network_policy, + ]; + if !unix_socket_policy.is_empty() { + policy_sections.push(unix_socket_policy); + } + if !seatbelt_extensions.policy.is_empty() { + policy_sections.push(seatbelt_extensions.policy.clone()); + } + let full_policy = policy_sections.join("\n"); let dir_params = [ file_read_dir_params, file_write_dir_params, + unix_socket_params, macos_dir_params(), seatbelt_extensions.dir_params, ] @@ -332,6 +347,27 @@ pub(crate) fn create_seatbelt_command_args_with_extensions( seatbelt_args } +fn unix_socket_policy_and_params( + allowed_unix_socket_paths: &[PathBuf], +) -> (String, Vec<(String, PathBuf)>) { + if allowed_unix_socket_paths.is_empty() { + return (String::new(), Vec::new()); + } + + let mut policy = String::from("; allow outbound connect to explicitly-approved unix sockets\n"); + let mut params = Vec::with_capacity(allowed_unix_socket_paths.len()); + + for (index, path) in allowed_unix_socket_paths.iter().enumerate() { + let key = format!("ALLOWED_UNIX_SOCKET_{index}"); + policy.push_str(&format!( + "(allow network-outbound (remote unix-socket (path (param \"{key}\"))))\n" + )); + params.push((key, path.clone())); + } + + (policy, params) +} + /// Wraps libc::confstr to return a String. fn confstr(name: libc::c_int) -> Option { let mut buf = vec![0_i8; (libc::PATH_MAX as usize) + 1]; @@ -451,6 +487,7 @@ mod tests { macos_accessibility: true, macos_calendar: true, }), + &[], ); let policy = &args[1]; @@ -469,6 +506,7 @@ mod tests { cwd.as_path(), false, None, + &[], ); let policy = &args[1]; assert!(policy.contains("(allow user-preference-read)")); @@ -485,6 +523,7 @@ mod tests { false, None, Some(&MacOsSeatbeltProfileExtensions::default()), + &[], ); let policy = &args[1]; assert!(!policy.contains("appleevent-send")); @@ -494,6 +533,32 @@ mod tests { assert!(!policy.contains("user-preference-write")); } + #[test] + fn seatbelt_args_allow_explicit_wrapper_unix_socket_path() { + let cwd = std::env::temp_dir(); + let socket_path = std::env::temp_dir().join("codex-zsh-wrapper-test.sock"); + let args = create_seatbelt_command_args( + vec!["echo".to_string(), "ok".to_string()], + &SandboxPolicy::new_read_only_policy(), + cwd.as_path(), + false, + None, + std::slice::from_ref(&socket_path), + ); + let policy = &args[1]; + let param_key = "ALLOWED_UNIX_SOCKET_0"; + + assert!( + policy.contains("(allow network-outbound (remote unix-socket (path (param \"ALLOWED_UNIX_SOCKET_0\"))))"), + "policy should contain explicit unix-socket allow rule:\n{policy}" + ); + assert!( + args.iter() + .any(|arg| arg == &format!("-D{param_key}={}", socket_path.to_string_lossy())), + "args should include unix-socket path param" + ); + } + #[test] fn create_seatbelt_args_allows_local_binding_when_explicitly_enabled() { let policy = dynamic_network_policy( @@ -649,7 +714,8 @@ mod tests { .iter() .map(std::string::ToString::to_string) .collect(); - let args = create_seatbelt_command_args(shell_command.clone(), &policy, &cwd, false, None); + let args = + create_seatbelt_command_args(shell_command.clone(), &policy, &cwd, false, None, &[]); // Build the expected policy text using a raw string for readability. // Note that the policy includes: @@ -746,7 +812,7 @@ mod tests { .map(std::string::ToString::to_string) .collect(); let write_hooks_file_args = - create_seatbelt_command_args(shell_command_git, &policy, &cwd, false, None); + create_seatbelt_command_args(shell_command_git, &policy, &cwd, false, None, &[]); let output = Command::new(MACOS_PATH_TO_SEATBELT_EXECUTABLE) .args(&write_hooks_file_args) .current_dir(&cwd) @@ -777,7 +843,7 @@ mod tests { .map(std::string::ToString::to_string) .collect(); let write_allowed_file_args = - create_seatbelt_command_args(shell_command_allowed, &policy, &cwd, false, None); + create_seatbelt_command_args(shell_command_allowed, &policy, &cwd, false, None, &[]); let output = Command::new(MACOS_PATH_TO_SEATBELT_EXECUTABLE) .args(&write_allowed_file_args) .current_dir(&cwd) @@ -838,7 +904,7 @@ mod tests { .iter() .map(std::string::ToString::to_string) .collect(); - let args = create_seatbelt_command_args(shell_command, &policy, &cwd, false, None); + let args = create_seatbelt_command_args(shell_command, &policy, &cwd, false, None, &[]); let output = Command::new(MACOS_PATH_TO_SEATBELT_EXECUTABLE) .args(&args) @@ -869,7 +935,7 @@ mod tests { .map(std::string::ToString::to_string) .collect(); let gitdir_args = - create_seatbelt_command_args(shell_command_gitdir, &policy, &cwd, false, None); + create_seatbelt_command_args(shell_command_gitdir, &policy, &cwd, false, None, &[]); let output = Command::new(MACOS_PATH_TO_SEATBELT_EXECUTABLE) .args(&gitdir_args) .current_dir(&cwd) @@ -932,6 +998,7 @@ mod tests { vulnerable_root.as_path(), false, None, + &[], ); let tmpdir_env_var = std::env::var("TMPDIR") diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 6f640131f..fe82ec84c 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -15,6 +15,7 @@ use crate::state_db::StateDbHandle; use crate::tools::network_approval::NetworkApprovalService; use crate::tools::sandboxing::ApprovalStore; use crate::unified_exec::UnifiedExecProcessManager; +use crate::zsh_exec_bridge::ZshExecBridge; use codex_hooks::Hooks; use codex_otel::OtelManager; use tokio::sync::Mutex; @@ -26,6 +27,7 @@ pub(crate) struct SessionServices { pub(crate) mcp_connection_manager: Arc>, pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, + pub(crate) zsh_exec_bridge: ZshExecBridge, pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) hooks: Hooks, pub(crate) rollout: Mutex>, diff --git a/codex-rs/core/src/tools/runtimes/shell.rs b/codex-rs/core/src/tools/runtimes/shell.rs index a277e48a5..6f1e4b44a 100644 --- a/codex-rs/core/src/tools/runtimes/shell.rs +++ b/codex-rs/core/src/tools/runtimes/shell.rs @@ -26,6 +26,7 @@ use crate::tools::sandboxing::ToolCtx; use crate::tools::sandboxing::ToolError; use crate::tools::sandboxing::ToolRuntime; use crate::tools::sandboxing::with_cached_approval; +use crate::zsh_exec_bridge::ZSH_EXEC_BRIDGE_WRAPPER_SOCKET_ENV_VAR; use codex_network_proxy::NetworkProxy; use codex_protocol::protocol::ReviewDecision; use futures::future::BoxFuture; @@ -182,6 +183,36 @@ impl ToolRuntime for ShellRuntime { command }; + if ctx.session.features().enabled(Feature::ShellZshFork) { + let wrapper_socket_path = ctx + .session + .services + .zsh_exec_bridge + .next_wrapper_socket_path(); + let mut zsh_fork_env = req.env.clone(); + zsh_fork_env.insert( + ZSH_EXEC_BRIDGE_WRAPPER_SOCKET_ENV_VAR.to_string(), + wrapper_socket_path.to_string_lossy().to_string(), + ); + let spec = build_command_spec( + &command, + &req.cwd, + &zsh_fork_env, + req.timeout_ms.into(), + req.sandbox_permissions, + req.justification.clone(), + )?; + let env = attempt + .env_for(spec, req.network.as_ref()) + .map_err(|err| ToolError::Codex(err.into()))?; + return ctx + .session + .services + .zsh_exec_bridge + .execute_shell_request(&env, ctx.session, ctx.turn, &ctx.call_id) + .await; + } + let spec = build_command_spec( &command, &req.cwd, diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index 6ea7edd00..ca49c50c3 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -69,6 +69,8 @@ impl ToolsConfig { let shell_type = if !features.enabled(Feature::ShellTool) { ConfigShellToolType::Disabled + } else if features.enabled(Feature::ShellZshFork) { + ConfigShellToolType::ShellCommand } else if features.enabled(Feature::UnifiedExec) { // If ConPTY not supported (for old Windows versions), fallback on ShellCommand. if codex_utils_pty::conpty_supported() { @@ -2346,6 +2348,23 @@ mod tests { assert_contains_tool_names(&tools, &subset); } + #[test] + fn shell_zsh_fork_prefers_shell_command_over_unified_exec() { + let config = test_config(); + let model_info = ModelsManager::construct_model_info_offline_for_tests("o3", &config); + let mut features = Features::with_defaults(); + features.enable(Feature::UnifiedExec); + features.enable(Feature::ShellZshFork); + + let tools_config = ToolsConfig::new(&ToolsConfigParams { + model_info: &model_info, + features: &features, + web_search_mode: Some(WebSearchMode::Live), + }); + + assert_eq!(tools_config.shell_type, ConfigShellToolType::ShellCommand); + } + #[test] #[ignore] fn test_parallel_support_flags() { diff --git a/codex-rs/core/src/zsh_exec_bridge/mod.rs b/codex-rs/core/src/zsh_exec_bridge/mod.rs new file mode 100644 index 000000000..2e3821acf --- /dev/null +++ b/codex-rs/core/src/zsh_exec_bridge/mod.rs @@ -0,0 +1,554 @@ +use crate::exec::ExecToolCallOutput; +use crate::tools::sandboxing::ToolError; +use std::path::PathBuf; +use tokio::sync::Mutex; +use uuid::Uuid; + +#[cfg(unix)] +use crate::error::CodexErr; +#[cfg(unix)] +use crate::error::SandboxErr; +#[cfg(unix)] +use crate::protocol::EventMsg; +#[cfg(unix)] +use crate::protocol::ExecCommandOutputDeltaEvent; +#[cfg(unix)] +use crate::protocol::ExecOutputStream; +#[cfg(unix)] +use crate::protocol::ReviewDecision; +#[cfg(unix)] +use anyhow::Context as _; +#[cfg(unix)] +use codex_protocol::approvals::ExecPolicyAmendment; +#[cfg(unix)] +use codex_utils_pty::process_group::kill_child_process_group; +#[cfg(unix)] +use serde::Deserialize; +#[cfg(unix)] +use serde::Serialize; +#[cfg(unix)] +use std::io::Read; +#[cfg(unix)] +use std::io::Write; +#[cfg(unix)] +use std::time::Instant; +#[cfg(unix)] +use tokio::io::AsyncReadExt; +#[cfg(unix)] +use tokio::net::UnixListener; +#[cfg(unix)] +use tokio::net::UnixStream; + +pub(crate) const ZSH_EXEC_BRIDGE_WRAPPER_SOCKET_ENV_VAR: &str = + "CODEX_ZSH_EXEC_BRIDGE_WRAPPER_SOCKET"; +pub(crate) const ZSH_EXEC_WRAPPER_MODE_ENV_VAR: &str = "CODEX_ZSH_EXEC_WRAPPER_MODE"; +#[cfg(unix)] +pub(crate) const EXEC_WRAPPER_ENV_VAR: &str = "EXEC_WRAPPER"; + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub(crate) struct ZshExecBridgeSessionState { + pub(crate) initialized_session_id: Option, +} + +#[derive(Debug, Default)] +pub(crate) struct ZshExecBridge { + zsh_path: Option, + state: Mutex, +} + +#[cfg(unix)] +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum WrapperIpcRequest { + ExecRequest { + request_id: String, + file: String, + argv: Vec, + cwd: String, + }, +} + +#[cfg(unix)] +#[derive(Debug, Deserialize, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum WrapperIpcResponse { + ExecResponse { + request_id: String, + action: WrapperExecAction, + reason: Option, + }, +} + +#[cfg(unix)] +#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +enum WrapperExecAction { + Run, + Deny, +} + +impl ZshExecBridge { + pub(crate) fn new(zsh_path: Option, _codex_home: PathBuf) -> Self { + Self { + zsh_path, + state: Mutex::new(ZshExecBridgeSessionState::default()), + } + } + + pub(crate) async fn initialize_for_session(&self, session_id: &str) { + let mut state = self.state.lock().await; + state.initialized_session_id = Some(session_id.to_string()); + } + + pub(crate) async fn shutdown(&self) { + let mut state = self.state.lock().await; + state.initialized_session_id = None; + } + + pub(crate) fn next_wrapper_socket_path(&self) -> PathBuf { + let socket_id = Uuid::new_v4().as_simple().to_string(); + let temp_dir = std::env::temp_dir(); + let canonical_temp_dir = temp_dir.canonicalize().unwrap_or(temp_dir); + canonical_temp_dir.join(format!("czs-{}.sock", &socket_id[..12])) + } + + #[cfg(not(unix))] + pub(crate) async fn execute_shell_request( + &self, + _req: &crate::sandboxing::ExecRequest, + _session: &crate::codex::Session, + _turn: &crate::codex::TurnContext, + _call_id: &str, + ) -> Result { + let _ = &self.zsh_path; + Err(ToolError::Rejected( + "shell_zsh_fork is only supported on unix".to_string(), + )) + } + + #[cfg(unix)] + pub(crate) async fn execute_shell_request( + &self, + req: &crate::sandboxing::ExecRequest, + session: &crate::codex::Session, + turn: &crate::codex::TurnContext, + call_id: &str, + ) -> Result { + let zsh_path = self.zsh_path.clone().ok_or_else(|| { + ToolError::Rejected( + "shell_zsh_fork enabled, but zsh_path is not configured".to_string(), + ) + })?; + + let command = req.command.clone(); + if command.is_empty() { + return Err(ToolError::Rejected("command args are empty".to_string())); + } + + let wrapper_socket_path = req + .env + .get(ZSH_EXEC_BRIDGE_WRAPPER_SOCKET_ENV_VAR) + .map(PathBuf::from) + .unwrap_or_else(|| self.next_wrapper_socket_path()); + + let listener = { + let _ = std::fs::remove_file(&wrapper_socket_path); + UnixListener::bind(&wrapper_socket_path).map_err(|err| { + ToolError::Rejected(format!( + "bind wrapper socket at {}: {err}", + wrapper_socket_path.display() + )) + })? + }; + + let wrapper_path = std::env::current_exe().map_err(|err| { + ToolError::Rejected(format!("resolve current executable path: {err}")) + })?; + + let mut cmd = tokio::process::Command::new(&command[0]); + if command.len() > 1 { + cmd.args(&command[1..]); + } + cmd.current_dir(&req.cwd); + cmd.stdin(std::process::Stdio::null()); + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::piped()); + cmd.kill_on_drop(true); + cmd.env_clear(); + cmd.envs(&req.env); + cmd.env( + ZSH_EXEC_BRIDGE_WRAPPER_SOCKET_ENV_VAR, + wrapper_socket_path.to_string_lossy().to_string(), + ); + cmd.env(EXEC_WRAPPER_ENV_VAR, &wrapper_path); + cmd.env(ZSH_EXEC_WRAPPER_MODE_ENV_VAR, "1"); + + let mut child = cmd.spawn().map_err(|err| { + ToolError::Rejected(format!( + "failed to start zsh fork command {} with zsh_path {}: {err}", + command[0], + zsh_path.display() + )) + })?; + + let (stream_tx, mut stream_rx) = + tokio::sync::mpsc::unbounded_channel::<(ExecOutputStream, Vec)>(); + + if let Some(mut out) = child.stdout.take() { + let tx = stream_tx.clone(); + tokio::spawn(async move { + let mut buf = [0_u8; 8192]; + loop { + let read = match out.read(&mut buf).await { + Ok(0) => break, + Ok(n) => n, + Err(err) => { + tracing::warn!("zsh fork stdout read error: {err}"); + break; + } + }; + let _ = tx.send((ExecOutputStream::Stdout, buf[..read].to_vec())); + } + }); + } + + if let Some(mut err) = child.stderr.take() { + let tx = stream_tx.clone(); + tokio::spawn(async move { + let mut buf = [0_u8; 8192]; + loop { + let read = match err.read(&mut buf).await { + Ok(0) => break, + Ok(n) => n, + Err(err) => { + tracing::warn!("zsh fork stderr read error: {err}"); + break; + } + }; + let _ = tx.send((ExecOutputStream::Stderr, buf[..read].to_vec())); + } + }); + } + drop(stream_tx); + + let mut stdout_bytes = Vec::new(); + let mut stderr_bytes = Vec::new(); + let mut child_exit = None; + let mut timed_out = false; + let mut stream_open = true; + let mut user_rejected = false; + let start = Instant::now(); + + let expiration = req.expiration.clone().wait(); + tokio::pin!(expiration); + + while child_exit.is_none() || stream_open { + tokio::select! { + result = child.wait(), if child_exit.is_none() => { + child_exit = Some(result.map_err(|err| ToolError::Rejected(format!("wait for zsh fork command exit: {err}")))?); + } + stream = stream_rx.recv(), if stream_open => { + if let Some((output_stream, chunk)) = stream { + match output_stream { + ExecOutputStream::Stdout => stdout_bytes.extend_from_slice(&chunk), + ExecOutputStream::Stderr => stderr_bytes.extend_from_slice(&chunk), + } + session + .send_event( + turn, + EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent { + call_id: call_id.to_string(), + stream: output_stream, + chunk, + }), + ) + .await; + } else { + stream_open = false; + } + } + accept_result = listener.accept(), if child_exit.is_none() => { + let (stream, _) = accept_result.map_err(|err| { + ToolError::Rejected(format!("failed to accept wrapper request: {err}")) + })?; + if self + .handle_wrapper_request(stream, req.justification.clone(), session, turn, call_id) + .await? + { + user_rejected = true; + } + } + _ = &mut expiration, if child_exit.is_none() => { + timed_out = true; + kill_child_process_group(&mut child).map_err(|err| { + ToolError::Rejected(format!("kill zsh fork command process group: {err}")) + })?; + child.start_kill().map_err(|err| { + ToolError::Rejected(format!("kill zsh fork command process: {err}")) + })?; + } + } + } + + let _ = std::fs::remove_file(&wrapper_socket_path); + + let status = child_exit.ok_or_else(|| { + ToolError::Rejected("zsh fork command did not return exit status".to_string()) + })?; + + if user_rejected { + return Err(ToolError::Rejected("rejected by user".to_string())); + } + + let stdout_text = crate::text_encoding::bytes_to_string_smart(&stdout_bytes); + let stderr_text = crate::text_encoding::bytes_to_string_smart(&stderr_bytes); + let output = ExecToolCallOutput { + exit_code: status.code().unwrap_or(-1), + stdout: crate::exec::StreamOutput::new(stdout_text.clone()), + stderr: crate::exec::StreamOutput::new(stderr_text.clone()), + aggregated_output: crate::exec::StreamOutput::new(format!( + "{stdout_text}{stderr_text}" + )), + duration: start.elapsed(), + timed_out, + }; + + Self::map_exec_result(req.sandbox, output) + } + + #[cfg(unix)] + async fn handle_wrapper_request( + &self, + mut stream: UnixStream, + approval_reason: Option, + session: &crate::codex::Session, + turn: &crate::codex::TurnContext, + call_id: &str, + ) -> Result { + let mut request_buf = Vec::new(); + stream.read_to_end(&mut request_buf).await.map_err(|err| { + ToolError::Rejected(format!("read wrapper request from socket: {err}")) + })?; + let request_line = String::from_utf8(request_buf).map_err(|err| { + ToolError::Rejected(format!("decode wrapper request as utf-8: {err}")) + })?; + let request = parse_wrapper_request_line(request_line.trim())?; + + let (request_id, file, argv, cwd) = match request { + WrapperIpcRequest::ExecRequest { + request_id, + file, + argv, + cwd, + } => (request_id, file, argv, cwd), + }; + + let command_for_approval = if argv.is_empty() { + vec![file.clone()] + } else { + argv.clone() + }; + + let approval_id = Uuid::new_v4().to_string(); + let decision = session + .request_command_approval( + turn, + call_id.to_string(), + Some(approval_id), + command_for_approval, + PathBuf::from(cwd), + approval_reason, + None, + None::, + ) + .await; + + let (action, reason, user_rejected) = match decision { + ReviewDecision::Approved + | ReviewDecision::ApprovedForSession + | ReviewDecision::ApprovedExecpolicyAmendment { .. } => { + (WrapperExecAction::Run, None, false) + } + ReviewDecision::Denied => ( + WrapperExecAction::Deny, + Some("command denied by host approval policy".to_string()), + true, + ), + ReviewDecision::Abort => ( + WrapperExecAction::Deny, + Some("command aborted by host approval policy".to_string()), + true, + ), + }; + + write_json_line( + &mut stream, + &WrapperIpcResponse::ExecResponse { + request_id, + action, + reason, + }, + ) + .await?; + + Ok(user_rejected) + } + + #[cfg(unix)] + fn map_exec_result( + sandbox: crate::exec::SandboxType, + output: ExecToolCallOutput, + ) -> Result { + if output.timed_out { + return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { + output: Box::new(output), + }))); + } + + if crate::exec::is_likely_sandbox_denied(sandbox, &output) { + return Err(ToolError::Codex(CodexErr::Sandbox(SandboxErr::Denied { + output: Box::new(output), + network_policy_decision: None, + }))); + } + + Ok(output) + } +} + +pub fn maybe_run_zsh_exec_wrapper_mode() -> anyhow::Result { + if std::env::var_os(ZSH_EXEC_WRAPPER_MODE_ENV_VAR).is_none() { + return Ok(false); + } + + run_exec_wrapper_mode()?; + Ok(true) +} + +fn run_exec_wrapper_mode() -> anyhow::Result<()> { + #[cfg(not(unix))] + { + anyhow::bail!("zsh exec wrapper mode is only supported on unix"); + } + + #[cfg(unix)] + { + use std::os::unix::net::UnixStream as StdUnixStream; + + let args: Vec = std::env::args().collect(); + if args.len() < 2 { + anyhow::bail!("exec wrapper mode requires target executable path"); + } + let file = args[1].clone(); + let argv = if args.len() > 2 { + args[2..].to_vec() + } else { + vec![file.clone()] + }; + let cwd = std::env::current_dir() + .context("resolve wrapper cwd")? + .to_string_lossy() + .to_string(); + let socket_path = std::env::var(ZSH_EXEC_BRIDGE_WRAPPER_SOCKET_ENV_VAR) + .context("missing wrapper socket path env var")?; + + let request_id = Uuid::new_v4().to_string(); + let request = WrapperIpcRequest::ExecRequest { + request_id: request_id.clone(), + file: file.clone(), + argv: argv.clone(), + cwd, + }; + + let mut stream = StdUnixStream::connect(&socket_path) + .with_context(|| format!("connect to wrapper socket at {socket_path}"))?; + let encoded = serde_json::to_string(&request).context("serialize wrapper request")?; + stream + .write_all(encoded.as_bytes()) + .context("write wrapper request")?; + stream + .write_all(b"\n") + .context("write wrapper request newline")?; + stream + .shutdown(std::net::Shutdown::Write) + .context("shutdown wrapper write")?; + + let mut response_buf = String::new(); + stream + .read_to_string(&mut response_buf) + .context("read wrapper response")?; + let response: WrapperIpcResponse = + serde_json::from_str(response_buf.trim()).context("parse wrapper response")?; + + let (response_request_id, action, reason) = match response { + WrapperIpcResponse::ExecResponse { + request_id, + action, + reason, + } => (request_id, action, reason), + }; + if response_request_id != request_id { + anyhow::bail!( + "wrapper response request_id mismatch: expected {request_id}, got {response_request_id}" + ); + } + + if action == WrapperExecAction::Deny { + if let Some(reason) = reason { + tracing::warn!("execution denied: {reason}"); + } else { + tracing::warn!("execution denied"); + } + std::process::exit(1); + } + + let mut command = std::process::Command::new(&file); + if argv.len() > 1 { + command.args(&argv[1..]); + } + command.env_remove(ZSH_EXEC_WRAPPER_MODE_ENV_VAR); + command.env_remove(ZSH_EXEC_BRIDGE_WRAPPER_SOCKET_ENV_VAR); + command.env_remove(EXEC_WRAPPER_ENV_VAR); + let status = command.status().context("spawn wrapped executable")?; + std::process::exit(status.code().unwrap_or(1)); + } +} + +#[cfg(unix)] +fn parse_wrapper_request_line(request_line: &str) -> Result { + serde_json::from_str(request_line) + .map_err(|err| ToolError::Rejected(format!("parse wrapper request payload: {err}"))) +} + +#[cfg(unix)] +async fn write_json_line( + writer: &mut W, + message: &T, +) -> Result<(), ToolError> { + let encoded = serde_json::to_string(message) + .map_err(|err| ToolError::Rejected(format!("serialize wrapper message: {err}")))?; + tokio::io::AsyncWriteExt::write_all(writer, encoded.as_bytes()) + .await + .map_err(|err| ToolError::Rejected(format!("write wrapper message: {err}")))?; + tokio::io::AsyncWriteExt::write_all(writer, b"\n") + .await + .map_err(|err| ToolError::Rejected(format!("write wrapper newline: {err}")))?; + tokio::io::AsyncWriteExt::flush(writer) + .await + .map_err(|err| ToolError::Rejected(format!("flush wrapper message: {err}")))?; + Ok(()) +} + +#[cfg(all(test, unix))] +mod tests { + use super::*; + + #[test] + fn parse_wrapper_request_line_rejects_malformed_json() { + let err = parse_wrapper_request_line("this-is-not-json").unwrap_err(); + let ToolError::Rejected(message) = err else { + panic!("expected ToolError::Rejected"); + }; + assert!(message.starts_with("parse wrapper request payload:")); + } +} diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 49163c900..1533e7e63 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -252,6 +252,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any model_provider: model_provider.clone(), codex_linux_sandbox_exe, js_repl_node_path: None, + zsh_path: None, base_instructions: None, developer_instructions: None, personality: None,